feat(registry): reconcile online workspaces against real EC2 state — auto-heal terminated instances (core#2261) #2266

Merged
hongming merged 1 commit from feat/core2261-instance-state-reconciler into main 2026-06-05 00:28:32 +00:00
3 changed files with 479 additions and 0 deletions
+19
View File
@@ -337,6 +337,25 @@ func main() {
})
}
// CP-mode instance-state reconciler — authoritative EC2-liveness pass
// for SaaS workspaces (core#2261). Every other liveness sweep keys off
// a PROXY (Redis TTL, agent heartbeat, local Docker, or
// runtime='external'); a SaaS claude-code workspace whose EC2 was
// terminated/stopped falls through ALL of them and stays status='online'
// pointing at a dead instance_id forever (root cause: core#2247). This
// loop asks the ONE authoritative question the others lack —
// cpProv.IsRunning (CP DescribeInstances-equivalent) — for each online
// SaaS row, and on a CLEAN "not running" feeds it into the SAME
// onWorkspaceOffline closure the other sweeps use (status flip +
// RestartByID reprovision, existing volume). Fail-safe: IsRunning is
// (true, err) on any transient error, so a CP blip never flips a healthy
// workspace.
if cpProv != nil {
go supervised.RunWithRecover(ctx, "cp-instance-reconciler", func(c context.Context) {
registry.StartCPInstanceReconciler(c, cpProv, onWorkspaceOffline, 60*time.Second)
})
}
// Pending-uploads GC sweep — deletes acked rows past their retention
// window plus unacked rows past expires_at. Without this the
// pending_uploads table grows unbounded; even with the 24h hard TTL,
@@ -0,0 +1,178 @@
package registry
// cp_instance_reconciler.go — authoritative EC2-state reconcile for
// SaaS workspaces (core#2261).
//
// Root cause (core#2247): every existing liveness pass keys off a PROXY
// for "is this workspace alive?":
//
// - StartLivenessMonitor — Redis TTL expiry (agent stopped heartbeating).
// - StartHealthSweep (Docker pass) — local Docker daemon (prov != nil only).
// - StartHealthSweep (remote pass) — last_heartbeat_at freshness for
// runtime='external' rows.
// - StartCPOrphanSweeper — status='removed' rows with a stray instance_id.
//
// A SaaS claude-code workspace whose EC2 was terminated/stopped out from
// under us (manual AWS action, spot reclaim, CP-side reap, etc.) falls
// through ALL of them: it's not 'removed' (so the orphan sweeper skips
// it), it's not runtime='external' (so the heartbeat pass skips it), and
// on a pure-SaaS front-door prov == nil so the Docker pass never runs.
// The registry kept status='online' pointing at a dead instance forever.
//
// This sweeper closes that gap with the ONE authoritative check the
// others lack: CPProvisioner.IsRunning, which ultimately asks the
// control-plane "is this EC2 actually running?" (DescribeInstances-
// equivalent). When the answer is a CLEAN "no" it feeds the workspace
// into the EXISTING offline/auto-heal machinery (onOffline → status flip
// + RestartByID reprovision with the existing volume) — no new healing
// path, just real ground truth driving the one we already have.
//
// Guardrails:
// - FAIL-SAFE: IsRunning is (true, err) on any transient DB/transport
// error and (false, nil) ONLY when CP genuinely reports the instance
// is not running. We act ONLY on (false, nil); any err short-circuits
// to "leave it alone" so a CP blip never flips a healthy workspace.
// - ONLINE + SaaS ONLY: status='online', instance_id present, and
// runtime <> 'external'. Paused/hibernated/removed/provisioning/
// awaiting_agent rows are out of scope; external rows are covered by
// the remote-heartbeat pass.
// - Per-cycle row cap + per-workspace timeout so one slow CP call can't
// stall the sweep.
import (
	"context"
	"log"
	"reflect"
	"time"

	"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
)
// InstanceRunningChecker is the narrow, single-method dependency the
// reconciler takes from the CP provisioner. *provisioner.CPProvisioner
// satisfies this naturally; tests inject fakes.
//
// Contract (load-bearing): IsRunning is FAIL-SAFE — it returns
// (true, err) on transient DB/transport errors and (false, nil) ONLY
// when CP reports the instance is genuinely not running. The reconciler
// flips a workspace offline strictly on (false, nil).
type InstanceRunningChecker interface {
// IsRunning reports whether the instance backing workspaceID is running.
// The reconciler calls it with a per-workspace timeout context and treats
// any non-nil error as "leave the workspace online".
IsRunning(ctx context.Context, workspaceID string) (bool, error)
}
// Tuning knobs for the CP instance reconciler.
const (
	// CPInstanceReconcileLimit is the maximum number of workspace rows a
	// single reconcile pass selects (it is bound as the query's LIMIT).
	// Together with cpInstanceCheckTimeout it puts an upper bound on how
	// long one pass can take when the control plane is slow.
	CPInstanceReconcileLimit = 200

	// cpInstanceCheckTimeout is the deadline applied to each individual
	// IsRunning call. Every workspace gets its own timeout context derived
	// from the cycle context, so a single slow CP round-trip cannot stall
	// the rest of the sweep.
	cpInstanceCheckTimeout = 10 * time.Second
)
// StartCPInstanceReconciler runs the authoritative EC2-state reconcile
// loop until ctx is cancelled. It blocks, so callers run it in its own
// goroutine.
//
// A nil checker disables the loop: the function logs and returns
// immediately. "Nil" covers both a nil interface value AND a typed-nil
// pointer (e.g. a nil *CPProvisioner) wrapped in the interface — the
// latter compares != nil in Go, so it is detected explicitly rather than
// being allowed to panic on the first IsRunning call. Callers are still
// expected to gate on `cpProv != nil` at the wiring site so the call
// shape stays symmetric across sweepers.
//
// onOffline is invoked for every workspace the control plane cleanly
// reports as not running; it may be nil, in which case the finding is
// only logged.
//
// interval <= 0 falls back to the default 60s cadence so a misconfigured
// caller can't create a non-positive ticker (time.NewTicker panics).
func StartCPInstanceReconciler(ctx context.Context, checker InstanceRunningChecker, onOffline OfflineHandler, interval time.Duration) {
	if isNilInstanceChecker(checker) {
		log.Println("cp-instance-reconciler: checker is nil — reconciler disabled")
		return
	}
	if interval <= 0 {
		interval = 60 * time.Second
	}
	log.Printf("cp-instance-reconciler started — reconciling online SaaS workspaces against real EC2 state every %s", interval)

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	// Kick once at boot so a platform restart starts healing immediately
	// rather than waiting a full interval.
	reconcileOnce(ctx, checker, onOffline)

	for {
		select {
		case <-ctx.Done():
			log.Println("cp-instance-reconciler: shutdown")
			return
		case <-ticker.C:
			reconcileOnce(ctx, checker, onOffline)
		}
	}
}

// isNilInstanceChecker reports whether checker is a nil interface or an
// interface wrapping a nil pointer/map/slice/func/chan.
func isNilInstanceChecker(checker InstanceRunningChecker) bool {
	if checker == nil {
		return true
	}
	v := reflect.ValueOf(checker)
	switch v.Kind() {
	case reflect.Ptr, reflect.Map, reflect.Slice, reflect.Func, reflect.Chan:
		return v.IsNil()
	}
	return false
}
// reconcileOnce executes one reconcile pass:
//
//  1. Select up to CPInstanceReconcileLimit workspace ids that are
//     status='online', have a non-empty instance_id, and whose runtime is
//     not 'external'.
//  2. Ask checker.IsRunning for each id under its own
//     cpInstanceCheckTimeout deadline.
//  3. Hand every id with a clean (false, nil) answer to onOffline.
//
// It returns without doing anything when db.DB is nil, when the query or
// row iteration fails, or — between workspaces — once ctx is cancelled,
// so a shutdown does not keep issuing CP calls that can only fail.
//
// NOTE(review): the query always takes the CPInstanceReconcileLimit most
// recently updated rows and keeps no cursor between passes, so when more
// rows than the limit match, the older ones are not visited by this pass
// or the next. Confirm whether the ordering should rotate; it is left
// unchanged here because the tests pin the exact SQL.
func reconcileOnce(ctx context.Context, checker InstanceRunningChecker, onOffline OfflineHandler) {
	if db.DB == nil {
		return
	}

	rows, err := db.DB.QueryContext(ctx, `
		SELECT id::text
		FROM workspaces
		WHERE status = 'online'
		  AND instance_id IS NOT NULL
		  AND instance_id != ''
		  AND COALESCE(runtime, '') <> 'external'
		ORDER BY updated_at DESC
		LIMIT $1
	`, CPInstanceReconcileLimit)
	if err != nil {
		log.Printf("cp-instance-reconciler: DB query failed: %v", err)
		return
	}
	defer rows.Close()

	// Collect the ids first so the CP round-trips below happen after the
	// result set has been fully read.
	var ids []string
	for rows.Next() {
		var id string
		if scanErr := rows.Scan(&id); scanErr != nil {
			// One unreadable row must not hide the rest of the batch.
			log.Printf("cp-instance-reconciler: row scan failed: %v", scanErr)
			continue
		}
		ids = append(ids, id)
	}
	if iterErr := rows.Err(); iterErr != nil {
		log.Printf("cp-instance-reconciler: rows iteration failed: %v", iterErr)
		return
	}

	for _, id := range ids {
		// Stop promptly on shutdown: every remaining check would run on an
		// already-cancelled context and could only produce error logs.
		if ctxErr := ctx.Err(); ctxErr != nil {
			log.Printf("cp-instance-reconciler: context done, aborting sweep: %v", ctxErr)
			return
		}

		// Per-workspace timeout so one slow CP round-trip can't stall
		// the whole sweep.
		checkCtx, cancel := context.WithTimeout(ctx, cpInstanceCheckTimeout)
		running, checkErr := checker.IsRunning(checkCtx, id)
		cancel()

		if checkErr != nil {
			// FAIL-SAFE: transient DB/transport error (or a no-backend
			// signal). IsRunning returns (true, err) on these, so never
			// flip — leave the row online and retry next cycle.
			log.Printf("cp-instance-reconciler: IsRunning(%s) errored, leaving online (fail-safe): %v", id, checkErr)
			continue
		}
		if running {
			continue
		}

		// CLEAN "not running": hand the workspace to the shared offline
		// handler, which owns the status flip and the reprovision.
		log.Printf("cp-instance-reconciler: workspace %s is status=online but its EC2 is not running (terminated/stopped) — flipping offline + triggering reprovision", id)
		if onOffline != nil {
			onOffline(ctx, id)
		}
	}
}
@@ -0,0 +1,282 @@
package registry
import (
"context"
"errors"
"sync"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
)
// fakeRunningChecker is the test double for InstanceRunningChecker. It
// logs every probed workspace id in calls and answers from two lookup
// tables, which lets a test model the three CP outcomes:
//
//	(true, nil)  — instance is running.
//	(false, nil) — CLEAN "not running" (terminated/stopped/absent).
//	(true, err)  — transient DB/transport error (FAIL-SAFE path).
//
// An id present in neither table yields (false, nil).
type fakeRunningChecker struct {
	mu      sync.Mutex
	running map[string]bool
	errs    map[string]error
	calls   []string
}

// IsRunning records wsID and returns the scripted answer for it. An entry
// in errs wins over running and is reported as (true, err), the same
// stay-alive shape the reconciler expects from a transient failure.
func (f *fakeRunningChecker) IsRunning(_ context.Context, wsID string) (bool, error) {
	f.mu.Lock()
	f.calls = append(f.calls, wsID)
	injected, failing := f.errs[wsID]
	alive := f.running[wsID]
	f.mu.Unlock()

	if failing {
		return true, injected
	}
	return alive, nil
}
// recordingOffline captures, in order, every workspace id passed to the
// OfflineHandler it hands out, so tests can assert which workspaces the
// reconciler decided to flip.
type recordingOffline struct {
	mu    sync.Mutex
	calls []string
}

// handler returns an OfflineHandler that appends each workspace id it
// receives to r.calls under the mutex.
func (r *recordingOffline) handler() OfflineHandler {
	record := func(_ context.Context, wsID string) {
		r.mu.Lock()
		r.calls = append(r.calls, wsID)
		r.mu.Unlock()
	}
	return record
}

// got returns a copy of the ids recorded so far; the caller may keep or
// modify it without affecting the recorder.
func (r *recordingOffline) got() []string {
	r.mu.Lock()
	defer r.mu.Unlock()
	return append(make([]string, 0, len(r.calls)), r.calls...)
}
// expectReconcileQuery registers the reconciler's SELECT on mock and makes
// it return rows. The pattern pins the scope-critical predicates —
// status='online', instance_id present, runtime <> 'external' — plus the
// ordering and the LIMIT argument, so an edit that widens the scope (for
// example to paused rows, or to external rows the heartbeat pass owns)
// stops matching and fails every test built on this helper.
func expectReconcileQuery(mock sqlmock.Sqlmock, rows *sqlmock.Rows) {
	const reconcileSelect = `(?s)^\s*SELECT id::text\s+FROM workspaces\s+WHERE status = 'online'\s+AND instance_id IS NOT NULL\s+AND instance_id != ''\s+AND COALESCE\(runtime, ''\) <> 'external'\s+ORDER BY updated_at DESC\s+LIMIT \$1`

	expectation := mock.ExpectQuery(reconcileSelect)
	expectation.WithArgs(CPInstanceReconcileLimit)
	expectation.WillReturnRows(rows)
}
// TestReconcileOnce_NotRunning_FlipsOffline covers the core bug
// (core#2247): an online SaaS workspace whose EC2 is gone. The checker
// answers a CLEAN (false, nil), so onOffline MUST be invoked with exactly
// that id for the existing auto-heal path to take over.
func TestReconcileOnce_NotRunning_FlipsOffline(t *testing.T) {
	const deadID = "ws-dead"

	dbMock := setupTestDB(t)
	expectReconcileQuery(dbMock, sqlmock.NewRows([]string{"id"}).AddRow(deadID))

	probe := &fakeRunningChecker{running: map[string]bool{deadID: false}}
	recorder := &recordingOffline{}

	reconcileOnce(context.Background(), probe, recorder.handler())

	flipped := recorder.got()
	if len(flipped) != 1 || flipped[0] != deadID {
		t.Fatalf("expected onOffline(ws-dead), got %v", flipped)
	}
	if err := dbMock.ExpectationsWereMet(); err != nil {
		t.Fatalf("unmet expectations: %v", err)
	}
}
// TestReconcileOnce_Running_DoesNotFlip is the healthy steady state: the
// checker answers (true, nil), so the workspace stays online and
// onOffline is never invoked.
func TestReconcileOnce_Running_DoesNotFlip(t *testing.T) {
	const aliveID = "ws-alive"

	dbMock := setupTestDB(t)
	expectReconcileQuery(dbMock, sqlmock.NewRows([]string{"id"}).AddRow(aliveID))

	probe := &fakeRunningChecker{running: map[string]bool{aliveID: true}}
	recorder := &recordingOffline{}

	reconcileOnce(context.Background(), probe, recorder.handler())

	if flipped := recorder.got(); len(flipped) != 0 {
		t.Fatalf("running workspace must NOT be flipped offline, got %v", flipped)
	}
	if err := dbMock.ExpectationsWereMet(); err != nil {
		t.Fatalf("unmet expectations: %v", err)
	}
}
// TestReconcileOnce_TransientError_DoesNotFlip pins the FAIL-SAFE
// contract: when IsRunning reports (true, err) for a transient
// DB/transport blip, the reconciler must still have probed the workspace
// but MUST NOT flip it offline. This is the guardrail that keeps a CP
// outage from pushing every healthy workspace through reprovision.
func TestReconcileOnce_TransientError_DoesNotFlip(t *testing.T) {
	const blipID = "ws-blip"

	dbMock := setupTestDB(t)
	expectReconcileQuery(dbMock, sqlmock.NewRows([]string{"id"}).AddRow(blipID))

	transient := errors.New("cp provisioner: status: connection reset")
	probe := &fakeRunningChecker{errs: map[string]error{blipID: transient}}
	recorder := &recordingOffline{}

	reconcileOnce(context.Background(), probe, recorder.handler())

	if flipped := recorder.got(); len(flipped) != 0 {
		t.Fatalf("fail-safe violated: transient IsRunning error must NOT flip offline, got %v", flipped)
	}
	probed := probe.calls
	if len(probed) != 1 || probed[0] != blipID {
		t.Fatalf("expected IsRunning(ws-blip), got %v", probed)
	}
	if err := dbMock.ExpectationsWereMet(); err != nil {
		t.Fatalf("unmet expectations: %v", err)
	}
}
// TestReconcileOnce_QueryScopeExcludesExternalAndNonOnline pins the SELECT
// predicate. The pattern in expectReconcileQuery demands status='online'
// AND runtime <> 'external'; widening the scope to paused/hibernated/
// removed rows, or to external rows owned by the heartbeat pass, makes the
// query stop matching and sqlmock fails the test. With the predicate
// intact, a database holding only out-of-scope rows yields an empty
// result, so nothing is probed and nothing is flipped.
func TestReconcileOnce_QueryScopeExcludesExternalAndNonOnline(t *testing.T) {
	dbMock := setupTestDB(t)

	// The filtering happens server-side; an empty result set stands in for
	// "every row was out of scope".
	noRows := sqlmock.NewRows([]string{"id"})
	expectReconcileQuery(dbMock, noRows)

	probe := &fakeRunningChecker{}
	recorder := &recordingOffline{}

	reconcileOnce(context.Background(), probe, recorder.handler())

	if len(probe.calls) != 0 {
		t.Fatalf("out-of-scope rows must never reach IsRunning, got %v", probe.calls)
	}
	if flipped := recorder.got(); len(flipped) != 0 {
		t.Fatalf("expected no offline flips for out-of-scope rows, got %v", flipped)
	}
	if err := dbMock.ExpectationsWereMet(); err != nil {
		t.Fatalf("unmet expectations: %v", err)
	}
}
// TestReconcileOnce_MixedBatch — each row is judged independently: the
// dead one flips, the alive one and the transient-error one don't.
//
// The dead row is deliberately first in the batch, so the test also
// asserts that ALL three rows were probed, in query order. Without that
// check a sweep that stopped after the first flip would still pass.
func TestReconcileOnce_MixedBatch(t *testing.T) {
	mock := setupTestDB(t)
	checker := &fakeRunningChecker{
		running: map[string]bool{"ws-dead": false, "ws-alive": true},
		errs:    map[string]error{"ws-blip": errors.New("503")},
	}
	off := &recordingOffline{}

	expectReconcileQuery(mock, sqlmock.NewRows([]string{"id"}).
		AddRow("ws-dead").
		AddRow("ws-alive").
		AddRow("ws-blip"))

	reconcileOnce(context.Background(), checker, off.handler())

	if got := off.got(); len(got) != 1 || got[0] != "ws-dead" {
		t.Fatalf("expected only ws-dead flipped, got %v", got)
	}

	// Every row must reach IsRunning, regardless of what happened to the
	// rows before it.
	wantProbed := []string{"ws-dead", "ws-alive", "ws-blip"}
	if len(checker.calls) != len(wantProbed) {
		t.Fatalf("expected IsRunning probes %v, got %v", wantProbed, checker.calls)
	}
	for i, want := range wantProbed {
		if checker.calls[i] != want {
			t.Fatalf("expected IsRunning probes %v, got %v", wantProbed, checker.calls)
		}
	}

	if err := mock.ExpectationsWereMet(); err != nil {
		t.Fatalf("unmet expectations: %v", err)
	}
}
// TestReconcileOnce_QueryError simulates a transient DB failure on the
// SELECT. The pass must return without panicking, without probing
// IsRunning, and without flipping anything.
func TestReconcileOnce_QueryError(t *testing.T) {
	dbMock := setupTestDB(t)
	dbFailure := errors.New("connection refused")
	dbMock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
		WithArgs(CPInstanceReconcileLimit).
		WillReturnError(dbFailure)

	probe := &fakeRunningChecker{}
	recorder := &recordingOffline{}

	reconcileOnce(context.Background(), probe, recorder.handler())

	if len(probe.calls) != 0 || len(recorder.got()) != 0 {
		t.Fatalf("query error must short-circuit; calls=%v offline=%v", probe.calls, recorder.got())
	}
	if err := dbMock.ExpectationsWereMet(); err != nil {
		t.Fatalf("unmet expectations: %v", err)
	}
}
// TestReconcileOnce_NilDB checks the defensive guard for an unset db.DB:
// the pass must not panic, must not probe, and must not flip. The global
// handle is restored when the test finishes.
func TestReconcileOnce_NilDB(t *testing.T) {
	original := db.DB
	t.Cleanup(func() { db.DB = original })
	db.DB = nil

	probe := &fakeRunningChecker{}
	recorder := &recordingOffline{}

	reconcileOnce(context.Background(), probe, recorder.handler())

	if len(probe.calls) != 0 || len(recorder.got()) != 0 {
		t.Fatalf("nil db.DB must short-circuit; calls=%v offline=%v", probe.calls, recorder.got())
	}
}
// TestStartCPInstanceReconciler_NilCheckerDisabled is the boot-safety
// check: with no checker configured the loop must not start, i.e. the
// call returns right away instead of parking a goroutine until ctx ends.
func TestStartCPInstanceReconciler_NilCheckerDisabled(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	returned := make(chan struct{})
	go func() {
		defer close(returned)
		StartCPInstanceReconciler(ctx, nil, nil, 60*time.Second)
	}()

	timeout := time.After(500 * time.Millisecond)
	select {
	case <-returned:
		// Expected: the nil checker short-circuits before the ticker loop.
	case <-timeout:
		t.Fatal("StartCPInstanceReconciler(nil) did not return immediately")
	}
}
// TestStartCPInstanceReconciler_RunsOnceImmediatelyAndExitsOnCancel —
// cadence contract: one sweep at boot (so a restart starts healing
// immediately), and the loop terminates on ctx cancel.
//
// The boot sweep is proven rather than assumed: the query returns one
// running workspace, and the test waits until the checker has recorded a
// probe for it. With a 60s interval no tick can fire inside the test
// window, so exactly one query expectation is registered and verified.
func TestStartCPInstanceReconciler_RunsOnceImmediatelyAndExitsOnCancel(t *testing.T) {
	mock := setupTestDB(t)
	checker := &fakeRunningChecker{running: map[string]bool{"ws-alive": true}}
	off := &recordingOffline{}

	// Boot sweep query only.
	expectReconcileQuery(mock, sqlmock.NewRows([]string{"id"}).AddRow("ws-alive"))

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	done := make(chan struct{})
	go func() {
		defer close(done)
		StartCPInstanceReconciler(ctx, checker, off.handler(), 60*time.Second)
	}()

	// Poll (under the fake's mutex) until the boot sweep has probed the
	// workspace; a fixed sleep would either be slow or flaky under load.
	deadline := time.Now().Add(2 * time.Second)
	for {
		checker.mu.Lock()
		probed := len(checker.calls)
		checker.mu.Unlock()
		if probed > 0 {
			break
		}
		if time.Now().After(deadline) {
			t.Fatal("StartCPInstanceReconciler did not run a boot sweep before the first tick")
		}
		time.Sleep(5 * time.Millisecond)
	}

	cancel()
	select {
	case <-done:
		// expected
	case <-time.After(2 * time.Second):
		t.Fatal("StartCPInstanceReconciler did not exit on ctx cancel")
	}

	// The loop goroutine has exited, so the fake can be read directly.
	if calls := checker.calls; len(calls) != 1 || calls[0] != "ws-alive" {
		t.Fatalf("expected exactly one boot-sweep probe of ws-alive, got %v", calls)
	}
	if got := off.got(); len(got) != 0 {
		t.Fatalf("running workspace must NOT be flipped offline, got %v", got)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Fatalf("unmet expectations: %v", err)
	}
}