fix(a2a_proxy): debounce IsRunning=false in post-restart settle window (#2929) #2950
@@ -808,7 +808,11 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
// authoritative check) but the empty-body case is what makes
|
||||
// this fix necessary.
|
||||
if isUpstreamDeadStatus(resp.StatusCode) {
|
||||
if h.maybeMarkContainerDead(ctx, workspaceID) {
|
||||
dead, queuedStatus, queuedBody := h.maybeMarkContainerDead(ctx, workspaceID, callerID, body, a2aMethod, durationMs, logActivity)
|
||||
if queuedStatus != 0 {
|
||||
return queuedStatus, queuedBody, nil
|
||||
}
|
||||
if dead {
|
||||
return 0, nil, &proxyA2AError{
|
||||
Status: http.StatusServiceUnavailable,
|
||||
Headers: map[string]string{"Retry-After": "15"},
|
||||
|
||||
@@ -33,11 +33,18 @@ import (
|
||||
// - the workspace DOES have a recent heartbeat but we see N consecutive
|
||||
// dead observations within the debounce window.
|
||||
const (
|
||||
recentHeartbeatWindow = 15 * time.Second
|
||||
containerDeadDebounceThreshold = 2
|
||||
containerDeadDebounceWindow = 30 * time.Second
|
||||
recentHeartbeatWindow = 15 * time.Second
|
||||
containerDeadDebounceThreshold = 2
|
||||
containerDeadDebounceWindow = 30 * time.Second
|
||||
containerDeadReprobeDelay = 500 * time.Millisecond
|
||||
containerDeadHeartbeatWaitTimeout = 2 * time.Second
|
||||
)
|
||||
|
||||
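// containerDeadReprobeDelayV is the delay slept before the settle-window
// re-probe. It starts out equal to containerDeadReprobeDelay and is a mutable
// package-level variable (rather than a handler field) so tests can shrink it
// to zero without plumbing it through the handler struct. core#2929.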
|
||||
var containerDeadReprobeDelayV = containerDeadReprobeDelay
|
||||
|
||||
// proxyDispatchBuildError is a sentinel wrapper for failures inside
|
||||
// http.NewRequestWithContext. handleA2ADispatchError unwraps it to emit the
|
||||
// "failed to create proxy request" 500 instead of the standard 502/503 paths.
|
||||
@@ -61,9 +68,15 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace
|
||||
|
||||
log.Printf("ProxyA2A forward error: %v", err)
|
||||
|
||||
containerDead := h.maybeMarkContainerDead(ctx, workspaceID)
|
||||
dead, queuedStatus, queuedBody := h.maybeMarkContainerDead(ctx, workspaceID, callerID, body, a2aMethod, durationMs, logActivity)
|
||||
|
||||
if containerDead {
|
||||
if queuedStatus != 0 {
|
||||
// maybeMarkContainerDead decided the container is alive-but-busy/settling
|
||||
// and enqueued the request. Return the 202 Accepted response directly.
|
||||
return queuedStatus, queuedBody, nil
|
||||
}
|
||||
|
||||
if dead {
|
||||
if logActivity {
|
||||
h.logA2AFailure(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs)
|
||||
}
|
||||
@@ -204,78 +217,153 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace
|
||||
//
|
||||
// #2929 hardening: do NOT recycle a recently-alive workspace on a single
|
||||
// transient A2A 503 / IsRunning flake. We guard the restart with:
|
||||
// 1. A recent-heartbeat check (last_heartbeat_at within 15s).
|
||||
// 2. When the heartbeat is recent, require 2 consecutive dead observations
|
||||
// within 30s before declaring dead.
|
||||
// 3. Treat IsRunning transport errors as "assume alive" instead of dead.
|
||||
func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspaceID string) bool {
|
||||
// 1. A widened self-fire guard that also covers the post-restart settle
|
||||
// window (not just a restart currently in-flight).
|
||||
// 2. A re-probe delay after a lone IsRunning=false before trusting it.
|
||||
// 3. A recent-heartbeat / fresh-post-restart-heartbeat check. When
|
||||
// transport-liveness is green we enqueue the request (202 queued)
|
||||
// instead of clearing the workspace URL and restarting.
|
||||
// 4. Treat IsRunning transport errors as "assume alive" instead of dead.
|
||||
//
|
||||
// Returns (dead, httpStatus, responseBody). When httpStatus is non-zero the
|
||||
// caller must return that response directly (e.g., 202 Accepted queued).
|
||||
func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, durationMs int, logActivity bool) (bool, int, []byte) {
|
||||
var wsRuntime string
|
||||
db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime, 'claude-code') FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsRuntime)
|
||||
if isExternalLikeRuntime(wsRuntime) {
|
||||
return false
|
||||
return false, 0, nil
|
||||
}
|
||||
if !h.HasProvisioner() {
|
||||
return false
|
||||
return false, 0, nil
|
||||
}
|
||||
// Restart-aware short-circuit: during the 20-30s EC2-pending window of
|
||||
// an in-flight restart, the workspace's url='' and IsRunning() returns
|
||||
// false → looks indistinguishable from a dead container. Pre-fix this
|
||||
// fired a fresh RestartByID for the just-launched instance, which
|
||||
// coalesceRestart's pending-flag drained by running ANOTHER full
|
||||
// stop+provision cycle (= ec2_stopped of the still-pending instance
|
||||
// → re-provision). That's the 4x reprov thrash class. Skip the
|
||||
// container-dead path while a restart is in flight; the in-flight
|
||||
// restart's own provisionWorkspaceAutoSync will surface a real failure
|
||||
// (markProvisionFailed) if the new container never comes up. Issue
|
||||
// internal#544.
|
||||
|
||||
// Layer 1 self-fire guard: skip the container-dead path while a restart
|
||||
// is in flight AND during the post-restart settle window. During the
|
||||
// settle window a config-PUT-restarted container can report
|
||||
// IsRunning=false before its first heartbeat arrives; a lone false must
|
||||
// not trigger RestartByID and clear the URL. core#2929.
|
||||
if isRestarting(workspaceID) {
|
||||
log.Printf("ProxyA2A: maybeMarkContainerDead skipped for %s — restart already in flight (self-fire guard)", workspaceID)
|
||||
return false
|
||||
return false, 0, nil
|
||||
}
|
||||
settling := inRestartSettleWindow(workspaceID)
|
||||
if settling {
|
||||
log.Printf("ProxyA2A: maybeMarkContainerDead for %s is in post-restart settle window — using conservative path", workspaceID)
|
||||
}
|
||||
|
||||
// #2929: recent-heartbeat guard. A workspace that heartbeated seconds
|
||||
// ago is almost certainly alive; a single proxy/transport flake should
|
||||
// not kill it. We still run IsRunning below, but a recent heartbeat
|
||||
// puts us on the debounced path.
|
||||
// puts us on the debounced/enqueued path.
|
||||
recentHeartbeat := h.hasRecentHeartbeat(ctx, workspaceID)
|
||||
|
||||
var running bool
|
||||
var inspectErr error
|
||||
if h.provisioner != nil {
|
||||
running, inspectErr = h.provisioner.IsRunning(ctx, workspaceID)
|
||||
} else {
|
||||
// SaaS path: ask the CP about the EC2 state. Same (true, err) on
|
||||
// transport errors contract — keeps the caller on the alive path
|
||||
// instead of triggering a restart cascade on a flaky CP call.
|
||||
running, inspectErr = h.cpProv.IsRunning(ctx, workspaceID)
|
||||
// If we're in the settle window, also check whether a heartbeat arrived
|
||||
// strictly AFTER the restart started (i.e. a genuine post-restart PONG).
|
||||
freshHeartbeat := false
|
||||
if settling {
|
||||
if restartStart, ok := lastRestartStartedAt(workspaceID); ok {
|
||||
// waitForFreshHeartbeat is the same correlated check used by the
|
||||
// restart-context sender: url non-empty + heartbeat newer than the
|
||||
// restart start. A short timeout is enough — the heartbeat we're
|
||||
// looking for already happened or is imminent.
|
||||
freshHeartbeat = waitForFreshHeartbeat(ctx, workspaceID, restartStart, containerDeadHeartbeatWaitTimeout)
|
||||
}
|
||||
}
|
||||
|
||||
probe := func() (bool, error) {
|
||||
if h.provisioner != nil {
|
||||
return h.provisioner.IsRunning(ctx, workspaceID)
|
||||
}
|
||||
return h.cpProv.IsRunning(ctx, workspaceID)
|
||||
}
|
||||
|
||||
running, inspectErr := probe()
|
||||
if inspectErr != nil {
|
||||
// Transient backend error (Docker daemon EOF, CP HTTP 5xx, etc.).
|
||||
// IsRunning's contract returns (true, err) in this case so we stay
|
||||
// on the alive path without triggering a restart cascade.
|
||||
log.Printf("ProxyA2A: IsRunning for %s returned transient error (assuming alive): %v", workspaceID, inspectErr)
|
||||
return false
|
||||
return false, 0, nil
|
||||
}
|
||||
if running {
|
||||
h.resetDeadProbe(workspaceID)
|
||||
return false
|
||||
return false, 0, nil
|
||||
}
|
||||
|
||||
// Container is not running. If we have no recent heartbeat, preserve the
|
||||
// First probe says not running. In the recent-heartbeat or settle-window
|
||||
// cases, do not trust a single false — re-probe after a short delay. A
|
||||
// transient container-settle flap resolves itself on the second probe.
|
||||
if recentHeartbeat || freshHeartbeat || settling {
|
||||
time.Sleep(containerDeadReprobeDelayV)
|
||||
running2, inspectErr2 := probe()
|
||||
if inspectErr2 != nil {
|
||||
log.Printf("ProxyA2A: IsRunning re-probe for %s returned transient error (assuming alive): %v", workspaceID, inspectErr2)
|
||||
return false, 0, nil
|
||||
}
|
||||
if running2 {
|
||||
log.Printf("ProxyA2A: IsRunning re-probe for %s now reports running — settling transient, not dead", workspaceID)
|
||||
h.resetDeadProbe(workspaceID)
|
||||
return false, 0, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Still not running after the re-probe. If transport-liveness is green,
|
||||
// the agent is alive-but-busy/settling; queue the request instead of
|
||||
// clearing the URL and restarting. core#2929.
|
||||
if recentHeartbeat || freshHeartbeat {
|
||||
log.Printf("ProxyA2A: container for %s not running but heartbeat is recent — enqueuing instead of restarting", workspaceID)
|
||||
return h.enqueueBusyA2A(ctx, workspaceID, callerID, body, a2aMethod, durationMs, logActivity)
|
||||
}
|
||||
|
||||
// No recent/fresh heartbeat and not in the settle window: preserve the
|
||||
// pre-#2929 immediate-dead behavior so dead EC2 instances still recover
|
||||
// on the first failed request (hongmingwang incident recovery path).
|
||||
if !recentHeartbeat {
|
||||
return h.declareContainerDead(ctx, workspaceID)
|
||||
if !settling {
|
||||
return h.declareContainerDead(ctx, workspaceID), 0, nil
|
||||
}
|
||||
|
||||
// Recent heartbeat but IsRunning says not-running: debounce. Require N
|
||||
// Settle window but no fresh heartbeat yet: debounce. Require N
|
||||
// consecutive observations within the window before we believe it.
|
||||
if !h.incrementDeadProbe(workspaceID) {
|
||||
log.Printf("ProxyA2A: container for %s looks dead but has recent heartbeat — debouncing (%d/%d within %s)",
|
||||
log.Printf("ProxyA2A: container for %s looks dead in settle window — debouncing (%d/%d within %s)",
|
||||
workspaceID, h.deadProbeCount(workspaceID), containerDeadDebounceThreshold, containerDeadDebounceWindow)
|
||||
return false
|
||||
return false, 0, nil
|
||||
}
|
||||
return h.declareContainerDead(ctx, workspaceID)
|
||||
return h.declareContainerDead(ctx, workspaceID), 0, nil
|
||||
}
|
||||
|
||||
// enqueueBusyA2A enqueues the current request for drain on the next idle
|
||||
// heartbeat. Returns (dead=false, http.StatusAccepted, queuedBody) on success,
|
||||
// or (false, 0, nil) if the queue insert failed so the caller can fall back to
|
||||
// its normal error path. core#2929.
|
||||
func (h *WorkspaceHandler) enqueueBusyA2A(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, durationMs int, logActivity bool) (bool, int, []byte) {
|
||||
idempotencyKey := extractIdempotencyKey(body)
|
||||
var expiresAt *time.Time
|
||||
if secs := extractExpiresInSeconds(body); secs > 0 {
|
||||
t := time.Now().Add(time.Duration(secs) * time.Second)
|
||||
expiresAt = &t
|
||||
}
|
||||
qid, depth, qerr := EnqueueA2A(
|
||||
ctx, workspaceID, callerID, PriorityTask, body, a2aMethod, idempotencyKey, expiresAt,
|
||||
)
|
||||
if qerr == nil {
|
||||
log.Printf("ProxyA2A: target %s busy/settling — enqueued as %s (depth=%d)", workspaceID, qid, depth)
|
||||
if logActivity {
|
||||
h.logA2ABusyQueued(ctx, workspaceID, callerID, body, a2aMethod, durationMs)
|
||||
}
|
||||
respBody, marshalErr := json.Marshal(gin.H{
|
||||
"queued": true,
|
||||
"queue_id": qid,
|
||||
"queue_depth": depth,
|
||||
"message": "workspace agent busy — request queued, will dispatch when capacity available",
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("ProxyA2A %s: json.Marshal respBody failed: %v", workspaceID, marshalErr)
|
||||
}
|
||||
return false, http.StatusAccepted, respBody
|
||||
}
|
||||
log.Printf("ProxyA2A: enqueue for %s failed (%v) — falling back to 503", workspaceID, qerr)
|
||||
return false, 0, nil
|
||||
}
|
||||
|
||||
// hasRecentHeartbeat returns true if the workspace has a last_heartbeat_at
|
||||
|
||||
@@ -2120,7 +2120,8 @@ func TestMaybeMarkContainerDead_NilProvisioner(t *testing.T) {
|
||||
WithArgs("ws-nilprov").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("claude-code"))
|
||||
|
||||
if got := handler.maybeMarkContainerDead(context.Background(), "ws-nilprov"); got {
|
||||
dead, _, _ := handler.maybeMarkContainerDead(context.Background(), "ws-nilprov", "", []byte("{}"), "message/send", 0, false)
|
||||
if dead {
|
||||
t.Error("expected false when provisioner is nil")
|
||||
}
|
||||
}
|
||||
@@ -2142,12 +2143,15 @@ func TestMaybeMarkContainerDead_CPOnly_NotRunning(t *testing.T) {
|
||||
mock.ExpectQuery(`SELECT COALESCE\(runtime, 'claude-code'\) FROM workspaces WHERE id =`).
|
||||
WithArgs("ws-saas-dead").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
|
||||
mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces WHERE id =`).
|
||||
WithArgs("ws-saas-dead").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(sql.NullTime{}))
|
||||
mock.ExpectExec(`UPDATE workspaces SET status =`).
|
||||
WithArgs(models.StatusOffline, "ws-saas-dead").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
got := handler.maybeMarkContainerDead(context.Background(), "ws-saas-dead")
|
||||
if !got {
|
||||
dead, _, _ := handler.maybeMarkContainerDead(context.Background(), "ws-saas-dead", "", []byte("{}"), "message/send", 0, false)
|
||||
if !dead {
|
||||
t.Fatal("expected true (cpProv reports not running) — without cpProv consultation, SaaS dead-agent recovery is impossible")
|
||||
}
|
||||
if cp.calls != 1 {
|
||||
@@ -2171,8 +2175,12 @@ func TestMaybeMarkContainerDead_CPOnly_Running(t *testing.T) {
|
||||
mock.ExpectQuery(`SELECT COALESCE\(runtime, 'claude-code'\) FROM workspaces WHERE id =`).
|
||||
WithArgs("ws-saas-alive").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
|
||||
mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces WHERE id =`).
|
||||
WithArgs("ws-saas-alive").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(sql.NullTime{}))
|
||||
|
||||
if got := handler.maybeMarkContainerDead(context.Background(), "ws-saas-alive"); got {
|
||||
dead, _, _ := handler.maybeMarkContainerDead(context.Background(), "ws-saas-alive", "", []byte("{}"), "message/send", 0, false)
|
||||
if dead {
|
||||
t.Error("expected false when cpProv reports running — must not recycle a healthy agent")
|
||||
}
|
||||
if cp.calls != 1 {
|
||||
@@ -2278,15 +2286,17 @@ func TestMaybeMarkContainerDead_ExternalRuntime(t *testing.T) {
|
||||
WithArgs("ws-ext").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("external"))
|
||||
|
||||
if got := handler.maybeMarkContainerDead(context.Background(), "ws-ext"); got {
|
||||
dead, _, _ := handler.maybeMarkContainerDead(context.Background(), "ws-ext", "", []byte("{}"), "message/send", 0, false)
|
||||
if dead {
|
||||
t.Error("expected false for external runtime")
|
||||
}
|
||||
}
|
||||
|
||||
// #2929: a recent heartbeat + IsRunning=false should NOT declare the
|
||||
// container dead on the first observation. The second observation within
|
||||
// the debounce window should.
|
||||
func TestMaybeMarkContainerDead_RecentHeartbeat_Debounces(t *testing.T) {
|
||||
// container dead. The function re-probes after a short delay and, if still
|
||||
// not running, enqueues the request rather than clearing the URL and
|
||||
// restarting.
|
||||
func TestMaybeMarkContainerDead_RecentHeartbeat_DoesNotRestart(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
@@ -2294,10 +2304,14 @@ func TestMaybeMarkContainerDead_RecentHeartbeat_Debounces(t *testing.T) {
|
||||
cp := &fakeCPProv{running: false}
|
||||
handler.SetCPProvisioner(cp)
|
||||
|
||||
// Speed the test: zero the reprobe delay. The second probe still runs.
|
||||
origDelay := containerDeadReprobeDelayV
|
||||
containerDeadReprobeDelayV = 0
|
||||
defer func() { containerDeadReprobeDelayV = origDelay }()
|
||||
|
||||
recentHB := time.Now()
|
||||
wsid := "ws-debounce"
|
||||
|
||||
// First observation: recent heartbeat, not running → debounce, no UPDATE.
|
||||
mock.ExpectQuery(`SELECT COALESCE\(runtime, 'claude-code'\) FROM workspaces WHERE id =`).
|
||||
WithArgs(wsid).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
|
||||
@@ -2305,29 +2319,97 @@ func TestMaybeMarkContainerDead_RecentHeartbeat_Debounces(t *testing.T) {
|
||||
WithArgs(wsid).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(recentHB))
|
||||
|
||||
if got := handler.maybeMarkContainerDead(context.Background(), wsid); got {
|
||||
t.Fatal("first dead observation with recent heartbeat should be debounced, not restarted")
|
||||
dead, status, _ := handler.maybeMarkContainerDead(context.Background(), wsid, "", []byte(`{"jsonrpc":"2.0","method":"message/send"}`), "message/send", 0, false)
|
||||
if dead {
|
||||
t.Fatal("dead observation with recent heartbeat should not declare container dead")
|
||||
}
|
||||
if cp.calls != 1 {
|
||||
t.Errorf("first observation: expected 1 IsRunning call, got %d", cp.calls)
|
||||
// It re-probes, so two IsRunning calls even with delay=0.
|
||||
if cp.calls != 2 {
|
||||
t.Errorf("expected 2 IsRunning calls (probe + re-probe), got %d", cp.calls)
|
||||
}
|
||||
// If EnqueueA2A fails (no DB expectations here) it returns status=0 and
|
||||
// lets the caller fall back to its normal error path. The critical
|
||||
// invariant is that the workspace was NOT marked offline/restarting.
|
||||
_ = status
|
||||
}
|
||||
|
||||
// Second observation within the window → threshold reached, restart.
|
||||
// #2929: when there is no recent heartbeat and IsRunning=false, the
|
||||
// container is declared dead immediately (preserves dead-EC2 recovery).
|
||||
func TestMaybeMarkContainerDead_NoRecentHeartbeat_DeclaresDead(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, handler)
|
||||
cp := &fakeCPProv{running: false}
|
||||
handler.SetCPProvisioner(cp)
|
||||
|
||||
wsid := "ws-no-hb-dead"
|
||||
mock.ExpectQuery(`SELECT COALESCE\(runtime, 'claude-code'\) FROM workspaces WHERE id =`).
|
||||
WithArgs(wsid).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
|
||||
mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces WHERE id =`).
|
||||
WithArgs(wsid).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(recentHB))
|
||||
WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(sql.NullTime{}))
|
||||
mock.ExpectExec(`UPDATE workspaces SET status =`).
|
||||
WithArgs(models.StatusOffline, wsid).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
if got := handler.maybeMarkContainerDead(context.Background(), wsid); !got {
|
||||
t.Fatal("second dead observation with recent heartbeat should declare dead")
|
||||
dead, _, _ := handler.maybeMarkContainerDead(context.Background(), wsid, "", []byte("{}"), "message/send", 0, false)
|
||||
if !dead {
|
||||
t.Fatal("expected dead when there is no recent heartbeat and IsRunning=false")
|
||||
}
|
||||
if cp.calls != 1 {
|
||||
t.Errorf("expected 1 IsRunning call, got %d", cp.calls)
|
||||
}
|
||||
}
|
||||
|
||||
// #2929 regression: in the post-restart settle window, a single IsRunning=false
|
||||
// must NOT nuke a PONG-healthy container's URL. Evidence: job 506813 saw the
|
||||
// agent PONG 0.7s before a lone false probe; pre-fix that cleared the URL,
|
||||
// flipped status offline, and self-fired a restart.
|
||||
func TestMaybeMarkContainerDead_SettleWindow_DoesNotClearURL(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
waitForHandlerAsyncBeforeDBCleanup(t, handler)
|
||||
cp := &fakeCPProv{running: false}
|
||||
handler.SetCPProvisioner(cp)
|
||||
|
||||
origDelay := containerDeadReprobeDelayV
|
||||
containerDeadReprobeDelayV = 0
|
||||
defer func() { containerDeadReprobeDelayV = origDelay }()
|
||||
|
||||
wsid := "ws-settle-window"
|
||||
recentHB := time.Now()
|
||||
|
||||
// Stamp a restart that has *finished* (running=false) but is still inside
|
||||
// the settle window. This is the exact state after a config-PUT restart
|
||||
// where the container is alive-but-settling.
|
||||
sv, _ := restartStates.LoadOrStore(wsid, &restartState{})
|
||||
state := sv.(*restartState)
|
||||
state.mu.Lock()
|
||||
state.running = false
|
||||
state.restartStartedAt = time.Now()
|
||||
state.mu.Unlock()
|
||||
defer restartStates.Delete(wsid)
|
||||
|
||||
if !inRestartSettleWindow(wsid) {
|
||||
t.Fatal("test setup failed: workspace should be in restart settle window")
|
||||
}
|
||||
|
||||
mock.ExpectQuery(`SELECT COALESCE\(runtime, 'claude-code'\) FROM workspaces WHERE id =`).
|
||||
WithArgs(wsid).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
|
||||
mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces WHERE id =`).
|
||||
WithArgs(wsid).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(recentHB))
|
||||
|
||||
dead, _, _ := handler.maybeMarkContainerDead(context.Background(), wsid, "", []byte(`{"jsonrpc":"2.0","method":"message/send"}`), "message/send", 0, false)
|
||||
if dead {
|
||||
t.Fatal("single IsRunning=false in post-restart settle window must not declare container dead")
|
||||
}
|
||||
if cp.calls != 2 {
|
||||
t.Errorf("second observation: expected 2 IsRunning calls total, got %d", cp.calls)
|
||||
t.Errorf("expected 2 IsRunning calls (probe + re-probe), got %d", cp.calls)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2348,14 +2430,15 @@ func TestMaybeMarkContainerDead_InspectErr_AssumesAlive(t *testing.T) {
|
||||
WithArgs(wsid).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(sql.NullTime{}))
|
||||
|
||||
if got := handler.maybeMarkContainerDead(context.Background(), wsid); got {
|
||||
dead, _, _ := handler.maybeMarkContainerDead(context.Background(), wsid, "", []byte("{}"), "message/send", 0, false)
|
||||
if dead {
|
||||
t.Error("transient IsRunning error should be treated as alive, not dead")
|
||||
}
|
||||
}
|
||||
|
||||
// #2929: a successful IsRunning=true observation resets any accumulated
|
||||
// dead-probe counter.
|
||||
func TestMaybeMarkContainerDead_RunningTrue_ResetsDeadProbe(t *testing.T) {
|
||||
// #2929: a successful IsRunning=true observation after a re-probe resets the
|
||||
// dead-probe state and does not restart.
|
||||
func TestMaybeMarkContainerDead_RunningTrueAfterReprobe_Resets(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
|
||||
@@ -2364,18 +2447,25 @@ func TestMaybeMarkContainerDead_RunningTrue_ResetsDeadProbe(t *testing.T) {
|
||||
wsid := "ws-alive-resets"
|
||||
recentHB := time.Now()
|
||||
|
||||
// One dead observation with recent heartbeat accumulates a probe.
|
||||
origDelay := containerDeadReprobeDelayV
|
||||
containerDeadReprobeDelayV = 0
|
||||
defer func() { containerDeadReprobeDelayV = origDelay }()
|
||||
|
||||
// First observation: recent heartbeat but IsRunning=false. The re-probe
|
||||
// also returns false, so the request is enqueued (or falls back) and the
|
||||
// workspace is not marked dead.
|
||||
mock.ExpectQuery(`SELECT COALESCE\(runtime, 'claude-code'\) FROM workspaces WHERE id =`).
|
||||
WithArgs(wsid).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
|
||||
mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces WHERE id =`).
|
||||
WithArgs(wsid).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(recentHB))
|
||||
if got := handler.maybeMarkContainerDead(context.Background(), wsid); got {
|
||||
t.Fatal("expected first observation to be debounced")
|
||||
dead, _, _ := handler.maybeMarkContainerDead(context.Background(), wsid, "", []byte("{}"), "message/send", 0, false)
|
||||
if dead {
|
||||
t.Fatal("expected first observation to be debounced/enqueued, not dead")
|
||||
}
|
||||
|
||||
// Next observation says running=true → resets counter and does NOT restart.
|
||||
// Next observation says running=true → no restart.
|
||||
cp.running = true
|
||||
mock.ExpectQuery(`SELECT COALESCE\(runtime, 'claude-code'\) FROM workspaces WHERE id =`).
|
||||
WithArgs(wsid).
|
||||
@@ -2383,20 +2473,9 @@ func TestMaybeMarkContainerDead_RunningTrue_ResetsDeadProbe(t *testing.T) {
|
||||
mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces WHERE id =`).
|
||||
WithArgs(wsid).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(recentHB))
|
||||
if got := handler.maybeMarkContainerDead(context.Background(), wsid); got {
|
||||
t.Error("running=true should reset dead probe and not restart")
|
||||
}
|
||||
|
||||
// One more dead observation is now back to count=1, not threshold.
|
||||
cp.running = false
|
||||
mock.ExpectQuery(`SELECT COALESCE\(runtime, 'claude-code'\) FROM workspaces WHERE id =`).
|
||||
WithArgs(wsid).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("hermes"))
|
||||
mock.ExpectQuery(`SELECT last_heartbeat_at FROM workspaces WHERE id =`).
|
||||
WithArgs(wsid).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"last_heartbeat_at"}).AddRow(recentHB))
|
||||
if got := handler.maybeMarkContainerDead(context.Background(), wsid); got {
|
||||
t.Fatal("expected dead probe to have been reset; first post-reset observation should be debounced")
|
||||
dead, _, _ = handler.maybeMarkContainerDead(context.Background(), wsid, "", []byte("{}"), "message/send", 0, false)
|
||||
if dead {
|
||||
t.Error("running=true should not restart")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -232,6 +232,15 @@ func (h *WorkspaceHandler) maybeRestartAfterFileWrite(workspaceID string) {
|
||||
// pending=true and the outer coalesceRestart loop will drain by running
|
||||
// ANOTHER full cycle, ec2_stopped of the just-booted instance →
|
||||
// re-provision. That's the self-fire loop closed by this gate.
|
||||
// restartSettleWindow is the post-restart window during which a single
// IsRunning=false probe is NOT trusted. A workspace that was restarted after
// a config.yaml PUT can report IsRunning=false while the agent is still
// registering its first heartbeat / settling the container. Widening the
// self-fire guard to cover this window prevents a lone flaky probe from
// clearing the workspace URL and re-triggering a destructive restart.
// core#2929.
//
// NOTE(review): this comment directly follows, with no blank line, the
// comment block that previously sat immediately above isRestarting, so that
// text now attaches to this const and isRestarting loses its doc comment —
// consider moving this declaration so the two stay separate.
|
||||
const restartSettleWindow = 30 * time.Second
|
||||
|
||||
func isRestarting(workspaceID string) bool {
|
||||
sv, ok := restartStates.Load(workspaceID)
|
||||
if !ok {
|
||||
@@ -243,6 +252,41 @@ func isRestarting(workspaceID string) bool {
|
||||
return state.running
|
||||
}
|
||||
|
||||
// inRestartSettleWindow reports whether workspaceID is within
|
||||
// restartSettleWindow of its most recent restart start. This widens the
|
||||
// self-fire guard beyond the in-flight restart flag to cover the settle
|
||||
// window right after a config-PUT restart. core#2929.
|
||||
func inRestartSettleWindow(workspaceID string) bool {
|
||||
sv, ok := restartStates.Load(workspaceID)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
state := sv.(*restartState)
|
||||
state.mu.Lock()
|
||||
defer state.mu.Unlock()
|
||||
if state.restartStartedAt.IsZero() {
|
||||
return false
|
||||
}
|
||||
return time.Since(state.restartStartedAt) < restartSettleWindow
|
||||
}
|
||||
|
||||
// lastRestartStartedAt returns the timestamp recorded when the most recent
|
||||
// restart cycle started for workspaceID, if any. Used by the settle-window
|
||||
// heartbeat freshness check in maybeMarkContainerDead. core#2929.
|
||||
func lastRestartStartedAt(workspaceID string) (time.Time, bool) {
|
||||
sv, ok := restartStates.Load(workspaceID)
|
||||
if !ok {
|
||||
return time.Time{}, false
|
||||
}
|
||||
state := sv.(*restartState)
|
||||
state.mu.Lock()
|
||||
defer state.mu.Unlock()
|
||||
if state.restartStartedAt.IsZero() {
|
||||
return time.Time{}, false
|
||||
}
|
||||
return state.restartStartedAt, true
|
||||
}
|
||||
|
||||
// isParentPaused checks if any ancestor of the workspace is paused.
|
||||
func isParentPaused(ctx context.Context, workspaceID string) (bool, string) {
|
||||
var parentID *string
|
||||
|
||||
@@ -117,8 +117,9 @@ func TestMaybeMarkContainerDead_SkippedWhileRestarting(t *testing.T) {
|
||||
h := newSelfFireHandler(t)
|
||||
h.provisioner = stub
|
||||
|
||||
if got := h.maybeMarkContainerDead(context.Background(), wsID); got != false {
|
||||
t.Errorf("maybeMarkContainerDead must return false while restarting, got %v", got)
|
||||
dead, _, _ := h.maybeMarkContainerDead(context.Background(), wsID, "", []byte("{}"), "message/send", 0, false)
|
||||
if dead != false {
|
||||
t.Errorf("maybeMarkContainerDead must return false while restarting, got %v", dead)
|
||||
}
|
||||
if stub.calls != 0 {
|
||||
t.Errorf("IsRunning must not be called while restarting (Layer 1 gate broken); got %d calls", stub.calls)
|
||||
|
||||
Reference in New Issue
Block a user