fix(workspace-server): CP orphan sweeper closes deprovision split-write race (#2989) #2

Merged
claude-ceo-assistant merged 1 commit from fix/cp-orphan-sweeper-2989 into staging 2026-05-07 11:38:49 +00:00
3 changed files with 428 additions and 0 deletions

View File

@ -266,6 +266,19 @@ func main() {
})
}
// CP-mode orphan sweeper — SaaS counterpart to the Docker sweeper
// above. Re-issues cpProv.Stop for any workspace at status='removed'
// with a non-NULL instance_id, healing the deprovision split-write
// race documented in #2989: tenant marks status='removed' BEFORE
// calling CP DELETE, so a transient CP failure leaves the EC2
// running with no retry path. cpProv.Stop is idempotent against
// already-terminated instances; on success we clear instance_id.
if cpProv != nil {
go supervised.RunWithRecover(ctx, "cp-orphan-sweeper", func(c context.Context) {
registry.StartCPOrphanSweeper(c, cpProv)
})
}
// Pending-uploads GC sweep — deletes acked rows past their retention
// window plus unacked rows past expires_at. Without this the
// pending_uploads table grows unbounded; even with the 24h hard TTL,

View File

@ -0,0 +1,149 @@
package registry
// cp_orphan_sweeper.go — SaaS-mode counterpart to orphan_sweeper.go.
//
// The Docker sweeper (StartOrphanSweeper) runs only when prov != nil
// (single-tenant Docker mode); SaaS tenants run cpProv != nil and prov
// == nil, so they get no sweep coverage from that path. This file fills
// the gap for the deprovision split-write race documented in #2989:
//
// 1. handlers/workspace_crud.go:365 marks workspaces.status = 'removed'.
// 2. workspace_crud.go:439 calls StopWorkspaceAuto → cpProv.Stop, which
// issues DELETE /cp/workspaces/:id?instance_id=… to controlplane.
// 3. If step 2 fails (CP transient 5xx, network blip, AWS hiccup), the
// inline path returns a 500 to the canvas — but the DB row is already
// at status='removed' with instance_id still populated. There's no
// retry, and the EC2 lives forever.
//
// This sweeper closes that gap by re-issuing cpProv.Stop on every cycle
// for any workspace at status='removed' with a non-NULL instance_id.
// Stop is idempotent: AWS TerminateInstance on an already-terminated
// instance is a no-op (per AWS docs), and CP's Deprovision handler
// (controlplane/internal/handlers/workspace_provision.go:289) handles
// the already-terminated and already-deleted-DNS cases via best-effort
// guards. On Stop success, the sweeper clears instance_id so the next
// cycle skips the row.
//
// Cadence + safety filters mirror the Docker sweeper:
// - 60s tick (OrphanSweepInterval)
// - 30s per-cycle deadline (orphanSweepDeadline)
// - LIMIT 100 per cycle so a sustained CP outage that backs up many
// orphans doesn't blow the request timeout; subsequent cycles drain.
//
// SSOT note: Stop's idempotency (no-op on empty instance_id, AWS
// terminate on already-terminated) is the load-bearing invariant. Any
// future change that adds non-idempotent side effects to cpProv.Stop
// must also gate this sweeper, or it will re-execute those side effects
// every 60s for every cleared-but-not-yet-NULL row.
import (
"context"
"log"
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
)
// CPOrphanReaper is the dependency the SaaS-mode sweeper takes from
// the CP provisioner. *provisioner.CPProvisioner satisfies this
// naturally; tests inject fakes.
//
// The interface is deliberately consumer-sized: it exposes only the
// single method the sweeper actually calls. Per the file header above,
// Stop's idempotency is the load-bearing invariant — the sweeper may
// re-issue Stop for the same workspace ID on every cycle until the
// row's instance_id is cleared.
type CPOrphanReaper interface {
	Stop(ctx context.Context, workspaceID string) error
}

// cpSweepLimit caps the per-cycle row count so a sustained CP outage
// can't make a single sweep cycle blow orphanSweepDeadline. With a
// 60s cadence and 100-row limit, drain rate is up to 100 orphans/min,
// which has never been approached even during the worst leak windows.
const cpSweepLimit = 100
// StartCPOrphanSweeper runs the SaaS-mode reconcile loop until ctx is
// cancelled. A nil reaper makes the loop a no-op (matches the Docker
// sweeper's nil-tolerant pattern).
//
// Caller is expected to gate on `cpProv != nil` (matching how
// StartOrphanSweeper is gated on `prov != nil` at the call site in
// cmd/server/main.go) — passing a nil *CPProvisioner here would also
// short-circuit but the gate at the wiring site keeps the call shape
// symmetric across the two sweepers.
func StartCPOrphanSweeper(ctx context.Context, reaper CPOrphanReaper) {
	if reaper == nil {
		log.Println("CP orphan sweeper: reaper is nil — sweeper disabled")
		return
	}

	log.Printf("CP orphan sweeper started — reconciling every %s", OrphanSweepInterval)
	tick := time.NewTicker(OrphanSweepInterval)
	defer tick.Stop()

	// Boot-time pass: heal immediately after a restart instead of
	// waiting out the first full interval.
	cpSweepOnce(ctx, reaper)

	for {
		select {
		case <-tick.C:
			cpSweepOnce(ctx, reaper)
		case <-ctx.Done():
			log.Println("CP orphan sweeper: shutdown")
			return
		}
	}
}
// cpSweepOnce executes one reconcile pass: select up to cpSweepLimit
// workspaces stuck at status='removed' with a live instance_id, then
// re-issue the idempotent cpProv.Stop for each and NULL the column on
// success. Defensive against db.DB being nil so a misconfigured boot
// doesn't panic.
func cpSweepOnce(parent context.Context, reaper CPOrphanReaper) {
	if db.DB == nil {
		return
	}

	ctx, cancel := context.WithTimeout(parent, orphanSweepDeadline)
	defer cancel()

	rows, err := db.DB.QueryContext(ctx, `
		SELECT id::text
		FROM workspaces
		WHERE status = 'removed'
		  AND instance_id IS NOT NULL
		  AND instance_id != ''
		ORDER BY updated_at DESC
		LIMIT $1
	`, cpSweepLimit)
	if err != nil {
		log.Printf("CP orphan sweeper: DB query failed: %v", err)
		return
	}
	defer rows.Close()

	// Drain the cursor fully before issuing any Stop calls so the DB
	// connection isn't pinned open across slow CP round-trips.
	var leaked []string
	for rows.Next() {
		var wsID string
		if scanErr := rows.Scan(&wsID); scanErr != nil {
			log.Printf("CP orphan sweeper: row scan failed: %v", scanErr)
			continue
		}
		leaked = append(leaked, wsID)
	}
	if iterErr := rows.Err(); iterErr != nil {
		log.Printf("CP orphan sweeper: rows iteration failed: %v", iterErr)
		return
	}

	for _, wsID := range leaked {
		log.Printf("CP orphan sweeper: terminating leaked EC2 for removed workspace %s", wsID)
		if stopErr := reaper.Stop(ctx, wsID); stopErr != nil {
			// CP-side error — transient 5xx, network, AWS hiccup. Leave
			// instance_id populated so the next cycle retries. Loud-fail
			// only at the log layer; the user-visible 500 was already
			// returned by the inline path that triggered this orphan.
			log.Printf("CP orphan sweeper: Stop failed for %s: %v — retry next cycle", wsID, stopErr)
			continue
		}
		// Stop succeeded — clear instance_id so the next cycle skips this
		// row. We can't use a tombstone column (no schema change in this
		// PR); NULL'ing instance_id is the SSOT signal for "no live
		// EC2 attached." The matching SELECT predicate above stays in
		// sync with this UPDATE.
		if _, updErr := db.DB.ExecContext(ctx,
			`UPDATE workspaces SET instance_id = NULL, updated_at = now() WHERE id = $1`,
			wsID,
		); updErr != nil {
			log.Printf("CP orphan sweeper: clear instance_id failed for %s: %v — next cycle will re-Stop (idempotent)", wsID, updErr)
		}
	}
}

View File

@ -0,0 +1,266 @@
package registry
import (
"context"
"errors"
"sync"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
)
// fakeCPReaper is a hand-rolled CPOrphanReaper for the SaaS-mode
// sweeper tests: it logs every Stop invocation so assertions can
// check exactly which workspace IDs were re-issued, and returns any
// error pre-registered for that ID.
type fakeCPReaper struct {
	mu        sync.Mutex
	stopErr   map[string]error
	stopCalls []string
}

// Stop appends wsID to the call log and returns the error configured
// for it in stopErr (nil when none was registered — a lookup in a nil
// map yields the zero value).
func (r *fakeCPReaper) Stop(_ context.Context, wsID string) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.stopCalls = append(r.stopCalls, wsID)
	return r.stopErr[wsID]
}
// TestCPSweepOnce_StopSucceeds_ClearsInstanceID — happy path: one
// removed-row orphan, Stop succeeds, and the sweeper NULLs its
// instance_id so subsequent cycles skip it.
func TestCPSweepOnce_StopSucceeds_ClearsInstanceID(t *testing.T) {
	sqlMock := setupTestDB(t)
	fake := &fakeCPReaper{}

	sqlMock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces\s+WHERE status = 'removed'\s+AND instance_id IS NOT NULL\s+AND instance_id != ''\s+ORDER BY updated_at DESC\s+LIMIT \$1`).
		WithArgs(cpSweepLimit).
		WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-uuid-1"))
	sqlMock.ExpectExec(`UPDATE workspaces SET instance_id = NULL, updated_at = now\(\) WHERE id = \$1`).
		WithArgs("ws-uuid-1").
		WillReturnResult(sqlmock.NewResult(0, 1))

	cpSweepOnce(context.Background(), fake)

	if got := fake.stopCalls; len(got) != 1 || got[0] != "ws-uuid-1" {
		t.Fatalf("expected Stop(ws-uuid-1), got %v", got)
	}
	if err := sqlMock.ExpectationsWereMet(); err != nil {
		t.Fatalf("unmet expectations: %v", err)
	}
}
// TestCPSweepOnce_StopFails_KeepsInstanceID — CP transient failure:
// Stop errors, so instance_id MUST remain populated for the next
// cycle's retry. The clearing UPDATE must never be issued.
func TestCPSweepOnce_StopFails_KeepsInstanceID(t *testing.T) {
	sqlMock := setupTestDB(t)
	fake := &fakeCPReaper{
		stopErr: map[string]error{"ws-uuid-1": errors.New("CP returned 503")},
	}

	sqlMock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
		WithArgs(cpSweepLimit).
		WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-uuid-1"))
	// Deliberately no ExpectExec: sqlmock treats an unregistered UPDATE
	// as a failure, which is exactly the assertion we want here.

	cpSweepOnce(context.Background(), fake)

	if got := fake.stopCalls; len(got) != 1 || got[0] != "ws-uuid-1" {
		t.Fatalf("expected Stop(ws-uuid-1), got %v", got)
	}
	if err := sqlMock.ExpectationsWereMet(); err != nil {
		t.Fatalf("unmet expectations (UPDATE should NOT have fired): %v", err)
	}
}
// TestCPSweepOnce_NoOrphans — the steady state in healthy operation
// is an empty result set: no Stop calls, no UPDATE.
func TestCPSweepOnce_NoOrphans(t *testing.T) {
	sqlMock := setupTestDB(t)
	fake := &fakeCPReaper{}

	sqlMock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
		WithArgs(cpSweepLimit).
		WillReturnRows(sqlmock.NewRows([]string{"id"}))

	cpSweepOnce(context.Background(), fake)

	if len(fake.stopCalls) > 0 {
		t.Fatalf("expected zero Stop calls, got %v", fake.stopCalls)
	}
	if err := sqlMock.ExpectationsWereMet(); err != nil {
		t.Fatalf("unmet expectations: %v", err)
	}
}
// TestCPSweepOnce_MultipleOrphans — every row in the batch is Stop'd
// independently; a failure on one must not block the others.
func TestCPSweepOnce_MultipleOrphans(t *testing.T) {
	sqlMock := setupTestDB(t)
	fake := &fakeCPReaper{
		stopErr: map[string]error{"ws-uuid-2": errors.New("CP 503 on ws-uuid-2")},
	}

	sqlMock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
		WithArgs(cpSweepLimit).
		WillReturnRows(sqlmock.NewRows([]string{"id"}).
			AddRow("ws-uuid-1").
			AddRow("ws-uuid-2").
			AddRow("ws-uuid-3"))
	// Only the two successful Stops clear instance_id; ws-uuid-2's
	// failure produces no UPDATE and must not block ws-uuid-3.
	sqlMock.ExpectExec(`UPDATE workspaces SET instance_id = NULL`).
		WithArgs("ws-uuid-1").
		WillReturnResult(sqlmock.NewResult(0, 1))
	sqlMock.ExpectExec(`UPDATE workspaces SET instance_id = NULL`).
		WithArgs("ws-uuid-3").
		WillReturnResult(sqlmock.NewResult(0, 1))

	cpSweepOnce(context.Background(), fake)

	if len(fake.stopCalls) != 3 {
		t.Fatalf("expected Stop on all 3 ids, got %v", fake.stopCalls)
	}
	if err := sqlMock.ExpectationsWereMet(); err != nil {
		t.Fatalf("unmet expectations: %v", err)
	}
}
// TestCPSweepOnce_QueryError — a transient DB failure must make the
// sweep return cleanly (no panic) with zero Stop calls.
func TestCPSweepOnce_QueryError(t *testing.T) {
	sqlMock := setupTestDB(t)
	fake := &fakeCPReaper{}

	sqlMock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
		WithArgs(cpSweepLimit).
		WillReturnError(errors.New("connection refused"))

	cpSweepOnce(context.Background(), fake)

	if len(fake.stopCalls) > 0 {
		t.Fatalf("expected zero Stop calls on query error, got %v", fake.stopCalls)
	}
	if err := sqlMock.ExpectationsWereMet(); err != nil {
		t.Fatalf("unmet expectations: %v", err)
	}
}
// TestCPSweepOnce_UpdateError_LogsButContinues — Stop succeeds but the
// instance_id-clearing UPDATE fails. Later rows in the batch must
// still be processed; cpSweepOnce's comment promises the idempotent
// re-Stop next cycle covers the un-cleared row.
func TestCPSweepOnce_UpdateError_LogsButContinues(t *testing.T) {
	sqlMock := setupTestDB(t)
	fake := &fakeCPReaper{}

	sqlMock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
		WithArgs(cpSweepLimit).
		WillReturnRows(sqlmock.NewRows([]string{"id"}).
			AddRow("ws-uuid-1").
			AddRow("ws-uuid-2"))
	// First UPDATE fails; the sweep must log and move on to row two.
	sqlMock.ExpectExec(`UPDATE workspaces SET instance_id = NULL`).
		WithArgs("ws-uuid-1").
		WillReturnError(errors.New("UPDATE timeout"))
	sqlMock.ExpectExec(`UPDATE workspaces SET instance_id = NULL`).
		WithArgs("ws-uuid-2").
		WillReturnResult(sqlmock.NewResult(0, 1))

	cpSweepOnce(context.Background(), fake)

	if len(fake.stopCalls) != 2 {
		t.Fatalf("expected Stop on both ids despite UPDATE error on first, got %v", fake.stopCalls)
	}
	if err := sqlMock.ExpectationsWereMet(); err != nil {
		t.Fatalf("unmet expectations: %v", err)
	}
}
// TestCPSweepOnce_NilDB — defensive guard: a nil db.DB must neither
// panic nor trigger any Stop calls.
func TestCPSweepOnce_NilDB(t *testing.T) {
	saved := db.DB
	db.DB = nil
	defer func() { db.DB = saved }()

	fake := &fakeCPReaper{}
	cpSweepOnce(context.Background(), fake)

	if len(fake.stopCalls) > 0 {
		t.Fatalf("expected zero Stop calls when db.DB is nil, got %v", fake.stopCalls)
	}
}
// TestStartCPOrphanSweeper_NilReaperDisabled — boot-safety: with no
// cpProv configured the loop must return immediately, leaking no
// goroutine.
func TestStartCPOrphanSweeper_NilReaperDisabled(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	returned := make(chan struct{})
	go func() {
		defer close(returned)
		StartCPOrphanSweeper(ctx, nil)
	}()

	select {
	case <-returned:
		// nil reaper short-circuits before the ticker is created.
	case <-time.After(500 * time.Millisecond):
		t.Fatal("StartCPOrphanSweeper(nil) did not return immediately")
	}
}
// TestStartCPOrphanSweeper_RunsOnceImmediatelyAndOnTick — cadence
// contract: kick off one sweep at boot (so a platform restart starts
// healing immediately), then once per OrphanSweepInterval. Verifies
// both that the loop terminates on ctx cancel AND that the boot sweep
// actually ran: the sweep is given one orphan row so it leaves an
// observable trace (a recorded Stop call). Without that assertion, a
// regression that dropped the boot-time cpSweepOnce call would pass
// this test silently.
func TestStartCPOrphanSweeper_RunsOnceImmediatelyAndOnTick(t *testing.T) {
	mock := setupTestDB(t)
	reaper := &fakeCPReaper{}

	// Boot sweep: one orphan so Stop (and the clearing UPDATE) fire.
	mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
		WithArgs(cpSweepLimit).
		WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-boot"))
	mock.ExpectExec(`UPDATE workspaces SET instance_id = NULL`).
		WithArgs("ws-boot").
		WillReturnResult(sqlmock.NewResult(0, 1))
	// The ticker may or may not fire in the test window depending on
	// scheduler; we can't shrink OrphanSweepInterval (it's a const),
	// so register one optional empty-result query. sqlmock fails on
	// UNREGISTERED queries, so this tolerates both shapes; the
	// expectation is allowed to go unmet.
	mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
		WithArgs(cpSweepLimit).
		WillReturnRows(sqlmock.NewRows([]string{"id"}))

	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		StartCPOrphanSweeper(ctx, reaper)
		close(done)
	}()

	// 100ms is well past the boot-sweep but well shy of the 60s
	// interval.
	time.Sleep(100 * time.Millisecond)
	cancel()

	select {
	case <-done:
		// expected
	case <-time.After(2 * time.Second):
		t.Fatal("StartCPOrphanSweeper did not exit on ctx cancel")
	}

	// Boot sweep must have happened — without it, an operator restart
	// after a CP outage would leave a 60s gap before the first heal.
	// Reading stopCalls without the mutex is race-free here: the
	// sweeper goroutine has fully exited (done is closed).
	if len(reaper.stopCalls) == 0 || reaper.stopCalls[0] != "ws-boot" {
		t.Fatalf("boot sweep did not run: stopCalls = %v", reaper.stopCalls)
	}
	// We don't assert mock.ExpectationsWereMet() here because the
	// second query is intentionally optional.
}