From 691d341fbbf63bd7d25e3d0d56d30531cb2d5e4c Mon Sep 17 00:00:00 2001 From: claude-ceo-assistant Date: Sat, 23 May 2026 16:21:22 -0700 Subject: [PATCH] fix(a2a): avoid false failure on busy queue fallback --- .../internal/handlers/a2a_proxy.go | 9 +-- .../internal/handlers/a2a_proxy_helpers.go | 46 +++++++++++++-- .../internal/handlers/a2a_proxy_test.go | 56 ++++++++++++++++++- 3 files changed, 100 insertions(+), 11 deletions(-) diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index 7689ddd3..2e3ec560 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -111,12 +111,13 @@ const maxProxyResponseBody = 10 << 20 // a generic 502 page to canvas. 10s is well above realistic intra-region // latencies and well below CF's edge timeout. // -// 3. Transport.ResponseHeaderTimeout — 180s default. From request-body-end +// 3. Transport.ResponseHeaderTimeout — 5min default. From request-body-end // to response-headers-start. Configurable via // A2A_PROXY_RESPONSE_HEADER_TIMEOUT (envx.Duration). Covers cold-start // first-byte (30-60s OAuth flow above) with enough room for Opus agent -// turns (big context + internal delegate_task round-trips routinely exceed -// the old 60s ceiling). Body streaming after headers is governed by the +// turns and Codex scheduled tasks (big context + internal delegate_task +// round-trips routinely exceed the old 60s/180s ceilings). Body streaming +// after headers is governed by the // per-request context deadline, NOT this timeout — so multi-minute agent // responses still work fine. // @@ -131,7 +132,7 @@ var a2aClient = &http.Client{ Timeout: 10 * time.Second, KeepAlive: 30 * time.Second, }).DialContext, - ResponseHeaderTimeout: envx.Duration("A2A_PROXY_RESPONSE_HEADER_TIMEOUT", 180*time.Second), + ResponseHeaderTimeout: envx.Duration("A2A_PROXY_RESPONSE_HEADER_TIMEOUT", 5*time.Minute), TLSHandshakeTimeout: 10 * time.Second, // MaxIdleConns / IdleConnTimeout: stdlib defaults are fine; agent // fan-in is bounded by the platform's broadcaster fan-out, not by diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index cd16f989..ae475e83 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -28,8 +28,8 @@ type proxyDispatchBuildError struct{ err error } func (e *proxyDispatchBuildError) Error() string { return e.err.Error() } // handleA2ADispatchError translates a forward-call failure into a proxyA2AError, -// runs the reactive container-health check, and (when `logActivity` is true) -// schedules a detached LogActivity goroutine for the failed attempt. +// runs the reactive container-health check, and records the outcome. Busy +// targets that are successfully queued are logged as queued, not failed. func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int, logActivity bool) (int, []byte, *proxyA2AError) { // Build-time failure (couldn't even create the http.Request) — return // a 500 without the reactive-health / busy-retry paths. @@ -45,10 +45,10 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace containerDead := h.maybeMarkContainerDead(ctx, workspaceID) - if logActivity { - h.logA2AFailure(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs) - } if containerDead { + if logActivity { + h.logA2AFailure(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs) + } return 0, nil, &proxyA2AError{ Status: http.StatusServiceUnavailable, Response: gin.H{"error": "workspace agent unreachable — container restart triggered", "restarting": true}, @@ -108,6 +108,9 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace ctx, workspaceID, callerID, PriorityTask, body, a2aMethod, idempotencyKey, expiresAt, ); qerr == nil { log.Printf("ProxyA2A: target %s busy — enqueued as %s (depth=%d)", workspaceID, qid, depth) + if logActivity { + h.logA2ABusyQueued(ctx, workspaceID, callerID, body, a2aMethod, durationMs) + } respBody, _ := json.Marshal(gin.H{ "queued": true, "queue_id": qid, @@ -121,6 +124,9 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace // make delegation silently disappear. log.Printf("ProxyA2A: enqueue for %s failed (%v) — falling back to 503", workspaceID, qerr) } + if logActivity { + h.logA2AFailure(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs) + } return 0, nil, &proxyA2AError{ Status: http.StatusServiceUnavailable, Headers: map[string]string{"Retry-After": strconv.Itoa(busyRetryAfterSeconds)}, @@ -131,6 +137,9 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace }, } } + if logActivity { + h.logA2AFailure(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs) + } return 0, nil, &proxyA2AError{ Status: http.StatusBadGateway, Response: gin.H{"error": "failed to reach workspace agent"}, @@ -311,6 +320,33 @@ func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, calle }) } +// logA2ABusyQueued records that a push attempt reached a live but busy +// workspace and was durably queued for heartbeat drain. +func (h *WorkspaceHandler) logA2ABusyQueued(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, durationMs int) { + var wsName string + db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName) + if wsName == "" { + wsName = workspaceID + } + summary := a2aMethod + " → " + wsName + " (queued: target busy)" + parent := ctx + h.goAsync(func() { + logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second) + defer cancel() + LogActivity(logCtx, h.broadcaster, ActivityParams{ + WorkspaceID: workspaceID, + ActivityType: "a2a_receive", + SourceID: nilIfEmpty(callerID), + TargetID: &workspaceID, + Method: &a2aMethod, + Summary: &summary, + RequestBody: json.RawMessage(body), + DurationMs: &durationMs, + Status: "ok", + }) + }) +} + // logA2ASuccess records a successful A2A round-trip and (for canvas-initiated // 2xx/3xx responses) broadcasts an A2A_RESPONSE event so the frontend can // receive the reply without polling. diff --git a/workspace-server/internal/handlers/a2a_proxy_test.go b/workspace-server/internal/handlers/a2a_proxy_test.go index 930e747b..5558b378 100644 --- a/workspace-server/internal/handlers/a2a_proxy_test.go +++ b/workspace-server/internal/handlers/a2a_proxy_test.go @@ -1779,6 +1779,58 @@ func TestHandleA2ADispatchError_ContextDeadline(t *testing.T) { } } +func TestHandleA2ADispatchError_BusyEnqueueLogsQueuedNotFailure(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + waitForHandlerAsyncBeforeDBCleanup(t, handler) + + mock.ExpectQuery(`INSERT INTO a2a_queue`). + WithArgs("ws-busy", nil, PriorityTask, "{}", "message/send", nil, nil). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("11111111-1111-1111-1111-111111111111")) + mock.ExpectQuery(`SELECT COUNT\(\*\) FROM a2a_queue`). + WithArgs("ws-busy"). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1)) + mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`). + WithArgs("ws-busy"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Busy Target")) + mock.ExpectExec("INSERT INTO activity_logs"). + WithArgs( + "ws-busy", + "a2a_receive", + nil, + sqlmock.AnyArg(), + sqlmock.AnyArg(), + sqlmock.AnyArg(), + sqlmock.AnyArg(), + nil, + nil, + sqlmock.AnyArg(), + "ok", + nil, + ). + WillReturnResult(sqlmock.NewResult(0, 1)) + + status, body, perr := handler.handleA2ADispatchError( + context.Background(), "ws-busy", "", []byte("{}"), "message/send", + context.DeadlineExceeded, 180002, true, + ) + if perr != nil { + t.Fatalf("expected busy enqueue success, got proxy error: %+v", perr) + } + if status != http.StatusAccepted { + t.Fatalf("got status %d, want 202", status) + } + if !bytes.Contains(body, []byte(`"queued":true`)) { + t.Fatalf("expected queued response body, got %s", string(body)) + } + + time.Sleep(80 * time.Millisecond) + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations; busy enqueue must log status=ok, not error: %v", err) + } +} + func TestHandleA2ADispatchError_BuildError(t *testing.T) { setupTestDB(t) setupTestRedis(t) @@ -2354,7 +2406,7 @@ func TestLookupDeliveryMode_ContextCanceled_FailsClosed(t *testing.T) { // ==================== a2aClient ResponseHeaderTimeout config ==================== func TestA2AClientResponseHeaderTimeout(t *testing.T) { - const defaultTimeout = 180 * time.Second + const defaultTimeout = 5 * time.Minute // Default (unset env) — a2aClient was initialised at package load time. if a2aClient.Transport.(*http.Transport).ResponseHeaderTimeout != defaultTimeout { @@ -2378,7 +2430,7 @@ func TestA2AClientResponseHeaderTimeout(t *testing.T) { t.Run("invalid A2A_PROXY_RESPONSE_HEADER_TIMEOUT falls back to default", func(t *testing.T) { t.Setenv("A2A_PROXY_RESPONSE_HEADER_TIMEOUT", "not-a-duration") // Simulate what envx.Duration does with an invalid value. - var fallback = 180 * time.Second + var fallback = 5 * time.Minute override := fallback if v := os.Getenv("A2A_PROXY_RESPONSE_HEADER_TIMEOUT"); v != "" { if d, err := time.ParseDuration(v); err == nil && d > 0 { -- 2.52.0