diff --git a/.ci-trigger-dev-lead.txt b/.ci-trigger-dev-lead.txt new file mode 100644 index 000000000..9340e4dd5 --- /dev/null +++ b/.ci-trigger-dev-lead.txt @@ -0,0 +1 @@ +2026-05-16T18:26:36Z no-op re-trigger for qa-review diff --git a/.ci-trigger.txt b/.ci-trigger.txt new file mode 100644 index 000000000..f006a4cde --- /dev/null +++ b/.ci-trigger.txt @@ -0,0 +1 @@ +ci-refire 173455Z diff --git a/workspace-server/internal/handlers/a2a_ingest_persist_test.go b/workspace-server/internal/handlers/a2a_ingest_persist_test.go new file mode 100644 index 000000000..8c82255df --- /dev/null +++ b/workspace-server/internal/handlers/a2a_ingest_persist_test.go @@ -0,0 +1,327 @@ +package handlers + +// Regression coverage for the canvas user-message data-loss fix. +// +// Bug (reported 2026-05-16): "in canvas i sometimes lose my own message +// when i exit chat". Root cause: for a push-mode workspace the user's +// message became durable ONLY inside logA2ASuccess / logA2AFailure, +// which run AFTER the full agent A2A round-trip. The single chat-history +// source is activity_logs rows (activity_type='a2a_receive', source_id +// IS NULL). A user who typed a message then exited the chat (fetch +// aborted on unmount / tab close / dropped connection) before the agent +// replied left NO a2a_receive row written → message gone on reopen. +// +// Fix: persistUserMessageAtIngest writes the user message into +// activity_logs SYNCHRONOUSLY, BEFORE dispatch, on a context.WithoutCancel +// derived context; logA2ASuccess finalizes that row with the reply; +// logA2AFailure leaves it 'pending' (message preserved) instead of +// INSERTing a duplicate / overwriting with an agent-error bubble. + +import ( + "bytes" + "context" + "errors" + "fmt" + "net/http" + "net/http/httptest" + "sync/atomic" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/gin-gonic/gin" +) + +// persistUserMessageAtIngest must INSERT a pending a2a_receive row +// carrying the user's request_body and RETURN its id — and it must do so +// even when the passed-in ctx is ALREADY cancelled, because the whole +// point of the fix is that a client disconnect on chat-exit cannot lose +// the message. The persist runs on context.WithoutCancel internally. +func TestPersistUserMessageAtIngest_WritesPendingRowEvenWhenCtxCancelled(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + // Both the workspace-name lookup AND the INSERT run on + // context.WithoutCancel inside the helper, so they reach the DB even + // though the caller's ctx is already cancelled — that is the fix. + mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`). + WithArgs("ws-ingest"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Ingest WS")) + // The defining assertion: an INSERT ... RETURNING id is issued for + // the user message, with status 'pending' and the request body. + mock.ExpectQuery(`INSERT INTO activity_logs`). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("row-abc-123")) + + // Cancel the context BEFORE calling — simulates the canvas aborting + // the inbound request because the user exited the chat. + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"role":"user","parts":[{"kind":"text","text":"do not lose me"}]}}}`) + rowID := handler.persistUserMessageAtIngest(ctx, "ws-ingest", "", body, "message/send") + + if rowID != "row-abc-123" { + t.Fatalf("expected ingest row id 'row-abc-123' (message persisted despite cancelled ctx), got %q", rowID) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// On INSERT failure the helper must return "" (so the caller falls back +// to the legacy post-round-trip INSERT) and must NOT panic — a DB hiccup +// at ingest can never make the user's send fail. +func TestPersistUserMessageAtIngest_FallsBackOnInsertError(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`). + WithArgs("ws-x"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("")) + mock.ExpectQuery(`INSERT INTO activity_logs .* RETURNING id`). + WillReturnError(fmt.Errorf("test: activity_logs unavailable")) + + rowID := handler.persistUserMessageAtIngest(context.Background(), "ws-x", "", []byte(`{}`), "message/send") + if rowID != "" { + t.Fatalf("expected empty rowID on INSERT error (legacy-INSERT fallback), got %q", rowID) + } +} + +// finalizeIngestRow must UPDATE the pre-persisted row (matching on +// id + status='pending') with the agent response, and report handled. +func TestFinalizeIngestRow_UpdatesPendingRow(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + + mock.ExpectExec(`UPDATE activity_logs\s+SET response_body`). + WithArgs("row-1", sqlmock.AnyArg(), sqlmock.AnyArg(), "ok", 1234, nil). + WillReturnResult(sqlmock.NewResult(0, 1)) + + ok := finalizeIngestRow(context.Background(), "row-1", + []byte(`{"result":"hello back"}`), nil, "ok", 1234, nil) + if !ok { + t.Fatal("expected finalizeIngestRow to report the row finalized") + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// finalizeIngestRow with an empty id is a no-op returning false — the +// caller must then take the legacy INSERT path (poll-mode / system +// callers / ingest-INSERT-failed). It must issue NO SQL. +func TestFinalizeIngestRow_EmptyIDIsNoOp(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + if finalizeIngestRow(context.Background(), "", []byte(`{}`), nil, "ok", 1, nil) { + t.Fatal("empty ingest id must return false (legacy INSERT fallback)") + } +} + +// When the row is gone / no longer pending (RowsAffected==0), the helper +// returns true so the caller does NOT double-INSERT a duplicate user +// bubble — the message + a reply already landed on that row. +func TestFinalizeIngestRow_AlreadyFinalizedSkipsDoubleInsert(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + + mock.ExpectExec(`UPDATE activity_logs`). + WillReturnResult(sqlmock.NewResult(0, 0)) // 0 rows: not pending + + if !finalizeIngestRow(context.Background(), "row-stale", []byte(`{}`), nil, "ok", 1, nil) { + t.Fatal("a not-pending row must report handled=true to suppress the legacy duplicate INSERT") + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// THE CORE REGRESSION: logA2AFailure with an ingest row present must +// NOT INSERT a (duplicate) activity_logs row and must NOT mutate the +// preserved 'pending' row to an error bubble. The user's message stays +// durable as a pending row — visible on chat reopen — which is exactly +// the data-loss the fix prevents. (Pre-fix this path INSERTed an +// error-status row AFTER the round-trip; if it never ran, the message +// was lost. Post-fix the message is already safe at ingest.) +func TestLogA2AFailure_WithIngestRow_PreservesMessageNoInsertNoMutation(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + waitForHandlerAsyncBeforeDBCleanup(t, handler) + + // Intentionally set NO expectations: any INSERT or UPDATE here is a + // bug. sqlmock fails the test on an unexpected statement. + handler.logA2AFailure(context.Background(), "ws-fail", "", + []byte(`{"params":{"message":{"parts":[{"text":"keep me"}]}}}`), + "message/send", errors.New("agent unreachable"), 5, "row-pending-keep") + time.Sleep(60 * time.Millisecond) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("logA2AFailure with an ingest row must issue no SQL (message already preserved): %v", err) + } +} + +// logA2ASuccess WITH an ingest row finalizes that row (UPDATE) instead +// of INSERTing a second a2a_receive row — preventing the duplicate user +// bubble that postgres_store.go's one-row-(user,agent) reconstruction +// would otherwise produce. +func TestLogA2ASuccess_WithIngestRow_FinalizesInsteadOfInsert(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + waitForHandlerAsyncBeforeDBCleanup(t, handler) + + // Workspace-name lookup still happens (synchronous). + mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`). + WithArgs("ws-ok"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("OK WS")) + // The finalize UPDATE — and crucially NO "INSERT INTO activity_logs". + mock.ExpectExec(`UPDATE activity_logs\s+SET response_body`). + WillReturnResult(sqlmock.NewResult(0, 1)) + + handler.logA2ASuccess(context.Background(), "ws-ok", "", + []byte(`{}`), []byte(`{"result":"reply"}`), "message/send", 200, 10, "row-finalize-me") + time.Sleep(60 * time.Millisecond) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations (expected finalize UPDATE, no INSERT): %v", err) + } +} + +// Guard the literal contract the read side depends on: the ingest INSERT +// must target activity_type='a2a_receive' with status 'pending' and a +// RETURNING id — that's what makes postgres_store.go render the user +// bubble immediately and what lets the finalize UPDATE find the row. +// sqlmock matches the issued query against this regex, so a green +// expectation IS the proof the SQL carries every required fragment. +func TestPersistUserMessageAtIngest_InsertShapeContract(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`). + WithArgs("ws-shape"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Shape")) + + // All four contract fragments in one regex — only matches if the + // real INSERT contains a2a_receive + 'pending' + request_body + + // RETURNING id (sqlmock does a regexp.MatchString of this against + // the actual query text). + mock.ExpectQuery(`(?s)INSERT INTO activity_logs.*request_body.*'a2a_receive'.*'pending'.*RETURNING id`). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("rid")) + + rid := handler.persistUserMessageAtIngest(context.Background(), "ws-shape", "", + []byte(`{"params":{"message":{"parts":[{"text":"x"}]}}}`), "message/send") + if rid != "rid" { + t.Fatalf("expected rid from contract-shaped INSERT, got %q", rid) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("ingest INSERT did not match the required shape contract: %v", err) + } +} + +// ───────────────────────────────────────────────────────────────────────────── +// Integration e2e through the REAL ProxyA2A HTTP handler — the literal bug +// scenario: a canvas user sends a message, the agent is slow, and the client +// disconnects (exits the chat) BEFORE the agent replies. Pre-fix: no +// a2a_receive row was ever written → message lost on reopen. Post-fix: the +// ingest INSERT fires synchronously before dispatch, on context.WithoutCancel, +// so the user message is durable regardless of the client disconnect. +// ───────────────────────────────────────────────────────────────────────────── + +func TestProxyA2A_CanvasClientDisconnectMidFlight_UserMessageStillPersistedAtIngest(t *testing.T) { + mock := setupTestDB(t) + mr := setupTestRedis(t) + allowLoopbackForTest(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + waitForHandlerAsyncBeforeDBCleanup(t, handler) + + // The mock agent hangs until the test releases it — modelling an agent + // still synthesising when the user exits the chat. The proxy forwards + // on context.WithoutCancel so this dispatch is in-flight while the + // inbound client request is already cancelled. + release := make(chan struct{}) + var agentHit int32 + agent := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&agentHit, 1) + <-release // hang until released (or test ends) + w.Header().Set("Content-Type", "application/json") + fmt.Fprint(w, `{"jsonrpc":"2.0","id":"1","result":{"status":"ok"}}`) + })) + defer agent.Close() + defer close(release) + + const ws = "ws-disconnect" + mr.Set(fmt.Sprintf("ws:%s:url", ws), agent.URL) + + // SQL sequence for a canvas push-mode message/send up to dispatch: + // 1. budget check + // 2. resolveAgentURL DB fallback miss is avoided (Redis has the URL) + // 3. lookupDeliveryMode → NULL ⇒ push-mode (the loss case) + // 4. ingest: SELECT name, then INSERT ... RETURNING id ← THE FIX + // The agent never replies (hung) and the client ctx is cancelled, so + // NO logA2ASuccess/finalize runs — exactly the pre-fix loss window. + // The assertion: the ingest INSERT fired anyway (message durable). + expectBudgetCheck(mock, ws) + mock.ExpectQuery(`SELECT delivery_mode FROM workspaces WHERE id =`). + WithArgs(ws). + WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow(nil)) // NULL ⇒ push + mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`). + WithArgs(ws). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Disconnect WS")) + ingestFired := make(chan struct{}, 1) + mock.ExpectQuery(`INSERT INTO activity_logs.*RETURNING id`). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ingest-row-1")). + // signal the moment the durable write lands + WillDelayFor(0) + _ = ingestFired + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: ws}} + + // Cancel the inbound request context shortly after the call starts — + // this is the user exiting the chat / closing the tab mid-flight. + reqCtx, cancelClient := context.WithCancel(context.Background()) + body := `{"method":"message/send","params":{"message":{"role":"user","parts":[{"text":"DO NOT LOSE THIS MESSAGE"}]}}}` + c.Request = httptest.NewRequest("POST", "/workspaces/"+ws+"/a2a", bytes.NewBufferString(body)).WithContext(reqCtx) + c.Request.Header.Set("Content-Type", "application/json") + + done := make(chan struct{}) + go func() { + handler.ProxyA2A(c) + close(done) + }() + + // Wait until the agent has been hit (dispatch in-flight), then the + // user "exits the chat": cancel the client request context. + deadline := time.After(5 * time.Second) + for atomic.LoadInt32(&agentHit) == 0 { + select { + case <-deadline: + t.Fatal("agent was never dispatched to within 5s") + default: + time.Sleep(10 * time.Millisecond) + } + } + cancelClient() // ← user exits the chat mid-flight + + // Give the handler a moment; it will be stuck on the hung agent, but + // the ingest INSERT must already have happened SYNCHRONOUSLY before + // dispatch. Verify the durable write landed despite the disconnect. + time.Sleep(150 * time.Millisecond) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("ingest persist did not fire before dispatch — user message would be LOST on disconnect: %v", err) + } + + // Clean up: release the hung agent so the handler goroutine can exit. + // (release is closed by the deferred close; unblock now.) + select { + case release <- struct{}{}: + default: + } + <-done +} diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index 8fbef20c6..4379f2ab2 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -467,6 +467,29 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri } } + // DATA-LOSS FIX: persist the canvas user's message into activity_logs + // SYNCHRONOUSLY, BEFORE dispatch. For a push-mode workspace the only + // pre-fix durable write of the user message was logA2ASuccess / + // logA2AFailure AFTER the agent round-trip — so a user who typed a + // message then exited the chat (fetch aborted on unmount / tab close / + // dropped connection) before the agent replied lost the message + // entirely on reopen, because chat-history reads only a2a_receive rows + // and none was ever written. Persisting here, with a WithoutCancel + // context inside persistUserMessageAtIngest, makes the message durable + // at submit time regardless of what the client does next. The returned + // row id is threaded into the success/failure log so they UPDATE this + // row with the agent reply rather than INSERT a duplicate. + // + // Scoped to: canvas-initiated (callerID == ""), message-bearing + // (message/send), logActivity on, and past the poll-mode short-circuit + // above (poll-mode already persists at ingest via logA2AReceiveQueued). + // Best-effort: a "" return falls back to the legacy post-round-trip + // INSERT, so this can never make a send fail. + var ingestRowID string + if logActivity && callerID == "" && a2aMethod == "message/send" { + ingestRowID = h.persistUserMessageAtIngest(ctx, workspaceID, callerID, body, a2aMethod) + } + startTime := time.Now() resp, cancelFwd, err := h.dispatchA2A(ctx, workspaceID, agentURL, body, callerID) if cancelFwd != nil { @@ -474,7 +497,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri } durationMs := int(time.Since(startTime).Milliseconds()) if err != nil { - return h.handleA2ADispatchError(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs, logActivity) + return h.handleA2ADispatchError(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs, logActivity, ingestRowID) } defer func() { _ = resp.Body.Close() }() @@ -492,7 +515,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri log.Printf("ProxyA2A: body read failed for %s (status=%d delivery_confirmed=%v bytes_read=%d): %v", workspaceID, resp.StatusCode, deliveryConfirmed, len(respBody), readErr) if logActivity && deliveryConfirmed { - h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs) + h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs, ingestRowID) } // Preserve the actual HTTP status code and any body bytes already read. // Previously this returned (0, nil, error) which discarded both. @@ -536,7 +559,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri } if logActivity { - h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs) + h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs, ingestRowID) } // Track LLM token usage for cost transparency (#593). diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index 8145a66a1..d8dfdc5ed 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -27,10 +27,138 @@ type proxyDispatchBuildError struct{ err error } func (e *proxyDispatchBuildError) Error() string { return e.err.Error() } +// persistUserMessageAtIngest writes the canvas user's message into +// activity_logs SYNCHRONOUSLY, BEFORE the agent is dispatched, and +// returns the new row's id (empty string if the persist could not be +// done — caller falls back to the legacy post-round-trip INSERT). +// +// # Why this exists (the data-loss window it closes) +// +// Pre-fix, for a push-mode (HTTP-dispatched) workspace the user's +// message became durable ONLY inside logA2ASuccess / logA2AFailure, +// which run AFTER the full agent round-trip completes. The single +// chat-history source is `activity_logs` rows with +// activity_type='a2a_receive' AND source_id IS NULL +// (messagestore/postgres_store.go). So if the user typed a message and +// exited the chat (component unmount aborts the fetch / tab close / +// dropped connection) before the agent finished, NO a2a_receive row was +// ever written and the message was permanently lost on chat reopen. +// This is the inbound (user→agent) mirror of the reno-stars 2026-05-05 +// outbound data-loss incident (RFC #2945) — which only fixed the +// agent→user side (AgentMessageWriter). +// +// poll-mode workspaces were never affected: logA2AReceiveQueued already +// persists the request_body at ingest. This helper brings push-mode to +// parity: the user message is durable at submit time, independent of +// and prior to agent processing. +// +// # Contract +// +// - INSERT a row with request_body set, response_body NULL, +// status='pending'. The chat-history read path +// (activityRowToChatMessages) already renders a user bubble from a +// row whose response_body is empty, so a pending row shows the +// user's message immediately on reopen even before the agent replies. +// - Use context.WithoutCancel: a client disconnect on chat-exit MUST +// NOT abort this write — that is the entire point of the fix. +// - SYNCHRONOUS (blocking): the row must exist before dispatchA2A is +// allowed to start, otherwise the disconnect race reopens. +// - On any failure return "" and log; the caller proceeds with the +// legacy logA2ASuccess/logA2AFailure INSERT so behavior is never +// worse than pre-fix (best-effort, never blocks the user's send). +// +// The returned id is threaded into logA2ASuccess/logA2AFailure so they +// UPDATE this same row with the agent response instead of INSERTing a +// second row — preserving the one-row-carries-(user,agent) read +// contract that postgres_store.go depends on (no duplicate user bubble). +func (h *WorkspaceHandler) persistUserMessageAtIngest(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string) string { + // context.WithoutCancel: the canvas exiting the chat aborts the + // inbound HTTP request, cancelling ctx. The whole reason this fix + // exists is so that cancellation does NOT lose the user's message — + // so the entire persist (name lookup + INSERT) must run on a + // non-cancellable derived context with its own short deadline. Doing + // the name lookup on the raw ctx would skip it on a cancelled client + // and degrade the summary; WithoutCancel keeps the summary accurate + // too. + insCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 10*time.Second) + defer cancel() + + var wsName string + db.DB.QueryRowContext(insCtx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName) + if wsName == "" { + wsName = workspaceID + } + summary := a2aMethod + " → " + wsName + " (received)" + + var rowID string + err := db.DB.QueryRowContext(insCtx, ` + INSERT INTO activity_logs (workspace_id, activity_type, source_id, target_id, method, summary, request_body, status) + VALUES ($1, 'a2a_receive', $2, $3, $4, $5, $6::jsonb, 'pending') + RETURNING id + `, workspaceID, nilIfEmpty(callerID), &workspaceID, &a2aMethod, &summary, string(body)).Scan(&rowID) + if err != nil { + // Best-effort: never block or fail the user's send because the + // ingest persist hiccuped. Caller falls back to the legacy + // post-round-trip INSERT (behavior == pre-fix for this request). + log.Printf("persistUserMessageAtIngest: INSERT failed for %s: %v — falling back to post-round-trip persist", workspaceID, err) + return "" + } + return rowID +} + +// finalizeIngestRow UPDATEs the pre-persisted ingest row (created by +// persistUserMessageAtIngest) with the agent's response once the A2A +// round-trip completes. Returns true if the row was found+updated; false +// means the caller must fall through to the legacy INSERT (so a missing +// ingest row never silently drops the agent reply). +// +// Runs on a WithoutCancel context for the same reason as the ingest +// INSERT: the client may already be gone, but the agent reply still +// needs to land on the row that already holds the user's message. +func finalizeIngestRow(ctx context.Context, ingestRowID string, respBody json.RawMessage, toolTrace json.RawMessage, status string, durationMs int, errDetail *string) bool { + if ingestRowID == "" { + return false + } + upCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second) + defer cancel() + + var respStr, traceStr *string + if len(respBody) > 0 { + s := string(respBody) + respStr = &s + } + if len(toolTrace) > 0 { + s := string(toolTrace) + traceStr = &s + } + res, err := db.DB.ExecContext(upCtx, ` + UPDATE activity_logs + SET response_body = $2::jsonb, + tool_trace = $3::jsonb, + status = $4, + duration_ms = $5, + error_detail = $6 + WHERE id = $1 AND status = 'pending' + `, ingestRowID, respStr, traceStr, status, durationMs, errDetail) + if err != nil { + log.Printf("finalizeIngestRow: UPDATE failed for row %s: %v", ingestRowID, err) + return false + } + n, _ := res.RowsAffected() + if n == 0 { + // Row not found or no longer pending (e.g. a concurrent finalize + // already ran). Don't double-INSERT — the user message + a + // response already landed on this row. Treat as handled. + log.Printf("finalizeIngestRow: row %s not pending (already finalized?) — skipping legacy INSERT", ingestRowID) + return true + } + return true +} + // handleA2ADispatchError translates a forward-call failure into a proxyA2AError, // runs the reactive container-health check, and (when `logActivity` is true) // schedules a detached LogActivity goroutine for the failed attempt. -func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int, logActivity bool) (int, []byte, *proxyA2AError) { +func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int, logActivity bool, ingestRowID string) (int, []byte, *proxyA2AError) { // Build-time failure (couldn't even create the http.Request) — return // a 500 without the reactive-health / busy-retry paths. if buildErr, ok := err.(*proxyDispatchBuildError); ok { @@ -46,7 +174,7 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace containerDead := h.maybeMarkContainerDead(ctx, workspaceID) if logActivity { - h.logA2AFailure(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs) + h.logA2AFailure(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs, ingestRowID) } if containerDead { return 0, nil, &proxyA2AError{ @@ -254,8 +382,31 @@ func (h *WorkspaceHandler) preflightContainerHealth(ctx context.Context, workspa // logA2AFailure records a failed A2A attempt to activity_logs in a detached // goroutine (the request context may already be done by the time it runs). -func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int) { +func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int, ingestRowID string) { errMsg := err.Error() + // DATA-LOSS FIX interaction: when the user message was persisted at + // ingest (ingestRowID set), the pending a2a_receive row already holds + // it durably and chat-history renders it as the user's bubble on + // reopen — which is the entire point of the fix. Do NOT mutate that + // row here: + // - This path runs BEFORE the busy/enqueue branch in + // handleA2ADispatchError. A "busy" error is followed by a + // successful enqueue (HTTP 202) and a later drain that answers + // the user — finalizing the row as 'error' now would wrongly + // show the user a failed message that is actually queued. + // - For a genuinely terminal failure the user's message must STILL + // survive (that is the bug we are fixing); leaving the row + // 'pending' preserves it. The failure itself is observable via + // this log line + metrics, not by overwriting the preserved + // user message with an agent-error bubble. + // So: if an ingest row exists, the message is already safe — return + // without the legacy failure INSERT (which would also duplicate the + // user bubble). + if ingestRowID != "" { + log.Printf("logA2AFailure: %s message preserved at ingest (row %s, status=pending) — A2A attempt failed: %v", + workspaceID, ingestRowID, errMsg) + return + } var errWsName string db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&errWsName) if errWsName == "" { @@ -283,7 +434,7 @@ func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, calle // logA2ASuccess records a successful A2A round-trip and (for canvas-initiated // 2xx/3xx responses) broadcasts an A2A_RESPONSE event so the frontend can // receive the reply without polling. -func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, callerID string, body, respBody []byte, a2aMethod string, statusCode, durationMs int) { +func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, callerID string, body, respBody []byte, a2aMethod string, statusCode, durationMs int, ingestRowID string) { logStatus := "ok" if statusCode >= 400 { logStatus = "error" @@ -309,23 +460,39 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle } summary := a2aMethod + " → " + wsNameForLog toolTrace := extractToolTrace(respBody) - h.goAsync(func() { - logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second) - defer cancel() - LogActivity(logCtx, h.broadcaster, ActivityParams{ - WorkspaceID: workspaceID, - ActivityType: "a2a_receive", - SourceID: nilIfEmpty(callerID), - TargetID: &workspaceID, - Method: &a2aMethod, - Summary: &summary, - RequestBody: json.RawMessage(body), - ResponseBody: json.RawMessage(respBody), - ToolTrace: toolTrace, - DurationMs: &durationMs, - Status: logStatus, + + // If the user message was persisted at ingest (canvas push-mode + // path), finalize THAT row with the agent response instead of + // INSERTing a second a2a_receive row — postgres_store.go reconstructs + // (user, agent) bubbles from a single row, so a second row would + // duplicate the user's message in chat-history. finalizeIngestRow + // runs synchronously (it's already on a WithoutCancel context); the + // broadcast still fires below regardless. Falls through to the legacy + // async INSERT when there is no ingest row (poll-mode, system + // callers, or an ingest-INSERT that failed best-effort). + finalized := false + if ingestRowID != "" { + finalized = finalizeIngestRow(ctx, ingestRowID, json.RawMessage(respBody), toolTrace, logStatus, durationMs, nil) + } + if !finalized { + h.goAsync(func() { + logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second) + defer cancel() + LogActivity(logCtx, h.broadcaster, ActivityParams{ + WorkspaceID: workspaceID, + ActivityType: "a2a_receive", + SourceID: nilIfEmpty(callerID), + TargetID: &workspaceID, + Method: &a2aMethod, + Summary: &summary, + RequestBody: json.RawMessage(body), + ResponseBody: json.RawMessage(respBody), + ToolTrace: toolTrace, + DurationMs: &durationMs, + Status: logStatus, + }) }) - }) + } if callerID == "" && statusCode < 400 { h.broadcaster.BroadcastOnly(workspaceID, string(events.EventA2AResponse), map[string]interface{}{ diff --git a/workspace-server/internal/handlers/a2a_proxy_test.go b/workspace-server/internal/handlers/a2a_proxy_test.go index 3cf954624..4941eb816 100644 --- a/workspace-server/internal/handlers/a2a_proxy_test.go +++ b/workspace-server/internal/handlers/a2a_proxy_test.go @@ -1736,7 +1736,7 @@ func TestHandleA2ADispatchError_ContextDeadline(t *testing.T) { _, _, perr := handler.handleA2ADispatchError( context.Background(), "ws-dl", "", []byte("{}"), "message/send", - context.DeadlineExceeded, 1, false, + context.DeadlineExceeded, 1, false, "", ) if perr == nil { t.Fatal("expected error, got nil") @@ -1757,7 +1757,7 @@ func TestHandleA2ADispatchError_BuildError(t *testing.T) { buildErr := &proxyDispatchBuildError{err: fmt.Errorf("bad url")} _, _, perr := handler.handleA2ADispatchError( - context.Background(), "ws-x", "", []byte("{}"), "message/send", buildErr, 1, false, + context.Background(), "ws-x", "", []byte("{}"), "message/send", buildErr, 1, false, "", ) if perr == nil || perr.Status != http.StatusInternalServerError { t.Errorf("expected 500, got %+v", perr) @@ -1771,7 +1771,7 @@ func TestHandleA2ADispatchError_GenericReturns502(t *testing.T) { _, _, perr := handler.handleA2ADispatchError( context.Background(), "ws-x", "", []byte("{}"), "message/send", - fmt.Errorf("no such host"), 1, false, + fmt.Errorf("no such host"), 1, false, "", ) if perr == nil || perr.Status != http.StatusBadGateway { t.Errorf("expected 502, got %+v", perr) @@ -1969,7 +1969,7 @@ func TestLogA2AFailure_Smoke(t *testing.T) { mock.ExpectExec("INSERT INTO activity_logs"). WillReturnResult(sqlmock.NewResult(0, 1)) - handler.logA2AFailure(context.Background(), "ws-fail", "", []byte(`{}`), "message/send", fmt.Errorf("boom"), 42) + handler.logA2AFailure(context.Background(), "ws-fail", "", []byte(`{}`), "message/send", fmt.Errorf("boom"), 42, "") time.Sleep(80 * time.Millisecond) } @@ -1986,7 +1986,7 @@ func TestLogA2AFailure_EmptyNameFallback(t *testing.T) { mock.ExpectExec("INSERT INTO activity_logs"). WillReturnResult(sqlmock.NewResult(0, 1)) - handler.logA2AFailure(context.Background(), "ws-noname", "", []byte(`{}`), "message/send", fmt.Errorf("boom"), 1) + handler.logA2AFailure(context.Background(), "ws-noname", "", []byte(`{}`), "message/send", fmt.Errorf("boom"), 1, "") time.Sleep(80 * time.Millisecond) } @@ -2002,7 +2002,7 @@ func TestLogA2ASuccess_Smoke(t *testing.T) { mock.ExpectExec("INSERT INTO activity_logs"). WillReturnResult(sqlmock.NewResult(0, 1)) - handler.logA2ASuccess(context.Background(), "ws-ok", "", []byte(`{}`), []byte(`{"result":"x"}`), "message/send", 200, 10) + handler.logA2ASuccess(context.Background(), "ws-ok", "", []byte(`{}`), []byte(`{"result":"x"}`), "message/send", 200, 10, "") time.Sleep(80 * time.Millisecond) } @@ -2020,7 +2020,7 @@ func TestLogA2ASuccess_ErrorStatus(t *testing.T) { WillReturnResult(sqlmock.NewResult(0, 1)) // callerID != "" also means no A2A_RESPONSE broadcast. - handler.logA2ASuccess(context.Background(), "ws-err", "ws-caller", []byte(`{}`), []byte(`{}`), "message/send", 500, 10) + handler.logA2ASuccess(context.Background(), "ws-err", "ws-caller", []byte(`{}`), []byte(`{}`), "message/send", 500, 10, "") time.Sleep(80 * time.Millisecond) } diff --git a/workspace-server/internal/handlers/mock_runtime.go b/workspace-server/internal/handlers/mock_runtime.go index 9d4493d2c..bc3fac7af 100644 --- a/workspace-server/internal/handlers/mock_runtime.go +++ b/workspace-server/internal/handlers/mock_runtime.go @@ -204,7 +204,11 @@ func (h *WorkspaceHandler) handleMockA2A(ctx context.Context, workspaceID, calle // is identical to a real agent reply. Status 200 + duration 0 // is the "synthesised reply" marker; activity_logs.duration_ms // being 0 is harmless (real fast paths can hit 0 too). - h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, http.StatusOK, 0) + // ingestRowID == "": the mock short-circuit runs before the + // real-dispatch ingest-persist (it has no resolveAgentURL / no + // dispatchA2A), so logA2ASuccess takes the legacy INSERT path — + // identical to pre-fix behavior for mock workspaces. + h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, http.StatusOK, 0, "") } return http.StatusOK, respBody, true } diff --git a/workspace-server/internal/handlers/native_session_test.go b/workspace-server/internal/handlers/native_session_test.go index 2f70d882a..60d3382c6 100644 --- a/workspace-server/internal/handlers/native_session_test.go +++ b/workspace-server/internal/handlers/native_session_test.go @@ -31,7 +31,7 @@ func TestHandleA2ADispatchError_NativeSession_SkipsEnqueue(t *testing.T) { // sqlmock's ExpectationsWereMet implicitly enforces that on teardown. _, _, perr := handler.handleA2ADispatchError( context.Background(), "ws-native", "", []byte("{}"), "message/send", - context.DeadlineExceeded, 1, false, + context.DeadlineExceeded, 1, false, "", ) if perr == nil { t.Fatal("expected proxy error, got nil") @@ -74,7 +74,7 @@ func TestHandleA2ADispatchError_NoNativeSession_StillEnqueues(t *testing.T) { _, _, perr := handler.handleA2ADispatchError( context.Background(), "ws-platform-queue", "", []byte("{}"), "message/send", - context.DeadlineExceeded, 1, false, + context.DeadlineExceeded, 1, false, "", ) if perr == nil { t.Fatal("expected proxy error, got nil")