fix(a2a_proxy): debounce reactive restart on transient 503 / IsRunning flake (#2929) #2931
@@ -25,6 +25,19 @@ import (
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// Debounce tuning for the reactive container-death check (#2929).
//
// One failed A2A forward, or one flaky IsRunning probe, is not enough to
// restart a workspace that was alive moments ago. The container is declared
// dead only in one of two situations:
//   - no recent heartbeat AND IsRunning reports not-running (the immediate
//     path, which keeps dead-EC2 recovery working); or
//   - a recent heartbeat exists, but N consecutive dead observations were
//     recorded inside the debounce window.
const (
	// containerDeadDebounceWindow bounds how long a run of dead observations
	// may take and still count as consecutive.
	containerDeadDebounceWindow = 30 * time.Second

	// containerDeadDebounceThreshold is N: the number of dead observations
	// required inside the window when the heartbeat is recent.
	containerDeadDebounceThreshold = 2

	// recentHeartbeatWindow is the maximum age of last_heartbeat_at for a
	// heartbeat to count as recent.
	recentHeartbeatWindow = 15 * time.Second
)
|
||||
|
||||
// proxyDispatchBuildError is a sentinel wrapper for failures inside
|
||||
// http.NewRequestWithContext. handleA2ADispatchError unwraps it to emit the
|
||||
// "failed to create proxy request" 500 instead of the standard 502/503 paths.
|
||||
@@ -171,6 +184,30 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace
|
||||
// with no auto-recovery and Cloudflare in front would mask the response with
|
||||
// its own error page. The 2026-04-30 hongmingwang.moleculesai.app
|
||||
// canvas-chat-to-dead-workspace incident traces to exactly this gap.
|
||||
// maybeMarkContainerDead runs the reactive health check after a forward error.
|
||||
// If the workspace's compute (Docker container OR EC2 instance) is no longer
|
||||
// running (and the workspace isn't external), it marks the workspace offline,
|
||||
// clears Redis state, broadcasts WORKSPACE_OFFLINE, and triggers an async
|
||||
// restart. Returns true only when the workspace was actually declared dead.
|
||||
//
|
||||
// Provisioner selection (mutually exclusive in production):
|
||||
// - h.provisioner != nil → local Docker deployment; IsRunning does docker inspect.
|
||||
// - h.cpProv != nil → SaaS / EC2 deployment; IsRunning calls CP's
|
||||
// /cp/workspaces/:id/status to read the EC2 state.
|
||||
//
|
||||
// Pre-fix this function ONLY consulted h.provisioner — for SaaS tenants
|
||||
// (h.provisioner=nil, h.cpProv=set) it short-circuited to false on every
|
||||
// call, so a dead EC2 agent would propagate upstream 502/503/504 to canvas
|
||||
// with no auto-recovery and Cloudflare in front would mask the response with
|
||||
// its own error page. The 2026-04-30 hongmingwang.moleculesai.app
|
||||
// canvas-chat-to-dead-workspace incident traces to exactly this gap.
|
||||
//
|
||||
// #2929 hardening: do NOT recycle a recently-alive workspace on a single
|
||||
// transient A2A 503 / IsRunning flake. We guard the restart with:
|
||||
// 1. A recent-heartbeat check (last_heartbeat_at within 15s).
|
||||
// 2. When the heartbeat is recent, require 2 consecutive dead observations
|
||||
// within 30s before declaring dead.
|
||||
// 3. Treat IsRunning transport errors as "assume alive" instead of dead.
|
||||
func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspaceID string) bool {
|
||||
var wsRuntime string
|
||||
db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime, 'claude-code') FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsRuntime)
|
||||
@@ -196,6 +233,12 @@ func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspace
|
||||
return false
|
||||
}
|
||||
|
||||
// #2929: recent-heartbeat guard. A workspace that heartbeated seconds
|
||||
// ago is almost certainly alive; a single proxy/transport flake should
|
||||
// not kill it. We still run IsRunning below, but a recent heartbeat
|
||||
// puts us on the debounced path.
|
||||
recentHeartbeat := h.hasRecentHeartbeat(ctx, workspaceID)
|
||||
|
||||
var running bool
|
||||
var inspectErr error
|
||||
if h.provisioner != nil {
|
||||
@@ -211,11 +254,52 @@ func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspace
|
||||
// IsRunning's contract returns (true, err) in this case so we stay
|
||||
// on the alive path without triggering a restart cascade.
|
||||
log.Printf("ProxyA2A: IsRunning for %s returned transient error (assuming alive): %v", workspaceID, inspectErr)
|
||||
}
|
||||
if running {
|
||||
return false
|
||||
}
|
||||
if running {
|
||||
h.resetDeadProbe(workspaceID)
|
||||
return false
|
||||
}
|
||||
|
||||
// Container is not running. If we have no recent heartbeat, preserve the
|
||||
// pre-#2929 immediate-dead behavior so dead EC2 instances still recover
|
||||
// on the first failed request (hongmingwang incident recovery path).
|
||||
if !recentHeartbeat {
|
||||
return h.declareContainerDead(ctx, workspaceID)
|
||||
}
|
||||
|
||||
// Recent heartbeat but IsRunning says not-running: debounce. Require N
|
||||
// consecutive observations within the window before we believe it.
|
||||
if !h.incrementDeadProbe(workspaceID) {
|
||||
log.Printf("ProxyA2A: container for %s looks dead but has recent heartbeat — debouncing (%d/%d within %s)",
|
||||
workspaceID, h.deadProbeCount(workspaceID), containerDeadDebounceThreshold, containerDeadDebounceWindow)
|
||||
return false
|
||||
}
|
||||
return h.declareContainerDead(ctx, workspaceID)
|
||||
}
|
||||
|
||||
// hasRecentHeartbeat returns true if the workspace has a last_heartbeat_at
|
||||
// within recentHeartbeatWindow. Missing/null heartbeat is treated as not
|
||||
// recent.
|
||||
func (h *WorkspaceHandler) hasRecentHeartbeat(ctx context.Context, workspaceID string) bool {
|
||||
var lastHB *time.Time
|
||||
if err := db.DB.QueryRowContext(ctx,
|
||||
`SELECT last_heartbeat_at FROM workspaces WHERE id = $1`, workspaceID,
|
||||
).Scan(&lastHB); err != nil {
|
||||
log.Printf("ProxyA2A: failed to read last_heartbeat_at for %s: %v", workspaceID, err)
|
||||
return false
|
||||
}
|
||||
if lastHB == nil {
|
||||
return false
|
||||
}
|
||||
return time.Since(*lastHB) <= recentHeartbeatWindow
|
||||
}
|
||||
|
||||
// declareContainerDead is the single point that marks a workspace offline,
|
||||
// clears keys, broadcasts OFFLINE, and triggers an async restart.
|
||||
func (h *WorkspaceHandler) declareContainerDead(ctx context.Context, workspaceID string) bool {
|
||||
log.Printf("ProxyA2A: container for %s is dead — marking offline and triggering restart", workspaceID)
|
||||
h.resetDeadProbe(workspaceID)
|
||||
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2 AND status NOT IN ('removed', 'provisioning')`, models.StatusOffline, workspaceID); err != nil {
|
||||
log.Printf("ProxyA2A: failed to mark workspace %s offline: %v", workspaceID, err)
|
||||
}
|
||||
@@ -230,6 +314,35 @@ func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspace
|
||||
return true
|
||||
}
|
||||
|
||||
// incrementDeadProbe records another dead-looking observation for workspaceID
|
||||
// and returns true when the threshold is reached within the debounce window.
|
||||
func (h *WorkspaceHandler) incrementDeadProbe(workspaceID string) bool {
|
||||
h.deadProbeMu.Lock()
|
||||
defer h.deadProbeMu.Unlock()
|
||||
now := time.Now()
|
||||
rec, ok := h.deadProbeAttempts[workspaceID]
|
||||
if !ok || now.Sub(rec.first) > containerDeadDebounceWindow {
|
||||
rec = deadProbeRecord{count: 1, first: now, last: now}
|
||||
} else {
|
||||
rec.count++
|
||||
rec.last = now
|
||||
}
|
||||
h.deadProbeAttempts[workspaceID] = rec
|
||||
return rec.count >= containerDeadDebounceThreshold
|
||||
}
|
||||
|
||||
func (h *WorkspaceHandler) resetDeadProbe(workspaceID string) {
|
||||
h.deadProbeMu.Lock()
|
||||
defer h.deadProbeMu.Unlock()
|
||||
delete(h.deadProbeAttempts, workspaceID)
|
||||
}
|
||||
|
||||
func (h *WorkspaceHandler) deadProbeCount(workspaceID string) int {
|
||||
h.deadProbeMu.Lock()
|
||||
defer h.deadProbeMu.Unlock()
|
||||
return h.deadProbeAttempts[workspaceID].count
|
||||
}
|
||||
|
||||
// preflightContainerHealth runs a proactive Provisioner.IsRunning check
|
||||
// (#36) before dispatching the a2a forward. Routed through provisioner's
|
||||
// SSOT IsRunning, which itself wraps RunningContainerName — same source
|
||||
|
||||
@@ -2242,6 +2242,7 @@ func TestStopForRestart_NoProvisioner_NoOp(t *testing.T) {
|
||||
// can check `calls == 0` after a sync barrier.
|
||||
type fakeCPProv struct {
|
||||
running bool
|
||||
err error
|
||||
calls int
|
||||
stopCalls int
|
||||
startCalls int
|
||||
@@ -2264,7 +2265,7 @@ func (f *fakeCPProv) GetConsoleOutput(_ context.Context, _ string) (string, erro
|
||||
}
|
||||
func (f *fakeCPProv) IsRunning(_ context.Context, _ string) (bool, error) {
|
||||
f.calls++
|
||||
return f.running, nil
|
||||
return f.running, f.err
|
||||
}
|
||||
|
||||
// external runtime → false regardless of provisioner.
|
||||
@@ -2282,6 +2283,123 @@ func TestMaybeMarkContainerDead_ExternalRuntime(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// #2929: a recent heartbeat + IsRunning=false should NOT declare the
|
||||
// container dead on the first observation. The second observation within
|
||||
// the debounce window should.
|
||||
func TestMaybeMarkContainerDead_RecentHeartbeat_Debounces(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, handler)
|
||||
cp := &fakeCPProv{running: false}
|
||||
handler.SetCPProvisioner(cp)
|
||||
|
||||
recentHB := time.Now()
|
||||
wsid := "ws-debounce"
|
||||
|
||||
// First observation: recent heartbeat, not running → debounce, no UPDATE.
|
||||
mock.ExpectQuery(`SELECT COALESCE\(runtime, 'claude-code'\) FROM workspaces WHERE id =`).
|
||||
WithArgs(wsid).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
|
||||
mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces WHERE id =`).
|
||||
WithArgs(wsid).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(recentHB))
|
||||
|
||||
if got := handler.maybeMarkContainerDead(context.Background(), wsid); got {
|
||||
t.Fatal("first dead observation with recent heartbeat should be debounced, not restarted")
|
||||
}
|
||||
if cp.calls != 1 {
|
||||
t.Errorf("first observation: expected 1 IsRunning call, got %d", cp.calls)
|
||||
}
|
||||
|
||||
// Second observation within the window → threshold reached, restart.
|
||||
mock.ExpectQuery(`SELECT COALESCE\(runtime, 'claude-code'\) FROM workspaces WHERE id =`).
|
||||
WithArgs(wsid).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
|
||||
mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces WHERE id =`).
|
||||
WithArgs(wsid).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(recentHB))
|
||||
mock.ExpectExec(`UPDATE workspaces SET status =`).
|
||||
WithArgs(models.StatusOffline, wsid).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
if got := handler.maybeMarkContainerDead(context.Background(), wsid); !got {
|
||||
t.Fatal("second dead observation with recent heartbeat should declare dead")
|
||||
}
|
||||
if cp.calls != 2 {
|
||||
t.Errorf("second observation: expected 2 IsRunning calls total, got %d", cp.calls)
|
||||
}
|
||||
}
|
||||
|
||||
// #2929: IsRunning returning a transport error must be treated as alive,
|
||||
// not as a dead container.
|
||||
func TestMaybeMarkContainerDead_InspectErr_AssumesAlive(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
cp := &fakeCPProv{err: errors.New("cp timeout")}
|
||||
handler.SetCPProvisioner(cp)
|
||||
|
||||
wsid := "ws-inspect-err"
|
||||
mock.ExpectQuery(`SELECT COALESCE\(runtime, 'claude-code'\) FROM workspaces WHERE id =`).
|
||||
WithArgs(wsid).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
|
||||
mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces WHERE id =`).
|
||||
WithArgs(wsid).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(sql.NullTime{}))
|
||||
|
||||
if got := handler.maybeMarkContainerDead(context.Background(), wsid); got {
|
||||
t.Error("transient IsRunning error should be treated as alive, not dead")
|
||||
}
|
||||
}
|
||||
|
||||
// #2929: a successful IsRunning=true observation resets any accumulated
|
||||
// dead-probe counter.
|
||||
func TestMaybeMarkContainerDead_RunningTrue_ResetsDeadProbe(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
cp := &fakeCPProv{running: false}
|
||||
handler.SetCPProvisioner(cp)
|
||||
wsid := "ws-alive-resets"
|
||||
recentHB := time.Now()
|
||||
|
||||
// One dead observation with recent heartbeat accumulates a probe.
|
||||
mock.ExpectQuery(`SELECT COALESCE\(runtime, 'claude-code'\) FROM workspaces WHERE id =`).
|
||||
WithArgs(wsid).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
|
||||
mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces WHERE id =`).
|
||||
WithArgs(wsid).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(recentHB))
|
||||
if got := handler.maybeMarkContainerDead(context.Background(), wsid); got {
|
||||
t.Fatal("expected first observation to be debounced")
|
||||
}
|
||||
|
||||
// Next observation says running=true → resets counter and does NOT restart.
|
||||
cp.running = true
|
||||
mock.ExpectQuery(`SELECT COALESCE\(runtime, 'claude-code'\) FROM workspaces WHERE id =`).
|
||||
WithArgs(wsid).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
|
||||
mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces WHERE id =`).
|
||||
WithArgs(wsid).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(recentHB))
|
||||
if got := handler.maybeMarkContainerDead(context.Background(), wsid); got {
|
||||
t.Error("running=true should reset dead probe and not restart")
|
||||
}
|
||||
|
||||
// One more dead observation is now back to count=1, not threshold.
|
||||
cp.running = false
|
||||
mock.ExpectQuery(`SELECT COALESCE\(runtime, 'claude-code'\) FROM workspaces WHERE id =`).
|
||||
WithArgs(wsid).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
|
||||
mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces WHERE id =`).
|
||||
WithArgs(wsid).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(recentHB))
|
||||
if got := handler.maybeMarkContainerDead(context.Background(), wsid); got {
|
||||
t.Fatal("expected dead probe to have been reset; first post-reset observation should be debounced")
|
||||
}
|
||||
}
|
||||
|
||||
// --- logA2AFailure / logA2ASuccess smoke tests ---
|
||||
// These helpers spawn a detached goroutine that calls LogActivity, which
|
||||
// inserts into activity_logs. We can't easily sync on the goroutine via
|
||||
|
||||
@@ -97,12 +97,28 @@ type WorkspaceHandler struct {
|
||||
// main.go attaches this alongside namespaceCleanupFn when
|
||||
// MEMORY_PLUGIN_URL is set (memBundle.Plugin).
|
||||
seedMemoryPlugin seedMemoryPluginAPI
|
||||
// deadProbeMu guards deadProbeAttempts, the per-workspace debounce
|
||||
// state used by maybeMarkContainerDead. A transient A2A forward error
|
||||
// or a single flaky IsRunning probe must not recycle a recently-alive
|
||||
// container (#2929). Protected because ProxyA2A is called concurrently.
|
||||
deadProbeMu sync.Mutex
|
||||
deadProbeAttempts map[string]deadProbeRecord
|
||||
|
||||
// asyncWG tracks goroutines launched by goAsync so tests can wait
|
||||
// for async DB users (restart, provision) before asserting results.
|
||||
// Matches the pattern from main commit 1c3b4ff3.
|
||||
asyncWG sync.WaitGroup
|
||||
}
|
||||
|
||||
// deadProbeRecord is the per-workspace debounce state: a run of consecutive
// "container looks dead" observations inside one debounce window.
type deadProbeRecord struct {
	// count is the number of observations recorded since first (inclusive).
	count int
	// first is when the current window's initial observation was made;
	// last is when the most recent observation was made.
	first, last time.Time
}
|
||||
|
||||
// seedMemoryPluginAPI is the slice of the v2 memory plugin client that
|
||||
// seedInitialMemories needs. Defining it as an interface here (parallel
|
||||
// to memoryPluginAPI in mcp_tools_memory_v2.go) lets tests stub the
|
||||
@@ -186,9 +202,10 @@ func waitGlobalAsyncForTest() {
|
||||
|
||||
func NewWorkspaceHandler(b events.EventEmitter, p *provisioner.Provisioner, platformURL, configsDir string) *WorkspaceHandler {
|
||||
h := &WorkspaceHandler{
|
||||
broadcaster: b,
|
||||
platformURL: platformURL,
|
||||
configsDir: configsDir,
|
||||
broadcaster: b,
|
||||
platformURL: platformURL,
|
||||
configsDir: configsDir,
|
||||
deadProbeAttempts: make(map[string]deadProbeRecord),
|
||||
}
|
||||
// Only assign p when the concrete pointer is non-nil. Without this
|
||||
// guard, a `NewWorkspaceHandler(..., nil, ...)` call (which all the
|
||||
|
||||
Reference in New Issue
Block a user