fix(handlers): detach executeDelegation ctx from HTTP request — A2A delegation P0 (regression ce2db75f, internal#497)
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 16s
CI / Detect changes (pull_request) Successful in 17s
E2E API Smoke Test / detect-changes (pull_request) Successful in 23s
E2E Chat / detect-changes (pull_request) Successful in 23s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 25s
Harness Replays / detect-changes (pull_request) Successful in 16s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 18s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 13s
gate-check-v3 / gate-check (pull_request) Successful in 16s
qa-review / approved (pull_request) Successful in 7s
security-review / approved (pull_request) Successful in 8s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m20s
sop-checklist / na-declarations (pull_request) N/A: (none)
sop-checklist / all-items-acked (pull_request) Successful in 12s
sop-tier-check / tier-check (pull_request) Successful in 14s
CI / Platform (Go) (pull_request) Successful in 9m47s
CI / Canvas (Next.js) (pull_request) Successful in 10m21s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 2s
CI / Python Lint & Test (pull_request) Successful in 2s
E2E Chat / E2E Chat (pull_request) Failing after 17s
Harness Replays / Harness Replays (pull_request) Successful in 2s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 3s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 1m20s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 2m38s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
CI / all-required (pull_request) Successful in 3s
audit-force-merge / audit (pull_request) Successful in 3s
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 16s
CI / Detect changes (pull_request) Successful in 17s
E2E API Smoke Test / detect-changes (pull_request) Successful in 23s
E2E Chat / detect-changes (pull_request) Successful in 23s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 25s
Harness Replays / detect-changes (pull_request) Successful in 16s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 18s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 13s
gate-check-v3 / gate-check (pull_request) Successful in 16s
qa-review / approved (pull_request) Successful in 7s
security-review / approved (pull_request) Successful in 8s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m20s
sop-checklist / na-declarations (pull_request) N/A: (none)
sop-checklist / all-items-acked (pull_request) Successful in 12s
sop-tier-check / tier-check (pull_request) Successful in 14s
CI / Platform (Go) (pull_request) Successful in 9m47s
CI / Canvas (Next.js) (pull_request) Successful in 10m21s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 2s
CI / Python Lint & Test (pull_request) Successful in 2s
E2E Chat / E2E Chat (pull_request) Failing after 17s
Harness Replays / Harness Replays (pull_request) Successful in 2s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 3s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 1m20s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 2m38s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
CI / all-required (pull_request) Successful in 3s
audit-force-merge / audit (pull_request) Successful in 3s
A2A peer_agent delegation delivery has been 100% broken fleet-wide since 2026-05-12. Delegate() ran the fire-and-forget executeDelegation goroutine on c.Request.Context(); the handler returns HTTP 202 immediately, which cancels that context, so every DB op + proxy call in the detached goroutine failed `context canceled` the instant the response was written. lookupDeliveryMode swallowed the resulting error and silently defaulted to push, skipping the poll-mode short-circuit that writes the a2a_receive inbox row — so poll-mode peers (e.g. hongming-pc) never received messages and push-mode peers hit the #190-style self-echo timeouts. Introduced byce2db75f("handlers: pass cancellable context through executeDelegation"). Primary fix (delegation.go): derive the goroutine context via context.WithTimeout(context.WithoutCancel(ctx), 30*time.Minute). WithoutCancel detaches request cancellation/deadline while preserving all ctx values (trace/correlation/tenant ids the proxy + broadcaster read). This is the established pattern in this package (a2a_proxy.go:850, a2a_proxy_helpers.go:525, registry.go:822); the 30m budget matches the pre-ce2db75f internal budget and the proxy's own agent-dispatch ceiling. Secondary fix, surgical (a2a_proxy_helpers.go + a2a_proxy.go), RFC#497 fail-closed theme: lookupDeliveryMode no longer swallows a *context* error (context.Canceled / context.DeadlineExceeded) into a silent push default — it propagates so the caller fails closed with a structured 503. Scope deliberately narrowed to ctx errors only: generic DB errors retain the long-standing documented fail-open-to-push contract (loud + recoverable 502/SSRF/restart, unlike the silent poll drop), so checkWorkspaceBudget's intentional fail-open and the existing suite are unaffected. Widening further is an RFC#497 follow-up, not part of this P0. Regression tests: - TestDelegate_DetachedContext_SurvivesRequestCancellation: detached ctx outlives request cancellation AND preserves parent values + deadline. - TestLookupDeliveryMode_ContextCanceled_FailsClosed: ctx-cancelled delivery-mode read returns an error, never push. - TestProxyA2A_PollMode_FailsClosedToPush: legacy non-ctx-DB-error fail-open-to-push contract preserved. Full workspace-server/internal/handlers package suite passes (go test -count=1), go build ./... and go vet clean. Refs: internal#497, regressionce2db75fCo-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -399,7 +399,21 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
// (no Do(), no maybeMarkContainerDead). The response is a synthetic
|
||||
// {status:"queued"} envelope so the caller (canvas, another workspace)
|
||||
// knows delivery is acknowledged but pending consumption.
|
||||
if lookupDeliveryMode(ctx, workspaceID) == models.DeliveryModePoll {
|
||||
deliveryMode, deliveryModeErr := lookupDeliveryMode(ctx, workspaceID)
|
||||
if deliveryModeErr != nil {
|
||||
// internal#497 fail-closed: a real DB/context error on the
|
||||
// delivery-mode read MUST NOT silently fall through to the push
|
||||
// dispatch path — that is exactly what silently misrouted every
|
||||
// poll-mode peer for 5 days under the ce2db75f regression. Surface
|
||||
// a structured error so the delegation is marked failed (loud +
|
||||
// retryable) instead of dispatched to the wrong path.
|
||||
log.Printf("ProxyA2A: delivery-mode lookup failed for %s: %v — failing closed", workspaceID, deliveryModeErr)
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusServiceUnavailable,
|
||||
Response: gin.H{"error": "delivery-mode lookup failed; refusing to dispatch to avoid silent misrouting"},
|
||||
}
|
||||
}
|
||||
if deliveryMode == models.DeliveryModePoll {
|
||||
if logActivity {
|
||||
h.logA2AReceiveQueued(ctx, workspaceID, callerID, body, a2aMethod)
|
||||
}
|
||||
|
||||
@@ -468,40 +468,64 @@ func parseUsageFromA2AResponse(body []byte) (inputTokens, outputTokens int64) {
|
||||
return 0, 0
|
||||
}
|
||||
|
||||
// lookupDeliveryMode returns the workspace's delivery_mode. On any DB
|
||||
// error or missing row it returns DeliveryModePush — the fail-closed
|
||||
// default. "Closed" here means "fall back to today's behavior (synchronous
|
||||
// dispatch)" rather than "fall back to drop the request silently into
|
||||
// activity_logs where the agent might never see it." A poll-mode workspace
|
||||
// that briefly reads as push will get its A2A request dispatched to the
|
||||
// stored URL (or a 502 if no URL); a push-mode workspace that briefly
|
||||
// reads as poll would get its request silently queued with no dispatch.
|
||||
// The first failure is loud + recoverable; the second is silent.
|
||||
// lookupDeliveryMode returns the workspace's delivery_mode.
|
||||
//
|
||||
// internal#497 / RFC#497 fail-closed (SURGICAL scope): the *specific*
|
||||
// failure mode that hid the ce2db75f regression for 5 days is now
|
||||
// propagated instead of silently swallowed — a CONTEXT error
|
||||
// (context.Canceled / context.DeadlineExceeded). Under ce2db75f the
|
||||
// detached delegation goroutine ran on a cancelled request context, every
|
||||
// `SELECT delivery_mode` failed `context canceled`, this function returned
|
||||
// push, the poll-mode short-circuit in proxyA2ARequest was skipped, and
|
||||
// poll-mode peers (e.g. an operator laptop on molecule-mcp-claude-channel)
|
||||
// silently never got their a2a_receive inbox row. A transient,
|
||||
// systematic-once-triggered context cancellation became permanent
|
||||
// invisible misrouting. Returning that error lets the caller fail loud
|
||||
// (mark the delegation failed) instead of mis-dispatching.
|
||||
//
|
||||
// Scope is deliberately narrow: only ctx errors propagate. Other DB
|
||||
// errors retain the long-standing documented "fall back to push (today's
|
||||
// synchronous behavior)" contract — that path is loud + recoverable
|
||||
// (502 / SSRF reject / restart), unlike the silent poll-mode drop, and
|
||||
// the surrounding proxy (incl. the sibling checkWorkspaceBudget) is
|
||||
// intentionally built around that fail-open-to-push behavior. Widening
|
||||
// further is an RFC#497 follow-up, not part of this P0 fix.
|
||||
//
|
||||
// A genuinely *absent* configuration is NOT an error and still resolves to
|
||||
// push (the safe synchronous default): sql.ErrNoRows, a NULL/empty column,
|
||||
// or an unrecognised value all return (push, nil).
|
||||
//
|
||||
// The function is intentionally lookup-only — it never mutates the row.
|
||||
// The register handler (registry.go) is the only writer for delivery_mode.
|
||||
//
|
||||
// See #2339 PR 1 for the column + register-flow side; this is the
|
||||
// proxy-side read used for the short-circuit in proxyA2ARequest.
|
||||
func lookupDeliveryMode(ctx context.Context, workspaceID string) string {
|
||||
func lookupDeliveryMode(ctx context.Context, workspaceID string) (string, error) {
|
||||
var mode sql.NullString
|
||||
err := db.DB.QueryRowContext(ctx,
|
||||
`SELECT delivery_mode FROM workspaces WHERE id = $1`, workspaceID,
|
||||
).Scan(&mode)
|
||||
if err != nil {
|
||||
if !errors.Is(err, sql.ErrNoRows) {
|
||||
log.Printf("ProxyA2A: lookupDeliveryMode(%s) failed (%v) — defaulting to push", workspaceID, err)
|
||||
// internal#497: a context cancellation/deadline MUST NOT be
|
||||
// swallowed into a silent push default — that is the exact 5-day
|
||||
// silent-misrouting vector. Propagate so the caller fails closed.
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
log.Printf("ProxyA2A: lookupDeliveryMode(%s) context error (%v) — failing closed (NOT defaulting to push)", workspaceID, err)
|
||||
return "", err
|
||||
}
|
||||
return models.DeliveryModePush
|
||||
if !errors.Is(err, sql.ErrNoRows) {
|
||||
log.Printf("ProxyA2A: lookupDeliveryMode(%s) failed (%v) — defaulting to push (non-ctx DB error; legacy fail-open-to-push contract)", workspaceID, err)
|
||||
}
|
||||
return models.DeliveryModePush, nil
|
||||
}
|
||||
if !mode.Valid || mode.String == "" {
|
||||
return models.DeliveryModePush
|
||||
return models.DeliveryModePush, nil
|
||||
}
|
||||
if !models.IsValidDeliveryMode(mode.String) {
|
||||
log.Printf("ProxyA2A: workspace %s has invalid delivery_mode=%q — defaulting to push", workspaceID, mode.String)
|
||||
return models.DeliveryModePush
|
||||
return models.DeliveryModePush, nil
|
||||
}
|
||||
return mode.String
|
||||
return mode.String, nil
|
||||
}
|
||||
|
||||
// logA2AReceiveQueued records a poll-mode "queued" A2A receive into
|
||||
|
||||
@@ -2228,12 +2228,18 @@ func TestProxyA2A_PushMode_NoShortCircuit(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestProxyA2A_PollMode_FailsClosedToPush verifies the safety contract:
|
||||
// a DB error reading delivery_mode must default to push (the existing
|
||||
// behavior), NOT poll. Failing to push means a poll-mode workspace
|
||||
// briefly attempts a real dispatch — visible failure (502 / SSRF
|
||||
// rejection / restart cascade), not a silent drop into activity_logs
|
||||
// where the agent might never look. Loud > silent, recoverable > lost.
|
||||
// TestProxyA2A_PollMode_FailsClosedToPush verifies the LEGACY safety
|
||||
// contract is PRESERVED for non-context DB errors: a generic DB error
|
||||
// reading delivery_mode still defaults to push (today's behavior), NOT
|
||||
// poll. Failing to push means a poll-mode workspace briefly attempts a
|
||||
// real dispatch — visible failure (502 / SSRF rejection / restart
|
||||
// cascade), not a silent drop into activity_logs where the agent might
|
||||
// never look. Loud > silent, recoverable > lost.
|
||||
//
|
||||
// internal#497 narrows the fail-closed change to *context* errors only
|
||||
// (the actual ce2db75f regression vector); generic DB errors keep this
|
||||
// long-standing fail-open-to-push contract. The ctx-error fail-closed is
|
||||
// covered by TestLookupDeliveryMode_ContextCanceled_FailsClosed.
|
||||
func TestProxyA2A_PollMode_FailsClosedToPush(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t) // empty Redis — forces resolveAgentURL DB lookup
|
||||
@@ -2244,7 +2250,8 @@ func TestProxyA2A_PollMode_FailsClosedToPush(t *testing.T) {
|
||||
|
||||
expectBudgetCheck(mock, wsID)
|
||||
|
||||
// lookupDeliveryMode hits a transient DB error → must default push.
|
||||
// lookupDeliveryMode hits a generic (non-context) DB error → must
|
||||
// still default push (legacy contract preserved by internal#497).
|
||||
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
|
||||
WithArgs(wsID).
|
||||
WillReturnError(sql.ErrConnDone)
|
||||
@@ -2268,7 +2275,7 @@ func TestProxyA2A_PollMode_FailsClosedToPush(t *testing.T) {
|
||||
var resp map[string]interface{}
|
||||
_ = json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if resp["status"] == "queued" {
|
||||
t.Errorf("DB error on delivery_mode lookup silently queued the request — must fail-closed-to-push, got body: %s", w.Body.String())
|
||||
t.Errorf("generic DB error on delivery_mode lookup silently queued the request — must fail-open-to-push, got body: %s", w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2277,6 +2284,37 @@ func TestProxyA2A_PollMode_FailsClosedToPush(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestLookupDeliveryMode_ContextCanceled_FailsClosed is the internal#497
|
||||
// regression test for the SECONDARY defect. It pins the exact invariant
|
||||
// that hid the ce2db75f regression for 5 days: when the delivery_mode read
|
||||
// fails because the context was cancelled (precisely what happened in the
|
||||
// detached delegation goroutine running on a returned request context),
|
||||
// lookupDeliveryMode MUST return an error and MUST NOT silently return
|
||||
// "push". Returning push there is what skipped the poll-mode short-circuit
|
||||
// and silently dropped 100% of poll-mode peer deliveries.
|
||||
//
|
||||
// A pre-cancelled context makes QueryRowContext fail with
|
||||
// context.Canceled deterministically — no DB rows are mocked because the
|
||||
// query never reaches a result.
|
||||
func TestLookupDeliveryMode_ContextCanceled_FailsClosed(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
// The query fails on the cancelled ctx before matching; provide a
|
||||
// permissive expectation so sqlmock doesn't complain about the attempt.
|
||||
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
|
||||
WillReturnError(context.Canceled)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel() // simulate the HTTP handler having returned (request ctx dead)
|
||||
|
||||
mode, err := lookupDeliveryMode(ctx, "ws-poll-peer")
|
||||
if err == nil {
|
||||
t.Fatalf("internal#497 regression: lookupDeliveryMode swallowed a context error and returned mode=%q with nil err — this is the exact 5-day silent-misrouting vector", mode)
|
||||
}
|
||||
if mode == models.DeliveryModePush {
|
||||
t.Errorf("internal#497 regression: context error must NOT default to push (got mode=%q)", mode)
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== a2aClient ResponseHeaderTimeout config ====================
|
||||
|
||||
func TestA2AClientResponseHeaderTimeout(t *testing.T) {
|
||||
|
||||
@@ -162,8 +162,32 @@ func (h *DelegationHandler) Delegate(c *gin.Context) {
|
||||
},
|
||||
})
|
||||
|
||||
// Fire-and-forget: send A2A in background goroutine
|
||||
go h.executeDelegation(ctx, sourceID, body.TargetID, delegationID, a2aBody)
|
||||
// Fire-and-forget: send A2A in a background goroutine.
|
||||
//
|
||||
// internal#497 — the goroutine MUST NOT inherit the HTTP request's
|
||||
// cancellation. `ctx` here is c.Request.Context(); the handler returns
|
||||
// 202 a few lines below, which cancels that context immediately. Before
|
||||
// this fix (regression ce2db75f) executeDelegation ran on the
|
||||
// request-scoped ctx, so every DB op + proxy call in the detached
|
||||
// goroutine failed `context canceled` the instant the 202 was written.
|
||||
// That silently broke 100% of A2A peer delegations fleet-wide since
|
||||
// 2026-05-12 (poll-mode peers never got their a2a_receive inbox row;
|
||||
// lookupDeliveryMode swallowed the ctx error and defaulted to push).
|
||||
//
|
||||
// context.WithoutCancel detaches cancellation/deadline while PRESERVING
|
||||
// all context values (trace/correlation/tenant ids that proxyA2ARequest
|
||||
// and the broadcaster read off ctx) — this is the established pattern in
|
||||
// this package (a2a_proxy.go:850, a2a_proxy_helpers.go:525,
|
||||
// registry.go:822). The 30-minute ceiling matches the prior internal
|
||||
// budget executeDelegation used before ce2db75f and the proxy's own
|
||||
// absolute agent-dispatch ceiling (a2a_proxy.go forwardCtx).
|
||||
delegationCtx, cancelDelegation := context.WithTimeout(
|
||||
context.WithoutCancel(ctx), 30*time.Minute,
|
||||
)
|
||||
go func() {
|
||||
defer cancelDelegation()
|
||||
h.executeDelegation(delegationCtx, sourceID, body.TargetID, delegationID, a2aBody)
|
||||
}()
|
||||
|
||||
// Broadcast event so canvas shows delegation in real-time
|
||||
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationSent), sourceID, map[string]interface{}{
|
||||
|
||||
@@ -16,6 +16,65 @@ import (
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// ---------- internal#497 regression: detached goroutine ctx must outlive the handler ----------
|
||||
|
||||
// TestDelegate_DetachedContext_SurvivesRequestCancellation pins the
|
||||
// load-bearing invariant that regression ce2db75f violated: the context
|
||||
// handed to executeDelegation in the fire-and-forget goroutine must NOT be
|
||||
// cancelled when the HTTP handler returns 202 (which cancels
|
||||
// c.Request.Context()). Before the fix, executeDelegation ran on the
|
||||
// request-scoped ctx, so every DB op + proxy call failed `context
|
||||
// canceled` the instant the 202 was written — silently breaking 100% of
|
||||
// A2A peer delegations fleet-wide since 2026-05-12.
|
||||
//
|
||||
// This test asserts the exact ctx-derivation contract used by Delegate
|
||||
// (context.WithoutCancel(parent) + a timeout budget): the derived context
|
||||
// (a) stays alive after the parent is cancelled, and (b) still carries
|
||||
// parent values (trace/correlation/tenant ids the downstream proxy +
|
||||
// broadcaster read off ctx). It is intentionally DB-free and fast.
|
||||
func TestDelegate_DetachedContext_SurvivesRequestCancellation(t *testing.T) {
|
||||
type ctxKey string
|
||||
const traceKey ctxKey = "trace-id"
|
||||
|
||||
// Simulate c.Request.Context() carrying a correlation value.
|
||||
parent, cancelParent := context.WithCancel(
|
||||
context.WithValue(context.Background(), traceKey, "trace-abc-123"),
|
||||
)
|
||||
|
||||
// Exact derivation Delegate uses for the detached goroutine.
|
||||
delegationCtx, cancelDelegation := context.WithTimeout(
|
||||
context.WithoutCancel(parent), 30*time.Minute,
|
||||
)
|
||||
defer cancelDelegation()
|
||||
|
||||
// The HTTP handler "returns 202" → request context is cancelled.
|
||||
cancelParent()
|
||||
|
||||
if err := parent.Err(); err == nil {
|
||||
t.Fatal("precondition: parent context should be cancelled after the handler returns")
|
||||
}
|
||||
|
||||
// (a) Cancellation MUST NOT propagate to the detached context.
|
||||
select {
|
||||
case <-delegationCtx.Done():
|
||||
t.Fatalf("regression: detached delegation ctx was cancelled by the handler returning (err=%v) — executeDelegation would fail every DB op with `context canceled`", delegationCtx.Err())
|
||||
default:
|
||||
// alive — correct
|
||||
}
|
||||
|
||||
// (b) Parent values MUST still be readable (WithoutCancel preserves
|
||||
// values; trace/correlation/tenant ids the proxy + broadcaster use).
|
||||
if got, _ := delegationCtx.Value(traceKey).(string); got != "trace-abc-123" {
|
||||
t.Errorf("detached ctx lost the parent trace value: got %q, want %q", got, "trace-abc-123")
|
||||
}
|
||||
|
||||
// And it still has a real deadline (the 30m budget), so it is not an
|
||||
// unbounded background context.
|
||||
if _, hasDeadline := delegationCtx.Deadline(); !hasDeadline {
|
||||
t.Error("detached ctx must carry the 30-minute timeout budget, but has no deadline")
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- Delegate: missing target_id → 400 ----------
|
||||
|
||||
func TestDelegate_MissingTargetID(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user