fix(workspace-server): persist canvas user message at ingest (internal#470) #1347
1
.ci-trigger-dev-lead.txt
Normal file
1
.ci-trigger-dev-lead.txt
Normal file
@ -0,0 +1 @@
|
||||
2026-05-16T18:26:36Z no-op re-trigger for qa-review
|
||||
1
.ci-trigger.txt
Normal file
1
.ci-trigger.txt
Normal file
@ -0,0 +1 @@
|
||||
ci-refire 173455Z
|
||||
327
workspace-server/internal/handlers/a2a_ingest_persist_test.go
Normal file
327
workspace-server/internal/handlers/a2a_ingest_persist_test.go
Normal file
@ -0,0 +1,327 @@
|
||||
package handlers
|
||||
|
||||
// Regression coverage for the canvas user-message data-loss fix.
|
||||
//
|
||||
// Bug (reported 2026-05-16): "in canvas i sometimes lose my own message
|
||||
// when i exit chat". Root cause: for a push-mode workspace the user's
|
||||
// message became durable ONLY inside logA2ASuccess / logA2AFailure,
|
||||
// which run AFTER the full agent A2A round-trip. The single chat-history
|
||||
// source is activity_logs rows (activity_type='a2a_receive', source_id
|
||||
// IS NULL). A user who typed a message then exited the chat (fetch
|
||||
// aborted on unmount / tab close / dropped connection) before the agent
|
||||
// replied left NO a2a_receive row written → message gone on reopen.
|
||||
//
|
||||
// Fix: persistUserMessageAtIngest writes the user message into
|
||||
// activity_logs SYNCHRONOUSLY, BEFORE dispatch, on a context.WithoutCancel
|
||||
// derived context; logA2ASuccess finalizes that row with the reply;
|
||||
// logA2AFailure leaves it 'pending' (message preserved) instead of
|
||||
// INSERTing a duplicate / overwriting with an agent-error bubble.
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// persistUserMessageAtIngest must INSERT a pending a2a_receive row
|
||||
// carrying the user's request_body and RETURN its id — and it must do so
|
||||
// even when the passed-in ctx is ALREADY cancelled, because the whole
|
||||
// point of the fix is that a client disconnect on chat-exit cannot lose
|
||||
// the message. The persist runs on context.WithoutCancel internally.
|
||||
func TestPersistUserMessageAtIngest_WritesPendingRowEvenWhenCtxCancelled(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
// Both the workspace-name lookup AND the INSERT run on
|
||||
// context.WithoutCancel inside the helper, so they reach the DB even
|
||||
// though the caller's ctx is already cancelled — that is the fix.
|
||||
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
|
||||
WithArgs("ws-ingest").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Ingest WS"))
|
||||
// The defining assertion: an INSERT ... RETURNING id is issued for
|
||||
// the user message, with status 'pending' and the request body.
|
||||
mock.ExpectQuery(`INSERT INTO activity_logs`).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("row-abc-123"))
|
||||
|
||||
// Cancel the context BEFORE calling — simulates the canvas aborting
|
||||
// the inbound request because the user exited the chat.
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
|
||||
body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"role":"user","parts":[{"kind":"text","text":"do not lose me"}]}}}`)
|
||||
rowID := handler.persistUserMessageAtIngest(ctx, "ws-ingest", "", body, "message/send")
|
||||
|
||||
if rowID != "row-abc-123" {
|
||||
t.Fatalf("expected ingest row id 'row-abc-123' (message persisted despite cancelled ctx), got %q", rowID)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// On INSERT failure the helper must return "" (so the caller falls back
|
||||
// to the legacy post-round-trip INSERT) and must NOT panic — a DB hiccup
|
||||
// at ingest can never make the user's send fail.
|
||||
func TestPersistUserMessageAtIngest_FallsBackOnInsertError(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
|
||||
WithArgs("ws-x").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow(""))
|
||||
mock.ExpectQuery(`INSERT INTO activity_logs .* RETURNING id`).
|
||||
WillReturnError(fmt.Errorf("test: activity_logs unavailable"))
|
||||
|
||||
rowID := handler.persistUserMessageAtIngest(context.Background(), "ws-x", "", []byte(`{}`), "message/send")
|
||||
if rowID != "" {
|
||||
t.Fatalf("expected empty rowID on INSERT error (legacy-INSERT fallback), got %q", rowID)
|
||||
}
|
||||
}
|
||||
|
||||
// finalizeIngestRow must UPDATE the pre-persisted row (matching on
|
||||
// id + status='pending') with the agent response, and report handled.
|
||||
func TestFinalizeIngestRow_UpdatesPendingRow(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
|
||||
mock.ExpectExec(`UPDATE activity_logs\s+SET response_body`).
|
||||
WithArgs("row-1", sqlmock.AnyArg(), sqlmock.AnyArg(), "ok", 1234, nil).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
ok := finalizeIngestRow(context.Background(), "row-1",
|
||||
[]byte(`{"result":"hello back"}`), nil, "ok", 1234, nil)
|
||||
if !ok {
|
||||
t.Fatal("expected finalizeIngestRow to report the row finalized")
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// finalizeIngestRow with an empty id is a no-op returning false — the
|
||||
// caller must then take the legacy INSERT path (poll-mode / system
|
||||
// callers / ingest-INSERT-failed). It must issue NO SQL.
|
||||
func TestFinalizeIngestRow_EmptyIDIsNoOp(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
if finalizeIngestRow(context.Background(), "", []byte(`{}`), nil, "ok", 1, nil) {
|
||||
t.Fatal("empty ingest id must return false (legacy INSERT fallback)")
|
||||
}
|
||||
}
|
||||
|
||||
// When the row is gone / no longer pending (RowsAffected==0), the helper
|
||||
// returns true so the caller does NOT double-INSERT a duplicate user
|
||||
// bubble — the message + a reply already landed on that row.
|
||||
func TestFinalizeIngestRow_AlreadyFinalizedSkipsDoubleInsert(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
|
||||
mock.ExpectExec(`UPDATE activity_logs`).
|
||||
WillReturnResult(sqlmock.NewResult(0, 0)) // 0 rows: not pending
|
||||
|
||||
if !finalizeIngestRow(context.Background(), "row-stale", []byte(`{}`), nil, "ok", 1, nil) {
|
||||
t.Fatal("a not-pending row must report handled=true to suppress the legacy duplicate INSERT")
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// THE CORE REGRESSION: logA2AFailure with an ingest row present must
|
||||
// NOT INSERT a (duplicate) activity_logs row and must NOT mutate the
|
||||
// preserved 'pending' row to an error bubble. The user's message stays
|
||||
// durable as a pending row — visible on chat reopen — which is exactly
|
||||
// the data-loss the fix prevents. (Pre-fix this path INSERTed an
|
||||
// error-status row AFTER the round-trip; if it never ran, the message
|
||||
// was lost. Post-fix the message is already safe at ingest.)
|
||||
func TestLogA2AFailure_WithIngestRow_PreservesMessageNoInsertNoMutation(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, handler)
|
||||
|
||||
// Intentionally set NO expectations: any INSERT or UPDATE here is a
|
||||
// bug. sqlmock fails the test on an unexpected statement.
|
||||
handler.logA2AFailure(context.Background(), "ws-fail", "",
|
||||
[]byte(`{"params":{"message":{"parts":[{"text":"keep me"}]}}}`),
|
||||
"message/send", errors.New("agent unreachable"), 5, "row-pending-keep")
|
||||
time.Sleep(60 * time.Millisecond)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("logA2AFailure with an ingest row must issue no SQL (message already preserved): %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// logA2ASuccess WITH an ingest row finalizes that row (UPDATE) instead
|
||||
// of INSERTing a second a2a_receive row — preventing the duplicate user
|
||||
// bubble that postgres_store.go's one-row-(user,agent) reconstruction
|
||||
// would otherwise produce.
|
||||
func TestLogA2ASuccess_WithIngestRow_FinalizesInsteadOfInsert(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, handler)
|
||||
|
||||
// Workspace-name lookup still happens (synchronous).
|
||||
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
|
||||
WithArgs("ws-ok").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("OK WS"))
|
||||
// The finalize UPDATE — and crucially NO "INSERT INTO activity_logs".
|
||||
mock.ExpectExec(`UPDATE activity_logs\s+SET response_body`).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
handler.logA2ASuccess(context.Background(), "ws-ok", "",
|
||||
[]byte(`{}`), []byte(`{"result":"reply"}`), "message/send", 200, 10, "row-finalize-me")
|
||||
time.Sleep(60 * time.Millisecond)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations (expected finalize UPDATE, no INSERT): %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Guard the literal contract the read side depends on: the ingest INSERT
|
||||
// must target activity_type='a2a_receive' with status 'pending' and a
|
||||
// RETURNING id — that's what makes postgres_store.go render the user
|
||||
// bubble immediately and what lets the finalize UPDATE find the row.
|
||||
// sqlmock matches the issued query against this regex, so a green
|
||||
// expectation IS the proof the SQL carries every required fragment.
|
||||
func TestPersistUserMessageAtIngest_InsertShapeContract(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
|
||||
WithArgs("ws-shape").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Shape"))
|
||||
|
||||
// All four contract fragments in one regex — only matches if the
|
||||
// real INSERT contains a2a_receive + 'pending' + request_body +
|
||||
// RETURNING id (sqlmock does a regexp.MatchString of this against
|
||||
// the actual query text).
|
||||
mock.ExpectQuery(`(?s)INSERT INTO activity_logs.*request_body.*'a2a_receive'.*'pending'.*RETURNING id`).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("rid"))
|
||||
|
||||
rid := handler.persistUserMessageAtIngest(context.Background(), "ws-shape", "",
|
||||
[]byte(`{"params":{"message":{"parts":[{"text":"x"}]}}}`), "message/send")
|
||||
if rid != "rid" {
|
||||
t.Fatalf("expected rid from contract-shaped INSERT, got %q", rid)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("ingest INSERT did not match the required shape contract: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Integration e2e through the REAL ProxyA2A HTTP handler — the literal bug
|
||||
// scenario: a canvas user sends a message, the agent is slow, and the client
|
||||
// disconnects (exits the chat) BEFORE the agent replies. Pre-fix: no
|
||||
// a2a_receive row was ever written → message lost on reopen. Post-fix: the
|
||||
// ingest INSERT fires synchronously before dispatch, on context.WithoutCancel,
|
||||
// so the user message is durable regardless of the client disconnect.
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
func TestProxyA2A_CanvasClientDisconnectMidFlight_UserMessageStillPersistedAtIngest(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
mr := setupTestRedis(t)
|
||||
allowLoopbackForTest(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, handler)
|
||||
|
||||
// The mock agent hangs until the test releases it — modelling an agent
|
||||
// still synthesising when the user exits the chat. The proxy forwards
|
||||
// on context.WithoutCancel so this dispatch is in-flight while the
|
||||
// inbound client request is already cancelled.
|
||||
release := make(chan struct{})
|
||||
var agentHit int32
|
||||
agent := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
atomic.AddInt32(&agentHit, 1)
|
||||
<-release // hang until released (or test ends)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Fprint(w, `{"jsonrpc":"2.0","id":"1","result":{"status":"ok"}}`)
|
||||
}))
|
||||
defer agent.Close()
|
||||
defer close(release)
|
||||
|
||||
const ws = "ws-disconnect"
|
||||
mr.Set(fmt.Sprintf("ws:%s:url", ws), agent.URL)
|
||||
|
||||
// SQL sequence for a canvas push-mode message/send up to dispatch:
|
||||
// 1. budget check
|
||||
// 2. resolveAgentURL DB fallback miss is avoided (Redis has the URL)
|
||||
// 3. lookupDeliveryMode → NULL ⇒ push-mode (the loss case)
|
||||
// 4. ingest: SELECT name, then INSERT ... RETURNING id ← THE FIX
|
||||
// The agent never replies (hung) and the client ctx is cancelled, so
|
||||
// NO logA2ASuccess/finalize runs — exactly the pre-fix loss window.
|
||||
// The assertion: the ingest INSERT fired anyway (message durable).
|
||||
expectBudgetCheck(mock, ws)
|
||||
mock.ExpectQuery(`SELECT delivery_mode FROM workspaces WHERE id =`).
|
||||
WithArgs(ws).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow(nil)) // NULL ⇒ push
|
||||
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
|
||||
WithArgs(ws).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Disconnect WS"))
|
||||
ingestFired := make(chan struct{}, 1)
|
||||
mock.ExpectQuery(`INSERT INTO activity_logs.*RETURNING id`).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ingest-row-1")).
|
||||
// signal the moment the durable write lands
|
||||
WillDelayFor(0)
|
||||
_ = ingestFired
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: ws}}
|
||||
|
||||
// Cancel the inbound request context shortly after the call starts —
|
||||
// this is the user exiting the chat / closing the tab mid-flight.
|
||||
reqCtx, cancelClient := context.WithCancel(context.Background())
|
||||
body := `{"method":"message/send","params":{"message":{"role":"user","parts":[{"text":"DO NOT LOSE THIS MESSAGE"}]}}}`
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/"+ws+"/a2a", bytes.NewBufferString(body)).WithContext(reqCtx)
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
handler.ProxyA2A(c)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
// Wait until the agent has been hit (dispatch in-flight), then the
|
||||
// user "exits the chat": cancel the client request context.
|
||||
deadline := time.After(5 * time.Second)
|
||||
for atomic.LoadInt32(&agentHit) == 0 {
|
||||
select {
|
||||
case <-deadline:
|
||||
t.Fatal("agent was never dispatched to within 5s")
|
||||
default:
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
cancelClient() // ← user exits the chat mid-flight
|
||||
|
||||
// Give the handler a moment; it will be stuck on the hung agent, but
|
||||
// the ingest INSERT must already have happened SYNCHRONOUSLY before
|
||||
// dispatch. Verify the durable write landed despite the disconnect.
|
||||
time.Sleep(150 * time.Millisecond)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("ingest persist did not fire before dispatch — user message would be LOST on disconnect: %v", err)
|
||||
}
|
||||
|
||||
// Clean up: release the hung agent so the handler goroutine can exit.
|
||||
// (release is closed by the deferred close; unblock now.)
|
||||
select {
|
||||
case release <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
<-done
|
||||
}
|
||||
@ -467,6 +467,29 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
}
|
||||
}
|
||||
|
||||
// DATA-LOSS FIX: persist the canvas user's message into activity_logs
|
||||
// SYNCHRONOUSLY, BEFORE dispatch. For a push-mode workspace the only
|
||||
// pre-fix durable write of the user message was logA2ASuccess /
|
||||
// logA2AFailure AFTER the agent round-trip — so a user who typed a
|
||||
// message then exited the chat (fetch aborted on unmount / tab close /
|
||||
// dropped connection) before the agent replied lost the message
|
||||
// entirely on reopen, because chat-history reads only a2a_receive rows
|
||||
// and none was ever written. Persisting here, with a WithoutCancel
|
||||
// context inside persistUserMessageAtIngest, makes the message durable
|
||||
// at submit time regardless of what the client does next. The returned
|
||||
// row id is threaded into the success/failure log so they UPDATE this
|
||||
// row with the agent reply rather than INSERT a duplicate.
|
||||
//
|
||||
// Scoped to: canvas-initiated (callerID == ""), message-bearing
|
||||
// (message/send), logActivity on, and past the poll-mode short-circuit
|
||||
// above (poll-mode already persists at ingest via logA2AReceiveQueued).
|
||||
// Best-effort: a "" return falls back to the legacy post-round-trip
|
||||
// INSERT, so this can never make a send fail.
|
||||
var ingestRowID string
|
||||
if logActivity && callerID == "" && a2aMethod == "message/send" {
|
||||
ingestRowID = h.persistUserMessageAtIngest(ctx, workspaceID, callerID, body, a2aMethod)
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
resp, cancelFwd, err := h.dispatchA2A(ctx, workspaceID, agentURL, body, callerID)
|
||||
if cancelFwd != nil {
|
||||
@ -474,7 +497,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
}
|
||||
durationMs := int(time.Since(startTime).Milliseconds())
|
||||
if err != nil {
|
||||
return h.handleA2ADispatchError(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs, logActivity)
|
||||
return h.handleA2ADispatchError(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs, logActivity, ingestRowID)
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
@ -492,7 +515,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
log.Printf("ProxyA2A: body read failed for %s (status=%d delivery_confirmed=%v bytes_read=%d): %v",
|
||||
workspaceID, resp.StatusCode, deliveryConfirmed, len(respBody), readErr)
|
||||
if logActivity && deliveryConfirmed {
|
||||
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs)
|
||||
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs, ingestRowID)
|
||||
}
|
||||
// Preserve the actual HTTP status code and any body bytes already read.
|
||||
// Previously this returned (0, nil, error) which discarded both.
|
||||
@ -536,7 +559,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
}
|
||||
|
||||
if logActivity {
|
||||
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs)
|
||||
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs, ingestRowID)
|
||||
}
|
||||
|
||||
// Track LLM token usage for cost transparency (#593).
|
||||
|
||||
@ -27,10 +27,138 @@ type proxyDispatchBuildError struct{ err error }
|
||||
|
||||
func (e *proxyDispatchBuildError) Error() string { return e.err.Error() }
|
||||
|
||||
// persistUserMessageAtIngest writes the canvas user's message into
|
||||
// activity_logs SYNCHRONOUSLY, BEFORE the agent is dispatched, and
|
||||
// returns the new row's id (empty string if the persist could not be
|
||||
// done — caller falls back to the legacy post-round-trip INSERT).
|
||||
//
|
||||
// # Why this exists (the data-loss window it closes)
|
||||
//
|
||||
// Pre-fix, for a push-mode (HTTP-dispatched) workspace the user's
|
||||
// message became durable ONLY inside logA2ASuccess / logA2AFailure,
|
||||
// which run AFTER the full agent round-trip completes. The single
|
||||
// chat-history source is `activity_logs` rows with
|
||||
// activity_type='a2a_receive' AND source_id IS NULL
|
||||
// (messagestore/postgres_store.go). So if the user typed a message and
|
||||
// exited the chat (component unmount aborts the fetch / tab close /
|
||||
// dropped connection) before the agent finished, NO a2a_receive row was
|
||||
// ever written and the message was permanently lost on chat reopen.
|
||||
// This is the inbound (user→agent) mirror of the reno-stars 2026-05-05
|
||||
// outbound data-loss incident (RFC #2945) — which only fixed the
|
||||
// agent→user side (AgentMessageWriter).
|
||||
//
|
||||
// poll-mode workspaces were never affected: logA2AReceiveQueued already
|
||||
// persists the request_body at ingest. This helper brings push-mode to
|
||||
// parity: the user message is durable at submit time, independent of
|
||||
// and prior to agent processing.
|
||||
//
|
||||
// # Contract
|
||||
//
|
||||
// - INSERT a row with request_body set, response_body NULL,
|
||||
// status='pending'. The chat-history read path
|
||||
// (activityRowToChatMessages) already renders a user bubble from a
|
||||
// row whose response_body is empty, so a pending row shows the
|
||||
// user's message immediately on reopen even before the agent replies.
|
||||
// - Use context.WithoutCancel: a client disconnect on chat-exit MUST
|
||||
// NOT abort this write — that is the entire point of the fix.
|
||||
// - SYNCHRONOUS (blocking): the row must exist before dispatchA2A is
|
||||
// allowed to start, otherwise the disconnect race reopens.
|
||||
// - On any failure return "" and log; the caller proceeds with the
|
||||
// legacy logA2ASuccess/logA2AFailure INSERT so behavior is never
|
||||
// worse than pre-fix (best-effort, never blocks the user's send).
|
||||
//
|
||||
// The returned id is threaded into logA2ASuccess/logA2AFailure so they
|
||||
// UPDATE this same row with the agent response instead of INSERTing a
|
||||
// second row — preserving the one-row-carries-(user,agent) read
|
||||
// contract that postgres_store.go depends on (no duplicate user bubble).
|
||||
func (h *WorkspaceHandler) persistUserMessageAtIngest(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string) string {
|
||||
// context.WithoutCancel: the canvas exiting the chat aborts the
|
||||
// inbound HTTP request, cancelling ctx. The whole reason this fix
|
||||
// exists is so that cancellation does NOT lose the user's message —
|
||||
// so the entire persist (name lookup + INSERT) must run on a
|
||||
// non-cancellable derived context with its own short deadline. Doing
|
||||
// the name lookup on the raw ctx would skip it on a cancelled client
|
||||
// and degrade the summary; WithoutCancel keeps the summary accurate
|
||||
// too.
|
||||
insCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
var wsName string
|
||||
db.DB.QueryRowContext(insCtx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName)
|
||||
if wsName == "" {
|
||||
wsName = workspaceID
|
||||
}
|
||||
summary := a2aMethod + " → " + wsName + " (received)"
|
||||
|
||||
var rowID string
|
||||
err := db.DB.QueryRowContext(insCtx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, source_id, target_id, method, summary, request_body, status)
|
||||
VALUES ($1, 'a2a_receive', $2, $3, $4, $5, $6::jsonb, 'pending')
|
||||
RETURNING id
|
||||
`, workspaceID, nilIfEmpty(callerID), &workspaceID, &a2aMethod, &summary, string(body)).Scan(&rowID)
|
||||
if err != nil {
|
||||
// Best-effort: never block or fail the user's send because the
|
||||
// ingest persist hiccuped. Caller falls back to the legacy
|
||||
// post-round-trip INSERT (behavior == pre-fix for this request).
|
||||
log.Printf("persistUserMessageAtIngest: INSERT failed for %s: %v — falling back to post-round-trip persist", workspaceID, err)
|
||||
return ""
|
||||
}
|
||||
return rowID
|
||||
}
|
||||
|
||||
// finalizeIngestRow UPDATEs the pre-persisted ingest row (created by
|
||||
// persistUserMessageAtIngest) with the agent's response once the A2A
|
||||
// round-trip completes. Returns true if the row was found+updated; false
|
||||
// means the caller must fall through to the legacy INSERT (so a missing
|
||||
// ingest row never silently drops the agent reply).
|
||||
//
|
||||
// Runs on a WithoutCancel context for the same reason as the ingest
|
||||
// INSERT: the client may already be gone, but the agent reply still
|
||||
// needs to land on the row that already holds the user's message.
|
||||
func finalizeIngestRow(ctx context.Context, ingestRowID string, respBody json.RawMessage, toolTrace json.RawMessage, status string, durationMs int, errDetail *string) bool {
|
||||
if ingestRowID == "" {
|
||||
return false
|
||||
}
|
||||
upCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
var respStr, traceStr *string
|
||||
if len(respBody) > 0 {
|
||||
s := string(respBody)
|
||||
respStr = &s
|
||||
}
|
||||
if len(toolTrace) > 0 {
|
||||
s := string(toolTrace)
|
||||
traceStr = &s
|
||||
}
|
||||
res, err := db.DB.ExecContext(upCtx, `
|
||||
UPDATE activity_logs
|
||||
SET response_body = $2::jsonb,
|
||||
tool_trace = $3::jsonb,
|
||||
status = $4,
|
||||
duration_ms = $5,
|
||||
error_detail = $6
|
||||
WHERE id = $1 AND status = 'pending'
|
||||
`, ingestRowID, respStr, traceStr, status, durationMs, errDetail)
|
||||
if err != nil {
|
||||
log.Printf("finalizeIngestRow: UPDATE failed for row %s: %v", ingestRowID, err)
|
||||
return false
|
||||
}
|
||||
n, _ := res.RowsAffected()
|
||||
if n == 0 {
|
||||
// Row not found or no longer pending (e.g. a concurrent finalize
|
||||
// already ran). Don't double-INSERT — the user message + a
|
||||
// response already landed on this row. Treat as handled.
|
||||
log.Printf("finalizeIngestRow: row %s not pending (already finalized?) — skipping legacy INSERT", ingestRowID)
|
||||
return true
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// handleA2ADispatchError translates a forward-call failure into a proxyA2AError,
|
||||
// runs the reactive container-health check, and (when `logActivity` is true)
|
||||
// schedules a detached LogActivity goroutine for the failed attempt.
|
||||
func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int, logActivity bool) (int, []byte, *proxyA2AError) {
|
||||
func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int, logActivity bool, ingestRowID string) (int, []byte, *proxyA2AError) {
|
||||
// Build-time failure (couldn't even create the http.Request) — return
|
||||
// a 500 without the reactive-health / busy-retry paths.
|
||||
if buildErr, ok := err.(*proxyDispatchBuildError); ok {
|
||||
@ -46,7 +174,7 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace
|
||||
containerDead := h.maybeMarkContainerDead(ctx, workspaceID)
|
||||
|
||||
if logActivity {
|
||||
h.logA2AFailure(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs)
|
||||
h.logA2AFailure(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs, ingestRowID)
|
||||
}
|
||||
if containerDead {
|
||||
return 0, nil, &proxyA2AError{
|
||||
@ -254,8 +382,31 @@ func (h *WorkspaceHandler) preflightContainerHealth(ctx context.Context, workspa
|
||||
|
||||
// logA2AFailure records a failed A2A attempt to activity_logs in a detached
|
||||
// goroutine (the request context may already be done by the time it runs).
|
||||
func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int) {
|
||||
func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int, ingestRowID string) {
|
||||
errMsg := err.Error()
|
||||
// DATA-LOSS FIX interaction: when the user message was persisted at
|
||||
// ingest (ingestRowID set), the pending a2a_receive row already holds
|
||||
// it durably and chat-history renders it as the user's bubble on
|
||||
// reopen — which is the entire point of the fix. Do NOT mutate that
|
||||
// row here:
|
||||
// - This path runs BEFORE the busy/enqueue branch in
|
||||
// handleA2ADispatchError. A "busy" error is followed by a
|
||||
// successful enqueue (HTTP 202) and a later drain that answers
|
||||
// the user — finalizing the row as 'error' now would wrongly
|
||||
// show the user a failed message that is actually queued.
|
||||
// - For a genuinely terminal failure the user's message must STILL
|
||||
// survive (that is the bug we are fixing); leaving the row
|
||||
// 'pending' preserves it. The failure itself is observable via
|
||||
// this log line + metrics, not by overwriting the preserved
|
||||
// user message with an agent-error bubble.
|
||||
// So: if an ingest row exists, the message is already safe — return
|
||||
// without the legacy failure INSERT (which would also duplicate the
|
||||
// user bubble).
|
||||
if ingestRowID != "" {
|
||||
log.Printf("logA2AFailure: %s message preserved at ingest (row %s, status=pending) — A2A attempt failed: %v",
|
||||
workspaceID, ingestRowID, errMsg)
|
||||
return
|
||||
}
|
||||
var errWsName string
|
||||
db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&errWsName)
|
||||
if errWsName == "" {
|
||||
@ -283,7 +434,7 @@ func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, calle
|
||||
// logA2ASuccess records a successful A2A round-trip and (for canvas-initiated
|
||||
// 2xx/3xx responses) broadcasts an A2A_RESPONSE event so the frontend can
|
||||
// receive the reply without polling.
|
||||
func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, callerID string, body, respBody []byte, a2aMethod string, statusCode, durationMs int) {
|
||||
func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, callerID string, body, respBody []byte, a2aMethod string, statusCode, durationMs int, ingestRowID string) {
|
||||
logStatus := "ok"
|
||||
if statusCode >= 400 {
|
||||
logStatus = "error"
|
||||
@ -309,23 +460,39 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle
|
||||
}
|
||||
summary := a2aMethod + " → " + wsNameForLog
|
||||
toolTrace := extractToolTrace(respBody)
|
||||
h.goAsync(func() {
|
||||
logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
|
||||
defer cancel()
|
||||
LogActivity(logCtx, h.broadcaster, ActivityParams{
|
||||
WorkspaceID: workspaceID,
|
||||
ActivityType: "a2a_receive",
|
||||
SourceID: nilIfEmpty(callerID),
|
||||
TargetID: &workspaceID,
|
||||
Method: &a2aMethod,
|
||||
Summary: &summary,
|
||||
RequestBody: json.RawMessage(body),
|
||||
ResponseBody: json.RawMessage(respBody),
|
||||
ToolTrace: toolTrace,
|
||||
DurationMs: &durationMs,
|
||||
Status: logStatus,
|
||||
|
||||
// If the user message was persisted at ingest (canvas push-mode
|
||||
// path), finalize THAT row with the agent response instead of
|
||||
// INSERTing a second a2a_receive row — postgres_store.go reconstructs
|
||||
// (user, agent) bubbles from a single row, so a second row would
|
||||
// duplicate the user's message in chat-history. finalizeIngestRow
|
||||
// runs synchronously (it's already on a WithoutCancel context); the
|
||||
// broadcast still fires below regardless. Falls through to the legacy
|
||||
// async INSERT when there is no ingest row (poll-mode, system
|
||||
// callers, or an ingest-INSERT that failed best-effort).
|
||||
finalized := false
|
||||
if ingestRowID != "" {
|
||||
finalized = finalizeIngestRow(ctx, ingestRowID, json.RawMessage(respBody), toolTrace, logStatus, durationMs, nil)
|
||||
}
|
||||
if !finalized {
|
||||
h.goAsync(func() {
|
||||
logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
|
||||
defer cancel()
|
||||
LogActivity(logCtx, h.broadcaster, ActivityParams{
|
||||
WorkspaceID: workspaceID,
|
||||
ActivityType: "a2a_receive",
|
||||
SourceID: nilIfEmpty(callerID),
|
||||
TargetID: &workspaceID,
|
||||
Method: &a2aMethod,
|
||||
Summary: &summary,
|
||||
RequestBody: json.RawMessage(body),
|
||||
ResponseBody: json.RawMessage(respBody),
|
||||
ToolTrace: toolTrace,
|
||||
DurationMs: &durationMs,
|
||||
Status: logStatus,
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
if callerID == "" && statusCode < 400 {
|
||||
h.broadcaster.BroadcastOnly(workspaceID, string(events.EventA2AResponse), map[string]interface{}{
|
||||
|
||||
@ -1736,7 +1736,7 @@ func TestHandleA2ADispatchError_ContextDeadline(t *testing.T) {
|
||||
|
||||
_, _, perr := handler.handleA2ADispatchError(
|
||||
context.Background(), "ws-dl", "", []byte("{}"), "message/send",
|
||||
context.DeadlineExceeded, 1, false,
|
||||
context.DeadlineExceeded, 1, false, "",
|
||||
)
|
||||
if perr == nil {
|
||||
t.Fatal("expected error, got nil")
|
||||
@ -1757,7 +1757,7 @@ func TestHandleA2ADispatchError_BuildError(t *testing.T) {
|
||||
|
||||
buildErr := &proxyDispatchBuildError{err: fmt.Errorf("bad url")}
|
||||
_, _, perr := handler.handleA2ADispatchError(
|
||||
context.Background(), "ws-x", "", []byte("{}"), "message/send", buildErr, 1, false,
|
||||
context.Background(), "ws-x", "", []byte("{}"), "message/send", buildErr, 1, false, "",
|
||||
)
|
||||
if perr == nil || perr.Status != http.StatusInternalServerError {
|
||||
t.Errorf("expected 500, got %+v", perr)
|
||||
@ -1771,7 +1771,7 @@ func TestHandleA2ADispatchError_GenericReturns502(t *testing.T) {
|
||||
|
||||
_, _, perr := handler.handleA2ADispatchError(
|
||||
context.Background(), "ws-x", "", []byte("{}"), "message/send",
|
||||
fmt.Errorf("no such host"), 1, false,
|
||||
fmt.Errorf("no such host"), 1, false, "",
|
||||
)
|
||||
if perr == nil || perr.Status != http.StatusBadGateway {
|
||||
t.Errorf("expected 502, got %+v", perr)
|
||||
@ -1969,7 +1969,7 @@ func TestLogA2AFailure_Smoke(t *testing.T) {
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
handler.logA2AFailure(context.Background(), "ws-fail", "", []byte(`{}`), "message/send", fmt.Errorf("boom"), 42)
|
||||
handler.logA2AFailure(context.Background(), "ws-fail", "", []byte(`{}`), "message/send", fmt.Errorf("boom"), 42, "")
|
||||
time.Sleep(80 * time.Millisecond)
|
||||
}
|
||||
|
||||
@ -1986,7 +1986,7 @@ func TestLogA2AFailure_EmptyNameFallback(t *testing.T) {
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
handler.logA2AFailure(context.Background(), "ws-noname", "", []byte(`{}`), "message/send", fmt.Errorf("boom"), 1)
|
||||
handler.logA2AFailure(context.Background(), "ws-noname", "", []byte(`{}`), "message/send", fmt.Errorf("boom"), 1, "")
|
||||
time.Sleep(80 * time.Millisecond)
|
||||
}
|
||||
|
||||
@ -2002,7 +2002,7 @@ func TestLogA2ASuccess_Smoke(t *testing.T) {
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
handler.logA2ASuccess(context.Background(), "ws-ok", "", []byte(`{}`), []byte(`{"result":"x"}`), "message/send", 200, 10)
|
||||
handler.logA2ASuccess(context.Background(), "ws-ok", "", []byte(`{}`), []byte(`{"result":"x"}`), "message/send", 200, 10, "")
|
||||
time.Sleep(80 * time.Millisecond)
|
||||
}
|
||||
|
||||
@ -2020,7 +2020,7 @@ func TestLogA2ASuccess_ErrorStatus(t *testing.T) {
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// callerID != "" also means no A2A_RESPONSE broadcast.
|
||||
handler.logA2ASuccess(context.Background(), "ws-err", "ws-caller", []byte(`{}`), []byte(`{}`), "message/send", 500, 10)
|
||||
handler.logA2ASuccess(context.Background(), "ws-err", "ws-caller", []byte(`{}`), []byte(`{}`), "message/send", 500, 10, "")
|
||||
time.Sleep(80 * time.Millisecond)
|
||||
}
|
||||
|
||||
|
||||
@ -204,7 +204,11 @@ func (h *WorkspaceHandler) handleMockA2A(ctx context.Context, workspaceID, calle
|
||||
// is identical to a real agent reply. Status 200 + duration 0
|
||||
// is the "synthesised reply" marker; activity_logs.duration_ms
|
||||
// being 0 is harmless (real fast paths can hit 0 too).
|
||||
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, http.StatusOK, 0)
|
||||
// ingestRowID == "": the mock short-circuit runs before the
|
||||
// real-dispatch ingest-persist (it has no resolveAgentURL / no
|
||||
// dispatchA2A), so logA2ASuccess takes the legacy INSERT path —
|
||||
// identical to pre-fix behavior for mock workspaces.
|
||||
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, http.StatusOK, 0, "")
|
||||
}
|
||||
return http.StatusOK, respBody, true
|
||||
}
|
||||
|
||||
@ -31,7 +31,7 @@ func TestHandleA2ADispatchError_NativeSession_SkipsEnqueue(t *testing.T) {
|
||||
// sqlmock's ExpectationsWereMet implicitly enforces that on teardown.
|
||||
_, _, perr := handler.handleA2ADispatchError(
|
||||
context.Background(), "ws-native", "", []byte("{}"), "message/send",
|
||||
context.DeadlineExceeded, 1, false,
|
||||
context.DeadlineExceeded, 1, false, "",
|
||||
)
|
||||
if perr == nil {
|
||||
t.Fatal("expected proxy error, got nil")
|
||||
@ -74,7 +74,7 @@ func TestHandleA2ADispatchError_NoNativeSession_StillEnqueues(t *testing.T) {
|
||||
|
||||
_, _, perr := handler.handleA2ADispatchError(
|
||||
context.Background(), "ws-platform-queue", "", []byte("{}"), "message/send",
|
||||
context.DeadlineExceeded, 1, false,
|
||||
context.DeadlineExceeded, 1, false, "",
|
||||
)
|
||||
if perr == nil {
|
||||
t.Fatal("expected proxy error, got nil")
|
||||
|
||||
Loading…
Reference in New Issue
Block a user