From e058137fbf51513275c8efc756bc1be087994e7e Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 26 May 2026 21:50:48 -0700 Subject: [PATCH] fix(workspace-server): bounded retry on delete-path EC2 stop + durable leak event MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The DELETE path's StopWorkspaceAuto → cpProv.Stop had no retry, while the restart path used cpStopWithRetry (bounded exp backoff). A transient CP/AWS hiccup on delete left the workspace row at status='removed' with instance_id populated, returned a 500, and relied entirely on the 60s CP-orphan-sweeper to re-drive the terminate. For a cascade *descendant* the "client retries → replays terminate" recovery is defeated by CascadeDelete's status != 'removed' CTE filter — so the only inline recovery is a bounded retry. This extracts the retry loop into cpStopWithRetryErr (cpStopWithRetry keeps its void contract for the restart paths) and adds stopWorkspaceForDelete, which retries the CP terminate and, on exhaustion, persists a durable workspace.delete.terminate_retry_exhausted row to structure_events (the §Persistent structured logging gate) so the leak/pending decision is queryable. The row deliberately stays status='removed' + instance_id so the existing CP-orphan-sweeper backstop still re-drives it; the error is still returned so the HTTP Delete surfaces the retryable 500. Test-first, fail-direction proof: CPRetriesTransientThenSucceeds (3 calls, no event) vs CPExhausts (event + error) discriminate the new behavior from the pre-fix bare Stop. AST gate updated to recognize cpStopWithRetryErr as the relocated home of the retry loop. Refs task #15 (workspace-ec2-leak). Paired with the controlplane workspace-EC2 reaper PR for the row-gone leak class. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../internal/handlers/workspace_crud.go | 7 +- .../workspace_delete_stop_retry_test.go | 102 ++++++++++++++++++ .../handlers/workspace_dispatchers.go | 82 ++++++++++++++ .../internal/handlers/workspace_restart.go | 30 +++++- .../workspace_restart_stop_retry_test.go | 9 +- 5 files changed, 224 insertions(+), 6 deletions(-) create mode 100644 workspace-server/internal/handlers/workspace_delete_stop_retry_test.go diff --git a/workspace-server/internal/handlers/workspace_crud.go b/workspace-server/internal/handlers/workspace_crud.go index 8c54e56c1..f122f4fb3 100644 --- a/workspace-server/internal/handlers/workspace_crud.go +++ b/workspace-server/internal/handlers/workspace_crud.go @@ -574,7 +574,12 @@ func (h *WorkspaceHandler) CascadeDelete(ctx context.Context, id string) ([]stri var stopErrs []error stopAndRemove := func(wsID string) { - if err := h.StopWorkspaceAuto(cleanupCtx, wsID); err != nil { + // Delete-path stop uses bounded retry (matches the restart path) and + // records a durable structure_events row on exhaustion so a leaked / + // pending EC2 is queryable and handed off to the CP-orphan-sweeper — + // rather than the bare one-shot StopWorkspaceAuto that produced the + // silent-leak class (task #15 / workspace-ec2-leak). 
+ if err := h.stopWorkspaceForDelete(cleanupCtx, wsID); err != nil { log.Printf("CascadeDelete %s stop failed: %v — leaving cleanup for orphan sweeper", wsID, err) stopErrs = append(stopErrs, fmt.Errorf("stop %s: %w", wsID, err)) return diff --git a/workspace-server/internal/handlers/workspace_delete_stop_retry_test.go b/workspace-server/internal/handlers/workspace_delete_stop_retry_test.go new file mode 100644 index 000000000..480c49c45 --- /dev/null +++ b/workspace-server/internal/handlers/workspace_delete_stop_retry_test.go @@ -0,0 +1,102 @@ +package handlers + +// workspace_delete_stop_retry_test.go — pins the contract of the +// delete-path EC2 stop retry (task #15 / workspace-ec2-leak). +// +// Background (Phase 1 evidence): the DELETE path's StopWorkspaceAuto → +// cpProv.Stop had NO retry, while the restart path used cpStopWithRetry +// (bounded exponential backoff). A transient CP/AWS hiccup on delete left +// the workspace row at status='removed' with instance_id still populated, +// returned a 500, and relied entirely on the 60s CP-orphan-sweeper to +// re-drive the terminate. For a cascade *descendant* whose own row is +// already 'removed', the inline retry-via-client-replay is defeated by +// CascadeDelete's `status != 'removed'` CTE filter — so the only inline +// recovery is this bounded retry. +// +// Contract of stopWorkspaceForDelete: +// - CP path: bounded retry (cpStopRetryAttempts, exp backoff) on +// cpProv.Stop; returns nil on eventual success. +// - On retry exhaustion: returns the terminal error AND emits a +// `workspace.delete.terminate_retry_exhausted` structure_events row so +// the leak decision is queryable (structured-logging gate), not just a +// log.Printf. The row is the durable pending-terminate signal: the row +// stays status='removed' with instance_id populated, which is exactly +// what the CP-orphan-sweeper (registry/cp_orphan_sweeper.go) re-drives. 
+// - Docker path: single Stop, no retry (local daemon failure won't heal +// on retry — matches RestartWorkspaceAuto's Docker rationale). +// - No backend wired: nil (nothing to stop). + +import ( + "context" + "errors" + "strings" + "testing" + + "github.com/DATA-DOG/go-sqlmock" +) + +func TestStopWorkspaceForDelete_CPRetriesTransientThenSucceeds(t *testing.T) { + shrinkRetryBackoff(t) + buf := captureLog(t) + // 2 transient failures then success — within the 3-attempt budget. + stub := &scriptedCPStop{errs: []error{ + errors.New("cp 503 attempt 1"), + errors.New("cp 503 attempt 2"), + }} + h := &WorkspaceHandler{cpProv: stub} + + err := h.stopWorkspaceForDelete(context.Background(), "ws-del-1") + if err != nil { + t.Fatalf("expected nil error on eventual success, got %v", err) + } + if stub.calls != 3 { + t.Errorf("expected 3 Stop calls (2 fails + 1 success), got %d", stub.calls) + } + if strings.Contains(buf.String(), "terminate_retry_exhausted") { + t.Errorf("eventual success must NOT log retry-exhausted; got %q", buf.String()) + } +} + +func TestStopWorkspaceForDelete_CPExhaustsEmitsDurableEventAndReturnsError(t *testing.T) { + shrinkRetryBackoff(t) + mock := setupTestDB(t) + buf := captureLog(t) + stub := &scriptedCPStop{errs: []error{ + errors.New("cp 502 attempt 1"), + errors.New("cp 502 attempt 2"), + errors.New("cp 502 final"), + }} + h := &WorkspaceHandler{cpProv: stub} + + // On exhaustion the helper persists a durable pending-terminate row so + // the leak decision is queryable. structure_events is the audit-of-record. + mock.ExpectExec("INSERT INTO structure_events"). 
+ WillReturnResult(sqlmock.NewResult(0, 1)) + + err := h.stopWorkspaceForDelete(context.Background(), "ws-doomed") + if err == nil { + t.Fatal("expected terminal error on retry exhaustion, got nil") + } + if stub.calls != cpStopRetryAttempts { + t.Errorf("expected %d Stop calls when all fail, got %d", cpStopRetryAttempts, stub.calls) + } + if !strings.Contains(err.Error(), "cp 502 final") { + t.Errorf("returned error should wrap the LAST attempt's error, got %v", err) + } + if e := mock.ExpectationsWereMet(); e != nil { + t.Fatalf("expected structure_events INSERT on exhaustion: %v", e) + } + // The LEAK-SUSPECT line stays the operator-facing prose bridge to the + // orphan reconciler; assert it carries the delete source so triage can + // distinguish delete-leaks from restart-leaks. + if !strings.Contains(buf.String(), "LEAK-SUSPECT") { + t.Errorf("expected LEAK-SUSPECT log on exhaustion, got %q", buf.String()) + } +} + +func TestStopWorkspaceForDelete_NoBackendIsNoOp(t *testing.T) { + h := &WorkspaceHandler{} // cpProv nil, provisioner nil + if err := h.stopWorkspaceForDelete(context.Background(), "ws-x"); err != nil { + t.Errorf("expected nil no-op with no backend, got %v", err) + } +} diff --git a/workspace-server/internal/handlers/workspace_dispatchers.go b/workspace-server/internal/handlers/workspace_dispatchers.go index 7787c3541..0ebcf7696 100644 --- a/workspace-server/internal/handlers/workspace_dispatchers.go +++ b/workspace-server/internal/handlers/workspace_dispatchers.go @@ -31,9 +31,11 @@ package handlers import ( "context" + "encoding/json" "log" "time" + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db" "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models" "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/provlog" ) @@ -207,6 +209,86 @@ func (h *WorkspaceHandler) StopWorkspaceAuto(ctx context.Context, workspaceID st return nil } +// stopWorkspaceForDelete is the DELETE-path 
stop dispatcher. It differs +// from StopWorkspaceAuto in exactly one way: the CP (EC2) path gets the +// same bounded retry the restart path uses (cpStopWithRetryErr), and on +// retry exhaustion it persists a durable `workspace.delete.terminate_retry_exhausted` +// event to structure_events (the structured-logging gate) so the leak +// decision is queryable, not just stdout prose. +// +// Why retry here (task #15 / workspace-ec2-leak): the bare cpProv.Stop on +// delete left a transient CP/AWS hiccup as an immediate 500 with no inline +// recovery. For a cascade *descendant* the "client retries → replays +// terminate" recovery is defeated by CascadeDelete's `status != 'removed'` +// CTE filter (the descendant's row is already 'removed', so a retry walks +// zero descendant rows). Bounded retry absorbs the transient class inline; +// the durable event + the row staying status='removed'+instance_id is the +// hand-off to the 60s CP-orphan-sweeper (registry/cp_orphan_sweeper.go) for +// the (rarer) sustained-outage case. +// +// We deliberately do NOT clear status='removed' on exhaustion — the +// CP-orphan-sweeper's recovery query keys on exactly that state, so +// reverting it would break the existing backstop. The error is still +// returned so the HTTP Delete handler surfaces the retryable 500. +// +// Docker path: single Stop, no retry — a local daemon that fails to stop a +// container won't heal on retry (matches RestartWorkspaceAuto's Docker +// rationale); the orphan-container sweeper (registry/orphan_sweeper.go) is +// the Docker-side backstop. 
+func (h *WorkspaceHandler) stopWorkspaceForDelete(ctx context.Context, workspaceID string) error { + if h.cpProv != nil { + if err := h.cpStopWithRetryErr(ctx, workspaceID, "Delete"); err != nil { + h.emitDeleteTerminateRetryExhausted(ctx, workspaceID, err) + return err + } + return nil + } + if h.provisioner != nil { + return h.provisioner.Stop(ctx, workspaceID) + } + return nil +} + +// emitDeleteTerminateRetryExhausted persists a durable record that the +// delete-path EC2 terminate could not be completed inline after the full +// retry budget. Per the §Persistent structured logging gate: a +// state-mutating decision (we are leaving a known-leaked-or-pending EC2 for +// the orphan sweeper) must land in structure_events, not just log.Printf. +// +// Event-type taxonomy (append-only; never rename): +// +// workspace.delete.terminate_retry_exhausted — delete-path cpProv.Stop +// exhausted its retry budget; row stays status='removed' with +// instance_id populated for the CP-orphan-sweeper to re-drive. +// +// Telemetry never blocks the request path: marshal / INSERT failures are +// logged and swallowed. +func (h *WorkspaceHandler) emitDeleteTerminateRetryExhausted(ctx context.Context, workspaceID string, cause error) { + payload := map[string]any{ + "workspace_id": workspaceID, + "attempts": cpStopRetryAttempts, + "last_error": cause.Error(), + // recovery_path documents WHO is expected to finish the terminate, + // so a reader of the audit row doesn't have to grep the code to + // know the EC2 isn't simply abandoned. 
+ "recovery_path": "cp_orphan_sweeper", + } + payloadJSON, err := json.Marshal(payload) + if err != nil { + log.Printf("emitDeleteTerminateRetryExhausted: marshal payload failed for %s: %v", workspaceID, err) + return + } + if db.DB == nil { + return + } + if _, err := db.DB.ExecContext(ctx, ` + INSERT INTO structure_events (event_type, workspace_id, payload, created_at) + VALUES ($1, $2, $3, now()) + `, "workspace.delete.terminate_retry_exhausted", workspaceID, payloadJSON); err != nil { + log.Printf("emitDeleteTerminateRetryExhausted: insert failed for %s: %v", workspaceID, err) + } +} + // RestartWorkspaceAuto stops the running workload (with retry semantics // tuned for the restart hot path) then starts provisioning again, in a // detached goroutine. Returns true when a backend was kicked off, false diff --git a/workspace-server/internal/handlers/workspace_restart.go b/workspace-server/internal/handlers/workspace_restart.go index f7769006a..5d9122648 100644 --- a/workspace-server/internal/handlers/workspace_restart.go +++ b/workspace-server/internal/handlers/workspace_restart.go @@ -721,8 +721,31 @@ var cpStopRetryBaseDelay = 1 * time.Second // // Returns nothing — caller's contract is unchanged. func (h *WorkspaceHandler) cpStopWithRetry(ctx context.Context, workspaceID, source string) { + // Restart's contract is "make the workspace alive again": it proceeds + // with reprovision regardless of the Stop outcome, so it discards the + // terminal error. The delete path needs the error (it must keep the + // row recoverable for the orphan-sweeper + emit a durable event), so + // the actual retry loop lives in cpStopWithRetryErr below. + _ = h.cpStopWithRetryErr(ctx, workspaceID, source) +} + +// cpStopWithRetryErr is the shared bounded-retry core for cpProv.Stop. 
+// It returns the terminal error so callers that need to react to a leak +// (the DELETE path's stopWorkspaceForDelete) can do so, while +// cpStopWithRetry keeps its void contract for the restart paths. +// +// Behaviour (unchanged from the original cpStopWithRetry loop): +// - cpProv nil → nil (no-op; nothing to stop). +// - success on attempt N → nil; logs a retry-success line when N > 1. +// - ctx cancelled mid-retry → returns ctx.Err(); logs an "abandoned" +// line and deliberately does NOT emit LEAK-SUSPECT (operator-initiated +// drain is a different signal than "we tried hard and failed"). +// - all attempts fail → returns the LAST attempt's error and emits the +// stable `LEAK-SUSPECT cpProv.Stop ...` log line so the CP-side orphan +// reconciler can correlate by workspace_id. +func (h *WorkspaceHandler) cpStopWithRetryErr(ctx context.Context, workspaceID, source string) error { if h.cpProv == nil { - return + return nil } var lastErr error delay := cpStopRetryBaseDelay @@ -732,7 +755,7 @@ func (h *WorkspaceHandler) cpStopWithRetry(ctx context.Context, workspaceID, sou if attempt > 1 { log.Printf("%s: cpProv.Stop(%s) succeeded on attempt %d", source, workspaceID, attempt) } - return + return nil } lastErr = err if attempt == cpStopRetryAttempts { @@ -744,7 +767,7 @@ func (h *WorkspaceHandler) cpStopWithRetry(ctx context.Context, workspaceID, sou case <-ctx.Done(): log.Printf("%s: cpProv.Stop(%s) abandoned mid-retry: ctx cancelled (last_err=%v)", source, workspaceID, lastErr) - return + return ctx.Err() case <-time.After(delay): } delay *= 2 @@ -753,6 +776,7 @@ func (h *WorkspaceHandler) cpStopWithRetry(ctx context.Context, workspaceID, sou // so logs are greppable / parseable for the CP-side orphan reconciler. 
log.Printf("LEAK-SUSPECT cpProv.Stop workspace_id=%s source=%s attempts=%d last_err=%q", workspaceID, source, cpStopRetryAttempts, lastErr.Error()) + return lastErr } // runRestartCycle does the actual stop+provision work for one restart diff --git a/workspace-server/internal/handlers/workspace_restart_stop_retry_test.go b/workspace-server/internal/handlers/workspace_restart_stop_retry_test.go index 12b1c22f1..0c7dcf4c6 100644 --- a/workspace-server/internal/handlers/workspace_restart_stop_retry_test.go +++ b/workspace-server/internal/handlers/workspace_restart_stop_retry_test.go @@ -248,8 +248,13 @@ func TestRestart_CPStopOnlyInsideRetryHelper(t *testing.T) { if !ok || fn.Body == nil || fn.Recv == nil { continue } - // cpStopWithRetry is the ONE allowed home for h.cpProv.Stop. - if fn.Name.Name == "cpStopWithRetry" { + // cpStopWithRetryErr is the ONE allowed home for h.cpProv.Stop — + // the bounded-retry loop. cpStopWithRetry is the void-returning + // wrapper (restart path) that delegates to it; the delete path uses + // cpStopWithRetryErr directly via stopWorkspaceForDelete to capture + // the terminal error (task #15). Both wrappers are exempt from this + // gate; any OTHER direct cpProv.Stop is the silent-leak regression. + if fn.Name.Name == "cpStopWithRetry" || fn.Name.Name == "cpStopWithRetryErr" { continue } ast.Inspect(fn.Body, func(n ast.Node) bool { -- 2.52.0