fix(workspace-server): CP orphan sweeper closes deprovision split-write race (#2989)
Some checks failed
CI / Shellcheck (E2E scripts) (pull_request) Successful in 2s
Harness Replays / detect-changes (pull_request) Successful in 8s
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 4s
CI / Detect changes (pull_request) Successful in 6s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 4s
E2E API Smoke Test / detect-changes (pull_request) Successful in 6s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 6s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 4s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 5s
CI / Python Lint & Test (pull_request) Successful in 3s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 5s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 6s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 4s
CI / Canvas (Next.js) (pull_request) Successful in 18s
CodeQL / Analyze (${{ matrix.language }}) (go) (pull_request) Failing after 43s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
CodeQL / Analyze (${{ matrix.language }}) (python) (pull_request) Failing after 1m19s
CodeQL / Analyze (${{ matrix.language }}) (javascript-typescript) (pull_request) Failing after 1m22s
Harness Replays / Harness Replays (pull_request) Failing after 37s
CI / Platform (Go) (pull_request) Failing after 2m33s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Failing after 4m48s
Some checks failed
CI / Shellcheck (E2E scripts) (pull_request) Successful in 2s
Harness Replays / detect-changes (pull_request) Successful in 8s
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 4s
CI / Detect changes (pull_request) Successful in 6s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 4s
E2E API Smoke Test / detect-changes (pull_request) Successful in 6s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 6s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 4s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 5s
CI / Python Lint & Test (pull_request) Successful in 3s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 5s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 6s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 4s
CI / Canvas (Next.js) (pull_request) Successful in 18s
CodeQL / Analyze (${{ matrix.language }}) (go) (pull_request) Failing after 43s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
CodeQL / Analyze (${{ matrix.language }}) (python) (pull_request) Failing after 1m19s
CodeQL / Analyze (${{ matrix.language }}) (javascript-typescript) (pull_request) Failing after 1m22s
Harness Replays / Harness Replays (pull_request) Failing after 37s
CI / Platform (Go) (pull_request) Failing after 2m33s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Failing after 4m48s
The deprovision path marks `workspaces.status='removed'` BEFORE calling the controlplane DELETE. If that CP call fails (transient 5xx, network hiccup, AWS provider error), the DB row stays at 'removed' with `instance_id` populated and there's no retry — the EC2 lives forever. 9 prod orphans accumulated over 3 days under this bug. Adds a SaaS-mode counterpart to the existing Docker `orphan_sweeper`: - 60s tick (matches the Docker sweeper cadence) - LIMIT 100 per cycle so a sustained CP outage drains over multiple cycles without blowing the request timeout - Re-issues `cpProv.Stop` for any workspace at status='removed' with a non-NULL `instance_id`. Stop is idempotent (AWS terminate on already-terminated is a no-op; CP's Deprovision tolerates already- deleted DNS) so retries are safe. - On Stop success, NULLs `instance_id` so the next cycle skips the row. - On Stop failure, leaves `instance_id` populated for next cycle. The existing Docker sweeper is gated on `prov != nil`; the new sweeper is gated on `cpProv != nil`. SaaS tenants get exactly one of the two, self-hosted tenants get the Docker one — no overlap. Why this shape over option A (CP-first ordering) or B (durable outbox): the existing inline path already returns a loud 500 to the user when CP fails — the only missing piece is automatic retry, which a 60s sweeper provides without protocol changes, new tables, or new workers. ~30 LOC of production code vs. ~400 for an outbox. RFC discussion in #2989 comment chain. Tests: - 9 unit tests covering happy path, Stop failure, UPDATE failure, multiple orphans (one-fails-others-still-process), DB query error, nil-DB defense, nil-reaper short-circuit, and the boot-immediate-then- tick cadence contract. - Mutation-tested: status='running' substitution and removed-UPDATE- block both fail at least one test. Out of scope: - Backfilling the 9 named orphans — they'll heal automatically on the first sweep cycle after this lands; no manual cleanup needed. - Long-term durable-outbox architecture — separate RFC.
This commit is contained in:
parent
55ef3176ed
commit
3cdb67f27e
@ -266,6 +266,19 @@ func main() {
|
||||
})
|
||||
}
|
||||
|
||||
// CP-mode orphan sweeper — SaaS counterpart to the Docker sweeper
|
||||
// above. Re-issues cpProv.Stop for any workspace at status='removed'
|
||||
// with a non-NULL instance_id, healing the deprovision split-write
|
||||
// race documented in #2989: tenant marks status='removed' BEFORE
|
||||
// calling CP DELETE, so a transient CP failure leaves the EC2
|
||||
// running with no retry path. cpProv.Stop is idempotent against
|
||||
// already-terminated instances; on success we clear instance_id.
|
||||
if cpProv != nil {
|
||||
go supervised.RunWithRecover(ctx, "cp-orphan-sweeper", func(c context.Context) {
|
||||
registry.StartCPOrphanSweeper(c, cpProv)
|
||||
})
|
||||
}
|
||||
|
||||
// Pending-uploads GC sweep — deletes acked rows past their retention
|
||||
// window plus unacked rows past expires_at. Without this the
|
||||
// pending_uploads table grows unbounded; even with the 24h hard TTL,
|
||||
|
||||
149
workspace-server/internal/registry/cp_orphan_sweeper.go
Normal file
149
workspace-server/internal/registry/cp_orphan_sweeper.go
Normal file
@ -0,0 +1,149 @@
|
||||
package registry
|
||||
|
||||
// cp_orphan_sweeper.go — SaaS-mode counterpart to orphan_sweeper.go.
|
||||
//
|
||||
// The Docker sweeper (StartOrphanSweeper) runs only when prov != nil
|
||||
// (single-tenant Docker mode); SaaS tenants run cpProv != nil and prov
|
||||
// == nil, so they get no sweep coverage from that path. This file fills
|
||||
// the gap for the deprovision split-write race documented in #2989:
|
||||
//
|
||||
// 1. handlers/workspace_crud.go:365 marks workspaces.status = 'removed'.
|
||||
// 2. workspace_crud.go:439 calls StopWorkspaceAuto → cpProv.Stop, which
|
||||
// issues DELETE /cp/workspaces/:id?instance_id=… to controlplane.
|
||||
// 3. If step 2 fails (CP transient 5xx, network blip, AWS hiccup), the
|
||||
// inline path returns a 500 to the canvas — but the DB row is already
|
||||
// at status='removed' with instance_id still populated. There's no
|
||||
// retry, and the EC2 lives forever.
|
||||
//
|
||||
// This sweeper closes that gap by re-issuing cpProv.Stop on every cycle
|
||||
// for any workspace at status='removed' with a non-NULL instance_id.
|
||||
// Stop is idempotent: AWS TerminateInstance on an already-terminated
|
||||
// instance is a no-op (per AWS docs), and CP's Deprovision handler
|
||||
// (controlplane/internal/handlers/workspace_provision.go:289) handles
|
||||
// the already-terminated and already-deleted-DNS cases via best-effort
|
||||
// guards. On Stop success, the sweeper clears instance_id so the next
|
||||
// cycle skips the row.
|
||||
//
|
||||
// Cadence + safety filters mirror the Docker sweeper:
|
||||
// - 60s tick (OrphanSweepInterval)
|
||||
// - 30s per-cycle deadline (orphanSweepDeadline)
|
||||
// - LIMIT 100 per cycle so a sustained CP outage that backs up many
|
||||
// orphans doesn't blow the request timeout; subsequent cycles drain.
|
||||
//
|
||||
// SSOT note: Stop's idempotency (no-op on empty instance_id, AWS
|
||||
// terminate on already-terminated) is the load-bearing invariant. Any
|
||||
// future change that adds non-idempotent side effects to cpProv.Stop
|
||||
// must also gate this sweeper, or it will re-execute those side effects
|
||||
// every 60s for every cleared-but-not-yet-NULL row.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
)
|
||||
|
||||
// CPOrphanReaper is the dependency the SaaS-mode sweeper takes from
|
||||
// the CP provisioner. *provisioner.CPProvisioner satisfies this
|
||||
// naturally; tests inject fakes.
|
||||
type CPOrphanReaper interface {
|
||||
Stop(ctx context.Context, workspaceID string) error
|
||||
}
|
||||
|
||||
// cpSweepLimit caps the per-cycle row count so a sustained CP outage
|
||||
// can't make a single sweep cycle blow orphanSweepDeadline. With a
|
||||
// 60s cadence and 100-row limit, drain rate is up to 100 orphans/min,
|
||||
// which has never been approached even during the worst leak windows.
|
||||
const cpSweepLimit = 100
|
||||
|
||||
// StartCPOrphanSweeper runs the SaaS-mode reconcile loop until ctx is
|
||||
// cancelled. nil reaper makes the loop a no-op (matches the Docker
|
||||
// sweeper's nil-tolerant pattern).
|
||||
//
|
||||
// Caller is expected to gate on `cpProv != nil` (matching how
|
||||
// StartOrphanSweeper is gated on `prov != nil` at the call site in
|
||||
// cmd/server/main.go) — passing a nil *CPProvisioner here would also
|
||||
// short-circuit but the gate at the wiring site keeps the call shape
|
||||
// symmetric across the two sweepers.
|
||||
func StartCPOrphanSweeper(ctx context.Context, reaper CPOrphanReaper) {
|
||||
if reaper == nil {
|
||||
log.Println("CP orphan sweeper: reaper is nil — sweeper disabled")
|
||||
return
|
||||
}
|
||||
log.Printf("CP orphan sweeper started — reconciling every %s", OrphanSweepInterval)
|
||||
ticker := time.NewTicker(OrphanSweepInterval)
|
||||
defer ticker.Stop()
|
||||
cpSweepOnce(ctx, reaper)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Println("CP orphan sweeper: shutdown")
|
||||
return
|
||||
case <-ticker.C:
|
||||
cpSweepOnce(ctx, reaper)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// cpSweepOnce executes one reconcile pass. Defensive against db.DB
|
||||
// being nil so a misconfigured boot doesn't panic.
|
||||
func cpSweepOnce(parent context.Context, reaper CPOrphanReaper) {
|
||||
if db.DB == nil {
|
||||
return
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(parent, orphanSweepDeadline)
|
||||
defer cancel()
|
||||
|
||||
rows, err := db.DB.QueryContext(ctx, `
|
||||
SELECT id::text
|
||||
FROM workspaces
|
||||
WHERE status = 'removed'
|
||||
AND instance_id IS NOT NULL
|
||||
AND instance_id != ''
|
||||
ORDER BY updated_at DESC
|
||||
LIMIT $1
|
||||
`, cpSweepLimit)
|
||||
if err != nil {
|
||||
log.Printf("CP orphan sweeper: DB query failed: %v", err)
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var orphanIDs []string
|
||||
for rows.Next() {
|
||||
var id string
|
||||
if scanErr := rows.Scan(&id); scanErr != nil {
|
||||
log.Printf("CP orphan sweeper: row scan failed: %v", scanErr)
|
||||
continue
|
||||
}
|
||||
orphanIDs = append(orphanIDs, id)
|
||||
}
|
||||
if iterErr := rows.Err(); iterErr != nil {
|
||||
log.Printf("CP orphan sweeper: rows iteration failed: %v", iterErr)
|
||||
return
|
||||
}
|
||||
|
||||
for _, id := range orphanIDs {
|
||||
log.Printf("CP orphan sweeper: terminating leaked EC2 for removed workspace %s", id)
|
||||
if stopErr := reaper.Stop(ctx, id); stopErr != nil {
|
||||
// CP-side error — transient 5xx, network, AWS hiccup. Leave
|
||||
// instance_id populated so the next cycle retries. Loud-fail
|
||||
// only at the log layer; the user-visible 500 was already
|
||||
// returned by the inline path that triggered this orphan.
|
||||
log.Printf("CP orphan sweeper: Stop failed for %s: %v — retry next cycle", id, stopErr)
|
||||
continue
|
||||
}
|
||||
// Stop succeeded — clear instance_id so the next cycle skips this
|
||||
// row. We can't use a tombstone column (no schema change in this
|
||||
// PR); NULL'ing instance_id is the SSOT signal for "no live
|
||||
// EC2 attached." The matching SELECT predicate above stays in
|
||||
// sync with this UPDATE.
|
||||
if _, updErr := db.DB.ExecContext(ctx,
|
||||
`UPDATE workspaces SET instance_id = NULL, updated_at = now() WHERE id = $1`,
|
||||
id,
|
||||
); updErr != nil {
|
||||
log.Printf("CP orphan sweeper: clear instance_id failed for %s: %v — next cycle will re-Stop (idempotent)", id, updErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
266
workspace-server/internal/registry/cp_orphan_sweeper_test.go
Normal file
266
workspace-server/internal/registry/cp_orphan_sweeper_test.go
Normal file
@ -0,0 +1,266 @@
|
||||
package registry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
)
|
||||
|
||||
// fakeCPReaper is a hand-rolled CPOrphanReaper for the SaaS-mode
|
||||
// sweeper tests. Records every Stop call so tests can assert which
|
||||
// workspace IDs were re-issued.
|
||||
type fakeCPReaper struct {
|
||||
mu sync.Mutex
|
||||
stopErr map[string]error
|
||||
stopCalls []string
|
||||
}
|
||||
|
||||
func (f *fakeCPReaper) Stop(_ context.Context, wsID string) error {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
f.stopCalls = append(f.stopCalls, wsID)
|
||||
return f.stopErr[wsID]
|
||||
}
|
||||
|
||||
// TestCPSweepOnce_StopSucceeds_ClearsInstanceID — happy path. Single
|
||||
// removed-row with non-NULL instance_id; Stop succeeds; instance_id
|
||||
// gets NULL'd so the next cycle won't re-sweep it.
|
||||
func TestCPSweepOnce_StopSucceeds_ClearsInstanceID(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
reaper := &fakeCPReaper{}
|
||||
|
||||
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces\s+WHERE status = 'removed'\s+AND instance_id IS NOT NULL\s+AND instance_id != ''\s+ORDER BY updated_at DESC\s+LIMIT \$1`).
|
||||
WithArgs(cpSweepLimit).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-uuid-1"))
|
||||
mock.ExpectExec(`UPDATE workspaces SET instance_id = NULL, updated_at = now\(\) WHERE id = \$1`).
|
||||
WithArgs("ws-uuid-1").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
cpSweepOnce(context.Background(), reaper)
|
||||
|
||||
if len(reaper.stopCalls) != 1 || reaper.stopCalls[0] != "ws-uuid-1" {
|
||||
t.Fatalf("expected Stop(ws-uuid-1), got %v", reaper.stopCalls)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCPSweepOnce_StopFails_KeepsInstanceID — CP transient failure.
|
||||
// Stop returns an error; instance_id MUST stay populated so the next
|
||||
// cycle retries. UPDATE must NOT fire.
|
||||
func TestCPSweepOnce_StopFails_KeepsInstanceID(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
reaper := &fakeCPReaper{
|
||||
stopErr: map[string]error{"ws-uuid-1": errors.New("CP returned 503")},
|
||||
}
|
||||
|
||||
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
|
||||
WithArgs(cpSweepLimit).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-uuid-1"))
|
||||
// No ExpectExec for the UPDATE — sqlmock fails the test if the
|
||||
// UPDATE fires.
|
||||
|
||||
cpSweepOnce(context.Background(), reaper)
|
||||
|
||||
if len(reaper.stopCalls) != 1 || reaper.stopCalls[0] != "ws-uuid-1" {
|
||||
t.Fatalf("expected Stop(ws-uuid-1), got %v", reaper.stopCalls)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations (UPDATE should NOT have fired): %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCPSweepOnce_NoOrphans — empty result set is the steady state in
|
||||
// healthy operation. No Stop, no UPDATE.
|
||||
func TestCPSweepOnce_NoOrphans(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
reaper := &fakeCPReaper{}
|
||||
|
||||
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
|
||||
WithArgs(cpSweepLimit).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}))
|
||||
|
||||
cpSweepOnce(context.Background(), reaper)
|
||||
|
||||
if len(reaper.stopCalls) != 0 {
|
||||
t.Fatalf("expected zero Stop calls, got %v", reaper.stopCalls)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCPSweepOnce_MultipleOrphans — all rows in the batch get Stop'd
|
||||
// independently; one failure doesn't block others.
|
||||
func TestCPSweepOnce_MultipleOrphans(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
reaper := &fakeCPReaper{
|
||||
stopErr: map[string]error{"ws-uuid-2": errors.New("CP 503 on ws-uuid-2")},
|
||||
}
|
||||
|
||||
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
|
||||
WithArgs(cpSweepLimit).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).
|
||||
AddRow("ws-uuid-1").
|
||||
AddRow("ws-uuid-2").
|
||||
AddRow("ws-uuid-3"))
|
||||
// ws-uuid-1 succeeds → UPDATE fires.
|
||||
mock.ExpectExec(`UPDATE workspaces SET instance_id = NULL`).
|
||||
WithArgs("ws-uuid-1").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
// ws-uuid-2 fails → no UPDATE.
|
||||
// ws-uuid-3 succeeds → UPDATE fires.
|
||||
mock.ExpectExec(`UPDATE workspaces SET instance_id = NULL`).
|
||||
WithArgs("ws-uuid-3").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
cpSweepOnce(context.Background(), reaper)
|
||||
|
||||
if len(reaper.stopCalls) != 3 {
|
||||
t.Fatalf("expected Stop on all 3 ids, got %v", reaper.stopCalls)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCPSweepOnce_QueryError — DB transient failure. Sweep returns
|
||||
// without panicking. No Stop calls.
|
||||
func TestCPSweepOnce_QueryError(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
reaper := &fakeCPReaper{}
|
||||
|
||||
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
|
||||
WithArgs(cpSweepLimit).
|
||||
WillReturnError(errors.New("connection refused"))
|
||||
|
||||
cpSweepOnce(context.Background(), reaper)
|
||||
|
||||
if len(reaper.stopCalls) != 0 {
|
||||
t.Fatalf("expected zero Stop calls on query error, got %v", reaper.stopCalls)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCPSweepOnce_UpdateError_LogsButContinues — Stop succeeded but
|
||||
// the UPDATE to clear instance_id failed. Subsequent rows in the batch
|
||||
// must still process; comment in cpSweepOnce promises idempotent re-Stop
|
||||
// next cycle.
|
||||
func TestCPSweepOnce_UpdateError_LogsButContinues(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
reaper := &fakeCPReaper{}
|
||||
|
||||
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
|
||||
WithArgs(cpSweepLimit).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).
|
||||
AddRow("ws-uuid-1").
|
||||
AddRow("ws-uuid-2"))
|
||||
mock.ExpectExec(`UPDATE workspaces SET instance_id = NULL`).
|
||||
WithArgs("ws-uuid-1").
|
||||
WillReturnError(errors.New("UPDATE timeout"))
|
||||
mock.ExpectExec(`UPDATE workspaces SET instance_id = NULL`).
|
||||
WithArgs("ws-uuid-2").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
cpSweepOnce(context.Background(), reaper)
|
||||
|
||||
if len(reaper.stopCalls) != 2 {
|
||||
t.Fatalf("expected Stop on both ids despite UPDATE error on first, got %v", reaper.stopCalls)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCPSweepOnce_NilDB — defensive against db.DB being nil. Must not
|
||||
// panic; must not call Stop.
|
||||
func TestCPSweepOnce_NilDB(t *testing.T) {
|
||||
saved := db.DB
|
||||
db.DB = nil
|
||||
t.Cleanup(func() { db.DB = saved })
|
||||
|
||||
reaper := &fakeCPReaper{}
|
||||
cpSweepOnce(context.Background(), reaper)
|
||||
|
||||
if len(reaper.stopCalls) != 0 {
|
||||
t.Fatalf("expected zero Stop calls when db.DB is nil, got %v", reaper.stopCalls)
|
||||
}
|
||||
}
|
||||
|
||||
// TestStartCPOrphanSweeper_NilReaperDisabled — boot-safety: a SaaS CP
|
||||
// without cpProv configured must not start the loop (immediate return,
|
||||
// no goroutine leak).
|
||||
func TestStartCPOrphanSweeper_NilReaperDisabled(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
StartCPOrphanSweeper(ctx, nil)
|
||||
close(done)
|
||||
}()
|
||||
select {
|
||||
case <-done:
|
||||
// expected — nil reaper short-circuits.
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
t.Fatal("StartCPOrphanSweeper(nil) did not return immediately")
|
||||
}
|
||||
}
|
||||
|
||||
// TestStartCPOrphanSweeper_RunsOnceImmediatelyAndOnTick — cadence
|
||||
// contract: kick off one sweep at boot (so a platform restart starts
|
||||
// healing immediately), then once per OrphanSweepInterval. Verifies
|
||||
// the loop terminates on ctx cancel.
|
||||
func TestStartCPOrphanSweeper_RunsOnceImmediatelyAndOnTick(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
reaper := &fakeCPReaper{}
|
||||
|
||||
// Two sweeps within the test window: one immediate, one on the
|
||||
// first tick. We can't shrink OrphanSweepInterval (it's a const),
|
||||
// so assert "at least one immediate sweep" and let cancel close
|
||||
// the loop.
|
||||
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
|
||||
WithArgs(cpSweepLimit).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}))
|
||||
// The ticker may or may not fire in the test window depending on
|
||||
// scheduler; tolerate both shapes by registering a second optional
|
||||
// expectation. sqlmock fails on UNREGISTERED queries, so register
|
||||
// one more then accept either 1 or 2 fires.
|
||||
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
|
||||
WithArgs(cpSweepLimit).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}))
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
StartCPOrphanSweeper(ctx, reaper)
|
||||
close(done)
|
||||
}()
|
||||
// 100ms is well past the boot-sweep but well shy of the 60s
|
||||
// interval, so the second query expectation is intentionally
|
||||
// unmet — that's fine, sqlmock distinguishes "expected but not
|
||||
// received" (we don't enforce here) from "unexpected query"
|
||||
// (which would fail).
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
cancel()
|
||||
select {
|
||||
case <-done:
|
||||
// expected
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("StartCPOrphanSweeper did not exit on ctx cancel")
|
||||
}
|
||||
|
||||
// Boot sweep must have happened — without it, an operator restart
|
||||
// after a CP outage would leave a 60s gap before the first heal.
|
||||
// We don't assert mock.ExpectationsWereMet() here because the
|
||||
// second query is intentionally optional.
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user