diff --git a/.gitea/workflows/e2e-chat.yml b/.gitea/workflows/e2e-chat.yml index a186f5a3d..59276345a 100644 --- a/.gitea/workflows/e2e-chat.yml +++ b/.gitea/workflows/e2e-chat.yml @@ -278,16 +278,33 @@ jobs: export NEXT_PUBLIC_WS_URL="ws://127.0.0.1:${PLATFORM_PORT}/ws" npx next dev --turbopack -p "${CANVAS_PORT}" > canvas.log 2>&1 & echo $! > canvas.pid - for i in $(seq 1 30); do - if curl -sf "http://localhost:${CANVAS_PORT}" > /dev/null 2>&1; then - echo "Canvas up after ${i}s" - exit 0 + # Readiness must wait for the actual chat route to *compile*, not + # just for the dev server to bind the port. `next dev --turbopack` + # accepts the TCP connection well before it has compiled a route + # on first request, so a bare `curl /` can 200 (or hang) while the + # page the tests load is still building. We therefore probe the + # real route the specs navigate to (`/?m=chat`) and require a 2xx, + # which only happens once Turbopack has finished the first + # compile. The previous 30s budget was also too tight for a cold + # Turbopack first-compile on a loaded operator-host runner — the + # `Canvas did not start in 30s` flake. Raise to 120s (job + # timeout-minutes is 15, so this is comfortably bounded) and probe + # every 2s. + READY="" + for i in $(seq 1 60); do + CODE=$(curl -s -o /dev/null -w '%{http_code}' "http://localhost:${CANVAS_PORT}/?m=chat" 2>/dev/null || echo 000) + if [ "$CODE" -ge 200 ] && [ "$CODE" -lt 400 ]; then + echo "Canvas (chat route compiled) up after ~$((i*2))s (HTTP ${CODE})" + READY=1 + break fi - sleep 1 + sleep 2 done - echo "::error::Canvas did not start in 30s" - cat canvas.log || true - exit 1 + if [ -z "$READY" ]; then + echo "::error::Canvas chat route did not compile in 120s (last HTTP ${CODE})" + cat canvas.log || true + exit 1 + fi - name: Run Playwright E2E tests if: needs.detect-changes.outputs.chat == 'true' diff --git a/canvas/e2e/chat-mobile.spec.ts b/canvas/e2e/chat-mobile.spec.ts index ddc2bab70..d51a6fc90 100644 --- a/canvas/e2e/chat-mobile.spec.ts +++ b/canvas/e2e/chat-mobile.spec.ts @@ -60,11 +60,26 @@ test.describe("MobileChat", () => { await expect(page.getByText("Echo: Mobile persistence")).toBeVisible({ timeout: 15_000 }); + // Reload and deterministically wait for the chat-history GET that + // rehydrates the transcript to come back 2xx, rather than racing a + // fixed-timeout render assertion against an in-flight fetch. The + // server now persists the a2a_receive row SYNCHRONOUSLY before the + // send's 200 (workspace-server logA2ASuccess), so the row is + // guaranteed present by the time this GET runs — the wait is for + // hydration latency, not for a still-racing write. + const historyResponse = page.waitForResponse( + (resp) => + resp.url().includes("/chat-history") && + resp.request().method() === "GET" && + resp.status() === 200, + { timeout: 15_000 }, + ); await page.reload(); await page.waitForSelector("[data-testid='chat-panel']", { timeout: 10_000 }); + await historyResponse; - await expect(page.getByText("Mobile persistence", { exact: true })).toBeVisible({ timeout: 5_000 }); - await expect(page.getByText("Echo: Mobile persistence")).toBeVisible({ timeout: 5_000 }); + await expect(page.getByText("Mobile persistence", { exact: true })).toBeVisible(); + await expect(page.getByText("Echo: Mobile persistence")).toBeVisible(); }); test("composer auto-grows with multi-line text", async ({ page }) => { diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index 920e8b5ba..06434ec60 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -383,23 +383,48 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle } summary := a2aMethod + " → " + wsNameForLog toolTrace := extractToolTrace(respBody) - parent := ctx - h.goAsync(func() { - logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second) - defer cancel() - LogActivity(logCtx, h.broadcaster, ActivityParams{ - WorkspaceID: workspaceID, - ActivityType: "a2a_receive", - SourceID: nilIfEmpty(callerID), - TargetID: &workspaceID, - Method: &a2aMethod, - Summary: &summary, - RequestBody: json.RawMessage(body), - ResponseBody: json.RawMessage(respBody), - ToolTrace: toolTrace, - DurationMs: &durationMs, - Status: logStatus, - }) + + // DATA-LOSS FIX (internal#470 / #1347 push-mode sibling): this + // a2a_receive row is the ONLY durable record of a push-mode chat + // round-trip — request_body carries the user's message, response_body + // carries the agent's reply, and chat-history hydration + // (messagestore.PostgresMessageStore) reads BOTH back to rebuild the + // transcript on canvas reopen / reload. It MUST be written + // SYNCHRONOUSLY, before proxyA2ARequest returns and ProxyA2A flushes + // the 200 to the canvas — otherwise the canvas sees the reply + // acknowledged (and rendered optimistically) while the row is still + // racing in a detached goroutine, and a reload (or a workspace-server + // restart / deploy / OOM) between the 200 and the goroutine's commit + // loses the message permanently on reopen. + // + // This mirrors the discipline already applied to the poll-mode ingest + // path (logA2AReceiveQueued / persistUserMessageAtIngest); the + // push-mode counterpart was left async, which the E2E Chat + // "history persists across reload" test surfaced as an intermittent + // red (the reload out-raced the INSERT). + // + // - context.WithoutCancel: a client disconnect on chat-exit (which + // cancels the inbound request ctx) MUST NOT abort this write. + // - SYNCHRONOUS (no goAsync): the row must be durable before the 200. + // - Best-effort: LogActivity logs+swallows INSERT errors internally, + // so a DB hiccup never blocks or fails the user's send — behaviour + // for that one request is never worse than the pre-fix async path. + // - The post-commit ACTIVITY_LOGGED broadcast still fires inside + // LogActivity; the durable row is the truth the canvas re-reads. + logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second) + defer cancel() + LogActivity(logCtx, h.broadcaster, ActivityParams{ + WorkspaceID: workspaceID, + ActivityType: "a2a_receive", + SourceID: nilIfEmpty(callerID), + TargetID: &workspaceID, + Method: &a2aMethod, + Summary: &summary, + RequestBody: json.RawMessage(body), + ResponseBody: json.RawMessage(respBody), + ToolTrace: toolTrace, + DurationMs: &durationMs, + Status: logStatus, }) if callerID == "" && statusCode < 400 {