forked from molecule-ai/molecule-core
Merge pull request #2128 from Molecule-AI/fix/a2a-idle-timeout-and-heartbeat-broadcast
fix(a2a-proxy): close 60s context-canceled gap on long silent runs
This commit is contained in:
commit
9d97e2af2f
@ -490,13 +490,50 @@ func normalizeA2APayload(body []byte) ([]byte, string, *proxyA2AError) {
|
||||
// idleTimeoutDuration is the per-dispatch silence window: if the
|
||||
// platform's broadcaster emits no events for this workspace for the
|
||||
// full duration, the dispatch ctx is cancelled. Resets on every
|
||||
// ACTIVITY_LOGGED / TASK_UPDATED / A2A_RESPONSE event for the
|
||||
// workspace, so a chat that's actively reporting tool calls or
|
||||
// streaming status updates never trips it. Picked to be longer than
|
||||
// any reasonable single-tool-use cadence (Claude Code's slowest
|
||||
// observed silence between tools is ~30s) but short enough that a
|
||||
// truly wedged runtime fails in 1 minute, not 5.
|
||||
const idleTimeoutDuration = 60 * time.Second
|
||||
// broadcaster event for the workspace — including the WORKSPACE_HEARTBEAT
|
||||
// fired by the registry's /heartbeat handler every 30s, so a runtime
|
||||
// that's just thinking silently between tool calls keeps the connection
|
||||
// alive without having to emit ACTIVITY_LOGGED noise.
|
||||
//
|
||||
// Pre-2026-04-26 this was 60s, picked when the platform only broadcast
|
||||
// on TASK_UPDATED (which itself only fires when current_task CHANGES).
|
||||
// A claude-code agent doing a long packaging step or a slow model thought
|
||||
// kept the same current_task for >60s, fired no broadcast, got cancelled
|
||||
// mid-flight. Bumped to 5min as a safety net AND the heartbeat handler
|
||||
// now broadcasts unconditionally — together either one alone closes the
|
||||
// gap, both together is defence in depth.
|
||||
//
|
||||
// Override via A2A_IDLE_TIMEOUT_SECONDS for ops who want to tune (e.g.
|
||||
// shorter for canary/test runners that want fail-fast on wedge, longer
|
||||
// for prod tenants running unusually slow plugins).
|
||||
var idleTimeoutDuration = parseIdleTimeoutEnv(os.Getenv("A2A_IDLE_TIMEOUT_SECONDS"))
|
||||
|
||||
// defaultIdleTimeoutDuration is what parseIdleTimeoutEnv returns when
|
||||
// the env var is unset or invalid. Pulled out as a const so tests can
|
||||
// reference it without re-deriving the value.
|
||||
const defaultIdleTimeoutDuration = 5 * time.Minute
|
||||
|
||||
// parseIdleTimeoutEnv parses the A2A_IDLE_TIMEOUT_SECONDS value, falling
|
||||
// back to defaultIdleTimeoutDuration on empty / non-numeric / non-positive
|
||||
// input. Bad-input cases LOG so an operator who set the wrong value
|
||||
// doesn't silently get the default and waste hours debugging "why is my
|
||||
// override not working." Without the log line, A2A_IDLE_TIMEOUT_SECONDS=foo
|
||||
// or =-30 produces identical observable behaviour to leaving it unset.
|
||||
func parseIdleTimeoutEnv(v string) time.Duration {
|
||||
if v == "" {
|
||||
return defaultIdleTimeoutDuration
|
||||
}
|
||||
n, err := strconv.Atoi(v)
|
||||
if err != nil {
|
||||
log.Printf("A2A_IDLE_TIMEOUT_SECONDS=%q is not a valid integer; using default %s", v, defaultIdleTimeoutDuration)
|
||||
return defaultIdleTimeoutDuration
|
||||
}
|
||||
if n <= 0 {
|
||||
log.Printf("A2A_IDLE_TIMEOUT_SECONDS=%d must be > 0; using default %s", n, defaultIdleTimeoutDuration)
|
||||
return defaultIdleTimeoutDuration
|
||||
}
|
||||
return time.Duration(n) * time.Second
|
||||
}
|
||||
|
||||
// dispatchA2A POSTs `body` to `agentURL`. Uses WithoutCancel so delegation
|
||||
// chains survive client disconnect (browser tab close). Two layers of
|
||||
|
||||
@ -884,6 +884,116 @@ func TestHeartbeatHandler_TaskCleared(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- TestHeartbeatHandler_AlwaysBroadcastsHeartbeat ----------
|
||||
//
|
||||
// Regression for the "context canceled" wave on 2026-04-26 (15+ failures
|
||||
// in 1hr across 6 workspaces). The a2a-proxy idle timer subscribes to
|
||||
// the broadcaster's SSE channel for the workspace and resets on every
|
||||
// event. Pre-fix the only broadcast paths from heartbeat were
|
||||
// TASK_UPDATED (only on current_task change) and the
|
||||
// WORKSPACE_ONLINE/DEGRADED transitions inside evaluateStatus (only on
|
||||
// status change). A long-running agent on the same task with stable
|
||||
// status fired NO broadcasts → idle timer fired → user message
|
||||
// got cancelled mid-flight.
|
||||
//
|
||||
// The fix emits an unconditional WORKSPACE_HEARTBEAT on every successful
|
||||
// heartbeat. This test pins the property: regardless of whether
|
||||
// current_task changed, the SSE subscriber observes a broadcast.
|
||||
|
||||
func TestHeartbeatHandler_AlwaysBroadcastsHeartbeat(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewRegistryHandler(broadcaster)
|
||||
|
||||
// Subscribe BEFORE the heartbeat so we don't miss the broadcast.
|
||||
sub, unsub := broadcaster.SubscribeSSE("ws-123")
|
||||
defer unsub()
|
||||
|
||||
// Same-task scenario: task value unchanged across the heartbeat.
|
||||
// Pre-fix this path emitted ZERO broadcasts.
|
||||
mock.ExpectQuery("SELECT COALESCE\\(current_task").
|
||||
WithArgs("ws-123").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("doing work"))
|
||||
mock.ExpectExec("UPDATE workspaces SET").
|
||||
WithArgs("ws-123", 0.0, "", 1, 500, "doing work").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
|
||||
WithArgs("ws-123").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online"))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
body := `{"workspace_id":"ws-123","error_rate":0.0,"sample_error":"","active_tasks":1,"uptime_seconds":500,"current_task":"doing work"}`
|
||||
c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
handler.Heartbeat(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
// Drain whatever the handler broadcast (with a tight timeout — the
|
||||
// channel is in-process so the event should already be queued by
|
||||
// the time Heartbeat returns).
|
||||
gotHeartbeat := false
|
||||
for i := 0; i < 5; i++ {
|
||||
select {
|
||||
case msg, ok := <-sub:
|
||||
if !ok {
|
||||
t.Fatal("broadcaster channel closed unexpectedly")
|
||||
}
|
||||
if msg.Event == "WORKSPACE_HEARTBEAT" {
|
||||
gotHeartbeat = true
|
||||
goto done
|
||||
}
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
goto done
|
||||
}
|
||||
}
|
||||
done:
|
||||
if !gotHeartbeat {
|
||||
t.Error("expected WORKSPACE_HEARTBEAT broadcast on every heartbeat (regression: pre-fix, same-task heartbeats fired no broadcast and the a2a-proxy idle timer trip-cancelled in-flight requests)")
|
||||
}
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- TestParseIdleTimeoutEnv ----------
|
||||
//
|
||||
// Pins the env-override path including the bad-input fallback paths
|
||||
// that the package-init `var idleTimeoutDuration = parseIdleTimeoutEnv(...)`
|
||||
// relies on. Without this test, an operator who sets
|
||||
// A2A_IDLE_TIMEOUT_SECONDS=foo would get the default with no log signal
|
||||
// (pre-fix behaviour) and the regression would slip in unnoticed.
|
||||
|
||||
func TestParseIdleTimeoutEnv(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
in string
|
||||
want time.Duration
|
||||
}{
|
||||
{"empty falls back to default", "", defaultIdleTimeoutDuration},
|
||||
{"valid positive integer parses to seconds", "120", 120 * time.Second},
|
||||
{"valid integer at minimum (1) is accepted", "1", 1 * time.Second},
|
||||
{"non-numeric falls back to default", "foo", defaultIdleTimeoutDuration},
|
||||
{"negative falls back to default", "-30", defaultIdleTimeoutDuration},
|
||||
{"zero falls back to default", "0", defaultIdleTimeoutDuration},
|
||||
{"float falls back to default (Atoi rejects)", "1.5", defaultIdleTimeoutDuration},
|
||||
{"trailing units rejected (we accept seconds only)", "60s", defaultIdleTimeoutDuration},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got := parseIdleTimeoutEnv(tc.in)
|
||||
if got != tc.want {
|
||||
t.Errorf("parseIdleTimeoutEnv(%q) = %v, want %v", tc.in, got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- TestActivityHandler_ListEmpty ----------
|
||||
|
||||
func TestActivityHandler_ListEmpty(t *testing.T) {
|
||||
|
||||
@ -441,6 +441,26 @@ func (h *RegistryHandler) Heartbeat(c *gin.Context) {
|
||||
})
|
||||
}
|
||||
|
||||
// Always emit a lightweight heartbeat broadcast — load-bearing for
|
||||
// the a2a-proxy's per-dispatch idle timeout (a2a_proxy.go:applyIdleTimeout).
|
||||
// Before this, the proxy's idle timer reset on TASK_UPDATED but
|
||||
// TASK_UPDATED only fires when current_task CHANGES. A long-running
|
||||
// agent that keeps the same task value for >idleTimeoutDuration
|
||||
// (claude-code packaging a ZIP, slow tool call, model thinking time)
|
||||
// hit no broadcast → idle timer fired → user's message got cancelled
|
||||
// mid-flight with "context canceled". Symptom users hit on the
|
||||
// 2026-04-26 director-bypass investigation: 15+ failures in 1hr
|
||||
// across 6 workspaces, all silent during the gap.
|
||||
//
|
||||
// Cost: BroadcastOnly skips the DB write (no activity_logs row),
|
||||
// so per-heartbeat cost is one in-memory channel send per active
|
||||
// SSE subscriber and one WS hub fan-out. At 30s heartbeat cadence
|
||||
// this is far below any noise floor on either path.
|
||||
h.broadcaster.BroadcastOnly(payload.WorkspaceID, "WORKSPACE_HEARTBEAT", map[string]interface{}{
|
||||
"active_tasks": payload.ActiveTasks,
|
||||
"uptime_seconds": payload.UptimeSeconds,
|
||||
})
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"status": "ok"})
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user