From 75a72bf5a2b0330a67692f65de2728c235e0ea0e Mon Sep 17 00:00:00 2001 From: "claude-ceo-assistant (Claude Opus 4.7 on Hongming's MacBook)" Date: Wed, 6 May 2026 16:55:00 -0700 Subject: [PATCH] feat(canvas/chat-server): canvas consumes /chat-history + server-side row-aware reverse (RFC #2945 PR-C-2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the SSOT story shipped in PR-C/D: canvas now consumes the typed /chat-history endpoint instead of /activity?type=a2a_receive, and the server emits messages in display-ready chronological order so the client doesn't have to re-order them. ## Canvas (consumer migration) - loadMessagesFromDB swaps from /activity to /chat-history. - Drops type=a2a_receive + source=canvas params (server applies the filter centrally now). - Drops [...activities].reverse() — wire is already display-ready. - Drops the local INTERNAL_SELF_MESSAGE_PREFIXES constant + isInternalSelfMessage helper. Server-side IsInternalSelfMessage applies the same predicate before emitting rows. - Drops the activityRowToMessages + ActivityRowForHydration imports from historyHydration.ts. The TS parser stays in tree because message-parser.ts is still load-bearing for live A2A WebSocket messages (ChatTab.tsx:805, AgentCommsPanel.tsx, canvas-events.ts). ## Server (row-aware wire-order fix) The pre-PR-C-2 client did `[...activities].reverse()` over ROWS, then flattened each row into [user, agent] messages. The reversal was ROW-aware. After PR-C/D, the server returned a flat ChatMessage slice in `ORDER BY created_at DESC` order, with [user, agent] within each row. A naive client-side flat reverse would FLIP each pair (agent before user at same timestamp). Two ways to fix it: A) Server emits oldest-first within page; canvas does NOT reverse. B) Canvas does row-aware reversal (group by timestamp, reverse). 
Option A is cleaner — server owns the wire-order responsibility, every client
trusts `for m of messages` to render chronologically.

Server adds reverseRowChunks() that:

1. Groups consecutive same-Timestamp messages into row chunks (1-2 messages
   per row).
2. Reverses the chunk order (newest-row-first → oldest-row-first).
3. Flattens.

Within-chunk [user, agent] order is preserved. Single-message rows (agent
reply not yet recorded, attachments-only user upload) collapse to 1-element
chunks and reverse correctly too.

## Tests

Server: 3 new unit tests on reverseRowChunks (paired across rows,
single-message rows, empty input) + 1 sqlmock integration test on List()
that drives the full SQL → reverse → wire path. Mutation-tested: removed
`messages = reverseRowChunks(messages)` from List(), confirmed the
integration test fires red with all 4 misordered indices flagged. Restored,
all 25 messagestore tests + 9 chat-history handler tests green.

Canvas: 8 lazyHistory pagination tests refactored to mock /chat-history
(not /activity) and assert against the new wire shape ({messages,
reached_end} not raw activity rows). All 1389/1389 vitest tests green;
tsc --noEmit clean.

## Three weakest spots (hostile-reviewer self-pass)

1. reverseRowChunks groups by Timestamp string equality. If two distinct
   rows had the SAME timestamp (legitimately possible at sub-millisecond
   granularity), the algorithm would treat them as one chunk and not
   reverse them relative to each other. Mitigated: activity_logs.created_at
   uses microsecond resolution; concurrent inserts at exact-same
   microsecond are vanishingly rare. If a collision happens, the
   within-chunk order is whatever the SQL returned — both rows render at
   the same timestamp, no user-visible misordering.
2. The pre-existing TS parser files (historyHydration.ts +
   message-parser.ts) stay in tree.
   historyHydration.ts is now dead code (no consumers post-migration);
   deletion is parked as a follow-up after a one-week observation window
   confirms no live-message consumer reaches it.
3. canvas's loadMessagesFromDB returns `resp.messages ?? []`. If the server
   were ever to return `null` instead of `[]` (it currently doesn't —
   handler defensively coerces nil to []), the nullish coalesce keeps the
   canvas from crashing. A stricter wire schema would assert the never-null
   invariant; for today's pragmatic safety, the ?? is enough.

## Security review

- Untrusted input? Same as PR-C — agent JSON parsed defensively in the
  messagestore parser. No new exposure.
- Trust boundary? Same. Canvas → /chat-history → wsAuth → messagestore.
- Output sanitization? Plain text + opaque attachment URIs as before.

No security-relevant changes beyond what /chat-history already exposes via
PR-C. Considered, not skipped.

## Versioning / backwards compat

- /activity endpoint unchanged.
- /chat-history endpoint shape unchanged (still {messages, reached_end});
  only the wire ORDER within a page changed (newest-first row →
  oldest-first row). Canvas is the only consumer in tree; no API consumers
  depend on the previous order.
- canvas's loadMessagesFromDB call signature unchanged — internal refactor.
🤖 Generated with [Claude Code](https://claude.com/claude-code) --- canvas/src/components/tabs/ChatTab.tsx | 92 +++--- .../__tests__/ChatTab.lazyHistory.test.tsx | 269 +++++++++--------- .../internal/messagestore/postgres_store.go | 45 +++ .../messagestore/postgres_store_test.go | 142 +++++++++ 4 files changed, 358 insertions(+), 190 deletions(-) diff --git a/canvas/src/components/tabs/ChatTab.tsx b/canvas/src/components/tabs/ChatTab.tsx index f343b63c..21e9f665 100644 --- a/canvas/src/components/tabs/ChatTab.tsx +++ b/canvas/src/components/tabs/ChatTab.tsx @@ -13,7 +13,6 @@ import { AttachmentPreview } from "./chat/AttachmentPreview"; import { extractFilesFromTask } from "./chat/message-parser"; import { AgentCommsPanel } from "./chat/AgentCommsPanel"; import { appendActivityLine } from "./chat/activityLog"; -import { activityRowToMessages, type ActivityRowForHydration } from "./chat/historyHydration"; import { runtimeDisplayName } from "@/lib/runtime-names"; import { ConfirmDialog } from "@/components/ConfirmDialog"; @@ -50,38 +49,12 @@ interface A2AResponse { }; } -/** Detect activity-log rows that the workspace's own runtime fired - * against itself but were misclassified as canvas-source. The proper - * fix is the X-Workspace-ID header from `self_source_headers()` in - * workspace/platform_auth.py, which makes the platform record - * source_id = workspace_id. But three failure modes still leak a - * self-message into "My Chat": - * - * 1. Historical rows already in the DB with source_id=NULL. - * 2. Workspace containers running pre-fix heartbeat.py / main.py - * (the fix only takes effect after an image rebuild + redeploy). - * 3. Future internal triggers added without the helper. - * - * This client-side filter recognises the heartbeat trigger by its - * exact prefix — the heartbeat assembles - * - * "Delegation results are ready. Review them and take appropriate - * action:\n" + summary_lines + report_instruction - * - * in workspace/heartbeat.py. 
The prefix is template-fixed so a - * string match is reliable. If the heartbeat copy ever changes, - * update this constant in the same commit. - * - * This is a backstop, not the primary defence — the X-Workspace-ID - * header is. Filtering content is fragile to copy edits, so keep - * the list narrow. */ -const INTERNAL_SELF_MESSAGE_PREFIXES = [ - "Delegation results are ready. Review them and take appropriate action", -]; - -function isInternalSelfMessage(text: string): boolean { - return INTERNAL_SELF_MESSAGE_PREFIXES.some((p) => text.startsWith(p)); -} +// Internal-self-message filtering moved server-side in RFC #2945 +// PR-C/D — the platform's /chat-history endpoint applies the +// IsInternalSelfMessage predicate before returning rows, so the +// client no longer needs the local backstop on the history path. +// The proper fix is still X-Workspace-ID header (source_id=workspace_id); +// the platform-side prefix filter handles the residual cases. // extractReplyText pulls the agent's text reply out of an A2A response. // Concatenates ALL text parts (joined with "\n") rather than returning @@ -134,8 +107,19 @@ const INITIAL_HISTORY_LIMIT = 10; const OLDER_HISTORY_BATCH = 20; /** - * Load chat history from the activity_logs database via the platform API. - * Uses source=canvas to only get user-initiated messages (not agent-to-agent). + * Load chat history from the platform's typed /chat-history endpoint. + * + * Server-side rendering of activity_logs rows into ChatMessage shape + * lives in workspace-server/internal/messagestore/postgres_store.go + * (RFC #2945 PR-C/D). The server already applies the canvas-source + * filter, the internal-self-message predicate, the role decision + * (status=error vs agent-error prefix → system), and the v0/v1 + * file-shape extraction. Canvas just renders what it receives. 
+ * + * Wire shape (mirrors ChatMessage exactly, no per-row mapping needed): + * + * GET /workspaces/:id/chat-history?limit=N&before_ts=T + * 200 → {"messages": ChatMessage[], "reached_end": boolean} * * Pagination: * - Pass `limit` to bound the page size (newest-first from server). @@ -143,10 +127,10 @@ const OLDER_HISTORY_BATCH = 20; * timestamp. Combined with limit, this yields the next-older page * when scrolling backward through history. * - * `reachedEnd` is true when the server returned fewer rows than asked - * for — caller uses this to disable further older-batch fetches. - * (Counts row-level returns, not chat-bubble count: each row may - * produce 1-2 bubbles.) + * `reachedEnd` is propagated from the server. The server computes it + * by comparing rowCount vs limit so a partial last page is correctly + * detected even when the row→bubble fan-out is non-1:1 (each row + * produces 1-2 bubbles). */ async function loadMessagesFromDB( workspaceId: string, @@ -154,25 +138,23 @@ async function loadMessagesFromDB( beforeTs?: string, ): Promise<{ messages: ChatMessage[]; error: string | null; reachedEnd: boolean }> { try { - const params = new URLSearchParams({ - type: "a2a_receive", - source: "canvas", - limit: String(limit), - }); + const params = new URLSearchParams({ limit: String(limit) }); if (beforeTs) params.set("before_ts", beforeTs); - const activities = await api.get( - `/workspaces/${workspaceId}/activity?${params.toString()}`, + const resp = await api.get<{ messages: ChatMessage[]; reached_end: boolean }>( + `/workspaces/${workspaceId}/chat-history?${params.toString()}`, ); - const messages: ChatMessage[] = []; - // Activities are newest-first, reverse for chronological order. - // Per-row mapping lives in chat/historyHydration.ts so it can be - // unit-tested without spinning up the full ChatTab component - // (regression cover for the timestamp-collapse bug). 
- for (const a of [...activities].reverse()) { - messages.push(...activityRowToMessages(a, isInternalSelfMessage)); - } - return { messages, error: null, reachedEnd: activities.length < limit }; + // Server emits oldest-first within the page (RFC #2945 PR-C-2 + // post-fix: server reverses row-aware before returning so the + // wire is display-ready). Canvas appends/prepends without + // reordering — this avoids the pair-flip bug a naive flat + // reverse causes when each row produces a (user, agent) pair + // with the same timestamp. + return { + messages: resp.messages ?? [], + error: null, + reachedEnd: resp.reached_end, + }; } catch (err) { return { messages: [], diff --git a/canvas/src/components/tabs/__tests__/ChatTab.lazyHistory.test.tsx b/canvas/src/components/tabs/__tests__/ChatTab.lazyHistory.test.tsx index 47f328ed..577c4587 100644 --- a/canvas/src/components/tabs/__tests__/ChatTab.lazyHistory.test.tsx +++ b/canvas/src/components/tabs/__tests__/ChatTab.lazyHistory.test.tsx @@ -1,13 +1,11 @@ // @vitest-environment jsdom // -// Pins the lazy-loading chat-history pagination added 2026-05-05. +// Pins the lazy-loading chat-history pagination. // -// Pre-fix: ChatTab fetched the newest 50 messages on every mount and -// scrolled to bottom, paying full DOM cost up-front even when the user -// only wanted to read the last few bubbles. Post-fix: initial load is -// bounded to 10 newest, and an IntersectionObserver on a top sentinel -// triggers loadOlder() (batch of 20 with `before_ts` cursor) when the -// user scrolls up. +// PR-C-2 (RFC #2945): canvas was migrated from /activity?type=a2a_receive +// to /chat-history. Server now returns typed ChatMessage[] in +// display-ready oldest-first order. These tests guard the canvas-side +// pagination invariants against the new endpoint surface. // // Pinned branches: // 1. Initial fetch carries `limit=10` and NO before_ts (newest-first @@ -20,11 +18,10 @@ // asserting the rendered bubble count matches the full page). 
// 4. The retry button after a failed initial load uses the same // INITIAL_HISTORY_LIMIT (10), not the legacy 50. -// -// IntersectionObserver / scroll-anchor restoration is exercised by the -// E2E synth-canary suite — pinning it in jsdom would require mocking -// the observer and faking layout, which is brittler than trusting a -// live-DOM canary against the staging tenant. +// 5. before_ts cursor is the OLDEST timestamp from the current page, +// passed verbatim to walk backward. +// 6. Inflight guard rejects duplicate IO triggers while a loadOlder +// fetch is in flight. import { describe, it, expect, vi, afterEach, beforeEach } from "vitest"; import { render, screen, cleanup, waitFor, fireEvent } from "@testing-library/react"; @@ -33,24 +30,31 @@ import React from "react"; afterEach(cleanup); // Both ChatTab sub-panels (MyChat + AgentComms) mount simultaneously so -// keyboard tab order and aria-controls land on a real DOM. Both fire -// /activity GETs on mount: MyChat's hits `type=a2a_receive&source=canvas`, -// AgentComms's hits a different filter. Route the mock by URL so each -// gets a sensible default and only MyChat's call is what the assertions -// scrutinise. -const myChatActivityCalls: string[] = []; -let myChatNextResponse: { ok: true; rows: unknown[] } | { ok: false; err: Error } = { - ok: true, - rows: [], -}; +// keyboard tab order and aria-controls land on a real DOM. MyChat's +// loadMessagesFromDB hits /chat-history; AgentComms's polling hits a +// different URL. Route the mock by URL so each gets a sensible default +// and only MyChat's calls land in the assertion array. 
+const myChatHistoryCalls: string[] = []; +let myChatNextResponse: + | { ok: true; messages: unknown[]; reachedEnd?: boolean } + | { ok: false; err: Error } = { ok: true, messages: [] }; + const apiGet = vi.fn((path: string): Promise => { - if (path.includes("type=a2a_receive") && path.includes("source=canvas")) { - myChatActivityCalls.push(path); - if (myChatNextResponse.ok) return Promise.resolve(myChatNextResponse.rows); + if (path.includes("/chat-history")) { + myChatHistoryCalls.push(path); + if (myChatNextResponse.ok) { + const reached_end = + myChatNextResponse.reachedEnd !== undefined + ? myChatNextResponse.reachedEnd + : myChatNextResponse.messages.length < 10; + return Promise.resolve({ + messages: myChatNextResponse.messages, + reached_end, + }); + } return Promise.reject(myChatNextResponse.err); } - // AgentComms / heartbeat / anything else — empty array is a safe - // default that won't blow up the corresponding component's .then(). + // AgentComms / heartbeat / anything else — empty array safe default. return Promise.resolve([]); }); const apiPost = vi.fn(); @@ -84,8 +88,8 @@ const ioInstances: IOInstance[] = []; beforeEach(() => { apiGet.mockClear(); apiPost.mockReset(); - myChatActivityCalls.length = 0; - myChatNextResponse = { ok: true, rows: [] }; + myChatHistoryCalls.length = 0; + myChatNextResponse = { ok: true, messages: [] }; ioInstances.length = 0; class FakeIO { private inst: IOInstance; @@ -101,20 +105,12 @@ beforeEach(() => { this.inst.disconnected = true; } } - // Install on every reachable global — different bundlers / module - // graphs can resolve `IntersectionObserver` via `window`, `globalThis`, - // or the bare global. Without all three, jsdom's own (pre-existing) - // stub silently wins and ioInstances stays empty. 
(window as unknown as { IntersectionObserver: unknown }).IntersectionObserver = FakeIO; (globalThis as unknown as { IntersectionObserver: unknown }).IntersectionObserver = FakeIO; - // jsdom doesn't implement scrollIntoView; ChatTab calls it after every - // messages update. Element.prototype.scrollIntoView = vi.fn(); }); function triggerIntersection(instanceIdx = -1) { - // -1 → the latest observer (the live one). Tests targeting an old - // (disconnected) instance pass a positive index. const inst = ioInstances.at(instanceIdx); if (!inst) throw new Error(`no IO instance at ${instanceIdx}`); inst.callback( @@ -125,25 +121,30 @@ function triggerIntersection(instanceIdx = -1) { import { ChatTab } from "../ChatTab"; -function makeActivityRow(seq: number): Record { - // Zero-pad seq into the minute slot so "seq=10" doesn't produce - // the invalid timestamp "00:010:00Z" (caught by the loadOlder URL - // assertion below — first version of the helper used `0${seq}` and - // the test failed on `before_ts` having an extra digit). +// makeMessagePair returns a (user, agent) pair sharing a timestamp, +// matching the wire shape /chat-history emits per activity_logs row. +// Server-side reverseRowChunks ensures the wire is oldest-first across +// rows but [user, agent] within each row. +function makeMessagePair(seq: number): unknown[] { + // Zero-pad seq into the minute slot so seq=10 produces a valid + // timestamp (00:10:00Z, not 00:010:00Z). 
const mm = String(seq).padStart(2, "0"); - return { - activity_type: "a2a_receive", - status: "ok", - created_at: `2026-05-05T00:${mm}:00Z`, - request_body: { params: { message: { parts: [{ kind: "text", text: `user msg ${seq}` }] } } }, - response_body: { result: `agent reply ${seq}` }, - }; + const ts = `2026-05-05T00:${mm}:00Z`; + return [ + { id: `u-${seq}`, role: "user", content: `user msg ${seq}`, timestamp: ts }, + { id: `a-${seq}`, role: "agent", content: `agent reply ${seq}`, timestamp: ts }, + ]; } -// Server returns newest-first; the helper builds a server-shape page -// so the order in the rendered messages array matches production. -function newestFirstPage(start: number, count: number): unknown[] { - return Array.from({ length: count }, (_, i) => makeActivityRow(start + count - 1 - i)); +// pageOldestFirst builds a wire-shape page (oldest-first within page) +// of `count` row-pairs starting at seq=`start`. Mirrors the server's +// post-reverseRowChunks emission order. +function pageOldestFirst(start: number, count: number): unknown[] { + const out: unknown[] = []; + for (let i = 0; i < count; i++) { + out.push(...makeMessagePair(start + i)); + } + return out; } const minimalData = { @@ -153,28 +154,30 @@ const minimalData = { } as unknown as Parameters[0]["data"]; describe("ChatTab lazy history pagination", () => { - it("initial fetch carries limit=10 (not the legacy 50)", async () => { - myChatNextResponse = { ok: true, rows: [makeActivityRow(1)] }; + it("initial fetch carries limit=10 (not the legacy 50) and hits /chat-history", async () => { + myChatNextResponse = { ok: true, messages: makeMessagePair(1) }; render(); - await waitFor(() => expect(myChatActivityCalls.length).toBe(1)); - const url = myChatActivityCalls[0]; + await waitFor(() => expect(myChatHistoryCalls.length).toBe(1)); + const url = myChatHistoryCalls[0]; + expect(url).toContain("/chat-history"); expect(url).toContain("limit=10"); expect(url).not.toContain("limit=50"); // before_ts 
should NOT be set on the initial fetch — that's the // newest-first slice the user lands on. expect(url).not.toContain("before_ts"); + // /chat-history filters source-canvas server-side; client should + // NOT pass type/source params (they belonged to /activity). + expect(url).not.toContain("type=a2a_receive"); + expect(url).not.toContain("source=canvas"); }); it("hides the top sentinel when initial fetch returns fewer than the limit", async () => { // 3 < 10 → server says "no more older history exists"; sentinel // should NOT mount and the "Loading older messages…" line should - // never appear (it can't, since the sentinel is what triggers it). - myChatNextResponse = { - ok: true, - rows: [makeActivityRow(1), makeActivityRow(2), makeActivityRow(3)], - }; + // never appear. + myChatNextResponse = { ok: true, messages: pageOldestFirst(1, 3) }; render(); - await waitFor(() => expect(myChatActivityCalls.length).toBe(1)); + await waitFor(() => expect(myChatHistoryCalls.length).toBe(1)); await waitFor(() => { expect(screen.queryByText(/Loading chat history/i)).toBeNull(); }); @@ -182,15 +185,15 @@ describe("ChatTab lazy history pagination", () => { }); it("renders all messages when initial fetch returns exactly the limit", async () => { - // 10 == limit → server might have more older rows; sentinel SHOULD - // mount so the IO observer can fire loadOlder() on scroll-up. We - // verify by checking the rendered bubble count — if hasMore stayed - // true the sentinel render path doesn't crash and all 10 rows - // produced their pair of bubbles. - const fullPage = Array.from({ length: 10 }, (_, i) => makeActivityRow(i + 1)); - myChatNextResponse = { ok: true, rows: fullPage }; + // limit=10 row-pairs → 20 ChatMessages. reachedEnd should be FALSE + // so the sentinel mounts. Verified by bubble counts. 
+ myChatNextResponse = { + ok: true, + messages: pageOldestFirst(1, 10), + reachedEnd: false, + }; render(); - await waitFor(() => expect(myChatActivityCalls.length).toBe(1)); + await waitFor(() => expect(myChatHistoryCalls.length).toBe(1)); await waitFor(() => { expect(screen.queryByText(/Loading chat history/i)).toBeNull(); }); @@ -202,54 +205,67 @@ describe("ChatTab lazy history pagination", () => { myChatNextResponse = { ok: false, err: new Error("network down") }; render(); const retry = await screen.findByText(/Retry/); - myChatNextResponse = { ok: true, rows: [makeActivityRow(1)] }; + myChatNextResponse = { ok: true, messages: makeMessagePair(1) }; fireEvent.click(retry); - await waitFor(() => expect(myChatActivityCalls.length).toBe(2)); - const retryUrl = myChatActivityCalls[1]; + await waitFor(() => expect(myChatHistoryCalls.length).toBe(2)); + const retryUrl = myChatHistoryCalls[1]; + expect(retryUrl).toContain("/chat-history"); expect(retryUrl).toContain("limit=10"); expect(retryUrl).not.toContain("limit=50"); }); it("loadOlder fetches limit=20 with before_ts=oldest.timestamp", async () => { - // Initial page = 10 rows in newest-first order (seq 10..1). After - // the component reverses to oldest-first for display, messages[0] - // is built from seq=1 — the oldest — and its timestamp is what - // before_ts should carry. - myChatNextResponse = { ok: true, rows: newestFirstPage(1, 10) }; + // Initial page = 10 row-pairs in oldest-first order (seq 1..10). + // The oldest (and so the cursor for loadOlder) is seq=1's + // timestamp 2026-05-05T00:01:00Z. + myChatNextResponse = { + ok: true, + messages: pageOldestFirst(1, 10), + reachedEnd: false, + }; render(); - await waitFor(() => expect(myChatActivityCalls.length).toBe(1)); + await waitFor(() => expect(myChatHistoryCalls.length).toBe(1)); await waitFor(() => expect(ioInstances.length).toBeGreaterThan(0)); - // Stage the older-batch response, then fire the IO callback. 
- myChatNextResponse = { ok: true, rows: newestFirstPage(0, 1) }; + // Stage older-batch response, then fire IO callback. + myChatNextResponse = { + ok: true, + messages: pageOldestFirst(0, 1), + reachedEnd: true, + }; triggerIntersection(); - await waitFor(() => expect(myChatActivityCalls.length).toBe(2)); - const olderUrl = myChatActivityCalls[1]; + await waitFor(() => expect(myChatHistoryCalls.length).toBe(2)); + const olderUrl = myChatHistoryCalls[1]; + expect(olderUrl).toContain("/chat-history"); expect(olderUrl).toContain("limit=20"); expect(olderUrl).toContain("before_ts="); expect(decodeURIComponent(olderUrl)).toContain("before_ts=2026-05-05T00:01:00Z"); }); it("inflight guard rejects a second IO trigger while first loadOlder is in flight", async () => { - myChatNextResponse = { ok: true, rows: newestFirstPage(1, 10) }; + myChatNextResponse = { + ok: true, + messages: pageOldestFirst(1, 10), + reachedEnd: false, + }; render(); - await waitFor(() => expect(myChatActivityCalls.length).toBe(1)); + await waitFor(() => expect(myChatHistoryCalls.length).toBe(1)); await waitFor(() => expect(ioInstances.length).toBeGreaterThan(0)); // Hold the next loadOlder fetch open with a manual deferred so we // can fire the second trigger while the first is in-flight. - let release!: (rows: unknown[]) => void; - const deferred = new Promise((res) => { + let release!: (resp: unknown) => void; + const deferred = new Promise((res) => { release = res; }); apiGet.mockImplementationOnce((path: string): Promise => { - myChatActivityCalls.push(path); + myChatHistoryCalls.push(path); return deferred; }); triggerIntersection(); // start loadOlder #1 - await waitFor(() => expect(myChatActivityCalls.length).toBe(2)); + await waitFor(() => expect(myChatHistoryCalls.length).toBe(2)); // Second IO trigger lands while #1 is still pending. 
triggerIntersection(); @@ -258,79 +274,62 @@ describe("ChatTab lazy history pagination", () => { // Without the inflight guard, each of these would have started a // new fetch. With the guard, none of them do — call count stays 2. await new Promise((r) => setTimeout(r, 10)); - expect(myChatActivityCalls.length).toBe(2); + expect(myChatHistoryCalls.length).toBe(2); - // Release the first fetch. Inflight clears in the finally block; - // a subsequent IO trigger is permitted again (verified by checking - // we can fire a follow-up after release without hanging the test). - release([]); - await waitFor(() => expect(myChatActivityCalls.length).toBe(2)); + // Release the first fetch with a valid wire response shape. + release({ messages: [], reached_end: true }); + await waitFor(() => expect(myChatHistoryCalls.length).toBe(2)); }); it("empty older response clears the scroll anchor and unmounts the sentinel", async () => { - // The bug we're pinning: if loadOlder returns 0 rows, the - // scrollAnchorRef must be cleared so the next paint doesn't try to - // restore against a no-op prepend (which would fight the natural - // bottom-pin for any subsequent live message). hasMore flipping to - // false is the same flag-flip path; sentinel disappearing is the - // observable proxy. 
- myChatNextResponse = { ok: true, rows: newestFirstPage(1, 10) }; + myChatNextResponse = { + ok: true, + messages: pageOldestFirst(1, 10), + reachedEnd: false, + }; render(); - await waitFor(() => expect(myChatActivityCalls.length).toBe(1)); + await waitFor(() => expect(myChatHistoryCalls.length).toBe(1)); await waitFor(() => expect(ioInstances.length).toBeGreaterThan(0)); - myChatNextResponse = { ok: true, rows: [] }; // empty → reachedEnd + myChatNextResponse = { + ok: true, + messages: [], + reachedEnd: true, + }; triggerIntersection(); - await waitFor(() => expect(myChatActivityCalls.length).toBe(2)); + await waitFor(() => expect(myChatHistoryCalls.length).toBe(2)); - // After reachedEnd the sentinel unmounts (hasMore=false). We can't - // peek scrollAnchorRef directly, but we can assert the consequence: - // scrollIntoView (the bottom-pin for live appends) is not blocked - // by a stale anchor. Trigger a re-render via an unrelated state - // change… in practice the safest assertion here is that the - // sentinel disappeared (proving the empty response propagated to - // hasMore correctly, which is the same flag-flip path as anchor - // clearing). await waitFor(() => { expect(screen.queryByText(/Loading older messages/i)).toBeNull(); }); }); it("IntersectionObserver does not churn when older messages prepend", async () => { - // Whole-PR perf invariant: prepending older history (the load-bearing - // user gesture) must NOT tear down + re-arm the IO observer. - // Triggering loadOlder is the cleanest way to drive a messages - // mutation from inside the test, since live agent push goes through - // a Zustand store that's harder to drive reliably from jsdom. - // - // Pre-fix, loadOlder depended on `messages`, so every prepend - // recreated loadOlder → re-ran the IO effect → new observer. Each - // call to triggerIntersection() produced a fresh disconnected - // observer + a new live one. Post-fix, the observer survives. 
- myChatNextResponse = { ok: true, rows: newestFirstPage(1, 10) }; + myChatNextResponse = { + ok: true, + messages: pageOldestFirst(1, 10), + reachedEnd: false, + }; render(); - await waitFor(() => expect(myChatActivityCalls.length).toBe(1)); + await waitFor(() => expect(myChatHistoryCalls.length).toBe(1)); await waitFor(() => expect(ioInstances.length).toBeGreaterThan(0)); - // Snapshot the observer instance after first paint stabilises. const observerBefore = ioInstances.at(-1); expect(observerBefore).toBeDefined(); expect(observerBefore!.disconnected).toBe(false); // Trigger three older-batch prepends. Each batch returns the full - // OLDER_HISTORY_BATCH (20 rows) so reachedEnd stays false and the - // sentinel keeps mounting. Pre-fix, each prepend mutated `messages` - // → recreated loadOlder → re-ran the IO effect → new observer. + // OLDER_HISTORY_BATCH (20 row-pairs = 40 messages) so reachedEnd + // stays false and the sentinel keeps mounting. for (let batch = 0; batch < 3; batch++) { myChatNextResponse = { ok: true, - rows: newestFirstPage(-(batch + 1) * 20, 20), + messages: pageOldestFirst(-(batch + 1) * 20, 20), + reachedEnd: false, }; - const callsBefore = myChatActivityCalls.length; + const callsBefore = myChatHistoryCalls.length; triggerIntersection(); - await waitFor(() => - expect(myChatActivityCalls.length).toBe(callsBefore + 1), - ); + await waitFor(() => expect(myChatHistoryCalls.length).toBe(callsBefore + 1)); } // The original observer is still the live one — no churn. 
diff --git a/workspace-server/internal/messagestore/postgres_store.go b/workspace-server/internal/messagestore/postgres_store.go index 7e75315f..67987569 100644 --- a/workspace-server/internal/messagestore/postgres_store.go +++ b/workspace-server/internal/messagestore/postgres_store.go @@ -110,10 +110,55 @@ func (s *PostgresMessageStore) List(ctx context.Context, workspaceID string, opt return nil, false, err } + // Wire order: oldest-first within the page so canvas (and any + // future client) can render chronologically without per-pair + // reordering. The SQL is `ORDER BY created_at DESC LIMIT N` for + // pagination correctness, and activityRowToChatMessages emits + // [user, agent] within a row — so a naive client-side flat-reverse + // would swap the pair (agent before user at the same timestamp). + // Reversing ROW-AWARE here keeps the wire shape display-ready. + // + // Algorithm: group consecutive same-timestamp messages into row + // chunks (1-2 messages each), reverse the chunk order, flatten. + // Within-row [user, agent] order is preserved. Single-message + // rows (no agent reply yet, or attachments-only) collapse to + // 1-element chunks and still reverse correctly. + messages = reverseRowChunks(messages) + reachedEnd := rowCount < opts.Limit return messages, reachedEnd, nil } +// reverseRowChunks groups msgs by adjacent same-Timestamp runs and +// reverses the run order, preserving within-run order. Pairs of +// (user, agent) emitted by activityRowToChatMessages share a +// timestamp, so this keeps each pair internally ordered while +// reversing the row sequence. 
+func reverseRowChunks(msgs []ChatMessage) []ChatMessage { + if len(msgs) == 0 { + return msgs + } + var chunks [][]ChatMessage + cur := []ChatMessage{msgs[0]} + for i := 1; i < len(msgs); i++ { + if msgs[i].Timestamp == cur[len(cur)-1].Timestamp { + cur = append(cur, msgs[i]) + } else { + chunks = append(chunks, cur) + cur = []ChatMessage{msgs[i]} + } + } + chunks = append(chunks, cur) + for i, j := 0, len(chunks)-1; i < j; i, j = i+1, j-1 { + chunks[i], chunks[j] = chunks[j], chunks[i] + } + out := make([]ChatMessage, 0, len(msgs)) + for _, chunk := range chunks { + out = append(out, chunk...) + } + return out +} + // queryActivityRows is split from List so unit tests can exercise the // parser without spinning a real DB. Internal — alternative impls // shouldn't depend on the SQL shape. diff --git a/workspace-server/internal/messagestore/postgres_store_test.go b/workspace-server/internal/messagestore/postgres_store_test.go index bcdda6fa..5f7cce8a 100644 --- a/workspace-server/internal/messagestore/postgres_store_test.go +++ b/workspace-server/internal/messagestore/postgres_store_test.go @@ -14,10 +14,13 @@ package messagestore // legacy source the server replaces; divergence == regression. import ( + "context" "encoding/json" "strings" "testing" "time" + + "github.com/DATA-DOG/go-sqlmock" ) const fixedTimestamp = "2026-04-25T18:00:00Z" @@ -282,6 +285,145 @@ func TestChatHistory_NoAgentMessageWhenResponseHasNoTextNoFiles(t *testing.T) { } } +// ===================================================================== +// List() integration — sqlmock-backed end-to-end via the real handler +// ===================================================================== + +// TestList_WireOrderIsOldestFirstAcrossPagedRows pins the integration +// invariant: List() returns wire-display-ready messages even though +// the underlying SQL is `ORDER BY created_at DESC`. 
This is the +// load-bearing test for PR-C-2 — without the row-aware reversal, +// canvas would render every paired bubble in the wrong order on every +// chat reload (agent before user within each timestamp). +// +// Mutation-test cover: removing the `messages = reverseRowChunks(...)` +// call in List() must turn this test red. (The lower-level +// TestReverseRowChunks_PreservesPairOrderAcrossRows pins the helper +// itself; this test pins that List ACTUALLY CALLS the helper.) +func TestList_WireOrderIsOldestFirstAcrossPagedRows(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + defer db.Close() + + // Server's SQL is ORDER BY created_at DESC. Build mock rows in + // THAT order so the row-aware reversal has work to do. + rows := sqlmock.NewRows([]string{"created_at", "status", "request_body", "response_body"}). + AddRow(mustParseTime(t, "2026-05-05T00:03:00Z"), "ok", + `{"params":{"message":{"parts":[{"kind":"text","text":"u3"}]}}}`, + `{"result":"a3"}`). + AddRow(mustParseTime(t, "2026-05-05T00:02:00Z"), "ok", + `{"params":{"message":{"parts":[{"kind":"text","text":"u2"}]}}}`, + `{"result":"a2"}`). + AddRow(mustParseTime(t, "2026-05-05T00:01:00Z"), "ok", + `{"params":{"message":{"parts":[{"kind":"text","text":"u1"}]}}}`, + `{"result":"a1"}`) + + mock.ExpectQuery(`SELECT created_at, status, request_body::text, response_body::text`). 
+ WillReturnRows(rows) + + store := NewPostgresMessageStore(db) + msgs, reachedEnd, err := store.List(context.Background(), "ws-1", ListOptions{Limit: 10}) + if err != nil { + t.Fatalf("List: %v", err) + } + + wantContents := []string{"u1", "a1", "u2", "a2", "u3", "a3"} + if len(msgs) != len(wantContents) { + t.Fatalf("len(msgs)=%d want %d; got=%v", len(msgs), len(wantContents), msgs) + } + for i, w := range wantContents { + if msgs[i].Content != w { + t.Errorf("idx %d: got %q want %q (full slice ordering broken; reverseRowChunks regressed?)", i, msgs[i].Content, w) + } + } + if !reachedEnd { + t.Errorf("3 rows < limit 10 should reach end, got reachedEnd=false") + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock expectations: %v", err) + } +} + +// ===================================================================== +// reverseRowChunks — wire-order helper added in PR-C-2 +// ===================================================================== + +// TestReverseRowChunks_PreservesPairOrderAcrossRows pins the +// row-aware reversal that List() applies before returning. Server's +// SQL is `ORDER BY created_at DESC`, so messages come out +// newest-row-first; activityRowToChatMessages emits [user, agent] +// per row with same timestamp. A naive flat reversal of the messages +// slice would flip each pair (agent before user). reverseRowChunks +// reverses ROWS, preserving pair-internal order. Without this, canvas +// would render every paired bubble in the wrong order on every chat +// reload — the canvas-side reverse used to do the right thing because +// it reversed ROWS BEFORE flattening, but PR-C/D moved the flattening +// into the server, so the row-awareness has to live there too. +func TestReverseRowChunks_PreservesPairOrderAcrossRows(t *testing.T) { + // Build messages newest-row-first as List() collects them. Each + // row is a pair sharing a timestamp, with [user, agent] order. 
+ in := []ChatMessage{ + {Role: "user", Content: "user_3", Timestamp: "2026-05-05T00:03:00Z"}, + {Role: "agent", Content: "agent_3", Timestamp: "2026-05-05T00:03:00Z"}, + {Role: "user", Content: "user_2", Timestamp: "2026-05-05T00:02:00Z"}, + {Role: "agent", Content: "agent_2", Timestamp: "2026-05-05T00:02:00Z"}, + {Role: "user", Content: "user_1", Timestamp: "2026-05-05T00:01:00Z"}, + {Role: "agent", Content: "agent_1", Timestamp: "2026-05-05T00:01:00Z"}, + } + got := reverseRowChunks(in) + + want := []struct { + role, content string + }{ + {"user", "user_1"}, {"agent", "agent_1"}, + {"user", "user_2"}, {"agent", "agent_2"}, + {"user", "user_3"}, {"agent", "agent_3"}, + } + if len(got) != len(want) { + t.Fatalf("len(got)=%d len(want)=%d", len(got), len(want)) + } + for i, w := range want { + if got[i].Role != w.role || got[i].Content != w.content { + t.Errorf("idx %d: got role=%q content=%q want role=%q content=%q", + i, got[i].Role, got[i].Content, w.role, w.content) + } + } +} + +// TestReverseRowChunks_HandlesSingleMessageRows pins the case where +// a row has only a user OR only an agent message (e.g., agent reply +// not yet recorded, attachments-only user upload). Naive reversal +// still works for single-message chunks; the test guards against a +// future change that special-cases the 2-message-row path. 
+func TestReverseRowChunks_HandlesSingleMessageRows(t *testing.T) { + in := []ChatMessage{ + {Role: "user", Content: "u3", Timestamp: "2026-05-05T00:03:00Z"}, + {Role: "user", Content: "u2", Timestamp: "2026-05-05T00:02:00Z"}, // single, no agent + {Role: "agent", Content: "a2", Timestamp: "2026-05-05T00:02:00Z"}, + {Role: "user", Content: "u1", Timestamp: "2026-05-05T00:01:00Z"}, + } + got := reverseRowChunks(in) + wantContents := []string{"u1", "u2", "a2", "u3"} + if len(got) != len(wantContents) { + t.Fatalf("len got=%d want=%d", len(got), len(wantContents)) + } + for i, w := range wantContents { + if got[i].Content != w { + t.Errorf("idx %d: got %q want %q", i, got[i].Content, w) + } + } +} + +// TestReverseRowChunks_EmptyInput pins that nil input yields an empty +// result without panicking. +func TestReverseRowChunks_EmptyInput(t *testing.T) { + got := reverseRowChunks(nil) + if len(got) != 0 { + t.Errorf("nil input should return empty, got %v", got) + } +} + // ===================================================================== // end-to-end shape — paired user + agent with same timestamp // ===================================================================== -- 2.45.2