fix(workspace-server): persist canvas user message at ingest, before agent round-trip
Canvas chat lost the user's own message when they exited the chat before the agent replied, for push-mode (HTTP-dispatched) workspaces. Root cause: chat-history is reconstructed solely from activity_logs rows (activity_type='a2a_receive', source_id IS NULL). For push-mode that row was written ONLY in logA2ASuccess/logA2AFailure, AFTER the full agent A2A round-trip. The user message was never persisted at ingest. If the user exited the chat (fetch abort on unmount / tab close / dropped conn) before the agent finished, no row was ever written and the message was permanently lost on reopen. poll-mode was unaffected (logA2AReceiveQueued already persists at ingest). This is the inbound mirror of the reno-stars 2026-05-05 outbound data-loss incident (RFC #2945). Fix: persistUserMessageAtIngest() does a synchronous INSERT of the user message (status='pending', response_body NULL) BEFORE dispatchA2A, on a context.WithoutCancel context so a client disconnect cannot abort the write. logA2ASuccess finalizes that row via UPDATE (no duplicate user bubble — preserves the one-row-(user,agent) read contract). logA2AFailure with an ingest row is a no-op: the pending row already durably holds the message, and busy->enqueue requests stay pending to be answered by the queue drain. Best-effort: on persist failure, falls back to the legacy post-round-trip INSERT (never worse than pre-fix, never blocks the send). The read path already renders a user bubble from a row with empty response_body, so the message shows on reopen even before the agent answers. 8 new TDD regression tests (a2a_ingest_persist_test.go). Full internal/handlers + internal/messagestore suites green. Refs: molecule-ai/internal#470 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
c3cfbea750
commit
0ee093ca9a
218
workspace-server/internal/handlers/a2a_ingest_persist_test.go
Normal file
218
workspace-server/internal/handlers/a2a_ingest_persist_test.go
Normal file
@ -0,0 +1,218 @@
|
||||
package handlers
|
||||
|
||||
// Regression coverage for the canvas user-message data-loss fix.
|
||||
//
|
||||
// Bug (reported 2026-05-16): "in canvas i sometimes lose my own message
|
||||
// when i exit chat". Root cause: for a push-mode workspace the user's
|
||||
// message became durable ONLY inside logA2ASuccess / logA2AFailure,
|
||||
// which run AFTER the full agent A2A round-trip. The single chat-history
|
||||
// source is activity_logs rows (activity_type='a2a_receive', source_id
|
||||
// IS NULL). A user who typed a message then exited the chat (fetch
|
||||
// aborted on unmount / tab close / dropped connection) before the agent
|
||||
// replied left NO a2a_receive row written → message gone on reopen.
|
||||
//
|
||||
// Fix: persistUserMessageAtIngest writes the user message into
|
||||
// activity_logs SYNCHRONOUSLY, BEFORE dispatch, on a context.WithoutCancel
|
||||
// derived context; logA2ASuccess finalizes that row with the reply;
|
||||
// logA2AFailure leaves it 'pending' (message preserved) instead of
|
||||
// INSERTing a duplicate / overwriting with an agent-error bubble.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
)
|
||||
|
||||
// persistUserMessageAtIngest must INSERT a pending a2a_receive row
|
||||
// carrying the user's request_body and RETURN its id — and it must do so
|
||||
// even when the passed-in ctx is ALREADY cancelled, because the whole
|
||||
// point of the fix is that a client disconnect on chat-exit cannot lose
|
||||
// the message. The persist runs on context.WithoutCancel internally.
|
||||
func TestPersistUserMessageAtIngest_WritesPendingRowEvenWhenCtxCancelled(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
// Both the workspace-name lookup AND the INSERT run on
|
||||
// context.WithoutCancel inside the helper, so they reach the DB even
|
||||
// though the caller's ctx is already cancelled — that is the fix.
|
||||
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
|
||||
WithArgs("ws-ingest").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Ingest WS"))
|
||||
// The defining assertion: an INSERT ... RETURNING id is issued for
|
||||
// the user message, with status 'pending' and the request body.
|
||||
mock.ExpectQuery(`INSERT INTO activity_logs`).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("row-abc-123"))
|
||||
|
||||
// Cancel the context BEFORE calling — simulates the canvas aborting
|
||||
// the inbound request because the user exited the chat.
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
|
||||
body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"role":"user","parts":[{"kind":"text","text":"do not lose me"}]}}}`)
|
||||
rowID := handler.persistUserMessageAtIngest(ctx, "ws-ingest", "", body, "message/send")
|
||||
|
||||
if rowID != "row-abc-123" {
|
||||
t.Fatalf("expected ingest row id 'row-abc-123' (message persisted despite cancelled ctx), got %q", rowID)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// On INSERT failure the helper must return "" (so the caller falls back
|
||||
// to the legacy post-round-trip INSERT) and must NOT panic — a DB hiccup
|
||||
// at ingest can never make the user's send fail.
|
||||
func TestPersistUserMessageAtIngest_FallsBackOnInsertError(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
|
||||
WithArgs("ws-x").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow(""))
|
||||
mock.ExpectQuery(`INSERT INTO activity_logs .* RETURNING id`).
|
||||
WillReturnError(fmt.Errorf("test: activity_logs unavailable"))
|
||||
|
||||
rowID := handler.persistUserMessageAtIngest(context.Background(), "ws-x", "", []byte(`{}`), "message/send")
|
||||
if rowID != "" {
|
||||
t.Fatalf("expected empty rowID on INSERT error (legacy-INSERT fallback), got %q", rowID)
|
||||
}
|
||||
}
|
||||
|
||||
// finalizeIngestRow must UPDATE the pre-persisted row (matching on
|
||||
// id + status='pending') with the agent response, and report handled.
|
||||
func TestFinalizeIngestRow_UpdatesPendingRow(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
|
||||
mock.ExpectExec(`UPDATE activity_logs\s+SET response_body`).
|
||||
WithArgs("row-1", sqlmock.AnyArg(), sqlmock.AnyArg(), "ok", 1234, nil).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
ok := finalizeIngestRow(context.Background(), "row-1",
|
||||
[]byte(`{"result":"hello back"}`), nil, "ok", 1234, nil)
|
||||
if !ok {
|
||||
t.Fatal("expected finalizeIngestRow to report the row finalized")
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// finalizeIngestRow with an empty id is a no-op returning false — the
|
||||
// caller must then take the legacy INSERT path (poll-mode / system
|
||||
// callers / ingest-INSERT-failed). It must issue NO SQL.
|
||||
func TestFinalizeIngestRow_EmptyIDIsNoOp(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
if finalizeIngestRow(context.Background(), "", []byte(`{}`), nil, "ok", 1, nil) {
|
||||
t.Fatal("empty ingest id must return false (legacy INSERT fallback)")
|
||||
}
|
||||
}
|
||||
|
||||
// When the row is gone / no longer pending (RowsAffected==0), the helper
|
||||
// returns true so the caller does NOT double-INSERT a duplicate user
|
||||
// bubble — the message + a reply already landed on that row.
|
||||
func TestFinalizeIngestRow_AlreadyFinalizedSkipsDoubleInsert(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
|
||||
mock.ExpectExec(`UPDATE activity_logs`).
|
||||
WillReturnResult(sqlmock.NewResult(0, 0)) // 0 rows: not pending
|
||||
|
||||
if !finalizeIngestRow(context.Background(), "row-stale", []byte(`{}`), nil, "ok", 1, nil) {
|
||||
t.Fatal("a not-pending row must report handled=true to suppress the legacy duplicate INSERT")
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// THE CORE REGRESSION: logA2AFailure with an ingest row present must
|
||||
// NOT INSERT a (duplicate) activity_logs row and must NOT mutate the
|
||||
// preserved 'pending' row to an error bubble. The user's message stays
|
||||
// durable as a pending row — visible on chat reopen — which is exactly
|
||||
// the data-loss the fix prevents. (Pre-fix this path INSERTed an
|
||||
// error-status row AFTER the round-trip; if it never ran, the message
|
||||
// was lost. Post-fix the message is already safe at ingest.)
|
||||
func TestLogA2AFailure_WithIngestRow_PreservesMessageNoInsertNoMutation(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, handler)
|
||||
|
||||
// Intentionally set NO expectations: any INSERT or UPDATE here is a
|
||||
// bug. sqlmock fails the test on an unexpected statement.
|
||||
handler.logA2AFailure(context.Background(), "ws-fail", "",
|
||||
[]byte(`{"params":{"message":{"parts":[{"text":"keep me"}]}}}`),
|
||||
"message/send", errors.New("agent unreachable"), 5, "row-pending-keep")
|
||||
time.Sleep(60 * time.Millisecond)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("logA2AFailure with an ingest row must issue no SQL (message already preserved): %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// logA2ASuccess WITH an ingest row finalizes that row (UPDATE) instead
|
||||
// of INSERTing a second a2a_receive row — preventing the duplicate user
|
||||
// bubble that postgres_store.go's one-row-(user,agent) reconstruction
|
||||
// would otherwise produce.
|
||||
func TestLogA2ASuccess_WithIngestRow_FinalizesInsteadOfInsert(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, handler)
|
||||
|
||||
// Workspace-name lookup still happens (synchronous).
|
||||
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
|
||||
WithArgs("ws-ok").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("OK WS"))
|
||||
// The finalize UPDATE — and crucially NO "INSERT INTO activity_logs".
|
||||
mock.ExpectExec(`UPDATE activity_logs\s+SET response_body`).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
handler.logA2ASuccess(context.Background(), "ws-ok", "",
|
||||
[]byte(`{}`), []byte(`{"result":"reply"}`), "message/send", 200, 10, "row-finalize-me")
|
||||
time.Sleep(60 * time.Millisecond)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations (expected finalize UPDATE, no INSERT): %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Guard the literal contract the read side depends on: the ingest INSERT
|
||||
// must target activity_type='a2a_receive' with status 'pending' and a
|
||||
// RETURNING id — that's what makes postgres_store.go render the user
|
||||
// bubble immediately and what lets the finalize UPDATE find the row.
|
||||
// sqlmock matches the issued query against this regex, so a green
|
||||
// expectation IS the proof the SQL carries every required fragment.
|
||||
func TestPersistUserMessageAtIngest_InsertShapeContract(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
|
||||
WithArgs("ws-shape").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Shape"))
|
||||
|
||||
// All four contract fragments in one regex — only matches if the
|
||||
// real INSERT contains a2a_receive + 'pending' + request_body +
|
||||
// RETURNING id (sqlmock does a regexp.MatchString of this against
|
||||
// the actual query text).
|
||||
mock.ExpectQuery(`(?s)INSERT INTO activity_logs.*request_body.*'a2a_receive'.*'pending'.*RETURNING id`).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("rid"))
|
||||
|
||||
rid := handler.persistUserMessageAtIngest(context.Background(), "ws-shape", "",
|
||||
[]byte(`{"params":{"message":{"parts":[{"text":"x"}]}}}`), "message/send")
|
||||
if rid != "rid" {
|
||||
t.Fatalf("expected rid from contract-shaped INSERT, got %q", rid)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("ingest INSERT did not match the required shape contract: %v", err)
|
||||
}
|
||||
}
|
||||
@ -467,6 +467,29 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
}
|
||||
}
|
||||
|
||||
// DATA-LOSS FIX: persist the canvas user's message into activity_logs
|
||||
// SYNCHRONOUSLY, BEFORE dispatch. For a push-mode workspace the only
|
||||
// pre-fix durable write of the user message was logA2ASuccess /
|
||||
// logA2AFailure AFTER the agent round-trip — so a user who typed a
|
||||
// message then exited the chat (fetch aborted on unmount / tab close /
|
||||
// dropped connection) before the agent replied lost the message
|
||||
// entirely on reopen, because chat-history reads only a2a_receive rows
|
||||
// and none was ever written. Persisting here, with a WithoutCancel
|
||||
// context inside persistUserMessageAtIngest, makes the message durable
|
||||
// at submit time regardless of what the client does next. The returned
|
||||
// row id is threaded into the success/failure log so they UPDATE this
|
||||
// row with the agent reply rather than INSERT a duplicate.
|
||||
//
|
||||
// Scoped to: canvas-initiated (callerID == ""), message-bearing
|
||||
// (message/send), logActivity on, and past the poll-mode short-circuit
|
||||
// above (poll-mode already persists at ingest via logA2AReceiveQueued).
|
||||
// Best-effort: a "" return falls back to the legacy post-round-trip
|
||||
// INSERT, so this can never make a send fail.
|
||||
var ingestRowID string
|
||||
if logActivity && callerID == "" && a2aMethod == "message/send" {
|
||||
ingestRowID = h.persistUserMessageAtIngest(ctx, workspaceID, callerID, body, a2aMethod)
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
resp, cancelFwd, err := h.dispatchA2A(ctx, workspaceID, agentURL, body, callerID)
|
||||
if cancelFwd != nil {
|
||||
@ -474,7 +497,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
}
|
||||
durationMs := int(time.Since(startTime).Milliseconds())
|
||||
if err != nil {
|
||||
return h.handleA2ADispatchError(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs, logActivity)
|
||||
return h.handleA2ADispatchError(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs, logActivity, ingestRowID)
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
@ -492,7 +515,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
log.Printf("ProxyA2A: body read failed for %s (status=%d delivery_confirmed=%v bytes_read=%d): %v",
|
||||
workspaceID, resp.StatusCode, deliveryConfirmed, len(respBody), readErr)
|
||||
if logActivity && deliveryConfirmed {
|
||||
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs)
|
||||
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs, ingestRowID)
|
||||
}
|
||||
// Preserve the actual HTTP status code and any body bytes already read.
|
||||
// Previously this returned (0, nil, error) which discarded both.
|
||||
@ -536,7 +559,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
}
|
||||
|
||||
if logActivity {
|
||||
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs)
|
||||
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs, ingestRowID)
|
||||
}
|
||||
|
||||
// Track LLM token usage for cost transparency (#593).
|
||||
|
||||
@ -27,10 +27,138 @@ type proxyDispatchBuildError struct{ err error }
|
||||
|
||||
func (e *proxyDispatchBuildError) Error() string { return e.err.Error() }
|
||||
|
||||
// persistUserMessageAtIngest writes the canvas user's message into
|
||||
// activity_logs SYNCHRONOUSLY, BEFORE the agent is dispatched, and
|
||||
// returns the new row's id (empty string if the persist could not be
|
||||
// done — caller falls back to the legacy post-round-trip INSERT).
|
||||
//
|
||||
// # Why this exists (the data-loss window it closes)
|
||||
//
|
||||
// Pre-fix, for a push-mode (HTTP-dispatched) workspace the user's
|
||||
// message became durable ONLY inside logA2ASuccess / logA2AFailure,
|
||||
// which run AFTER the full agent round-trip completes. The single
|
||||
// chat-history source is `activity_logs` rows with
|
||||
// activity_type='a2a_receive' AND source_id IS NULL
|
||||
// (messagestore/postgres_store.go). So if the user typed a message and
|
||||
// exited the chat (component unmount aborts the fetch / tab close /
|
||||
// dropped connection) before the agent finished, NO a2a_receive row was
|
||||
// ever written and the message was permanently lost on chat reopen.
|
||||
// This is the inbound (user→agent) mirror of the reno-stars 2026-05-05
|
||||
// outbound data-loss incident (RFC #2945) — which only fixed the
|
||||
// agent→user side (AgentMessageWriter).
|
||||
//
|
||||
// poll-mode workspaces were never affected: logA2AReceiveQueued already
|
||||
// persists the request_body at ingest. This helper brings push-mode to
|
||||
// parity: the user message is durable at submit time, independent of
|
||||
// and prior to agent processing.
|
||||
//
|
||||
// # Contract
|
||||
//
|
||||
// - INSERT a row with request_body set, response_body NULL,
|
||||
// status='pending'. The chat-history read path
|
||||
// (activityRowToChatMessages) already renders a user bubble from a
|
||||
// row whose response_body is empty, so a pending row shows the
|
||||
// user's message immediately on reopen even before the agent replies.
|
||||
// - Use context.WithoutCancel: a client disconnect on chat-exit MUST
|
||||
// NOT abort this write — that is the entire point of the fix.
|
||||
// - SYNCHRONOUS (blocking): the row must exist before dispatchA2A is
|
||||
// allowed to start, otherwise the disconnect race reopens.
|
||||
// - On any failure return "" and log; the caller proceeds with the
|
||||
// legacy logA2ASuccess/logA2AFailure INSERT so behavior is never
|
||||
// worse than pre-fix (best-effort, never blocks the user's send).
|
||||
//
|
||||
// The returned id is threaded into logA2ASuccess/logA2AFailure so they
|
||||
// UPDATE this same row with the agent response instead of INSERTing a
|
||||
// second row — preserving the one-row-carries-(user,agent) read
|
||||
// contract that postgres_store.go depends on (no duplicate user bubble).
|
||||
func (h *WorkspaceHandler) persistUserMessageAtIngest(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string) string {
|
||||
// context.WithoutCancel: the canvas exiting the chat aborts the
|
||||
// inbound HTTP request, cancelling ctx. The whole reason this fix
|
||||
// exists is so that cancellation does NOT lose the user's message —
|
||||
// so the entire persist (name lookup + INSERT) must run on a
|
||||
// non-cancellable derived context with its own short deadline. Doing
|
||||
// the name lookup on the raw ctx would skip it on a cancelled client
|
||||
// and degrade the summary; WithoutCancel keeps the summary accurate
|
||||
// too.
|
||||
insCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
var wsName string
|
||||
db.DB.QueryRowContext(insCtx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName)
|
||||
if wsName == "" {
|
||||
wsName = workspaceID
|
||||
}
|
||||
summary := a2aMethod + " → " + wsName + " (received)"
|
||||
|
||||
var rowID string
|
||||
err := db.DB.QueryRowContext(insCtx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, source_id, target_id, method, summary, request_body, status)
|
||||
VALUES ($1, 'a2a_receive', $2, $3, $4, $5, $6::jsonb, 'pending')
|
||||
RETURNING id
|
||||
`, workspaceID, nilIfEmpty(callerID), &workspaceID, &a2aMethod, &summary, string(body)).Scan(&rowID)
|
||||
if err != nil {
|
||||
// Best-effort: never block or fail the user's send because the
|
||||
// ingest persist hiccuped. Caller falls back to the legacy
|
||||
// post-round-trip INSERT (behavior == pre-fix for this request).
|
||||
log.Printf("persistUserMessageAtIngest: INSERT failed for %s: %v — falling back to post-round-trip persist", workspaceID, err)
|
||||
return ""
|
||||
}
|
||||
return rowID
|
||||
}
|
||||
|
||||
// finalizeIngestRow UPDATEs the pre-persisted ingest row (created by
|
||||
// persistUserMessageAtIngest) with the agent's response once the A2A
|
||||
// round-trip completes. Returns true if the row was found+updated; false
|
||||
// means the caller must fall through to the legacy INSERT (so a missing
|
||||
// ingest row never silently drops the agent reply).
|
||||
//
|
||||
// Runs on a WithoutCancel context for the same reason as the ingest
|
||||
// INSERT: the client may already be gone, but the agent reply still
|
||||
// needs to land on the row that already holds the user's message.
|
||||
func finalizeIngestRow(ctx context.Context, ingestRowID string, respBody json.RawMessage, toolTrace json.RawMessage, status string, durationMs int, errDetail *string) bool {
|
||||
if ingestRowID == "" {
|
||||
return false
|
||||
}
|
||||
upCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
var respStr, traceStr *string
|
||||
if len(respBody) > 0 {
|
||||
s := string(respBody)
|
||||
respStr = &s
|
||||
}
|
||||
if len(toolTrace) > 0 {
|
||||
s := string(toolTrace)
|
||||
traceStr = &s
|
||||
}
|
||||
res, err := db.DB.ExecContext(upCtx, `
|
||||
UPDATE activity_logs
|
||||
SET response_body = $2::jsonb,
|
||||
tool_trace = $3::jsonb,
|
||||
status = $4,
|
||||
duration_ms = $5,
|
||||
error_detail = $6
|
||||
WHERE id = $1 AND status = 'pending'
|
||||
`, ingestRowID, respStr, traceStr, status, durationMs, errDetail)
|
||||
if err != nil {
|
||||
log.Printf("finalizeIngestRow: UPDATE failed for row %s: %v", ingestRowID, err)
|
||||
return false
|
||||
}
|
||||
n, _ := res.RowsAffected()
|
||||
if n == 0 {
|
||||
// Row not found or no longer pending (e.g. a concurrent finalize
|
||||
// already ran). Don't double-INSERT — the user message + a
|
||||
// response already landed on this row. Treat as handled.
|
||||
log.Printf("finalizeIngestRow: row %s not pending (already finalized?) — skipping legacy INSERT", ingestRowID)
|
||||
return true
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// handleA2ADispatchError translates a forward-call failure into a proxyA2AError,
|
||||
// runs the reactive container-health check, and (when `logActivity` is true)
|
||||
// schedules a detached LogActivity goroutine for the failed attempt.
|
||||
func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int, logActivity bool) (int, []byte, *proxyA2AError) {
|
||||
func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int, logActivity bool, ingestRowID string) (int, []byte, *proxyA2AError) {
|
||||
// Build-time failure (couldn't even create the http.Request) — return
|
||||
// a 500 without the reactive-health / busy-retry paths.
|
||||
if buildErr, ok := err.(*proxyDispatchBuildError); ok {
|
||||
@ -46,7 +174,7 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace
|
||||
containerDead := h.maybeMarkContainerDead(ctx, workspaceID)
|
||||
|
||||
if logActivity {
|
||||
h.logA2AFailure(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs)
|
||||
h.logA2AFailure(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs, ingestRowID)
|
||||
}
|
||||
if containerDead {
|
||||
return 0, nil, &proxyA2AError{
|
||||
@ -254,8 +382,31 @@ func (h *WorkspaceHandler) preflightContainerHealth(ctx context.Context, workspa
|
||||
|
||||
// logA2AFailure records a failed A2A attempt to activity_logs in a detached
|
||||
// goroutine (the request context may already be done by the time it runs).
|
||||
func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int) {
|
||||
func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int, ingestRowID string) {
|
||||
errMsg := err.Error()
|
||||
// DATA-LOSS FIX interaction: when the user message was persisted at
|
||||
// ingest (ingestRowID set), the pending a2a_receive row already holds
|
||||
// it durably and chat-history renders it as the user's bubble on
|
||||
// reopen — which is the entire point of the fix. Do NOT mutate that
|
||||
// row here:
|
||||
// - This path runs BEFORE the busy/enqueue branch in
|
||||
// handleA2ADispatchError. A "busy" error is followed by a
|
||||
// successful enqueue (HTTP 202) and a later drain that answers
|
||||
// the user — finalizing the row as 'error' now would wrongly
|
||||
// show the user a failed message that is actually queued.
|
||||
// - For a genuinely terminal failure the user's message must STILL
|
||||
// survive (that is the bug we are fixing); leaving the row
|
||||
// 'pending' preserves it. The failure itself is observable via
|
||||
// this log line + metrics, not by overwriting the preserved
|
||||
// user message with an agent-error bubble.
|
||||
// So: if an ingest row exists, the message is already safe — return
|
||||
// without the legacy failure INSERT (which would also duplicate the
|
||||
// user bubble).
|
||||
if ingestRowID != "" {
|
||||
log.Printf("logA2AFailure: %s message preserved at ingest (row %s, status=pending) — A2A attempt failed: %v",
|
||||
workspaceID, ingestRowID, errMsg)
|
||||
return
|
||||
}
|
||||
var errWsName string
|
||||
db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&errWsName)
|
||||
if errWsName == "" {
|
||||
@ -283,7 +434,7 @@ func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, calle
|
||||
// logA2ASuccess records a successful A2A round-trip and (for canvas-initiated
|
||||
// 2xx/3xx responses) broadcasts an A2A_RESPONSE event so the frontend can
|
||||
// receive the reply without polling.
|
||||
func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, callerID string, body, respBody []byte, a2aMethod string, statusCode, durationMs int) {
|
||||
func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, callerID string, body, respBody []byte, a2aMethod string, statusCode, durationMs int, ingestRowID string) {
|
||||
logStatus := "ok"
|
||||
if statusCode >= 400 {
|
||||
logStatus = "error"
|
||||
@ -309,23 +460,39 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle
|
||||
}
|
||||
summary := a2aMethod + " → " + wsNameForLog
|
||||
toolTrace := extractToolTrace(respBody)
|
||||
h.goAsync(func() {
|
||||
logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
|
||||
defer cancel()
|
||||
LogActivity(logCtx, h.broadcaster, ActivityParams{
|
||||
WorkspaceID: workspaceID,
|
||||
ActivityType: "a2a_receive",
|
||||
SourceID: nilIfEmpty(callerID),
|
||||
TargetID: &workspaceID,
|
||||
Method: &a2aMethod,
|
||||
Summary: &summary,
|
||||
RequestBody: json.RawMessage(body),
|
||||
ResponseBody: json.RawMessage(respBody),
|
||||
ToolTrace: toolTrace,
|
||||
DurationMs: &durationMs,
|
||||
Status: logStatus,
|
||||
|
||||
// If the user message was persisted at ingest (canvas push-mode
|
||||
// path), finalize THAT row with the agent response instead of
|
||||
// INSERTing a second a2a_receive row — postgres_store.go reconstructs
|
||||
// (user, agent) bubbles from a single row, so a second row would
|
||||
// duplicate the user's message in chat-history. finalizeIngestRow
|
||||
// runs synchronously (it's already on a WithoutCancel context); the
|
||||
// broadcast still fires below regardless. Falls through to the legacy
|
||||
// async INSERT when there is no ingest row (poll-mode, system
|
||||
// callers, or an ingest-INSERT that failed best-effort).
|
||||
finalized := false
|
||||
if ingestRowID != "" {
|
||||
finalized = finalizeIngestRow(ctx, ingestRowID, json.RawMessage(respBody), toolTrace, logStatus, durationMs, nil)
|
||||
}
|
||||
if !finalized {
|
||||
h.goAsync(func() {
|
||||
logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
|
||||
defer cancel()
|
||||
LogActivity(logCtx, h.broadcaster, ActivityParams{
|
||||
WorkspaceID: workspaceID,
|
||||
ActivityType: "a2a_receive",
|
||||
SourceID: nilIfEmpty(callerID),
|
||||
TargetID: &workspaceID,
|
||||
Method: &a2aMethod,
|
||||
Summary: &summary,
|
||||
RequestBody: json.RawMessage(body),
|
||||
ResponseBody: json.RawMessage(respBody),
|
||||
ToolTrace: toolTrace,
|
||||
DurationMs: &durationMs,
|
||||
Status: logStatus,
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
if callerID == "" && statusCode < 400 {
|
||||
h.broadcaster.BroadcastOnly(workspaceID, string(events.EventA2AResponse), map[string]interface{}{
|
||||
|
||||
@ -1736,7 +1736,7 @@ func TestHandleA2ADispatchError_ContextDeadline(t *testing.T) {
|
||||
|
||||
_, _, perr := handler.handleA2ADispatchError(
|
||||
context.Background(), "ws-dl", "", []byte("{}"), "message/send",
|
||||
context.DeadlineExceeded, 1, false,
|
||||
context.DeadlineExceeded, 1, false, "",
|
||||
)
|
||||
if perr == nil {
|
||||
t.Fatal("expected error, got nil")
|
||||
@ -1757,7 +1757,7 @@ func TestHandleA2ADispatchError_BuildError(t *testing.T) {
|
||||
|
||||
buildErr := &proxyDispatchBuildError{err: fmt.Errorf("bad url")}
|
||||
_, _, perr := handler.handleA2ADispatchError(
|
||||
context.Background(), "ws-x", "", []byte("{}"), "message/send", buildErr, 1, false,
|
||||
context.Background(), "ws-x", "", []byte("{}"), "message/send", buildErr, 1, false, "",
|
||||
)
|
||||
if perr == nil || perr.Status != http.StatusInternalServerError {
|
||||
t.Errorf("expected 500, got %+v", perr)
|
||||
@ -1771,7 +1771,7 @@ func TestHandleA2ADispatchError_GenericReturns502(t *testing.T) {
|
||||
|
||||
_, _, perr := handler.handleA2ADispatchError(
|
||||
context.Background(), "ws-x", "", []byte("{}"), "message/send",
|
||||
fmt.Errorf("no such host"), 1, false,
|
||||
fmt.Errorf("no such host"), 1, false, "",
|
||||
)
|
||||
if perr == nil || perr.Status != http.StatusBadGateway {
|
||||
t.Errorf("expected 502, got %+v", perr)
|
||||
@ -1969,7 +1969,7 @@ func TestLogA2AFailure_Smoke(t *testing.T) {
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
handler.logA2AFailure(context.Background(), "ws-fail", "", []byte(`{}`), "message/send", fmt.Errorf("boom"), 42)
|
||||
handler.logA2AFailure(context.Background(), "ws-fail", "", []byte(`{}`), "message/send", fmt.Errorf("boom"), 42, "")
|
||||
time.Sleep(80 * time.Millisecond)
|
||||
}
|
||||
|
||||
@ -1986,7 +1986,7 @@ func TestLogA2AFailure_EmptyNameFallback(t *testing.T) {
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
handler.logA2AFailure(context.Background(), "ws-noname", "", []byte(`{}`), "message/send", fmt.Errorf("boom"), 1)
|
||||
handler.logA2AFailure(context.Background(), "ws-noname", "", []byte(`{}`), "message/send", fmt.Errorf("boom"), 1, "")
|
||||
time.Sleep(80 * time.Millisecond)
|
||||
}
|
||||
|
||||
@ -2002,7 +2002,7 @@ func TestLogA2ASuccess_Smoke(t *testing.T) {
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
handler.logA2ASuccess(context.Background(), "ws-ok", "", []byte(`{}`), []byte(`{"result":"x"}`), "message/send", 200, 10)
|
||||
handler.logA2ASuccess(context.Background(), "ws-ok", "", []byte(`{}`), []byte(`{"result":"x"}`), "message/send", 200, 10, "")
|
||||
time.Sleep(80 * time.Millisecond)
|
||||
}
|
||||
|
||||
@ -2020,7 +2020,7 @@ func TestLogA2ASuccess_ErrorStatus(t *testing.T) {
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// callerID != "" also means no A2A_RESPONSE broadcast.
|
||||
handler.logA2ASuccess(context.Background(), "ws-err", "ws-caller", []byte(`{}`), []byte(`{}`), "message/send", 500, 10)
|
||||
handler.logA2ASuccess(context.Background(), "ws-err", "ws-caller", []byte(`{}`), []byte(`{}`), "message/send", 500, 10, "")
|
||||
time.Sleep(80 * time.Millisecond)
|
||||
}
|
||||
|
||||
|
||||
@ -204,7 +204,11 @@ func (h *WorkspaceHandler) handleMockA2A(ctx context.Context, workspaceID, calle
|
||||
// is identical to a real agent reply. Status 200 + duration 0
|
||||
// is the "synthesised reply" marker; activity_logs.duration_ms
|
||||
// being 0 is harmless (real fast paths can hit 0 too).
|
||||
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, http.StatusOK, 0)
|
||||
// ingestRowID == "": the mock short-circuit runs before the
|
||||
// real-dispatch ingest-persist (it has no resolveAgentURL / no
|
||||
// dispatchA2A), so logA2ASuccess takes the legacy INSERT path —
|
||||
// identical to pre-fix behavior for mock workspaces.
|
||||
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, http.StatusOK, 0, "")
|
||||
}
|
||||
return http.StatusOK, respBody, true
|
||||
}
|
||||
|
||||
@ -31,7 +31,7 @@ func TestHandleA2ADispatchError_NativeSession_SkipsEnqueue(t *testing.T) {
|
||||
// sqlmock's ExpectationsWereMet implicitly enforces that on teardown.
|
||||
_, _, perr := handler.handleA2ADispatchError(
|
||||
context.Background(), "ws-native", "", []byte("{}"), "message/send",
|
||||
context.DeadlineExceeded, 1, false,
|
||||
context.DeadlineExceeded, 1, false, "",
|
||||
)
|
||||
if perr == nil {
|
||||
t.Fatal("expected proxy error, got nil")
|
||||
@ -74,7 +74,7 @@ func TestHandleA2ADispatchError_NoNativeSession_StillEnqueues(t *testing.T) {
|
||||
|
||||
_, _, perr := handler.handleA2ADispatchError(
|
||||
context.Background(), "ws-platform-queue", "", []byte("{}"), "message/send",
|
||||
context.DeadlineExceeded, 1, false,
|
||||
context.DeadlineExceeded, 1, false, "",
|
||||
)
|
||||
if perr == nil {
|
||||
t.Fatal("expected proxy error, got nil")
|
||||
|
||||
Loading…
Reference in New Issue
Block a user