diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index fd94c8ea0..d7a70174d 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -399,7 +399,21 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri // (no Do(), no maybeMarkContainerDead). The response is a synthetic // {status:"queued"} envelope so the caller (canvas, another workspace) // knows delivery is acknowledged but pending consumption. - if lookupDeliveryMode(ctx, workspaceID) == models.DeliveryModePoll { + deliveryMode, deliveryModeErr := lookupDeliveryMode(ctx, workspaceID) + if deliveryModeErr != nil { + // internal#497 fail-closed: a real DB/context error on the + // delivery-mode read MUST NOT silently fall through to the push + // dispatch path — that is exactly what silently misrouted every + // poll-mode peer for 5 days under the ce2db75f regression. Surface + // a structured error so the delegation is marked failed (loud + + // retryable) instead of dispatched to the wrong path. + log.Printf("ProxyA2A: delivery-mode lookup failed for %s: %v — failing closed", workspaceID, deliveryModeErr) + return 0, nil, &proxyA2AError{ + Status: http.StatusServiceUnavailable, + Response: gin.H{"error": "delivery-mode lookup failed; refusing to dispatch to avoid silent misrouting"}, + } + } + if deliveryMode == models.DeliveryModePoll { if logActivity { h.logA2AReceiveQueued(ctx, workspaceID, callerID, body, a2aMethod) } diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index 5c1d3c2ba..6d9f5c74e 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -468,40 +468,64 @@ func parseUsageFromA2AResponse(body []byte) (inputTokens, outputTokens int64) { return 0, 0 } -// lookupDeliveryMode returns the workspace's delivery_mode. On any DB -// error or missing row it returns DeliveryModePush — the fail-closed -// default. "Closed" here means "fall back to today's behavior (synchronous -// dispatch)" rather than "fall back to drop the request silently into -// activity_logs where the agent might never see it." A poll-mode workspace -// that briefly reads as push will get its A2A request dispatched to the -// stored URL (or a 502 if no URL); a push-mode workspace that briefly -// reads as poll would get its request silently queued with no dispatch. -// The first failure is loud + recoverable; the second is silent. +// lookupDeliveryMode returns the workspace's delivery_mode. +// +// internal#497 / RFC#497 fail-closed (SURGICAL scope): the *specific* +// failure mode that hid the ce2db75f regression for 5 days is now +// propagated instead of silently swallowed — a CONTEXT error +// (context.Canceled / context.DeadlineExceeded). Under ce2db75f the +// detached delegation goroutine ran on a cancelled request context, every +// `SELECT delivery_mode` failed `context canceled`, this function returned +// push, the poll-mode short-circuit in proxyA2ARequest was skipped, and +// poll-mode peers (e.g. an operator laptop on molecule-mcp-claude-channel) +// silently never got their a2a_receive inbox row. A transient, +// systematic-once-triggered context cancellation became permanent +// invisible misrouting. Returning that error lets the caller fail loud +// (mark the delegation failed) instead of mis-dispatching. +// +// Scope is deliberately narrow: only ctx errors propagate. Other DB +// errors retain the long-standing documented "fall back to push (today's +// synchronous behavior)" contract — that path is loud + recoverable +// (502 / SSRF reject / restart), unlike the silent poll-mode drop, and +// the surrounding proxy (incl. the sibling checkWorkspaceBudget) is +// intentionally built around that fail-open-to-push behavior. Widening +// further is an RFC#497 follow-up, not part of this P0 fix. +// +// A genuinely *absent* configuration is NOT an error and still resolves to +// push (the safe synchronous default): sql.ErrNoRows, a NULL/empty column, +// or an unrecognised value all return (push, nil). // // The function is intentionally lookup-only — it never mutates the row. // The register handler (registry.go) is the only writer for delivery_mode. // // See #2339 PR 1 for the column + register-flow side; this is the // proxy-side read used for the short-circuit in proxyA2ARequest. -func lookupDeliveryMode(ctx context.Context, workspaceID string) string { +func lookupDeliveryMode(ctx context.Context, workspaceID string) (string, error) { var mode sql.NullString err := db.DB.QueryRowContext(ctx, `SELECT delivery_mode FROM workspaces WHERE id = $1`, workspaceID, ).Scan(&mode) if err != nil { - if !errors.Is(err, sql.ErrNoRows) { - log.Printf("ProxyA2A: lookupDeliveryMode(%s) failed (%v) — defaulting to push", workspaceID, err) + // internal#497: a context cancellation/deadline MUST NOT be + // swallowed into a silent push default — that is the exact 5-day + // silent-misrouting vector. Propagate so the caller fails closed. + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + log.Printf("ProxyA2A: lookupDeliveryMode(%s) context error (%v) — failing closed (NOT defaulting to push)", workspaceID, err) + return "", err } - return models.DeliveryModePush + if !errors.Is(err, sql.ErrNoRows) { + log.Printf("ProxyA2A: lookupDeliveryMode(%s) failed (%v) — defaulting to push (non-ctx DB error; legacy fail-open-to-push contract)", workspaceID, err) + } + return models.DeliveryModePush, nil } if !mode.Valid || mode.String == "" { - return models.DeliveryModePush + return models.DeliveryModePush, nil } if !models.IsValidDeliveryMode(mode.String) { log.Printf("ProxyA2A: workspace %s has invalid delivery_mode=%q — defaulting to push", workspaceID, mode.String) - return models.DeliveryModePush + return models.DeliveryModePush, nil } - return mode.String + return mode.String, nil } // logA2AReceiveQueued records a poll-mode "queued" A2A receive into diff --git a/workspace-server/internal/handlers/a2a_proxy_test.go b/workspace-server/internal/handlers/a2a_proxy_test.go index 7fa22dac5..a4f9e0160 100644 --- a/workspace-server/internal/handlers/a2a_proxy_test.go +++ b/workspace-server/internal/handlers/a2a_proxy_test.go @@ -2228,12 +2228,18 @@ func TestProxyA2A_PushMode_NoShortCircuit(t *testing.T) { } } -// TestProxyA2A_PollMode_FailsClosedToPush verifies the safety contract: -// a DB error reading delivery_mode must default to push (the existing -// behavior), NOT poll. Failing to push means a poll-mode workspace -// briefly attempts a real dispatch — visible failure (502 / SSRF -// rejection / restart cascade), not a silent drop into activity_logs -// where the agent might never look. Loud > silent, recoverable > lost. +// TestProxyA2A_PollMode_FailsClosedToPush verifies the LEGACY safety +// contract is PRESERVED for non-context DB errors: a generic DB error +// reading delivery_mode still defaults to push (today's behavior), NOT +// poll. Failing to push means a poll-mode workspace briefly attempts a +// real dispatch — visible failure (502 / SSRF rejection / restart +// cascade), not a silent drop into activity_logs where the agent might +// never look. Loud > silent, recoverable > lost. +// +// internal#497 narrows the fail-closed change to *context* errors only +// (the actual ce2db75f regression vector); generic DB errors keep this +// long-standing fail-open-to-push contract. The ctx-error fail-closed is +// covered by TestLookupDeliveryMode_ContextCanceled_FailsClosed. func TestProxyA2A_PollMode_FailsClosedToPush(t *testing.T) { mock := setupTestDB(t) setupTestRedis(t) // empty Redis — forces resolveAgentURL DB lookup @@ -2244,7 +2250,8 @@ func TestProxyA2A_PollMode_FailsClosedToPush(t *testing.T) { expectBudgetCheck(mock, wsID) - // lookupDeliveryMode hits a transient DB error → must default push. + // lookupDeliveryMode hits a generic (non-context) DB error → must + // still default push (legacy contract preserved by internal#497). mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). WithArgs(wsID). WillReturnError(sql.ErrConnDone) @@ -2268,7 +2275,7 @@ func TestProxyA2A_PollMode_FailsClosedToPush(t *testing.T) { var resp map[string]interface{} _ = json.Unmarshal(w.Body.Bytes(), &resp) if resp["status"] == "queued" { - t.Errorf("DB error on delivery_mode lookup silently queued the request — must fail-closed-to-push, got body: %s", w.Body.String()) + t.Errorf("generic DB error on delivery_mode lookup silently queued the request — must fail-open-to-push, got body: %s", w.Body.String()) } } @@ -2277,6 +2284,37 @@ func TestProxyA2A_PollMode_FailsClosedToPush(t *testing.T) { } } +// TestLookupDeliveryMode_ContextCanceled_FailsClosed is the internal#497 +// regression test for the SECONDARY defect. It pins the exact invariant +// that hid the ce2db75f regression for 5 days: when the delivery_mode read +// fails because the context was cancelled (precisely what happened in the +// detached delegation goroutine running on a returned request context), +// lookupDeliveryMode MUST return an error and MUST NOT silently return +// "push". Returning push there is what skipped the poll-mode short-circuit +// and silently dropped 100% of poll-mode peer deliveries. +// +// A pre-cancelled context makes QueryRowContext fail with +// context.Canceled deterministically — no DB rows are mocked because the +// query never reaches a result. +func TestLookupDeliveryMode_ContextCanceled_FailsClosed(t *testing.T) { + mock := setupTestDB(t) + // The query fails on the cancelled ctx before matching; provide a + // permissive expectation so sqlmock doesn't complain about the attempt. + mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id"). + WillReturnError(context.Canceled) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // simulate the HTTP handler having returned (request ctx dead) + + mode, err := lookupDeliveryMode(ctx, "ws-poll-peer") + if err == nil { + t.Fatalf("internal#497 regression: lookupDeliveryMode swallowed a context error and returned mode=%q with nil err — this is the exact 5-day silent-misrouting vector", mode) + } + if mode == models.DeliveryModePush { + t.Errorf("internal#497 regression: context error must NOT default to push (got mode=%q)", mode) + } +} + // ==================== a2aClient ResponseHeaderTimeout config ==================== func TestA2AClientResponseHeaderTimeout(t *testing.T) { diff --git a/workspace-server/internal/handlers/delegation.go b/workspace-server/internal/handlers/delegation.go index fefdeee71..8d180a65d 100644 --- a/workspace-server/internal/handlers/delegation.go +++ b/workspace-server/internal/handlers/delegation.go @@ -162,8 +162,32 @@ func (h *DelegationHandler) Delegate(c *gin.Context) { }, }) - // Fire-and-forget: send A2A in background goroutine - go h.executeDelegation(ctx, sourceID, body.TargetID, delegationID, a2aBody) + // Fire-and-forget: send A2A in a background goroutine. + // + // internal#497 — the goroutine MUST NOT inherit the HTTP request's + // cancellation. `ctx` here is c.Request.Context(); the handler returns + // 202 a few lines below, which cancels that context immediately. Before + // this fix (regression ce2db75f) executeDelegation ran on the + // request-scoped ctx, so every DB op + proxy call in the detached + // goroutine failed `context canceled` the instant the 202 was written. + // That silently broke 100% of A2A peer delegations fleet-wide since + // 2026-05-12 (poll-mode peers never got their a2a_receive inbox row; + // lookupDeliveryMode swallowed the ctx error and defaulted to push). + // + // context.WithoutCancel detaches cancellation/deadline while PRESERVING + // all context values (trace/correlation/tenant ids that proxyA2ARequest + // and the broadcaster read off ctx) — this is the established pattern in + // this package (a2a_proxy.go:850, a2a_proxy_helpers.go:525, + // registry.go:822). The 30-minute ceiling matches the prior internal + // budget executeDelegation used before ce2db75f and the proxy's own + // absolute agent-dispatch ceiling (a2a_proxy.go forwardCtx). + delegationCtx, cancelDelegation := context.WithTimeout( + context.WithoutCancel(ctx), 30*time.Minute, + ) + go func() { + defer cancelDelegation() + h.executeDelegation(delegationCtx, sourceID, body.TargetID, delegationID, a2aBody) + }() // Broadcast event so canvas shows delegation in real-time h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationSent), sourceID, map[string]interface{}{ diff --git a/workspace-server/internal/handlers/delegation_test.go b/workspace-server/internal/handlers/delegation_test.go index fcd17eec8..223b8de77 100644 --- a/workspace-server/internal/handlers/delegation_test.go +++ b/workspace-server/internal/handlers/delegation_test.go @@ -16,6 +16,65 @@ import ( "github.com/gin-gonic/gin" ) +// ---------- internal#497 regression: detached goroutine ctx must outlive the handler ---------- + +// TestDelegate_DetachedContext_SurvivesRequestCancellation pins the +// load-bearing invariant that regression ce2db75f violated: the context +// handed to executeDelegation in the fire-and-forget goroutine must NOT be +// cancelled when the HTTP handler returns 202 (which cancels +// c.Request.Context()). Before the fix, executeDelegation ran on the +// request-scoped ctx, so every DB op + proxy call failed `context +// canceled` the instant the 202 was written — silently breaking 100% of +// A2A peer delegations fleet-wide since 2026-05-12. +// +// This test asserts the exact ctx-derivation contract used by Delegate +// (context.WithoutCancel(parent) + a timeout budget): the derived context +// (a) stays alive after the parent is cancelled, and (b) still carries +// parent values (trace/correlation/tenant ids the downstream proxy + +// broadcaster read off ctx). It is intentionally DB-free and fast. +func TestDelegate_DetachedContext_SurvivesRequestCancellation(t *testing.T) { + type ctxKey string + const traceKey ctxKey = "trace-id" + + // Simulate c.Request.Context() carrying a correlation value. + parent, cancelParent := context.WithCancel( + context.WithValue(context.Background(), traceKey, "trace-abc-123"), + ) + + // Exact derivation Delegate uses for the detached goroutine. + delegationCtx, cancelDelegation := context.WithTimeout( + context.WithoutCancel(parent), 30*time.Minute, + ) + defer cancelDelegation() + + // The HTTP handler "returns 202" → request context is cancelled. + cancelParent() + + if err := parent.Err(); err == nil { + t.Fatal("precondition: parent context should be cancelled after the handler returns") + } + + // (a) Cancellation MUST NOT propagate to the detached context. + select { + case <-delegationCtx.Done(): + t.Fatalf("regression: detached delegation ctx was cancelled by the handler returning (err=%v) — executeDelegation would fail every DB op with `context canceled`", delegationCtx.Err()) + default: + // alive — correct + } + + // (b) Parent values MUST still be readable (WithoutCancel preserves + // values; trace/correlation/tenant ids the proxy + broadcaster use). + if got, _ := delegationCtx.Value(traceKey).(string); got != "trace-abc-123" { + t.Errorf("detached ctx lost the parent trace value: got %q, want %q", got, "trace-abc-123") + } + + // And it still has a real deadline (the 30m budget), so it is not an + // unbounded background context. + if _, hasDeadline := delegationCtx.Deadline(); !hasDeadline { + t.Error("detached ctx must carry the 30-minute timeout budget, but has no deadline") + } +} + // ---------- Delegate: missing target_id → 400 ---------- func TestDelegate_MissingTargetID(t *testing.T) {