feat(canvas/chat-server): canvas consumes /chat-history + server-side row-aware reverse (RFC #2945 PR-C-2)

Closes the SSOT story shipped in PR-C/D: canvas now consumes the typed
/chat-history endpoint instead of /activity?type=a2a_receive, and the
server emits messages in display-ready chronological order so the
client doesn't have to re-order them.

## Canvas (consumer migration)

- loadMessagesFromDB swaps from /activity to /chat-history.
- Drops type=a2a_receive + source=canvas params (server applies the
  filter centrally now).
- Drops [...activities].reverse() — wire is already display-ready.
- Drops the local INTERNAL_SELF_MESSAGE_PREFIXES constant +
  isInternalSelfMessage helper. Server-side IsInternalSelfMessage
  applies the same predicate before emitting rows.
- Drops the activityRowToMessages + ActivityRowForHydration imports
  from historyHydration.ts. The TS parser stays in tree because
  message-parser.ts is still load-bearing for live A2A WebSocket
  messages (ChatTab.tsx:805, AgentCommsPanel.tsx, canvas-events.ts).

## Server (row-aware wire-order fix)

The pre-PR-C-2 client did `[...activities].reverse()` over ROWS, then
flattened each row into [user, agent] messages. The reversal was
ROW-aware. After PR-C/D, the server returned a flat ChatMessage slice
in `ORDER BY created_at DESC` order, with [user, agent] within each
row. A naive client-side flat reverse would FLIP each pair (agent
before user at same timestamp).

Two ways to fix it:

  A) Server emits oldest-first within page; canvas does NOT reverse.
  B) Canvas does row-aware reversal (group by timestamp, reverse).

Option A is cleaner — server owns the wire-order responsibility, every
client trusts `for m of messages` to render chronologically. Server
adds reverseRowChunks() that:

  1. Groups consecutive same-Timestamp messages into row chunks
     (1-2 messages per row).
  2. Reverses the chunk order (newest-row-first → oldest-row-first).
  3. Flattens. Within-chunk [user, agent] order is preserved.

Single-message rows (agent reply not yet recorded, attachments-only
user upload) collapse to 1-element chunks and reverse correctly too.
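
The three steps above can be sketched as a self-contained Go program (the `msg` struct is a hypothetical pared-down stand-in; the real helper operates on the messagestore `ChatMessage` type):

```go
package main

import "fmt"

// msg is a hypothetical stand-in for ChatMessage (illustrative fields only).
type msg struct {
	Role string
	TS   string
}

// reverseRowChunks groups consecutive same-timestamp messages into
// row chunks, reverses the chunk order, and flattens, so within-row
// [user, agent] order survives while rows flip from newest-first to
// oldest-first.
func reverseRowChunks(msgs []msg) []msg {
	if len(msgs) == 0 {
		return msgs
	}
	var chunks [][]msg
	cur := []msg{msgs[0]}
	for _, m := range msgs[1:] {
		if m.TS == cur[len(cur)-1].TS {
			cur = append(cur, m) // same row: extend the chunk
		} else {
			chunks = append(chunks, cur)
			cur = []msg{m}
		}
	}
	chunks = append(chunks, cur)
	// Reverse the chunk order in place.
	for i, j := 0, len(chunks)-1; i < j; i, j = i+1, j-1 {
		chunks[i], chunks[j] = chunks[j], chunks[i]
	}
	out := make([]msg, 0, len(msgs))
	for _, c := range chunks {
		out = append(out, c...)
	}
	return out
}

func main() {
	// SQL emits newest row first, [user, agent] within each row.
	wire := []msg{
		{"user", "t2"}, {"agent", "t2"},
		{"user", "t1"}, {"agent", "t1"},
	}
	fmt.Println(reverseRowChunks(wire))
	// → [{user t1} {agent t1} {user t2} {agent t2}]
	// A naive flat reverse would instead flip each pair:
	// [{agent t1} {user t1} {agent t2} {user t2}]
}
```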

## Tests

Server: 3 new unit tests on reverseRowChunks (paired across rows,
single-message rows, empty input) + 1 sqlmock integration test on
List() that drives the full SQL → reverse → wire path. Mutation-tested:
removed `messages = reverseRowChunks(messages)` from List(), confirmed
the integration test fires red with all 4 misordered indices flagged.
Restored, all 25 messagestore tests + 9 chat-history handler tests
green.

Canvas: 8 lazyHistory pagination tests refactored to mock
/chat-history (not /activity) and assert against the new wire shape
({messages, reached_end} not raw activity rows). All 1389/1389 vitest
tests green; tsc --noEmit clean.

## Three weakest spots (hostile-reviewer self-pass)

1. reverseRowChunks groups by Timestamp string equality. If two
   distinct rows carried the SAME timestamp (legitimately possible
   when concurrent inserts land in the same clock tick), the
   algorithm would treat them as one chunk and not reverse them
   relative to each other. Mitigated:
   activity_logs.created_at uses microsecond resolution; concurrent
   inserts at exact-same microsecond are vanishingly rare. If a
   collision happens, the within-chunk order is whatever the SQL
   returned — both rows render at the same timestamp, no user-visible
   misordering.

2. The pre-existing TS parser files (historyHydration.ts +
   message-parser.ts) stay in tree. historyHydration.ts is now dead
   code (no consumers post-migration); deletion is parked as a follow-
   up after a one-week observation window confirms no live-message
   consumer reaches it.

3. canvas's loadMessagesFromDB returns `resp.messages ?? []`. If the
   server were ever to return `null` instead of `[]` (it currently
   doesn't — handler defensively coerces nil to []), the nullish coalesce
   keeps the canvas from crashing. A stricter wire schema would assert
   the never-null invariant; for today's pragmatic safety, the ?? is
   enough.

## Security review

- Untrusted input? Same as PR-C — agent JSON parsed defensively in
  the messagestore parser. No new exposure.
- Trust boundary? Same. Canvas → /chat-history → wsAuth → messagestore.
- Output sanitization? Plain text + opaque attachment URIs as before.

No security-relevant changes beyond what /chat-history already
exposes via PR-C. Considered, not skipped.

## Versioning / backwards compat

- /activity endpoint unchanged.
- /chat-history endpoint shape unchanged (still {messages, reached_end});
  only the wire ORDER within a page changed (newest-first row → oldest-
  first row). Canvas is the only consumer in tree; no API consumers
  depend on the previous order.
- canvas's loadMessagesFromDB call signature unchanged — internal
  refactor.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
This commit is contained in:
parent 55ef3176ed
commit 75a72bf5a2
4 changed files with 358 additions and 190 deletions


@ -13,7 +13,6 @@ import { AttachmentPreview } from "./chat/AttachmentPreview";
import { extractFilesFromTask } from "./chat/message-parser";
import { AgentCommsPanel } from "./chat/AgentCommsPanel";
import { appendActivityLine } from "./chat/activityLog";
import { activityRowToMessages, type ActivityRowForHydration } from "./chat/historyHydration";
import { runtimeDisplayName } from "@/lib/runtime-names";
import { ConfirmDialog } from "@/components/ConfirmDialog";
@ -50,38 +49,12 @@ interface A2AResponse {
};
}
/** Detect activity-log rows that the workspace's own runtime fired
* against itself but were misclassified as canvas-source. The proper
* fix is the X-Workspace-ID header from `self_source_headers()` in
* workspace/platform_auth.py, which makes the platform record
* source_id = workspace_id. But three failure modes still leak a
* self-message into "My Chat":
*
* 1. Historical rows already in the DB with source_id=NULL.
* 2. Workspace containers running pre-fix heartbeat.py / main.py
* (the fix only takes effect after an image rebuild + redeploy).
* 3. Future internal triggers added without the helper.
*
* This client-side filter recognises the heartbeat trigger by its
* exact prefix the heartbeat assembles
*
* "Delegation results are ready. Review them and take appropriate
* action:\n" + summary_lines + report_instruction
*
* in workspace/heartbeat.py. The prefix is template-fixed so a
* string match is reliable. If the heartbeat copy ever changes,
* update this constant in the same commit.
*
* This is a backstop, not the primary defence; the X-Workspace-ID
* header is. Filtering content is fragile to copy edits, so keep
* the list narrow. */
const INTERNAL_SELF_MESSAGE_PREFIXES = [
"Delegation results are ready. Review them and take appropriate action",
];
function isInternalSelfMessage(text: string): boolean {
return INTERNAL_SELF_MESSAGE_PREFIXES.some((p) => text.startsWith(p));
}
// Internal-self-message filtering moved server-side in RFC #2945
// PR-C/D — the platform's /chat-history endpoint applies the
// IsInternalSelfMessage predicate before returning rows, so the
// client no longer needs the local backstop on the history path.
// The proper fix is still X-Workspace-ID header (source_id=workspace_id);
// the platform-side prefix filter handles the residual cases.
// extractReplyText pulls the agent's text reply out of an A2A response.
// Concatenates ALL text parts (joined with "\n") rather than returning
@ -134,8 +107,19 @@ const INITIAL_HISTORY_LIMIT = 10;
const OLDER_HISTORY_BATCH = 20;
/**
* Load chat history from the activity_logs database via the platform API.
* Uses source=canvas to only get user-initiated messages (not agent-to-agent).
* Load chat history from the platform's typed /chat-history endpoint.
*
* Server-side rendering of activity_logs rows into ChatMessage shape
* lives in workspace-server/internal/messagestore/postgres_store.go
* (RFC #2945 PR-C/D). The server already applies the canvas-source
* filter, the internal-self-message predicate, the role decision
* (status=error vs agent-error prefix system), and the v0/v1
* file-shape extraction. Canvas just renders what it receives.
*
* Wire shape (mirrors ChatMessage exactly, no per-row mapping needed):
*
* GET /workspaces/:id/chat-history?limit=N&before_ts=T
* 200 {"messages": ChatMessage[], "reached_end": boolean}
*
* Pagination:
* - Pass `limit` to bound the page size (newest-first from server).
@ -143,10 +127,10 @@ const OLDER_HISTORY_BATCH = 20;
* timestamp. Combined with limit, this yields the next-older page
* when scrolling backward through history.
*
* `reachedEnd` is true when the server returned fewer rows than asked
* for; the caller uses this to disable further older-batch fetches.
* (Counts row-level returns, not chat-bubble count: each row may
* produce 1-2 bubbles.)
* `reachedEnd` is propagated from the server. The server computes it
* by comparing rowCount vs limit so a partial last page is correctly
* detected even when the row→bubble fan-out is non-1:1 (each row
* produces 1-2 bubbles).
*/
async function loadMessagesFromDB(
workspaceId: string,
@ -154,25 +138,23 @@ async function loadMessagesFromDB(
beforeTs?: string,
): Promise<{ messages: ChatMessage[]; error: string | null; reachedEnd: boolean }> {
try {
const params = new URLSearchParams({
type: "a2a_receive",
source: "canvas",
limit: String(limit),
});
const params = new URLSearchParams({ limit: String(limit) });
if (beforeTs) params.set("before_ts", beforeTs);
const activities = await api.get<ActivityRowForHydration[]>(
`/workspaces/${workspaceId}/activity?${params.toString()}`,
const resp = await api.get<{ messages: ChatMessage[]; reached_end: boolean }>(
`/workspaces/${workspaceId}/chat-history?${params.toString()}`,
);
const messages: ChatMessage[] = [];
// Activities are newest-first, reverse for chronological order.
// Per-row mapping lives in chat/historyHydration.ts so it can be
// unit-tested without spinning up the full ChatTab component
// (regression cover for the timestamp-collapse bug).
for (const a of [...activities].reverse()) {
messages.push(...activityRowToMessages(a, isInternalSelfMessage));
}
return { messages, error: null, reachedEnd: activities.length < limit };
// Server emits oldest-first within the page (RFC #2945 PR-C-2
// post-fix: server reverses row-aware before returning so the
// wire is display-ready). Canvas appends/prepends without
// reordering — this avoids the pair-flip bug a naive flat
// reverse causes when each row produces a (user, agent) pair
// with the same timestamp.
return {
messages: resp.messages ?? [],
error: null,
reachedEnd: resp.reached_end,
};
} catch (err) {
return {
messages: [],


@ -1,13 +1,11 @@
// @vitest-environment jsdom
//
// Pins the lazy-loading chat-history pagination added 2026-05-05.
// Pins the lazy-loading chat-history pagination.
//
// Pre-fix: ChatTab fetched the newest 50 messages on every mount and
// scrolled to bottom, paying full DOM cost up-front even when the user
// only wanted to read the last few bubbles. Post-fix: initial load is
// bounded to 10 newest, and an IntersectionObserver on a top sentinel
// triggers loadOlder() (batch of 20 with `before_ts` cursor) when the
// user scrolls up.
// PR-C-2 (RFC #2945): canvas was migrated from /activity?type=a2a_receive
// to /chat-history. Server now returns typed ChatMessage[] in
// display-ready oldest-first order. These tests guard the canvas-side
// pagination invariants against the new endpoint surface.
//
// Pinned branches:
// 1. Initial fetch carries `limit=10` and NO before_ts (newest-first
@ -20,11 +18,10 @@
// asserting the rendered bubble count matches the full page).
// 4. The retry button after a failed initial load uses the same
// INITIAL_HISTORY_LIMIT (10), not the legacy 50.
//
// IntersectionObserver / scroll-anchor restoration is exercised by the
// E2E synth-canary suite — pinning it in jsdom would require mocking
// the observer and faking layout, which is brittler than trusting a
// live-DOM canary against the staging tenant.
// 5. before_ts cursor is the OLDEST timestamp from the current page,
// passed verbatim to walk backward.
// 6. Inflight guard rejects duplicate IO triggers while a loadOlder
// fetch is in flight.
import { describe, it, expect, vi, afterEach, beforeEach } from "vitest";
import { render, screen, cleanup, waitFor, fireEvent } from "@testing-library/react";
@ -33,24 +30,31 @@ import React from "react";
afterEach(cleanup);
// Both ChatTab sub-panels (MyChat + AgentComms) mount simultaneously so
// keyboard tab order and aria-controls land on a real DOM. Both fire
// /activity GETs on mount: MyChat's hits `type=a2a_receive&source=canvas`,
// AgentComms's hits a different filter. Route the mock by URL so each
// gets a sensible default and only MyChat's call is what the assertions
// scrutinise.
const myChatActivityCalls: string[] = [];
let myChatNextResponse: { ok: true; rows: unknown[] } | { ok: false; err: Error } = {
ok: true,
rows: [],
};
// keyboard tab order and aria-controls land on a real DOM. MyChat's
// loadMessagesFromDB hits /chat-history; AgentComms's polling hits a
// different URL. Route the mock by URL so each gets a sensible default
// and only MyChat's calls land in the assertion array.
const myChatHistoryCalls: string[] = [];
let myChatNextResponse:
| { ok: true; messages: unknown[]; reachedEnd?: boolean }
| { ok: false; err: Error } = { ok: true, messages: [] };
const apiGet = vi.fn((path: string): Promise<unknown> => {
if (path.includes("type=a2a_receive") && path.includes("source=canvas")) {
myChatActivityCalls.push(path);
if (myChatNextResponse.ok) return Promise.resolve(myChatNextResponse.rows);
if (path.includes("/chat-history")) {
myChatHistoryCalls.push(path);
if (myChatNextResponse.ok) {
const reached_end =
myChatNextResponse.reachedEnd !== undefined
? myChatNextResponse.reachedEnd
: myChatNextResponse.messages.length < 10;
return Promise.resolve({
messages: myChatNextResponse.messages,
reached_end,
});
}
return Promise.reject(myChatNextResponse.err);
}
// AgentComms / heartbeat / anything else — empty array is a safe
// default that won't blow up the corresponding component's .then().
// AgentComms / heartbeat / anything else — empty array safe default.
return Promise.resolve([]);
});
const apiPost = vi.fn();
@ -84,8 +88,8 @@ const ioInstances: IOInstance[] = [];
beforeEach(() => {
apiGet.mockClear();
apiPost.mockReset();
myChatActivityCalls.length = 0;
myChatNextResponse = { ok: true, rows: [] };
myChatHistoryCalls.length = 0;
myChatNextResponse = { ok: true, messages: [] };
ioInstances.length = 0;
class FakeIO {
private inst: IOInstance;
@ -101,20 +105,12 @@ beforeEach(() => {
this.inst.disconnected = true;
}
}
// Install on every reachable global — different bundlers / module
// graphs can resolve `IntersectionObserver` via `window`, `globalThis`,
// or the bare global. Without all three, jsdom's own (pre-existing)
// stub silently wins and ioInstances stays empty.
(window as unknown as { IntersectionObserver: unknown }).IntersectionObserver = FakeIO;
(globalThis as unknown as { IntersectionObserver: unknown }).IntersectionObserver = FakeIO;
// jsdom doesn't implement scrollIntoView; ChatTab calls it after every
// messages update.
Element.prototype.scrollIntoView = vi.fn();
});
function triggerIntersection(instanceIdx = -1) {
// -1 → the latest observer (the live one). Tests targeting an old
// (disconnected) instance pass a positive index.
const inst = ioInstances.at(instanceIdx);
if (!inst) throw new Error(`no IO instance at ${instanceIdx}`);
inst.callback(
@ -125,25 +121,30 @@ function triggerIntersection(instanceIdx = -1) {
import { ChatTab } from "../ChatTab";
function makeActivityRow(seq: number): Record<string, unknown> {
// Zero-pad seq into the minute slot so "seq=10" doesn't produce
// the invalid timestamp "00:010:00Z" (caught by the loadOlder URL
// assertion below — first version of the helper used `0${seq}` and
// the test failed on `before_ts` having an extra digit).
// makeMessagePair returns a (user, agent) pair sharing a timestamp,
// matching the wire shape /chat-history emits per activity_logs row.
// Server-side reverseRowChunks ensures the wire is oldest-first across
// rows but [user, agent] within each row.
function makeMessagePair(seq: number): unknown[] {
// Zero-pad seq into the minute slot so seq=10 produces a valid
// timestamp (00:10:00Z, not 00:010:00Z).
const mm = String(seq).padStart(2, "0");
return {
activity_type: "a2a_receive",
status: "ok",
created_at: `2026-05-05T00:${mm}:00Z`,
request_body: { params: { message: { parts: [{ kind: "text", text: `user msg ${seq}` }] } } },
response_body: { result: `agent reply ${seq}` },
};
const ts = `2026-05-05T00:${mm}:00Z`;
return [
{ id: `u-${seq}`, role: "user", content: `user msg ${seq}`, timestamp: ts },
{ id: `a-${seq}`, role: "agent", content: `agent reply ${seq}`, timestamp: ts },
];
}
// Server returns newest-first; the helper builds a server-shape page
// so the order in the rendered messages array matches production.
function newestFirstPage(start: number, count: number): unknown[] {
return Array.from({ length: count }, (_, i) => makeActivityRow(start + count - 1 - i));
// pageOldestFirst builds a wire-shape page (oldest-first within page)
// of `count` row-pairs starting at seq=`start`. Mirrors the server's
// post-reverseRowChunks emission order.
function pageOldestFirst(start: number, count: number): unknown[] {
const out: unknown[] = [];
for (let i = 0; i < count; i++) {
out.push(...makeMessagePair(start + i));
}
return out;
}
const minimalData = {
@ -153,28 +154,30 @@ const minimalData = {
} as unknown as Parameters<typeof ChatTab>[0]["data"];
describe("ChatTab lazy history pagination", () => {
it("initial fetch carries limit=10 (not the legacy 50)", async () => {
myChatNextResponse = { ok: true, rows: [makeActivityRow(1)] };
it("initial fetch carries limit=10 (not the legacy 50) and hits /chat-history", async () => {
myChatNextResponse = { ok: true, messages: makeMessagePair(1) };
render(<ChatTab workspaceId="ws-1" data={minimalData} />);
await waitFor(() => expect(myChatActivityCalls.length).toBe(1));
const url = myChatActivityCalls[0];
await waitFor(() => expect(myChatHistoryCalls.length).toBe(1));
const url = myChatHistoryCalls[0];
expect(url).toContain("/chat-history");
expect(url).toContain("limit=10");
expect(url).not.toContain("limit=50");
// before_ts should NOT be set on the initial fetch — that's the
// newest-first slice the user lands on.
expect(url).not.toContain("before_ts");
// /chat-history filters source-canvas server-side; client should
// NOT pass type/source params (they belonged to /activity).
expect(url).not.toContain("type=a2a_receive");
expect(url).not.toContain("source=canvas");
});
it("hides the top sentinel when initial fetch returns fewer than the limit", async () => {
// 3 < 10 → server says "no more older history exists"; sentinel
// should NOT mount and the "Loading older messages…" line should
// never appear (it can't, since the sentinel is what triggers it).
myChatNextResponse = {
ok: true,
rows: [makeActivityRow(1), makeActivityRow(2), makeActivityRow(3)],
};
// never appear.
myChatNextResponse = { ok: true, messages: pageOldestFirst(1, 3) };
render(<ChatTab workspaceId="ws-2" data={minimalData} />);
await waitFor(() => expect(myChatActivityCalls.length).toBe(1));
await waitFor(() => expect(myChatHistoryCalls.length).toBe(1));
await waitFor(() => {
expect(screen.queryByText(/Loading chat history/i)).toBeNull();
});
@ -182,15 +185,15 @@ describe("ChatTab lazy history pagination", () => {
});
it("renders all messages when initial fetch returns exactly the limit", async () => {
// 10 == limit → server might have more older rows; sentinel SHOULD
// mount so the IO observer can fire loadOlder() on scroll-up. We
// verify by checking the rendered bubble count — if hasMore stayed
// true the sentinel render path doesn't crash and all 10 rows
// produced their pair of bubbles.
const fullPage = Array.from({ length: 10 }, (_, i) => makeActivityRow(i + 1));
myChatNextResponse = { ok: true, rows: fullPage };
// limit=10 row-pairs → 20 ChatMessages. reachedEnd should be FALSE
// so the sentinel mounts. Verified by bubble counts.
myChatNextResponse = {
ok: true,
messages: pageOldestFirst(1, 10),
reachedEnd: false,
};
render(<ChatTab workspaceId="ws-3" data={minimalData} />);
await waitFor(() => expect(myChatActivityCalls.length).toBe(1));
await waitFor(() => expect(myChatHistoryCalls.length).toBe(1));
await waitFor(() => {
expect(screen.queryByText(/Loading chat history/i)).toBeNull();
});
@ -202,54 +205,67 @@ describe("ChatTab lazy history pagination", () => {
myChatNextResponse = { ok: false, err: new Error("network down") };
render(<ChatTab workspaceId="ws-4" data={minimalData} />);
const retry = await screen.findByText(/Retry/);
myChatNextResponse = { ok: true, rows: [makeActivityRow(1)] };
myChatNextResponse = { ok: true, messages: makeMessagePair(1) };
fireEvent.click(retry);
await waitFor(() => expect(myChatActivityCalls.length).toBe(2));
const retryUrl = myChatActivityCalls[1];
await waitFor(() => expect(myChatHistoryCalls.length).toBe(2));
const retryUrl = myChatHistoryCalls[1];
expect(retryUrl).toContain("/chat-history");
expect(retryUrl).toContain("limit=10");
expect(retryUrl).not.toContain("limit=50");
});
it("loadOlder fetches limit=20 with before_ts=oldest.timestamp", async () => {
// Initial page = 10 rows in newest-first order (seq 10..1). After
// the component reverses to oldest-first for display, messages[0]
// is built from seq=1 — the oldest — and its timestamp is what
// before_ts should carry.
myChatNextResponse = { ok: true, rows: newestFirstPage(1, 10) };
// Initial page = 10 row-pairs in oldest-first order (seq 1..10).
// The oldest (and so the cursor for loadOlder) is seq=1's
// timestamp 2026-05-05T00:01:00Z.
myChatNextResponse = {
ok: true,
messages: pageOldestFirst(1, 10),
reachedEnd: false,
};
render(<ChatTab workspaceId="ws-load-older" data={minimalData} />);
await waitFor(() => expect(myChatActivityCalls.length).toBe(1));
await waitFor(() => expect(myChatHistoryCalls.length).toBe(1));
await waitFor(() => expect(ioInstances.length).toBeGreaterThan(0));
// Stage the older-batch response, then fire the IO callback.
myChatNextResponse = { ok: true, rows: newestFirstPage(0, 1) };
// Stage older-batch response, then fire IO callback.
myChatNextResponse = {
ok: true,
messages: pageOldestFirst(0, 1),
reachedEnd: true,
};
triggerIntersection();
await waitFor(() => expect(myChatActivityCalls.length).toBe(2));
const olderUrl = myChatActivityCalls[1];
await waitFor(() => expect(myChatHistoryCalls.length).toBe(2));
const olderUrl = myChatHistoryCalls[1];
expect(olderUrl).toContain("/chat-history");
expect(olderUrl).toContain("limit=20");
expect(olderUrl).toContain("before_ts=");
expect(decodeURIComponent(olderUrl)).toContain("before_ts=2026-05-05T00:01:00Z");
});
it("inflight guard rejects a second IO trigger while first loadOlder is in flight", async () => {
myChatNextResponse = { ok: true, rows: newestFirstPage(1, 10) };
myChatNextResponse = {
ok: true,
messages: pageOldestFirst(1, 10),
reachedEnd: false,
};
render(<ChatTab workspaceId="ws-inflight" data={minimalData} />);
await waitFor(() => expect(myChatActivityCalls.length).toBe(1));
await waitFor(() => expect(myChatHistoryCalls.length).toBe(1));
await waitFor(() => expect(ioInstances.length).toBeGreaterThan(0));
// Hold the next loadOlder fetch open with a manual deferred so we
// can fire the second trigger while the first is in-flight.
let release!: (rows: unknown[]) => void;
const deferred = new Promise<unknown[]>((res) => {
let release!: (resp: unknown) => void;
const deferred = new Promise<unknown>((res) => {
release = res;
});
apiGet.mockImplementationOnce((path: string): Promise<unknown> => {
myChatActivityCalls.push(path);
myChatHistoryCalls.push(path);
return deferred;
});
triggerIntersection(); // start loadOlder #1
await waitFor(() => expect(myChatActivityCalls.length).toBe(2));
await waitFor(() => expect(myChatHistoryCalls.length).toBe(2));
// Second IO trigger lands while #1 is still pending.
triggerIntersection();
@ -258,79 +274,62 @@ describe("ChatTab lazy history pagination", () => {
// Without the inflight guard, each of these would have started a
// new fetch. With the guard, none of them do — call count stays 2.
await new Promise((r) => setTimeout(r, 10));
expect(myChatActivityCalls.length).toBe(2);
expect(myChatHistoryCalls.length).toBe(2);
// Release the first fetch. Inflight clears in the finally block;
// a subsequent IO trigger is permitted again (verified by checking
// we can fire a follow-up after release without hanging the test).
release([]);
await waitFor(() => expect(myChatActivityCalls.length).toBe(2));
// Release the first fetch with a valid wire response shape.
release({ messages: [], reached_end: true });
await waitFor(() => expect(myChatHistoryCalls.length).toBe(2));
});
it("empty older response clears the scroll anchor and unmounts the sentinel", async () => {
// The bug we're pinning: if loadOlder returns 0 rows, the
// scrollAnchorRef must be cleared so the next paint doesn't try to
// restore against a no-op prepend (which would fight the natural
// bottom-pin for any subsequent live message). hasMore flipping to
// false is the same flag-flip path; sentinel disappearing is the
// observable proxy.
myChatNextResponse = { ok: true, rows: newestFirstPage(1, 10) };
myChatNextResponse = {
ok: true,
messages: pageOldestFirst(1, 10),
reachedEnd: false,
};
render(<ChatTab workspaceId="ws-anchor" data={minimalData} />);
await waitFor(() => expect(myChatActivityCalls.length).toBe(1));
await waitFor(() => expect(myChatHistoryCalls.length).toBe(1));
await waitFor(() => expect(ioInstances.length).toBeGreaterThan(0));
myChatNextResponse = { ok: true, rows: [] }; // empty → reachedEnd
myChatNextResponse = {
ok: true,
messages: [],
reachedEnd: true,
};
triggerIntersection();
await waitFor(() => expect(myChatActivityCalls.length).toBe(2));
await waitFor(() => expect(myChatHistoryCalls.length).toBe(2));
// After reachedEnd the sentinel unmounts (hasMore=false). We can't
// peek scrollAnchorRef directly, but we can assert the consequence:
// scrollIntoView (the bottom-pin for live appends) is not blocked
// by a stale anchor. Trigger a re-render via an unrelated state
// change… in practice the safest assertion here is that the
// sentinel disappeared (proving the empty response propagated to
// hasMore correctly, which is the same flag-flip path as anchor
// clearing).
await waitFor(() => {
expect(screen.queryByText(/Loading older messages/i)).toBeNull();
});
});
it("IntersectionObserver does not churn when older messages prepend", async () => {
// Whole-PR perf invariant: prepending older history (the load-bearing
// user gesture) must NOT tear down + re-arm the IO observer.
// Triggering loadOlder is the cleanest way to drive a messages
// mutation from inside the test, since live agent push goes through
// a Zustand store that's harder to drive reliably from jsdom.
//
// Pre-fix, loadOlder depended on `messages`, so every prepend
// recreated loadOlder → re-ran the IO effect → new observer. Each
// call to triggerIntersection() produced a fresh disconnected
// observer + a new live one. Post-fix, the observer survives.
myChatNextResponse = { ok: true, rows: newestFirstPage(1, 10) };
myChatNextResponse = {
ok: true,
messages: pageOldestFirst(1, 10),
reachedEnd: false,
};
render(<ChatTab workspaceId="ws-stable-io" data={minimalData} />);
await waitFor(() => expect(myChatActivityCalls.length).toBe(1));
await waitFor(() => expect(myChatHistoryCalls.length).toBe(1));
await waitFor(() => expect(ioInstances.length).toBeGreaterThan(0));
// Snapshot the observer instance after first paint stabilises.
const observerBefore = ioInstances.at(-1);
expect(observerBefore).toBeDefined();
expect(observerBefore!.disconnected).toBe(false);
// Trigger three older-batch prepends. Each batch returns the full
// OLDER_HISTORY_BATCH (20 rows) so reachedEnd stays false and the
// sentinel keeps mounting. Pre-fix, each prepend mutated `messages`
// → recreated loadOlder → re-ran the IO effect → new observer.
// OLDER_HISTORY_BATCH (20 row-pairs = 40 messages) so reachedEnd
// stays false and the sentinel keeps mounting.
for (let batch = 0; batch < 3; batch++) {
myChatNextResponse = {
ok: true,
rows: newestFirstPage(-(batch + 1) * 20, 20),
messages: pageOldestFirst(-(batch + 1) * 20, 20),
reachedEnd: false,
};
const callsBefore = myChatActivityCalls.length;
const callsBefore = myChatHistoryCalls.length;
triggerIntersection();
await waitFor(() =>
expect(myChatActivityCalls.length).toBe(callsBefore + 1),
);
await waitFor(() => expect(myChatHistoryCalls.length).toBe(callsBefore + 1));
}
// The original observer is still the live one — no churn.


@ -110,10 +110,55 @@ func (s *PostgresMessageStore) List(ctx context.Context, workspaceID string, opt
return nil, false, err
}
// Wire order: oldest-first within the page so canvas (and any
// future client) can render chronologically without per-pair
// reordering. The SQL is `ORDER BY created_at DESC LIMIT N` for
// pagination correctness, and activityRowToChatMessages emits
// [user, agent] within a row — so a naive client-side flat-reverse
// would swap the pair (agent before user at the same timestamp).
// Reversing ROW-AWARE here keeps the wire shape display-ready.
//
// Algorithm: group consecutive same-timestamp messages into row
// chunks (1-2 messages each), reverse the chunk order, flatten.
// Within-row [user, agent] order is preserved. Single-message
// rows (no agent reply yet, or attachments-only) collapse to
// 1-element chunks and still reverse correctly.
messages = reverseRowChunks(messages)
reachedEnd := rowCount < opts.Limit
return messages, reachedEnd, nil
}
// reverseRowChunks groups msgs by adjacent same-Timestamp runs and
// reverses the run order, preserving within-run order. Pairs of
// (user, agent) emitted by activityRowToChatMessages share a
// timestamp, so this keeps each pair internally ordered while
// reversing the row sequence.
func reverseRowChunks(msgs []ChatMessage) []ChatMessage {
if len(msgs) == 0 {
return msgs
}
var chunks [][]ChatMessage
cur := []ChatMessage{msgs[0]}
for i := 1; i < len(msgs); i++ {
if msgs[i].Timestamp == cur[len(cur)-1].Timestamp {
cur = append(cur, msgs[i])
} else {
chunks = append(chunks, cur)
cur = []ChatMessage{msgs[i]}
}
}
chunks = append(chunks, cur)
for i, j := 0, len(chunks)-1; i < j; i, j = i+1, j-1 {
chunks[i], chunks[j] = chunks[j], chunks[i]
}
out := make([]ChatMessage, 0, len(msgs))
for _, chunk := range chunks {
out = append(out, chunk...)
}
return out
}
// queryActivityRows is split from List so unit tests can exercise the
// parser without spinning a real DB. Internal — alternative impls
// shouldn't depend on the SQL shape.


@@ -14,10 +14,13 @@ package messagestore
// legacy source the server replaces; divergence == regression.
import (
	"context"
	"encoding/json"
	"strings"
	"testing"
	"time"

	"github.com/DATA-DOG/go-sqlmock"
)
const fixedTimestamp = "2026-04-25T18:00:00Z"
@@ -282,6 +285,145 @@ func TestChatHistory_NoAgentMessageWhenResponseHasNoTextNoFiles(t *testing.T) {
}
}
// =====================================================================
// List() integration — sqlmock-backed end-to-end via the real handler
// =====================================================================
// TestList_WireOrderIsOldestFirstAcrossPagedRows pins the integration
// invariant: List() returns wire-display-ready messages even though
// the underlying SQL is `ORDER BY created_at DESC`. This is the
// load-bearing test for PR-C-2 — without the row-aware reversal,
// canvas would render every paired bubble in the wrong order on every
// chat reload (agent before user within each timestamp).
//
// Mutation-test cover: removing the `messages = reverseRowChunks(...)`
// call in List() must turn this test red. (The lower-level
// TestReverseRowChunks_PreservesPairOrderAcrossRows pins the helper
// itself; this test pins that List ACTUALLY CALLS the helper.)
func TestList_WireOrderIsOldestFirstAcrossPagedRows(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("sqlmock.New: %v", err)
}
defer db.Close()
// Server's SQL is ORDER BY created_at DESC. Build mock rows in
// THAT order so the row-aware reversal has work to do.
rows := sqlmock.NewRows([]string{"created_at", "status", "request_body", "response_body"}).
AddRow(mustParseTime(t, "2026-05-05T00:03:00Z"), "ok",
`{"params":{"message":{"parts":[{"kind":"text","text":"u3"}]}}}`,
`{"result":"a3"}`).
AddRow(mustParseTime(t, "2026-05-05T00:02:00Z"), "ok",
`{"params":{"message":{"parts":[{"kind":"text","text":"u2"}]}}}`,
`{"result":"a2"}`).
AddRow(mustParseTime(t, "2026-05-05T00:01:00Z"), "ok",
`{"params":{"message":{"parts":[{"kind":"text","text":"u1"}]}}}`,
`{"result":"a1"}`)
mock.ExpectQuery(`SELECT created_at, status, request_body::text, response_body::text`).
WillReturnRows(rows)
store := NewPostgresMessageStore(db)
msgs, reachedEnd, err := store.List(context.Background(), "ws-1", ListOptions{Limit: 10})
if err != nil {
t.Fatalf("List: %v", err)
}
wantContents := []string{"u1", "a1", "u2", "a2", "u3", "a3"}
if len(msgs) != len(wantContents) {
t.Fatalf("len(msgs)=%d want %d; got=%v", len(msgs), len(wantContents), msgs)
}
for i, w := range wantContents {
if msgs[i].Content != w {
t.Errorf("idx %d: got %q want %q (full slice ordering broken; reverseRowChunks regressed?)", i, msgs[i].Content, w)
}
}
if !reachedEnd {
t.Errorf("3 rows < limit 10 should reach end, got reachedEnd=false")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations: %v", err)
}
}
// =====================================================================
// reverseRowChunks — wire-order helper added in PR-C-2
// =====================================================================
// TestReverseRowChunks_PreservesPairOrderAcrossRows pins the
// row-aware reversal that List() applies before returning. Server's
// SQL is `ORDER BY created_at DESC`, so messages come out
// newest-row-first; activityRowToChatMessages emits [user, agent]
// per row with same timestamp. A naive flat reversal of the messages
// slice would flip each pair (agent before user). reverseRowChunks
// reverses ROWS, preserving pair-internal order. Without this, canvas
// would render every paired bubble in the wrong order on every chat
// reload — the canvas-side reverse used to do the right thing because
// it reversed ROWS BEFORE flattening, but PR-C/D moved the flattening
// into the server, so the row-awareness has to live there too.
func TestReverseRowChunks_PreservesPairOrderAcrossRows(t *testing.T) {
// Build messages newest-row-first as List() collects them. Each
// row is a pair sharing a timestamp, with [user, agent] order.
in := []ChatMessage{
{Role: "user", Content: "user_3", Timestamp: "2026-05-05T00:03:00Z"},
{Role: "agent", Content: "agent_3", Timestamp: "2026-05-05T00:03:00Z"},
{Role: "user", Content: "user_2", Timestamp: "2026-05-05T00:02:00Z"},
{Role: "agent", Content: "agent_2", Timestamp: "2026-05-05T00:02:00Z"},
{Role: "user", Content: "user_1", Timestamp: "2026-05-05T00:01:00Z"},
{Role: "agent", Content: "agent_1", Timestamp: "2026-05-05T00:01:00Z"},
}
got := reverseRowChunks(in)
want := []struct {
role, content string
}{
{"user", "user_1"}, {"agent", "agent_1"},
{"user", "user_2"}, {"agent", "agent_2"},
{"user", "user_3"}, {"agent", "agent_3"},
}
if len(got) != len(want) {
t.Fatalf("len(got)=%d len(want)=%d", len(got), len(want))
}
for i, w := range want {
if got[i].Role != w.role || got[i].Content != w.content {
t.Errorf("idx %d: got role=%q content=%q want role=%q content=%q",
i, got[i].Role, got[i].Content, w.role, w.content)
}
}
}
// TestReverseRowChunks_HandlesSingleMessageRows pins the case where
// a row has only a user OR only an agent message (e.g., agent reply
// not yet recorded, attachments-only user upload). Naive reversal
// still works for single-message chunks; the test guards against a
// future change that special-cases the 2-message-row path.
func TestReverseRowChunks_HandlesSingleMessageRows(t *testing.T) {
in := []ChatMessage{
{Role: "user", Content: "u3", Timestamp: "2026-05-05T00:03:00Z"},
{Role: "user", Content: "u2", Timestamp: "2026-05-05T00:02:00Z"}, // single, no agent
{Role: "agent", Content: "a2", Timestamp: "2026-05-05T00:02:00Z"},
{Role: "user", Content: "u1", Timestamp: "2026-05-05T00:01:00Z"},
}
got := reverseRowChunks(in)
wantContents := []string{"u1", "u2", "a2", "u3"}
if len(got) != len(wantContents) {
t.Fatalf("len got=%d want=%d", len(got), len(wantContents))
}
for i, w := range wantContents {
if got[i].Content != w {
t.Errorf("idx %d: got %q want %q", i, got[i].Content, w)
}
}
}
// TestReverseRowChunks_EmptyInput pins that nil input yields an
// empty result without panicking.
func TestReverseRowChunks_EmptyInput(t *testing.T) {
got := reverseRowChunks(nil)
if len(got) != 0 {
t.Errorf("nil input should return empty, got %v", got)
}
}
// =====================================================================
// end-to-end shape — paired user + agent with same timestamp
// =====================================================================