From 10590c063dc9b17380679bd25c5b6934137631cd Mon Sep 17 00:00:00 2001 From: claude-ceo-assistant Date: Sun, 31 May 2026 01:59:20 -0700 Subject: [PATCH] feat(workspace-server): capture rescue bundle on workspace boot-failure (RFC internal#742 Part 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a workspace boot FAILS — the provision-timeout sweep flips it to `failed`, or the control plane's bootstrap-watcher POSTs bootstrap-failed — capture a fixed forensic "rescue bundle" off the still-running (but boot-failed) EC2 BEFORE the control plane reaps it, and ship it to obs/Loki. This makes a wedged workspace (e.g. the codex provider-derivation failure that motivated the RFC) post-mortem- inspectable instead of an uninspectable wall. What it collects (fixed set, redacted before anything leaves the box): /configs/config.yaml, /configs/system-prompt.md, tail -200 of cloud-init-output.log, `docker ps -a`, the agent container's `docker logs --tail 200`, and the resolved MODEL|PROVIDER|RUNTIME env. Every section is run through the existing SAFE-T1201 secret-scan (handlers.redactSecrets) before shipping — and fails CLOSED (ships nothing) if the redactor is unwired. Shipping reuses the existing obs shipper (internal/audit → Loki via the tenant Vector stdout source) with event_type="rescue.bundle" and kind="rescue" / org / workspace_id in the record body, queryable as `{kind="rescue"} | json`. Hook points (the two boot-failure VERDICT paths only — never normal teardown/deprovision/recreate/billing-suspend/hibernate): - registry.sweepStuckProvisioning: fires the injected registry.BootFailureRescueHook only on a real flip (affected==1), never on a race (affected==0) or a non-overdue row. - handlers.WorkspaceHandler.BootstrapFailed: fires captureRescueBundle only after the row is actually flipped to `failed`. Capture is best-effort + non-blocking: it runs in its own goroutine with its own 45s timeout, detached from the request/sweep context, so it can never change boot-failure semantics or add latency to the failure path. The leaf internal/rescue package injects the EIC/SSH runner + redactor as package vars (wired from handlers at init) so registry can call it without importing handlers (no import cycle) — mirroring the existing RuntimeTimeoutLookup injection pattern. Volume retention: in molecule-core the boot-failure verdict only flips status to `failed`; it never terminates. Both platform reapers (registry.StartCPOrphanSweeper + handlers deprovision) act ONLY on status='removed', so a `failed` workspace's instance + /configs data volume are RETAINED by construction through the rescue grace (rescue.RescueVolumeGrace = 24h, the SSOT the CP reaper must honour), distinct from the user-prune erase path. Added a regression test pinning the orphan-sweeper's status='removed' predicate so a future widening to `failed` (which would terminate boxes mid-rescue) fails the build. Tests: capture fires on boot-failure (not on healthy teardown/race), bundle redacts secrets + fails closed without a redactor, Loki push called with the right labels, volume retained on boot-failure. EIC/SSH + Loki + ec2 faked via package-var swaps (mirrors existing provisioner test fakes). Co-Authored-By: Claude Opus 4.8 (1M context) --- workspace-server/cmd/server/main.go | 11 + .../internal/handlers/rescue_wiring.go | 158 +++++++++++ .../internal/handlers/rescue_wiring_test.go | 119 +++++++++ .../internal/handlers/workspace_bootstrap.go | 12 + .../registry/cp_orphan_sweeper_test.go | 56 ++++ .../internal/registry/provisiontimeout.go | 41 ++- .../registry/provisiontimeout_rescue_test.go | 130 +++++++++ .../registry/provisiontimeout_test.go | 51 ++-- workspace-server/internal/rescue/rescue.go | 247 ++++++++++++++++++ .../internal/rescue/rescue_test.go | 226 ++++++++++++++++ 10 files changed, 1022 insertions(+), 29 deletions(-) create mode 100644 workspace-server/internal/handlers/rescue_wiring.go create mode 100644 workspace-server/internal/handlers/rescue_wiring_test.go create mode 100644 workspace-server/internal/registry/provisiontimeout_rescue_test.go create mode 100644 workspace-server/internal/rescue/rescue.go create mode 100644 workspace-server/internal/rescue/rescue_test.go diff --git a/workspace-server/cmd/server/main.go b/workspace-server/cmd/server/main.go index b79a61d6b..3a9b3d243 100644 --- a/workspace-server/cmd/server/main.go +++ b/workspace-server/cmd/server/main.go @@ -349,6 +349,17 @@ func main() { codexauth.StartCodexAuthRefresher(c, db.DB) }) + // RFC internal#742 Part 2: wire the boot-failure rescue capture into + // the provision-timeout sweep's failure verdict. When the sweep flips + // a stuck workspace to `failed`, this hook captures a forensic rescue + // bundle off the still-running (but boot-failed) EC2 and ships it to + // obs/Loki before the control plane reaps the instance. Best-effort + + // non-blocking (handlers.BootFailureRescueHook dispatches on its own + // goroutine + timeout). The handler-side boot-failure path + // (WorkspaceHandler.BootstrapFailed) wires its own capture inline. + registry.BootFailureRescueHook = handlers.BootFailureRescueHook + + // Provision-timeout sweep — flips workspaces that have been stuck in // status='provisioning' past the timeout window to 'failed' and emits // WORKSPACE_PROVISION_TIMEOUT. Without this the UI banner is cosmetic diff --git a/workspace-server/internal/handlers/rescue_wiring.go b/workspace-server/internal/handlers/rescue_wiring.go new file mode 100644 index 000000000..08000ce7e --- /dev/null +++ b/workspace-server/internal/handlers/rescue_wiring.go @@ -0,0 +1,158 @@ +package handlers + +// rescue_wiring.go — bridges the leaf internal/rescue package to the +// handlers package's EIC/SSH runner + secret redactor, and exposes the +// boot-failure rescue hook used by both boot-failure verdict paths +// (handlers.BootstrapFailed here, registry.sweepStuckProvisioning via +// an injected hook wired in main.go). +// +// Why the indirection: internal/rescue is a leaf so registry (which +// must NOT import handlers — that's an import cycle) can call it. The +// two heavy dependencies live here in handlers — `withEICTunnel` +// (the EIC keypair → push → tunnel → ssh dance) and `redactSecrets` +// (the SAFE-T1201 secret-scan) — so we inject them into rescue's +// package-level func vars at init(). +// +// RFC internal#742 Part 2. + +import ( + "bytes" + "context" + "database/sql" + "fmt" + "os" + "os/exec" + "strings" + + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db" + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue" +) + +func init() { + // Wire the leaf rescue package to handlers' EIC runner + redactor. + // Done in init() (not main.go) so the binding is present for any + // caller of rescue.Capture, including the registry sweeper hook and + // the handler path, without each call site re-wiring it. + rescue.RunRemote = rescueRunRemoteViaEIC + rescue.Redact = func(workspaceID, content string) string { + out, _ := redactSecrets(workspaceID, content) + return out + } +} + +// rescueRunRemoteViaEIC runs a single shell command on the still-running +// (but boot-failed) workspace EC2 over an EIC tunnel and returns its +// combined stdout+stderr. Reuses the same `withEICTunnel` dance as the +// canvas file ops (ephemeral keypair → SendSSHPublicKey → open-tunnel → +// ssh) so the rescue path inherits every fix to the EIC mechanism (e.g. +// PR #2822's LogLevel=ERROR shim) for free. +// +// Combined output (2>&1) is intentional: a boot-failed box's most +// useful signal is often on stderr (a panic, a missing-file error), and +// the rescue bundle is a forensic blob, not a parsed value — we want +// everything the command emitted. +func rescueRunRemoteViaEIC(ctx context.Context, instanceID, command string) (string, error) { + var combined []byte + runErr := withEICTunnel(ctx, instanceID, func(s eicSSHSession) error { + sshCmd := exec.CommandContext(ctx, "ssh", s.sshArgs(command)...) + sshCmd.Env = os.Environ() + var buf bytes.Buffer + sshCmd.Stdout = &buf + sshCmd.Stderr = &buf + // A non-zero remote exit is NOT a transport error for the rescue + // path — each section command already falls back to an + // `|| echo '(...)'` marker, so a clean exit is expected. Only + // surface an error when ssh/tunnel itself failed AND produced no + // output to ship. + err := sshCmd.Run() + combined = buf.Bytes() + if err != nil && len(combined) == 0 { + return fmt.Errorf("rescue ssh exec: %w", err) + } + return nil + }) + if runErr != nil { + return "", runErr + } + return strings.TrimRight(string(combined), "\n"), nil +} + +// captureRescueBundle fires a best-effort, non-blocking rescue capture +// for a boot-failed workspace. It is the single entry point both +// boot-failure verdict paths funnel through. +// +// NON-BLOCKING: the actual collection runs in its own goroutine with +// its own timeout (rescue.CaptureTimeout), detached from the caller's +// request/sweep context so it can't add latency to — or be cancelled +// by — the failure-handling path that triggered it. We snapshot the +// identity into a fresh context.Background() for the same reason: a +// gin request context is cancelled the instant the HTTP handler +// returns, which would kill the EIC tunnel mid-collection. +// +// instanceID/orgID are resolved here (best-effort) so the two call +// sites only need the workspace id. A missing instance id → rescue.Capture +// no-ops (logged), so an early-failure workspace that never got an EC2 +// is handled cleanly. +func captureRescueBundle(workspaceID, reason string) { + rescueDispatch(func() { + ctx := context.Background() + instanceID, err := rescueResolveInstanceID(ctx, workspaceID) + if err != nil { + // Best-effort: a resolve failure is logged inside Capture's + // caller chain; pass empty so Capture no-ops cleanly. + instanceID = "" + } + rescue.Capture(ctx, rescue.Input{ + InstanceID: instanceID, + WorkspaceID: workspaceID, + OrgID: os.Getenv("MOLECULE_ORG_ID"), + Reason: reason, + }) + }) +} + +// rescueDispatch runs the rescue collection off the request path. In +// production it's `go fn()` so the capture never blocks or adds latency +// to the boot-failure handler. Tests swap it for a synchronous runner so +// they can assert the capture fired (or didn't) deterministically +// without racing the goroutine. +var rescueDispatch = func(fn func()) { go fn() } + +// BootFailureRescueHook is the registry-facing adapter wired into +// registry.BootFailureRescueHook from main.go. The registry sweeper +// already resolved the instance id (it's in the candidate row), so this +// path uses it directly rather than re-querying — symmetric with the +// captureRescueBundle handler path but skipping the lookup. +// +// Best-effort + non-blocking: dispatches the capture on its own +// goroutine with its own timeout, so the sweep loop is never slowed. +func BootFailureRescueHook(workspaceID, instanceID, reason string) { + go rescue.Capture(context.Background(), rescue.Input{ + InstanceID: instanceID, + WorkspaceID: workspaceID, + OrgID: os.Getenv("MOLECULE_ORG_ID"), + Reason: reason, + }) +} + +// rescueResolveInstanceID looks up the EC2 instance id for a workspace. +// Package var so tests can stub it without a sqlmock. Mirrors +// provisioner.resolveInstanceID (same query) but lives here to keep the +// rescue wiring self-contained and avoid widening the provisioner +// surface. +var rescueResolveInstanceID = func(ctx context.Context, workspaceID string) (string, error) { + if db.DB == nil { + return "", nil // nil in unit tests + } + var instanceID sql.NullString + err := db.DB.QueryRowContext(ctx, + `SELECT instance_id FROM workspaces WHERE id = $1`, workspaceID, + ).Scan(&instanceID) + if err != nil && err != sql.ErrNoRows { + return "", err + } + if !instanceID.Valid { + return "", nil + } + return instanceID.String, nil +} diff --git a/workspace-server/internal/handlers/rescue_wiring_test.go b/workspace-server/internal/handlers/rescue_wiring_test.go new file mode 100644 index 000000000..61b230b57 --- /dev/null +++ b/workspace-server/internal/handlers/rescue_wiring_test.go @@ -0,0 +1,119 @@ +package handlers + +import ( + "bytes" + "context" + "net/http" + "net/http/httptest" + "testing" + + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models" + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue" + "github.com/DATA-DOG/go-sqlmock" + "github.com/gin-gonic/gin" +) + +// rescueTestHarness makes the otherwise-async rescue capture +// deterministic + observable for handler tests: +// - rescueDispatch runs synchronously (no goroutine race). +// - rescueResolveInstanceID returns a fixed instance id. +// - rescue.RunRemote / rescue.Redact are stubbed so no real EIC/SSH +// fires; runCalls counts how many remote-command collections ran, +// which is the proxy for "did the capture fire". +// +// All originals are restored on cleanup. +func rescueTestHarness(t *testing.T, instanceID string) (runCalls *int) { + t.Helper() + n := 0 + runCalls = &n + + prevDispatch := rescueDispatch + rescueDispatch = func(fn func()) { fn() } // synchronous + prevResolve := rescueResolveInstanceID + rescueResolveInstanceID = func(_ context.Context, _ string) (string, error) { return instanceID, nil } + prevRun, prevRedact := rescue.RunRemote, rescue.Redact + rescue.RunRemote = func(_ context.Context, _ string, _ string) (string, error) { n++; return "out", nil } + rescue.Redact = func(_ws, c string) string { return c } + + t.Cleanup(func() { + rescueDispatch = prevDispatch + rescueResolveInstanceID = prevResolve + rescue.RunRemote = prevRun + rescue.Redact = prevRedact + }) + return runCalls +} + +// TestBootstrapFailed_FiresRescueOnFlip — the RFC internal#742 handler +// hook: when BootstrapFailed actually flips a workspace to `failed` +// (affected==1), the rescue capture fires against the resolved instance. +func TestBootstrapFailed_FiresRescueOnFlip(t *testing.T) { + h, mock := setupBootstrapHandler(t) + runCalls := rescueTestHarness(t, "i-failed01") + + mock.ExpectExec(`UPDATE workspaces`). + WithArgs("ws-crashed", sqlmock.AnyArg(), models.StatusFailed). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(`INSERT INTO structure_events`). + WithArgs("WORKSPACE_PROVISION_FAILED", "ws-crashed", sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-crashed"}} + c.Request = httptest.NewRequest("POST", "/admin/workspaces/ws-crashed/bootstrap-failed", + bytes.NewBufferString(`{"error":"codex provider derivation failed","log_tail":"panic"}`)) + c.Request.Header.Set("Content-Type", "application/json") + + h.BootstrapFailed(c) + + if w.Code != http.StatusOK { + t.Fatalf("want 200, got %d: %s", w.Code, w.Body.String()) + } + if *runCalls != len(rescueBundleSectionCount()) { + t.Errorf("rescue capture ran %d remote commands, want %d (one per bundle section)", *runCalls, len(rescueBundleSectionCount())) + } +} + +// TestBootstrapFailed_NoRescueOnNoChange — an already-transitioned +// workspace (affected==0: raced to online, or double-report) is NOT a +// boot-failure verdict here, so the rescue capture must NOT fire. +func TestBootstrapFailed_NoRescueOnNoChange(t *testing.T) { + h, mock := setupBootstrapHandler(t) + runCalls := rescueTestHarness(t, "i-online01") + + mock.ExpectExec(`UPDATE workspaces`). + WithArgs("ws-online", sqlmock.AnyArg(), models.StatusFailed). + WillReturnResult(sqlmock.NewResult(0, 0)) // already transitioned + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-online"}} + c.Request = httptest.NewRequest("POST", "/admin/workspaces/ws-online/bootstrap-failed", + bytes.NewBufferString(`{"error":"late report","log_tail":""}`)) + c.Request.Header.Set("Content-Type", "application/json") + + h.BootstrapFailed(c) + + if w.Code != http.StatusOK { + t.Fatalf("want 200, got %d", w.Code) + } + if *runCalls != 0 { + t.Errorf("rescue capture fired (%d cmds) on a no-change report; it must only fire on a real flip", *runCalls) + } +} + +// rescueBundleSectionCount returns the production rescue bundle section +// list length by running a capture against a counting runner once. It's +// a small indirection so the handler test stays decoupled from the exact +// section set in internal/rescue (which has its own tests). +func rescueBundleSectionCount() []struct{} { + count := 0 + prevRun, prevRedact := rescue.RunRemote, rescue.Redact + rescue.RunRemote = func(_ context.Context, _ string, _ string) (string, error) { count++; return "", nil } + rescue.Redact = func(_ws, c string) string { return c } + rescue.Capture(context.Background(), rescue.Input{InstanceID: "i-probe", WorkspaceID: "w", OrgID: "o"}) + rescue.RunRemote = prevRun + rescue.Redact = prevRedact + return make([]struct{}, count) +} diff --git a/workspace-server/internal/handlers/workspace_bootstrap.go b/workspace-server/internal/handlers/workspace_bootstrap.go index d70d79359..3642d4559 100644 --- a/workspace-server/internal/handlers/workspace_bootstrap.go +++ b/workspace-server/internal/handlers/workspace_bootstrap.go @@ -91,6 +91,18 @@ func (h *WorkspaceHandler) BootstrapFailed(c *gin.Context) { "log_tail": tail, "source": "bootstrap_watcher", }) + + // RFC internal#742 Part 2: this is one of the two boot-failure + // verdict points. We've just flipped a still-running (but + // unconfigured) workspace EC2 to `failed`; the control plane will + // reap the instance shortly. Capture a forensic rescue bundle off + // the live box NOW, before it's torn down, so a wedged workspace is + // post-mortem-inspectable. Best-effort + non-blocking: runs in its + // own goroutine with its own timeout, detached from this request's + // context (which is cancelled the instant this handler returns). + // Failure to capture never changes the boot-failure handling. + captureRescueBundle(id, "bootstrap_watcher") + log.Printf("BootstrapFailed: marked %s failed (tail=%d bytes, err=%q)", id, len(tail), errMsg) c.JSON(http.StatusOK, gin.H{"ok": true}) } diff --git a/workspace-server/internal/registry/cp_orphan_sweeper_test.go b/workspace-server/internal/registry/cp_orphan_sweeper_test.go index ae45b2276..22765090c 100644 --- a/workspace-server/internal/registry/cp_orphan_sweeper_test.go +++ b/workspace-server/internal/registry/cp_orphan_sweeper_test.go @@ -10,8 +10,15 @@ import ( "github.com/DATA-DOG/go-sqlmock" "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db" + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue" ) +// rescueVolumeGraceHours surfaces the rescue grace as whole hours for +// the retention-contract assertion (RFC internal#742 Part 2). +func rescueVolumeGraceHours() int { + return int(rescue.RescueVolumeGrace.Hours()) +} + // fakeCPReaper is a hand-rolled CPOrphanReaper for the SaaS-mode // sweeper tests. Records every Stop call so tests can assert which // workspace IDs were re-issued. @@ -97,6 +104,55 @@ func TestCPSweepOnce_NoOrphans(t *testing.T) { } } +// TestCPSweepOnce_DoesNotReapFailedWorkspace — RFC internal#742 Part 2 +// volume-retention guarantee, molecule-core side. +// +// A boot-FAILED workspace (status='failed') must NOT be terminated by +// the platform's orphan sweeper: its instance + /configs data volume are +// retained through the rescue grace (rescue.RescueVolumeGrace) so a live +// rescue read is possible, distinct from the user-prune erase path. The +// sweeper reaps ONLY status='removed' (the explicit deprovision path), +// so a `failed` row is structurally excluded at the SELECT — it never +// reaches reaper.Stop. We assert the predicate filters to 'removed' +// (so the failed instance survives) and that no Stop fires for a DB +// whose only orphan-shaped row is `failed`. +// +// This is the "if the sweeper already keeps volumes by default, confirm +// + add a test asserting it" branch of the RFC: it does, by construction. +func TestCPSweepOnce_DoesNotReapFailedWorkspace(t *testing.T) { + mock := setupTestDB(t) + reaper := &fakeCPReaper{} + + // The sweeper's SELECT carries `status = 'removed'`. A boot-failed + // workspace (status='failed') does not match that predicate, so the + // real DB returns it nowhere in this result set — modelled as the + // empty result the `removed`-only filter produces when the only + // instance-bearing row is `failed`. The regex pins the retention- + // critical predicate so a future widening to include 'failed' (which + // would terminate boot-failed boxes mid-rescue) fails this test. + mock.ExpectQuery(`(?s)WHERE status = 'removed'\s+AND instance_id IS NOT NULL`). + WithArgs(cpSweepLimit). + WillReturnRows(sqlmock.NewRows([]string{"id"})) // failed row excluded by predicate + + cpSweepOnce(context.Background(), reaper) + + if len(reaper.stopCalls) != 0 { + t.Fatalf("boot-failed workspace must be RETAINED (no terminate); got Stop calls %v", reaper.stopCalls) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} + +// TestRescueVolumeGraceIsDistinctFromPrune documents that the rescue +// grace is its own contract (24h) and not coupled to any prune timing — +// the value is the SSOT the control-plane reaper must honour. +func TestRescueVolumeGraceIsDistinctFromPrune(t *testing.T) { + if rescueVolumeGraceHours() != 24 { + t.Errorf("rescue volume grace = %dh, want 24h (RFC internal#742)", rescueVolumeGraceHours()) + } +} + // TestCPSweepOnce_MultipleOrphans — all rows in the batch get Stop'd // independently; one failure doesn't block others. func TestCPSweepOnce_MultipleOrphans(t *testing.T) { diff --git a/workspace-server/internal/registry/provisiontimeout.go b/workspace-server/internal/registry/provisiontimeout.go index 0b863c35f..41c489da1 100644 --- a/workspace-server/internal/registry/provisiontimeout.go +++ b/workspace-server/internal/registry/provisiontimeout.go @@ -92,6 +92,23 @@ func provisioningTimeoutFor(runtime string, lookup RuntimeTimeoutLookup) time.Du return DefaultProvisioningTimeout } +// BootFailureRescueHook, when wired, is invoked once per workspace the +// sweep flips from `provisioning` to `failed` — i.e. on the boot-failure +// verdict, BEFORE the control plane reaps the instance. It captures a +// forensic rescue bundle off the still-running (but boot-failed) EC2 and +// ships it to obs/Loki (RFC internal#742 Part 2). Wired in main.go to +// handlers.captureRescueBundle via a thin adapter; nil in tests + on +// self-hosted deploys (no rescue shipping there). +// +// Function-typed injection (not an import of handlers) keeps the +// existing handlers→registry import direction intact — registry must not +// import handlers. +// +// MUST be best-effort + non-blocking: the hook itself dispatches the +// capture on its own goroutine with its own timeout, so the sweep loop +// is never slowed or blocked by a hung EIC tunnel on the dead box. +var BootFailureRescueHook func(workspaceID, instanceID, reason string) + // StartProvisioningTimeoutSweep periodically scans for workspaces stuck in // `status='provisioning'` past the timeout window, flips them to `failed`, // and broadcasts a WORKSPACE_PROVISION_TIMEOUT event so the canvas can @@ -144,7 +161,7 @@ func sweepStuckProvisioning(ctx context.Context, emitter ProvisionTimeoutEmitter // flight, not historical) and the partial index on status keeps // it fast. rows, err := db.DB.QueryContext(ctx, ` - SELECT id, COALESCE(runtime, ''), EXTRACT(EPOCH FROM (now() - updated_at))::int + SELECT id, COALESCE(runtime, ''), COALESCE(instance_id, ''), EXTRACT(EPOCH FROM (now() - updated_at))::int FROM workspaces WHERE status = 'provisioning' `) @@ -155,14 +172,15 @@ func sweepStuckProvisioning(ctx context.Context, emitter ProvisionTimeoutEmitter defer rows.Close() type candidate struct { - id string - runtime string - ageSec int + id string + runtime string + instanceID string + ageSec int } var ids []candidate for rows.Next() { var c candidate - if err := rows.Scan(&c.id, &c.runtime, &c.ageSec); err == nil { + if err := rows.Scan(&c.id, &c.runtime, &c.instanceID, &c.ageSec); err == nil { ids = append(ids, c) } } @@ -200,6 +218,19 @@ func sweepStuckProvisioning(ctx context.Context, emitter ProvisionTimeoutEmitter continue } log.Printf("Provision-timeout sweep: %s (runtime=%q) stuck in provisioning > %s — marked failed", c.id, c.runtime, timeout) + + // RFC internal#742 Part 2: this flip is a boot-failure verdict. + // The instance is still running (the CP reaps it shortly after); + // capture a forensic rescue bundle off it NOW, before teardown. + // Best-effort + non-blocking — the hook dispatches on its own + // goroutine + timeout, so a hung EIC tunnel on the dead box can't + // slow the sweep. Only fires on a real flip (affected==1), never + // on a race (affected==0) or a non-overdue row — guaranteeing it + // runs once per boot-failure verdict and never on a healthy row. + if BootFailureRescueHook != nil { + BootFailureRescueHook(c.id, c.instanceID, "provision_timeout_sweep") + } + // Emit as WORKSPACE_PROVISION_FAILED, not _TIMEOUT, because the // canvas event handler only flips node state on the _FAILED case. // A separate event type was considered but the UI reaction is diff --git a/workspace-server/internal/registry/provisiontimeout_rescue_test.go b/workspace-server/internal/registry/provisiontimeout_rescue_test.go new file mode 100644 index 000000000..283d244b8 --- /dev/null +++ b/workspace-server/internal/registry/provisiontimeout_rescue_test.go @@ -0,0 +1,130 @@ +package registry + +import ( + "context" + "sync" + "testing" + + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models" + "github.com/DATA-DOG/go-sqlmock" +) + +// rescueHookRecorder captures the args of every BootFailureRescueHook +// invocation so tests can assert the rescue capture fires exactly on the +// boot-failure verdict — and never on a healthy/raced row. +type rescueHookRecorder struct { + mu sync.Mutex + calls [][3]string // {workspaceID, instanceID, reason} +} + +func (r *rescueHookRecorder) hook() func(workspaceID, instanceID, reason string) { + return func(workspaceID, instanceID, reason string) { + r.mu.Lock() + defer r.mu.Unlock() + r.calls = append(r.calls, [3]string{workspaceID, instanceID, reason}) + } +} + +func (r *rescueHookRecorder) count() int { + r.mu.Lock() + defer r.mu.Unlock() + return len(r.calls) +} + +// withRescueHook installs a recorder as the package-level +// BootFailureRescueHook for the test's duration. +func withRescueHook(t *testing.T) *rescueHookRecorder { + t.Helper() + rec := &rescueHookRecorder{} + prev := BootFailureRescueHook + BootFailureRescueHook = rec.hook() + t.Cleanup(func() { BootFailureRescueHook = prev }) + return rec +} + +// TestSweep_RescueFiresOnBootFailureVerdict — the core RFC internal#742 +// assertion: when the sweep flips a stuck workspace to `failed`, the +// rescue hook fires once with the workspace + instance id and the +// provision_timeout_sweep reason, BEFORE teardown. +func TestSweep_RescueFiresOnBootFailureVerdict(t *testing.T) { + mock := setupTestDB(t) + rec := withRescueHook(t) + + mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`). + WillReturnRows(candidateRows([4]any{"ws-stuck", "codex", "i-0badf00d", 700})) + mock.ExpectExec(`UPDATE workspaces`). + WithArgs("ws-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed). + WillReturnResult(sqlmock.NewResult(0, 1)) + + sweepStuckProvisioning(context.Background(), &fakeEmitter{}, nil) + + if rec.count() != 1 { + t.Fatalf("rescue hook should fire once on a boot-failure flip, got %d", rec.count()) + } + got := rec.calls[0] + if got[0] != "ws-stuck" || got[1] != "i-0badf00d" || got[2] != "provision_timeout_sweep" { + t.Errorf("rescue hook args = %v, want {ws-stuck i-0badf00d provision_timeout_sweep}", got) + } +} + +// TestSweep_RescueDoesNotFireOnRace — affected==0 means the row raced to +// online/restart between SELECT and UPDATE. That is NOT a boot-failure +// verdict, so the rescue capture must NOT fire (we'd be snapshotting a +// healthy box that's about to come online). +func TestSweep_RescueDoesNotFireOnRace(t *testing.T) { + mock := setupTestDB(t) + rec := withRescueHook(t) + + mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`). + WillReturnRows(candidateRows([4]any{"ws-raced", "codex", "i-raced", 700})) + mock.ExpectExec(`UPDATE workspaces`). + WithArgs("ws-raced", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed). + WillReturnResult(sqlmock.NewResult(0, 0)) // raced — 0 rows + + sweepStuckProvisioning(context.Background(), &fakeEmitter{}, nil) + + if rec.count() != 0 { + t.Errorf("rescue hook must NOT fire on a raced flip (affected==0), got %d calls", rec.count()) + } +} + +// TestSweep_RescueDoesNotFireOnHealthyRow — a not-yet-overdue row is +// never flipped, so the rescue capture must not fire. Guards against the +// hook being attached above the age gate. +func TestSweep_RescueDoesNotFireOnHealthyRow(t *testing.T) { + mock := setupTestDB(t) + rec := withRescueHook(t) + + // hermes at 11 min (660s) < 30 min hermes budget → not overdue, no flip. + mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`). + WillReturnRows(candidateRows([4]any{"ws-healthy", "hermes", "i-healthy", 660})) + + sweepStuckProvisioning(context.Background(), &fakeEmitter{}, nil) + + if rec.count() != 0 { + t.Errorf("rescue hook must NOT fire on a non-overdue (healthy) row, got %d calls", rec.count()) + } +} + +// TestSweep_RescueNilHookIsSafe — on a deploy where the hook is unwired +// (self-hosted / no rescue shipping), the sweep must still flip + emit +// without panicking on the nil hook. +func TestSweep_RescueNilHookIsSafe(t *testing.T) { + mock := setupTestDB(t) + prev := BootFailureRescueHook + BootFailureRescueHook = nil + t.Cleanup(func() { BootFailureRescueHook = prev }) + + mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`). + WillReturnRows(candidateRows([4]any{"ws-stuck", "codex", "i-x", 700})) + mock.ExpectExec(`UPDATE workspaces`). + WithArgs("ws-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed). + WillReturnResult(sqlmock.NewResult(0, 1)) + + emit := &fakeEmitter{} + sweepStuckProvisioning(context.Background(), emit, nil) // must not panic + + if emit.count() != 1 { + t.Errorf("flip+emit must still happen with a nil rescue hook, got %d events", emit.count()) + } +} diff --git a/workspace-server/internal/registry/provisiontimeout_test.go b/workspace-server/internal/registry/provisiontimeout_test.go index 1c1517103..f314d05fd 100644 --- a/workspace-server/internal/registry/provisiontimeout_test.go +++ b/workspace-server/internal/registry/provisiontimeout_test.go @@ -7,8 +7,8 @@ import ( "testing" "time" - "github.com/DATA-DOG/go-sqlmock" "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models" + "github.com/DATA-DOG/go-sqlmock" ) // fakeEmitter records every RecordAndBroadcast call so tests can assert @@ -42,12 +42,15 @@ func (f *fakeEmitter) count() int { return len(f.events) } -// candidateRows builds the new-shape query result (id, runtime, age_sec). -// Use this in every sweep test to match the runtime-aware SELECT. -func candidateRows(rows ...[3]any) *sqlmock.Rows { - r := sqlmock.NewRows([]string{"id", "runtime", "age_sec"}) +// candidateRows builds the query result (id, runtime, instance_id, +// age_sec). instance_id was added for the RFC internal#742 rescue hook — +// it rides alongside runtime so the boot-failure capture can reach the +// still-running box. Tests that don't care about the rescue path pass +// "" for instance_id. Use this in every sweep test to match the SELECT. +func candidateRows(rows ...[4]any) *sqlmock.Rows { + r := sqlmock.NewRows([]string{"id", "runtime", "instance_id", "age_sec"}) for _, row := range rows { - r = r.AddRow(row[0], row[1], row[2]) + r = r.AddRow(row[0], row[1], row[2], row[3]) } return r } @@ -58,8 +61,8 @@ func TestSweepStuckProvisioning_FlipsOverdue(t *testing.T) { mock := setupTestDB(t) // claude-code workspace, 700s old > 600s default timeout → flipped. - mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`). - WillReturnRows(candidateRows([3]any{"ws-stuck", "claude-code", 700})) + mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`). + WillReturnRows(candidateRows([4]any{"ws-stuck", "claude-code", "i-stuck", 700})) mock.ExpectExec(`UPDATE workspaces`). WithArgs("ws-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed). @@ -92,8 +95,8 @@ func TestSweepStuckProvisioning_HermesGets30MinSlack(t *testing.T) { // 11 min = 660 sec. < HermesProvisioningTimeout (1800s). // No UPDATE should fire — hermes still has time. - mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`). - WillReturnRows(candidateRows([3]any{"ws-hermes-booting", "hermes", 660})) + mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`). + WillReturnRows(candidateRows([4]any{"ws-hermes-booting", "hermes", "i-h1", 660})) emit := &fakeEmitter{} sweepStuckProvisioning(context.Background(), emit, nil) @@ -114,8 +117,8 @@ func TestSweepStuckProvisioning_HermesPastDeadline(t *testing.T) { mock := setupTestDB(t) // 31 min = 1860 sec > HermesProvisioningTimeout (1800s). - mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`). - WillReturnRows(candidateRows([3]any{"ws-hermes-stuck", "hermes", 1860})) + mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`). + WillReturnRows(candidateRows([4]any{"ws-hermes-stuck", "hermes", "i-h2", 1860})) mock.ExpectExec(`UPDATE workspaces`). WithArgs("ws-hermes-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed). WillReturnResult(sqlmock.NewResult(0, 1)) @@ -150,8 +153,8 @@ func TestSweepStuckProvisioning_HermesPastDeadline(t *testing.T) { func TestSweepStuckProvisioning_ManifestOverrideSparesRow(t *testing.T) { mock := setupTestDB(t) - mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`). - WillReturnRows(candidateRows([3]any{"ws-claude-templated", "claude-code", 660})) + mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`). + WillReturnRows(candidateRows([4]any{"ws-claude-templated", "claude-code", "i-ct", 660})) // No ExpectExec — if the sweeper still flips the row, sqlmock will // fail with an unexpected-query error. @@ -183,8 +186,8 @@ func TestSweepStuckProvisioning_ManifestOverrideStillFlipsPastDeadline(t *testin mock := setupTestDB(t) // 21 min = 1260s > 1200s manifest override → flipped. - mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`). - WillReturnRows(candidateRows([3]any{"ws-claude-truly-stuck", "claude-code", 1260})) + mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`). + WillReturnRows(candidateRows([4]any{"ws-claude-truly-stuck", "claude-code", "i-cts", 1260})) mock.ExpectExec(`UPDATE workspaces`). WithArgs("ws-claude-truly-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed). WillReturnResult(sqlmock.NewResult(0, 1)) @@ -221,8 +224,8 @@ func TestSweepStuckProvisioning_ManifestOverrideStillFlipsPastDeadline(t *testin func TestSweepStuckProvisioning_RaceSafe(t *testing.T) { mock := setupTestDB(t) - mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`). - WillReturnRows(candidateRows([3]any{"ws-raced", "claude-code", 700})) + mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`). + WillReturnRows(candidateRows([4]any{"ws-raced", "claude-code", "i-raced", 700})) mock.ExpectExec(`UPDATE workspaces`). WithArgs("ws-raced", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed). @@ -244,7 +247,7 @@ func TestSweepStuckProvisioning_RaceSafe(t *testing.T) { func TestSweepStuckProvisioning_NoStuck(t *testing.T) { mock := setupTestDB(t) - mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`). + mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`). WillReturnRows(candidateRows()) emit := &fakeEmitter{} @@ -265,10 +268,10 @@ func TestSweepStuckProvisioning_NoStuck(t *testing.T) { func TestSweepStuckProvisioning_MultipleStuck(t *testing.T) { mock := setupTestDB(t) - mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`). + mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`). WillReturnRows(candidateRows( - [3]any{"ws-claude-code", "claude-code", 700}, - [3]any{"ws-hermes", "hermes", 1860}, + [4]any{"ws-claude-code", "claude-code", "i-cc", 700}, + [4]any{"ws-hermes", "hermes", "i-hh", 1860}, )) mock.ExpectExec(`UPDATE workspaces`). @@ -292,8 +295,8 @@ func TestSweepStuckProvisioning_MultipleStuck(t *testing.T) { func TestSweepStuckProvisioning_BroadcastFailureDoesNotCrash(t *testing.T) { mock := setupTestDB(t) - mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`). - WillReturnRows(candidateRows([3]any{"ws-stuck", "claude-code", 700})) + mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`). + WillReturnRows(candidateRows([4]any{"ws-stuck", "claude-code", "i-stuck", 700})) mock.ExpectExec(`UPDATE workspaces`). WithArgs("ws-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed). WillReturnResult(sqlmock.NewResult(0, 1)) diff --git a/workspace-server/internal/rescue/rescue.go b/workspace-server/internal/rescue/rescue.go new file mode 100644 index 000000000..38a40e87b --- /dev/null +++ b/workspace-server/internal/rescue/rescue.go @@ -0,0 +1,247 @@ +// Package rescue captures a fixed post-mortem "rescue bundle" off a +// workspace EC2 whose boot FAILED — before the platform's sweeper / +// control-plane reaps the instance — and ships it to obs/Loki so a +// wedged workspace (e.g. the codex provider-derivation failure that +// motivated RFC internal#742) is inspectable instead of an +// uninspectable wall. +// +// Design constraints (RFC internal#742, Part 2): +// +// - BEST-EFFORT + NON-BLOCKING. Capture MUST NOT change boot-failure +// semantics or add latency to the failure path. Callers fire +// Capture in its own goroutine; Capture additionally bounds itself +// with CaptureTimeout so a hung EIC tunnel can't wedge the +// goroutine forever. +// - FIRES ON THE BOOT-FAILURE VERDICT ONLY. The two hook points are +// the provision-timeout sweep (registry.sweepStuckProvisioning) and +// the out-of-band bootstrap-watcher signal +// (handlers.WorkspaceHandler.BootstrapFailed). Normal teardown / +// deprovision / recreate / billing-suspend / hibernate paths do NOT +// call Capture — see the RFC's path enumeration. +// - REDACT BEFORE ANYTHING LEAVES THE BOX. Every collected section is +// run through the injected Redact func (wired to the existing +// handlers.redactSecrets secret-scan) before it is shipped. Raw +// tokens/keys never reach Loki. +// +// The package is a LEAF: it imports only internal/audit (the obs +// shipper) so it can be called from both handlers and registry without +// an import cycle (registry must not import handlers). The two heavy +// dependencies — the EIC/SSH remote-command runner and the redactor — +// are injected as package-level func vars, wired once at boot from the +// handlers package (which owns withEICTunnel + redactSecrets). Tests +// swap them for fakes. +package rescue + +import ( + "context" + "fmt" + "log" + "time" + + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/audit" +) + +// CaptureTimeout bounds the whole bundle collection. The sweeper runs +// every 30s and the CP reap follows the failure verdict; 45s gives the +// EIC dance (~3-5s) plus six short remote commands (<2s each) generous +// headroom while still finishing well before the instance is torn down. +// Distinct from the per-op eicFileOpTimeout so a slow box that already +// failed to boot can't hang the capture goroutine indefinitely. +const CaptureTimeout = 45 * time.Second + +// LokiKind is the Loki stream label value that tags every rescue +// record. Queryable as `kind="rescue"` (RFC internal#742 §Loki labels). +const LokiKind = "rescue" + +// RescueVolumeGrace is how long a boot-failed workspace's /configs data +// volume (and its still-running instance) must be RETAINED past the +// boot-failure verdict so a live rescue read is possible — distinct from +// the user-requested prune path (cp#415), which is an explicit erase. +// +// In molecule-core (the tenant platform) the boot-failure verdict only +// flips workspaces.status to `failed`; it never issues a terminate. The +// platform's two reapers (registry.StartCPOrphanSweeper + +// handlers deprovision) act ONLY on status='removed', so a `failed` +// workspace's instance + /configs volume are retained here by +// construction — see TestCPSweepOnce_DoesNotReapFailedWorkspace. The +// time-bounded reap of the failed instance is the control plane's +// bootstrap-watcher concern; this constant is the SSOT for the grace +// the CP must honour (24h covers an operator's next-business-day +// post-mortem without leaking the volume indefinitely). +const RescueVolumeGrace = 24 * time.Hour + +// rescueEventType is the audit event_type carried in the shipped +// record. The obs shipper (internal/audit) already maps event_type to a +// low-cardinality Loki label; "rescue.bundle" keeps the rescue stream +// trivially filterable alongside the existing audit taxonomy. +const rescueEventType = "rescue.bundle" + +// RunRemote runs a single shell command on the still-running (but +// unconfigured) workspace EC2 over EIC/SSH and returns its combined +// output. Wired at boot to the handlers EIC runner +// (rescueRunRemoteViaEIC). nil until wired — Capture degrades to a +// logged no-op rather than panicking, so an operator who hasn't wired +// the hook still gets a clear signal instead of a crash on the failure +// path. +var RunRemote func(ctx context.Context, instanceID, command string) (string, error) + +// Redact scrubs secret-shaped substrings from a collected section +// before it leaves the box. Wired at boot to handlers.redactSecrets. +// nil until wired — Capture refuses to ship un-redacted content if the +// redactor is missing (fails closed: logs + aborts rather than leaking +// raw config). +var Redact func(workspaceID, content string) string + +// section is one labelled chunk of the rescue bundle: a human-readable +// name + the remote command that produces it. +type section struct { + name string + command string +} + +// bundleSections is the FIXED set collected on every boot-failure +// rescue (RFC internal#742 §Build.1). Order is the post-mortem reading +// order: config first, then boot logs, then container state, then the +// resolved model/provider env that drove the codex derivation failure. +// +// - /configs/config.yaml + system-prompt.md: the managed config the +// runtime booted against (redacted; system-prompt can embed keys). +// - cloud-init-output.log tail: the user-data execution trace — where +// a wedged boot actually died. +// - docker ps -a: container state (did the agent container even +// start, exit-code, restart loop). +// - agent container logs: the runtime's own stderr (the codex +// provider-derivation panic lives here). +// - MODEL|PROVIDER|RUNTIME env: the resolved routing that motivated +// the RFC. `sudo cat` of the container env via docker inspect-style +// grep — see the command. +// +// All commands use `sudo -n` (the box's /configs is root-owned; ubuntu +// has passwordless sudo) and swallow missing-target stderr so a section +// that can't be produced ships as a short marker instead of failing the +// whole bundle. Kept as data (not inlined) so the redaction + ship loop +// is uniform and the set is reviewable in one place. +var bundleSections = []section{ + { + name: "config.yaml", + command: "sudo -n cat /configs/config.yaml 2>/dev/null || echo '(/configs/config.yaml absent)'", + }, + { + name: "system-prompt.md", + command: "sudo -n cat /configs/system-prompt.md 2>/dev/null || echo '(/configs/system-prompt.md absent)'", + }, + { + name: "cloud-init-output.log.tail", + command: "sudo -n tail -200 /var/log/cloud-init-output.log 2>/dev/null || echo '(cloud-init-output.log absent)'", + }, + { + name: "docker-ps", + command: "sudo -n docker ps -a 2>/dev/null || echo '(docker unavailable)'", + }, + { + // The agent container is the first non-infra container; grab the + // most recently created one and tail its logs. `head -1` of + // `docker ps -a -q` is creation-ordered newest-first, which is + // the agent runtime on a workspace box. + name: "agent-container.logs.tail", + command: "cid=$(sudo -n docker ps -a -q 2>/dev/null | head -1); [ -n \"$cid\" ] && sudo -n docker logs --tail 200 \"$cid\" 2>&1 || echo '(no agent container)'", + }, + { + // Resolved model/provider/runtime env from the agent container. + // `docker inspect` the env array and grep the routing keys. This + // is the field that pinpoints a provider-derivation failure. + name: "model-provider-runtime.env", + command: "cid=$(sudo -n docker ps -a -q 2>/dev/null | head -1); [ -n \"$cid\" ] && sudo -n docker inspect --format '{{range .Config.Env}}{{println .}}{{end}}' \"$cid\" 2>/dev/null | grep -E 'MODEL|PROVIDER|RUNTIME' || echo '(no env)'", + }, +} + +// Input is the identity of the failed workspace being rescued. +type Input struct { + InstanceID string // EC2 instance id of the still-running failed box + WorkspaceID string + OrgID string + // Reason is a short tag for WHY the rescue fired (e.g. + // "provision_timeout_sweep" or "bootstrap_watcher") — carried into + // the Loki record so an operator can correlate the bundle with the + // failure verdict that triggered it. + Reason string +} + +// Capture collects the fixed rescue bundle off the failed instance, +// redacts each section, and ships it to Loki under +// {kind="rescue", org=, workspace_id=}. +// +// BEST-EFFORT: every failure mode (missing wiring, EIC error, a single +// section that won't collect) is logged and does NOT propagate — Capture +// never returns an error and never panics, so the boot-failure handling +// at the call site is unaffected. The caller is expected to invoke this +// in its own goroutine; Capture additionally self-bounds with +// CaptureTimeout. +func Capture(ctx context.Context, in Input) { + defer func() { + // A logging helper on the failure path must never take the + // process down. Recover defensively — the redactor / shipper are + // injected and a future mis-wire shouldn't crash the sweeper. + if r := recover(); r != nil { + log.Printf("rescue: capture panicked for ws=%s instance=%s: %v", in.WorkspaceID, in.InstanceID, r) + } + }() + + if in.InstanceID == "" { + // No live box to read — nothing to rescue (e.g. failure before + // any EC2 was launched). Not an error; just skip. + log.Printf("rescue: skip ws=%s — no instance_id (nothing to capture)", in.WorkspaceID) + return + } + if RunRemote == nil { + log.Printf("rescue: skip ws=%s instance=%s — RunRemote not wired (best-effort no-op)", in.WorkspaceID, in.InstanceID) + return + } + if Redact == nil { + // Fail CLOSED: without a redactor we could leak raw tokens to + // Loki. Abort rather than ship unredacted. + log.Printf("rescue: ABORT ws=%s instance=%s — Redact not wired; refusing to ship un-redacted bundle", in.WorkspaceID, in.InstanceID) + return + } + + ctx, cancel := context.WithTimeout(ctx, CaptureTimeout) + defer cancel() + + log.Printf("rescue: capturing bundle ws=%s instance=%s reason=%s", in.WorkspaceID, in.InstanceID, in.Reason) + + collected := 0 + for _, sec := range bundleSections { + raw, err := RunRemote(ctx, in.InstanceID, sec.command) + if err != nil { + // One section failing (e.g. ssh blip mid-collection) must not + // abort the rest — ship a marker for it and continue. + log.Printf("rescue: section %q failed for ws=%s: %v", sec.name, in.WorkspaceID, err) + ship(ctx, in, sec.name, fmt.Sprintf("(rescue: section collection failed: %v)", err), false) + continue + } + redacted := Redact(in.WorkspaceID, raw) + ship(ctx, in, sec.name, redacted, true) + collected++ + } + + log.Printf("rescue: shipped %d/%d sections ws=%s instance=%s kind=%s", collected, len(bundleSections), in.WorkspaceID, in.InstanceID, LokiKind) +} + +// ship emits one rescue section to Loki via the audit shipper. The +// org / workspace_id / kind ride in the record body (queryable via +// LogQL `| json`); event_type ("rescue.bundle") is the low-cardinality +// Loki label the shipper already promotes. `redacted` records whether +// the content passed through the secret-scan, so an operator can tell a +// shipped-but-redacted section from a collection-failure marker. +func ship(ctx context.Context, in Input, name, content string, redacted bool) { + audit.Emit(ctx, rescueEventType, map[string]any{ + "kind": LokiKind, + "org": in.OrgID, + "workspace_id": in.WorkspaceID, + "instance_id": in.InstanceID, + "reason": in.Reason, + "section": name, + "redacted": redacted, + "content": content, + }) +} diff --git a/workspace-server/internal/rescue/rescue_test.go b/workspace-server/internal/rescue/rescue_test.go new file mode 100644 index 000000000..1348d2303 --- /dev/null +++ b/workspace-server/internal/rescue/rescue_test.go @@ -0,0 +1,226 @@ +package rescue + +import ( + "context" + "encoding/json" + "errors" + "os" + "path/filepath" + "strings" + "testing" +) + +// withFakes swaps the injected RunRemote + Redact for the duration of a +// test and restores them after. Mirrors the provisioner test-fake +// pattern (package-var swap + t.Cleanup). +func withFakes(t *testing.T, run func(ctx context.Context, instanceID, cmd string) (string, error), redact func(ws, c string) string) { + t.Helper() + prevRun, prevRedact := RunRemote, Redact + RunRemote = run + Redact = redact + t.Cleanup(func() { RunRemote = prevRun; Redact = prevRedact }) +} + +// captureLoki points the audit shipper at a temp JSONL file and returns +// a reader that decodes the records the rescue ship() loop wrote. This +// is the same transport the production rescue stream uses (audit.Emit → +// Loki via the tenant Vector source), so asserting on it proves the +// shipper-reuse + labels end to end. +func captureLoki(t *testing.T) func() []map[string]any { + t.Helper() + dir := t.TempDir() + path := filepath.Join(dir, "audit.jsonl") + t.Setenv("MOLECULE_AUDIT_LOG_PATH", path) + return func() []map[string]any { + b, err := os.ReadFile(path) + if err != nil { + return nil + } + var out []map[string]any + for _, line := range strings.Split(strings.TrimSpace(string(b)), "\n") { + if line == "" { + continue + } + var rec map[string]any + if err := json.Unmarshal([]byte(line), &rec); err != nil { + t.Fatalf("bad audit jsonl line %q: %v", line, err) + } + out = append(out, rec) + } + return out + } +} + +func fields(rec map[string]any) map[string]any { + f, _ := rec["fields"].(map[string]any) + return f +} + +// TestCapture_ShipsAllSectionsWithRescueLabels is the happy path: a +// boot-failure capture collects every fixed section, runs each through +// the redactor, and ships it to Loki under {kind="rescue", org, ws}. +func TestCapture_ShipsAllSectionsWithRescueLabels(t *testing.T) { + readLoki := captureLoki(t) + var seenCmds []string + withFakes(t, + func(_ context.Context, instanceID, cmd string) (string, error) { + seenCmds = append(seenCmds, cmd) + return "OUTPUT for " + instanceID, nil + }, + func(_ws, c string) string { return c }, // identity redactor + ) + + Capture(context.Background(), Input{ + InstanceID: "i-abc123", + WorkspaceID: "ws-1", + OrgID: "org-9", + Reason: "provision_timeout_sweep", + }) + + recs := readLoki() + if len(recs) != len(bundleSections) { + t.Fatalf("want %d shipped sections, got %d", len(bundleSections), len(recs)) + } + if len(seenCmds) != len(bundleSections) { + t.Fatalf("want %d remote commands run, got %d", len(bundleSections), len(seenCmds)) + } + for _, rec := range recs { + if rec["event_type"] != rescueEventType { + t.Errorf("event_type = %v, want %q", rec["event_type"], rescueEventType) + } + // workspace_id is promoted to the top-level record position by + // the audit shipper. + if rec["workspace_id"] != "ws-1" { + t.Errorf("top-level workspace_id = %v, want ws-1", rec["workspace_id"]) + } + f := fields(rec) + if f["kind"] != LokiKind { + t.Errorf("kind = %v, want %q", f["kind"], LokiKind) + } + if f["org"] != "org-9" { + t.Errorf("org = %v, want org-9", f["org"]) + } + if f["instance_id"] != "i-abc123" { + t.Errorf("instance_id = %v, want i-abc123", f["instance_id"]) + } + if f["redacted"] != true { + t.Errorf("redacted = %v, want true for a collected section", f["redacted"]) + } + } +} + +// TestCapture_Redacts proves the bundle is scrubbed before it leaves the +// box: a remote section that contains a secret-shaped token must ship +// with the token replaced, never raw. +func TestCapture_Redacts(t *testing.T) { + readLoki := captureLoki(t) + const secret = "sk-ant-SUPERSECRETTOKENVALUE0001" + withFakes(t, + func(_ context.Context, _ string, _ string) (string, error) { + return "ANTHROPIC_API_KEY=" + secret, nil + }, + // redactor that mangles anything containing the secret shape + func(_ws, c string) string { + if strings.Contains(c, secret) { + return strings.ReplaceAll(c, secret, "[REDACTED]") + } + return c + }, + ) + + Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-2", OrgID: "o"}) + + for _, rec := range readLoki() { + content, _ := fields(rec)["content"].(string) + if strings.Contains(content, secret) { + t.Fatalf("raw secret leaked to Loki in section %v: %q", fields(rec)["section"], content) + } + } +} + +// TestCapture_SkipsWhenNoInstance: a failure with no provisioned EC2 has +// nothing to read — Capture must no-op (ship nothing) rather than dial a +// blank instance id. +func TestCapture_SkipsWhenNoInstance(t *testing.T) { + readLoki := captureLoki(t) + called := false + withFakes(t, + func(_ context.Context, _ string, _ string) (string, error) { called = true; return "", nil }, + func(_ws, c string) string { return c }, + ) + Capture(context.Background(), Input{InstanceID: "", WorkspaceID: "ws-3", OrgID: "o"}) + if called { + t.Error("RunRemote called for an empty instance id") + } + if recs := readLoki(); len(recs) != 0 { + t.Errorf("shipped %d records for an empty instance id, want 0", len(recs)) + } +} + +// TestCapture_FailsClosedWithoutRedactor: if the redactor is not wired, +// Capture must NOT ship anything (would leak raw config). Fail closed. +func TestCapture_FailsClosedWithoutRedactor(t *testing.T) { + readLoki := captureLoki(t) + prevRun, prevRedact := RunRemote, Redact + RunRemote = func(_ context.Context, _ string, _ string) (string, error) { return "raw config", nil } + Redact = nil + t.Cleanup(func() { RunRemote = prevRun; Redact = prevRedact }) + + Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-4", OrgID: "o"}) + + if recs := readLoki(); len(recs) != 0 { + t.Errorf("shipped %d records without a redactor wired, want 0 (fail closed)", len(recs)) + } +} + +// TestCapture_SectionFailureIsIsolated: one section's RunRemote error +// must not abort the rest — the failing section ships a marker and the +// others still ship. +func TestCapture_SectionFailureIsIsolated(t *testing.T) { + readLoki := captureLoki(t) + withFakes(t, + func(_ context.Context, _ string, cmd string) (string, error) { + if strings.Contains(cmd, "config.yaml") { + return "", errors.New("ssh blip") + } + return "ok", nil + }, + func(_ws, c string) string { return c }, + ) + + Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-5", OrgID: "o"}) + + recs := readLoki() + if len(recs) != len(bundleSections) { + t.Fatalf("want all %d sections shipped (incl. failure marker), got %d", len(bundleSections), len(recs)) + } + var failureMarkers int + for _, rec := range recs { + if fields(rec)["redacted"] == false { + failureMarkers++ + content, _ := fields(rec)["content"].(string) + if !strings.Contains(content, "section collection failed") { + t.Errorf("failure marker content = %q, want a collection-failed marker", content) + } + } + } + if failureMarkers != 1 { + t.Errorf("want exactly 1 failure marker, got %d", failureMarkers) + } +} + +// TestCapture_NoWiringIsSafeNoOp: with RunRemote unwired (operator hasn't +// called the boot wiring), Capture must be a logged no-op, never a panic. +func TestCapture_NoWiringIsSafeNoOp(t *testing.T) { + readLoki := captureLoki(t) + prevRun, prevRedact := RunRemote, Redact + RunRemote = nil + Redact = func(_ws, c string) string { return c } + t.Cleanup(func() { RunRemote = prevRun; Redact = prevRedact }) + + Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-6", OrgID: "o"}) + + if recs := readLoki(); len(recs) != 0 { + t.Errorf("shipped %d records with RunRemote unwired, want 0", len(recs)) + } +} -- 2.52.0