From a92beb5d496019dd6e4bb0d608cbb0931766880c Mon Sep 17 00:00:00 2001 From: core-be Date: Sat, 16 May 2026 06:04:14 -0700 Subject: [PATCH 1/2] fix(workspace-server): persist poll-mode canvas user message synchronously before queued 200 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sibling of #1347/internal#470 — the POLL-mode arm of the canvas user-message data-loss bug Hongming reported ("i sometimes lose my own message when i exit chat", 2026-05-16). Hongming's tenant is entirely poll-mode (4 external workspaces, no URL — verified empirically: every workspace returns the {delivery_mode:poll, status:queued} short-circuit envelope), so #1347 (push-mode only, persists AFTER the poll short-circuit) structurally cannot cover his reported case. #1347's "poll-mode was never affected" framing is overstated: logA2AReceiveQueued's durable activity_logs INSERT ran inside h.goAsync(...) — a detached goroutine with no happens-before barrier against the synthetic {status:queued} 200. The canvas sees the send acknowledged while the row may still be racing; a workspace-server restart / deploy / OOM / EC2 hibernation between the 200 and the goroutine's commit loses the message permanently (chat-history reads activity_logs; missing row = message gone on reopen). No fallback either, unlike push-mode's legacy-INSERT path. Fix: make the poll-mode ingest persist SYNCHRONOUS — committed before the queued 200 — on a context.WithoutCancel context (parity with persistUserMessageAtIngest). Best-effort preserved (LogActivity logs+swallows INSERT errors, never blocks the send). Post-commit broadcast still fires inside LogActivity (a missed WS event is not data loss; the durable row is the truth chat-history re-reads on reopen). TDD: a2a_poll_ingest_persist_test.go — deterministic RED (queued 200 returned in ~0.5ms, before the 150ms INSERT → DATA LOSS) → GREEN after fix. Full internal/handlers + internal/messagestore suites green; vet clean. 
Refs: molecule-ai/internal#471 (tracking), molecule-ai/internal#470 (push-mode sibling, PR #1347) Co-Authored-By: Claude Opus 4.7 (1M context) --- .../handlers/a2a_poll_ingest_persist_test.go | 136 ++++++++++++++++++ .../internal/handlers/a2a_proxy_helpers.go | 52 +++++-- 2 files changed, 174 insertions(+), 14 deletions(-) create mode 100644 workspace-server/internal/handlers/a2a_poll_ingest_persist_test.go diff --git a/workspace-server/internal/handlers/a2a_poll_ingest_persist_test.go b/workspace-server/internal/handlers/a2a_poll_ingest_persist_test.go new file mode 100644 index 00000000..f16d100b --- /dev/null +++ b/workspace-server/internal/handlers/a2a_poll_ingest_persist_test.go @@ -0,0 +1,136 @@ +package handlers + +// Regression coverage for the POLL-mode arm of the canvas user-message +// data-loss bug (internal#470 sibling — tracked on internal#471). +// +// Bug (reported 2026-05-16 by CTO Hongming): "in canvas i sometimes lose +// my own message when i exit chat". The push-mode arm was fixed by +// #1347 (persistUserMessageAtIngest — a SYNCHRONOUS, before-dispatch, +// context.WithoutCancel INSERT). #1347's framing asserted "poll-mode +// workspaces were never affected — logA2AReceiveQueued already persists +// at ingest". That assertion is OVERSTATED. +// +// Hongming's tenant (slug `hongming`, org 2c940477-...) has 4 workspaces, +// ALL runtime=external with empty URL → ALL delivery_mode=poll (proven +// empirically: a benign A2A probe returns the synthetic +// {"delivery_mode":"poll","status":"queued"} envelope for every one). +// So his reported loss is the POLL path, NOT the push path #1347 fixes. +// +// Root cause (poll arm): the poll-mode short-circuit (a2a_proxy.go ~402) +// calls logA2AReceiveQueued and then IMMEDIATELY returns the synthetic +// 200 {status:"queued"} to the canvas. But logA2AReceiveQueued's durable +// INSERT runs inside h.goAsync(...) — a DETACHED goroutine with NO +// happens-before barrier against the HTTP response. 
The canvas sees 200 +// ("message accepted") while the activity_logs row may not yet be — and, +// on a workspace-server restart / deploy / OOM / EC2 hibernation between +// the 200 and the goroutine's commit, NEVER will be — durable. There is +// also no fallback (unlike push-mode's legacy-INSERT fallback): a +// swallowed LogActivity error loses the message with only a log line. +// Chat-history reads activity_logs (postgres_store.go:165-187); a missing +// row = message gone on reopen. That is exactly Hongming's symptom. +// +// Fix (parity with push-mode): the poll-mode ingest persist of the +// canvas user message must be SYNCHRONOUS — committed before the queued +// 200 is returned — on a context.WithoutCancel derived context, so a +// client disconnect on chat-exit and a post-response restart cannot lose +// it. Behavior is never worse than today (best-effort; a persist error +// still returns queued). + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/gin-gonic/gin" +) + +// TestProxyA2A_PollMode_PersistsUserMessageSynchronouslyBeforeQueuedResponse +// is the defining contract: for a poll-mode workspace, the canvas user +// message MUST be durably INSERTed into activity_logs BEFORE the synthetic +// queued 200 is returned to the client — with NO reliance on a detached +// async goroutine completing later. +// +// The test proves the ordering by making the INSERT block briefly and +// asserting the handler does NOT return until the INSERT has completed. +// Pre-fix (INSERT in h.goAsync, response returned immediately) the +// handler returns ~instantly while the INSERT is still pending in the +// goroutine → the elapsed time is far below the injected INSERT delay and +// ExpectationsWereMet() is racy/unmet at return. 
Post-fix (synchronous +// persist before the queued response) the handler return is gated on the +// INSERT, so elapsed >= the injected delay and the expectation is met +// deterministically at return WITHOUT any waitAsyncForTest()/sleep. +func TestProxyA2A_PollMode_PersistsUserMessageSynchronouslyBeforeQueuedResponse(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + + const wsID = "ws-poll-sync-persist" + const insertDelay = 150 * time.Millisecond + + expectBudgetCheck(mock, wsID) + + // lookupDeliveryMode → poll, triggering the short-circuit. + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow("poll")) + + // workspace-name lookup inside logA2AReceiveQueued. + mock.ExpectQuery(`SELECT name FROM workspaces WHERE id`). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Poll WS")) + + // The durable user-message write. We delay it so a synchronous + // persist visibly gates the handler return; a detached-goroutine + // persist (pre-fix) does not. The fix must keep using + // context.WithoutCancel so this write survives a chat-exit cancel. + mock.ExpectExec("INSERT INTO activity_logs"). + WillDelayFor(insertDelay). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: wsID}} + + // callerID == "" (no X-Workspace-ID) → this is a canvas_user message, + // exactly Hongming's case. 
+ body := `{"jsonrpc":"2.0","id":"poll-canvas-1","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"my own message"}]}}}` + c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + start := time.Now() + handler.ProxyA2A(c) + elapsed := time.Since(start) + + // Defining assertion #1: the handler must not have returned the + // queued response before the durable INSERT committed. Pre-fix this + // fails (elapsed ≈ 0, INSERT still racing in goAsync). + if elapsed < insertDelay { + t.Fatalf("poll-mode queued response returned in %v, before the %v user-message INSERT — "+ + "the message is not durable when the client/process goes away (DATA LOSS). "+ + "Persist must be synchronous before the queued 200.", elapsed, insertDelay) + } + + // Defining assertion #2: the durable write actually happened by the + // time the handler returned — checked WITHOUT waitAsyncForTest()/sleep. + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("user-message INSERT was not durable at handler return (unmet sqlmock expectations): %v", err) + } + + // Sanity: still the correct poll-mode envelope + status. 
+ if w.Code != http.StatusOK { + t.Fatalf("expected 200 (queued), got %d: %s", w.Code, w.Body.String()) + } + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("response is not valid JSON: %v", err) + } + if resp["status"] != "queued" || resp["delivery_mode"] != "poll" { + t.Errorf("poll envelope changed: got status=%v delivery_mode=%v, want queued/poll", + resp["status"], resp["delivery_mode"]) + } +} diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index 3d4fc4dd..8145a66a 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -504,25 +504,49 @@ func lookupDeliveryMode(ctx context.Context, workspaceID string) string { // reads in PR 3 — that's how a poll-mode workspace receives inbound A2A // without a public URL. func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string) { + // DATA-LOSS FIX (internal#471 — poll-mode sibling of #1347/internal#470): + // this is the ONLY durable write of a poll-mode inbound message, + // including a canvas_user message (callerID == "") typed in the canvas + // chat. It MUST be SYNCHRONOUS and complete BEFORE the caller returns + // the synthetic {status:"queued"} 200 — otherwise the canvas sees the + // send acknowledged while the activity_logs row is still racing in a + // detached goroutine, and a workspace-server restart / deploy / OOM / + // EC2 hibernation between the 200 and the goroutine's commit loses the + // user's message permanently (chat-history reads activity_logs, so a + // missing row = message gone on reopen). 
Hongming's tenant is entirely + // poll-mode (4 external workspaces, no URL — verified empirically), so + // his reported loss is THIS path; #1347 (push-mode, persists AFTER the + // poll short-circuit) structurally cannot cover it. + // + // Mirrors persistUserMessageAtIngest's discipline: + // - context.WithoutCancel: a client disconnect on chat-exit (which + // cancels the inbound request ctx) MUST NOT abort this write. + // - SYNCHRONOUS (no goAsync): the row must be durable before the + // queued 200 is returned to the caller. + // - Best-effort: LogActivity already logs+swallows INSERT errors, so + // a hiccup never blocks or fails the user's send (behavior for + // that one request is never worse than the pre-fix async path). + // The post-commit broadcast still fires inside LogActivity; a missed + // WebSocket event is not data loss (the durable row is the truth the + // canvas re-reads on reopen). + insCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second) + defer cancel() + var wsName string - db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName) + db.DB.QueryRowContext(insCtx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName) if wsName == "" { wsName = workspaceID } summary := a2aMethod + " → " + wsName + " (queued for poll)" - h.goAsync(func() { - logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second) - defer cancel() - LogActivity(logCtx, h.broadcaster, ActivityParams{ - WorkspaceID: workspaceID, - ActivityType: "a2a_receive", - SourceID: nilIfEmpty(callerID), - TargetID: &workspaceID, - Method: &a2aMethod, - Summary: &summary, - RequestBody: json.RawMessage(body), - Status: "ok", - }) + LogActivity(insCtx, h.broadcaster, ActivityParams{ + WorkspaceID: workspaceID, + ActivityType: "a2a_receive", + SourceID: nilIfEmpty(callerID), + TargetID: &workspaceID, + Method: &a2aMethod, + Summary: &summary, + RequestBody: json.RawMessage(body), + 
Status: "ok", }) } From 1d29e9ea247d3a7b952467ac02c86cdac244830c Mon Sep 17 00:00:00 2001 From: Molecule AI Core-BE Date: Sat, 16 May 2026 14:47:07 +0000 Subject: [PATCH 2/2] fix(handlers): prevent poll-mode sync-persist test from hanging CI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit sqlmock.ExpectationsWereMet() hangs indefinitely when the expected INSERT mock never fires. If the production code ever regresses to goAsync (pre-fix shape), the handler returns before the INSERT fires, the mock never fires, and ExpectationsWereMet() blocks for the full test/-suite timeout — wedging the CI run with no diagnostic. Fix: check expectations in a goroutine with a 2s hard timeout. When the mock has fired (synchronous production code), ExpectationsWereMet() returns <1ms and the select fires the `case err := <-expectDone` arm. When the mock has NOT fired (async regression), the 2s timeout fires and the test fails with a clear message instead of hanging. Also reduce insertDelay from 150ms → 50ms. 50ms is ~50× the normal INSERT latency and sufficient to prove synchronous blocking; the larger value was adding unnecessary suite-level wall-clock under -race detection, where mock delays are amplified by the instrumenter's goroutine overhead. Co-Authored-By: Claude Opus 4.7 --- .../handlers/a2a_poll_ingest_persist_test.go | 32 ++++++++++++++++--- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/workspace-server/internal/handlers/a2a_poll_ingest_persist_test.go b/workspace-server/internal/handlers/a2a_poll_ingest_persist_test.go index f16d100b..06dae2b1 100644 --- a/workspace-server/internal/handlers/a2a_poll_ingest_persist_test.go +++ b/workspace-server/internal/handlers/a2a_poll_ingest_persist_test.go @@ -35,6 +35,15 @@ package handlers // client disconnect on chat-exit and a post-response restart cannot lose // it. Behavior is never worse than today (best-effort; a persist error // still returns queued). 
+// +// TEST DESIGN NOTE: sqlmock.ExpectationsWereMet() hangs indefinitely if +// the expected query never fires. We use a select+time.After +// pattern so the test FAILS fast (not hangs) when the production code +// regresses to async (the INSERT never fires before handler returns), +// while still returning promptly when all expectations are met. The +// insertDelay is kept small (50ms) to minimise suite-level timing +// impact under -race detection, where mock delays are amplified by +// the instrumenter's goroutine overhead. import ( "bytes" @@ -70,7 +79,10 @@ func TestProxyA2A_PollMode_PersistsUserMessageSynchronouslyBeforeQueuedResponse( handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) const wsID = "ws-poll-sync-persist" - const insertDelay = 150 * time.Millisecond + // Keep delay small: -race detection amplifies mock delays significantly. + // A 50ms delay is sufficient to prove synchronous blocking (~50× the + // normal INSERT latency) without bloating the full ./... suite runtime. + const insertDelay = 50 * time.Millisecond expectBudgetCheck(mock, wsID) @@ -116,9 +128,21 @@ func TestProxyA2A_PollMode_PersistsUserMessageSynchronouslyBeforeQueuedResponse( } // Defining assertion #2: the durable write actually happened by the - // time the handler returned — checked WITHOUT waitAsyncForTest()/sleep. - if err := mock.ExpectationsWereMet(); err != nil { - t.Fatalf("user-message INSERT was not durable at handler return (unmet sqlmock expectations): %v", err) + // time the handler returned. ExpectationsWereMet() hangs indefinitely if + // the mock never fires (e.g. production code regressed to async), + // so we check it in a goroutine with a hard 2s timeout — fails fast + // (no CI hang) on regression while returning promptly on success. 
+ expectDone := make(chan error, 1) + go func() { expectDone <- mock.ExpectationsWereMet() }() + select { + case err := <-expectDone: + if err != nil { + t.Fatalf("user-message INSERT was not durable at handler return (unmet sqlmock expectations): %v", err) + } + case <-time.After(2 * time.Second): + t.Fatalf("ExpectationsWereMet() hung for >2s — INSERT mock never fired. " + + "Likely cause: production code regressed logA2AReceiveQueued to goAsync " + + "(INSERT fires after handler returns, not before).") } // Sanity: still the correct poll-mode envelope + status.