diff --git a/workspace-server/cmd/server/main.go b/workspace-server/cmd/server/main.go index 502bacebf..e79c71d61 100644 --- a/workspace-server/cmd/server/main.go +++ b/workspace-server/cmd/server/main.go @@ -337,6 +337,25 @@ func main() { }) } + // CP-mode instance-state reconciler — authoritative EC2-liveness pass + // for SaaS workspaces (core#2261). Every other liveness sweep keys off + // a PROXY (Redis TTL, agent heartbeat, local Docker, or + // runtime='external'); a SaaS claude-code workspace whose EC2 was + // terminated/stopped falls through ALL of them and stays status='online' + // pointing at a dead instance_id forever (root cause: core#2247). This + // loop asks the ONE authoritative question the others lack — + // cpProv.IsRunning (CP DescribeInstances-equivalent) — for each online + // SaaS row, and on a CLEAN "not running" feeds it into the SAME + // onWorkspaceOffline closure the other sweeps use (status flip + + // RestartByID reprovision, existing volume). Fail-safe: IsRunning is + // (true, err) on any transient error, so a CP blip never flips a healthy + // workspace. + if cpProv != nil { + go supervised.RunWithRecover(ctx, "cp-instance-reconciler", func(c context.Context) { + registry.StartCPInstanceReconciler(c, cpProv, onWorkspaceOffline, 60*time.Second) + }) + } + // Pending-uploads GC sweep — deletes acked rows past their retention // window plus unacked rows past expires_at. Without this the // pending_uploads table grows unbounded; even with the 24h hard TTL, diff --git a/workspace-server/internal/registry/cp_instance_reconciler.go b/workspace-server/internal/registry/cp_instance_reconciler.go new file mode 100644 index 000000000..e5801e570 --- /dev/null +++ b/workspace-server/internal/registry/cp_instance_reconciler.go @@ -0,0 +1,178 @@ +package registry + +// cp_instance_reconciler.go — authoritative EC2-state reconcile for +// SaaS workspaces (core#2261). +// +// Root cause (core#2247): every existing liveness pass keys off a PROXY +// for "is this workspace alive?": +// +// - StartLivenessMonitor — Redis TTL expiry (agent stopped heartbeating). +// - StartHealthSweep (Docker pass) — local Docker daemon (prov != nil only). +// - StartHealthSweep (remote pass) — last_heartbeat_at freshness for +// runtime='external' rows. +// - StartCPOrphanSweeper — status='removed' rows with a stray instance_id. +// +// A SaaS claude-code workspace whose EC2 was terminated/stopped out from +// under us (manual AWS action, spot reclaim, CP-side reap, etc.) falls +// through ALL of them: it's not 'removed' (so the orphan sweeper skips +// it), it's not runtime='external' (so the heartbeat pass skips it), and +// on a pure-SaaS front-door prov == nil so the Docker pass never runs. +// The registry kept status='online' pointing at a dead instance forever. +// +// This sweeper closes that gap with the ONE authoritative check the +// others lack: CPProvisioner.IsRunning, which ultimately asks the +// control-plane "is this EC2 actually running?" (DescribeInstances- +// equivalent). When the answer is a CLEAN "no" it feeds the workspace +// into the EXISTING offline/auto-heal machinery (onOffline → status flip +// + RestartByID reprovision with the existing volume) — no new healing +// path, just real ground truth driving the one we already have. +// +// Guardrails: +// - FAIL-SAFE: IsRunning is (true, err) on any transient DB/transport +// error and (false, nil) ONLY when CP genuinely reports the instance +// is not running. We act ONLY on (false, nil); any err short-circuits +// to "leave it alone" so a CP blip never flips a healthy workspace. +// - ONLINE + SaaS ONLY: status='online', instance_id present, and +// runtime <> 'external'. Paused/hibernated/removed/provisioning/ +// awaiting_agent rows are out of scope; external rows are covered by +// the remote-heartbeat pass. +// - Per-cycle row cap + per-workspace timeout so one slow CP call can't +// stall the sweep. + +import ( + "context" + "log" + "time" + + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db" +) + +// InstanceRunningChecker is the narrow dependency the reconciler takes +// from the CP provisioner. *provisioner.CPProvisioner satisfies this +// naturally; tests inject fakes. +// +// Contract (load-bearing): IsRunning is FAIL-SAFE — it returns +// (true, err) on transient DB/transport errors and (false, nil) ONLY +// when CP reports the instance is genuinely not running. The reconciler +// flips a workspace offline strictly on (false, nil). +type InstanceRunningChecker interface { + IsRunning(ctx context.Context, workspaceID string) (bool, error) +} + +// CPInstanceReconcileLimit caps the per-cycle row count so a sustained +// CP slowdown can't make a single sweep cycle run unbounded. With a 60s +// cadence and a per-workspace timeout below, this bounds worst-case +// cycle wall-time and lets subsequent cycles drain any backlog. +const CPInstanceReconcileLimit = 200 + +// cpInstanceCheckTimeout bounds a single IsRunning call so one slow CP +// round-trip can't stall the whole sweep. Each workspace gets its own +// timeout context derived from the cycle context. +const cpInstanceCheckTimeout = 10 * time.Second + +// StartCPInstanceReconciler runs the authoritative EC2-state reconcile +// loop until ctx is cancelled. A nil checker makes the loop a no-op +// (matches the nil-tolerant pattern of the sibling CP sweeper). +// +// Caller is expected to gate on `cpProv != nil` (matching how +// StartCPOrphanSweeper is gated at the wiring site in cmd/server/main.go) +// — passing a nil *CPProvisioner here would also short-circuit, but the +// gate at the call site keeps the call shape symmetric across sweepers. +// +// interval <= 0 falls back to the default 60s cadence so a misconfigured +// caller can't spin a zero-duration ticker (which panics). +func StartCPInstanceReconciler(ctx context.Context, checker InstanceRunningChecker, onOffline OfflineHandler, interval time.Duration) { + if checker == nil { + log.Println("cp-instance-reconciler: checker is nil — reconciler disabled") + return + } + if interval <= 0 { + interval = 60 * time.Second + } + log.Printf("cp-instance-reconciler started — reconciling online SaaS workspaces against real EC2 state every %s", interval) + ticker := time.NewTicker(interval) + defer ticker.Stop() + // Kick once at boot so a platform restart starts healing immediately + // rather than waiting a full interval. + reconcileOnce(ctx, checker, onOffline) + for { + select { + case <-ctx.Done(): + log.Println("cp-instance-reconciler: shutdown") + return + case <-ticker.C: + reconcileOnce(ctx, checker, onOffline) + } + } +} + +// reconcileOnce executes one reconcile pass. Defensive against db.DB +// being nil so a misconfigured boot doesn't panic. +// +// Scope: online + SaaS-EC2 workspaces only. runtime='external' rows are +// excluded (covered by the remote-heartbeat pass); paused/hibernated/ +// removed/provisioning/awaiting_agent are excluded by the status filter. +func reconcileOnce(ctx context.Context, checker InstanceRunningChecker, onOffline OfflineHandler) { + if db.DB == nil { + return + } + + rows, err := db.DB.QueryContext(ctx, ` + SELECT id::text + FROM workspaces + WHERE status = 'online' + AND instance_id IS NOT NULL + AND instance_id != '' + AND COALESCE(runtime, '') <> 'external' + ORDER BY updated_at DESC + LIMIT $1 + `, CPInstanceReconcileLimit) + if err != nil { + log.Printf("cp-instance-reconciler: DB query failed: %v", err) + return + } + defer rows.Close() + + var ids []string + for rows.Next() { + var id string + if scanErr := rows.Scan(&id); scanErr != nil { + log.Printf("cp-instance-reconciler: row scan failed: %v", scanErr) + continue + } + ids = append(ids, id) + } + if iterErr := rows.Err(); iterErr != nil { + log.Printf("cp-instance-reconciler: rows iteration failed: %v", iterErr) + return + } + + for _, id := range ids { + // Per-workspace timeout so one slow CP round-trip can't stall + // the whole sweep. + checkCtx, cancel := context.WithTimeout(ctx, cpInstanceCheckTimeout) + running, checkErr := checker.IsRunning(checkCtx, id) + cancel() + + if checkErr != nil { + // FAIL-SAFE: transient DB/transport error (or a no-backend + // signal). IsRunning returns (true, err) on these, so never + // flip — leave the row online and retry next cycle. + log.Printf("cp-instance-reconciler: IsRunning(%s) errored, leaving online (fail-safe): %v", id, checkErr) + continue + } + if running { + continue + } + + // CLEAN "not running" — CP authoritatively reports the EC2 is + // terminated/stopped/absent. Feed it into the existing offline + + // auto-heal machinery: onOffline flips the row offline and + // triggers RestartByID, which reprovisions with the existing + // volume. + log.Printf("cp-instance-reconciler: workspace %s is status=online but its EC2 is not running (terminated/stopped) — flipping offline + triggering reprovision", id) + if onOffline != nil { + onOffline(ctx, id) + } + } +} diff --git a/workspace-server/internal/registry/cp_instance_reconciler_test.go b/workspace-server/internal/registry/cp_instance_reconciler_test.go new file mode 100644 index 000000000..e1ea66c91 --- /dev/null +++ b/workspace-server/internal/registry/cp_instance_reconciler_test.go @@ -0,0 +1,282 @@ +package registry + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db" +) + +// fakeRunningChecker implements InstanceRunningChecker for the +// instance-reconciler tests. Records every IsRunning call so tests can +// assert which workspace IDs were probed, and returns a per-id +// (running, err) pair so we can model CP's three answers: +// +// (true, nil) — instance is running. +// (false, nil) — CLEAN "not running" (terminated/stopped/absent). +// (true, err) — transient DB/transport error (FAIL-SAFE path). +type fakeRunningChecker struct { + mu sync.Mutex + running map[string]bool + errs map[string]error + calls []string +} + +func (f *fakeRunningChecker) IsRunning(_ context.Context, wsID string) (bool, error) { + f.mu.Lock() + defer f.mu.Unlock() + f.calls = append(f.calls, wsID) + if err, ok := f.errs[wsID]; ok { + // Mirror CPProvisioner.IsRunning: (true, err) on transient errors + // so callers stay on the alive path. + return true, err + } + return f.running[wsID], nil +} + +// recordingOffline is an OfflineHandler that records the workspace IDs +// it was invoked with. +type recordingOffline struct { + mu sync.Mutex + calls []string +} + +func (r *recordingOffline) handler() OfflineHandler { + return func(_ context.Context, wsID string) { + r.mu.Lock() + defer r.mu.Unlock() + r.calls = append(r.calls, wsID) + } +} + +func (r *recordingOffline) got() []string { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]string, len(r.calls)) + copy(out, r.calls) + return out +} + +// expectReconcileQuery registers the reconciler's SELECT, pinning the +// scope-critical predicates: status='online', instance_id present, and +// runtime <> 'external'. A future widening that drops any of these (e.g. +// sweeping paused rows, or external rows the heartbeat pass owns) fails +// every test that uses this helper. +func expectReconcileQuery(mock sqlmock.Sqlmock, rows *sqlmock.Rows) { + mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces\s+WHERE status = 'online'\s+AND instance_id IS NOT NULL\s+AND instance_id != ''\s+AND COALESCE\(runtime, ''\) <> 'external'\s+ORDER BY updated_at DESC\s+LIMIT \$1`). + WithArgs(CPInstanceReconcileLimit). + WillReturnRows(rows) +} + +// TestReconcileOnce_NotRunning_FlipsOffline — the core bug (core#2247): +// an online SaaS workspace whose EC2 is terminated. CP reports a CLEAN +// (false, nil); onOffline MUST be called with that id so the existing +// auto-heal (status flip + RestartByID reprovision) kicks in. +func TestReconcileOnce_NotRunning_FlipsOffline(t *testing.T) { + mock := setupTestDB(t) + checker := &fakeRunningChecker{running: map[string]bool{"ws-dead": false}} + off := &recordingOffline{} + + expectReconcileQuery(mock, sqlmock.NewRows([]string{"id"}).AddRow("ws-dead")) + + reconcileOnce(context.Background(), checker, off.handler()) + + if got := off.got(); len(got) != 1 || got[0] != "ws-dead" { + t.Fatalf("expected onOffline(ws-dead), got %v", got) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} + +// TestReconcileOnce_Running_DoesNotFlip — healthy steady state. CP +// reports (true, nil); the workspace stays online, onOffline is NOT +// called. +func TestReconcileOnce_Running_DoesNotFlip(t *testing.T) { + mock := setupTestDB(t) + checker := &fakeRunningChecker{running: map[string]bool{"ws-alive": true}} + off := &recordingOffline{} + + expectReconcileQuery(mock, sqlmock.NewRows([]string{"id"}).AddRow("ws-alive")) + + reconcileOnce(context.Background(), checker, off.handler()) + + if got := off.got(); len(got) != 0 { + t.Fatalf("running workspace must NOT be flipped offline, got %v", got) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} + +// TestReconcileOnce_TransientError_DoesNotFlip — FAIL-SAFE contract. +// IsRunning returns (true, err) on a transient DB/transport blip; the +// reconciler MUST NOT flip the workspace offline. This is the guardrail +// that stops a CP outage from cascading every healthy workspace through +// reprovision. +func TestReconcileOnce_TransientError_DoesNotFlip(t *testing.T) { + mock := setupTestDB(t) + checker := &fakeRunningChecker{ + errs: map[string]error{"ws-blip": errors.New("cp provisioner: status: connection reset")}, + } + off := &recordingOffline{} + + expectReconcileQuery(mock, sqlmock.NewRows([]string{"id"}).AddRow("ws-blip")) + + reconcileOnce(context.Background(), checker, off.handler()) + + if got := off.got(); len(got) != 0 { + t.Fatalf("fail-safe violated: transient IsRunning error must NOT flip offline, got %v", got) + } + if calls := checker.calls; len(calls) != 1 || calls[0] != "ws-blip" { + t.Fatalf("expected IsRunning(ws-blip), got %v", checker.calls) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} + +// TestReconcileOnce_QueryScopeExcludesExternalAndNonOnline — pins the +// SELECT predicate. The regex in expectReconcileQuery requires +// status='online' AND runtime <> 'external'; if a future edit widens the +// scope to include paused/hibernated/removed rows or external rows (owned +// by the heartbeat pass), this query no longer matches and sqlmock fails +// the test. With the predicate intact, a DB that has only out-of-scope +// rows returns empty → no IsRunning, no flip. +func TestReconcileOnce_QueryScopeExcludesExternalAndNonOnline(t *testing.T) { + mock := setupTestDB(t) + checker := &fakeRunningChecker{} + off := &recordingOffline{} + + // The predicate filters out external + non-online rows server-side, + // modelled as the empty result those filters produce. + expectReconcileQuery(mock, sqlmock.NewRows([]string{"id"})) + + reconcileOnce(context.Background(), checker, off.handler()) + + if len(checker.calls) != 0 { + t.Fatalf("out-of-scope rows must never reach IsRunning, got %v", checker.calls) + } + if got := off.got(); len(got) != 0 { + t.Fatalf("expected no offline flips for out-of-scope rows, got %v", got) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} + +// TestReconcileOnce_MixedBatch — each row is judged independently: the +// dead one flips, the alive one and the transient-error one don't. +func TestReconcileOnce_MixedBatch(t *testing.T) { + mock := setupTestDB(t) + checker := &fakeRunningChecker{ + running: map[string]bool{"ws-dead": false, "ws-alive": true}, + errs: map[string]error{"ws-blip": errors.New("503")}, + } + off := &recordingOffline{} + + expectReconcileQuery(mock, sqlmock.NewRows([]string{"id"}). + AddRow("ws-dead"). + AddRow("ws-alive"). + AddRow("ws-blip")) + + reconcileOnce(context.Background(), checker, off.handler()) + + if got := off.got(); len(got) != 1 || got[0] != "ws-dead" { + t.Fatalf("expected only ws-dead flipped, got %v", got) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} + +// TestReconcileOnce_QueryError — DB transient failure. Reconcile returns +// without panicking and never probes IsRunning or flips anything. +func TestReconcileOnce_QueryError(t *testing.T) { + mock := setupTestDB(t) + checker := &fakeRunningChecker{} + off := &recordingOffline{} + + mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`). + WithArgs(CPInstanceReconcileLimit). + WillReturnError(errors.New("connection refused")) + + reconcileOnce(context.Background(), checker, off.handler()) + + if len(checker.calls) != 0 || len(off.got()) != 0 { + t.Fatalf("query error must short-circuit; calls=%v offline=%v", checker.calls, off.got()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} + +// TestReconcileOnce_NilDB — defensive against db.DB being nil. Must not +// panic, must not probe, must not flip. +func TestReconcileOnce_NilDB(t *testing.T) { + saved := db.DB + db.DB = nil + t.Cleanup(func() { db.DB = saved }) + + checker := &fakeRunningChecker{} + off := &recordingOffline{} + reconcileOnce(context.Background(), checker, off.handler()) + + if len(checker.calls) != 0 || len(off.got()) != 0 { + t.Fatalf("nil db.DB must short-circuit; calls=%v offline=%v", checker.calls, off.got()) + } +} + +// TestStartCPInstanceReconciler_NilCheckerDisabled — boot-safety: a SaaS +// CP without cpProv configured must not start the loop (immediate return, +// no goroutine leak). +func TestStartCPInstanceReconciler_NilCheckerDisabled(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + done := make(chan struct{}) + go func() { + StartCPInstanceReconciler(ctx, nil, nil, 60*time.Second) + close(done) + }() + select { + case <-done: + // expected — nil checker short-circuits. + case <-time.After(500 * time.Millisecond): + t.Fatal("StartCPInstanceReconciler(nil) did not return immediately") + } +} + +// TestStartCPInstanceReconciler_RunsOnceImmediatelyAndExitsOnCancel — +// cadence contract: one sweep at boot (so a restart starts healing +// immediately), and the loop terminates on ctx cancel. +func TestStartCPInstanceReconciler_RunsOnceImmediatelyAndExitsOnCancel(t *testing.T) { + mock := setupTestDB(t) + checker := &fakeRunningChecker{} + off := &recordingOffline{} + + // Boot sweep query. The 60s ticker won't fire inside the test window; + // register a second optional expectation so a stray tick can't fail. + expectReconcileQuery(mock, sqlmock.NewRows([]string{"id"})) + expectReconcileQuery(mock, sqlmock.NewRows([]string{"id"})) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + StartCPInstanceReconciler(ctx, checker, off.handler(), 60*time.Second) + close(done) + }() + time.Sleep(100 * time.Millisecond) + cancel() + select { + case <-done: + // expected + case <-time.After(2 * time.Second): + t.Fatal("StartCPInstanceReconciler did not exit on ctx cancel") + } +}