From 3b66ce1173717af58a56b1b02440bcfd80674f84 Mon Sep 17 00:00:00 2001 From: "Molecule AI Dev Engineer A (Kimi)" Date: Mon, 15 Jun 2026 11:01:28 +0000 Subject: [PATCH] fix(a2a_proxy): debounce reactive restart on transient 503 / IsRunning flake (#2929) Harden maybeMarkContainerDead so a single agent-origin A2A 503 or flaky IsRunning probe cannot recycle a recently-alive workspace: - Add recent-heartbeat guard (last_heartbeat_at within 15s); recent heartbeats put the workspace on a debounced path. - Require 2 consecutive dead observations within 30s before declaring dead when the heartbeat is recent. - Treat IsRunning transport errors as alive instead of dead, closing the Docker-daemon-EOF / CP-5xx restart cascade. - Preserve immediate dead-declaration when there is no recent heartbeat, so dead EC2 recovery on the first failed request still works. Fixes #2929. Test plan: - go test ./internal/handlers -run TestMaybeMarkContainerDead -count=1 - go test ./internal/handlers -count=1 - go build ./... Co-Authored-By: Claude --- .../internal/handlers/a2a_proxy_helpers.go | 117 ++++++++++++++++- .../internal/handlers/a2a_proxy_test.go | 120 +++++++++++++++++- .../internal/handlers/workspace.go | 23 +++- 3 files changed, 254 insertions(+), 6 deletions(-) diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index 31daadb3..61a69f5e 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -25,6 +25,19 @@ import ( "github.com/gin-gonic/gin" ) +// Reactive container-death debounce constants (#2929). A single A2A +// forward error or one flaky IsRunning probe must not restart a +// recently-alive workspace. We only declare the container dead when: +// - the workspace has NOT heartbeated recently, AND IsRunning reports +// not-running (immediate-dead path, preserving dead-EC2 recovery); OR +// - the workspace DOES have a recent heartbeat but we see N consecutive +// dead observations within the debounce window. +const ( + recentHeartbeatWindow = 15 * time.Second + containerDeadDebounceThreshold = 2 + containerDeadDebounceWindow = 30 * time.Second +) + // proxyDispatchBuildError is a sentinel wrapper for failures inside // http.NewRequestWithContext. handleA2ADispatchError unwraps it to emit the // "failed to create proxy request" 500 instead of the standard 502/503 paths. @@ -171,6 +184,30 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace // with no auto-recovery and Cloudflare in front would mask the response with // its own error page. The 2026-04-30 hongmingwang.moleculesai.app // canvas-chat-to-dead-workspace incident traces to exactly this gap. +// maybeMarkContainerDead runs the reactive health check after a forward error. +// If the workspace's compute (Docker container OR EC2 instance) is no longer +// running (and the workspace isn't external), it marks the workspace offline, +// clears Redis state, broadcasts WORKSPACE_OFFLINE, and triggers an async +// restart. Returns true when the compute was found dead. +// +// Provisioner selection (mutually exclusive in production): +// - h.provisioner != nil → local Docker deployment; IsRunning does docker inspect. +// - h.cpProv != nil → SaaS / EC2 deployment; IsRunning calls CP's +// /cp/workspaces/:id/status to read the EC2 state. +// +// Pre-fix this function ONLY consulted h.provisioner — for SaaS tenants +// (h.provisioner=nil, h.cpProv=set) it short-circuited to false on every +// call, so a dead EC2 agent would propagate upstream 502/503/504 to canvas +// with no auto-recovery and Cloudflare in front would mask the response with +// its own error page. The 2026-04-30 hongmingwang.moleculesai.app +// canvas-chat-to-dead-workspace incident traces to exactly this gap. +// +// #2929 hardening: do NOT recycle a recently-alive workspace on a single +// transient A2A 503 / IsRunning flake. We guard the restart with: +// 1. A recent-heartbeat check (last_heartbeat_at within 15s). +// 2. When the heartbeat is recent, require 2 consecutive dead observations +// within 30s before declaring dead. +// 3. Treat IsRunning transport errors as "assume alive" instead of dead. func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspaceID string) bool { var wsRuntime string db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime, 'claude-code') FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsRuntime) @@ -196,6 +233,12 @@ func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspace return false } + // #2929: recent-heartbeat guard. A workspace that heartbeated seconds + // ago is almost certainly alive; a single proxy/transport flake should + // not kill it. We still run IsRunning below, but a recent heartbeat + // puts us on the debounced path. + recentHeartbeat := h.hasRecentHeartbeat(ctx, workspaceID) + var running bool var inspectErr error if h.provisioner != nil { @@ -211,11 +254,52 @@ func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspace // IsRunning's contract returns (true, err) in this case so we stay // on the alive path without triggering a restart cascade. log.Printf("ProxyA2A: IsRunning for %s returned transient error (assuming alive): %v", workspaceID, inspectErr) - } - if running { return false } + if running { + h.resetDeadProbe(workspaceID) + return false + } + + // Container is not running. If we have no recent heartbeat, preserve the + // pre-#2929 immediate-dead behavior so dead EC2 instances still recover + // on the first failed request (hongmingwang incident recovery path). + if !recentHeartbeat { + return h.declareContainerDead(ctx, workspaceID) + } + + // Recent heartbeat but IsRunning says not-running: debounce. Require N + // consecutive observations within the window before we believe it. + if !h.incrementDeadProbe(workspaceID) { + log.Printf("ProxyA2A: container for %s looks dead but has recent heartbeat — debouncing (%d/%d within %s)", + workspaceID, h.deadProbeCount(workspaceID), containerDeadDebounceThreshold, containerDeadDebounceWindow) + return false + } + return h.declareContainerDead(ctx, workspaceID) +} + +// hasRecentHeartbeat returns true if the workspace has a last_heartbeat_at +// within recentHeartbeatWindow. Missing/null heartbeat is treated as not +// recent. +func (h *WorkspaceHandler) hasRecentHeartbeat(ctx context.Context, workspaceID string) bool { + var lastHB *time.Time + if err := db.DB.QueryRowContext(ctx, + `SELECT last_heartbeat_at FROM workspaces WHERE id = $1`, workspaceID, + ).Scan(&lastHB); err != nil { + log.Printf("ProxyA2A: failed to read last_heartbeat_at for %s: %v", workspaceID, err) + return false + } + if lastHB == nil { + return false + } + return time.Since(*lastHB) <= recentHeartbeatWindow +} + +// declareContainerDead is the single point that marks a workspace offline, +// clears keys, broadcasts OFFLINE, and triggers an async restart. +func (h *WorkspaceHandler) declareContainerDead(ctx context.Context, workspaceID string) bool { log.Printf("ProxyA2A: container for %s is dead — marking offline and triggering restart", workspaceID) + h.resetDeadProbe(workspaceID) if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2 AND status NOT IN ('removed', 'provisioning')`, models.StatusOffline, workspaceID); err != nil { log.Printf("ProxyA2A: failed to mark workspace %s offline: %v", workspaceID, err) } @@ -230,6 +314,35 @@ func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspace return true } +// incrementDeadProbe records another dead-looking observation for workspaceID +// and returns true when the threshold is reached within the debounce window. +func (h *WorkspaceHandler) incrementDeadProbe(workspaceID string) bool { + h.deadProbeMu.Lock() + defer h.deadProbeMu.Unlock() + now := time.Now() + rec, ok := h.deadProbeAttempts[workspaceID] + if !ok || now.Sub(rec.first) > containerDeadDebounceWindow { + rec = deadProbeRecord{count: 1, first: now, last: now} + } else { + rec.count++ + rec.last = now + } + h.deadProbeAttempts[workspaceID] = rec + return rec.count >= containerDeadDebounceThreshold +} + +func (h *WorkspaceHandler) resetDeadProbe(workspaceID string) { + h.deadProbeMu.Lock() + defer h.deadProbeMu.Unlock() + delete(h.deadProbeAttempts, workspaceID) +} + +func (h *WorkspaceHandler) deadProbeCount(workspaceID string) int { + h.deadProbeMu.Lock() + defer h.deadProbeMu.Unlock() + return h.deadProbeAttempts[workspaceID].count +} + // preflightContainerHealth runs a proactive Provisioner.IsRunning check // (#36) before dispatching the a2a forward. Routed through provisioner's // SSOT IsRunning, which itself wraps RunningContainerName — same source diff --git a/workspace-server/internal/handlers/a2a_proxy_test.go b/workspace-server/internal/handlers/a2a_proxy_test.go index da1a06ca..d5e31b7c 100644 --- a/workspace-server/internal/handlers/a2a_proxy_test.go +++ b/workspace-server/internal/handlers/a2a_proxy_test.go @@ -2242,6 +2242,7 @@ func TestStopForRestart_NoProvisioner_NoOp(t *testing.T) { // can check `calls == 0` after a sync barrier. type fakeCPProv struct { running bool + err error calls int stopCalls int startCalls int @@ -2264,7 +2265,7 @@ func (f *fakeCPProv) GetConsoleOutput(_ context.Context, _ string) (string, erro } func (f *fakeCPProv) IsRunning(_ context.Context, _ string) (bool, error) { f.calls++ - return f.running, nil + return f.running, f.err } // external runtime → false regardless of provisioner. @@ -2282,6 +2283,123 @@ func TestMaybeMarkContainerDead_ExternalRuntime(t *testing.T) { } } +// #2929: a recent heartbeat + IsRunning=false should NOT declare the +// container dead on the first observation. The second observation within +// the debounce window should. +func TestMaybeMarkContainerDead_RecentHeartbeat_Debounces(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + waitForHandlerAsyncBeforeDBCleanup(t, handler) + cp := &fakeCPProv{running: false} + handler.SetCPProvisioner(cp) + + recentHB := time.Now() + wsid := "ws-debounce" + + // First observation: recent heartbeat, not running → debounce, no UPDATE. + mock.ExpectQuery(`SELECT COALESCE\(runtime, 'claude-code'\) FROM workspaces WHERE id =`). + WithArgs(wsid). + WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes")) + mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces WHERE id =`). + WithArgs(wsid). + WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(recentHB)) + + if got := handler.maybeMarkContainerDead(context.Background(), wsid); got { + t.Fatal("first dead observation with recent heartbeat should be debounced, not restarted") + } + if cp.calls != 1 { + t.Errorf("first observation: expected 1 IsRunning call, got %d", cp.calls) + } + + // Second observation within the window → threshold reached, restart. + mock.ExpectQuery(`SELECT COALESCE\(runtime, 'claude-code'\) FROM workspaces WHERE id =`). + WithArgs(wsid). + WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes")) + mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces WHERE id =`). + WithArgs(wsid). + WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(recentHB)) + mock.ExpectExec(`UPDATE workspaces SET status =`). + WithArgs(models.StatusOffline, wsid). + WillReturnResult(sqlmock.NewResult(0, 1)) + + if got := handler.maybeMarkContainerDead(context.Background(), wsid); !got { + t.Fatal("second dead observation with recent heartbeat should declare dead") + } + if cp.calls != 2 { + t.Errorf("second observation: expected 2 IsRunning calls total, got %d", cp.calls) + } +} + +// #2929: IsRunning returning a transport error must be treated as alive, +// not as a dead container. +func TestMaybeMarkContainerDead_InspectErr_AssumesAlive(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + cp := &fakeCPProv{err: errors.New("cp timeout")} + handler.SetCPProvisioner(cp) + + wsid := "ws-inspect-err" + mock.ExpectQuery(`SELECT COALESCE\(runtime, 'claude-code'\) FROM workspaces WHERE id =`). + WithArgs(wsid). + WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes")) + mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces WHERE id =`). + WithArgs(wsid). + WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(sql.NullTime{})) + + if got := handler.maybeMarkContainerDead(context.Background(), wsid); got { + t.Error("transient IsRunning error should be treated as alive, not dead") + } +} + +// #2929: a successful IsRunning=true observation resets any accumulated +// dead-probe counter. +func TestMaybeMarkContainerDead_RunningTrue_ResetsDeadProbe(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + cp := &fakeCPProv{running: false} + handler.SetCPProvisioner(cp) + wsid := "ws-alive-resets" + recentHB := time.Now() + + // One dead observation with recent heartbeat accumulates a probe. + mock.ExpectQuery(`SELECT COALESCE\(runtime, 'claude-code'\) FROM workspaces WHERE id =`). + WithArgs(wsid). + WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes")) + mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces WHERE id =`). + WithArgs(wsid). + WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(recentHB)) + if got := handler.maybeMarkContainerDead(context.Background(), wsid); got { + t.Fatal("expected first observation to be debounced") + } + + // Next observation says running=true → resets counter and does NOT restart. + cp.running = true + mock.ExpectQuery(`SELECT COALESCE\(runtime, 'claude-code'\) FROM workspaces WHERE id =`). + WithArgs(wsid). + WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes")) + mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces WHERE id =`). + WithArgs(wsid). + WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(recentHB)) + if got := handler.maybeMarkContainerDead(context.Background(), wsid); got { + t.Error("running=true should reset dead probe and not restart") + } + + // One more dead observation is now back to count=1, not threshold. + cp.running = false + mock.ExpectQuery(`SELECT COALESCE\(runtime, 'claude-code'\) FROM workspaces WHERE id =`). + WithArgs(wsid). + WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes")) + mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces WHERE id =`). + WithArgs(wsid). + WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(recentHB)) + if got := handler.maybeMarkContainerDead(context.Background(), wsid); got { + t.Fatal("expected dead probe to have been reset; first post-reset observation should be debounced") + } +} + // --- logA2AFailure / logA2ASuccess smoke tests --- // These helpers spawn a detached goroutine that calls LogActivity, which // inserts into activity_logs. We can't easily sync on the goroutine via diff --git a/workspace-server/internal/handlers/workspace.go b/workspace-server/internal/handlers/workspace.go index 3f154fc9..1a7d16dc 100644 --- a/workspace-server/internal/handlers/workspace.go +++ b/workspace-server/internal/handlers/workspace.go @@ -97,12 +97,28 @@ type WorkspaceHandler struct { // main.go attaches this alongside namespaceCleanupFn when // MEMORY_PLUGIN_URL is set (memBundle.Plugin). seedMemoryPlugin seedMemoryPluginAPI + // deadProbeMu guards deadProbeAttempts, the per-workspace debounce + // state used by maybeMarkContainerDead. A transient A2A forward error + // or a single flaky IsRunning probe must not recycle a recently-alive + // container (#2929). Protected because ProxyA2A is called concurrently. + deadProbeMu sync.Mutex + deadProbeAttempts map[string]deadProbeRecord + // asyncWG tracks goroutines launched by goAsync so tests can wait // for async DB users (restart, provision) before asserting results. // Matches the pattern from main commit 1c3b4ff3. asyncWG sync.WaitGroup } +// deadProbeRecord tracks consecutive "container looks dead" observations +// for a workspace. first marks the initial observation in the current +// window; count is the number of observations since first. +type deadProbeRecord struct { + count int + first time.Time + last time.Time +} + // seedMemoryPluginAPI is the slice of the v2 memory plugin client that // seedInitialMemories needs. Defining it as an interface here (parallel // to memoryPluginAPI in mcp_tools_memory_v2.go) lets tests stub the @@ -186,9 +202,10 @@ func waitGlobalAsyncForTest() { func NewWorkspaceHandler(b events.EventEmitter, p *provisioner.Provisioner, platformURL, configsDir string) *WorkspaceHandler { h := &WorkspaceHandler{ - broadcaster: b, - platformURL: platformURL, - configsDir: configsDir, + broadcaster: b, + platformURL: platformURL, + configsDir: configsDir, + deadProbeAttempts: make(map[string]deadProbeRecord), } // Only assign p when the concrete pointer is non-nil. Without this // guard, a `NewWorkspaceHandler(..., nil, ...)` call (which all the -- 2.52.0