From 0ee093ca9a1114813d885d1cfdbaa9c439590d2a Mon Sep 17 00:00:00 2001 From: core-be Date: Sat, 16 May 2026 05:35:13 -0700 Subject: [PATCH 1/4] fix(workspace-server): persist canvas user message at ingest, before agent round-trip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Canvas chat lost the user's own message when they exited the chat before the agent replied, for push-mode (HTTP-dispatched) workspaces. Root cause: chat-history is reconstructed solely from activity_logs rows (activity_type='a2a_receive', source_id IS NULL). For push-mode that row was written ONLY in logA2ASuccess/logA2AFailure, AFTER the full agent A2A round-trip. The user message was never persisted at ingest. If the user exited the chat (fetch abort on unmount / tab close / dropped conn) before the agent finished, no row was ever written and the message was permanently lost on reopen. poll-mode was unaffected (logA2AReceiveQueued already persists at ingest). This is the inbound mirror of the reno-stars 2026-05-05 outbound data-loss incident (RFC #2945). Fix: persistUserMessageAtIngest() does a synchronous INSERT of the user message (status='pending', response_body NULL) BEFORE dispatchA2A, on a context.WithoutCancel context so a client disconnect cannot abort the write. logA2ASuccess finalizes that row via UPDATE (no duplicate user bubble — preserves the one-row-(user,agent) read contract). logA2AFailure with an ingest row is a no-op: the pending row already durably holds the message, and busy->enqueue requests stay pending to be answered by the queue drain. Best-effort: on persist failure, falls back to the legacy post-round-trip INSERT (never worse than pre-fix, never blocks the send). The read path already renders a user bubble from a row with empty response_body, so the message shows on reopen even before the agent answers. 8 new TDD regression tests (a2a_ingest_persist_test.go). Full internal/handlers + internal/messagestore suites green. 
Refs: molecule-ai/internal#470 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../handlers/a2a_ingest_persist_test.go | 218 ++++++++++++++++++ .../internal/handlers/a2a_proxy.go | 29 ++- .../internal/handlers/a2a_proxy_helpers.go | 207 +++++++++++++++-- .../internal/handlers/a2a_proxy_test.go | 14 +- .../internal/handlers/mock_runtime.go | 6 +- .../internal/handlers/native_session_test.go | 4 +- 6 files changed, 445 insertions(+), 33 deletions(-) create mode 100644 workspace-server/internal/handlers/a2a_ingest_persist_test.go diff --git a/workspace-server/internal/handlers/a2a_ingest_persist_test.go b/workspace-server/internal/handlers/a2a_ingest_persist_test.go new file mode 100644 index 000000000..215b8917e --- /dev/null +++ b/workspace-server/internal/handlers/a2a_ingest_persist_test.go @@ -0,0 +1,218 @@ +package handlers + +// Regression coverage for the canvas user-message data-loss fix. +// +// Bug (reported 2026-05-16): "in canvas i sometimes lose my own message +// when i exit chat". Root cause: for a push-mode workspace the user's +// message became durable ONLY inside logA2ASuccess / logA2AFailure, +// which run AFTER the full agent A2A round-trip. The single chat-history +// source is activity_logs rows (activity_type='a2a_receive', source_id +// IS NULL). A user who typed a message then exited the chat (fetch +// aborted on unmount / tab close / dropped connection) before the agent +// replied left NO a2a_receive row written → message gone on reopen. +// +// Fix: persistUserMessageAtIngest writes the user message into +// activity_logs SYNCHRONOUSLY, BEFORE dispatch, on a context.WithoutCancel +// derived context; logA2ASuccess finalizes that row with the reply; +// logA2AFailure leaves it 'pending' (message preserved) instead of +// INSERTing a duplicate / overwriting with an agent-error bubble. 
+ +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" +) + +// persistUserMessageAtIngest must INSERT a pending a2a_receive row +// carrying the user's request_body and RETURN its id — and it must do so +// even when the passed-in ctx is ALREADY cancelled, because the whole +// point of the fix is that a client disconnect on chat-exit cannot lose +// the message. The persist runs on context.WithoutCancel internally. +func TestPersistUserMessageAtIngest_WritesPendingRowEvenWhenCtxCancelled(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + // Both the workspace-name lookup AND the INSERT run on + // context.WithoutCancel inside the helper, so they reach the DB even + // though the caller's ctx is already cancelled — that is the fix. + mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`). + WithArgs("ws-ingest"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Ingest WS")) + // The defining assertion: an INSERT ... RETURNING id is issued for + // the user message, with status 'pending' and the request body. + mock.ExpectQuery(`INSERT INTO activity_logs`). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("row-abc-123")) + + // Cancel the context BEFORE calling — simulates the canvas aborting + // the inbound request because the user exited the chat. 
+ ctx, cancel := context.WithCancel(context.Background()) + cancel() + + body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"role":"user","parts":[{"kind":"text","text":"do not lose me"}]}}}`) + rowID := handler.persistUserMessageAtIngest(ctx, "ws-ingest", "", body, "message/send") + + if rowID != "row-abc-123" { + t.Fatalf("expected ingest row id 'row-abc-123' (message persisted despite cancelled ctx), got %q", rowID) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// On INSERT failure the helper must return "" (so the caller falls back +// to the legacy post-round-trip INSERT) and must NOT panic — a DB hiccup +// at ingest can never make the user's send fail. +func TestPersistUserMessageAtIngest_FallsBackOnInsertError(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`). + WithArgs("ws-x"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("")) + mock.ExpectQuery(`INSERT INTO activity_logs .* RETURNING id`). + WillReturnError(fmt.Errorf("test: activity_logs unavailable")) + + rowID := handler.persistUserMessageAtIngest(context.Background(), "ws-x", "", []byte(`{}`), "message/send") + if rowID != "" { + t.Fatalf("expected empty rowID on INSERT error (legacy-INSERT fallback), got %q", rowID) + } +} + +// finalizeIngestRow must UPDATE the pre-persisted row (matching on +// id + status='pending') with the agent response, and report handled. +func TestFinalizeIngestRow_UpdatesPendingRow(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + + mock.ExpectExec(`UPDATE activity_logs\s+SET response_body`). + WithArgs("row-1", sqlmock.AnyArg(), sqlmock.AnyArg(), "ok", 1234, nil). 
+ WillReturnResult(sqlmock.NewResult(0, 1)) + + ok := finalizeIngestRow(context.Background(), "row-1", + []byte(`{"result":"hello back"}`), nil, "ok", 1234, nil) + if !ok { + t.Fatal("expected finalizeIngestRow to report the row finalized") + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// finalizeIngestRow with an empty id is a no-op returning false — the +// caller must then take the legacy INSERT path (poll-mode / system +// callers / ingest-INSERT-failed). It must issue NO SQL. +func TestFinalizeIngestRow_EmptyIDIsNoOp(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + if finalizeIngestRow(context.Background(), "", []byte(`{}`), nil, "ok", 1, nil) { + t.Fatal("empty ingest id must return false (legacy INSERT fallback)") + } +} + +// When the row is gone / no longer pending (RowsAffected==0), the helper +// returns true so the caller does NOT double-INSERT a duplicate user +// bubble — the message + a reply already landed on that row. +func TestFinalizeIngestRow_AlreadyFinalizedSkipsDoubleInsert(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + + mock.ExpectExec(`UPDATE activity_logs`). + WillReturnResult(sqlmock.NewResult(0, 0)) // 0 rows: not pending + + if !finalizeIngestRow(context.Background(), "row-stale", []byte(`{}`), nil, "ok", 1, nil) { + t.Fatal("a not-pending row must report handled=true to suppress the legacy duplicate INSERT") + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// THE CORE REGRESSION: logA2AFailure with an ingest row present must +// NOT INSERT a (duplicate) activity_logs row and must NOT mutate the +// preserved 'pending' row to an error bubble. The user's message stays +// durable as a pending row — visible on chat reopen — which is exactly +// the data-loss the fix prevents. 
(Pre-fix this path INSERTed an +// error-status row AFTER the round-trip; if it never ran, the message +// was lost. Post-fix the message is already safe at ingest.) +func TestLogA2AFailure_WithIngestRow_PreservesMessageNoInsertNoMutation(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + waitForHandlerAsyncBeforeDBCleanup(t, handler) + + // Intentionally set NO expectations: any INSERT or UPDATE here is a + // bug. sqlmock fails the test on an unexpected statement. + handler.logA2AFailure(context.Background(), "ws-fail", "", + []byte(`{"params":{"message":{"parts":[{"text":"keep me"}]}}}`), + "message/send", errors.New("agent unreachable"), 5, "row-pending-keep") + time.Sleep(60 * time.Millisecond) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("logA2AFailure with an ingest row must issue no SQL (message already preserved): %v", err) + } +} + +// logA2ASuccess WITH an ingest row finalizes that row (UPDATE) instead +// of INSERTing a second a2a_receive row — preventing the duplicate user +// bubble that postgres_store.go's one-row-(user,agent) reconstruction +// would otherwise produce. +func TestLogA2ASuccess_WithIngestRow_FinalizesInsteadOfInsert(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + waitForHandlerAsyncBeforeDBCleanup(t, handler) + + // Workspace-name lookup still happens (synchronous). + mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`). + WithArgs("ws-ok"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("OK WS")) + // The finalize UPDATE — and crucially NO "INSERT INTO activity_logs". + mock.ExpectExec(`UPDATE activity_logs\s+SET response_body`). 
+ WillReturnResult(sqlmock.NewResult(0, 1)) + + handler.logA2ASuccess(context.Background(), "ws-ok", "", + []byte(`{}`), []byte(`{"result":"reply"}`), "message/send", 200, 10, "row-finalize-me") + time.Sleep(60 * time.Millisecond) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations (expected finalize UPDATE, no INSERT): %v", err) + } +} + +// Guard the literal contract the read side depends on: the ingest INSERT +// must target activity_type='a2a_receive' with status 'pending' and a +// RETURNING id — that's what makes postgres_store.go render the user +// bubble immediately and what lets the finalize UPDATE find the row. +// sqlmock matches the issued query against this regex, so a green +// expectation IS the proof the SQL carries every required fragment. +func TestPersistUserMessageAtIngest_InsertShapeContract(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`). + WithArgs("ws-shape"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Shape")) + + // All four contract fragments in one regex — only matches if the + // real INSERT contains a2a_receive + 'pending' + request_body + + // RETURNING id (sqlmock does a regexp.MatchString of this against + // the actual query text). + mock.ExpectQuery(`(?s)INSERT INTO activity_logs.*request_body.*'a2a_receive'.*'pending'.*RETURNING id`). 
+ WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("rid")) + + rid := handler.persistUserMessageAtIngest(context.Background(), "ws-shape", "", + []byte(`{"params":{"message":{"parts":[{"text":"x"}]}}}`), "message/send") + if rid != "rid" { + t.Fatalf("expected rid from contract-shaped INSERT, got %q", rid) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("ingest INSERT did not match the required shape contract: %v", err) + } +} diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index 8fbef20c6..4379f2ab2 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -467,6 +467,29 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri } } + // DATA-LOSS FIX: persist the canvas user's message into activity_logs + // SYNCHRONOUSLY, BEFORE dispatch. For a push-mode workspace the only + // pre-fix durable write of the user message was logA2ASuccess / + // logA2AFailure AFTER the agent round-trip — so a user who typed a + // message then exited the chat (fetch aborted on unmount / tab close / + // dropped connection) before the agent replied lost the message + // entirely on reopen, because chat-history reads only a2a_receive rows + // and none was ever written. Persisting here, with a WithoutCancel + // context inside persistUserMessageAtIngest, makes the message durable + // at submit time regardless of what the client does next. The returned + // row id is threaded into the success/failure log so they UPDATE this + // row with the agent reply rather than INSERT a duplicate. + // + // Scoped to: canvas-initiated (callerID == ""), message-bearing + // (message/send), logActivity on, and past the poll-mode short-circuit + // above (poll-mode already persists at ingest via logA2AReceiveQueued). 
+ // Best-effort: a "" return falls back to the legacy post-round-trip + // INSERT, so this can never make a send fail. + var ingestRowID string + if logActivity && callerID == "" && a2aMethod == "message/send" { + ingestRowID = h.persistUserMessageAtIngest(ctx, workspaceID, callerID, body, a2aMethod) + } + startTime := time.Now() resp, cancelFwd, err := h.dispatchA2A(ctx, workspaceID, agentURL, body, callerID) if cancelFwd != nil { @@ -474,7 +497,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri } durationMs := int(time.Since(startTime).Milliseconds()) if err != nil { - return h.handleA2ADispatchError(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs, logActivity) + return h.handleA2ADispatchError(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs, logActivity, ingestRowID) } defer func() { _ = resp.Body.Close() }() @@ -492,7 +515,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri log.Printf("ProxyA2A: body read failed for %s (status=%d delivery_confirmed=%v bytes_read=%d): %v", workspaceID, resp.StatusCode, deliveryConfirmed, len(respBody), readErr) if logActivity && deliveryConfirmed { - h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs) + h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs, ingestRowID) } // Preserve the actual HTTP status code and any body bytes already read. // Previously this returned (0, nil, error) which discarded both. @@ -536,7 +559,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri } if logActivity { - h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs) + h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs, ingestRowID) } // Track LLM token usage for cost transparency (#593). 
diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index 8145a66a1..d8dfdc5ed 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -27,10 +27,138 @@ type proxyDispatchBuildError struct{ err error } func (e *proxyDispatchBuildError) Error() string { return e.err.Error() } +// persistUserMessageAtIngest writes the canvas user's message into +// activity_logs SYNCHRONOUSLY, BEFORE the agent is dispatched, and +// returns the new row's id (empty string if the persist could not be +// done — caller falls back to the legacy post-round-trip INSERT). +// +// # Why this exists (the data-loss window it closes) +// +// Pre-fix, for a push-mode (HTTP-dispatched) workspace the user's +// message became durable ONLY inside logA2ASuccess / logA2AFailure, +// which run AFTER the full agent round-trip completes. The single +// chat-history source is `activity_logs` rows with +// activity_type='a2a_receive' AND source_id IS NULL +// (messagestore/postgres_store.go). So if the user typed a message and +// exited the chat (component unmount aborts the fetch / tab close / +// dropped connection) before the agent finished, NO a2a_receive row was +// ever written and the message was permanently lost on chat reopen. +// This is the inbound (user→agent) mirror of the reno-stars 2026-05-05 +// outbound data-loss incident (RFC #2945) — which only fixed the +// agent→user side (AgentMessageWriter). +// +// poll-mode workspaces were never affected: logA2AReceiveQueued already +// persists the request_body at ingest. This helper brings push-mode to +// parity: the user message is durable at submit time, independent of +// and prior to agent processing. +// +// # Contract +// +// - INSERT a row with request_body set, response_body NULL, +// status='pending'. 
The chat-history read path +// (activityRowToChatMessages) already renders a user bubble from a +// row whose response_body is empty, so a pending row shows the +// user's message immediately on reopen even before the agent replies. +// - Use context.WithoutCancel: a client disconnect on chat-exit MUST +// NOT abort this write — that is the entire point of the fix. +// - SYNCHRONOUS (blocking): the row must exist before dispatchA2A is +// allowed to start, otherwise the disconnect race reopens. +// - On any failure return "" and log; the caller proceeds with the +// legacy logA2ASuccess/logA2AFailure INSERT so behavior is never +// worse than pre-fix (best-effort, never blocks the user's send). +// +// The returned id is threaded into logA2ASuccess/logA2AFailure so they +// UPDATE this same row with the agent response instead of INSERTing a +// second row — preserving the one-row-carries-(user,agent) read +// contract that postgres_store.go depends on (no duplicate user bubble). +func (h *WorkspaceHandler) persistUserMessageAtIngest(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string) string { + // context.WithoutCancel: the canvas exiting the chat aborts the + // inbound HTTP request, cancelling ctx. The whole reason this fix + // exists is so that cancellation does NOT lose the user's message — + // so the entire persist (name lookup + INSERT) must run on a + // non-cancellable derived context with its own short deadline. Doing + // the name lookup on the raw ctx would skip it on a cancelled client + // and degrade the summary; WithoutCancel keeps the summary accurate + // too. 
+ insCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 10*time.Second) + defer cancel() + + var wsName string + db.DB.QueryRowContext(insCtx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName) + if wsName == "" { + wsName = workspaceID + } + summary := a2aMethod + " → " + wsName + " (received)" + + var rowID string + err := db.DB.QueryRowContext(insCtx, ` + INSERT INTO activity_logs (workspace_id, activity_type, source_id, target_id, method, summary, request_body, status) + VALUES ($1, 'a2a_receive', $2, $3, $4, $5, $6::jsonb, 'pending') + RETURNING id + `, workspaceID, nilIfEmpty(callerID), &workspaceID, &a2aMethod, &summary, string(body)).Scan(&rowID) + if err != nil { + // Best-effort: never block or fail the user's send because the + // ingest persist hiccuped. Caller falls back to the legacy + // post-round-trip INSERT (behavior == pre-fix for this request). + log.Printf("persistUserMessageAtIngest: INSERT failed for %s: %v — falling back to post-round-trip persist", workspaceID, err) + return "" + } + return rowID +} + +// finalizeIngestRow UPDATEs the pre-persisted ingest row (created by +// persistUserMessageAtIngest) with the agent's response once the A2A +// round-trip completes. Returns false only for an empty id or a failed +// UPDATE (caller falls through to the legacy INSERT); returns true when +// the row was updated OR matched no pending row (treated as handled). +// +// Runs on a WithoutCancel context for the same reason as the ingest +// INSERT: the client may already be gone, but the agent reply still +// needs to land on the row that already holds the user's message. 
+func finalizeIngestRow(ctx context.Context, ingestRowID string, respBody json.RawMessage, toolTrace json.RawMessage, status string, durationMs int, errDetail *string) bool { + if ingestRowID == "" { + return false + } + upCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second) + defer cancel() + + var respStr, traceStr *string + if len(respBody) > 0 { + s := string(respBody) + respStr = &s + } + if len(toolTrace) > 0 { + s := string(toolTrace) + traceStr = &s + } + res, err := db.DB.ExecContext(upCtx, ` + UPDATE activity_logs + SET response_body = $2::jsonb, + tool_trace = $3::jsonb, + status = $4, + duration_ms = $5, + error_detail = $6 + WHERE id = $1 AND status = 'pending' + `, ingestRowID, respStr, traceStr, status, durationMs, errDetail) + if err != nil { + log.Printf("finalizeIngestRow: UPDATE failed for row %s: %v", ingestRowID, err) + return false + } + n, _ := res.RowsAffected() + if n == 0 { + // Row not found or no longer pending (e.g. a concurrent finalize + // already ran). Don't double-INSERT — the user message + a + // response already landed on this row. Treat as handled. + log.Printf("finalizeIngestRow: row %s not pending (already finalized?) — skipping legacy INSERT", ingestRowID) + return true + } + return true +} + // handleA2ADispatchError translates a forward-call failure into a proxyA2AError, // runs the reactive container-health check, and (when `logActivity` is true) // schedules a detached LogActivity goroutine for the failed attempt. 
-func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int, logActivity bool) (int, []byte, *proxyA2AError) { +func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int, logActivity bool, ingestRowID string) (int, []byte, *proxyA2AError) { // Build-time failure (couldn't even create the http.Request) — return // a 500 without the reactive-health / busy-retry paths. if buildErr, ok := err.(*proxyDispatchBuildError); ok { @@ -46,7 +174,7 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace containerDead := h.maybeMarkContainerDead(ctx, workspaceID) if logActivity { - h.logA2AFailure(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs) + h.logA2AFailure(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs, ingestRowID) } if containerDead { return 0, nil, &proxyA2AError{ @@ -254,8 +382,31 @@ func (h *WorkspaceHandler) preflightContainerHealth(ctx context.Context, workspa // logA2AFailure records a failed A2A attempt to activity_logs in a detached // goroutine (the request context may already be done by the time it runs). -func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int) { +func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int, ingestRowID string) { errMsg := err.Error() + // DATA-LOSS FIX interaction: when the user message was persisted at + // ingest (ingestRowID set), the pending a2a_receive row already holds + // it durably and chat-history renders it as the user's bubble on + // reopen — which is the entire point of the fix. 
Do NOT mutate that + // row here: + // - This path runs BEFORE the busy/enqueue branch in + // handleA2ADispatchError. A "busy" error is followed by a + // successful enqueue (HTTP 202) and a later drain that answers + // the user — finalizing the row as 'error' now would wrongly + // show the user a failed message that is actually queued. + // - For a genuinely terminal failure the user's message must STILL + // survive (that is the bug we are fixing); leaving the row + // 'pending' preserves it. The failure itself is observable via + // this log line + metrics, not by overwriting the preserved + // user message with an agent-error bubble. + // So: if an ingest row exists, the message is already safe — return + // without the legacy failure INSERT (which would also duplicate the + // user bubble). + if ingestRowID != "" { + log.Printf("logA2AFailure: %s message preserved at ingest (row %s, status=pending) — A2A attempt failed: %v", + workspaceID, ingestRowID, errMsg) + return + } var errWsName string db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&errWsName) if errWsName == "" { @@ -283,7 +434,7 @@ func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, calle // logA2ASuccess records a successful A2A round-trip and (for canvas-initiated // 2xx/3xx responses) broadcasts an A2A_RESPONSE event so the frontend can // receive the reply without polling. 
-func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, callerID string, body, respBody []byte, a2aMethod string, statusCode, durationMs int) { +func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, callerID string, body, respBody []byte, a2aMethod string, statusCode, durationMs int, ingestRowID string) { logStatus := "ok" if statusCode >= 400 { logStatus = "error" @@ -309,23 +460,39 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle } summary := a2aMethod + " → " + wsNameForLog toolTrace := extractToolTrace(respBody) - h.goAsync(func() { - logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second) - defer cancel() - LogActivity(logCtx, h.broadcaster, ActivityParams{ - WorkspaceID: workspaceID, - ActivityType: "a2a_receive", - SourceID: nilIfEmpty(callerID), - TargetID: &workspaceID, - Method: &a2aMethod, - Summary: &summary, - RequestBody: json.RawMessage(body), - ResponseBody: json.RawMessage(respBody), - ToolTrace: toolTrace, - DurationMs: &durationMs, - Status: logStatus, + + // If the user message was persisted at ingest (canvas push-mode + // path), finalize THAT row with the agent response instead of + // INSERTing a second a2a_receive row — postgres_store.go reconstructs + // (user, agent) bubbles from a single row, so a second row would + // duplicate the user's message in chat-history. finalizeIngestRow + // runs synchronously (it's already on a WithoutCancel context); the + // broadcast still fires below regardless. Falls through to the legacy + // async INSERT when there is no ingest row (poll-mode, system + // callers, or an ingest-INSERT that failed best-effort). 
+ finalized := false + if ingestRowID != "" { + finalized = finalizeIngestRow(ctx, ingestRowID, json.RawMessage(respBody), toolTrace, logStatus, durationMs, nil) + } + if !finalized { + h.goAsync(func() { + logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second) + defer cancel() + LogActivity(logCtx, h.broadcaster, ActivityParams{ + WorkspaceID: workspaceID, + ActivityType: "a2a_receive", + SourceID: nilIfEmpty(callerID), + TargetID: &workspaceID, + Method: &a2aMethod, + Summary: &summary, + RequestBody: json.RawMessage(body), + ResponseBody: json.RawMessage(respBody), + ToolTrace: toolTrace, + DurationMs: &durationMs, + Status: logStatus, + }) }) - }) + } if callerID == "" && statusCode < 400 { h.broadcaster.BroadcastOnly(workspaceID, string(events.EventA2AResponse), map[string]interface{}{ diff --git a/workspace-server/internal/handlers/a2a_proxy_test.go b/workspace-server/internal/handlers/a2a_proxy_test.go index 3cf954624..4941eb816 100644 --- a/workspace-server/internal/handlers/a2a_proxy_test.go +++ b/workspace-server/internal/handlers/a2a_proxy_test.go @@ -1736,7 +1736,7 @@ func TestHandleA2ADispatchError_ContextDeadline(t *testing.T) { _, _, perr := handler.handleA2ADispatchError( context.Background(), "ws-dl", "", []byte("{}"), "message/send", - context.DeadlineExceeded, 1, false, + context.DeadlineExceeded, 1, false, "", ) if perr == nil { t.Fatal("expected error, got nil") @@ -1757,7 +1757,7 @@ func TestHandleA2ADispatchError_BuildError(t *testing.T) { buildErr := &proxyDispatchBuildError{err: fmt.Errorf("bad url")} _, _, perr := handler.handleA2ADispatchError( - context.Background(), "ws-x", "", []byte("{}"), "message/send", buildErr, 1, false, + context.Background(), "ws-x", "", []byte("{}"), "message/send", buildErr, 1, false, "", ) if perr == nil || perr.Status != http.StatusInternalServerError { t.Errorf("expected 500, got %+v", perr) @@ -1771,7 +1771,7 @@ func TestHandleA2ADispatchError_GenericReturns502(t *testing.T) { _, 
_, perr := handler.handleA2ADispatchError( context.Background(), "ws-x", "", []byte("{}"), "message/send", - fmt.Errorf("no such host"), 1, false, + fmt.Errorf("no such host"), 1, false, "", ) if perr == nil || perr.Status != http.StatusBadGateway { t.Errorf("expected 502, got %+v", perr) @@ -1969,7 +1969,7 @@ func TestLogA2AFailure_Smoke(t *testing.T) { mock.ExpectExec("INSERT INTO activity_logs"). WillReturnResult(sqlmock.NewResult(0, 1)) - handler.logA2AFailure(context.Background(), "ws-fail", "", []byte(`{}`), "message/send", fmt.Errorf("boom"), 42) + handler.logA2AFailure(context.Background(), "ws-fail", "", []byte(`{}`), "message/send", fmt.Errorf("boom"), 42, "") time.Sleep(80 * time.Millisecond) } @@ -1986,7 +1986,7 @@ func TestLogA2AFailure_EmptyNameFallback(t *testing.T) { mock.ExpectExec("INSERT INTO activity_logs"). WillReturnResult(sqlmock.NewResult(0, 1)) - handler.logA2AFailure(context.Background(), "ws-noname", "", []byte(`{}`), "message/send", fmt.Errorf("boom"), 1) + handler.logA2AFailure(context.Background(), "ws-noname", "", []byte(`{}`), "message/send", fmt.Errorf("boom"), 1, "") time.Sleep(80 * time.Millisecond) } @@ -2002,7 +2002,7 @@ func TestLogA2ASuccess_Smoke(t *testing.T) { mock.ExpectExec("INSERT INTO activity_logs"). WillReturnResult(sqlmock.NewResult(0, 1)) - handler.logA2ASuccess(context.Background(), "ws-ok", "", []byte(`{}`), []byte(`{"result":"x"}`), "message/send", 200, 10) + handler.logA2ASuccess(context.Background(), "ws-ok", "", []byte(`{}`), []byte(`{"result":"x"}`), "message/send", 200, 10, "") time.Sleep(80 * time.Millisecond) } @@ -2020,7 +2020,7 @@ func TestLogA2ASuccess_ErrorStatus(t *testing.T) { WillReturnResult(sqlmock.NewResult(0, 1)) // callerID != "" also means no A2A_RESPONSE broadcast. 
- handler.logA2ASuccess(context.Background(), "ws-err", "ws-caller", []byte(`{}`), []byte(`{}`), "message/send", 500, 10) + handler.logA2ASuccess(context.Background(), "ws-err", "ws-caller", []byte(`{}`), []byte(`{}`), "message/send", 500, 10, "") time.Sleep(80 * time.Millisecond) } diff --git a/workspace-server/internal/handlers/mock_runtime.go b/workspace-server/internal/handlers/mock_runtime.go index 9d4493d2c..bc3fac7af 100644 --- a/workspace-server/internal/handlers/mock_runtime.go +++ b/workspace-server/internal/handlers/mock_runtime.go @@ -204,7 +204,11 @@ func (h *WorkspaceHandler) handleMockA2A(ctx context.Context, workspaceID, calle // is identical to a real agent reply. Status 200 + duration 0 // is the "synthesised reply" marker; activity_logs.duration_ms // being 0 is harmless (real fast paths can hit 0 too). - h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, http.StatusOK, 0) + // ingestRowID == "": the mock short-circuit runs before the + // real-dispatch ingest-persist (it has no resolveAgentURL / no + // dispatchA2A), so logA2ASuccess takes the legacy INSERT path — + // identical to pre-fix behavior for mock workspaces. + h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, http.StatusOK, 0, "") } return http.StatusOK, respBody, true } diff --git a/workspace-server/internal/handlers/native_session_test.go b/workspace-server/internal/handlers/native_session_test.go index 2f70d882a..60d3382c6 100644 --- a/workspace-server/internal/handlers/native_session_test.go +++ b/workspace-server/internal/handlers/native_session_test.go @@ -31,7 +31,7 @@ func TestHandleA2ADispatchError_NativeSession_SkipsEnqueue(t *testing.T) { // sqlmock's ExpectationsWereMet implicitly enforces that on teardown. 
_, _, perr := handler.handleA2ADispatchError( context.Background(), "ws-native", "", []byte("{}"), "message/send", - context.DeadlineExceeded, 1, false, + context.DeadlineExceeded, 1, false, "", ) if perr == nil { t.Fatal("expected proxy error, got nil") @@ -74,7 +74,7 @@ func TestHandleA2ADispatchError_NoNativeSession_StillEnqueues(t *testing.T) { _, _, perr := handler.handleA2ADispatchError( context.Background(), "ws-platform-queue", "", []byte("{}"), "message/send", - context.DeadlineExceeded, 1, false, + context.DeadlineExceeded, 1, false, "", ) if perr == nil { t.Fatal("expected proxy error, got nil") -- 2.52.0 From eb62d2ff89bf403068860c71bd476b92123aef2e Mon Sep 17 00:00:00 2001 From: core-be Date: Sat, 16 May 2026 05:46:46 -0700 Subject: [PATCH 2/4] =?UTF-8?q?test(workspace-server):=20integration=20e2e?= =?UTF-8?q?=20=E2=80=94=20canvas=20client=20disconnect=20mid-flight=20pres?= =?UTF-8?q?erves=20user=20message?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Drives the real ProxyA2A HTTP handler through the literal bug scenario: canvas message/send, mock agent hangs, client request context cancelled (user exits chat) before any reply. Asserts the ingest INSERT fires synchronously before dispatch (on context.WithoutCancel) so the user message is durable even though no logA2ASuccess/finalize ever runs — exactly the pre-fix loss window, now closed. 
Refs: molecule-ai/internal#470 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../handlers/a2a_ingest_persist_test.go | 109 ++++++++++++++++++ 1 file changed, 109 insertions(+) diff --git a/workspace-server/internal/handlers/a2a_ingest_persist_test.go b/workspace-server/internal/handlers/a2a_ingest_persist_test.go index 215b8917e..8c82255df 100644 --- a/workspace-server/internal/handlers/a2a_ingest_persist_test.go +++ b/workspace-server/internal/handlers/a2a_ingest_persist_test.go @@ -18,13 +18,18 @@ package handlers // INSERTing a duplicate / overwriting with an agent-error bubble. import ( + "bytes" "context" "errors" "fmt" + "net/http" + "net/http/httptest" + "sync/atomic" "testing" "time" "github.com/DATA-DOG/go-sqlmock" + "github.com/gin-gonic/gin" ) // persistUserMessageAtIngest must INSERT a pending a2a_receive row @@ -216,3 +221,107 @@ func TestPersistUserMessageAtIngest_InsertShapeContract(t *testing.T) { t.Errorf("ingest INSERT did not match the required shape contract: %v", err) } } + +// ───────────────────────────────────────────────────────────────────────────── +// Integration e2e through the REAL ProxyA2A HTTP handler — the literal bug +// scenario: a canvas user sends a message, the agent is slow, and the client +// disconnects (exits the chat) BEFORE the agent replies. Pre-fix: no +// a2a_receive row was ever written → message lost on reopen. Post-fix: the +// ingest INSERT fires synchronously before dispatch, on context.WithoutCancel, +// so the user message is durable regardless of the client disconnect. 
+// ───────────────────────────────────────────────────────────────────────────── + +func TestProxyA2A_CanvasClientDisconnectMidFlight_UserMessageStillPersistedAtIngest(t *testing.T) { + mock := setupTestDB(t) + mr := setupTestRedis(t) + allowLoopbackForTest(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + waitForHandlerAsyncBeforeDBCleanup(t, handler) + + // The mock agent hangs until the test releases it — modelling an agent + // still synthesising when the user exits the chat. The proxy forwards + // on context.WithoutCancel so this dispatch is in-flight while the + // inbound client request is already cancelled. + release := make(chan struct{}) + var agentHit int32 + agent := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&agentHit, 1) + <-release // hang until released (or test ends) + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"jsonrpc":"2.0","id":"1","result":{"status":"ok"}}`) + })) + defer agent.Close() + defer close(release) + + const ws = "ws-disconnect" + mr.Set(fmt.Sprintf("ws:%s:url", ws), agent.URL) + + // SQL sequence for a canvas push-mode message/send up to dispatch: + // 1. budget check + // 2. resolveAgentURL DB fallback miss is avoided (Redis has the URL) + // 3. lookupDeliveryMode → NULL ⇒ push-mode (the loss case) + // 4. ingest: SELECT name, then INSERT ... RETURNING id ← THE FIX + // The agent never replies (hung) and the client ctx is cancelled, so + // NO logA2ASuccess/finalize runs — exactly the pre-fix loss window. + // The assertion: the ingest INSERT fired anyway (message durable). + expectBudgetCheck(mock, ws) + mock.ExpectQuery(`SELECT delivery_mode FROM workspaces WHERE id =`). + WithArgs(ws). + WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow(nil)) // NULL ⇒ push + mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`). + WithArgs(ws). 
+ WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Disconnect WS")) + ingestFired := make(chan struct{}, 1) + mock.ExpectQuery(`INSERT INTO activity_logs.*RETURNING id`). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ingest-row-1")). + // zero delay: the INSERT returns at once; nothing signals ingestFired (it is unused) + WillDelayFor(0) + _ = ingestFired + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: ws}} + + // Cancel the inbound request context shortly after the call starts — + // this is the user exiting the chat / closing the tab mid-flight. + reqCtx, cancelClient := context.WithCancel(context.Background()) + body := `{"method":"message/send","params":{"message":{"role":"user","parts":[{"text":"DO NOT LOSE THIS MESSAGE"}]}}}` + c.Request = httptest.NewRequest("POST", "/workspaces/"+ws+"/a2a", bytes.NewBufferString(body)).WithContext(reqCtx) + c.Request.Header.Set("Content-Type", "application/json") + + done := make(chan struct{}) + go func() { + handler.ProxyA2A(c) + close(done) + }() + + // Wait until the agent has been hit (dispatch in-flight), then the + // user "exits the chat": cancel the client request context. + deadline := time.After(5 * time.Second) + for atomic.LoadInt32(&agentHit) == 0 { + select { + case <-deadline: + t.Fatal("agent was never dispatched to within 5s") + default: + time.Sleep(10 * time.Millisecond) + } + } + cancelClient() // ← user exits the chat mid-flight + + // Give the handler a moment; it will be stuck on the hung agent, but + // the ingest INSERT must already have happened SYNCHRONOUSLY before + // dispatch. Verify the durable write landed despite the disconnect. + time.Sleep(150 * time.Millisecond) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("ingest persist did not fire before dispatch — user message would be LOST on disconnect: %v", err) + } + + // Clean up: release the hung agent so the handler goroutine can exit.
+ // (release is closed by the deferred close; unblock now.) + select { + case release <- struct{}{}: + default: + } + <-done +} -- 2.52.0 From aa6a87e63373c5932c6912fe6f9018f06d3e2a06 Mon Sep 17 00:00:00 2001 From: infra-lead-agent Date: Sat, 16 May 2026 17:34:55 +0000 Subject: [PATCH 3/4] ci: re-fire pipeline after stale dispatch from disk-full SEV-1 --- .ci-trigger.txt | 1 + 1 file changed, 1 insertion(+) create mode 100644 .ci-trigger.txt diff --git a/.ci-trigger.txt b/.ci-trigger.txt new file mode 100644 index 000000000..f006a4cde --- /dev/null +++ b/.ci-trigger.txt @@ -0,0 +1 @@ +ci-refire 173455Z -- 2.52.0 From 608fc28d965a7a45f87327ef1ac1aaf7ecb3aed5 Mon Sep 17 00:00:00 2001 From: Molecule AI Dev Lead Date: Sat, 16 May 2026 18:26:36 +0000 Subject: [PATCH 4/4] ci: no-op re-trigger for qa-review re-evaluation [dev-lead] --- .ci-trigger-dev-lead.txt | 1 + 1 file changed, 1 insertion(+) create mode 100644 .ci-trigger-dev-lead.txt diff --git a/.ci-trigger-dev-lead.txt b/.ci-trigger-dev-lead.txt new file mode 100644 index 000000000..9340e4dd5 --- /dev/null +++ b/.ci-trigger-dev-lead.txt @@ -0,0 +1 @@ +2026-05-16T18:26:36Z no-op re-trigger for qa-review -- 2.52.0