fix(workspace-server): persist push-mode chat round-trip synchronously — E2E Chat reload flake is a real data-loss race #2195

Merged
claude-ceo-assistant merged 1 commits from fix/e2e-chat-mobile-history-reload-flake into main 2026-06-04 02:55:08 +00:00
3 changed files with 84 additions and 27 deletions
+25 -8
View File
@@ -278,16 +278,33 @@ jobs:
export NEXT_PUBLIC_WS_URL="ws://127.0.0.1:${PLATFORM_PORT}/ws"
npx next dev --turbopack -p "${CANVAS_PORT}" > canvas.log 2>&1 &
echo $! > canvas.pid
for i in $(seq 1 30); do
if curl -sf "http://localhost:${CANVAS_PORT}" > /dev/null 2>&1; then
echo "Canvas up after ${i}s"
exit 0
# Readiness must wait for the actual chat route to *compile*, not
# just for the dev server to bind the port. `next dev --turbopack`
# accepts the TCP connection well before it has compiled a route
# on first request, so a bare `curl /` can 200 (or hang) while the
# page the tests load is still building. We therefore probe the
# real route the specs navigate to (`/?m=chat`) and require a 2xx,
# which only happens once Turbopack has finished the first
# compile. The previous 30s budget was also too tight for a cold
# Turbopack first-compile on a loaded operator-host runner — the
# `Canvas did not start in 30s` flake. Raise to 120s (job
# timeout-minutes is 15, so this is comfortably bounded) and probe
# every 2s.
READY=""
for i in $(seq 1 60); do
CODE=$(curl -s -o /dev/null -w '%{http_code}' "http://localhost:${CANVAS_PORT}/?m=chat" 2>/dev/null || echo 000)
if [ "$CODE" -ge 200 ] && [ "$CODE" -lt 400 ]; then
echo "Canvas (chat route compiled) up after ~$((i*2))s (HTTP ${CODE})"
READY=1
break
fi
sleep 1
sleep 2
done
echo "::error::Canvas did not start in 30s"
cat canvas.log || true
exit 1
if [ -z "$READY" ]; then
echo "::error::Canvas chat route did not compile in 120s (last HTTP ${CODE})"
cat canvas.log || true
exit 1
fi
- name: Run Playwright E2E tests
if: needs.detect-changes.outputs.chat == 'true'
+17 -2
View File
@@ -60,11 +60,26 @@ test.describe("MobileChat", () => {
await expect(page.getByText("Echo: Mobile persistence")).toBeVisible({ timeout: 15_000 });
// Reload and deterministically wait for the chat-history GET that
// rehydrates the transcript to come back 2xx, rather than racing a
// fixed-timeout render assertion against an in-flight fetch. The
// server now persists the a2a_receive row SYNCHRONOUSLY before the
// send's 200 (workspace-server logA2ASuccess), so the row is
// guaranteed present by the time this GET runs — the wait is for
// hydration latency, not for a still-racing write.
const historyResponse = page.waitForResponse(
(resp) =>
resp.url().includes("/chat-history") &&
resp.request().method() === "GET" &&
resp.status() === 200,
{ timeout: 15_000 },
);
await page.reload();
await page.waitForSelector("[data-testid='chat-panel']", { timeout: 10_000 });
await historyResponse;
await expect(page.getByText("Mobile persistence", { exact: true })).toBeVisible({ timeout: 5_000 });
await expect(page.getByText("Echo: Mobile persistence")).toBeVisible({ timeout: 5_000 });
await expect(page.getByText("Mobile persistence", { exact: true })).toBeVisible();
await expect(page.getByText("Echo: Mobile persistence")).toBeVisible();
});
test("composer auto-grows with multi-line text", async ({ page }) => {
@@ -383,23 +383,48 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle
}
summary := a2aMethod + " → " + wsNameForLog
toolTrace := extractToolTrace(respBody)
parent := ctx
h.goAsync(func() {
logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
defer cancel()
LogActivity(logCtx, h.broadcaster, ActivityParams{
WorkspaceID: workspaceID,
ActivityType: "a2a_receive",
SourceID: nilIfEmpty(callerID),
TargetID: &workspaceID,
Method: &a2aMethod,
Summary: &summary,
RequestBody: json.RawMessage(body),
ResponseBody: json.RawMessage(respBody),
ToolTrace: toolTrace,
DurationMs: &durationMs,
Status: logStatus,
})
// DATA-LOSS FIX (internal#470 / #1347 push-mode sibling): this
// a2a_receive row is the ONLY durable record of a push-mode chat
// round-trip — request_body carries the user's message, response_body
// carries the agent's reply, and chat-history hydration
// (messagestore.PostgresMessageStore) reads BOTH back to rebuild the
// transcript on canvas reopen / reload. It MUST be written
// SYNCHRONOUSLY, before proxyA2ARequest returns and ProxyA2A flushes
// the 200 to the canvas — otherwise the canvas sees the reply
// acknowledged (and rendered optimistically) while the row is still
// racing in a detached goroutine, and a reload (or a workspace-server
// restart / deploy / OOM) between the 200 and the goroutine's commit
// loses the message permanently on reopen.
//
// This mirrors the discipline already applied to the poll-mode ingest
// path (logA2AReceiveQueued / persistUserMessageAtIngest); the
// push-mode counterpart was left async, which the E2E Chat
// "history persists across reload" test surfaced as an intermittent
// red (the reload out-raced the INSERT).
//
// - context.WithoutCancel: a client disconnect on chat-exit (which
// cancels the inbound request ctx) MUST NOT abort this write.
// - SYNCHRONOUS (no goAsync): the row must be durable before the 200.
// - Best-effort: LogActivity logs+swallows INSERT errors internally,
// so a DB hiccup never blocks or fails the user's send — behaviour
// for that one request is never worse than the pre-fix async path.
// - The post-commit ACTIVITY_LOGGED broadcast still fires inside
// LogActivity; the durable row is the truth the canvas re-reads.
logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
defer cancel()
LogActivity(logCtx, h.broadcaster, ActivityParams{
WorkspaceID: workspaceID,
ActivityType: "a2a_receive",
SourceID: nilIfEmpty(callerID),
TargetID: &workspaceID,
Method: &a2aMethod,
Summary: &summary,
RequestBody: json.RawMessage(body),
ResponseBody: json.RawMessage(respBody),
ToolTrace: toolTrace,
DurationMs: &durationMs,
Status: logStatus,
})
if callerID == "" && statusCode < 400 {