diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index 7ff618a1..1cd58ffb 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -490,13 +490,50 @@ func normalizeA2APayload(body []byte) ([]byte, string, *proxyA2AError) { // idleTimeoutDuration is the per-dispatch silence window: if the // platform's broadcaster emits no events for this workspace for the // full duration, the dispatch ctx is cancelled. Resets on every -// ACTIVITY_LOGGED / TASK_UPDATED / A2A_RESPONSE event for the -// workspace, so a chat that's actively reporting tool calls or -// streaming status updates never trips it. Picked to be longer than -// any reasonable single-tool-use cadence (Claude Code's slowest -// observed silence between tools is ~30s) but short enough that a -// truly wedged runtime fails in 1 minute, not 5. -const idleTimeoutDuration = 60 * time.Second +// broadcaster event for the workspace — including the WORKSPACE_HEARTBEAT +// fired by the registry's /heartbeat handler every 30s, so a runtime +// that's just thinking silently between tool calls keeps the connection +// alive without having to emit ACTIVITY_LOGGED noise. +// +// Pre-2026-04-26 this was 60s, picked when the platform only broadcast +// on TASK_UPDATED (which itself only fires when current_task CHANGES). +// A claude-code agent doing a long packaging step or a slow model thought +// kept the same current_task for >60s, fired no broadcast, got cancelled +// mid-flight. Bumped to 5min as a safety net AND the heartbeat handler +// now broadcasts unconditionally — together either one alone closes the +// gap, both together is defence in depth. +// +// Override via A2A_IDLE_TIMEOUT_SECONDS for ops who want to tune (e.g. +// shorter for canary/test runners that want fail-fast on wedge, longer +// for prod tenants running unusually slow plugins). +var idleTimeoutDuration = parseIdleTimeoutEnv(os.Getenv("A2A_IDLE_TIMEOUT_SECONDS")) + +// defaultIdleTimeoutDuration is what parseIdleTimeoutEnv returns when +// the env var is unset or invalid. Pulled out as a const so tests can +// reference it without re-deriving the value. +const defaultIdleTimeoutDuration = 5 * time.Minute + +// parseIdleTimeoutEnv parses the A2A_IDLE_TIMEOUT_SECONDS value, falling +// back to defaultIdleTimeoutDuration on empty / non-numeric / non-positive +// input. Bad-input cases LOG so an operator who set the wrong value +// doesn't silently get the default and waste hours debugging "why is my +// override not working." Without the log line, A2A_IDLE_TIMEOUT_SECONDS=foo +// or =-30 produces identical observable behaviour to leaving it unset. +func parseIdleTimeoutEnv(v string) time.Duration { + if v == "" { + return defaultIdleTimeoutDuration + } + n, err := strconv.Atoi(v) + if err != nil { + log.Printf("A2A_IDLE_TIMEOUT_SECONDS=%q is not a valid integer; using default %s", v, defaultIdleTimeoutDuration) + return defaultIdleTimeoutDuration + } + if n <= 0 { + log.Printf("A2A_IDLE_TIMEOUT_SECONDS=%d must be > 0; using default %s", n, defaultIdleTimeoutDuration) + return defaultIdleTimeoutDuration + } + return time.Duration(n) * time.Second +} // dispatchA2A POSTs `body` to `agentURL`. Uses WithoutCancel so delegation // chains survive client disconnect (browser tab close). Two layers of diff --git a/workspace-server/internal/handlers/handlers_test.go b/workspace-server/internal/handlers/handlers_test.go index be1fd1c8..a051f662 100644 --- a/workspace-server/internal/handlers/handlers_test.go +++ b/workspace-server/internal/handlers/handlers_test.go @@ -884,6 +884,116 @@ func TestHeartbeatHandler_TaskCleared(t *testing.T) { } } +// ---------- TestHeartbeatHandler_AlwaysBroadcastsHeartbeat ---------- +// +// Regression for the "context canceled" wave on 2026-04-26 (15+ failures +// in 1hr across 6 workspaces). The a2a-proxy idle timer subscribes to +// the broadcaster's SSE channel for the workspace and resets on every +// event. Pre-fix the only broadcast paths from heartbeat were +// TASK_UPDATED (only on current_task change) and the +// WORKSPACE_ONLINE/DEGRADED transitions inside evaluateStatus (only on +// status change). A long-running agent on the same task with stable +// status fired NO broadcasts → idle timer fired → user message +// got cancelled mid-flight. +// +// The fix emits an unconditional WORKSPACE_HEARTBEAT on every successful +// heartbeat. This test pins the property: regardless of whether +// current_task changed, the SSE subscriber observes a broadcast. + +func TestHeartbeatHandler_AlwaysBroadcastsHeartbeat(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewRegistryHandler(broadcaster) + + // Subscribe BEFORE the heartbeat so we don't miss the broadcast. + sub, unsub := broadcaster.SubscribeSSE("ws-123") + defer unsub() + + // Same-task scenario: task value unchanged across the heartbeat. + // Pre-fix this path emitted ZERO broadcasts. + mock.ExpectQuery("SELECT COALESCE\\(current_task"). + WithArgs("ws-123"). + WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("doing work")) + mock.ExpectExec("UPDATE workspaces SET"). + WithArgs("ws-123", 0.0, "", 1, 500, "doing work"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectQuery("SELECT status FROM workspaces WHERE id ="). + WithArgs("ws-123"). + WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online")) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + body := `{"workspace_id":"ws-123","error_rate":0.0,"sample_error":"","active_tasks":1,"uptime_seconds":500,"current_task":"doing work"}` + c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + handler.Heartbeat(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + + // Drain whatever the handler broadcast (with a tight timeout — the + // channel is in-process so the event should already be queued by + // the time Heartbeat returns). + gotHeartbeat := false + for i := 0; i < 5; i++ { + select { + case msg, ok := <-sub: + if !ok { + t.Fatal("broadcaster channel closed unexpectedly") + } + if msg.Event == "WORKSPACE_HEARTBEAT" { + gotHeartbeat = true + goto done + } + case <-time.After(200 * time.Millisecond): + goto done + } + } +done: + if !gotHeartbeat { + t.Error("expected WORKSPACE_HEARTBEAT broadcast on every heartbeat (regression: pre-fix, same-task heartbeats fired no broadcast and the a2a-proxy idle timer trip-cancelled in-flight requests)") + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// ---------- TestParseIdleTimeoutEnv ---------- +// +// Pins the env-override path including the bad-input fallback paths +// that the package-init `var idleTimeoutDuration = parseIdleTimeoutEnv(...)` +// relies on. Without this test, an operator who sets +// A2A_IDLE_TIMEOUT_SECONDS=foo would get the default with no log signal +// (pre-fix behaviour) and the regression would slip in unnoticed. + +func TestParseIdleTimeoutEnv(t *testing.T) { + cases := []struct { + name string + in string + want time.Duration + }{ + {"empty falls back to default", "", defaultIdleTimeoutDuration}, + {"valid positive integer parses to seconds", "120", 120 * time.Second}, + {"valid integer at minimum (1) is accepted", "1", 1 * time.Second}, + {"non-numeric falls back to default", "foo", defaultIdleTimeoutDuration}, + {"negative falls back to default", "-30", defaultIdleTimeoutDuration}, + {"zero falls back to default", "0", defaultIdleTimeoutDuration}, + {"float falls back to default (Atoi rejects)", "1.5", defaultIdleTimeoutDuration}, + {"trailing units rejected (we accept seconds only)", "60s", defaultIdleTimeoutDuration}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got := parseIdleTimeoutEnv(tc.in) + if got != tc.want { + t.Errorf("parseIdleTimeoutEnv(%q) = %v, want %v", tc.in, got, tc.want) + } + }) + } +} + // ---------- TestActivityHandler_ListEmpty ---------- func TestActivityHandler_ListEmpty(t *testing.T) { diff --git a/workspace-server/internal/handlers/registry.go b/workspace-server/internal/handlers/registry.go index 2a07264e..a3b63291 100644 --- a/workspace-server/internal/handlers/registry.go +++ b/workspace-server/internal/handlers/registry.go @@ -441,6 +441,26 @@ func (h *RegistryHandler) Heartbeat(c *gin.Context) { }) } + // Always emit a lightweight heartbeat broadcast — load-bearing for + // the a2a-proxy's per-dispatch idle timeout (a2a_proxy.go:applyIdleTimeout). + // Before this, the proxy's idle timer reset on TASK_UPDATED but + // TASK_UPDATED only fires when current_task CHANGES. A long-running + // agent that keeps the same task value for >idleTimeoutDuration + // (claude-code packaging a ZIP, slow tool call, model thinking time) + // hit no broadcast → idle timer fired → user's message got cancelled + // mid-flight with "context canceled". Symptom users hit on the + // 2026-04-26 director-bypass investigation: 15+ failures in 1hr + // across 6 workspaces, all silent during the gap. + // + // Cost: BroadcastOnly skips the DB write (no activity_logs row), + // so per-heartbeat cost is one in-memory channel send per active + // SSE subscriber and one WS hub fan-out. At 30s heartbeat cadence + // this is far below any noise floor on either path. + h.broadcaster.BroadcastOnly(payload.WorkspaceID, "WORKSPACE_HEARTBEAT", map[string]interface{}{ + "active_tasks": payload.ActiveTasks, + "uptime_seconds": payload.UptimeSeconds, + }) + c.JSON(http.StatusOK, gin.H{"status": "ok"}) }