fix(a2a_proxy): debounce reactive restart on transient 503 / IsRunning flake (#2929) #2931

Merged
devops-engineer merged 1 commits from fix/2929-a2a-restart-debounce into main 2026-06-15 11:10:14 +00:00
3 changed files with 254 additions and 6 deletions
@@ -25,6 +25,19 @@ import (
"github.com/gin-gonic/gin"
)
// Reactive container-death debounce constants (#2929). A single A2A
// forward error or one flaky IsRunning probe must not restart a
// recently-alive workspace. We only declare the container dead when:
// - the workspace has NOT heartbeated recently, AND IsRunning reports
// not-running (immediate-dead path, preserving dead-EC2 recovery); OR
// - the workspace DOES have a recent heartbeat but we see N consecutive
// dead observations within the debounce window.
const (
recentHeartbeatWindow = 15 * time.Second
containerDeadDebounceThreshold = 2
containerDeadDebounceWindow = 30 * time.Second
)
// proxyDispatchBuildError is a sentinel wrapper for failures inside
// http.NewRequestWithContext. handleA2ADispatchError unwraps it to emit the
// "failed to create proxy request" 500 instead of the standard 502/503 paths.
@@ -171,6 +184,30 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace
// with no auto-recovery and Cloudflare in front would mask the response with
// its own error page. The 2026-04-30 hongmingwang.moleculesai.app
// canvas-chat-to-dead-workspace incident traces to exactly this gap.
// maybeMarkContainerDead runs the reactive health check after a forward error.
// If the workspace's compute (Docker container OR EC2 instance) is no longer
// running (and the workspace isn't external), it marks the workspace offline,
// clears Redis state, broadcasts WORKSPACE_OFFLINE, and triggers an async
// restart. Returns true when the compute was found dead.
//
// Provisioner selection (mutually exclusive in production):
// - h.provisioner != nil → local Docker deployment; IsRunning does docker inspect.
// - h.cpProv != nil → SaaS / EC2 deployment; IsRunning calls CP's
// /cp/workspaces/:id/status to read the EC2 state.
//
// Pre-fix this function ONLY consulted h.provisioner — for SaaS tenants
// (h.provisioner=nil, h.cpProv=set) it short-circuited to false on every
// call, so a dead EC2 agent would propagate upstream 502/503/504 to canvas
// with no auto-recovery and Cloudflare in front would mask the response with
// its own error page. The 2026-04-30 hongmingwang.moleculesai.app
// canvas-chat-to-dead-workspace incident traces to exactly this gap.
//
// #2929 hardening: do NOT recycle a recently-alive workspace on a single
// transient A2A 503 / IsRunning flake. We guard the restart with:
// 1. A recent-heartbeat check (last_heartbeat_at within 15s).
// 2. When the heartbeat is recent, require 2 consecutive dead observations
// within 30s before declaring dead.
// 3. Treat IsRunning transport errors as "assume alive" instead of dead.
func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspaceID string) bool {
var wsRuntime string
db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime, 'claude-code') FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsRuntime)
@@ -196,6 +233,12 @@ func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspace
return false
}
// #2929: recent-heartbeat guard. A workspace that heartbeated seconds
// ago is almost certainly alive; a single proxy/transport flake should
// not kill it. We still run IsRunning below, but a recent heartbeat
// puts us on the debounced path.
recentHeartbeat := h.hasRecentHeartbeat(ctx, workspaceID)
var running bool
var inspectErr error
if h.provisioner != nil {
@@ -211,11 +254,52 @@ func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspace
// IsRunning's contract returns (true, err) in this case so we stay
// on the alive path without triggering a restart cascade.
log.Printf("ProxyA2A: IsRunning for %s returned transient error (assuming alive): %v", workspaceID, inspectErr)
}
if running {
return false
}
if running {
h.resetDeadProbe(workspaceID)
return false
}
// Container is not running. If we have no recent heartbeat, preserve the
// pre-#2929 immediate-dead behavior so dead EC2 instances still recover
// on the first failed request (hongmingwang incident recovery path).
if !recentHeartbeat {
return h.declareContainerDead(ctx, workspaceID)
}
// Recent heartbeat but IsRunning says not-running: debounce. Require N
// consecutive observations within the window before we believe it.
if !h.incrementDeadProbe(workspaceID) {
log.Printf("ProxyA2A: container for %s looks dead but has recent heartbeat — debouncing (%d/%d within %s)",
workspaceID, h.deadProbeCount(workspaceID), containerDeadDebounceThreshold, containerDeadDebounceWindow)
return false
}
return h.declareContainerDead(ctx, workspaceID)
}
// hasRecentHeartbeat returns true if the workspace has a last_heartbeat_at
// within recentHeartbeatWindow. Missing/null heartbeat is treated as not
// recent.
func (h *WorkspaceHandler) hasRecentHeartbeat(ctx context.Context, workspaceID string) bool {
var lastHB *time.Time
if err := db.DB.QueryRowContext(ctx,
`SELECT last_heartbeat_at FROM workspaces WHERE id = $1`, workspaceID,
).Scan(&lastHB); err != nil {
log.Printf("ProxyA2A: failed to read last_heartbeat_at for %s: %v", workspaceID, err)
return false
}
if lastHB == nil {
return false
}
return time.Since(*lastHB) <= recentHeartbeatWindow
}
// declareContainerDead is the single point that marks a workspace offline,
// clears keys, broadcasts OFFLINE, and triggers an async restart.
func (h *WorkspaceHandler) declareContainerDead(ctx context.Context, workspaceID string) bool {
log.Printf("ProxyA2A: container for %s is dead — marking offline and triggering restart", workspaceID)
h.resetDeadProbe(workspaceID)
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2 AND status NOT IN ('removed', 'provisioning')`, models.StatusOffline, workspaceID); err != nil {
log.Printf("ProxyA2A: failed to mark workspace %s offline: %v", workspaceID, err)
}
@@ -230,6 +314,35 @@ func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspace
return true
}
// incrementDeadProbe records another dead-looking observation for workspaceID
// and returns true when the threshold is reached within the debounce window.
func (h *WorkspaceHandler) incrementDeadProbe(workspaceID string) bool {
h.deadProbeMu.Lock()
defer h.deadProbeMu.Unlock()
now := time.Now()
rec, ok := h.deadProbeAttempts[workspaceID]
if !ok || now.Sub(rec.first) > containerDeadDebounceWindow {
rec = deadProbeRecord{count: 1, first: now, last: now}
} else {
rec.count++
rec.last = now
}
h.deadProbeAttempts[workspaceID] = rec
return rec.count >= containerDeadDebounceThreshold
}
func (h *WorkspaceHandler) resetDeadProbe(workspaceID string) {
h.deadProbeMu.Lock()
defer h.deadProbeMu.Unlock()
delete(h.deadProbeAttempts, workspaceID)
}
func (h *WorkspaceHandler) deadProbeCount(workspaceID string) int {
h.deadProbeMu.Lock()
defer h.deadProbeMu.Unlock()
return h.deadProbeAttempts[workspaceID].count
}
// preflightContainerHealth runs a proactive Provisioner.IsRunning check
// (#36) before dispatching the a2a forward. Routed through provisioner's
// SSOT IsRunning, which itself wraps RunningContainerName — same source
@@ -2242,6 +2242,7 @@ func TestStopForRestart_NoProvisioner_NoOp(t *testing.T) {
// can check `calls == 0` after a sync barrier.
type fakeCPProv struct {
running bool
err error
calls int
stopCalls int
startCalls int
@@ -2264,7 +2265,7 @@ func (f *fakeCPProv) GetConsoleOutput(_ context.Context, _ string) (string, erro
}
func (f *fakeCPProv) IsRunning(_ context.Context, _ string) (bool, error) {
f.calls++
return f.running, nil
return f.running, f.err
}
// external runtime → false regardless of provisioner.
@@ -2282,6 +2283,123 @@ func TestMaybeMarkContainerDead_ExternalRuntime(t *testing.T) {
}
}
// #2929: a recent heartbeat + IsRunning=false should NOT declare the
// container dead on the first observation. The second observation within
// the debounce window should.
func TestMaybeMarkContainerDead_RecentHeartbeat_Debounces(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, handler)
cp := &fakeCPProv{running: false}
handler.SetCPProvisioner(cp)
recentHB := time.Now()
wsid := "ws-debounce"
// First observation: recent heartbeat, not running → debounce, no UPDATE.
mock.ExpectQuery(`SELECT COALESCE\(runtime, 'claude-code'\) FROM workspaces WHERE id =`).
WithArgs(wsid).
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces WHERE id =`).
WithArgs(wsid).
WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(recentHB))
if got := handler.maybeMarkContainerDead(context.Background(), wsid); got {
t.Fatal("first dead observation with recent heartbeat should be debounced, not restarted")
}
if cp.calls != 1 {
t.Errorf("first observation: expected 1 IsRunning call, got %d", cp.calls)
}
// Second observation within the window → threshold reached, restart.
mock.ExpectQuery(`SELECT COALESCE\(runtime, 'claude-code'\) FROM workspaces WHERE id =`).
WithArgs(wsid).
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces WHERE id =`).
WithArgs(wsid).
WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(recentHB))
mock.ExpectExec(`UPDATE workspaces SET status =`).
WithArgs(models.StatusOffline, wsid).
WillReturnResult(sqlmock.NewResult(0, 1))
if got := handler.maybeMarkContainerDead(context.Background(), wsid); !got {
t.Fatal("second dead observation with recent heartbeat should declare dead")
}
if cp.calls != 2 {
t.Errorf("second observation: expected 2 IsRunning calls total, got %d", cp.calls)
}
}
// #2929: IsRunning returning a transport error must be treated as alive,
// not as a dead container.
func TestMaybeMarkContainerDead_InspectErr_AssumesAlive(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
cp := &fakeCPProv{err: errors.New("cp timeout")}
handler.SetCPProvisioner(cp)
wsid := "ws-inspect-err"
mock.ExpectQuery(`SELECT COALESCE\(runtime, 'claude-code'\) FROM workspaces WHERE id =`).
WithArgs(wsid).
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces WHERE id =`).
WithArgs(wsid).
WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(sql.NullTime{}))
if got := handler.maybeMarkContainerDead(context.Background(), wsid); got {
t.Error("transient IsRunning error should be treated as alive, not dead")
}
}
// #2929: a successful IsRunning=true observation resets any accumulated
// dead-probe counter.
func TestMaybeMarkContainerDead_RunningTrue_ResetsDeadProbe(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
cp := &fakeCPProv{running: false}
handler.SetCPProvisioner(cp)
wsid := "ws-alive-resets"
recentHB := time.Now()
// One dead observation with recent heartbeat accumulates a probe.
mock.ExpectQuery(`SELECT COALESCE\(runtime, 'claude-code'\) FROM workspaces WHERE id =`).
WithArgs(wsid).
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces WHERE id =`).
WithArgs(wsid).
WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(recentHB))
if got := handler.maybeMarkContainerDead(context.Background(), wsid); got {
t.Fatal("expected first observation to be debounced")
}
// Next observation says running=true → resets counter and does NOT restart.
cp.running = true
mock.ExpectQuery(`SELECT COALESCE\(runtime, 'claude-code'\) FROM workspaces WHERE id =`).
WithArgs(wsid).
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces WHERE id =`).
WithArgs(wsid).
WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(recentHB))
if got := handler.maybeMarkContainerDead(context.Background(), wsid); got {
t.Error("running=true should reset dead probe and not restart")
}
// One more dead observation is now back to count=1, not threshold.
cp.running = false
mock.ExpectQuery(`SELECT COALESCE\(runtime, 'claude-code'\) FROM workspaces WHERE id =`).
WithArgs(wsid).
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces WHERE id =`).
WithArgs(wsid).
WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(recentHB))
if got := handler.maybeMarkContainerDead(context.Background(), wsid); got {
t.Fatal("expected dead probe to have been reset; first post-reset observation should be debounced")
}
}
// --- logA2AFailure / logA2ASuccess smoke tests ---
// These helpers spawn a detached goroutine that calls LogActivity, which
// inserts into activity_logs. We can't easily sync on the goroutine via
@@ -97,12 +97,28 @@ type WorkspaceHandler struct {
// main.go attaches this alongside namespaceCleanupFn when
// MEMORY_PLUGIN_URL is set (memBundle.Plugin).
seedMemoryPlugin seedMemoryPluginAPI
// deadProbeMu guards deadProbeAttempts, the per-workspace debounce
// state used by maybeMarkContainerDead. A transient A2A forward error
// or a single flaky IsRunning probe must not recycle a recently-alive
// container (#2929). Protected because ProxyA2A is called concurrently.
deadProbeMu sync.Mutex
deadProbeAttempts map[string]deadProbeRecord
// asyncWG tracks goroutines launched by goAsync so tests can wait
// for async DB users (restart, provision) before asserting results.
// Matches the pattern from main commit 1c3b4ff3.
asyncWG sync.WaitGroup
}
// deadProbeRecord tracks consecutive "container looks dead" observations
// for a workspace. first marks the initial observation in the current
// window; count is the number of observations since first.
type deadProbeRecord struct {
count int
first time.Time
last time.Time
}
// seedMemoryPluginAPI is the slice of the v2 memory plugin client that
// seedInitialMemories needs. Defining it as an interface here (parallel
// to memoryPluginAPI in mcp_tools_memory_v2.go) lets tests stub the
@@ -186,9 +202,10 @@ func waitGlobalAsyncForTest() {
func NewWorkspaceHandler(b events.EventEmitter, p *provisioner.Provisioner, platformURL, configsDir string) *WorkspaceHandler {
h := &WorkspaceHandler{
broadcaster: b,
platformURL: platformURL,
configsDir: configsDir,
broadcaster: b,
platformURL: platformURL,
configsDir: configsDir,
deadProbeAttempts: make(map[string]deadProbeRecord),
}
// Only assign p when the concrete pointer is non-nil. Without this
// guard, a `NewWorkspaceHandler(..., nil, ...)` call (which all the