From 8bcf2289041404a660155e1288149ee4171367d0 Mon Sep 17 00:00:00 2001 From: core-devops Date: Wed, 3 Jun 2026 19:36:00 -0700 Subject: [PATCH] fix(workspace-server): persist push-mode chat round-trip synchronously (E2E Chat reload flake = real data-loss race) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause of the intermittent `E2E Chat / E2E Chat` red (`chat-mobile.spec.ts › history persists across reload`) is a REAL product persistence race, not test fragility. The push-mode A2A success path (`logA2ASuccess`) wrote the `a2a_receive` activity_logs row — the ONLY durable record of a chat round-trip (request_body = user message, response_body = agent reply, both read back by chat-history hydration) — in a DETACHED goroutine via `goAsync`. `ProxyA2A` flushes the HTTP 200 (carrying the reply) the moment `proxyA2ARequest` returns, i.e. BEFORE that goroutine's INSERT commits. The test's `page.reload()` then fires `GET /chat-history`, which reads activity_logs and can miss the not-yet-committed row → "Mobile persistence" absent → red. Outside the test the same window loses the message on a reload / workspace-server restart / deploy / OOM between the 200 and the goroutine commit. The poll-mode sibling path (`logA2AReceiveQueued` / `persistUserMessageAtIngest`) was already made synchronous for exactly this incident class (internal#470 / #1347 / RFC#2945). The push-mode counterpart was left async — fixed here by writing the row inline (context.WithoutCancel so a chat-exit disconnect can't abort it; still best-effort so a DB hiccup never fails the user's send). The 200 is now emitted only after the durable row exists. Secondary determinism hardening: - chat-mobile spec: after reload, deterministically wait for the `GET /chat-history` 2xx that rehydrates the transcript before asserting visibility, instead of racing a fixed 5s render timeout against an in-flight fetch. 
- e2e-chat.yml canvas readiness: probe the real `/?m=chat` route for a 2xx (Turbopack compiles routes lazily — a bare `curl /` 200s before the page the tests load has compiled) and raise the cold-start budget 30s→120s to kill the `Canvas did not start in 30s` flake. Verification: `go build`, `go vet`, full `internal/handlers` + `internal/messagestore` test suites green (sqlmock, no DB needed); Playwright spec compiles + lists; eslint clean. Browser E2E not run locally (needs Postgres+Redis+platform+canvas servers). Co-Authored-By: Claude Opus 4.8 (1M context) --- .gitea/workflows/e2e-chat.yml | 33 ++++++++--- canvas/e2e/chat-mobile.spec.ts | 19 +++++- .../internal/handlers/a2a_proxy_helpers.go | 59 +++++++++++++------ 3 files changed, 84 insertions(+), 27 deletions(-) diff --git a/.gitea/workflows/e2e-chat.yml b/.gitea/workflows/e2e-chat.yml index a186f5a3d..59276345a 100644 --- a/.gitea/workflows/e2e-chat.yml +++ b/.gitea/workflows/e2e-chat.yml @@ -278,16 +278,33 @@ jobs: export NEXT_PUBLIC_WS_URL="ws://127.0.0.1:${PLATFORM_PORT}/ws" npx next dev --turbopack -p "${CANVAS_PORT}" > canvas.log 2>&1 & echo $! > canvas.pid - for i in $(seq 1 30); do - if curl -sf "http://localhost:${CANVAS_PORT}" > /dev/null 2>&1; then - echo "Canvas up after ${i}s" - exit 0 + # Readiness must wait for the actual chat route to *compile*, not + # just for the dev server to bind the port. `next dev --turbopack` + # accepts the TCP connection well before it has compiled a route + # on first request, so a bare `curl /` can 200 (or hang) while the + # page the tests load is still building. We therefore probe the + # real route the specs navigate to (`/?m=chat`) and require a 2xx, + # which only happens once Turbopack has finished the first + # compile. The previous 30s budget was also too tight for a cold + # Turbopack first-compile on a loaded operator-host runner — the + # `Canvas did not start in 30s` flake. 
Raise to 120s (job + # timeout-minutes is 15, so this is comfortably bounded) and probe + # every 2s. + READY="" + for i in $(seq 1 60); do + CODE=$(curl -s -o /dev/null -w '%{http_code}' "http://localhost:${CANVAS_PORT}/?m=chat" 2>/dev/null || echo 000) + if [ "$CODE" -ge 200 ] && [ "$CODE" -lt 400 ]; then + echo "Canvas (chat route compiled) up after ~$((i*2))s (HTTP ${CODE})" + READY=1 + break fi - sleep 1 + sleep 2 done - echo "::error::Canvas did not start in 30s" - cat canvas.log || true - exit 1 + if [ -z "$READY" ]; then + echo "::error::Canvas chat route did not compile in 120s (last HTTP ${CODE})" + cat canvas.log || true + exit 1 + fi - name: Run Playwright E2E tests if: needs.detect-changes.outputs.chat == 'true' diff --git a/canvas/e2e/chat-mobile.spec.ts b/canvas/e2e/chat-mobile.spec.ts index ddc2bab70..d51a6fc90 100644 --- a/canvas/e2e/chat-mobile.spec.ts +++ b/canvas/e2e/chat-mobile.spec.ts @@ -60,11 +60,26 @@ test.describe("MobileChat", () => { await expect(page.getByText("Echo: Mobile persistence")).toBeVisible({ timeout: 15_000 }); + // Reload and deterministically wait for the chat-history GET that + // rehydrates the transcript to come back 200, rather than racing a + // fixed-timeout render assertion against an in-flight fetch. The + // server now persists the a2a_receive row SYNCHRONOUSLY before the + // send's 200 (workspace-server logA2ASuccess), so the row is + // guaranteed present by the time this GET runs — the wait is for + // hydration latency, not for a still-racing write. 
+ const historyResponse = page.waitForResponse( + (resp) => + resp.url().includes("/chat-history") && + resp.request().method() === "GET" && + resp.status() === 200, + { timeout: 15_000 }, + ); await page.reload(); await page.waitForSelector("[data-testid='chat-panel']", { timeout: 10_000 }); + await historyResponse; - await expect(page.getByText("Mobile persistence", { exact: true })).toBeVisible({ timeout: 5_000 }); - await expect(page.getByText("Echo: Mobile persistence")).toBeVisible({ timeout: 5_000 }); + await expect(page.getByText("Mobile persistence", { exact: true })).toBeVisible(); + await expect(page.getByText("Echo: Mobile persistence")).toBeVisible(); }); test("composer auto-grows with multi-line text", async ({ page }) => { diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index 920e8b5ba..06434ec60 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -383,23 +383,48 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle } summary := a2aMethod + " → " + wsNameForLog toolTrace := extractToolTrace(respBody) - parent := ctx - h.goAsync(func() { - logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second) - defer cancel() - LogActivity(logCtx, h.broadcaster, ActivityParams{ - WorkspaceID: workspaceID, - ActivityType: "a2a_receive", - SourceID: nilIfEmpty(callerID), - TargetID: &workspaceID, - Method: &a2aMethod, - Summary: &summary, - RequestBody: json.RawMessage(body), - ResponseBody: json.RawMessage(respBody), - ToolTrace: toolTrace, - DurationMs: &durationMs, - Status: logStatus, - }) + + // DATA-LOSS FIX (internal#470 / #1347 push-mode sibling): this + // a2a_receive row is the ONLY durable record of a push-mode chat + // round-trip — request_body carries the user's message, response_body + // carries the agent's reply, and chat-history 
hydration + // (messagestore.PostgresMessageStore) reads BOTH back to rebuild the + // transcript on canvas reopen / reload. It MUST be written + // SYNCHRONOUSLY, before proxyA2ARequest returns and ProxyA2A flushes + // the 200 to the canvas — otherwise the canvas sees the reply + // acknowledged (and rendered optimistically) while the row is still + // racing in a detached goroutine, and a reload (or a workspace-server + // restart / deploy / OOM) between the 200 and the goroutine's commit + // loses the message permanently on reopen. + // + // This mirrors the discipline already applied to the poll-mode ingest + // path (logA2AReceiveQueued / persistUserMessageAtIngest); the + // push-mode counterpart was left async, which the E2E Chat + // "history persists across reload" test surfaced as an intermittent + // red (the reload out-raced the INSERT). + // + // - context.WithoutCancel: a client disconnect on chat-exit (which + // cancels the inbound request ctx) MUST NOT abort this write. + // - SYNCHRONOUS (no goAsync): the row must be durable before the 200. + // - Best-effort: LogActivity logs+swallows INSERT errors internally, + // so a DB hiccup never fails the user's send. Being synchronous, it + // can delay the 200 (up to the 30s logCtx timeout, if honoured). + // - The post-commit ACTIVITY_LOGGED broadcast still fires inside + // LogActivity; the durable row is the truth the canvas re-reads. + logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second) + defer cancel() + LogActivity(logCtx, h.broadcaster, ActivityParams{ + WorkspaceID: workspaceID, + ActivityType: "a2a_receive", + SourceID: nilIfEmpty(callerID), + TargetID: &workspaceID, + Method: &a2aMethod, + Summary: &summary, + RequestBody: json.RawMessage(body), + ResponseBody: json.RawMessage(respBody), + ToolTrace: toolTrace, + DurationMs: &durationMs, + Status: logStatus, }) if callerID == "" && statusCode < 400 { -- 2.52.0