Merge pull request 'fix(ws-server): close self-fire restart feedback loop (internal#544)' (#1556) from fix/ws-server-self-fire-restart-loop into main
CI / Canvas Deploy Reminder (push) Blocked by required conditions
E2E API Smoke Test / E2E API Smoke Test (push) Blocked by required conditions
E2E Chat / E2E Chat (push) Blocked by required conditions
E2E Staging Canvas (Playwright) / Canvas tabs E2E (push) Blocked by required conditions
Handlers Postgres Integration / Handlers Postgres Integration (push) Blocked by required conditions
Harness Replays / Harness Replays (push) Blocked by required conditions
Runtime PR-Built Compatibility / PR-built wheel + import smoke (push) Blocked by required conditions
publish-workspace-server-image / build-and-push (push) Successful in 5m15s
Block internal-flavored paths / Block forbidden paths (push) Successful in 5s
CI / Detect changes (push) Successful in 7s
CI / Shellcheck (E2E scripts) (push) Successful in 11s
E2E API Smoke Test / detect-changes (push) Successful in 24s
E2E Chat / detect-changes (push) Successful in 10s
E2E Staging Canvas (Playwright) / detect-changes (push) Successful in 10s
Handlers Postgres Integration / detect-changes (push) Successful in 6s
Harness Replays / detect-changes (push) Successful in 10s
Runtime PR-Built Compatibility / detect-changes (push) Successful in 12s
Secret scan / Scan diff for credential-shaped strings (push) Successful in 7s
Continuous synthetic E2E (staging) / Synthetic E2E against staging (push) Has started running
Sweep stale e2e-* orgs (staging) / Sweep e2e orgs (push) Successful in 2s
Sweep stale Cloudflare Tunnels / Sweep CF tunnels (push) Successful in 5s
CI / Platform (Go) (push) Successful in 5m20s
E2E Staging External Runtime / E2E Staging External Runtime (push) Successful in 5m16s
CI / Python Lint & Test (push) Successful in 6m53s
CI / Canvas (Next.js) (push) Successful in 7m19s
CI / all-required (push) Successful in 7m8s
publish-workspace-server-image / Production auto-deploy (push) Has been cancelled

This commit was merged in pull request #1556.
This commit is contained in:
2026-05-19 01:40:54 +00:00
4 changed files with 464 additions and 0 deletions
@@ -168,6 +168,21 @@ func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspace
if !h.HasProvisioner() {
return false
}
// Restart-aware short-circuit: during the 20-30s EC2-pending window of
// an in-flight restart, the workspace's url='' and IsRunning() returns
// false → looks indistinguishable from a dead container. Pre-fix this
// fired a fresh RestartByID for the just-launched instance, which
// coalesceRestart's pending-flag drained by running ANOTHER full
// stop+provision cycle (= ec2_stopped of the still-pending instance
// → re-provision). That's the 4x reprov thrash class. Skip the
// container-dead path while a restart is in flight; the in-flight
// restart's own provisionWorkspaceAutoSync will surface a real failure
// (markProvisionFailed) if the new container never comes up. Issue
// internal#544.
if isRestarting(workspaceID) {
log.Printf("ProxyA2A: maybeMarkContainerDead skipped for %s — restart already in flight (self-fire guard)", workspaceID)
return false
}
var running bool
var inspectErr error
@@ -223,6 +238,18 @@ func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspace
// shape post-EC2-replace (see molecule-controlplane#20 incident
// 2026-05-07) where the reconciler hasn't respawned the agent yet.
func (h *WorkspaceHandler) preflightContainerHealth(ctx context.Context, workspaceID string) *proxyA2AError {
// Restart-aware short-circuit (mirror of maybeMarkContainerDead): if a
// restart cycle is in flight for this workspace, do not run the
// IsRunning probe — it would observe the EC2-pending state as "not
// running" and trigger RestartByID for an already-restarting workspace,
// closing the self-fire loop. Returning nil lets the optimistic
// forward proceed; the upstream Do() call will fail with a connection
// error or 502, and the *post-restart* reactive path can decide what
// to do once the cycle has actually completed. Issue internal#544.
if isRestarting(workspaceID) {
log.Printf("ProxyA2A preflight: %s — skipped, restart already in flight (self-fire guard)", workspaceID)
return nil
}
running, err := h.provisioner.IsRunning(ctx, workspaceID)
if err != nil {
// Transient daemon error. Provisioner.IsRunning returns (true, err)
@@ -180,6 +180,42 @@ func waitForWorkspaceOnline(ctx context.Context, workspaceID string, timeout tim
return false
}
// waitForFreshHeartbeat polls until the workspace has BOTH a non-empty
// url AND a last_heartbeat_at strictly after restartStartTs (i.e. the
// heartbeat we observe is NEW, not the stale pre-restart one carried
// across through the row update). Returns false on timeout or DB error.
//
// This is the Layer 2 gate for the 2026-05-19 ws-server self-fire restart
// loop fix. status='online' can flip while url='' is still in place (the
// status update happens in /registry/register; url is set at the same
// time but the read here may see a transient interleaving) and pre-fix
// the trailing restart-context probe could fire against a half-registered
// row, triggering the upstream-502 → maybeMarkContainerDead → self-fire
// chain we're closing. The url + heartbeat-freshness check is the
// strict, correlated end-state assertion that says "the new container is
// actually addressable" — not just "some heartbeat happened".
func waitForFreshHeartbeat(ctx context.Context, workspaceID string, restartStartTs time.Time, timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
var url sql.NullString
var lastHB sql.NullTime
err := db.DB.QueryRowContext(ctx,
`SELECT url, last_heartbeat_at FROM workspaces WHERE id = $1`, workspaceID,
).Scan(&url, &lastHB)
if err == nil &&
url.Valid && url.String != "" &&
lastHB.Valid && lastHB.Time.After(restartStartTs) {
return true
}
select {
case <-ctx.Done():
return false
case <-time.After(restartContextOnlinePollInterval):
}
}
return false
}
// buildRestartA2APayload wraps the rendered context string in the
// JSON-RPC 2.0 / A2A message/send shape that the proxy already knows
// how to normalize. Returns the marshalled body ready for ProxyA2ARequest.
@@ -220,6 +256,22 @@ func (h *WorkspaceHandler) sendRestartContext(workspaceID string, data restartCo
log.Printf("restart-context: workspace %s did not come online within %s — dropping context message", workspaceID, restartContextOnlineTimeout)
return
}
// Self-fire guard (Layer 2 of the 2026-05-19 ws-server self-fire fix):
// status='online' alone is not enough to safely fire the trailing
// ProxyA2ARequest. The workspace must also have:
// - url != '' (the new container's URL has been registered)
// - last_heartbeat_at > data.RestartAt (the heartbeat we're seeing is NEW, not stale)
// Without those, ProxyA2ARequest can fail with a connect error or
// upstream 502, hit handleA2ADispatchError → maybeMarkContainerDead →
// RestartByID → self-fire. The Layer 1 isRestarting gate already
// covers that, but this is a belt-and-suspenders so the probe never
// even tries until the new container is actually addressable. Best-
// effort: if the DB read errors out we proceed (preserves the legacy
// behaviour of "online means online").
if !waitForFreshHeartbeat(ctx, workspaceID, data.RestartAt, restartContextOnlineTimeout) {
log.Printf("restart-context: workspace %s online but no fresh heartbeat or empty url — dropping context message (self-fire guard)", workspaceID)
return
}
text := buildRestartContextMessage(data)
body, err := buildRestartA2APayload(text)
@@ -8,6 +8,7 @@ import (
"runtime/debug"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
@@ -39,12 +40,57 @@ type restartState struct {
mu sync.Mutex
running bool // true while a restart cycle is in flight
pending bool // set by any caller that arrived during the in-flight cycle
// restartStartedAt records the wall-clock when the most recent cycle
// flipped running=true. Used by the self-fire debounce (internal#544,
// the ws-server self-fire restart feedback loop seen in prod-Reviewer/
// Researcher 2026-05-19 ~00:05Z 4x reprov thrash): any RestartByID
// arriving within restartDebounceWindow of this timestamp is silently
// dropped so a probe firing during the EC2-pending window can't
// re-trigger a fresh full cycle on the just-launched instance.
restartStartedAt time.Time
}
// restartStates is a per-workspace map of *restartState. Each workspace gets
// its own entry so unrelated workspaces don't serialize on each other.
var restartStates sync.Map // map[workspaceID]*restartState
// restartDebounceWindow is the silent-drop window for successive RestartByID
// calls. Sized to cover the typical EC2 pending → online interval (20-30s)
// with a margin so a probe firing during the just-after-online but still-
// flaky heartbeat window also gets dropped. Bigger than that would block
// legitimate "Restart failed, retry" recoveries; smaller would let the
// 4x thrash class through. Package-level so tests can shrink it.
var restartDebounceWindow = 60 * time.Second
// restartByIDDropCounter is incremented every time RestartByID drops a call
// inside the debounce window. Exposed as a package-level atomic counter so
// (a) tests can assert the drop fired, (b) ops can grep logs for the drop
// log line + the counter snapshot in a future /admin/metrics endpoint.
// Not a Prometheus metric because the platform doesn't pull metrics from
// workspace-server yet — that's a separate RFC.
var restartByIDDropCounter atomic.Uint64
// isRestarting reports whether a restart cycle is currently in flight for
// the workspace. Callers that have their own "container looks dead" probe
// MUST consult this before triggering a restart, because during the
// 20-30s EC2-pending window the workspace's url='' and IsRunning()=false
// looks identical to a dead container — and any restart-triggering probe
// (maybeMarkContainerDead from canvas /delegations poll, or the trailing
// restart-context probe at the end of runRestartCycle) will set
// pending=true and the outer coalesceRestart loop will drain by running
// ANOTHER full cycle, ec2_stopped of the just-booted instance →
// re-provision. That's the self-fire loop closed by this gate.
func isRestarting(workspaceID string) bool {
sv, ok := restartStates.Load(workspaceID)
if !ok {
return false
}
state := sv.(*restartState)
state.mu.Lock()
defer state.mu.Unlock()
return state.running
}
// isParentPaused checks if any ancestor of the workspace is paused.
func isParentPaused(ctx context.Context, workspaceID string) (bool, string) {
var parentID *string
@@ -376,9 +422,45 @@ func (h *WorkspaceHandler) RestartByID(workspaceID string) {
if !h.HasProvisioner() {
return
}
// Self-fire debounce: drop (not coalesce) successive RestartByID calls
// within restartDebounceWindow of the most recent cycle's start. This
// is the load-bearing protection against the 4x reprov thrash class —
// coalesceRestart's pending-flag would otherwise drain by running
// ANOTHER full cycle of stop+provision on the just-launched EC2 (still
// in the pending state), which is the self-fire we're closing.
//
// Only applies to RestartByID (programmatic — secrets handler,
// maybeMarkContainerDead, preflightContainerHealth). The HTTP Restart
// handler in workspace_restart.go's Restart() bypasses this path and
// calls RestartWorkspaceAutoOpts directly, so user-initiated restart
// clicks are unaffected.
if shouldDebounceRestart(workspaceID) {
restartByIDDropCounter.Add(1)
log.Printf("RestartByID: %s — dropped (within %s self-fire debounce window; total dropped=%d)",
workspaceID, restartDebounceWindow, restartByIDDropCounter.Load())
return
}
coalesceRestart(workspaceID, func() { h.runRestartCycle(workspaceID) })
}
// shouldDebounceRestart reports whether the most recent cycle for this
// workspace started within restartDebounceWindow. Read-only on
// restartState; the actual restartStartedAt stamp is written in
// coalesceRestart when running flips false→true.
func shouldDebounceRestart(workspaceID string) bool {
sv, ok := restartStates.Load(workspaceID)
if !ok {
return false
}
state := sv.(*restartState)
state.mu.Lock()
defer state.mu.Unlock()
if state.restartStartedAt.IsZero() {
return false
}
return time.Since(state.restartStartedAt) < restartDebounceWindow
}
// coalesceRestart implements the pending-flag gate around an arbitrary cycle
// function. Extracted from RestartByID for direct unit testing — the cycle
// function in production is `runRestartCycle`, but tests pass a counter to
@@ -398,6 +480,12 @@ func coalesceRestart(workspaceID string, cycle func()) {
return
}
state.running = true
// Stamp the start time so the RestartByID debounce can drop any
// self-fire probe that hits within restartDebounceWindow. Only the
// false→true edge stamps; the drain-loop's inner cycles re-use the
// same start (they're effectively one "restart event" from the
// debounce's POV).
state.restartStartedAt = time.Now()
state.mu.Unlock()
// Always clear running on exit — including panic — so a panicking
@@ -0,0 +1,297 @@
package handlers
// Tests for the 2026-05-19 ws-server self-fire restart feedback loop fix.
//
// Empirical chain reproduced (prod-Reviewer/Researcher 4x reprov thrash
// 2026-05-19 ~00:05-00:09Z, root-caused via Loki):
//
// 1. POST /secrets → go h.restartFunc(workspaceID) (secrets.go:264).
// 2. runRestartCycle sets url='' synchronously, then async provisions EC2
// (workspace_restart.go).
// 3. During 20-30s window while EC2 is `pending` (codex first heartbeat
// not yet landed): workspaces.url='' AND IsRunning=false.
// 4. Any ProxyA2A (canvas /delegations poll OR the restart-context probe
// at the end of runRestartCycle) → maybeMarkContainerDead sees the
// container-dead state → calls RestartByID → loop.
// 5. coalesceRestart sets pending=true, drains by running ANOTHER full
// cycle → provision.ec2_stopped of the just-booted instance →
// re-provision.
//
// Fix: three interdependent layers.
//
// L1) isRestarting() gate in maybeMarkContainerDead +
// preflightContainerHealth — early-return false/nil so the probe
// can't trigger a fresh RestartByID while a restart is in flight.
// L2) sendRestartContext requires url != '' AND last_heartbeat_at >
// restart_start_ts before firing the trailing ProxyA2A probe.
// L3) RestartByID silently drops successive calls within
// restartDebounceWindow of restartStartedAt, with a counter for
// observability.
import (
"context"
"sync/atomic"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
)
// resetSelfFireState wipes all the per-workspace mutation state these
// tests touch, plus the package-level drop counter, so the test is
// hermetic regardless of ordering.
func resetSelfFireState(workspaceID string) {
restartStates.Delete(workspaceID)
restartByIDDropCounter.Store(0)
}
// markRestarting forces restartStates into "cycle in flight" without
// running an actual cycle, so the tests can isolate the gate behaviour
// without the full provision pipeline. Returns a finish() that flips
// running=false (mimicking coalesceRestart's deferred state-clear).
func markRestarting(workspaceID string) (finish func()) {
sv, _ := restartStates.LoadOrStore(workspaceID, &restartState{})
state := sv.(*restartState)
state.mu.Lock()
state.running = true
state.restartStartedAt = time.Now()
state.mu.Unlock()
return func() {
state.mu.Lock()
state.running = false
state.mu.Unlock()
}
}
// TestIsRestarting_FalseWhenNoStateEntry — baseline: a workspace that
// has never been restarted reports !isRestarting. Pinning this so a
// future LoadOrStore refactor can't silently start returning true for
// unknown workspaces.
func TestIsRestarting_FalseWhenNoStateEntry(t *testing.T) {
const wsID = "self-fire-ws-never"
resetSelfFireState(wsID)
if isRestarting(wsID) {
t.Fatal("isRestarting must return false for a workspace with no state entry")
}
}
// TestIsRestarting_TrueWhileCycleRunning — the load-bearing invariant
// that Layer 1 depends on. While running=true, isRestarting must report
// true; the moment it flips to false, isRestarting must report false.
func TestIsRestarting_TrueWhileCycleRunning(t *testing.T) {
const wsID = "self-fire-ws-in-flight"
resetSelfFireState(wsID)
finish := markRestarting(wsID)
if !isRestarting(wsID) {
t.Fatal("isRestarting must return true while running=true")
}
finish()
if isRestarting(wsID) {
t.Fatal("isRestarting must return false after running flips back to false")
}
}
// TestMaybeMarkContainerDead_SkippedWhileRestarting — Layer 1 for the
// reactive path. With isRestarting=true the function must early-return
// false WITHOUT invoking IsRunning, hitting the DB UPDATE, or kicking
// a RestartByID goroutine. If any of those side-effects fire we'd
// re-arm the self-fire loop the gate exists to close.
func TestMaybeMarkContainerDead_SkippedWhileRestarting(t *testing.T) {
const wsID = "self-fire-ws-mmcd"
resetSelfFireState(wsID)
mock := setupTestDB(t) // sqlmock with strict expectation matching
// Workspace row read inside maybeMarkContainerDead — this happens
// BEFORE the isRestarting gate in the current implementation, so
// allow exactly one SELECT runtime row.
mock.ExpectQuery(`SELECT COALESCE\(runtime, 'langgraph'\) FROM workspaces WHERE id =`).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"runtime"}).AddRow("claude-code"))
// Gate flipped: must early-return without doing anything else.
finish := markRestarting(wsID)
defer finish()
stub := &preflightLocalProv{running: false, err: nil}
h := newSelfFireHandler(t)
h.provisioner = stub
if got := h.maybeMarkContainerDead(context.Background(), wsID); got != false {
t.Errorf("maybeMarkContainerDead must return false while restarting, got %v", got)
}
if stub.calls != 0 {
t.Errorf("IsRunning must not be called while restarting (Layer 1 gate broken); got %d calls", stub.calls)
}
}
// TestPreflightContainerHealth_SkippedWhileRestarting — Layer 1 for the
// proactive path. Same shape as above: with restart in flight, return
// nil (let the optimistic forward proceed) and DO NOT call IsRunning.
// The forward will fail with a connect error; the post-restart reactive
// path can decide what to do then, by which point the EC2 has either
// come up (no more failures) or markProvisionFailed has fired.
func TestPreflightContainerHealth_SkippedWhileRestarting(t *testing.T) {
const wsID = "self-fire-ws-preflight"
resetSelfFireState(wsID)
_ = setupTestDB(t)
finish := markRestarting(wsID)
defer finish()
stub := &preflightLocalProv{running: false, err: nil}
h := newSelfFireHandler(t)
h.provisioner = stub
if err := h.preflightContainerHealth(context.Background(), wsID); err != nil {
t.Errorf("preflightContainerHealth must return nil while restarting, got %+v", err)
}
if stub.calls != 0 {
t.Errorf("IsRunning must not be called while restarting (Layer 1 gate broken); got %d calls", stub.calls)
}
}
// TestRestartByID_DebounceSilentDrop — Layer 3. After a cycle starts,
// any RestartByID arriving within restartDebounceWindow MUST be dropped
// silently — not coalesced (which would still drain to another cycle).
// The drop counter must increment by exactly one per dropped call so
// ops can see how often the self-fire would have fired pre-fix.
func TestRestartByID_DebounceSilentDrop(t *testing.T) {
const wsID = "self-fire-ws-debounce"
resetSelfFireState(wsID)
// Stamp restartStartedAt = now, running=false (simulates the "just
// finished" window where the loop would re-fire pre-fix).
sv, _ := restartStates.LoadOrStore(wsID, &restartState{})
state := sv.(*restartState)
state.mu.Lock()
state.restartStartedAt = time.Now()
state.running = false
state.mu.Unlock()
// Counter baseline.
if got := restartByIDDropCounter.Load(); got != 0 {
t.Fatalf("expected drop counter 0 at start, got %d", got)
}
// Five rapid-fire RestartByID calls should all drop (the maximum
// observed pre-fix was 4x — pinning >=4 here keeps the regression
// shape true to the prod incident).
h := newSelfFireHandler(t)
stub := &preflightLocalProv{running: true, err: nil}
h.provisioner = stub
for i := 0; i < 5; i++ {
h.RestartByID(wsID)
}
if got := restartByIDDropCounter.Load(); got != 5 {
t.Errorf("expected 5 drops within debounce window, got %d", got)
}
// shouldDebounceRestart itself must report true for the same window.
if !shouldDebounceRestart(wsID) {
t.Error("shouldDebounceRestart must return true within window")
}
}
// TestRestartByID_DebounceExpiresAfterWindow — outside the window, the
// debounce must release: a legitimate later restart (e.g. user clicked
// Restart again after waiting) must proceed to coalesceRestart. We
// shrink restartDebounceWindow to 1ms for the duration of this test so
// we don't sleep a full 60s in CI.
func TestRestartByID_DebounceExpiresAfterWindow(t *testing.T) {
const wsID = "self-fire-ws-debounce-release"
resetSelfFireState(wsID)
orig := restartDebounceWindow
restartDebounceWindow = 5 * time.Millisecond
defer func() { restartDebounceWindow = orig }()
// Stamp inside the window.
sv, _ := restartStates.LoadOrStore(wsID, &restartState{})
state := sv.(*restartState)
state.mu.Lock()
state.restartStartedAt = time.Now()
state.running = false
state.mu.Unlock()
if !shouldDebounceRestart(wsID) {
t.Fatal("within 5ms window must debounce")
}
// Sleep past the window. Use a small margin to avoid clock-skew
// flakes on slow CI hosts.
time.Sleep(20 * time.Millisecond)
if shouldDebounceRestart(wsID) {
t.Fatal("after 20ms (4x window) must no longer debounce")
}
}
// TestRestartByID_SingleProvisionPerRestart — the regression test for
// the prod incident: a SINGLE secrets PUT (which is the trigger shape)
// must produce exactly ONE coalesceRestart cycle, not four. Models the
// full chain: secrets handler → RestartByID → coalesceRestart → cycle
// runs → during the cycle window, simulated probes call RestartByID
// again. With all three layers in place, the probes are dropped and the
// total cycle count stays at 1.
func TestRestartByID_SingleProvisionPerRestart(t *testing.T) {
const wsID = "self-fire-ws-single-provision"
resetSelfFireState(wsID)
// In-flight gate that mimics the EC2-pending window. The cycle
// blocks on cycleProceed so we can fire the simulated probes while
// running=true.
var cycleCount atomic.Int32
cycleStarted := make(chan struct{}, 1)
cycleProceed := make(chan struct{})
cycle := func() {
n := cycleCount.Add(1)
if n == 1 {
cycleStarted <- struct{}{}
<-cycleProceed
}
}
// Kick the first cycle via coalesceRestart (this is what RestartByID
// would do post-debounce-check).
done := make(chan struct{})
go func() {
coalesceRestart(wsID, cycle)
close(done)
}()
<-cycleStarted
// Simulate the 4 probe-driven RestartByID calls observed in prod.
// Each must drop because we're within the debounce window AND a
// cycle is in flight.
h := newSelfFireHandler(t)
stub := &preflightLocalProv{running: true, err: nil}
h.provisioner = stub
for i := 0; i < 4; i++ {
h.RestartByID(wsID)
}
// Release the cycle.
close(cycleProceed)
<-done
if got := cycleCount.Load(); got != 1 {
t.Errorf("expected exactly 1 provision cycle for a single trigger "+
"(self-fire fix), got %d — regression of the prod 4x reprov thrash class",
got)
}
if got := restartByIDDropCounter.Load(); got != 4 {
t.Errorf("expected 4 self-fire probes dropped, got %d "+
"(observability counter must record the saved cycles)", got)
}
}
// newSelfFireHandler constructs a minimal *WorkspaceHandler suitable for
// the Layer-1 gate tests. Wraps the boilerplate so the per-test setup
// stays focused on the assertion.
func newSelfFireHandler(t *testing.T) *WorkspaceHandler {
t.Helper()
return NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
}