From d552c43b94616f5fd9d86b9a985b7fd3aea71525 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Sun, 26 Apr 2026 15:45:44 -0700 Subject: [PATCH 1/2] fix(a2a-proxy): close 60s context-canceled gap on long silent runs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two compounding bugs caused the "context canceled" wave on 2026-04-26 (15+ failed user/agent A2A calls in 1hr across 6 workspaces, including the user's "send it in the chat" message that the director never received): 1. **a2a_proxy.go:applyIdleTimeout cancels the dispatch after 60s of broadcaster silence** for the workspace. Resets on any SSE event for the workspace, fires cancel() if no event arrives in time. 2. **registry.go:Heartbeat broadcast was conditional** — `if payload.CurrentTask != prevTask`. The runtime POSTs /registry/heartbeat every 30s, but if current_task hasn't changed the handler emits ZERO broadcasts. evaluateStatus only broadcasts on online/degraded transitions — also no-op when steady. Net: a claude-code agent on a long packaging step or slow tool call keeps the same current_task for >60s → no broadcasts → idle timer fires → in-flight request cancelled mid-flight with the "context canceled" error the user sees in the activity log. Fix: (a) Heartbeat handler always emits a `WORKSPACE_HEARTBEAT` BroadcastOnly event (no DB write — same path as TASK_UPDATED). At the existing 30s runtime cadence this resets the idle timer twice per minute. Cost is one in-memory channel send per active SSE subscriber + one WS hub fan-out per heartbeat — far below any noise floor. (b) idleTimeoutDuration default bumped 60s → 5min as a safety net for any future regression where the heartbeat path goes silent (e.g. runtime crashed mid-request before its next heartbeat). Made env-overridable via A2A_IDLE_TIMEOUT_SECONDS for ops who want to tune (canary tests fail-fast, prod tenants with slow plugins want longer). Either fix alone closes today's gap; both together is defence in depth. The runtime side already POSTs /registry/heartbeat every 30s via workspace/heartbeat.py — no runtime change needed. Test: TestHeartbeatHandler_AlwaysBroadcastsHeartbeat pins the property that an SSE subscriber observes a WORKSPACE_HEARTBEAT broadcast on a same-task heartbeat (the regression scenario). All 16 existing handler tests still pass. Doesn't fix: task #102 (single SDK session bottleneck) — peers will still queue when busy. But this PR ensures the queue/wait flow actually completes instead of being killed by the idle timer mid-wait. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../internal/handlers/a2a_proxy.go | 31 ++++++-- .../internal/handlers/handlers_test.go | 77 +++++++++++++++++++ .../internal/handlers/registry.go | 20 +++++ 3 files changed, 121 insertions(+), 7 deletions(-) diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index 7ff618a1..90a52b5b 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -490,13 +490,30 @@ func normalizeA2APayload(body []byte) ([]byte, string, *proxyA2AError) { // idleTimeoutDuration is the per-dispatch silence window: if the // platform's broadcaster emits no events for this workspace for the // full duration, the dispatch ctx is cancelled. Resets on every -// ACTIVITY_LOGGED / TASK_UPDATED / A2A_RESPONSE event for the -// workspace, so a chat that's actively reporting tool calls or -// streaming status updates never trips it. Picked to be longer than -// any reasonable single-tool-use cadence (Claude Code's slowest -// observed silence between tools is ~30s) but short enough that a -// truly wedged runtime fails in 1 minute, not 5. -const idleTimeoutDuration = 60 * time.Second +// broadcaster event for the workspace — including the WORKSPACE_HEARTBEAT +// fired by the registry's /heartbeat handler every 30s, so a runtime +// that's just thinking silently between tool calls keeps the connection +// alive without having to emit ACTIVITY_LOGGED noise. +// +// Pre-2026-04-26 this was 60s, picked when the platform only broadcast +// on TASK_UPDATED (which itself only fires when current_task CHANGES). +// A claude-code agent doing a long packaging step or a slow model thought +// kept the same current_task for >60s, fired no broadcast, got cancelled +// mid-flight. Bumped to 5min as a safety net AND the heartbeat handler +// now broadcasts unconditionally — together either one alone closes the +// gap, both together is defence in depth. +// +// Override via A2A_IDLE_TIMEOUT_SECONDS for ops who want to tune (e.g. +// shorter for canary/test runners that want fail-fast on wedge, longer +// for prod tenants running unusually slow plugins). +var idleTimeoutDuration = func() time.Duration { + if v := os.Getenv("A2A_IDLE_TIMEOUT_SECONDS"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + return time.Duration(n) * time.Second + } + } + return 5 * time.Minute +}() // dispatchA2A POSTs `body` to `agentURL`. Uses WithoutCancel so delegation // chains survive client disconnect (browser tab close). Two layers of diff --git a/workspace-server/internal/handlers/handlers_test.go b/workspace-server/internal/handlers/handlers_test.go index be1fd1c8..19098f74 100644 --- a/workspace-server/internal/handlers/handlers_test.go +++ b/workspace-server/internal/handlers/handlers_test.go @@ -884,6 +884,83 @@ func TestHeartbeatHandler_TaskCleared(t *testing.T) { } } +// ---------- TestHeartbeatHandler_AlwaysBroadcastsHeartbeat ---------- +// +// Regression for the "context canceled" wave on 2026-04-26 (15+ failures +// in 1hr across 6 workspaces). The a2a-proxy idle timer subscribes to +// the broadcaster's SSE channel for the workspace and resets on every +// event. Pre-fix the only broadcast paths from heartbeat were +// TASK_UPDATED (only on current_task change) and the +// WORKSPACE_ONLINE/DEGRADED transitions inside evaluateStatus (only on +// status change). A long-running agent on the same task with stable +// status fired NO broadcasts → idle timer fired → user message +// got cancelled mid-flight. +// +// The fix emits an unconditional WORKSPACE_HEARTBEAT on every successful +// heartbeat. This test pins the property: regardless of whether +// current_task changed, the SSE subscriber observes a broadcast. + +func TestHeartbeatHandler_AlwaysBroadcastsHeartbeat(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewRegistryHandler(broadcaster) + + // Subscribe BEFORE the heartbeat so we don't miss the broadcast. + sub, unsub := broadcaster.SubscribeSSE("ws-123") + defer unsub() + + // Same-task scenario: task value unchanged across the heartbeat. + // Pre-fix this path emitted ZERO broadcasts. + mock.ExpectQuery("SELECT COALESCE\\(current_task"). + WithArgs("ws-123"). + WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("doing work")) + mock.ExpectExec("UPDATE workspaces SET"). + WithArgs("ws-123", 0.0, "", 1, 500, "doing work"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectQuery("SELECT status FROM workspaces WHERE id ="). + WithArgs("ws-123"). + WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online")) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + body := `{"workspace_id":"ws-123","error_rate":0.0,"sample_error":"","active_tasks":1,"uptime_seconds":500,"current_task":"doing work"}` + c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + handler.Heartbeat(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + + // Drain whatever the handler broadcast (with a tight timeout — the + // channel is in-process so the event should already be queued by + // the time Heartbeat returns). + gotHeartbeat := false + for i := 0; i < 5; i++ { + select { + case msg, ok := <-sub: + if !ok { + t.Fatal("broadcaster channel closed unexpectedly") + } + if msg.Event == "WORKSPACE_HEARTBEAT" { + gotHeartbeat = true + goto done + } + case <-time.After(200 * time.Millisecond): + goto done + } + } +done: + if !gotHeartbeat { + t.Error("expected WORKSPACE_HEARTBEAT broadcast on every heartbeat (regression: pre-fix, same-task heartbeats fired no broadcast and the a2a-proxy idle timer trip-cancelled in-flight requests)") + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + // ---------- TestActivityHandler_ListEmpty ---------- func TestActivityHandler_ListEmpty(t *testing.T) { diff --git a/workspace-server/internal/handlers/registry.go b/workspace-server/internal/handlers/registry.go index 2a07264e..a3b63291 100644 --- a/workspace-server/internal/handlers/registry.go +++ b/workspace-server/internal/handlers/registry.go @@ -441,6 +441,26 @@ func (h *RegistryHandler) Heartbeat(c *gin.Context) { }) } + // Always emit a lightweight heartbeat broadcast — load-bearing for + // the a2a-proxy's per-dispatch idle timeout (a2a_proxy.go:applyIdleTimeout). + // Before this, the proxy's idle timer reset on TASK_UPDATED but + // TASK_UPDATED only fires when current_task CHANGES. A long-running + // agent that keeps the same task value for >idleTimeoutDuration + // (claude-code packaging a ZIP, slow tool call, model thinking time) + // hit no broadcast → idle timer fired → user's message got cancelled + // mid-flight with "context canceled". Symptom users hit on the + // 2026-04-26 director-bypass investigation: 15+ failures in 1hr + // across 6 workspaces, all silent during the gap. + // + // Cost: BroadcastOnly skips the DB write (no activity_logs row), + // so per-heartbeat cost is one in-memory channel send per active + // SSE subscriber and one WS hub fan-out. At 30s heartbeat cadence + // this is far below any noise floor on either path. + h.broadcaster.BroadcastOnly(payload.WorkspaceID, "WORKSPACE_HEARTBEAT", map[string]interface{}{ + "active_tasks": payload.ActiveTasks, + "uptime_seconds": payload.UptimeSeconds, + }) + c.JSON(http.StatusOK, gin.H{"status": "ok"}) } From 00f78c625261699300f5ac56b844040084e5d890 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Sun, 26 Apr 2026 15:57:00 -0700 Subject: [PATCH 2/2] fix(a2a-proxy): log when A2A_IDLE_TIMEOUT_SECONDS is invalid MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Review-feedback follow-up. Pre-fix, A2A_IDLE_TIMEOUT_SECONDS=foo or =-30 fell back to the default with zero log signal — operator sets the wrong value, sees "no effect," wastes hours debugging "why is my override not working." Now bad-input cases log a clear message naming the variable, the bad value, and the default applied. Refactor: extract parseIdleTimeoutEnv(string) → time.Duration so the parse logic is unit-testable. defaultIdleTimeoutDuration is a const so tests reference it without re-deriving the value. 8 new unit tests cover empty / valid / negative / zero / non-numeric / float / trailing-units inputs. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../internal/handlers/a2a_proxy.go | 34 +++++++++++++++---- .../internal/handlers/handlers_test.go | 33 ++++++++++++++++++ 2 files changed, 60 insertions(+), 7 deletions(-) diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index 90a52b5b..1cd58ffb 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -506,14 +506,34 @@ func normalizeA2APayload(body []byte) ([]byte, string, *proxyA2AError) { // Override via A2A_IDLE_TIMEOUT_SECONDS for ops who want to tune (e.g. // shorter for canary/test runners that want fail-fast on wedge, longer // for prod tenants running unusually slow plugins). -var idleTimeoutDuration = func() time.Duration { - if v := os.Getenv("A2A_IDLE_TIMEOUT_SECONDS"); v != "" { - if n, err := strconv.Atoi(v); err == nil && n > 0 { - return time.Duration(n) * time.Second - } +var idleTimeoutDuration = parseIdleTimeoutEnv(os.Getenv("A2A_IDLE_TIMEOUT_SECONDS")) + +// defaultIdleTimeoutDuration is what parseIdleTimeoutEnv returns when +// the env var is unset or invalid. Pulled out as a const so tests can +// reference it without re-deriving the value. +const defaultIdleTimeoutDuration = 5 * time.Minute + +// parseIdleTimeoutEnv parses the A2A_IDLE_TIMEOUT_SECONDS value, falling +// back to defaultIdleTimeoutDuration on empty / non-numeric / non-positive +// input. Bad-input cases LOG so an operator who set the wrong value +// doesn't silently get the default and waste hours debugging "why is my +// override not working." Without the log line, A2A_IDLE_TIMEOUT_SECONDS=foo +// or =-30 produces identical observable behaviour to leaving it unset. +func parseIdleTimeoutEnv(v string) time.Duration { + if v == "" { + return defaultIdleTimeoutDuration } - return 5 * time.Minute -}() + n, err := strconv.Atoi(v) + if err != nil { + log.Printf("A2A_IDLE_TIMEOUT_SECONDS=%q is not a valid integer; using default %s", v, defaultIdleTimeoutDuration) + return defaultIdleTimeoutDuration + } + if n <= 0 { + log.Printf("A2A_IDLE_TIMEOUT_SECONDS=%d must be > 0; using default %s", n, defaultIdleTimeoutDuration) + return defaultIdleTimeoutDuration + } + return time.Duration(n) * time.Second +} // dispatchA2A POSTs `body` to `agentURL`. Uses WithoutCancel so delegation // chains survive client disconnect (browser tab close). Two layers of diff --git a/workspace-server/internal/handlers/handlers_test.go b/workspace-server/internal/handlers/handlers_test.go index 19098f74..a051f662 100644 --- a/workspace-server/internal/handlers/handlers_test.go +++ b/workspace-server/internal/handlers/handlers_test.go @@ -961,6 +961,39 @@ done: } } +// ---------- TestParseIdleTimeoutEnv ---------- +// +// Pins the env-override path including the bad-input fallback paths +// that the package-init `var idleTimeoutDuration = parseIdleTimeoutEnv(...)` +// relies on. Without this test, an operator who sets +// A2A_IDLE_TIMEOUT_SECONDS=foo would get the default with no log signal +// (pre-fix behaviour) and the regression would slip in unnoticed. + +func TestParseIdleTimeoutEnv(t *testing.T) { + cases := []struct { + name string + in string + want time.Duration + }{ + {"empty falls back to default", "", defaultIdleTimeoutDuration}, + {"valid positive integer parses to seconds", "120", 120 * time.Second}, + {"valid integer at minimum (1) is accepted", "1", 1 * time.Second}, + {"non-numeric falls back to default", "foo", defaultIdleTimeoutDuration}, + {"negative falls back to default", "-30", defaultIdleTimeoutDuration}, + {"zero falls back to default", "0", defaultIdleTimeoutDuration}, + {"float falls back to default (Atoi rejects)", "1.5", defaultIdleTimeoutDuration}, + {"trailing units rejected (we accept seconds only)", "60s", defaultIdleTimeoutDuration}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got := parseIdleTimeoutEnv(tc.in) + if got != tc.want { + t.Errorf("parseIdleTimeoutEnv(%q) = %v, want %v", tc.in, got, tc.want) + } + }) + } +} + // ---------- TestActivityHandler_ListEmpty ---------- func TestActivityHandler_ListEmpty(t *testing.T) {