diff --git a/workspace-server/cmd/server/main.go b/workspace-server/cmd/server/main.go index 45597367..cba0334c 100644 --- a/workspace-server/cmd/server/main.go +++ b/workspace-server/cmd/server/main.go @@ -266,6 +266,19 @@ func main() { }) } + // CP-mode orphan sweeper — SaaS counterpart to the Docker sweeper + // above. Re-issues cpProv.Stop for any workspace at status='removed' + // with a non-NULL instance_id, healing the deprovision split-write + // race documented in #2989: tenant marks status='removed' BEFORE + // calling CP DELETE, so a transient CP failure leaves the EC2 + // running with no retry path. cpProv.Stop is idempotent against + // already-terminated instances; on success we clear instance_id. + if cpProv != nil { + go supervised.RunWithRecover(ctx, "cp-orphan-sweeper", func(c context.Context) { + registry.StartCPOrphanSweeper(c, cpProv) + }) + } + // Pending-uploads GC sweep — deletes acked rows past their retention // window plus unacked rows past expires_at. Without this the // pending_uploads table grows unbounded; even with the 24h hard TTL, diff --git a/workspace-server/internal/registry/cp_orphan_sweeper.go b/workspace-server/internal/registry/cp_orphan_sweeper.go new file mode 100644 index 00000000..1dc4906d --- /dev/null +++ b/workspace-server/internal/registry/cp_orphan_sweeper.go @@ -0,0 +1,149 @@ +package registry + +// cp_orphan_sweeper.go — SaaS-mode counterpart to orphan_sweeper.go. +// +// The Docker sweeper (StartOrphanSweeper) runs only when prov != nil +// (single-tenant Docker mode); SaaS tenants run cpProv != nil and prov +// == nil, so they get no sweep coverage from that path. This file fills +// the gap for the deprovision split-write race documented in #2989: +// +// 1. handlers/workspace_crud.go:365 marks workspaces.status = 'removed'. +// 2. workspace_crud.go:439 calls StopWorkspaceAuto → cpProv.Stop, which +// issues DELETE /cp/workspaces/:id?instance_id=… to controlplane. +// 3. 
If step 2 fails (CP transient 5xx, network blip, AWS hiccup), the +// inline path returns a 500 to the canvas — but the DB row is already +// at status='removed' with instance_id still populated. There's no +// retry, and the EC2 lives forever. +// +// This sweeper closes that gap by re-issuing cpProv.Stop on every cycle +// for any workspace at status='removed' with a non-NULL instance_id. +// Stop is idempotent: AWS TerminateInstance on an already-terminated +// instance is a no-op (per AWS docs), and CP's Deprovision handler +// (controlplane/internal/handlers/workspace_provision.go:289) handles +// the already-terminated and already-deleted-DNS cases via best-effort +// guards. On Stop success, the sweeper clears instance_id so the next +// cycle skips the row. +// +// Cadence + safety filters mirror the Docker sweeper: +// - 60s tick (OrphanSweepInterval) +// - 30s per-cycle deadline (orphanSweepDeadline) +// - LIMIT 100 per cycle so a sustained CP outage that backs up many +// orphans doesn't blow the request timeout; subsequent cycles drain. +// +// SSOT note: Stop's idempotency (no-op on empty instance_id, AWS +// terminate on already-terminated) is the load-bearing invariant. Any +// future change that adds non-idempotent side effects to cpProv.Stop +// must also gate this sweeper, or it will re-execute those side effects +// every 60s for every cleared-but-not-yet-NULL row. + +import ( + "context" + "log" + "time" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" +) + +// CPOrphanReaper is the dependency the SaaS-mode sweeper takes from +// the CP provisioner. *provisioner.CPProvisioner satisfies this +// naturally; tests inject fakes. +type CPOrphanReaper interface { + Stop(ctx context.Context, workspaceID string) error +} + +// cpSweepLimit caps the per-cycle row count so a sustained CP outage +// can't make a single sweep cycle blow orphanSweepDeadline. 
With a
// 60s cadence and 100-row limit, drain rate is up to 100 orphans/min,
// which has never been approached even during the worst leak windows.
const cpSweepLimit = 100

// StartCPOrphanSweeper runs the SaaS-mode reconcile loop until ctx is
// cancelled. nil reaper makes the loop a no-op (matches the Docker
// sweeper's nil-tolerant pattern).
//
// Caller is expected to gate on `cpProv != nil` (matching how
// StartOrphanSweeper is gated on `prov != nil` at the call site in
// cmd/server/main.go). That wiring-site gate is load-bearing, not mere
// symmetry: a nil *CPProvisioner passed here gets boxed into the
// CPOrphanReaper interface, and a typed-nil pointer inside an
// interface compares non-nil in Go — so the `reaper == nil` check
// below would NOT catch it, and Stop would be invoked on a nil
// receiver every cycle. Only an untyped nil argument short-circuits.
func StartCPOrphanSweeper(ctx context.Context, reaper CPOrphanReaper) {
	if reaper == nil {
		log.Println("CP orphan sweeper: reaper is nil — sweeper disabled")
		return
	}
	log.Printf("CP orphan sweeper started — reconciling every %s", OrphanSweepInterval)
	ticker := time.NewTicker(OrphanSweepInterval)
	defer ticker.Stop()
	cpSweepOnce(ctx, reaper)
	for {
		select {
		case <-ctx.Done():
			log.Println("CP orphan sweeper: shutdown")
			return
		case <-ticker.C:
			cpSweepOnce(ctx, reaper)
		}
	}
}

// cpSweepOnce executes one reconcile pass. Defensive against db.DB
// being nil so a misconfigured boot doesn't panic.
+func cpSweepOnce(parent context.Context, reaper CPOrphanReaper) { + if db.DB == nil { + return + } + ctx, cancel := context.WithTimeout(parent, orphanSweepDeadline) + defer cancel() + + rows, err := db.DB.QueryContext(ctx, ` + SELECT id::text + FROM workspaces + WHERE status = 'removed' + AND instance_id IS NOT NULL + AND instance_id != '' + ORDER BY updated_at DESC + LIMIT $1 + `, cpSweepLimit) + if err != nil { + log.Printf("CP orphan sweeper: DB query failed: %v", err) + return + } + defer rows.Close() + + var orphanIDs []string + for rows.Next() { + var id string + if scanErr := rows.Scan(&id); scanErr != nil { + log.Printf("CP orphan sweeper: row scan failed: %v", scanErr) + continue + } + orphanIDs = append(orphanIDs, id) + } + if iterErr := rows.Err(); iterErr != nil { + log.Printf("CP orphan sweeper: rows iteration failed: %v", iterErr) + return + } + + for _, id := range orphanIDs { + log.Printf("CP orphan sweeper: terminating leaked EC2 for removed workspace %s", id) + if stopErr := reaper.Stop(ctx, id); stopErr != nil { + // CP-side error — transient 5xx, network, AWS hiccup. Leave + // instance_id populated so the next cycle retries. Loud-fail + // only at the log layer; the user-visible 500 was already + // returned by the inline path that triggered this orphan. + log.Printf("CP orphan sweeper: Stop failed for %s: %v — retry next cycle", id, stopErr) + continue + } + // Stop succeeded — clear instance_id so the next cycle skips this + // row. We can't use a tombstone column (no schema change in this + // PR); NULL'ing instance_id is the SSOT signal for "no live + // EC2 attached." The matching SELECT predicate above stays in + // sync with this UPDATE. 
+ if _, updErr := db.DB.ExecContext(ctx, + `UPDATE workspaces SET instance_id = NULL, updated_at = now() WHERE id = $1`, + id, + ); updErr != nil { + log.Printf("CP orphan sweeper: clear instance_id failed for %s: %v — next cycle will re-Stop (idempotent)", id, updErr) + } + } +} diff --git a/workspace-server/internal/registry/cp_orphan_sweeper_test.go b/workspace-server/internal/registry/cp_orphan_sweeper_test.go new file mode 100644 index 00000000..f2d57d0e --- /dev/null +++ b/workspace-server/internal/registry/cp_orphan_sweeper_test.go @@ -0,0 +1,266 @@ +package registry + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" +) + +// fakeCPReaper is a hand-rolled CPOrphanReaper for the SaaS-mode +// sweeper tests. Records every Stop call so tests can assert which +// workspace IDs were re-issued. +type fakeCPReaper struct { + mu sync.Mutex + stopErr map[string]error + stopCalls []string +} + +func (f *fakeCPReaper) Stop(_ context.Context, wsID string) error { + f.mu.Lock() + defer f.mu.Unlock() + f.stopCalls = append(f.stopCalls, wsID) + return f.stopErr[wsID] +} + +// TestCPSweepOnce_StopSucceeds_ClearsInstanceID — happy path. Single +// removed-row with non-NULL instance_id; Stop succeeds; instance_id +// gets NULL'd so the next cycle won't re-sweep it. +func TestCPSweepOnce_StopSucceeds_ClearsInstanceID(t *testing.T) { + mock := setupTestDB(t) + reaper := &fakeCPReaper{} + + mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces\s+WHERE status = 'removed'\s+AND instance_id IS NOT NULL\s+AND instance_id != ''\s+ORDER BY updated_at DESC\s+LIMIT \$1`). + WithArgs(cpSweepLimit). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-uuid-1")) + mock.ExpectExec(`UPDATE workspaces SET instance_id = NULL, updated_at = now\(\) WHERE id = \$1`). + WithArgs("ws-uuid-1"). 
WillReturnResult(sqlmock.NewResult(0, 1))

	cpSweepOnce(context.Background(), reaper)

	if len(reaper.stopCalls) != 1 || reaper.stopCalls[0] != "ws-uuid-1" {
		t.Fatalf("expected Stop(ws-uuid-1), got %v", reaper.stopCalls)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Fatalf("unmet expectations: %v", err)
	}
}

// TestCPSweepOnce_StopFails_KeepsInstanceID — CP transient failure.
// Stop returns an error; instance_id MUST stay populated so the next
// cycle retries. UPDATE must NOT fire.
func TestCPSweepOnce_StopFails_KeepsInstanceID(t *testing.T) {
	mock := setupTestDB(t)
	reaper := &fakeCPReaper{
		stopErr: map[string]error{"ws-uuid-1": errors.New("CP returned 503")},
	}

	mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
		WithArgs(cpSweepLimit).
		WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-uuid-1"))
	// No ExpectExec for the UPDATE. NOTE(review): an unexpected UPDATE
	// makes sqlmock return an error from ExecContext, but cpSweepOnce
	// logs and swallows exec errors, and ExpectationsWereMet() only
	// reports unmet expectations — not unexpected calls that already
	// errored at call time. So this test would still PASS if the
	// UPDATE fired; the "must not fire" contract is not actually
	// enforced here. TODO: count Exec calls (e.g. via an expectation
	// with .WillReturnError plus a call counter) to pin it.

	cpSweepOnce(context.Background(), reaper)

	if len(reaper.stopCalls) != 1 || reaper.stopCalls[0] != "ws-uuid-1" {
		t.Fatalf("expected Stop(ws-uuid-1), got %v", reaper.stopCalls)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Fatalf("unmet expectations (UPDATE should NOT have fired): %v", err)
	}
}

// TestCPSweepOnce_NoOrphans — empty result set is the steady state in
// healthy operation. No Stop, no UPDATE.
func TestCPSweepOnce_NoOrphans(t *testing.T) {
	mock := setupTestDB(t)
	reaper := &fakeCPReaper{}

	mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
		WithArgs(cpSweepLimit).
+ WillReturnRows(sqlmock.NewRows([]string{"id"})) + + cpSweepOnce(context.Background(), reaper) + + if len(reaper.stopCalls) != 0 { + t.Fatalf("expected zero Stop calls, got %v", reaper.stopCalls) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} + +// TestCPSweepOnce_MultipleOrphans — all rows in the batch get Stop'd +// independently; one failure doesn't block others. +func TestCPSweepOnce_MultipleOrphans(t *testing.T) { + mock := setupTestDB(t) + reaper := &fakeCPReaper{ + stopErr: map[string]error{"ws-uuid-2": errors.New("CP 503 on ws-uuid-2")}, + } + + mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`). + WithArgs(cpSweepLimit). + WillReturnRows(sqlmock.NewRows([]string{"id"}). + AddRow("ws-uuid-1"). + AddRow("ws-uuid-2"). + AddRow("ws-uuid-3")) + // ws-uuid-1 succeeds → UPDATE fires. + mock.ExpectExec(`UPDATE workspaces SET instance_id = NULL`). + WithArgs("ws-uuid-1"). + WillReturnResult(sqlmock.NewResult(0, 1)) + // ws-uuid-2 fails → no UPDATE. + // ws-uuid-3 succeeds → UPDATE fires. + mock.ExpectExec(`UPDATE workspaces SET instance_id = NULL`). + WithArgs("ws-uuid-3"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + cpSweepOnce(context.Background(), reaper) + + if len(reaper.stopCalls) != 3 { + t.Fatalf("expected Stop on all 3 ids, got %v", reaper.stopCalls) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} + +// TestCPSweepOnce_QueryError — DB transient failure. Sweep returns +// without panicking. No Stop calls. +func TestCPSweepOnce_QueryError(t *testing.T) { + mock := setupTestDB(t) + reaper := &fakeCPReaper{} + + mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`). + WithArgs(cpSweepLimit). 
+ WillReturnError(errors.New("connection refused")) + + cpSweepOnce(context.Background(), reaper) + + if len(reaper.stopCalls) != 0 { + t.Fatalf("expected zero Stop calls on query error, got %v", reaper.stopCalls) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} + +// TestCPSweepOnce_UpdateError_LogsButContinues — Stop succeeded but +// the UPDATE to clear instance_id failed. Subsequent rows in the batch +// must still process; comment in cpSweepOnce promises idempotent re-Stop +// next cycle. +func TestCPSweepOnce_UpdateError_LogsButContinues(t *testing.T) { + mock := setupTestDB(t) + reaper := &fakeCPReaper{} + + mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`). + WithArgs(cpSweepLimit). + WillReturnRows(sqlmock.NewRows([]string{"id"}). + AddRow("ws-uuid-1"). + AddRow("ws-uuid-2")) + mock.ExpectExec(`UPDATE workspaces SET instance_id = NULL`). + WithArgs("ws-uuid-1"). + WillReturnError(errors.New("UPDATE timeout")) + mock.ExpectExec(`UPDATE workspaces SET instance_id = NULL`). + WithArgs("ws-uuid-2"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + cpSweepOnce(context.Background(), reaper) + + if len(reaper.stopCalls) != 2 { + t.Fatalf("expected Stop on both ids despite UPDATE error on first, got %v", reaper.stopCalls) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} + +// TestCPSweepOnce_NilDB — defensive against db.DB being nil. Must not +// panic; must not call Stop. 
func TestCPSweepOnce_NilDB(t *testing.T) {
	saved := db.DB
	db.DB = nil
	t.Cleanup(func() { db.DB = saved })

	reaper := &fakeCPReaper{}
	cpSweepOnce(context.Background(), reaper)

	if len(reaper.stopCalls) != 0 {
		t.Fatalf("expected zero Stop calls when db.DB is nil, got %v", reaper.stopCalls)
	}
}

// TestStartCPOrphanSweeper_NilReaperDisabled — boot-safety: a SaaS CP
// without cpProv configured must not start the loop (immediate return,
// no goroutine leak).
func TestStartCPOrphanSweeper_NilReaperDisabled(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	done := make(chan struct{})
	go func() {
		StartCPOrphanSweeper(ctx, nil)
		close(done)
	}()
	select {
	case <-done:
		// expected — nil reaper short-circuits.
	case <-time.After(500 * time.Millisecond):
		t.Fatal("StartCPOrphanSweeper(nil) did not return immediately")
	}
}

// TestStartCPOrphanSweeper_RunsOnceImmediatelyAndOnTick — cadence
// contract: kick off one sweep at boot (so a platform restart starts
// healing immediately), then once per OrphanSweepInterval. Verifies
// the loop terminates on ctx cancel.
func TestStartCPOrphanSweeper_RunsOnceImmediatelyAndOnTick(t *testing.T) {
	mock := setupTestDB(t)
	reaper := &fakeCPReaper{}

	// Only the immediate boot sweep can realistically land in the
	// ~100ms test window — the first tick is 60s out and we can't
	// shrink OrphanSweepInterval (it's a const). So this test only
	// exercises "at least one immediate sweep" and lets cancel close
	// the loop.
	mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
		WithArgs(cpSweepLimit).
		WillReturnRows(sqlmock.NewRows([]string{"id"}))
	// Register a second expectation defensively in case the scheduler
	// somehow lets a tick fire. NOTE(review): an UNREGISTERED query
	// would not fail the test on its own — sqlmock returns an error
	// from QueryContext, which cpSweepOnce logs and swallows — so this
	// is belt-and-braces, not an enforcement mechanism.
	mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
		WithArgs(cpSweepLimit).
WillReturnRows(sqlmock.NewRows([]string{"id"}))

	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		StartCPOrphanSweeper(ctx, reaper)
		close(done)
	}()
	// 100ms is well past the boot-sweep but well shy of the 60s
	// interval, so the second query expectation is intentionally
	// unmet — that's fine: we skip ExpectationsWereMet() below, so an
	// "expected but not received" query never surfaces. NOTE(review):
	// an unexpected query would also not fail the test — sqlmock's
	// error comes back from QueryContext and cpSweepOnce logs and
	// swallows it.
	time.Sleep(100 * time.Millisecond)
	cancel()
	select {
	case <-done:
		// expected
	case <-time.After(2 * time.Second):
		t.Fatal("StartCPOrphanSweeper did not exit on ctx cancel")
	}

	// NOTE(review): nothing above actually ASSERTS the boot sweep ran —
	// ExpectationsWereMet() is skipped (the second query is optional)
	// and the sweeper swallows mock errors, so the ctx-cancel exit is
	// the only hard check. The boot sweep matters operationally
	// (without it a restart after a CP outage leaves a 60s healing
	// gap). TODO: seed one orphan row in the first expectation and
	// assert len(reaper.stopCalls) >= 1 to pin the boot sweep.
}