fix(workspace-server): bounded retry on delete-path EC2 stop + durable leak event #1932
@@ -574,7 +574,12 @@ func (h *WorkspaceHandler) CascadeDelete(ctx context.Context, id string) ([]stri
|
||||
|
||||
var stopErrs []error
|
||||
stopAndRemove := func(wsID string) {
|
||||
if err := h.StopWorkspaceAuto(cleanupCtx, wsID); err != nil {
|
||||
// Delete-path stop uses bounded retry (matches the restart path) and
|
||||
// records a durable structure_events row on exhaustion so a leaked /
|
||||
// pending EC2 is queryable and handed off to the CP-orphan-sweeper —
|
||||
// rather than the bare one-shot StopWorkspaceAuto that produced the
|
||||
// silent-leak class (task #15 / workspace-ec2-leak).
|
||||
if err := h.stopWorkspaceForDelete(cleanupCtx, wsID); err != nil {
|
||||
log.Printf("CascadeDelete %s stop failed: %v — leaving cleanup for orphan sweeper", wsID, err)
|
||||
stopErrs = append(stopErrs, fmt.Errorf("stop %s: %w", wsID, err))
|
||||
return
|
||||
|
||||
@@ -0,0 +1,102 @@
|
||||
package handlers
|
||||
|
||||
// workspace_delete_stop_retry_test.go — pins the contract of the
|
||||
// delete-path EC2 stop retry (task #15 / workspace-ec2-leak).
|
||||
//
|
||||
// Background (Phase 1 evidence): the DELETE path's StopWorkspaceAuto →
|
||||
// cpProv.Stop had NO retry, while the restart path used cpStopWithRetry
|
||||
// (bounded exponential backoff). A transient CP/AWS hiccup on delete left
|
||||
// the workspace row at status='removed' with instance_id still populated,
|
||||
// returned a 500, and relied entirely on the 60s CP-orphan-sweeper to
|
||||
// re-drive the terminate. For a cascade *descendant* whose own row is
|
||||
// already 'removed', the inline retry-via-client-replay is defeated by
|
||||
// CascadeDelete's `status != 'removed'` CTE filter — so the only inline
|
||||
// recovery is this bounded retry.
|
||||
//
|
||||
// Contract of stopWorkspaceForDelete:
|
||||
// - CP path: bounded retry (cpStopRetryAttempts, exp backoff) on
|
||||
// cpProv.Stop; returns nil on eventual success.
|
||||
// - On retry exhaustion: returns the terminal error AND emits a
|
||||
// `workspace.delete.terminate_retry_exhausted` structure_events row so
|
||||
// the leak decision is queryable (structured-logging gate), not just a
|
||||
// log.Printf. The event is the durable pending-terminate signal: the workspace row
|
||||
// stays status='removed' with instance_id populated, which is exactly
|
||||
// what the CP-orphan-sweeper (registry/cp_orphan_sweeper.go) re-drives.
|
||||
// - Docker path: single Stop, no retry (local daemon failure won't heal
|
||||
// on retry — matches RestartWorkspaceAuto's Docker rationale).
|
||||
// - No backend wired: nil (nothing to stop).
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
)
|
||||
|
||||
func TestStopWorkspaceForDelete_CPRetriesTransientThenSucceeds(t *testing.T) {
|
||||
shrinkRetryBackoff(t)
|
||||
buf := captureLog(t)
|
||||
// 2 transient failures then success — within the 3-attempt budget.
|
||||
stub := &scriptedCPStop{errs: []error{
|
||||
errors.New("cp 503 attempt 1"),
|
||||
errors.New("cp 503 attempt 2"),
|
||||
}}
|
||||
h := &WorkspaceHandler{cpProv: stub}
|
||||
|
||||
err := h.stopWorkspaceForDelete(context.Background(), "ws-del-1")
|
||||
if err != nil {
|
||||
t.Fatalf("expected nil error on eventual success, got %v", err)
|
||||
}
|
||||
if stub.calls != 3 {
|
||||
t.Errorf("expected 3 Stop calls (2 fails + 1 success), got %d", stub.calls)
|
||||
}
|
||||
if strings.Contains(buf.String(), "terminate_retry_exhausted") {
|
||||
t.Errorf("eventual success must NOT log retry-exhausted; got %q", buf.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestStopWorkspaceForDelete_CPExhaustsEmitsDurableEventAndReturnsError(t *testing.T) {
|
||||
shrinkRetryBackoff(t)
|
||||
mock := setupTestDB(t)
|
||||
buf := captureLog(t)
|
||||
stub := &scriptedCPStop{errs: []error{
|
||||
errors.New("cp 502 attempt 1"),
|
||||
errors.New("cp 502 attempt 2"),
|
||||
errors.New("cp 502 final"),
|
||||
}}
|
||||
h := &WorkspaceHandler{cpProv: stub}
|
||||
|
||||
// On exhaustion the helper persists a durable pending-terminate row so
|
||||
// the leak decision is queryable. structure_events is the audit-of-record.
|
||||
mock.ExpectExec("INSERT INTO structure_events").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
err := h.stopWorkspaceForDelete(context.Background(), "ws-doomed")
|
||||
if err == nil {
|
||||
t.Fatal("expected terminal error on retry exhaustion, got nil")
|
||||
}
|
||||
if stub.calls != cpStopRetryAttempts {
|
||||
t.Errorf("expected %d Stop calls when all fail, got %d", cpStopRetryAttempts, stub.calls)
|
||||
}
|
||||
if !strings.Contains(err.Error(), "cp 502 final") {
|
||||
t.Errorf("returned error should wrap the LAST attempt's error, got %v", err)
|
||||
}
|
||||
if e := mock.ExpectationsWereMet(); e != nil {
|
||||
t.Fatalf("expected structure_events INSERT on exhaustion: %v", e)
|
||||
}
|
||||
// The LEAK-SUSPECT line stays the operator-facing prose bridge to the
|
||||
// orphan reconciler; assert it carries the delete source so triage can
|
||||
// distinguish delete-leaks from restart-leaks.
|
||||
if !strings.Contains(buf.String(), "LEAK-SUSPECT") {
|
||||
t.Errorf("expected LEAK-SUSPECT log on exhaustion, got %q", buf.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestStopWorkspaceForDelete_NoBackendIsNoOp(t *testing.T) {
|
||||
h := &WorkspaceHandler{} // cpProv nil, provisioner nil
|
||||
if err := h.stopWorkspaceForDelete(context.Background(), "ws-x"); err != nil {
|
||||
t.Errorf("expected nil no-op with no backend, got %v", err)
|
||||
}
|
||||
}
|
||||
@@ -31,9 +31,11 @@ package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/provlog"
|
||||
)
|
||||
@@ -207,6 +209,86 @@ func (h *WorkspaceHandler) StopWorkspaceAuto(ctx context.Context, workspaceID st
|
||||
return nil
|
||||
}
|
||||
|
||||
// stopWorkspaceForDelete is the DELETE-path stop dispatcher. It differs
|
||||
// from StopWorkspaceAuto in exactly one way: the CP (EC2) path gets the
|
||||
// same bounded retry the restart path uses (cpStopWithRetryErr), and on
|
||||
// retry exhaustion it persists a durable `workspace.delete.terminate_retry_exhausted`
|
||||
// event to structure_events (the structured-logging gate) so the leak
|
||||
// decision is queryable, not just stdout prose.
|
||||
//
|
||||
// Why retry here (task #15 / workspace-ec2-leak): the bare cpProv.Stop on
|
||||
// delete left a transient CP/AWS hiccup as an immediate 500 with no inline
|
||||
// recovery. For a cascade *descendant* the "client retries → replays
|
||||
// terminate" recovery is defeated by CascadeDelete's `status != 'removed'`
|
||||
// CTE filter (the descendant's row is already 'removed', so a retry walks
|
||||
// zero descendant rows). Bounded retry absorbs the transient class inline;
|
||||
// the durable event + the row staying status='removed'+instance_id is the
|
||||
// hand-off to the 60s CP-orphan-sweeper (registry/cp_orphan_sweeper.go) for
|
||||
// the (rarer) sustained-outage case.
|
||||
//
|
||||
// We deliberately do NOT clear status='removed' on exhaustion — the
|
||||
// CP-orphan-sweeper's recovery query keys on exactly that state, so
|
||||
// reverting it would break the existing backstop. The error is still
|
||||
// returned so the HTTP Delete handler surfaces the retryable 500.
|
||||
//
|
||||
// Docker path: single Stop, no retry — a local daemon that fails to stop a
|
||||
// container won't heal on retry (matches RestartWorkspaceAuto's Docker
|
||||
// rationale); the orphan-container sweeper (registry/orphan_sweeper.go) is
|
||||
// the Docker-side backstop.
|
||||
func (h *WorkspaceHandler) stopWorkspaceForDelete(ctx context.Context, workspaceID string) error {
|
||||
if h.cpProv != nil {
|
||||
if err := h.cpStopWithRetryErr(ctx, workspaceID, "Delete"); err != nil {
|
||||
h.emitDeleteTerminateRetryExhausted(ctx, workspaceID, err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if h.provisioner != nil {
|
||||
return h.provisioner.Stop(ctx, workspaceID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// emitDeleteTerminateRetryExhausted persists a durable record that the
|
||||
// delete-path EC2 terminate could not be completed inline after the full
|
||||
// retry budget. Per the §Persistent structured logging gate: a
|
||||
// state-mutating decision (we are leaving a known-leaked-or-pending EC2 for
|
||||
// the orphan sweeper) must land in structure_events, not just log.Printf.
|
||||
//
|
||||
// Event-type taxonomy (append-only; never rename):
|
||||
//
|
||||
// workspace.delete.terminate_retry_exhausted — delete-path cpProv.Stop
|
||||
// exhausted its retry budget; row stays status='removed' with
|
||||
// instance_id populated for the CP-orphan-sweeper to re-drive.
|
||||
//
|
||||
// Telemetry never blocks the request path: marshal / INSERT failures are
|
||||
// logged and swallowed.
|
||||
func (h *WorkspaceHandler) emitDeleteTerminateRetryExhausted(ctx context.Context, workspaceID string, cause error) {
|
||||
payload := map[string]any{
|
||||
"workspace_id": workspaceID,
|
||||
"attempts": cpStopRetryAttempts,
|
||||
"last_error": cause.Error(),
|
||||
// recovery_path documents WHO is expected to finish the terminate,
|
||||
// so a reader of the audit row doesn't have to grep the code to
|
||||
// know the EC2 isn't simply abandoned.
|
||||
"recovery_path": "cp_orphan_sweeper",
|
||||
}
|
||||
payloadJSON, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
log.Printf("emitDeleteTerminateRetryExhausted: marshal payload failed for %s: %v", workspaceID, err)
|
||||
return
|
||||
}
|
||||
if db.DB == nil {
|
||||
return
|
||||
}
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO structure_events (event_type, workspace_id, payload, created_at)
|
||||
VALUES ($1, $2, $3, now())
|
||||
`, "workspace.delete.terminate_retry_exhausted", workspaceID, payloadJSON); err != nil {
|
||||
log.Printf("emitDeleteTerminateRetryExhausted: insert failed for %s: %v", workspaceID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// RestartWorkspaceAuto stops the running workload (with retry semantics
|
||||
// tuned for the restart hot path) then starts provisioning again, in a
|
||||
// detached goroutine. Returns true when a backend was kicked off, false
|
||||
|
||||
@@ -721,8 +721,31 @@ var cpStopRetryBaseDelay = 1 * time.Second
|
||||
//
|
||||
// Returns nothing — caller's contract is unchanged.
|
||||
func (h *WorkspaceHandler) cpStopWithRetry(ctx context.Context, workspaceID, source string) {
|
||||
// Restart's contract is "make the workspace alive again": it proceeds
|
||||
// with reprovision regardless of the Stop outcome, so it discards the
|
||||
// terminal error. The delete path needs the error (it must keep the
|
||||
// row recoverable for the orphan-sweeper + emit a durable event), so
|
||||
// the actual retry loop lives in cpStopWithRetryErr below.
|
||||
_ = h.cpStopWithRetryErr(ctx, workspaceID, source)
|
||||
}
|
||||
|
||||
// cpStopWithRetryErr is the shared bounded-retry core for cpProv.Stop.
|
||||
// It returns the terminal error so callers that need to react to a leak
|
||||
// (the DELETE path's stopWorkspaceForDelete) can do so, while
|
||||
// cpStopWithRetry keeps its void contract for the restart paths.
|
||||
//
|
||||
// Behaviour (unchanged from the original cpStopWithRetry loop):
|
||||
// - cpProv nil → nil (no-op; nothing to stop).
|
||||
// - success on attempt N → nil; logs a retry-success line when N > 1.
|
||||
// - ctx cancelled mid-retry → returns ctx.Err(); logs an "abandoned"
|
||||
// line and deliberately does NOT emit LEAK-SUSPECT (operator-initiated
|
||||
// drain is a different signal than "we tried hard and failed").
|
||||
// - all attempts fail → returns the LAST attempt's error and emits the
|
||||
// stable `LEAK-SUSPECT cpProv.Stop ...` log line so the CP-side orphan
|
||||
// reconciler can correlate by workspace_id.
|
||||
func (h *WorkspaceHandler) cpStopWithRetryErr(ctx context.Context, workspaceID, source string) error {
|
||||
if h.cpProv == nil {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
var lastErr error
|
||||
delay := cpStopRetryBaseDelay
|
||||
@@ -732,7 +755,7 @@ func (h *WorkspaceHandler) cpStopWithRetry(ctx context.Context, workspaceID, sou
|
||||
if attempt > 1 {
|
||||
log.Printf("%s: cpProv.Stop(%s) succeeded on attempt %d", source, workspaceID, attempt)
|
||||
}
|
||||
return
|
||||
return nil
|
||||
}
|
||||
lastErr = err
|
||||
if attempt == cpStopRetryAttempts {
|
||||
@@ -744,7 +767,7 @@ func (h *WorkspaceHandler) cpStopWithRetry(ctx context.Context, workspaceID, sou
|
||||
case <-ctx.Done():
|
||||
log.Printf("%s: cpProv.Stop(%s) abandoned mid-retry: ctx cancelled (last_err=%v)",
|
||||
source, workspaceID, lastErr)
|
||||
return
|
||||
return ctx.Err()
|
||||
case <-time.After(delay):
|
||||
}
|
||||
delay *= 2
|
||||
@@ -753,6 +776,7 @@ func (h *WorkspaceHandler) cpStopWithRetry(ctx context.Context, workspaceID, sou
|
||||
// so logs are greppable / parseable for the CP-side orphan reconciler.
|
||||
log.Printf("LEAK-SUSPECT cpProv.Stop workspace_id=%s source=%s attempts=%d last_err=%q",
|
||||
workspaceID, source, cpStopRetryAttempts, lastErr.Error())
|
||||
return lastErr
|
||||
}
|
||||
|
||||
// runRestartCycle does the actual stop+provision work for one restart
|
||||
|
||||
@@ -248,8 +248,13 @@ func TestRestart_CPStopOnlyInsideRetryHelper(t *testing.T) {
|
||||
if !ok || fn.Body == nil || fn.Recv == nil {
|
||||
continue
|
||||
}
|
||||
// cpStopWithRetry is the ONE allowed home for h.cpProv.Stop.
|
||||
if fn.Name.Name == "cpStopWithRetry" {
|
||||
// cpStopWithRetryErr is the ONE allowed home for h.cpProv.Stop —
|
||||
// the bounded-retry loop. cpStopWithRetry is the void-returning
|
||||
// wrapper (restart path) that delegates to it; the delete path uses
|
||||
// cpStopWithRetryErr directly via stopWorkspaceForDelete to capture
|
||||
// the terminal error (task #15). Both wrappers are exempt from this
|
||||
// gate; any OTHER direct cpProv.Stop is the silent-leak regression.
|
||||
if fn.Name.Name == "cpStopWithRetry" || fn.Name.Name == "cpStopWithRetryErr" {
|
||||
continue
|
||||
}
|
||||
ast.Inspect(fn.Body, func(n ast.Node) bool {
|
||||
|
||||
Reference in New Issue
Block a user