fix(workspace-server): persist canvas user message at ingest (internal#470) #1347

Open
core-be wants to merge 4 commits from fix/canvas-user-message-persist-at-ingest into main
8 changed files with 556 additions and 33 deletions

1
.ci-trigger-dev-lead.txt Normal file
View File

@@ -0,0 +1 @@
2026-05-16T18:26:36Z no-op re-trigger for qa-review

1
.ci-trigger.txt Normal file
View File

@@ -0,0 +1 @@
ci-refire 173455Z

View File

@@ -0,0 +1,327 @@
package handlers
// Regression coverage for the canvas user-message data-loss fix.
//
// Bug (reported 2026-05-16): "in canvas i sometimes lose my own message
// when i exit chat". Root cause: for a push-mode workspace the user's
// message became durable ONLY inside logA2ASuccess / logA2AFailure,
// which run AFTER the full agent A2A round-trip. The single chat-history
// source is activity_logs rows (activity_type='a2a_receive', source_id
// IS NULL). A user who typed a message then exited the chat (fetch
// aborted on unmount / tab close / dropped connection) before the agent
// replied left NO a2a_receive row written → message gone on reopen.
//
// Fix: persistUserMessageAtIngest writes the user message into
// activity_logs SYNCHRONOUSLY, BEFORE dispatch, on a context.WithoutCancel
// derived context; logA2ASuccess finalizes that row with the reply;
// logA2AFailure leaves it 'pending' (message preserved) instead of
// INSERTing a duplicate / overwriting with an agent-error bubble.
import (
"bytes"
"context"
"errors"
"fmt"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
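// The harness helpers used throughout this file (setupTestDB,
// setupTestRedis, newTestBroadcaster, expectBudgetCheck,
// allowLoopbackForTest, waitForHandlerAsyncBeforeDBCleanup) live in the
// package's shared test setup, not in this file. For orientation, a
// minimal sketch of the assumed setupTestDB shape (hypothetical; the
// real helper may differ):
//
//	func setupTestDB(t *testing.T) sqlmock.Sqlmock {
//		sqlDB, mock, err := sqlmock.New() // default matcher: regexp
//		if err != nil {
//			t.Fatalf("sqlmock.New: %v", err)
//		}
//		prev := db.DB
//		db.DB = sqlDB // the handlers read the package-level db.DB
//		t.Cleanup(func() { db.DB = prev; _ = sqlDB.Close() })
//		return mock
//	}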
// persistUserMessageAtIngest must INSERT a pending a2a_receive row
// carrying the user's request_body and RETURN its id — and it must do so
// even when the passed-in ctx is ALREADY cancelled, because the whole
// point of the fix is that a client disconnect on chat-exit cannot lose
// the message. The persist runs on context.WithoutCancel internally.
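// (For reference, context.WithoutCancel semantics, Go 1.21+: the
// derived context never inherits the parent's cancellation, e.g.
//
//	parent, cancel := context.WithCancel(context.Background())
//	detached := context.WithoutCancel(parent)
//	cancel()
//	_ = parent.Err()   // context.Canceled
//	_ = detached.Err() // nil; DB calls on detached still proceed
//
// which is what lets the INSERT below outlive a client disconnect.)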
func TestPersistUserMessageAtIngest_WritesPendingRowEvenWhenCtxCancelled(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
// Both the workspace-name lookup AND the INSERT run on
// context.WithoutCancel inside the helper, so they reach the DB even
// though the caller's ctx is already cancelled — that is the fix.
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
WithArgs("ws-ingest").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Ingest WS"))
// The defining assertion: an INSERT ... RETURNING id is issued for
// the user message, with status 'pending' and the request body.
mock.ExpectQuery(`INSERT INTO activity_logs`).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("row-abc-123"))
// Cancel the context BEFORE calling — simulates the canvas aborting
// the inbound request because the user exited the chat.
ctx, cancel := context.WithCancel(context.Background())
cancel()
body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"role":"user","parts":[{"kind":"text","text":"do not lose me"}]}}}`)
rowID := handler.persistUserMessageAtIngest(ctx, "ws-ingest", "", body, "message/send")
if rowID != "row-abc-123" {
t.Fatalf("expected ingest row id 'row-abc-123' (message persisted despite cancelled ctx), got %q", rowID)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// On INSERT failure the helper must return "" (so the caller falls back
// to the legacy post-round-trip INSERT) and must NOT panic — a DB hiccup
// at ingest can never make the user's send fail.
func TestPersistUserMessageAtIngest_FallsBackOnInsertError(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
WithArgs("ws-x").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow(""))
mock.ExpectQuery(`INSERT INTO activity_logs .* RETURNING id`).
WillReturnError(fmt.Errorf("test: activity_logs unavailable"))
rowID := handler.persistUserMessageAtIngest(context.Background(), "ws-x", "", []byte(`{}`), "message/send")
if rowID != "" {
t.Fatalf("expected empty rowID on INSERT error (legacy-INSERT fallback), got %q", rowID)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet sqlmock expectations: %v", err)
	}
}
// finalizeIngestRow must UPDATE the pre-persisted row (matching on
// id + status='pending') with the agent response, and report handled.
func TestFinalizeIngestRow_UpdatesPendingRow(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
mock.ExpectExec(`UPDATE activity_logs\s+SET response_body`).
WithArgs("row-1", sqlmock.AnyArg(), sqlmock.AnyArg(), "ok", 1234, nil).
WillReturnResult(sqlmock.NewResult(0, 1))
ok := finalizeIngestRow(context.Background(), "row-1",
[]byte(`{"result":"hello back"}`), nil, "ok", 1234, nil)
if !ok {
t.Fatal("expected finalizeIngestRow to report the row finalized")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// finalizeIngestRow with an empty id is a no-op returning false — the
// caller must then take the legacy INSERT path (poll-mode / system
// callers / ingest-INSERT-failed). It must issue NO SQL.
func TestFinalizeIngestRow_EmptyIDIsNoOp(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
if finalizeIngestRow(context.Background(), "", []byte(`{}`), nil, "ok", 1, nil) {
t.Fatal("empty ingest id must return false (legacy INSERT fallback)")
}
}
// When the row is gone / no longer pending (RowsAffected==0), the helper
// returns true so the caller does NOT double-INSERT a duplicate user
// bubble — the message + a reply already landed on that row.
func TestFinalizeIngestRow_AlreadyFinalizedSkipsDoubleInsert(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
mock.ExpectExec(`UPDATE activity_logs`).
WillReturnResult(sqlmock.NewResult(0, 0)) // 0 rows: not pending
if !finalizeIngestRow(context.Background(), "row-stale", []byte(`{}`), nil, "ok", 1, nil) {
t.Fatal("a not-pending row must report handled=true to suppress the legacy duplicate INSERT")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// THE CORE REGRESSION: logA2AFailure with an ingest row present must
// NOT INSERT a (duplicate) activity_logs row and must NOT mutate the
// preserved 'pending' row to an error bubble. The user's message stays
// durable as a pending row — visible on chat reopen — which is exactly
// the data-loss the fix prevents. (Pre-fix this path INSERTed an
// error-status row AFTER the round-trip; if it never ran, the message
// was lost. Post-fix the message is already safe at ingest.)
func TestLogA2AFailure_WithIngestRow_PreservesMessageNoInsertNoMutation(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, handler)
// Intentionally set NO expectations: any INSERT or UPDATE here is a
// bug. sqlmock fails the test on an unexpected statement.
handler.logA2AFailure(context.Background(), "ws-fail", "",
[]byte(`{"params":{"message":{"parts":[{"text":"keep me"}]}}}`),
"message/send", errors.New("agent unreachable"), 5, "row-pending-keep")
time.Sleep(60 * time.Millisecond)
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("logA2AFailure with an ingest row must issue no SQL (message already preserved): %v", err)
}
}
// logA2ASuccess WITH an ingest row finalizes that row (UPDATE) instead
// of INSERTing a second a2a_receive row — preventing the duplicate user
// bubble that postgres_store.go's one-row-(user,agent) reconstruction
// would otherwise produce.
func TestLogA2ASuccess_WithIngestRow_FinalizesInsteadOfInsert(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, handler)
// Workspace-name lookup still happens (synchronous).
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
WithArgs("ws-ok").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("OK WS"))
// The finalize UPDATE — and crucially NO "INSERT INTO activity_logs".
mock.ExpectExec(`UPDATE activity_logs\s+SET response_body`).
WillReturnResult(sqlmock.NewResult(0, 1))
handler.logA2ASuccess(context.Background(), "ws-ok", "",
[]byte(`{}`), []byte(`{"result":"reply"}`), "message/send", 200, 10, "row-finalize-me")
time.Sleep(60 * time.Millisecond)
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations (expected finalize UPDATE, no INSERT): %v", err)
}
}
// Guard the literal contract the read side depends on: the ingest INSERT
// must target activity_type='a2a_receive' with status 'pending' and a
// RETURNING id — that's what makes postgres_store.go render the user
// bubble immediately and what lets the finalize UPDATE find the row.
// sqlmock matches the issued query against this regex, so a green
// expectation IS the proof the SQL carries every required fragment.
func TestPersistUserMessageAtIngest_InsertShapeContract(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
WithArgs("ws-shape").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Shape"))
// All four contract fragments in one regex — only matches if the
// real INSERT contains a2a_receive + 'pending' + request_body +
// RETURNING id (sqlmock does a regexp.MatchString of this against
// the actual query text).
mock.ExpectQuery(`(?s)INSERT INTO activity_logs.*request_body.*'a2a_receive'.*'pending'.*RETURNING id`).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("rid"))
rid := handler.persistUserMessageAtIngest(context.Background(), "ws-shape", "",
[]byte(`{"params":{"message":{"parts":[{"text":"x"}]}}}`), "message/send")
if rid != "rid" {
t.Fatalf("expected rid from contract-shaped INSERT, got %q", rid)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("ingest INSERT did not match the required shape contract: %v", err)
}
}
// ─────────────────────────────────────────────────────────────────────────────
// Integration e2e through the REAL ProxyA2A HTTP handler — the literal bug
// scenario: a canvas user sends a message, the agent is slow, and the client
// disconnects (exits the chat) BEFORE the agent replies. Pre-fix: no
// a2a_receive row was ever written → message lost on reopen. Post-fix: the
// ingest INSERT fires synchronously before dispatch, on context.WithoutCancel,
// so the user message is durable regardless of the client disconnect.
// ─────────────────────────────────────────────────────────────────────────────
func TestProxyA2A_CanvasClientDisconnectMidFlight_UserMessageStillPersistedAtIngest(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, handler)
// The mock agent hangs until the test releases it — modelling an agent
// still synthesising when the user exits the chat. The proxy forwards
// on context.WithoutCancel so this dispatch is in-flight while the
// inbound client request is already cancelled.
release := make(chan struct{})
var agentHit int32
agent := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
atomic.AddInt32(&agentHit, 1)
<-release // hang until released (or test ends)
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, `{"jsonrpc":"2.0","id":"1","result":{"status":"ok"}}`)
}))
defer agent.Close()
defer close(release)
const ws = "ws-disconnect"
mr.Set(fmt.Sprintf("ws:%s:url", ws), agent.URL)
// SQL sequence for a canvas push-mode message/send up to dispatch:
// 1. budget check
// 2. resolveAgentURL DB fallback miss is avoided (Redis has the URL)
// 3. lookupDeliveryMode → NULL ⇒ push-mode (the loss case)
// 4. ingest: SELECT name, then INSERT ... RETURNING id ← THE FIX
// The agent never replies (hung) and the client ctx is cancelled, so
// NO logA2ASuccess/finalize runs — exactly the pre-fix loss window.
// The assertion: the ingest INSERT fired anyway (message durable).
expectBudgetCheck(mock, ws)
mock.ExpectQuery(`SELECT delivery_mode FROM workspaces WHERE id =`).
WithArgs(ws).
WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow(nil)) // NULL ⇒ push
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
WithArgs(ws).
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Disconnect WS"))
	mock.ExpectQuery(`INSERT INTO activity_logs.*RETURNING id`).
		WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ingest-row-1"))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: ws}}
// Cancel the inbound request context shortly after the call starts —
// this is the user exiting the chat / closing the tab mid-flight.
reqCtx, cancelClient := context.WithCancel(context.Background())
body := `{"method":"message/send","params":{"message":{"role":"user","parts":[{"text":"DO NOT LOSE THIS MESSAGE"}]}}}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+ws+"/a2a", bytes.NewBufferString(body)).WithContext(reqCtx)
c.Request.Header.Set("Content-Type", "application/json")
done := make(chan struct{})
go func() {
handler.ProxyA2A(c)
close(done)
}()
// Wait until the agent has been hit (dispatch in-flight), then the
// user "exits the chat": cancel the client request context.
deadline := time.After(5 * time.Second)
for atomic.LoadInt32(&agentHit) == 0 {
select {
case <-deadline:
t.Fatal("agent was never dispatched to within 5s")
default:
time.Sleep(10 * time.Millisecond)
}
}
cancelClient() // ← user exits the chat mid-flight
// Give the handler a moment; it will be stuck on the hung agent, but
// the ingest INSERT must already have happened SYNCHRONOUSLY before
// dispatch. Verify the durable write landed despite the disconnect.
time.Sleep(150 * time.Millisecond)
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("ingest persist did not fire before dispatch — user message would be LOST on disconnect: %v", err)
}
	// Clean up: unblock the hung agent (parked on <-release since the
	// agentHit wait above) so the handler goroutine can finish, then wait
	// for ProxyA2A to return. A plain send cannot be skipped the way a
	// select-with-default could, which risked deadlocking on <-done; the
	// deferred close(release) still covers early t.Fatal exits.
	release <- struct{}{}
	<-done
}

View File

@@ -467,6 +467,29 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
}
}
// DATA-LOSS FIX: persist the canvas user's message into activity_logs
// SYNCHRONOUSLY, BEFORE dispatch. For a push-mode workspace the only
// pre-fix durable write of the user message was logA2ASuccess /
// logA2AFailure AFTER the agent round-trip — so a user who typed a
// message then exited the chat (fetch aborted on unmount / tab close /
// dropped connection) before the agent replied lost the message
// entirely on reopen, because chat-history reads only a2a_receive rows
// and none was ever written. Persisting here, with a WithoutCancel
// context inside persistUserMessageAtIngest, makes the message durable
// at submit time regardless of what the client does next. The returned
// row id is threaded into the success/failure log so they UPDATE this
// row with the agent reply rather than INSERT a duplicate.
//
// Scoped to: canvas-initiated (callerID == ""), message-bearing
// (message/send), logActivity on, and past the poll-mode short-circuit
// above (poll-mode already persists at ingest via logA2AReceiveQueued).
// Best-effort: a "" return falls back to the legacy post-round-trip
// INSERT, so this can never make a send fail.
var ingestRowID string
if logActivity && callerID == "" && a2aMethod == "message/send" {
ingestRowID = h.persistUserMessageAtIngest(ctx, workspaceID, callerID, body, a2aMethod)
}
startTime := time.Now()
resp, cancelFwd, err := h.dispatchA2A(ctx, workspaceID, agentURL, body, callerID)
if cancelFwd != nil {
@@ -474,7 +497,7 @@
}
durationMs := int(time.Since(startTime).Milliseconds())
if err != nil {
-		return h.handleA2ADispatchError(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs, logActivity)
+		return h.handleA2ADispatchError(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs, logActivity, ingestRowID)
}
defer func() { _ = resp.Body.Close() }()
@@ -492,7 +515,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
log.Printf("ProxyA2A: body read failed for %s (status=%d delivery_confirmed=%v bytes_read=%d): %v",
workspaceID, resp.StatusCode, deliveryConfirmed, len(respBody), readErr)
if logActivity && deliveryConfirmed {
-		h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs)
+		h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs, ingestRowID)
}
// Preserve the actual HTTP status code and any body bytes already read.
// Previously this returned (0, nil, error) which discarded both.
@@ -536,7 +559,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
}
if logActivity {
-		h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs)
+		h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs, ingestRowID)
}
// Track LLM token usage for cost transparency (#593).

View File

@@ -27,10 +27,138 @@ type proxyDispatchBuildError struct{ err error }
func (e *proxyDispatchBuildError) Error() string { return e.err.Error() }
// persistUserMessageAtIngest writes the canvas user's message into
// activity_logs SYNCHRONOUSLY, BEFORE the agent is dispatched, and
// returns the new row's id (empty string if the persist could not be
// done — caller falls back to the legacy post-round-trip INSERT).
//
// # Why this exists (the data-loss window it closes)
//
// Pre-fix, for a push-mode (HTTP-dispatched) workspace the user's
// message became durable ONLY inside logA2ASuccess / logA2AFailure,
// which run AFTER the full agent round-trip completes. The single
// chat-history source is `activity_logs` rows with
// activity_type='a2a_receive' AND source_id IS NULL
// (messagestore/postgres_store.go). So if the user typed a message and
// exited the chat (component unmount aborts the fetch / tab close /
// dropped connection) before the agent finished, NO a2a_receive row was
// ever written and the message was permanently lost on chat reopen.
// This is the inbound (user→agent) mirror of the reno-stars 2026-05-05
// outbound data-loss incident (RFC #2945) — which only fixed the
// agent→user side (AgentMessageWriter).
//
// poll-mode workspaces were never affected: logA2AReceiveQueued already
// persists the request_body at ingest. This helper brings push-mode to
// parity: the user message is durable at submit time, independent of
// and prior to agent processing.
//
// # Contract
//
// - INSERT a row with request_body set, response_body NULL,
// status='pending'. The chat-history read path
// (activityRowToChatMessages) already renders a user bubble from a
// row whose response_body is empty, so a pending row shows the
// user's message immediately on reopen even before the agent replies.
// - Use context.WithoutCancel: a client disconnect on chat-exit MUST
// NOT abort this write — that is the entire point of the fix.
// - SYNCHRONOUS (blocking): the row must exist before dispatchA2A is
// allowed to start, otherwise the disconnect race reopens.
// - On any failure return "" and log; the caller proceeds with the
// legacy logA2ASuccess/logA2AFailure INSERT so behavior is never
// worse than pre-fix (best-effort, never blocks the user's send).
//
// The returned id is threaded into logA2ASuccess/logA2AFailure so they
// UPDATE this same row with the agent response instead of INSERTing a
// second row — preserving the one-row-carries-(user,agent) read
// contract that postgres_store.go depends on (no duplicate user bubble).
func (h *WorkspaceHandler) persistUserMessageAtIngest(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string) string {
// context.WithoutCancel: the canvas exiting the chat aborts the
// inbound HTTP request, cancelling ctx. The whole reason this fix
// exists is so that cancellation does NOT lose the user's message —
// so the entire persist (name lookup + INSERT) must run on a
// non-cancellable derived context with its own short deadline. Doing
// the name lookup on the raw ctx would skip it on a cancelled client
// and degrade the summary; WithoutCancel keeps the summary accurate
// too.
insCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 10*time.Second)
defer cancel()
var wsName string
db.DB.QueryRowContext(insCtx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName)
if wsName == "" {
wsName = workspaceID
}
summary := a2aMethod + " → " + wsName + " (received)"
var rowID string
err := db.DB.QueryRowContext(insCtx, `
INSERT INTO activity_logs (workspace_id, activity_type, source_id, target_id, method, summary, request_body, status)
VALUES ($1, 'a2a_receive', $2, $3, $4, $5, $6::jsonb, 'pending')
RETURNING id
`, workspaceID, nilIfEmpty(callerID), &workspaceID, &a2aMethod, &summary, string(body)).Scan(&rowID)
if err != nil {
// Best-effort: never block or fail the user's send because the
// ingest persist hiccuped. Caller falls back to the legacy
// post-round-trip INSERT (behavior == pre-fix for this request).
log.Printf("persistUserMessageAtIngest: INSERT failed for %s: %v — falling back to post-round-trip persist", workspaceID, err)
return ""
}
return rowID
}
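// nilIfEmpty is the package's existing helper; its assumed shape
// (hypothetical, shown for orientation):
//
//	func nilIfEmpty(s string) *string {
//		if s == "" {
//			return nil
//		}
//		return &s
//	}
//
// so a canvas-initiated send (callerID == "") lands with source_id
// NULL, matching the `source_id IS NULL` filter the chat-history read
// applies.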
// finalizeIngestRow UPDATEs the pre-persisted ingest row (created by
// persistUserMessageAtIngest) with the agent's response once the A2A
// round-trip completes. Returns true if the row was found+updated; false
// means the caller must fall through to the legacy INSERT (so a missing
// ingest row never silently drops the agent reply).
//
// Runs on a WithoutCancel context for the same reason as the ingest
// INSERT: the client may already be gone, but the agent reply still
// needs to land on the row that already holds the user's message.
func finalizeIngestRow(ctx context.Context, ingestRowID string, respBody json.RawMessage, toolTrace json.RawMessage, status string, durationMs int, errDetail *string) bool {
if ingestRowID == "" {
return false
}
upCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
defer cancel()
var respStr, traceStr *string
if len(respBody) > 0 {
s := string(respBody)
respStr = &s
}
if len(toolTrace) > 0 {
s := string(toolTrace)
traceStr = &s
}
res, err := db.DB.ExecContext(upCtx, `
UPDATE activity_logs
SET response_body = $2::jsonb,
tool_trace = $3::jsonb,
status = $4,
duration_ms = $5,
error_detail = $6
WHERE id = $1 AND status = 'pending'
`, ingestRowID, respStr, traceStr, status, durationMs, errDetail)
if err != nil {
log.Printf("finalizeIngestRow: UPDATE failed for row %s: %v", ingestRowID, err)
return false
}
n, _ := res.RowsAffected()
if n == 0 {
// Row not found or no longer pending (e.g. a concurrent finalize
// already ran). Don't double-INSERT — the user message + a
// response already landed on this row. Treat as handled.
log.Printf("finalizeIngestRow: row %s not pending (already finalized?) — skipping legacy INSERT", ingestRowID)
return true
}
return true
}
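// For orientation, the read-side contract both helpers preserve: chat
// history renders one a2a_receive row as up to two bubbles. A
// hypothetical sketch (names invented here; the real code lives in
// messagestore/postgres_store.go):
//
//	func rowToBubbles(requestBody, responseBody []byte) []ChatMessage {
//		msgs := []ChatMessage{{Role: "user", Text: textOf(requestBody)}}
//		if len(responseBody) > 0 {
//			msgs = append(msgs, ChatMessage{Role: "agent", Text: textOf(responseBody)})
//		}
//		return msgs // a 'pending' row (response_body NULL) shows only the user bubble
//	}
//
// So the ingest INSERT makes the user bubble durable immediately, the
// finalize UPDATE adds the agent bubble to the same row, and a second
// INSERT for the same exchange would duplicate the user bubble.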
// handleA2ADispatchError translates a forward-call failure into a proxyA2AError,
// runs the reactive container-health check, and (when `logActivity` is true)
// schedules a detached LogActivity goroutine for the failed attempt.
-func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int, logActivity bool) (int, []byte, *proxyA2AError) {
+func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int, logActivity bool, ingestRowID string) (int, []byte, *proxyA2AError) {
// Build-time failure (couldn't even create the http.Request) — return
// a 500 without the reactive-health / busy-retry paths.
if buildErr, ok := err.(*proxyDispatchBuildError); ok {
@@ -46,7 +174,7 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace
containerDead := h.maybeMarkContainerDead(ctx, workspaceID)
if logActivity {
-		h.logA2AFailure(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs)
+		h.logA2AFailure(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs, ingestRowID)
}
if containerDead {
return 0, nil, &proxyA2AError{
@@ -254,8 +382,31 @@ func (h *WorkspaceHandler) preflightContainerHealth(ctx context.Context, workspa
// logA2AFailure records a failed A2A attempt to activity_logs in a detached
// goroutine (the request context may already be done by the time it runs).
-func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int) {
+func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int, ingestRowID string) {
errMsg := err.Error()
// DATA-LOSS FIX interaction: when the user message was persisted at
// ingest (ingestRowID set), the pending a2a_receive row already holds
// it durably and chat-history renders it as the user's bubble on
// reopen — which is the entire point of the fix. Do NOT mutate that
// row here:
// - This path runs BEFORE the busy/enqueue branch in
// handleA2ADispatchError. A "busy" error is followed by a
// successful enqueue (HTTP 202) and a later drain that answers
// the user — finalizing the row as 'error' now would wrongly
// show the user a failed message that is actually queued.
// - For a genuinely terminal failure the user's message must STILL
// survive (that is the bug we are fixing); leaving the row
// 'pending' preserves it. The failure itself is observable via
// this log line + metrics, not by overwriting the preserved
// user message with an agent-error bubble.
// So: if an ingest row exists, the message is already safe — return
// without the legacy failure INSERT (which would also duplicate the
// user bubble).
if ingestRowID != "" {
log.Printf("logA2AFailure: %s message preserved at ingest (row %s, status=pending) — A2A attempt failed: %v",
workspaceID, ingestRowID, errMsg)
return
}
var errWsName string
db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&errWsName)
if errWsName == "" {
@@ -283,7 +434,7 @@ func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, calle
// logA2ASuccess records a successful A2A round-trip and (for canvas-initiated
// 2xx/3xx responses) broadcasts an A2A_RESPONSE event so the frontend can
// receive the reply without polling.
-func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, callerID string, body, respBody []byte, a2aMethod string, statusCode, durationMs int) {
+func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, callerID string, body, respBody []byte, a2aMethod string, statusCode, durationMs int, ingestRowID string) {
logStatus := "ok"
if statusCode >= 400 {
logStatus = "error"
@@ -309,23 +460,39 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle
}
summary := a2aMethod + " → " + wsNameForLog
toolTrace := extractToolTrace(respBody)
-	h.goAsync(func() {
-		logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
-		defer cancel()
-		LogActivity(logCtx, h.broadcaster, ActivityParams{
-			WorkspaceID:  workspaceID,
-			ActivityType: "a2a_receive",
-			SourceID:     nilIfEmpty(callerID),
-			TargetID:     &workspaceID,
-			Method:       &a2aMethod,
-			Summary:      &summary,
-			RequestBody:  json.RawMessage(body),
-			ResponseBody: json.RawMessage(respBody),
-			ToolTrace:    toolTrace,
-			DurationMs:   &durationMs,
-			Status:       logStatus,
-		})
-	})
+	// If the user message was persisted at ingest (canvas push-mode
+	// path), finalize THAT row with the agent response instead of
+	// INSERTing a second a2a_receive row — postgres_store.go reconstructs
+	// (user, agent) bubbles from a single row, so a second row would
+	// duplicate the user's message in chat-history. finalizeIngestRow
+	// runs synchronously (it's already on a WithoutCancel context); the
+	// broadcast still fires below regardless. Falls through to the legacy
+	// async INSERT when there is no ingest row (poll-mode, system
+	// callers, or an ingest-INSERT that failed best-effort).
+	finalized := false
+	if ingestRowID != "" {
+		finalized = finalizeIngestRow(ctx, ingestRowID, json.RawMessage(respBody), toolTrace, logStatus, durationMs, nil)
+	}
+	if !finalized {
+		h.goAsync(func() {
+			logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
+			defer cancel()
+			LogActivity(logCtx, h.broadcaster, ActivityParams{
+				WorkspaceID:  workspaceID,
+				ActivityType: "a2a_receive",
+				SourceID:     nilIfEmpty(callerID),
+				TargetID:     &workspaceID,
+				Method:       &a2aMethod,
+				Summary:      &summary,
+				RequestBody:  json.RawMessage(body),
+				ResponseBody: json.RawMessage(respBody),
+				ToolTrace:    toolTrace,
+				DurationMs:   &durationMs,
+				Status:       logStatus,
+			})
+		})
+	}
if callerID == "" && statusCode < 400 {
h.broadcaster.BroadcastOnly(workspaceID, string(events.EventA2AResponse), map[string]interface{}{

View File

@@ -1736,7 +1736,7 @@ func TestHandleA2ADispatchError_ContextDeadline(t *testing.T) {
_, _, perr := handler.handleA2ADispatchError(
context.Background(), "ws-dl", "", []byte("{}"), "message/send",
-		context.DeadlineExceeded, 1, false,
+		context.DeadlineExceeded, 1, false, "",
)
if perr == nil {
t.Fatal("expected error, got nil")
@@ -1757,7 +1757,7 @@ func TestHandleA2ADispatchError_BuildError(t *testing.T) {
buildErr := &proxyDispatchBuildError{err: fmt.Errorf("bad url")}
_, _, perr := handler.handleA2ADispatchError(
context.Background(), "ws-x", "", []byte("{}"), "message/send", buildErr, 1, false,
context.Background(), "ws-x", "", []byte("{}"), "message/send", buildErr, 1, false, "",
)
if perr == nil || perr.Status != http.StatusInternalServerError {
t.Errorf("expected 500, got %+v", perr)
@@ -1771,7 +1771,7 @@ func TestHandleA2ADispatchError_GenericReturns502(t *testing.T) {
_, _, perr := handler.handleA2ADispatchError(
context.Background(), "ws-x", "", []byte("{}"), "message/send",
fmt.Errorf("no such host"), 1, false,
fmt.Errorf("no such host"), 1, false, "",
)
if perr == nil || perr.Status != http.StatusBadGateway {
t.Errorf("expected 502, got %+v", perr)
@@ -1969,7 +1969,7 @@ func TestLogA2AFailure_Smoke(t *testing.T) {
mock.ExpectExec("INSERT INTO activity_logs").
WillReturnResult(sqlmock.NewResult(0, 1))
handler.logA2AFailure(context.Background(), "ws-fail", "", []byte(`{}`), "message/send", fmt.Errorf("boom"), 42)
handler.logA2AFailure(context.Background(), "ws-fail", "", []byte(`{}`), "message/send", fmt.Errorf("boom"), 42, "")
time.Sleep(80 * time.Millisecond)
}
@@ -1986,7 +1986,7 @@ func TestLogA2AFailure_EmptyNameFallback(t *testing.T) {
mock.ExpectExec("INSERT INTO activity_logs").
WillReturnResult(sqlmock.NewResult(0, 1))
handler.logA2AFailure(context.Background(), "ws-noname", "", []byte(`{}`), "message/send", fmt.Errorf("boom"), 1)
handler.logA2AFailure(context.Background(), "ws-noname", "", []byte(`{}`), "message/send", fmt.Errorf("boom"), 1, "")
time.Sleep(80 * time.Millisecond)
}
@@ -2002,7 +2002,7 @@ func TestLogA2ASuccess_Smoke(t *testing.T) {
mock.ExpectExec("INSERT INTO activity_logs").
WillReturnResult(sqlmock.NewResult(0, 1))
handler.logA2ASuccess(context.Background(), "ws-ok", "", []byte(`{}`), []byte(`{"result":"x"}`), "message/send", 200, 10)
handler.logA2ASuccess(context.Background(), "ws-ok", "", []byte(`{}`), []byte(`{"result":"x"}`), "message/send", 200, 10, "")
time.Sleep(80 * time.Millisecond)
}
@@ -2020,7 +2020,7 @@ func TestLogA2ASuccess_ErrorStatus(t *testing.T) {
WillReturnResult(sqlmock.NewResult(0, 1))
// callerID != "" also means no A2A_RESPONSE broadcast.
handler.logA2ASuccess(context.Background(), "ws-err", "ws-caller", []byte(`{}`), []byte(`{}`), "message/send", 500, 10)
handler.logA2ASuccess(context.Background(), "ws-err", "ws-caller", []byte(`{}`), []byte(`{}`), "message/send", 500, 10, "")
time.Sleep(80 * time.Millisecond)
}

View File

@@ -204,7 +204,11 @@ func (h *WorkspaceHandler) handleMockA2A(ctx context.Context, workspaceID, calle
// is identical to a real agent reply. Status 200 + duration 0
// is the "synthesised reply" marker; activity_logs.duration_ms
// being 0 is harmless (real fast paths can hit 0 too).
-		h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, http.StatusOK, 0)
+		// ingestRowID == "": the mock short-circuit runs before the
+		// real-dispatch ingest-persist (it has no resolveAgentURL / no
+		// dispatchA2A), so logA2ASuccess takes the legacy INSERT path —
+		// identical to pre-fix behavior for mock workspaces.
+		h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, http.StatusOK, 0, "")
}
return http.StatusOK, respBody, true
}

View File

@@ -31,7 +31,7 @@ func TestHandleA2ADispatchError_NativeSession_SkipsEnqueue(t *testing.T) {
// sqlmock's ExpectationsWereMet implicitly enforces that on teardown.
_, _, perr := handler.handleA2ADispatchError(
context.Background(), "ws-native", "", []byte("{}"), "message/send",
-		context.DeadlineExceeded, 1, false,
+		context.DeadlineExceeded, 1, false, "",
)
if perr == nil {
t.Fatal("expected proxy error, got nil")
@@ -74,7 +74,7 @@ func TestHandleA2ADispatchError_NoNativeSession_StillEnqueues(t *testing.T) {
_, _, perr := handler.handleA2ADispatchError(
context.Background(), "ws-platform-queue", "", []byte("{}"), "message/send",
-		context.DeadlineExceeded, 1, false,
+		context.DeadlineExceeded, 1, false, "",
)
if perr == nil {
t.Fatal("expected proxy error, got nil")