feat(registry): reconcile online workspaces against real EC2 state — auto-heal terminated instances (core#2261) #2266
@@ -337,6 +337,25 @@ func main() {
|
||||
})
|
||||
}
|
||||
|
||||
// CP-mode instance-state reconciler — authoritative EC2-liveness pass
|
||||
// for SaaS workspaces (core#2261). Every other liveness sweep keys off
|
||||
// a PROXY (Redis TTL, agent heartbeat, local Docker, or
|
||||
// runtime='external'); a SaaS claude-code workspace whose EC2 was
|
||||
// terminated/stopped falls through ALL of them and stays status='online'
|
||||
// pointing at a dead instance_id forever (root cause: core#2247). This
|
||||
// loop asks the ONE authoritative question the others lack —
|
||||
// cpProv.IsRunning (CP DescribeInstances-equivalent) — for each online
|
||||
// SaaS row, and on a CLEAN "not running" feeds it into the SAME
|
||||
// onWorkspaceOffline closure the other sweeps use (status flip +
|
||||
// RestartByID reprovision, existing volume). Fail-safe: IsRunning is
|
||||
// (true, err) on any transient error, so a CP blip never flips a healthy
|
||||
// workspace.
|
||||
if cpProv != nil {
|
||||
go supervised.RunWithRecover(ctx, "cp-instance-reconciler", func(c context.Context) {
|
||||
registry.StartCPInstanceReconciler(c, cpProv, onWorkspaceOffline, 60*time.Second)
|
||||
})
|
||||
}
|
||||
|
||||
// Pending-uploads GC sweep — deletes acked rows past their retention
|
||||
// window plus unacked rows past expires_at. Without this the
|
||||
// pending_uploads table grows unbounded; even with the 24h hard TTL,
|
||||
|
||||
@@ -0,0 +1,178 @@
|
||||
package registry
|
||||
|
||||
// cp_instance_reconciler.go — authoritative EC2-state reconcile for
|
||||
// SaaS workspaces (core#2261).
|
||||
//
|
||||
// Root cause (core#2247): every existing liveness pass keys off a PROXY
|
||||
// for "is this workspace alive?":
|
||||
//
|
||||
// - StartLivenessMonitor — Redis TTL expiry (agent stopped heartbeating).
|
||||
// - StartHealthSweep (Docker pass) — local Docker daemon (prov != nil only).
|
||||
// - StartHealthSweep (remote pass) — last_heartbeat_at freshness for
|
||||
// runtime='external' rows.
|
||||
// - StartCPOrphanSweeper — status='removed' rows with a stray instance_id.
|
||||
//
|
||||
// A SaaS claude-code workspace whose EC2 was terminated/stopped out from
|
||||
// under us (manual AWS action, spot reclaim, CP-side reap, etc.) falls
|
||||
// through ALL of them: it's not 'removed' (so the orphan sweeper skips
|
||||
// it), it's not runtime='external' (so the heartbeat pass skips it), and
|
||||
// on a pure-SaaS front-door prov == nil so the Docker pass never runs.
|
||||
// The registry kept status='online' pointing at a dead instance forever.
|
||||
//
|
||||
// This sweeper closes that gap with the ONE authoritative check the
|
||||
// others lack: CPProvisioner.IsRunning, which ultimately asks the
|
||||
// control-plane "is this EC2 actually running?" (DescribeInstances-
|
||||
// equivalent). When the answer is a CLEAN "no" it feeds the workspace
|
||||
// into the EXISTING offline/auto-heal machinery (onOffline → status flip
|
||||
// + RestartByID reprovision with the existing volume) — no new healing
|
||||
// path, just real ground truth driving the one we already have.
|
||||
//
|
||||
// Guardrails:
|
||||
// - FAIL-SAFE: IsRunning is (true, err) on any transient DB/transport
|
||||
// error and (false, nil) ONLY when CP genuinely reports the instance
|
||||
// is not running. We act ONLY on (false, nil); any err short-circuits
|
||||
// to "leave it alone" so a CP blip never flips a healthy workspace.
|
||||
// - ONLINE + SaaS ONLY: status='online', instance_id present, and
|
||||
// runtime <> 'external'. Paused/hibernated/removed/provisioning/
|
||||
// awaiting_agent rows are out of scope; external rows are covered by
|
||||
// the remote-heartbeat pass.
|
||||
// - Per-cycle row cap + per-workspace timeout so one slow CP call can't
|
||||
// stall the sweep.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
||||
)
|
||||
|
||||
// InstanceRunningChecker is the narrow dependency the reconciler takes
|
||||
// from the CP provisioner. *provisioner.CPProvisioner satisfies this
|
||||
// naturally; tests inject fakes.
|
||||
//
|
||||
// Contract (load-bearing): IsRunning is FAIL-SAFE — it returns
|
||||
// (true, err) on transient DB/transport errors and (false, nil) ONLY
|
||||
// when CP reports the instance is genuinely not running. The reconciler
|
||||
// flips a workspace offline strictly on (false, nil).
|
||||
type InstanceRunningChecker interface {
|
||||
IsRunning(ctx context.Context, workspaceID string) (bool, error)
|
||||
}
|
||||
|
||||
// CPInstanceReconcileLimit caps the per-cycle row count so a sustained
|
||||
// CP slowdown can't make a single sweep cycle run unbounded. With a 60s
|
||||
// cadence and a per-workspace timeout below, this bounds worst-case
|
||||
// cycle wall-time and lets subsequent cycles drain any backlog.
|
||||
const CPInstanceReconcileLimit = 200
|
||||
|
||||
// cpInstanceCheckTimeout bounds a single IsRunning call so one slow CP
|
||||
// round-trip can't stall the whole sweep. Each workspace gets its own
|
||||
// timeout context derived from the cycle context.
|
||||
const cpInstanceCheckTimeout = 10 * time.Second
|
||||
|
||||
// StartCPInstanceReconciler runs the authoritative EC2-state reconcile
|
||||
// loop until ctx is cancelled. A nil checker makes the loop a no-op
|
||||
// (matches the nil-tolerant pattern of the sibling CP sweeper).
|
||||
//
|
||||
// Caller is expected to gate on `cpProv != nil` (matching how
|
||||
// StartCPOrphanSweeper is gated at the wiring site in cmd/server/main.go)
|
||||
// — passing a nil *CPProvisioner here would also short-circuit, but the
|
||||
// gate at the call site keeps the call shape symmetric across sweepers.
|
||||
//
|
||||
// interval <= 0 falls back to the default 60s cadence so a misconfigured
|
||||
// caller can't spin a zero-duration ticker (which panics).
|
||||
func StartCPInstanceReconciler(ctx context.Context, checker InstanceRunningChecker, onOffline OfflineHandler, interval time.Duration) {
|
||||
if checker == nil {
|
||||
log.Println("cp-instance-reconciler: checker is nil — reconciler disabled")
|
||||
return
|
||||
}
|
||||
if interval <= 0 {
|
||||
interval = 60 * time.Second
|
||||
}
|
||||
log.Printf("cp-instance-reconciler started — reconciling online SaaS workspaces against real EC2 state every %s", interval)
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
// Kick once at boot so a platform restart starts healing immediately
|
||||
// rather than waiting a full interval.
|
||||
reconcileOnce(ctx, checker, onOffline)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Println("cp-instance-reconciler: shutdown")
|
||||
return
|
||||
case <-ticker.C:
|
||||
reconcileOnce(ctx, checker, onOffline)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// reconcileOnce executes one reconcile pass. Defensive against db.DB
|
||||
// being nil so a misconfigured boot doesn't panic.
|
||||
//
|
||||
// Scope: online + SaaS-EC2 workspaces only. runtime='external' rows are
|
||||
// excluded (covered by the remote-heartbeat pass); paused/hibernated/
|
||||
// removed/provisioning/awaiting_agent are excluded by the status filter.
|
||||
func reconcileOnce(ctx context.Context, checker InstanceRunningChecker, onOffline OfflineHandler) {
|
||||
if db.DB == nil {
|
||||
return
|
||||
}
|
||||
|
||||
rows, err := db.DB.QueryContext(ctx, `
|
||||
SELECT id::text
|
||||
FROM workspaces
|
||||
WHERE status = 'online'
|
||||
AND instance_id IS NOT NULL
|
||||
AND instance_id != ''
|
||||
AND COALESCE(runtime, '') <> 'external'
|
||||
ORDER BY updated_at DESC
|
||||
LIMIT $1
|
||||
`, CPInstanceReconcileLimit)
|
||||
if err != nil {
|
||||
log.Printf("cp-instance-reconciler: DB query failed: %v", err)
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var ids []string
|
||||
for rows.Next() {
|
||||
var id string
|
||||
if scanErr := rows.Scan(&id); scanErr != nil {
|
||||
log.Printf("cp-instance-reconciler: row scan failed: %v", scanErr)
|
||||
continue
|
||||
}
|
||||
ids = append(ids, id)
|
||||
}
|
||||
if iterErr := rows.Err(); iterErr != nil {
|
||||
log.Printf("cp-instance-reconciler: rows iteration failed: %v", iterErr)
|
||||
return
|
||||
}
|
||||
|
||||
for _, id := range ids {
|
||||
// Per-workspace timeout so one slow CP round-trip can't stall
|
||||
// the whole sweep.
|
||||
checkCtx, cancel := context.WithTimeout(ctx, cpInstanceCheckTimeout)
|
||||
running, checkErr := checker.IsRunning(checkCtx, id)
|
||||
cancel()
|
||||
|
||||
if checkErr != nil {
|
||||
// FAIL-SAFE: transient DB/transport error (or a no-backend
|
||||
// signal). IsRunning returns (true, err) on these, so never
|
||||
// flip — leave the row online and retry next cycle.
|
||||
log.Printf("cp-instance-reconciler: IsRunning(%s) errored, leaving online (fail-safe): %v", id, checkErr)
|
||||
continue
|
||||
}
|
||||
if running {
|
||||
continue
|
||||
}
|
||||
|
||||
// CLEAN "not running" — CP authoritatively reports the EC2 is
|
||||
// terminated/stopped/absent. Feed it into the existing offline +
|
||||
// auto-heal machinery: onOffline flips the row offline and
|
||||
// triggers RestartByID, which reprovisions with the existing
|
||||
// volume.
|
||||
log.Printf("cp-instance-reconciler: workspace %s is status=online but its EC2 is not running (terminated/stopped) — flipping offline + triggering reprovision", id)
|
||||
if onOffline != nil {
|
||||
onOffline(ctx, id)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,282 @@
|
||||
package registry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
||||
)
|
||||
|
||||
// fakeRunningChecker implements InstanceRunningChecker for the
|
||||
// instance-reconciler tests. Records every IsRunning call so tests can
|
||||
// assert which workspace IDs were probed, and returns a per-id
|
||||
// (running, err) pair so we can model CP's three answers:
|
||||
//
|
||||
// (true, nil) — instance is running.
|
||||
// (false, nil) — CLEAN "not running" (terminated/stopped/absent).
|
||||
// (true, err) — transient DB/transport error (FAIL-SAFE path).
|
||||
type fakeRunningChecker struct {
|
||||
mu sync.Mutex
|
||||
running map[string]bool
|
||||
errs map[string]error
|
||||
calls []string
|
||||
}
|
||||
|
||||
func (f *fakeRunningChecker) IsRunning(_ context.Context, wsID string) (bool, error) {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
f.calls = append(f.calls, wsID)
|
||||
if err, ok := f.errs[wsID]; ok {
|
||||
// Mirror CPProvisioner.IsRunning: (true, err) on transient errors
|
||||
// so callers stay on the alive path.
|
||||
return true, err
|
||||
}
|
||||
return f.running[wsID], nil
|
||||
}
|
||||
|
||||
// recordingOffline is an OfflineHandler that records the workspace IDs
|
||||
// it was invoked with.
|
||||
type recordingOffline struct {
|
||||
mu sync.Mutex
|
||||
calls []string
|
||||
}
|
||||
|
||||
func (r *recordingOffline) handler() OfflineHandler {
|
||||
return func(_ context.Context, wsID string) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
r.calls = append(r.calls, wsID)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *recordingOffline) got() []string {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
out := make([]string, len(r.calls))
|
||||
copy(out, r.calls)
|
||||
return out
|
||||
}
|
||||
|
||||
// expectReconcileQuery registers the reconciler's SELECT, pinning the
|
||||
// scope-critical predicates: status='online', instance_id present, and
|
||||
// runtime <> 'external'. A future widening that drops any of these (e.g.
|
||||
// sweeping paused rows, or external rows the heartbeat pass owns) fails
|
||||
// every test that uses this helper.
|
||||
func expectReconcileQuery(mock sqlmock.Sqlmock, rows *sqlmock.Rows) {
|
||||
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces\s+WHERE status = 'online'\s+AND instance_id IS NOT NULL\s+AND instance_id != ''\s+AND COALESCE\(runtime, ''\) <> 'external'\s+ORDER BY updated_at DESC\s+LIMIT \$1`).
|
||||
WithArgs(CPInstanceReconcileLimit).
|
||||
WillReturnRows(rows)
|
||||
}
|
||||
|
||||
// TestReconcileOnce_NotRunning_FlipsOffline — the core bug (core#2247):
|
||||
// an online SaaS workspace whose EC2 is terminated. CP reports a CLEAN
|
||||
// (false, nil); onOffline MUST be called with that id so the existing
|
||||
// auto-heal (status flip + RestartByID reprovision) kicks in.
|
||||
func TestReconcileOnce_NotRunning_FlipsOffline(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
checker := &fakeRunningChecker{running: map[string]bool{"ws-dead": false}}
|
||||
off := &recordingOffline{}
|
||||
|
||||
expectReconcileQuery(mock, sqlmock.NewRows([]string{"id"}).AddRow("ws-dead"))
|
||||
|
||||
reconcileOnce(context.Background(), checker, off.handler())
|
||||
|
||||
if got := off.got(); len(got) != 1 || got[0] != "ws-dead" {
|
||||
t.Fatalf("expected onOffline(ws-dead), got %v", got)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestReconcileOnce_Running_DoesNotFlip — healthy steady state. CP
|
||||
// reports (true, nil); the workspace stays online, onOffline is NOT
|
||||
// called.
|
||||
func TestReconcileOnce_Running_DoesNotFlip(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
checker := &fakeRunningChecker{running: map[string]bool{"ws-alive": true}}
|
||||
off := &recordingOffline{}
|
||||
|
||||
expectReconcileQuery(mock, sqlmock.NewRows([]string{"id"}).AddRow("ws-alive"))
|
||||
|
||||
reconcileOnce(context.Background(), checker, off.handler())
|
||||
|
||||
if got := off.got(); len(got) != 0 {
|
||||
t.Fatalf("running workspace must NOT be flipped offline, got %v", got)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestReconcileOnce_TransientError_DoesNotFlip — FAIL-SAFE contract.
|
||||
// IsRunning returns (true, err) on a transient DB/transport blip; the
|
||||
// reconciler MUST NOT flip the workspace offline. This is the guardrail
|
||||
// that stops a CP outage from cascading every healthy workspace through
|
||||
// reprovision.
|
||||
func TestReconcileOnce_TransientError_DoesNotFlip(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
checker := &fakeRunningChecker{
|
||||
errs: map[string]error{"ws-blip": errors.New("cp provisioner: status: connection reset")},
|
||||
}
|
||||
off := &recordingOffline{}
|
||||
|
||||
expectReconcileQuery(mock, sqlmock.NewRows([]string{"id"}).AddRow("ws-blip"))
|
||||
|
||||
reconcileOnce(context.Background(), checker, off.handler())
|
||||
|
||||
if got := off.got(); len(got) != 0 {
|
||||
t.Fatalf("fail-safe violated: transient IsRunning error must NOT flip offline, got %v", got)
|
||||
}
|
||||
if calls := checker.calls; len(calls) != 1 || calls[0] != "ws-blip" {
|
||||
t.Fatalf("expected IsRunning(ws-blip), got %v", checker.calls)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestReconcileOnce_QueryScopeExcludesExternalAndNonOnline — pins the
|
||||
// SELECT predicate. The regex in expectReconcileQuery requires
|
||||
// status='online' AND runtime <> 'external'; if a future edit widens the
|
||||
// scope to include paused/hibernated/removed rows or external rows (owned
|
||||
// by the heartbeat pass), this query no longer matches and sqlmock fails
|
||||
// the test. With the predicate intact, a DB that has only out-of-scope
|
||||
// rows returns empty → no IsRunning, no flip.
|
||||
func TestReconcileOnce_QueryScopeExcludesExternalAndNonOnline(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
checker := &fakeRunningChecker{}
|
||||
off := &recordingOffline{}
|
||||
|
||||
// The predicate filters out external + non-online rows server-side,
|
||||
// modelled as the empty result those filters produce.
|
||||
expectReconcileQuery(mock, sqlmock.NewRows([]string{"id"}))
|
||||
|
||||
reconcileOnce(context.Background(), checker, off.handler())
|
||||
|
||||
if len(checker.calls) != 0 {
|
||||
t.Fatalf("out-of-scope rows must never reach IsRunning, got %v", checker.calls)
|
||||
}
|
||||
if got := off.got(); len(got) != 0 {
|
||||
t.Fatalf("expected no offline flips for out-of-scope rows, got %v", got)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestReconcileOnce_MixedBatch — each row is judged independently: the
|
||||
// dead one flips, the alive one and the transient-error one don't.
|
||||
func TestReconcileOnce_MixedBatch(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
checker := &fakeRunningChecker{
|
||||
running: map[string]bool{"ws-dead": false, "ws-alive": true},
|
||||
errs: map[string]error{"ws-blip": errors.New("503")},
|
||||
}
|
||||
off := &recordingOffline{}
|
||||
|
||||
expectReconcileQuery(mock, sqlmock.NewRows([]string{"id"}).
|
||||
AddRow("ws-dead").
|
||||
AddRow("ws-alive").
|
||||
AddRow("ws-blip"))
|
||||
|
||||
reconcileOnce(context.Background(), checker, off.handler())
|
||||
|
||||
if got := off.got(); len(got) != 1 || got[0] != "ws-dead" {
|
||||
t.Fatalf("expected only ws-dead flipped, got %v", got)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestReconcileOnce_QueryError — DB transient failure. Reconcile returns
|
||||
// without panicking and never probes IsRunning or flips anything.
|
||||
func TestReconcileOnce_QueryError(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
checker := &fakeRunningChecker{}
|
||||
off := &recordingOffline{}
|
||||
|
||||
mock.ExpectQuery(`(?s)^\s*SELECT id::text\s+FROM workspaces`).
|
||||
WithArgs(CPInstanceReconcileLimit).
|
||||
WillReturnError(errors.New("connection refused"))
|
||||
|
||||
reconcileOnce(context.Background(), checker, off.handler())
|
||||
|
||||
if len(checker.calls) != 0 || len(off.got()) != 0 {
|
||||
t.Fatalf("query error must short-circuit; calls=%v offline=%v", checker.calls, off.got())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestReconcileOnce_NilDB — defensive against db.DB being nil. Must not
|
||||
// panic, must not probe, must not flip.
|
||||
func TestReconcileOnce_NilDB(t *testing.T) {
|
||||
saved := db.DB
|
||||
db.DB = nil
|
||||
t.Cleanup(func() { db.DB = saved })
|
||||
|
||||
checker := &fakeRunningChecker{}
|
||||
off := &recordingOffline{}
|
||||
reconcileOnce(context.Background(), checker, off.handler())
|
||||
|
||||
if len(checker.calls) != 0 || len(off.got()) != 0 {
|
||||
t.Fatalf("nil db.DB must short-circuit; calls=%v offline=%v", checker.calls, off.got())
|
||||
}
|
||||
}
|
||||
|
||||
// TestStartCPInstanceReconciler_NilCheckerDisabled — boot-safety: a SaaS
|
||||
// CP without cpProv configured must not start the loop (immediate return,
|
||||
// no goroutine leak).
|
||||
func TestStartCPInstanceReconciler_NilCheckerDisabled(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
StartCPInstanceReconciler(ctx, nil, nil, 60*time.Second)
|
||||
close(done)
|
||||
}()
|
||||
select {
|
||||
case <-done:
|
||||
// expected — nil checker short-circuits.
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
t.Fatal("StartCPInstanceReconciler(nil) did not return immediately")
|
||||
}
|
||||
}
|
||||
|
||||
// TestStartCPInstanceReconciler_RunsOnceImmediatelyAndExitsOnCancel —
|
||||
// cadence contract: one sweep at boot (so a restart starts healing
|
||||
// immediately), and the loop terminates on ctx cancel.
|
||||
func TestStartCPInstanceReconciler_RunsOnceImmediatelyAndExitsOnCancel(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
checker := &fakeRunningChecker{}
|
||||
off := &recordingOffline{}
|
||||
|
||||
// Boot sweep query. The 60s ticker won't fire inside the test window;
|
||||
// register a second optional expectation so a stray tick can't fail.
|
||||
expectReconcileQuery(mock, sqlmock.NewRows([]string{"id"}))
|
||||
expectReconcileQuery(mock, sqlmock.NewRows([]string{"id"}))
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
StartCPInstanceReconciler(ctx, checker, off.handler(), 60*time.Second)
|
||||
close(done)
|
||||
}()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
cancel()
|
||||
select {
|
||||
case <-done:
|
||||
// expected
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("StartCPInstanceReconciler did not exit on ctx cancel")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user