From 702ea5dc0911490b3d0c488e64612b9f39496a58 Mon Sep 17 00:00:00 2001 From: claude-ceo-assistant Date: Sun, 31 May 2026 01:59:20 -0700 Subject: [PATCH 1/3] feat(workspace-server): capture rescue bundle on workspace boot-failure (RFC internal#742 Part 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a workspace boot FAILS — the provision-timeout sweep flips it to `failed`, or the control plane's bootstrap-watcher POSTs bootstrap-failed — capture a fixed forensic "rescue bundle" off the still-running (but boot-failed) EC2 BEFORE the control plane reaps it, and ship it to obs/Loki. This makes a wedged workspace (e.g. the codex provider-derivation failure that motivated the RFC) post-mortem- inspectable instead of an uninspectable wall. What it collects (fixed set, redacted before anything leaves the box): /configs/config.yaml, /configs/system-prompt.md, tail -200 of cloud-init-output.log, `docker ps -a`, the agent container's `docker logs --tail 200`, and the resolved MODEL|PROVIDER|RUNTIME env. Every section is run through the existing SAFE-T1201 secret-scan (handlers.redactSecrets) before shipping — and fails CLOSED (ships nothing) if the redactor is unwired. Shipping reuses the existing obs shipper (internal/audit → Loki via the tenant Vector stdout source) with event_type="rescue.bundle" and kind="rescue" / org / workspace_id in the record body, queryable as `{kind="rescue"} | json`. Hook points (the two boot-failure VERDICT paths only — never normal teardown/deprovision/recreate/billing-suspend/hibernate): - registry.sweepStuckProvisioning: fires the injected registry.BootFailureRescueHook only on a real flip (affected==1), never on a race (affected==0) or a non-overdue row. - handlers.WorkspaceHandler.BootstrapFailed: fires captureRescueBundle only after the row is actually flipped to `failed`. Capture is best-effort + non-blocking: it runs in its own goroutine with its own 45s timeout, detached from the request/sweep context, so it can never change boot-failure semantics or add latency to the failure path. The leaf internal/rescue package injects the EIC/SSH runner + redactor as package vars (wired from handlers at init) so registry can call it without importing handlers (no import cycle) — mirroring the existing RuntimeTimeoutLookup injection pattern. Volume retention: in molecule-core the boot-failure verdict only flips status to `failed`; it never terminates. Both platform reapers (registry.StartCPOrphanSweeper + handlers deprovision) act ONLY on status='removed', so a `failed` workspace's instance + /configs data volume are RETAINED by construction through the rescue grace (rescue.RescueVolumeGrace = 24h, the SSOT the CP reaper must honour), distinct from the user-prune erase path. Added a regression test pinning the orphan-sweeper's status='removed' predicate so a future widening to `failed` (which would terminate boxes mid-rescue) fails the build. Tests: capture fires on boot-failure (not on healthy teardown/race), bundle redacts secrets + fails closed without a redactor, Loki push called with the right labels, volume retained on boot-failure. EIC/SSH + Loki + ec2 faked via package-var swaps (mirrors existing provisioner test fakes). Co-Authored-By: Claude Opus 4.8 (1M context) --- workspace-server/cmd/server/main.go | 11 + .../internal/handlers/rescue_wiring.go | 158 +++++++++++ .../internal/handlers/rescue_wiring_test.go | 119 +++++++++ .../internal/handlers/workspace_bootstrap.go | 12 + .../registry/cp_orphan_sweeper_test.go | 56 ++++ .../internal/registry/provisiontimeout.go | 41 ++- .../registry/provisiontimeout_rescue_test.go | 130 +++++++++ .../registry/provisiontimeout_test.go | 51 ++-- workspace-server/internal/rescue/rescue.go | 247 ++++++++++++++++++ .../internal/rescue/rescue_test.go | 226 ++++++++++++++++ 10 files changed, 1022 insertions(+), 29 deletions(-) create mode 100644 workspace-server/internal/handlers/rescue_wiring.go create mode 100644 workspace-server/internal/handlers/rescue_wiring_test.go create mode 100644 workspace-server/internal/registry/provisiontimeout_rescue_test.go create mode 100644 workspace-server/internal/rescue/rescue.go create mode 100644 workspace-server/internal/rescue/rescue_test.go diff --git a/workspace-server/cmd/server/main.go b/workspace-server/cmd/server/main.go index b79a61d6b..3a9b3d243 100644 --- a/workspace-server/cmd/server/main.go +++ b/workspace-server/cmd/server/main.go @@ -349,6 +349,17 @@ func main() { codexauth.StartCodexAuthRefresher(c, db.DB) }) + // RFC internal#742 Part 2: wire the boot-failure rescue capture into + // the provision-timeout sweep's failure verdict. When the sweep flips + // a stuck workspace to `failed`, this hook captures a forensic rescue + // bundle off the still-running (but boot-failed) EC2 and ships it to + // obs/Loki before the control plane reaps the instance. Best-effort + + // non-blocking (handlers.BootFailureRescueHook dispatches on its own + // goroutine + timeout). The handler-side boot-failure path + // (WorkspaceHandler.BootstrapFailed) wires its own capture inline. + registry.BootFailureRescueHook = handlers.BootFailureRescueHook + + // Provision-timeout sweep — flips workspaces that have been stuck in // status='provisioning' past the timeout window to 'failed' and emits // WORKSPACE_PROVISION_TIMEOUT. Without this the UI banner is cosmetic diff --git a/workspace-server/internal/handlers/rescue_wiring.go b/workspace-server/internal/handlers/rescue_wiring.go new file mode 100644 index 000000000..08000ce7e --- /dev/null +++ b/workspace-server/internal/handlers/rescue_wiring.go @@ -0,0 +1,158 @@ +package handlers + +// rescue_wiring.go — bridges the leaf internal/rescue package to the +// handlers package's EIC/SSH runner + secret redactor, and exposes the +// boot-failure rescue hook used by both boot-failure verdict paths +// (handlers.BootstrapFailed here, registry.sweepStuckProvisioning via +// an injected hook wired in main.go). +// +// Why the indirection: internal/rescue is a leaf so registry (which +// must NOT import handlers — that's an import cycle) can call it. The +// two heavy dependencies live here in handlers — `withEICTunnel` +// (the EIC keypair → push → tunnel → ssh dance) and `redactSecrets` +// (the SAFE-T1201 secret-scan) — so we inject them into rescue's +// package-level func vars at init(). +// +// RFC internal#742 Part 2. + +import ( + "bytes" + "context" + "database/sql" + "fmt" + "os" + "os/exec" + "strings" + + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db" + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue" +) + +func init() { + // Wire the leaf rescue package to handlers' EIC runner + redactor. + // Done in init() (not main.go) so the binding is present for any + // caller of rescue.Capture, including the registry sweeper hook and + // the handler path, without each call site re-wiring it. + rescue.RunRemote = rescueRunRemoteViaEIC + rescue.Redact = func(workspaceID, content string) string { + out, _ := redactSecrets(workspaceID, content) + return out + } +} + +// rescueRunRemoteViaEIC runs a single shell command on the still-running +// (but boot-failed) workspace EC2 over an EIC tunnel and returns its +// combined stdout+stderr. Reuses the same `withEICTunnel` dance as the +// canvas file ops (ephemeral keypair → SendSSHPublicKey → open-tunnel → +// ssh) so the rescue path inherits every fix to the EIC mechanism (e.g. +// PR #2822's LogLevel=ERROR shim) for free. +// +// Combined output (2>&1) is intentional: a boot-failed box's most +// useful signal is often on stderr (a panic, a missing-file error), and +// the rescue bundle is a forensic blob, not a parsed value — we want +// everything the command emitted. +func rescueRunRemoteViaEIC(ctx context.Context, instanceID, command string) (string, error) { + var combined []byte + runErr := withEICTunnel(ctx, instanceID, func(s eicSSHSession) error { + sshCmd := exec.CommandContext(ctx, "ssh", s.sshArgs(command)...) + sshCmd.Env = os.Environ() + var buf bytes.Buffer + sshCmd.Stdout = &buf + sshCmd.Stderr = &buf + // A non-zero remote exit is NOT a transport error for the rescue + // path — each section command already falls back to an + // `|| echo '(...)'` marker, so a clean exit is expected. Only + // surface an error when ssh/tunnel itself failed AND produced no + // output to ship. + err := sshCmd.Run() + combined = buf.Bytes() + if err != nil && len(combined) == 0 { + return fmt.Errorf("rescue ssh exec: %w", err) + } + return nil + }) + if runErr != nil { + return "", runErr + } + return strings.TrimRight(string(combined), "\n"), nil +} + +// captureRescueBundle fires a best-effort, non-blocking rescue capture +// for a boot-failed workspace. It is the single entry point both +// boot-failure verdict paths funnel through. +// +// NON-BLOCKING: the actual collection runs in its own goroutine with +// its own timeout (rescue.CaptureTimeout), detached from the caller's +// request/sweep context so it can't add latency to — or be cancelled +// by — the failure-handling path that triggered it. We snapshot the +// identity into a fresh context.Background() for the same reason: a +// gin request context is cancelled the instant the HTTP handler +// returns, which would kill the EIC tunnel mid-collection. +// +// instanceID/orgID are resolved here (best-effort) so the two call +// sites only need the workspace id. A missing instance id → rescue.Capture +// no-ops (logged), so an early-failure workspace that never got an EC2 +// is handled cleanly. +func captureRescueBundle(workspaceID, reason string) { + rescueDispatch(func() { + ctx := context.Background() + instanceID, err := rescueResolveInstanceID(ctx, workspaceID) + if err != nil { + // Best-effort: a resolve failure is logged inside Capture's + // caller chain; pass empty so Capture no-ops cleanly. + instanceID = "" + } + rescue.Capture(ctx, rescue.Input{ + InstanceID: instanceID, + WorkspaceID: workspaceID, + OrgID: os.Getenv("MOLECULE_ORG_ID"), + Reason: reason, + }) + }) +} + +// rescueDispatch runs the rescue collection off the request path. In +// production it's `go fn()` so the capture never blocks or adds latency +// to the boot-failure handler. Tests swap it for a synchronous runner so +// they can assert the capture fired (or didn't) deterministically +// without racing the goroutine. +var rescueDispatch = func(fn func()) { go fn() } + +// BootFailureRescueHook is the registry-facing adapter wired into +// registry.BootFailureRescueHook from main.go. The registry sweeper +// already resolved the instance id (it's in the candidate row), so this +// path uses it directly rather than re-querying — symmetric with the +// captureRescueBundle handler path but skipping the lookup. +// +// Best-effort + non-blocking: dispatches the capture on its own +// goroutine with its own timeout, so the sweep loop is never slowed. +func BootFailureRescueHook(workspaceID, instanceID, reason string) { + go rescue.Capture(context.Background(), rescue.Input{ + InstanceID: instanceID, + WorkspaceID: workspaceID, + OrgID: os.Getenv("MOLECULE_ORG_ID"), + Reason: reason, + }) +} + +// rescueResolveInstanceID looks up the EC2 instance id for a workspace. +// Package var so tests can stub it without a sqlmock. Mirrors +// provisioner.resolveInstanceID (same query) but lives here to keep the +// rescue wiring self-contained and avoid widening the provisioner +// surface. +var rescueResolveInstanceID = func(ctx context.Context, workspaceID string) (string, error) { + if db.DB == nil { + return "", nil // nil in unit tests + } + var instanceID sql.NullString + err := db.DB.QueryRowContext(ctx, + `SELECT instance_id FROM workspaces WHERE id = $1`, workspaceID, + ).Scan(&instanceID) + if err != nil && err != sql.ErrNoRows { + return "", err + } + if !instanceID.Valid { + return "", nil + } + return instanceID.String, nil +} diff --git a/workspace-server/internal/handlers/rescue_wiring_test.go b/workspace-server/internal/handlers/rescue_wiring_test.go new file mode 100644 index 000000000..61b230b57 --- /dev/null +++ b/workspace-server/internal/handlers/rescue_wiring_test.go @@ -0,0 +1,119 @@ +package handlers + +import ( + "bytes" + "context" + "net/http" + "net/http/httptest" + "testing" + + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models" + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue" + "github.com/DATA-DOG/go-sqlmock" + "github.com/gin-gonic/gin" +) + +// rescueTestHarness makes the otherwise-async rescue capture +// deterministic + observable for handler tests: +// - rescueDispatch runs synchronously (no goroutine race). +// - rescueResolveInstanceID returns a fixed instance id. +// - rescue.RunRemote / rescue.Redact are stubbed so no real EIC/SSH +// fires; runCalls counts how many remote-command collections ran, +// which is the proxy for "did the capture fire". +// +// All originals are restored on cleanup. +func rescueTestHarness(t *testing.T, instanceID string) (runCalls *int) { + t.Helper() + n := 0 + runCalls = &n + + prevDispatch := rescueDispatch + rescueDispatch = func(fn func()) { fn() } // synchronous + prevResolve := rescueResolveInstanceID + rescueResolveInstanceID = func(_ context.Context, _ string) (string, error) { return instanceID, nil } + prevRun, prevRedact := rescue.RunRemote, rescue.Redact + rescue.RunRemote = func(_ context.Context, _ string, _ string) (string, error) { n++; return "out", nil } + rescue.Redact = func(_ws, c string) string { return c } + + t.Cleanup(func() { + rescueDispatch = prevDispatch + rescueResolveInstanceID = prevResolve + rescue.RunRemote = prevRun + rescue.Redact = prevRedact + }) + return runCalls +} + +// TestBootstrapFailed_FiresRescueOnFlip — the RFC internal#742 handler +// hook: when BootstrapFailed actually flips a workspace to `failed` +// (affected==1), the rescue capture fires against the resolved instance. +func TestBootstrapFailed_FiresRescueOnFlip(t *testing.T) { + h, mock := setupBootstrapHandler(t) + runCalls := rescueTestHarness(t, "i-failed01") + + mock.ExpectExec(`UPDATE workspaces`). + WithArgs("ws-crashed", sqlmock.AnyArg(), models.StatusFailed). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(`INSERT INTO structure_events`). + WithArgs("WORKSPACE_PROVISION_FAILED", "ws-crashed", sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-crashed"}} + c.Request = httptest.NewRequest("POST", "/admin/workspaces/ws-crashed/bootstrap-failed", + bytes.NewBufferString(`{"error":"codex provider derivation failed","log_tail":"panic"}`)) + c.Request.Header.Set("Content-Type", "application/json") + + h.BootstrapFailed(c) + + if w.Code != http.StatusOK { + t.Fatalf("want 200, got %d: %s", w.Code, w.Body.String()) + } + if *runCalls != len(rescueBundleSectionCount()) { + t.Errorf("rescue capture ran %d remote commands, want %d (one per bundle section)", *runCalls, len(rescueBundleSectionCount())) + } +} + +// TestBootstrapFailed_NoRescueOnNoChange — an already-transitioned +// workspace (affected==0: raced to online, or double-report) is NOT a +// boot-failure verdict here, so the rescue capture must NOT fire. +func TestBootstrapFailed_NoRescueOnNoChange(t *testing.T) { + h, mock := setupBootstrapHandler(t) + runCalls := rescueTestHarness(t, "i-online01") + + mock.ExpectExec(`UPDATE workspaces`). + WithArgs("ws-online", sqlmock.AnyArg(), models.StatusFailed). + WillReturnResult(sqlmock.NewResult(0, 0)) // already transitioned + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-online"}} + c.Request = httptest.NewRequest("POST", "/admin/workspaces/ws-online/bootstrap-failed", + bytes.NewBufferString(`{"error":"late report","log_tail":""}`)) + c.Request.Header.Set("Content-Type", "application/json") + + h.BootstrapFailed(c) + + if w.Code != http.StatusOK { + t.Fatalf("want 200, got %d", w.Code) + } + if *runCalls != 0 { + t.Errorf("rescue capture fired (%d cmds) on a no-change report; it must only fire on a real flip", *runCalls) + } +} + +// rescueBundleSectionCount returns the production rescue bundle section +// list length by running a capture against a counting runner once. It's +// a small indirection so the handler test stays decoupled from the exact +// section set in internal/rescue (which has its own tests). +func rescueBundleSectionCount() []struct{} { + count := 0 + prevRun, prevRedact := rescue.RunRemote, rescue.Redact + rescue.RunRemote = func(_ context.Context, _ string, _ string) (string, error) { count++; return "", nil } + rescue.Redact = func(_ws, c string) string { return c } + rescue.Capture(context.Background(), rescue.Input{InstanceID: "i-probe", WorkspaceID: "w", OrgID: "o"}) + rescue.RunRemote = prevRun + rescue.Redact = prevRedact + return make([]struct{}, count) +} diff --git a/workspace-server/internal/handlers/workspace_bootstrap.go b/workspace-server/internal/handlers/workspace_bootstrap.go index d70d79359..3642d4559 100644 --- a/workspace-server/internal/handlers/workspace_bootstrap.go +++ b/workspace-server/internal/handlers/workspace_bootstrap.go @@ -91,6 +91,18 @@ func (h *WorkspaceHandler) BootstrapFailed(c *gin.Context) { "log_tail": tail, "source": "bootstrap_watcher", }) + + // RFC internal#742 Part 2: this is one of the two boot-failure + // verdict points. We've just flipped a still-running (but + // unconfigured) workspace EC2 to `failed`; the control plane will + // reap the instance shortly. Capture a forensic rescue bundle off + // the live box NOW, before it's torn down, so a wedged workspace is + // post-mortem-inspectable. Best-effort + non-blocking: runs in its + // own goroutine with its own timeout, detached from this request's + // context (which is cancelled the instant this handler returns). + // Failure to capture never changes the boot-failure handling. + captureRescueBundle(id, "bootstrap_watcher") + log.Printf("BootstrapFailed: marked %s failed (tail=%d bytes, err=%q)", id, len(tail), errMsg) c.JSON(http.StatusOK, gin.H{"ok": true}) } diff --git a/workspace-server/internal/registry/cp_orphan_sweeper_test.go b/workspace-server/internal/registry/cp_orphan_sweeper_test.go index ae45b2276..22765090c 100644 --- a/workspace-server/internal/registry/cp_orphan_sweeper_test.go +++ b/workspace-server/internal/registry/cp_orphan_sweeper_test.go @@ -10,8 +10,15 @@ import ( "github.com/DATA-DOG/go-sqlmock" "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db" + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue" ) +// rescueVolumeGraceHours surfaces the rescue grace as whole hours for +// the retention-contract assertion (RFC internal#742 Part 2). +func rescueVolumeGraceHours() int { + return int(rescue.RescueVolumeGrace.Hours()) +} + // fakeCPReaper is a hand-rolled CPOrphanReaper for the SaaS-mode // sweeper tests. Records every Stop call so tests can assert which // workspace IDs were re-issued. @@ -97,6 +104,55 @@ func TestCPSweepOnce_NoOrphans(t *testing.T) { } } +// TestCPSweepOnce_DoesNotReapFailedWorkspace — RFC internal#742 Part 2 +// volume-retention guarantee, molecule-core side. +// +// A boot-FAILED workspace (status='failed') must NOT be terminated by +// the platform's orphan sweeper: its instance + /configs data volume are +// retained through the rescue grace (rescue.RescueVolumeGrace) so a live +// rescue read is possible, distinct from the user-prune erase path. The +// sweeper reaps ONLY status='removed' (the explicit deprovision path), +// so a `failed` row is structurally excluded at the SELECT — it never +// reaches reaper.Stop. We assert the predicate filters to 'removed' +// (so the failed instance survives) and that no Stop fires for a DB +// whose only orphan-shaped row is `failed`. +// +// This is the "if the sweeper already keeps volumes by default, confirm +// + add a test asserting it" branch of the RFC: it does, by construction. +func TestCPSweepOnce_DoesNotReapFailedWorkspace(t *testing.T) { + mock := setupTestDB(t) + reaper := &fakeCPReaper{} + + // The sweeper's SELECT carries `status = 'removed'`. A boot-failed + // workspace (status='failed') does not match that predicate, so the + // real DB returns it nowhere in this result set — modelled as the + // empty result the `removed`-only filter produces when the only + // instance-bearing row is `failed`. The regex pins the retention- + // critical predicate so a future widening to include 'failed' (which + // would terminate boot-failed boxes mid-rescue) fails this test. + mock.ExpectQuery(`(?s)WHERE status = 'removed'\s+AND instance_id IS NOT NULL`). + WithArgs(cpSweepLimit). + WillReturnRows(sqlmock.NewRows([]string{"id"})) // failed row excluded by predicate + + cpSweepOnce(context.Background(), reaper) + + if len(reaper.stopCalls) != 0 { + t.Fatalf("boot-failed workspace must be RETAINED (no terminate); got Stop calls %v", reaper.stopCalls) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} + +// TestRescueVolumeGraceIsDistinctFromPrune documents that the rescue +// grace is its own contract (24h) and not coupled to any prune timing — +// the value is the SSOT the control-plane reaper must honour. +func TestRescueVolumeGraceIsDistinctFromPrune(t *testing.T) { + if rescueVolumeGraceHours() != 24 { + t.Errorf("rescue volume grace = %dh, want 24h (RFC internal#742)", rescueVolumeGraceHours()) + } +} + // TestCPSweepOnce_MultipleOrphans — all rows in the batch get Stop'd // independently; one failure doesn't block others. func TestCPSweepOnce_MultipleOrphans(t *testing.T) { diff --git a/workspace-server/internal/registry/provisiontimeout.go b/workspace-server/internal/registry/provisiontimeout.go index 0b863c35f..41c489da1 100644 --- a/workspace-server/internal/registry/provisiontimeout.go +++ b/workspace-server/internal/registry/provisiontimeout.go @@ -92,6 +92,23 @@ func provisioningTimeoutFor(runtime string, lookup RuntimeTimeoutLookup) time.Du return DefaultProvisioningTimeout } +// BootFailureRescueHook, when wired, is invoked once per workspace the +// sweep flips from `provisioning` to `failed` — i.e. on the boot-failure +// verdict, BEFORE the control plane reaps the instance. It captures a +// forensic rescue bundle off the still-running (but boot-failed) EC2 and +// ships it to obs/Loki (RFC internal#742 Part 2). Wired in main.go to +// handlers.captureRescueBundle via a thin adapter; nil in tests + on +// self-hosted deploys (no rescue shipping there). +// +// Function-typed injection (not an import of handlers) keeps the +// existing handlers→registry import direction intact — registry must not +// import handlers. +// +// MUST be best-effort + non-blocking: the hook itself dispatches the +// capture on its own goroutine with its own timeout, so the sweep loop +// is never slowed or blocked by a hung EIC tunnel on the dead box. +var BootFailureRescueHook func(workspaceID, instanceID, reason string) + // StartProvisioningTimeoutSweep periodically scans for workspaces stuck in // `status='provisioning'` past the timeout window, flips them to `failed`, // and broadcasts a WORKSPACE_PROVISION_TIMEOUT event so the canvas can @@ -144,7 +161,7 @@ func sweepStuckProvisioning(ctx context.Context, emitter ProvisionTimeoutEmitter // flight, not historical) and the partial index on status keeps // it fast. rows, err := db.DB.QueryContext(ctx, ` - SELECT id, COALESCE(runtime, ''), EXTRACT(EPOCH FROM (now() - updated_at))::int + SELECT id, COALESCE(runtime, ''), COALESCE(instance_id, ''), EXTRACT(EPOCH FROM (now() - updated_at))::int FROM workspaces WHERE status = 'provisioning' `) @@ -155,14 +172,15 @@ func sweepStuckProvisioning(ctx context.Context, emitter ProvisionTimeoutEmitter defer rows.Close() type candidate struct { - id string - runtime string - ageSec int + id string + runtime string + instanceID string + ageSec int } var ids []candidate for rows.Next() { var c candidate - if err := rows.Scan(&c.id, &c.runtime, &c.ageSec); err == nil { + if err := rows.Scan(&c.id, &c.runtime, &c.instanceID, &c.ageSec); err == nil { ids = append(ids, c) } } @@ -200,6 +218,19 @@ func sweepStuckProvisioning(ctx context.Context, emitter ProvisionTimeoutEmitter continue } log.Printf("Provision-timeout sweep: %s (runtime=%q) stuck in provisioning > %s — marked failed", c.id, c.runtime, timeout) + + // RFC internal#742 Part 2: this flip is a boot-failure verdict. + // The instance is still running (the CP reaps it shortly after); + // capture a forensic rescue bundle off it NOW, before teardown. + // Best-effort + non-blocking — the hook dispatches on its own + // goroutine + timeout, so a hung EIC tunnel on the dead box can't + // slow the sweep. Only fires on a real flip (affected==1), never + // on a race (affected==0) or a non-overdue row — guaranteeing it + // runs once per boot-failure verdict and never on a healthy row. + if BootFailureRescueHook != nil { + BootFailureRescueHook(c.id, c.instanceID, "provision_timeout_sweep") + } + // Emit as WORKSPACE_PROVISION_FAILED, not _TIMEOUT, because the // canvas event handler only flips node state on the _FAILED case. // A separate event type was considered but the UI reaction is diff --git a/workspace-server/internal/registry/provisiontimeout_rescue_test.go b/workspace-server/internal/registry/provisiontimeout_rescue_test.go new file mode 100644 index 000000000..283d244b8 --- /dev/null +++ b/workspace-server/internal/registry/provisiontimeout_rescue_test.go @@ -0,0 +1,130 @@ +package registry + +import ( + "context" + "sync" + "testing" + + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models" + "github.com/DATA-DOG/go-sqlmock" +) + +// rescueHookRecorder captures the args of every BootFailureRescueHook +// invocation so tests can assert the rescue capture fires exactly on the +// boot-failure verdict — and never on a healthy/raced row. +type rescueHookRecorder struct { + mu sync.Mutex + calls [][3]string // {workspaceID, instanceID, reason} +} + +func (r *rescueHookRecorder) hook() func(workspaceID, instanceID, reason string) { + return func(workspaceID, instanceID, reason string) { + r.mu.Lock() + defer r.mu.Unlock() + r.calls = append(r.calls, [3]string{workspaceID, instanceID, reason}) + } +} + +func (r *rescueHookRecorder) count() int { + r.mu.Lock() + defer r.mu.Unlock() + return len(r.calls) +} + +// withRescueHook installs a recorder as the package-level +// BootFailureRescueHook for the test's duration. +func withRescueHook(t *testing.T) *rescueHookRecorder { + t.Helper() + rec := &rescueHookRecorder{} + prev := BootFailureRescueHook + BootFailureRescueHook = rec.hook() + t.Cleanup(func() { BootFailureRescueHook = prev }) + return rec +} + +// TestSweep_RescueFiresOnBootFailureVerdict — the core RFC internal#742 +// assertion: when the sweep flips a stuck workspace to `failed`, the +// rescue hook fires once with the workspace + instance id and the +// provision_timeout_sweep reason, BEFORE teardown. +func TestSweep_RescueFiresOnBootFailureVerdict(t *testing.T) { + mock := setupTestDB(t) + rec := withRescueHook(t) + + mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`). + WillReturnRows(candidateRows([4]any{"ws-stuck", "codex", "i-0badf00d", 700})) + mock.ExpectExec(`UPDATE workspaces`). + WithArgs("ws-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed). + WillReturnResult(sqlmock.NewResult(0, 1)) + + sweepStuckProvisioning(context.Background(), &fakeEmitter{}, nil) + + if rec.count() != 1 { + t.Fatalf("rescue hook should fire once on a boot-failure flip, got %d", rec.count()) + } + got := rec.calls[0] + if got[0] != "ws-stuck" || got[1] != "i-0badf00d" || got[2] != "provision_timeout_sweep" { + t.Errorf("rescue hook args = %v, want {ws-stuck i-0badf00d provision_timeout_sweep}", got) + } +} + +// TestSweep_RescueDoesNotFireOnRace — affected==0 means the row raced to +// online/restart between SELECT and UPDATE. That is NOT a boot-failure +// verdict, so the rescue capture must NOT fire (we'd be snapshotting a +// healthy box that's about to come online). +func TestSweep_RescueDoesNotFireOnRace(t *testing.T) { + mock := setupTestDB(t) + rec := withRescueHook(t) + + mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`). + WillReturnRows(candidateRows([4]any{"ws-raced", "codex", "i-raced", 700})) + mock.ExpectExec(`UPDATE workspaces`). + WithArgs("ws-raced", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed). + WillReturnResult(sqlmock.NewResult(0, 0)) // raced — 0 rows + + sweepStuckProvisioning(context.Background(), &fakeEmitter{}, nil) + + if rec.count() != 0 { + t.Errorf("rescue hook must NOT fire on a raced flip (affected==0), got %d calls", rec.count()) + } +} + +// TestSweep_RescueDoesNotFireOnHealthyRow — a not-yet-overdue row is +// never flipped, so the rescue capture must not fire. Guards against the +// hook being attached above the age gate. +func TestSweep_RescueDoesNotFireOnHealthyRow(t *testing.T) { + mock := setupTestDB(t) + rec := withRescueHook(t) + + // hermes at 11 min (660s) < 30 min hermes budget → not overdue, no flip. + mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`). + WillReturnRows(candidateRows([4]any{"ws-healthy", "hermes", "i-healthy", 660})) + + sweepStuckProvisioning(context.Background(), &fakeEmitter{}, nil) + + if rec.count() != 0 { + t.Errorf("rescue hook must NOT fire on a non-overdue (healthy) row, got %d calls", rec.count()) + } +} + +// TestSweep_RescueNilHookIsSafe — on a deploy where the hook is unwired +// (self-hosted / no rescue shipping), the sweep must still flip + emit +// without panicking on the nil hook. +func TestSweep_RescueNilHookIsSafe(t *testing.T) { + mock := setupTestDB(t) + prev := BootFailureRescueHook + BootFailureRescueHook = nil + t.Cleanup(func() { BootFailureRescueHook = prev }) + + mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`). + WillReturnRows(candidateRows([4]any{"ws-stuck", "codex", "i-x", 700})) + mock.ExpectExec(`UPDATE workspaces`). + WithArgs("ws-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed). + WillReturnResult(sqlmock.NewResult(0, 1)) + + emit := &fakeEmitter{} + sweepStuckProvisioning(context.Background(), emit, nil) // must not panic + + if emit.count() != 1 { + t.Errorf("flip+emit must still happen with a nil rescue hook, got %d events", emit.count()) + } +} diff --git a/workspace-server/internal/registry/provisiontimeout_test.go b/workspace-server/internal/registry/provisiontimeout_test.go index 1c1517103..f314d05fd 100644 --- a/workspace-server/internal/registry/provisiontimeout_test.go +++ b/workspace-server/internal/registry/provisiontimeout_test.go @@ -7,8 +7,8 @@ import ( "testing" "time" - "github.com/DATA-DOG/go-sqlmock" "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models" + "github.com/DATA-DOG/go-sqlmock" ) // fakeEmitter records every RecordAndBroadcast call so tests can assert @@ -42,12 +42,15 @@ func (f *fakeEmitter) count() int { return len(f.events) } -// candidateRows builds the new-shape query result (id, runtime, age_sec). -// Use this in every sweep test to match the runtime-aware SELECT. -func candidateRows(rows ...[3]any) *sqlmock.Rows { - r := sqlmock.NewRows([]string{"id", "runtime", "age_sec"}) +// candidateRows builds the query result (id, runtime, instance_id, +// age_sec). instance_id was added for the RFC internal#742 rescue hook — +// it rides alongside runtime so the boot-failure capture can reach the +// still-running box. Tests that don't care about the rescue path pass +// "" for instance_id. Use this in every sweep test to match the SELECT. +func candidateRows(rows ...[4]any) *sqlmock.Rows { + r := sqlmock.NewRows([]string{"id", "runtime", "instance_id", "age_sec"}) for _, row := range rows { - r = r.AddRow(row[0], row[1], row[2]) + r = r.AddRow(row[0], row[1], row[2], row[3]) } return r } @@ -58,8 +61,8 @@ func TestSweepStuckProvisioning_FlipsOverdue(t *testing.T) { mock := setupTestDB(t) // claude-code workspace, 700s old > 600s default timeout → flipped. - mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`). - WillReturnRows(candidateRows([3]any{"ws-stuck", "claude-code", 700})) + mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`). + WillReturnRows(candidateRows([4]any{"ws-stuck", "claude-code", "i-stuck", 700})) mock.ExpectExec(`UPDATE workspaces`). WithArgs("ws-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed). @@ -92,8 +95,8 @@ func TestSweepStuckProvisioning_HermesGets30MinSlack(t *testing.T) { // 11 min = 660 sec. < HermesProvisioningTimeout (1800s). // No UPDATE should fire — hermes still has time. - mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`). - WillReturnRows(candidateRows([3]any{"ws-hermes-booting", "hermes", 660})) + mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`). + WillReturnRows(candidateRows([4]any{"ws-hermes-booting", "hermes", "i-h1", 660})) emit := &fakeEmitter{} sweepStuckProvisioning(context.Background(), emit, nil) @@ -114,8 +117,8 @@ func TestSweepStuckProvisioning_HermesPastDeadline(t *testing.T) { mock := setupTestDB(t) // 31 min = 1860 sec > HermesProvisioningTimeout (1800s). - mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`). - WillReturnRows(candidateRows([3]any{"ws-hermes-stuck", "hermes", 1860})) + mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`). + WillReturnRows(candidateRows([4]any{"ws-hermes-stuck", "hermes", "i-h2", 1860})) mock.ExpectExec(`UPDATE workspaces`). WithArgs("ws-hermes-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed). WillReturnResult(sqlmock.NewResult(0, 1)) @@ -150,8 +153,8 @@ func TestSweepStuckProvisioning_HermesPastDeadline(t *testing.T) { func TestSweepStuckProvisioning_ManifestOverrideSparesRow(t *testing.T) { mock := setupTestDB(t) - mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`). - WillReturnRows(candidateRows([3]any{"ws-claude-templated", "claude-code", 660})) + mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`). + WillReturnRows(candidateRows([4]any{"ws-claude-templated", "claude-code", "i-ct", 660})) // No ExpectExec — if the sweeper still flips the row, sqlmock will // fail with an unexpected-query error. @@ -183,8 +186,8 @@ func TestSweepStuckProvisioning_ManifestOverrideStillFlipsPastDeadline(t *testin mock := setupTestDB(t) // 21 min = 1260s > 1200s manifest override → flipped. - mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`). - WillReturnRows(candidateRows([3]any{"ws-claude-truly-stuck", "claude-code", 1260})) + mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`). + WillReturnRows(candidateRows([4]any{"ws-claude-truly-stuck", "claude-code", "i-cts", 1260})) mock.ExpectExec(`UPDATE workspaces`). WithArgs("ws-claude-truly-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed). WillReturnResult(sqlmock.NewResult(0, 1)) @@ -221,8 +224,8 @@ func TestSweepStuckProvisioning_ManifestOverrideStillFlipsPastDeadline(t *testin func TestSweepStuckProvisioning_RaceSafe(t *testing.T) { mock := setupTestDB(t) - mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`). - WillReturnRows(candidateRows([3]any{"ws-raced", "claude-code", 700})) + mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`). + WillReturnRows(candidateRows([4]any{"ws-raced", "claude-code", "i-raced", 700})) mock.ExpectExec(`UPDATE workspaces`). WithArgs("ws-raced", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed). @@ -244,7 +247,7 @@ func TestSweepStuckProvisioning_RaceSafe(t *testing.T) { func TestSweepStuckProvisioning_NoStuck(t *testing.T) { mock := setupTestDB(t) - mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`). + mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`). WillReturnRows(candidateRows()) emit := &fakeEmitter{} @@ -265,10 +268,10 @@ func TestSweepStuckProvisioning_NoStuck(t *testing.T) { func TestSweepStuckProvisioning_MultipleStuck(t *testing.T) { mock := setupTestDB(t) - mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`). + mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`). WillReturnRows(candidateRows( - [3]any{"ws-claude-code", "claude-code", 700}, - [3]any{"ws-hermes", "hermes", 1860}, + [4]any{"ws-claude-code", "claude-code", "i-cc", 700}, + [4]any{"ws-hermes", "hermes", "i-hh", 1860}, )) mock.ExpectExec(`UPDATE workspaces`). @@ -292,8 +295,8 @@ func TestSweepStuckProvisioning_MultipleStuck(t *testing.T) { func TestSweepStuckProvisioning_BroadcastFailureDoesNotCrash(t *testing.T) { mock := setupTestDB(t) - mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`). - WillReturnRows(candidateRows([3]any{"ws-stuck", "claude-code", 700})) + mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`). + WillReturnRows(candidateRows([4]any{"ws-stuck", "claude-code", "i-stuck", 700})) mock.ExpectExec(`UPDATE workspaces`). WithArgs("ws-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed). WillReturnResult(sqlmock.NewResult(0, 1)) diff --git a/workspace-server/internal/rescue/rescue.go b/workspace-server/internal/rescue/rescue.go new file mode 100644 index 000000000..38a40e87b --- /dev/null +++ b/workspace-server/internal/rescue/rescue.go @@ -0,0 +1,247 @@ +// Package rescue captures a fixed post-mortem "rescue bundle" off a +// workspace EC2 whose boot FAILED — before the platform's sweeper / +// control-plane reaps the instance — and ships it to obs/Loki so a +// wedged workspace (e.g. the codex provider-derivation failure that +// motivated RFC internal#742) is inspectable instead of an +// uninspectable wall. +// +// Design constraints (RFC internal#742, Part 2): +// +// - BEST-EFFORT + NON-BLOCKING. Capture MUST NOT change boot-failure +// semantics or add latency to the failure path. Callers fire +// Capture in its own goroutine; Capture additionally bounds itself +// with CaptureTimeout so a hung EIC tunnel can't wedge the +// goroutine forever. +// - FIRES ON THE BOOT-FAILURE VERDICT ONLY. The two hook points are +// the provision-timeout sweep (registry.sweepStuckProvisioning) and +// the out-of-band bootstrap-watcher signal +// (handlers.WorkspaceHandler.BootstrapFailed). Normal teardown / +// deprovision / recreate / billing-suspend / hibernate paths do NOT +// call Capture — see the RFC's path enumeration. +// - REDACT BEFORE ANYTHING LEAVES THE BOX. Every collected section is +// run through the injected Redact func (wired to the existing +// handlers.redactSecrets secret-scan) before it is shipped. Raw +// tokens/keys never reach Loki. +// +// The package is a LEAF: it imports only internal/audit (the obs +// shipper) so it can be called from both handlers and registry without +// an import cycle (registry must not import handlers). The two heavy +// dependencies — the EIC/SSH remote-command runner and the redactor — +// are injected as package-level func vars, wired once at boot from the +// handlers package (which owns withEICTunnel + redactSecrets). Tests +// swap them for fakes. +package rescue + +import ( + "context" + "fmt" + "log" + "time" + + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/audit" +) + +// CaptureTimeout bounds the whole bundle collection. The sweeper runs +// every 30s and the CP reap follows the failure verdict; 45s gives the +// EIC dance (~3-5s) plus six short remote commands (<2s each) generous +// headroom while still finishing well before the instance is torn down. +// Distinct from the per-op eicFileOpTimeout so a slow box that already +// failed to boot can't hang the capture goroutine indefinitely. +const CaptureTimeout = 45 * time.Second + +// LokiKind is the Loki stream label value that tags every rescue +// record. Queryable as `kind="rescue"` (RFC internal#742 §Loki labels). +const LokiKind = "rescue" + +// RescueVolumeGrace is how long a boot-failed workspace's /configs data +// volume (and its still-running instance) must be RETAINED past the +// boot-failure verdict so a live rescue read is possible — distinct from +// the user-requested prune path (cp#415), which is an explicit erase. +// +// In molecule-core (the tenant platform) the boot-failure verdict only +// flips workspaces.status to `failed`; it never issues a terminate. The +// platform's two reapers (registry.StartCPOrphanSweeper + +// handlers deprovision) act ONLY on status='removed', so a `failed` +// workspace's instance + /configs volume are retained here by +// construction — see TestCPSweepOnce_DoesNotReapFailedWorkspace. The +// time-bounded reap of the failed instance is the control plane's +// bootstrap-watcher concern; this constant is the SSOT for the grace +// the CP must honour (24h covers an operator's next-business-day +// post-mortem without leaking the volume indefinitely). +const RescueVolumeGrace = 24 * time.Hour + +// rescueEventType is the audit event_type carried in the shipped +// record. The obs shipper (internal/audit) already maps event_type to a +// low-cardinality Loki label; "rescue.bundle" keeps the rescue stream +// trivially filterable alongside the existing audit taxonomy. +const rescueEventType = "rescue.bundle" + +// RunRemote runs a single shell command on the still-running (but +// unconfigured) workspace EC2 over EIC/SSH and returns its combined +// output. Wired at boot to the handlers EIC runner +// (rescueRunRemoteViaEIC). nil until wired — Capture degrades to a +// logged no-op rather than panicking, so an operator who hasn't wired +// the hook still gets a clear signal instead of a crash on the failure +// path. +var RunRemote func(ctx context.Context, instanceID, command string) (string, error) + +// Redact scrubs secret-shaped substrings from a collected section +// before it leaves the box. Wired at boot to handlers.redactSecrets. +// nil until wired — Capture refuses to ship un-redacted content if the +// redactor is missing (fails closed: logs + aborts rather than leaking +// raw config). +var Redact func(workspaceID, content string) string + +// section is one labelled chunk of the rescue bundle: a human-readable +// name + the remote command that produces it. +type section struct { + name string + command string +} + +// bundleSections is the FIXED set collected on every boot-failure +// rescue (RFC internal#742 §Build.1). Order is the post-mortem reading +// order: config first, then boot logs, then container state, then the +// resolved model/provider env that drove the codex derivation failure. +// +// - /configs/config.yaml + system-prompt.md: the managed config the +// runtime booted against (redacted; system-prompt can embed keys). +// - cloud-init-output.log tail: the user-data execution trace — where +// a wedged boot actually died. +// - docker ps -a: container state (did the agent container even +// start, exit-code, restart loop). +// - agent container logs: the runtime's own stderr (the codex +// provider-derivation panic lives here). +// - MODEL|PROVIDER|RUNTIME env: the resolved routing that motivated +// the RFC. `sudo cat` of the container env via docker inspect-style +// grep — see the command. +// +// All commands use `sudo -n` (the box's /configs is root-owned; ubuntu +// has passwordless sudo) and swallow missing-target stderr so a section +// that can't be produced ships as a short marker instead of failing the +// whole bundle. Kept as data (not inlined) so the redaction + ship loop +// is uniform and the set is reviewable in one place. +var bundleSections = []section{ + { + name: "config.yaml", + command: "sudo -n cat /configs/config.yaml 2>/dev/null || echo '(/configs/config.yaml absent)'", + }, + { + name: "system-prompt.md", + command: "sudo -n cat /configs/system-prompt.md 2>/dev/null || echo '(/configs/system-prompt.md absent)'", + }, + { + name: "cloud-init-output.log.tail", + command: "sudo -n tail -200 /var/log/cloud-init-output.log 2>/dev/null || echo '(cloud-init-output.log absent)'", + }, + { + name: "docker-ps", + command: "sudo -n docker ps -a 2>/dev/null || echo '(docker unavailable)'", + }, + { + // The agent container is the first non-infra container; grab the + // most recently created one and tail its logs. `head -1` of + // `docker ps -a -q` is creation-ordered newest-first, which is + // the agent runtime on a workspace box. + name: "agent-container.logs.tail", + command: "cid=$(sudo -n docker ps -a -q 2>/dev/null | head -1); [ -n \"$cid\" ] && sudo -n docker logs --tail 200 \"$cid\" 2>&1 || echo '(no agent container)'", + }, + { + // Resolved model/provider/runtime env from the agent container. + // `docker inspect` the env array and grep the routing keys. This + // is the field that pinpoints a provider-derivation failure. + name: "model-provider-runtime.env", + command: "cid=$(sudo -n docker ps -a -q 2>/dev/null | head -1); [ -n \"$cid\" ] && sudo -n docker inspect --format '{{range .Config.Env}}{{println .}}{{end}}' \"$cid\" 2>/dev/null | grep -E 'MODEL|PROVIDER|RUNTIME' || echo '(no env)'", + }, +} + +// Input is the identity of the failed workspace being rescued. +type Input struct { + InstanceID string // EC2 instance id of the still-running failed box + WorkspaceID string + OrgID string + // Reason is a short tag for WHY the rescue fired (e.g. + // "provision_timeout_sweep" or "bootstrap_watcher") — carried into + // the Loki record so an operator can correlate the bundle with the + // failure verdict that triggered it. + Reason string +} + +// Capture collects the fixed rescue bundle off the failed instance, +// redacts each section, and ships it to Loki under +// {kind="rescue", org=, workspace_id=}. +// +// BEST-EFFORT: every failure mode (missing wiring, EIC error, a single +// section that won't collect) is logged and does NOT propagate — Capture +// never returns an error and never panics, so the boot-failure handling +// at the call site is unaffected. The caller is expected to invoke this +// in its own goroutine; Capture additionally self-bounds with +// CaptureTimeout. +func Capture(ctx context.Context, in Input) { + defer func() { + // A logging helper on the failure path must never take the + // process down. Recover defensively — the redactor / shipper are + // injected and a future mis-wire shouldn't crash the sweeper. + if r := recover(); r != nil { + log.Printf("rescue: capture panicked for ws=%s instance=%s: %v", in.WorkspaceID, in.InstanceID, r) + } + }() + + if in.InstanceID == "" { + // No live box to read — nothing to rescue (e.g. failure before + // any EC2 was launched). Not an error; just skip. + log.Printf("rescue: skip ws=%s — no instance_id (nothing to capture)", in.WorkspaceID) + return + } + if RunRemote == nil { + log.Printf("rescue: skip ws=%s instance=%s — RunRemote not wired (best-effort no-op)", in.WorkspaceID, in.InstanceID) + return + } + if Redact == nil { + // Fail CLOSED: without a redactor we could leak raw tokens to + // Loki. Abort rather than ship unredacted. + log.Printf("rescue: ABORT ws=%s instance=%s — Redact not wired; refusing to ship un-redacted bundle", in.WorkspaceID, in.InstanceID) + return + } + + ctx, cancel := context.WithTimeout(ctx, CaptureTimeout) + defer cancel() + + log.Printf("rescue: capturing bundle ws=%s instance=%s reason=%s", in.WorkspaceID, in.InstanceID, in.Reason) + + collected := 0 + for _, sec := range bundleSections { + raw, err := RunRemote(ctx, in.InstanceID, sec.command) + if err != nil { + // One section failing (e.g. ssh blip mid-collection) must not + // abort the rest — ship a marker for it and continue. + log.Printf("rescue: section %q failed for ws=%s: %v", sec.name, in.WorkspaceID, err) + ship(ctx, in, sec.name, fmt.Sprintf("(rescue: section collection failed: %v)", err), false) + continue + } + redacted := Redact(in.WorkspaceID, raw) + ship(ctx, in, sec.name, redacted, true) + collected++ + } + + log.Printf("rescue: shipped %d/%d sections ws=%s instance=%s kind=%s", collected, len(bundleSections), in.WorkspaceID, in.InstanceID, LokiKind) +} + +// ship emits one rescue section to Loki via the audit shipper. The +// org / workspace_id / kind ride in the record body (queryable via +// LogQL `| json`); event_type ("rescue.bundle") is the low-cardinality +// Loki label the shipper already promotes. `redacted` records whether +// the content passed through the secret-scan, so an operator can tell a +// shipped-but-redacted section from a collection-failure marker. +func ship(ctx context.Context, in Input, name, content string, redacted bool) { + audit.Emit(ctx, rescueEventType, map[string]any{ + "kind": LokiKind, + "org": in.OrgID, + "workspace_id": in.WorkspaceID, + "instance_id": in.InstanceID, + "reason": in.Reason, + "section": name, + "redacted": redacted, + "content": content, + }) +} diff --git a/workspace-server/internal/rescue/rescue_test.go b/workspace-server/internal/rescue/rescue_test.go new file mode 100644 index 000000000..1348d2303 --- /dev/null +++ b/workspace-server/internal/rescue/rescue_test.go @@ -0,0 +1,226 @@ +package rescue + +import ( + "context" + "encoding/json" + "errors" + "os" + "path/filepath" + "strings" + "testing" +) + +// withFakes swaps the injected RunRemote + Redact for the duration of a +// test and restores them after. Mirrors the provisioner test-fake +// pattern (package-var swap + t.Cleanup). +func withFakes(t *testing.T, run func(ctx context.Context, instanceID, cmd string) (string, error), redact func(ws, c string) string) { + t.Helper() + prevRun, prevRedact := RunRemote, Redact + RunRemote = run + Redact = redact + t.Cleanup(func() { RunRemote = prevRun; Redact = prevRedact }) +} + +// captureLoki points the audit shipper at a temp JSONL file and returns +// a reader that decodes the records the rescue ship() loop wrote. This +// is the same transport the production rescue stream uses (audit.Emit → +// Loki via the tenant Vector source), so asserting on it proves the +// shipper-reuse + labels end to end. +func captureLoki(t *testing.T) func() []map[string]any { + t.Helper() + dir := t.TempDir() + path := filepath.Join(dir, "audit.jsonl") + t.Setenv("MOLECULE_AUDIT_LOG_PATH", path) + return func() []map[string]any { + b, err := os.ReadFile(path) + if err != nil { + return nil + } + var out []map[string]any + for _, line := range strings.Split(strings.TrimSpace(string(b)), "\n") { + if line == "" { + continue + } + var rec map[string]any + if err := json.Unmarshal([]byte(line), &rec); err != nil { + t.Fatalf("bad audit jsonl line %q: %v", line, err) + } + out = append(out, rec) + } + return out + } +} + +func fields(rec map[string]any) map[string]any { + f, _ := rec["fields"].(map[string]any) + return f +} + +// TestCapture_ShipsAllSectionsWithRescueLabels is the happy path: a +// boot-failure capture collects every fixed section, runs each through +// the redactor, and ships it to Loki under {kind="rescue", org, ws}. +func TestCapture_ShipsAllSectionsWithRescueLabels(t *testing.T) { + readLoki := captureLoki(t) + var seenCmds []string + withFakes(t, + func(_ context.Context, instanceID, cmd string) (string, error) { + seenCmds = append(seenCmds, cmd) + return "OUTPUT for " + instanceID, nil + }, + func(_ws, c string) string { return c }, // identity redactor + ) + + Capture(context.Background(), Input{ + InstanceID: "i-abc123", + WorkspaceID: "ws-1", + OrgID: "org-9", + Reason: "provision_timeout_sweep", + }) + + recs := readLoki() + if len(recs) != len(bundleSections) { + t.Fatalf("want %d shipped sections, got %d", len(bundleSections), len(recs)) + } + if len(seenCmds) != len(bundleSections) { + t.Fatalf("want %d remote commands run, got %d", len(bundleSections), len(seenCmds)) + } + for _, rec := range recs { + if rec["event_type"] != rescueEventType { + t.Errorf("event_type = %v, want %q", rec["event_type"], rescueEventType) + } + // workspace_id is promoted to the top-level record position by + // the audit shipper. + if rec["workspace_id"] != "ws-1" { + t.Errorf("top-level workspace_id = %v, want ws-1", rec["workspace_id"]) + } + f := fields(rec) + if f["kind"] != LokiKind { + t.Errorf("kind = %v, want %q", f["kind"], LokiKind) + } + if f["org"] != "org-9" { + t.Errorf("org = %v, want org-9", f["org"]) + } + if f["instance_id"] != "i-abc123" { + t.Errorf("instance_id = %v, want i-abc123", f["instance_id"]) + } + if f["redacted"] != true { + t.Errorf("redacted = %v, want true for a collected section", f["redacted"]) + } + } +} + +// TestCapture_Redacts proves the bundle is scrubbed before it leaves the +// box: a remote section that contains a secret-shaped token must ship +// with the token replaced, never raw. +func TestCapture_Redacts(t *testing.T) { + readLoki := captureLoki(t) + const secret = "sk-ant-SUPERSECRETTOKENVALUE0001" + withFakes(t, + func(_ context.Context, _ string, _ string) (string, error) { + return "ANTHROPIC_API_KEY=" + secret, nil + }, + // redactor that mangles anything containing the secret shape + func(_ws, c string) string { + if strings.Contains(c, secret) { + return strings.ReplaceAll(c, secret, "[REDACTED]") + } + return c + }, + ) + + Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-2", OrgID: "o"}) + + for _, rec := range readLoki() { + content, _ := fields(rec)["content"].(string) + if strings.Contains(content, secret) { + t.Fatalf("raw secret leaked to Loki in section %v: %q", fields(rec)["section"], content) + } + } +} + +// TestCapture_SkipsWhenNoInstance: a failure with no provisioned EC2 has +// nothing to read — Capture must no-op (ship nothing) rather than dial a +// blank instance id. +func TestCapture_SkipsWhenNoInstance(t *testing.T) { + readLoki := captureLoki(t) + called := false + withFakes(t, + func(_ context.Context, _ string, _ string) (string, error) { called = true; return "", nil }, + func(_ws, c string) string { return c }, + ) + Capture(context.Background(), Input{InstanceID: "", WorkspaceID: "ws-3", OrgID: "o"}) + if called { + t.Error("RunRemote called for an empty instance id") + } + if recs := readLoki(); len(recs) != 0 { + t.Errorf("shipped %d records for an empty instance id, want 0", len(recs)) + } +} + +// TestCapture_FailsClosedWithoutRedactor: if the redactor is not wired, +// Capture must NOT ship anything (would leak raw config). Fail closed. +func TestCapture_FailsClosedWithoutRedactor(t *testing.T) { + readLoki := captureLoki(t) + prevRun, prevRedact := RunRemote, Redact + RunRemote = func(_ context.Context, _ string, _ string) (string, error) { return "raw config", nil } + Redact = nil + t.Cleanup(func() { RunRemote = prevRun; Redact = prevRedact }) + + Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-4", OrgID: "o"}) + + if recs := readLoki(); len(recs) != 0 { + t.Errorf("shipped %d records without a redactor wired, want 0 (fail closed)", len(recs)) + } +} + +// TestCapture_SectionFailureIsIsolated: one section's RunRemote error +// must not abort the rest — the failing section ships a marker and the +// others still ship. +func TestCapture_SectionFailureIsIsolated(t *testing.T) { + readLoki := captureLoki(t) + withFakes(t, + func(_ context.Context, _ string, cmd string) (string, error) { + if strings.Contains(cmd, "config.yaml") { + return "", errors.New("ssh blip") + } + return "ok", nil + }, + func(_ws, c string) string { return c }, + ) + + Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-5", OrgID: "o"}) + + recs := readLoki() + if len(recs) != len(bundleSections) { + t.Fatalf("want all %d sections shipped (incl. failure marker), got %d", len(bundleSections), len(recs)) + } + var failureMarkers int + for _, rec := range recs { + if fields(rec)["redacted"] == false { + failureMarkers++ + content, _ := fields(rec)["content"].(string) + if !strings.Contains(content, "section collection failed") { + t.Errorf("failure marker content = %q, want a collection-failed marker", content) + } + } + } + if failureMarkers != 1 { + t.Errorf("want exactly 1 failure marker, got %d", failureMarkers) + } +} + +// TestCapture_NoWiringIsSafeNoOp: with RunRemote unwired (operator hasn't +// called the boot wiring), Capture must be a logged no-op, never a panic. +func TestCapture_NoWiringIsSafeNoOp(t *testing.T) { + readLoki := captureLoki(t) + prevRun, prevRedact := RunRemote, Redact + RunRemote = nil + Redact = func(_ws, c string) string { return c } + t.Cleanup(func() { RunRemote = prevRun; Redact = prevRedact }) + + Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-6", OrgID: "o"}) + + if recs := readLoki(); len(recs) != 0 { + t.Errorf("shipped %d records with RunRemote unwired, want 0", len(recs)) + } +} -- 2.52.0 From 007dabd29bcfa456db5a0cf35f07032069ef0ffb Mon Sep 17 00:00:00 2001 From: claude-ceo-assistant Date: Sun, 31 May 2026 02:13:15 -0700 Subject: [PATCH 2/3] feat(workspace-server): rescue read endpoint GET /workspaces/:id/rescue (RFC internal#742 Part 3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Serve the latest post-mortem rescue bundle for a boot-failed/terminated workspace so "why won't my agent boot" is answerable WITHOUT a live instance. Powers the future canvas "Why did this fail?" panel. Read-path decision (the key reviewer item): Part 2 (feat/rfc742-rescue-capture) ships the bundle via internal/audit (audit.Emit), which is stdout->Vector->Loki + a best-effort local JSONL on the tenant container's EPHEMERAL rootfs — it does NOT persist to a queryable DB table. Serving the read from Loki would require giving the tenant process a Loki query client + obs read creds it deliberately must not have. So this PR ADDS a minimal, per-tenant `rescue_bundles` table + migration and persists the already-redacted bundle on capture, then reads the latest row. No Loki-query creds added to the tenant. What's added: - migration 20260531000000_rescue_bundles (table + (workspace_id, captured_at DESC, id DESC) index). Idempotent CREATE ... IF NOT EXISTS; unique prefix, no collision. - internal/rescue: Bundle/Section types + an injected PersistBundle package var (leaf-safe, same pattern as RunRemote/Redact). Capture now accumulates the redacted sections and persists ONE bundle row after the per-section Loki ship — Loki behavior unchanged; persist is best-effort + never disturbs the boot-failure path. - internal/rescuestore: queryable store (Persist + GetLatest), org scoped via `($2 = '' OR org_id = $2)`, per-section 64KiB clamp. - handlers.RescueReadHandler: GET /workspaces/:id/rescue. 200 latest / 404 none / 503 store fault. Sections returned verbatim (already redacted at capture; never re-shipped). Response section count bounded. - route registered on the WorkspaceAuth-guarded /workspaces/:id group, next to /files/* and /exec. Org isolation = TenantGuard (routing) + WorkspaceAuth (token bound to :id) + the store's MOLECULE_ORG_ID filter, so a sibling org cannot read another org's bundle. Tests (fake the store; sqlmock for the Postgres store): returns latest, 404 when none, org-scoping (sibling org -> 404), 503 on store error, shape/redaction-preserved, section bound; capture persists exactly once with redacted content, persist failure is swallowed, no-store-wired still ships to Loki. Dependency / merge order: branched from feat/rfc742-rescue-capture (Part 2) because Capture's persist hook is extended here. Part 2 must merge first (or be merged together) — this PR's rescue.go changes build on Part 2's rescue package. go build / go test / -tags=integration all green. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../internal/handlers/rescue_read.go | 161 ++++++++++++ .../internal/handlers/rescue_read_test.go | 234 ++++++++++++++++++ .../internal/handlers/rescue_wiring.go | 10 + workspace-server/internal/rescue/rescue.go | 76 +++++- .../internal/rescue/rescue_persist_test.go | 136 ++++++++++ .../internal/rescuestore/store.go | 155 ++++++++++++ .../internal/rescuestore/store_test.go | 203 +++++++++++++++ workspace-server/internal/router/router.go | 8 + .../20260531000000_rescue_bundles.down.sql | 4 + .../20260531000000_rescue_bundles.up.sql | 59 +++++ 10 files changed, 1045 insertions(+), 1 deletion(-) create mode 100644 workspace-server/internal/handlers/rescue_read.go create mode 100644 workspace-server/internal/handlers/rescue_read_test.go create mode 100644 workspace-server/internal/rescue/rescue_persist_test.go create mode 100644 workspace-server/internal/rescuestore/store.go create mode 100644 workspace-server/internal/rescuestore/store_test.go create mode 100644 workspace-server/migrations/20260531000000_rescue_bundles.down.sql create mode 100644 workspace-server/migrations/20260531000000_rescue_bundles.up.sql diff --git a/workspace-server/internal/handlers/rescue_read.go b/workspace-server/internal/handlers/rescue_read.go new file mode 100644 index 000000000..421043eb6 --- /dev/null +++ b/workspace-server/internal/handlers/rescue_read.go @@ -0,0 +1,161 @@ +package handlers + +// rescue_read.go — GET /workspaces/:id/rescue (RFC internal#742 Part 3). +// +// Serves the LATEST post-mortem rescue bundle captured for a +// boot-failed/terminated workspace, so "why won't my agent boot" is +// answerable WITHOUT a live instance. Powers the future canvas +// "Why did this fail?" panel. +// +// Read-path: the bundle is read from the queryable rescue_bundles table +// (internal/rescuestore), NOT from obs/Loki. Part 2 ships the bundle via +// internal/audit (Loki-only); reading from Loki would require obs read +// creds the tenant deliberately lacks. Part 3 persists the +// already-redacted bundle on capture and serves it here — see the +// migration header for the full rationale. +// +// Auth/scoping: registered on the WorkspaceAuth-guarded /workspaces/:id +// group (same gate as /files/* and /exec), so the caller must hold a +// valid per-workspace or org bearer token for :id. TenantGuard already +// 404s cross-org requests at the routing layer; on top of that the store +// read is org-scoped by MOLECULE_ORG_ID, so a row written under a +// different org is never returned (defense in depth). +// +// Redaction: the stored sections were already scrubbed at capture time +// (Part 2's SAFE-T1201 secret-scan). This handler returns them verbatim +// — it never re-ships or re-derives secrets. + +import ( + "log" + "net/http" + "os" + "time" + + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db" + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescuestore" + "github.com/gin-gonic/gin" +) + +// maxResponseSections bounds how many sections the read response +// returns. The fixed capture set is small (6), so this is a backstop +// against a future capture set growth or a hand-written row — keeps the +// JSON response bounded regardless of what's stored. Per-section content +// is already clamped at persist time (rescuestore.maxSectionBytes). +const maxResponseSections = 64 + +// RescueReadHandler serves GET /workspaces/:id/rescue. The store is +// injected so tests fake it; production wires a Postgres store over +// db.DB (see NewRescueReadHandler). +type RescueReadHandler struct { + store rescuestore.Store +} + +// NewRescueReadHandler builds the handler over the package db.DB. db.DB +// is nil in some unit-test binaries; the handler tolerates that by +// returning 503 rather than nil-deref (the store guards nil db). +func NewRescueReadHandler() *RescueReadHandler { + return &RescueReadHandler{store: rescuestore.NewPostgres(db.DB)} +} + +// WithStore overrides the store (test seam). Returns the handler for +// chaining. +func (h *RescueReadHandler) WithStore(s rescuestore.Store) *RescueReadHandler { + h.store = s + return h +} + +// rescueSection is one labelled chunk in the read response. +type rescueSection struct { + Name string `json:"name"` + Content string `json:"content"` + Redacted bool `json:"redacted"` +} + +// rescueReadResponse is the JSON shape returned for a found bundle. +// `sections` is an ordered array (capture reading order), not a map, so +// the order config→logs→state→env is preserved for the canvas panel. +type rescueReadResponse struct { + WorkspaceID string `json:"workspace_id"` + CapturedAt time.Time `json:"captured_at"` + Reason string `json:"reason"` + InstanceID string `json:"instance_id"` + Sections []rescueSection `json:"sections"` + // Truncated is true when the stored bundle had more sections than + // maxResponseSections and the response was capped. + Truncated bool `json:"truncated,omitempty"` +} + +// GetRescue handles GET /workspaces/:id/rescue. +// +// 200 — latest rescue bundle for the workspace (org-scoped). +// 404 — no rescue bundle on file for this workspace (or wrong org). +// 503 — store/datastore unavailable. +func (h *RescueReadHandler) GetRescue(c *gin.Context) { + workspaceID := c.Param("id") + ctx := c.Request.Context() + + if h.store == nil { + log.Printf("GetRescue: store not configured for ws=%s", workspaceID) + c.JSON(http.StatusServiceUnavailable, gin.H{ + "error": "rescue store unavailable", + "code": "platform_unavailable", + }) + return + } + + // org_id is the tenant's configured org (one tenant = one org). When + // unset (self-hosted / dev), pass "" so the store returns any row for + // the workspace; when set, the store requires org_id to match so a + // sibling org's row is never served. + orgID := os.Getenv("MOLECULE_ORG_ID") + + stored, err := h.store.GetLatest(ctx, workspaceID, orgID) + if err != nil { + // Per the Store contract a missing bundle is (nil, nil), NOT an + // error — so any error here is a genuine datastore fault → 503, + // never a masquerading 404 that would hide an outage. + log.Printf("GetRescue: store query failed for ws=%s: %v", workspaceID, err) + c.JSON(http.StatusServiceUnavailable, gin.H{ + "error": "rescue store query failed", + "code": "platform_unavailable", + }) + return + } + if stored == nil { + // No bundle captured (workspace never boot-failed, or its grace + // window lapsed). 404 — existence-non-inferring; a workspace in a + // sibling org reaches the same 404 via the org filter. + c.JSON(http.StatusNotFound, gin.H{"error": "no rescue bundle for this workspace"}) + return + } + + resp := buildRescueResponse(workspaceID, stored) + c.JSON(http.StatusOK, resp) +} + +// buildRescueResponse maps a stored bundle to the read response, bounding +// the section count. Split out so the mapping/limit is unit-testable. +func buildRescueResponse(workspaceID string, stored *rescuestore.StoredBundle) rescueReadResponse { + secs := stored.Bundle.Sections + truncated := false + if len(secs) > maxResponseSections { + secs = secs[:maxResponseSections] + truncated = true + } + out := make([]rescueSection, 0, len(secs)) + for _, s := range secs { + // rescue.Section and rescueSection are field-identical; the + // explicit conversion keeps the handler's JSON shape independent + // of the leaf package's struct (which could gain non-response + // fields later). + out = append(out, rescueSection(s)) + } + return rescueReadResponse{ + WorkspaceID: workspaceID, + CapturedAt: stored.CapturedAt, + Reason: stored.Bundle.Reason, + InstanceID: stored.Bundle.InstanceID, + Sections: out, + Truncated: truncated, + } +} diff --git a/workspace-server/internal/handlers/rescue_read_test.go b/workspace-server/internal/handlers/rescue_read_test.go new file mode 100644 index 000000000..dddde8419 --- /dev/null +++ b/workspace-server/internal/handlers/rescue_read_test.go @@ -0,0 +1,234 @@ +package handlers + +// Tests for GET /workspaces/:id/rescue (RFC internal#742 Part 3). +// +// These exercise the handler against a FAKE store (no DB) so every path +// is deterministic without external infra: +// - returns the latest bundle in the documented shape +// - 404 when no bundle exists for the workspace +// - org-scoping: the handler passes the tenant's MOLECULE_ORG_ID to +// the store, so a fake that returns nil for a mismatched org proves a +// sibling org cannot read another org's bundle +// - 503 on a store/datastore error (not a 404 masquerade) +// - redaction/shape preserved: stored sections are returned verbatim, +// no re-derivation +// +// WorkspaceAuth gating itself is covered by the middleware tests; here we +// invoke the handler directly (the route is registered on the wsAuth +// group in router.go). + +import ( + "context" + "encoding/json" + "errors" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue" + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescuestore" + "github.com/gin-gonic/gin" +) + +func init() { gin.SetMode(gin.TestMode) } + +// fakeRescueStore records the args it was called with and returns a +// scripted result. Implements rescuestore.Store. +type fakeRescueStore struct { + // gotWorkspaceID/gotOrgID capture what the handler passed. + gotWorkspaceID string + gotOrgID string + // ret/err are the scripted GetLatest result. + ret *rescuestore.StoredBundle + err error +} + +func (f *fakeRescueStore) Persist(_ context.Context, _ rescue.Bundle) error { return nil } + +func (f *fakeRescueStore) GetLatest(_ context.Context, workspaceID, orgID string) (*rescuestore.StoredBundle, error) { + f.gotWorkspaceID = workspaceID + f.gotOrgID = orgID + return f.ret, f.err +} + +// doRescueGet runs the handler for ws against the given fake and returns +// the recorder. orgEnv sets MOLECULE_ORG_ID for the duration. +func doRescueGet(t *testing.T, ws, orgEnv string, fake *fakeRescueStore) *httptest.ResponseRecorder { + t.Helper() + t.Setenv("MOLECULE_ORG_ID", orgEnv) + + h := (&RescueReadHandler{}).WithStore(fake) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: ws}} + c.Request = httptest.NewRequest("GET", "/workspaces/"+ws+"/rescue", nil) + h.GetRescue(c) + return w +} + +// sampleStored builds a representative stored bundle with a redacted + +// a failure-marker section. +func sampleStored() *rescuestore.StoredBundle { + return &rescuestore.StoredBundle{ + CapturedAt: time.Date(2026, 5, 31, 12, 0, 0, 0, time.UTC), + Bundle: rescue.Bundle{ + WorkspaceID: "ws-1", + OrgID: "org-9", + InstanceID: "i-abc123", + Reason: "provision_timeout_sweep", + Sections: []rescue.Section{ + {Name: "config.yaml", Content: "model: gpt-4\nANTHROPIC_API_KEY=[REDACTED]", Redacted: true}, + {Name: "docker-ps", Content: "(rescue: section collection failed: ssh blip)", Redacted: false}, + }, + }, + } +} + +// TestGetRescue_ReturnsLatestBundle — happy path: 200 with the full +// documented shape, sections in order, redaction-preserved. +func TestGetRescue_ReturnsLatestBundle(t *testing.T) { + fake := &fakeRescueStore{ret: sampleStored()} + w := doRescueGet(t, "ws-1", "org-9", fake) + + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want 200; body=%s", w.Code, w.Body.String()) + } + var resp struct { + WorkspaceID string `json:"workspace_id"` + CapturedAt time.Time `json:"captured_at"` + Reason string `json:"reason"` + InstanceID string `json:"instance_id"` + Sections []struct { + Name string `json:"name"` + Content string `json:"content"` + Redacted bool `json:"redacted"` + } `json:"sections"` + } + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("decode: %v; body=%s", err, w.Body.String()) + } + if resp.WorkspaceID != "ws-1" { + t.Errorf("workspace_id = %q, want ws-1", resp.WorkspaceID) + } + if resp.Reason != "provision_timeout_sweep" { + t.Errorf("reason = %q", resp.Reason) + } + if resp.InstanceID != "i-abc123" { + t.Errorf("instance_id = %q", resp.InstanceID) + } + if !resp.CapturedAt.Equal(time.Date(2026, 5, 31, 12, 0, 0, 0, time.UTC)) { + t.Errorf("captured_at = %v", resp.CapturedAt) + } + if len(resp.Sections) != 2 { + t.Fatalf("sections = %d, want 2", len(resp.Sections)) + } + // Order preserved: config first, docker-ps second. + if resp.Sections[0].Name != "config.yaml" || resp.Sections[1].Name != "docker-ps" { + t.Errorf("section order wrong: %q, %q", resp.Sections[0].Name, resp.Sections[1].Name) + } + // Redaction-preserved: the redacted flag rides through untouched, and + // the failure marker stays a non-redacted marker. + if !resp.Sections[0].Redacted { + t.Error("config.yaml section should be redacted=true") + } + if resp.Sections[1].Redacted { + t.Error("failure-marker section should be redacted=false") + } + // Handler does NOT re-derive secrets; stored [REDACTED] verbatim. + if want := "ANTHROPIC_API_KEY=[REDACTED]"; !strings.Contains(resp.Sections[0].Content, want) { + t.Errorf("section content = %q, want it to contain %q", resp.Sections[0].Content, want) + } +} + +// TestGetRescue_404WhenNone — no bundle on file → 404, not 500/200. +func TestGetRescue_404WhenNone(t *testing.T) { + fake := &fakeRescueStore{ret: nil} // store returns (nil, nil) + w := doRescueGet(t, "ws-none", "org-9", fake) + if w.Code != http.StatusNotFound { + t.Fatalf("status = %d, want 404; body=%s", w.Code, w.Body.String()) + } +} + +// TestGetRescue_OrgScopingPassedToStore — the handler must hand the +// tenant's MOLECULE_ORG_ID to the store, and a store that returns nil for +// a mismatched org yields 404. This is the sibling-org isolation: a +// caller in org B (a different tenant process, MOLECULE_ORG_ID=org-B) +// reading ws-1 (which belongs to org-9) gets the org filter applied → no +// row → 404. +func TestGetRescue_OrgScopingPassedToStore(t *testing.T) { + // Tenant configured as a DIFFERENT org than the bundle's owner. + // Fake mimics the Postgres org filter: returns nil because org-B + // doesn't match the row's org-9. + fake := &fakeRescueStore{ret: nil} + w := doRescueGet(t, "ws-1", "org-B", fake) + + if fake.gotOrgID != "org-B" { + t.Errorf("store got org_id = %q, want the tenant's org-B", fake.gotOrgID) + } + if fake.gotWorkspaceID != "ws-1" { + t.Errorf("store got workspace_id = %q, want ws-1", fake.gotWorkspaceID) + } + if w.Code != http.StatusNotFound { + t.Fatalf("sibling-org read: status = %d, want 404", w.Code) + } +} + +// TestGetRescue_EmptyOrgEnvPassesEmptyFilter — self-hosted / unset +// MOLECULE_ORG_ID passes "" so the store returns any row for the ws. +func TestGetRescue_EmptyOrgEnvPassesEmptyFilter(t *testing.T) { + fake := &fakeRescueStore{ret: sampleStored()} + w := doRescueGet(t, "ws-1", "", fake) + if fake.gotOrgID != "" { + t.Errorf("store got org_id = %q, want empty (unset MOLECULE_ORG_ID)", fake.gotOrgID) + } + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want 200", w.Code) + } +} + +// TestGetRescue_StoreErrorIs503 — an actual datastore error must surface +// as 503, never a 404 (which would hide an outage as "no bundle"). +func TestGetRescue_StoreErrorIs503(t *testing.T) { + fake := &fakeRescueStore{err: errors.New("connection refused")} + w := doRescueGet(t, "ws-1", "org-9", fake) + if w.Code != http.StatusServiceUnavailable { + t.Fatalf("status = %d, want 503; body=%s", w.Code, w.Body.String()) + } +} + +// TestGetRescue_NilStoreIs503 — defensive: a handler with no store wired +// (db.DB nil in a degraded boot) returns 503, never panics. +func TestGetRescue_NilStoreIs503(t *testing.T) { + t.Setenv("MOLECULE_ORG_ID", "org-9") + h := &RescueReadHandler{} // store == nil + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-1"}} + c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/rescue", nil) + h.GetRescue(c) + if w.Code != http.StatusServiceUnavailable { + t.Fatalf("status = %d, want 503", w.Code) + } +} + +// TestBuildRescueResponse_BoundsSections — a stored bundle with more than +// maxResponseSections sections is capped + flagged truncated. +func TestBuildRescueResponse_BoundsSections(t *testing.T) { + many := make([]rescue.Section, maxResponseSections+5) + for i := range many { + many[i] = rescue.Section{Name: "s", Content: "c", Redacted: true} + } + stored := &rescuestore.StoredBundle{ + CapturedAt: time.Now(), + Bundle: rescue.Bundle{WorkspaceID: "ws-1", Sections: many}, + } + resp := buildRescueResponse("ws-1", stored) + if len(resp.Sections) != maxResponseSections { + t.Errorf("sections = %d, want capped at %d", len(resp.Sections), maxResponseSections) + } + if !resp.Truncated { + t.Error("truncated flag should be set when sections were capped") + } +} diff --git a/workspace-server/internal/handlers/rescue_wiring.go b/workspace-server/internal/handlers/rescue_wiring.go index 08000ce7e..6c08e19ca 100644 --- a/workspace-server/internal/handlers/rescue_wiring.go +++ b/workspace-server/internal/handlers/rescue_wiring.go @@ -26,6 +26,7 @@ import ( "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db" "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue" + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescuestore" ) func init() { @@ -38,6 +39,15 @@ func init() { out, _ := redactSecrets(workspaceID, content) return out } + // Part 3: persist the redacted bundle to the queryable store on + // capture so GET /workspaces/:id/rescue can serve it without obs/Loki + // read creds. db.DB is resolved per-call (rescuestore guards a nil + // handle) so wiring at init() is safe even before InitPostgres has + // run; a capture before the DB is up logs + skips the persist rather + // than failing the boot-failure path. + rescue.PersistBundle = func(ctx context.Context, b rescue.Bundle) error { + return rescuestore.NewPostgres(db.DB).Persist(ctx, b) + } } // rescueRunRemoteViaEIC runs a single shell command on the still-running diff --git a/workspace-server/internal/rescue/rescue.go b/workspace-server/internal/rescue/rescue.go index 38a40e87b..41ddf58de 100644 --- a/workspace-server/internal/rescue/rescue.go +++ b/workspace-server/internal/rescue/rescue.go @@ -167,6 +167,44 @@ type Input struct { Reason string } +// Section is one labelled, already-redacted chunk of the persisted +// rescue bundle. It mirrors what ship() emits to Loki per-section, but +// is the unit the queryable store (and the read endpoint) returns. +// `Redacted` is false only for collection-failure markers (the section +// command couldn't run); true sections passed through the secret-scan. +type Section struct { + Name string `json:"name"` + Content string `json:"content"` + Redacted bool `json:"redacted"` +} + +// Bundle is the full, already-redacted post-mortem bundle for ONE +// boot-failure capture — the unit persisted to the queryable store on +// capture (RFC internal#742 Part 3) and served by +// GET /workspaces/:id/rescue. Sections are in fixed reading order +// (config → boot logs → container state → resolved routing env). +type Bundle struct { + WorkspaceID string `json:"workspace_id"` + OrgID string `json:"org_id"` + InstanceID string `json:"instance_id"` + Reason string `json:"reason"` + Sections []Section `json:"sections"` +} + +// PersistBundle writes the fully-collected, already-redacted bundle to +// the queryable per-tenant store (rescue_bundles table) so the rescue +// READ endpoint can serve it without obs/Loki read creds (RFC +// internal#742 Part 3 read-path decision — see the migration header). +// +// Wired at boot from the handlers package (which owns db.DB) to keep +// internal/rescue a leaf: it must NOT import internal/db, or registry — +// which imports rescue — would inherit a db dependency it can call +// without a cycle, but more importantly the leaf stays trivially +// testable with a fake. nil until wired: a capture with no store wired +// still ships to Loki (Part 2 behavior preserved) and logs that it +// skipped the DB persist, rather than failing the capture. +var PersistBundle func(ctx context.Context, b Bundle) error + // Capture collects the fixed rescue bundle off the failed instance, // redacts each section, and ships it to Loki under // {kind="rescue", org=, workspace_id=}. @@ -210,21 +248,57 @@ func Capture(ctx context.Context, in Input) { log.Printf("rescue: capturing bundle ws=%s instance=%s reason=%s", in.WorkspaceID, in.InstanceID, in.Reason) collected := 0 + // Accumulate the per-section result alongside shipping each to Loki, + // so the same already-redacted content is persisted to the queryable + // store as one bundle row after the loop. Shipping stays per-section + // (Part 2 Loki behavior unchanged); persistence is the single + // bundle the read endpoint serves. + bundle := Bundle{ + WorkspaceID: in.WorkspaceID, + OrgID: in.OrgID, + InstanceID: in.InstanceID, + Reason: in.Reason, + Sections: make([]Section, 0, len(bundleSections)), + } for _, sec := range bundleSections { raw, err := RunRemote(ctx, in.InstanceID, sec.command) if err != nil { // One section failing (e.g. ssh blip mid-collection) must not // abort the rest — ship a marker for it and continue. log.Printf("rescue: section %q failed for ws=%s: %v", sec.name, in.WorkspaceID, err) - ship(ctx, in, sec.name, fmt.Sprintf("(rescue: section collection failed: %v)", err), false) + marker := fmt.Sprintf("(rescue: section collection failed: %v)", err) + ship(ctx, in, sec.name, marker, false) + bundle.Sections = append(bundle.Sections, Section{Name: sec.name, Content: marker, Redacted: false}) continue } redacted := Redact(in.WorkspaceID, raw) ship(ctx, in, sec.name, redacted, true) + bundle.Sections = append(bundle.Sections, Section{Name: sec.name, Content: redacted, Redacted: true}) collected++ } log.Printf("rescue: shipped %d/%d sections ws=%s instance=%s kind=%s", collected, len(bundleSections), in.WorkspaceID, in.InstanceID, LokiKind) + + // Persist the redacted bundle to the queryable store so the rescue + // READ endpoint can serve it without obs/Loki read creds. Best-effort + // and last: a persist failure (or no store wired) must NOT undo the + // Loki ship that already succeeded, and never panics the failure path. + persistBundle(ctx, bundle) +} + +// persistBundle writes the collected bundle to the queryable store if a +// store is wired. Best-effort: a nil store (operator hasn't wired the +// READ path) or a DB error is logged and swallowed — the Loki ship is +// the durable cross-tenant copy, and the failure path must never be +// disturbed by the post-mortem read store. +func persistBundle(ctx context.Context, b Bundle) { + if PersistBundle == nil { + log.Printf("rescue: store not wired — bundle for ws=%s shipped to Loki only (no queryable copy)", b.WorkspaceID) + return + } + if err := PersistBundle(ctx, b); err != nil { + log.Printf("rescue: persist bundle for ws=%s failed (shipped to Loki regardless): %v", b.WorkspaceID, err) + } } // ship emits one rescue section to Loki via the audit shipper. The diff --git a/workspace-server/internal/rescue/rescue_persist_test.go b/workspace-server/internal/rescue/rescue_persist_test.go new file mode 100644 index 000000000..a3a81b210 --- /dev/null +++ b/workspace-server/internal/rescue/rescue_persist_test.go @@ -0,0 +1,136 @@ +package rescue + +// Part 3 coverage: Capture, after collecting + redacting every section, +// persists the bundle exactly once to the queryable store (in addition +// to the per-section Loki ship verified in rescue_test.go). + +import ( + "context" + "errors" + "strings" + "testing" +) + +// withPersist swaps the injected PersistBundle for the test and restores +// it after. +func withPersist(t *testing.T, fn func(ctx context.Context, b Bundle) error) { + t.Helper() + prev := PersistBundle + PersistBundle = fn + t.Cleanup(func() { PersistBundle = prev }) +} + +// TestCapture_PersistsBundleOnce: the happy path persists one bundle +// carrying every section, with identity + redacted content matching what +// was shipped. +func TestCapture_PersistsBundleOnce(t *testing.T) { + _ = captureLoki(t) // keep Loki transport pointed at a temp file + withFakes(t, + func(_ context.Context, instanceID, cmd string) (string, error) { + return "OUT:" + instanceID, nil + }, + func(_ws, c string) string { return "RED:" + c }, + ) + + var persisted []Bundle + withPersist(t, func(_ context.Context, b Bundle) error { + persisted = append(persisted, b) + return nil + }) + + Capture(context.Background(), Input{ + InstanceID: "i-abc", + WorkspaceID: "ws-1", + OrgID: "org-9", + Reason: "provision_timeout_sweep", + }) + + if len(persisted) != 1 { + t.Fatalf("PersistBundle called %d times, want exactly 1", len(persisted)) + } + b := persisted[0] + if b.WorkspaceID != "ws-1" || b.OrgID != "org-9" || b.InstanceID != "i-abc" || b.Reason != "provision_timeout_sweep" { + t.Errorf("bundle identity wrong: %+v", b) + } + if len(b.Sections) != len(bundleSections) { + t.Fatalf("persisted %d sections, want %d", len(b.Sections), len(bundleSections)) + } + for _, s := range b.Sections { + if !s.Redacted { + t.Errorf("section %q persisted with redacted=false on the happy path", s.Name) + } + // Redactor ("RED:" prefix) must have run on persisted content. + if !strings.HasPrefix(s.Content, "RED:") { + t.Errorf("section %q persisted un-redacted content: %q", s.Name, s.Content) + } + } +} + +// TestCapture_PersistFailureDoesNotPanic: a store error is swallowed — +// Capture still completes (the Loki ship already succeeded). +func TestCapture_PersistFailureDoesNotPanic(t *testing.T) { + _ = captureLoki(t) + withFakes(t, + func(_ context.Context, _ string, _ string) (string, error) { return "ok", nil }, + func(_ws, c string) string { return c }, + ) + withPersist(t, func(_ context.Context, _ Bundle) error { + return errors.New("db down") + }) + // Must not panic / must return normally. + Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-2", OrgID: "o"}) +} + +// TestCapture_NoPersistWiredIsSafe: with PersistBundle unwired (operator +// hasn't wired the read path), Capture still ships to Loki and does not +// panic. +func TestCapture_NoPersistWiredIsSafe(t *testing.T) { + readLoki := captureLoki(t) + withFakes(t, + func(_ context.Context, _ string, _ string) (string, error) { return "ok", nil }, + func(_ws, c string) string { return c }, + ) + prev := PersistBundle + PersistBundle = nil + t.Cleanup(func() { PersistBundle = prev }) + + Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-3", OrgID: "o"}) + + // Loki ship still happened for every section. + if recs := readLoki(); len(recs) != len(bundleSections) { + t.Errorf("shipped %d records, want %d (Loki unaffected by missing store)", len(recs), len(bundleSections)) + } +} + +// TestCapture_FailureMarkerPersistedAsNonRedacted: a section whose +// collection fails is persisted with redacted=false + a marker, matching +// the Loki record. +func TestCapture_FailureMarkerPersistedAsNonRedacted(t *testing.T) { + _ = captureLoki(t) + withFakes(t, + func(_ context.Context, _ string, cmd string) (string, error) { + if strings.Contains(cmd, "config.yaml") { + return "", errors.New("ssh blip") + } + return "ok", nil + }, + func(_ws, c string) string { return c }, + ) + var got Bundle + withPersist(t, func(_ context.Context, b Bundle) error { got = b; return nil }) + + Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-4", OrgID: "o"}) + + var markers int + for _, s := range got.Sections { + if !s.Redacted { + markers++ + if !strings.Contains(s.Content, "section collection failed") { + t.Errorf("non-redacted section %q content = %q, want a failure marker", s.Name, s.Content) + } + } + } + if markers != 1 { + t.Errorf("want exactly 1 failure marker persisted, got %d", markers) + } +} diff --git a/workspace-server/internal/rescuestore/store.go b/workspace-server/internal/rescuestore/store.go new file mode 100644 index 000000000..f18b01dfe --- /dev/null +++ b/workspace-server/internal/rescuestore/store.go @@ -0,0 +1,155 @@ +// Package rescuestore is the queryable persistence layer for rescue +// bundles (RFC internal#742 Part 3). It is the DB side of the read-path +// decision: because internal/audit (Part 2's ship transport) is +// Loki-only and tenants hold no obs read creds, the redacted bundle is +// ALSO written here on capture so GET /workspaces/:id/rescue can serve +// the latest one with a plain Postgres read. +// +// The package owns both the write (Persist, wired into +// rescue.PersistBundle at boot) and the read (GetLatest, used by the +// handler). It depends on internal/db and internal/rescue (for the +// Bundle/Section types); it is imported by handlers, never by the leaf +// internal/rescue or by registry — so no import cycle. +package rescuestore + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "time" + + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue" +) + +// maxSectionBytes bounds a single persisted section's content so a +// pathological capture (e.g. a multi-megabyte container log) can't bloat +// the row or the read response. Capture already tails to ~200 lines per +// section, so this is a backstop, not the primary limit. Truncated +// content is suffixed with a marker so a reader knows it was clipped. +const maxSectionBytes = 64 * 1024 // 64 KiB per section + +// truncationMarker is appended to any section clipped at maxSectionBytes. +const truncationMarker = "\n…(rescue: section truncated at 64KiB)" + +// StoredBundle is a persisted bundle plus its capture timestamp (the DB +// assigns captured_at on write). The handler maps this to the read +// response shape. +type StoredBundle struct { + Bundle rescue.Bundle + CapturedAt time.Time +} + +// Store is the read/write surface the handler and the capture wiring +// depend on. An interface so the handler test can fake it without a +// sqlmock; the production implementation is Postgres. +type Store interface { + // Persist writes one bundle row (captured_at = now()). + Persist(ctx context.Context, b rescue.Bundle) error + // GetLatest returns the most recent bundle for workspaceID. When + // orgID is non-empty the row must also match org_id (cross-org + // defense-in-depth behind TenantGuard). Returns (nil, nil) — NOT an + // error — when no bundle exists, so the handler can 404 cleanly. + GetLatest(ctx context.Context, workspaceID, orgID string) (*StoredBundle, error) +} + +// Postgres is the production Store backed by the rescue_bundles table. +type Postgres struct{ db *sql.DB } + +// NewPostgres builds a Postgres-backed store over the given handle. +func NewPostgres(db *sql.DB) *Postgres { return &Postgres{db: db} } + +// Persist writes the bundle as one row. Sections are stored as JSONB. +// Each section's content is clamped to maxSectionBytes before write. +func (p *Postgres) Persist(ctx context.Context, b rescue.Bundle) error { + if p.db == nil { + return fmt.Errorf("rescuestore: nil db") + } + clamped := clampSections(b.Sections) + payload, err := json.Marshal(clamped) + if err != nil { + return fmt.Errorf("rescuestore: marshal sections: %w", err) + } + _, err = p.db.ExecContext(ctx, + `INSERT INTO rescue_bundles (workspace_id, org_id, instance_id, reason, sections) + VALUES ($1, $2, $3, $4, $5::jsonb)`, + b.WorkspaceID, b.OrgID, b.InstanceID, b.Reason, string(payload), + ) + if err != nil { + return fmt.Errorf("rescuestore: insert: %w", err) + } + return nil +} + +// GetLatest returns the newest bundle for workspaceID, optionally +// org-scoped. The (workspace_id, captured_at DESC, id DESC) index serves +// this directly. sql.ErrNoRows maps to (nil, nil) so the handler 404s. +func (p *Postgres) GetLatest(ctx context.Context, workspaceID, orgID string) (*StoredBundle, error) { + if p.db == nil { + return nil, fmt.Errorf("rescuestore: nil db") + } + + // org_id filter is applied only when an org is configured. The + // `($2 = '' OR org_id = $2)` form keeps it a single prepared query: + // empty orgID (self-hosted / unset MOLECULE_ORG_ID) matches any row; + // a set orgID requires the row's org to match, so a bundle written + // under a different org is never returned. + var ( + instanceID string + reason string + capturedAt time.Time + sectionsRaw []byte + ) + err := p.db.QueryRowContext(ctx, + `SELECT instance_id, reason, captured_at, sections + FROM rescue_bundles + WHERE workspace_id = $1 + AND ($2 = '' OR org_id = $2) + ORDER BY captured_at DESC, id DESC + LIMIT 1`, + workspaceID, orgID, + ).Scan(&instanceID, &reason, &capturedAt, §ionsRaw) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("rescuestore: query latest: %w", err) + } + + var sections []rescue.Section + if len(sectionsRaw) > 0 { + if err := json.Unmarshal(sectionsRaw, §ions); err != nil { + return nil, fmt.Errorf("rescuestore: unmarshal sections: %w", err) + } + } + + return &StoredBundle{ + Bundle: rescue.Bundle{ + WorkspaceID: workspaceID, + OrgID: orgID, + InstanceID: instanceID, + Reason: reason, + Sections: sections, + }, + CapturedAt: capturedAt, + }, nil +} + +// clampSections returns a copy with each section's content clamped to +// maxSectionBytes. Clamps on a rune boundary so the marker doesn't split +// a multibyte sequence — the content is a forensic blob, never parsed. +func clampSections(in []rescue.Section) []rescue.Section { + out := make([]rescue.Section, len(in)) + for i, s := range in { + if len(s.Content) > maxSectionBytes { + b := []byte(s.Content[:maxSectionBytes]) + // Back off to a valid utf-8 boundary (at most 3 bytes). + for len(b) > 0 && b[len(b)-1]&0xC0 == 0x80 { + b = b[:len(b)-1] + } + s.Content = string(b) + truncationMarker + } + out[i] = s + } + return out +} diff --git a/workspace-server/internal/rescuestore/store_test.go b/workspace-server/internal/rescuestore/store_test.go new file mode 100644 index 000000000..3d6f62ec8 --- /dev/null +++ b/workspace-server/internal/rescuestore/store_test.go @@ -0,0 +1,203 @@ +package rescuestore + +// Sqlmock-backed coverage for the rescue_bundles store (RFC internal#742 +// Part 3). Exercises Persist (incl. section clamp) + GetLatest (happy +// path, no-rows→nil, org-scoping, query error) without a real DB. + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "regexp" + "strings" + "testing" + "time" + + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue" + "github.com/DATA-DOG/go-sqlmock" +) + +func newMock(t *testing.T) (*sql.DB, sqlmock.Sqlmock) { + t.Helper() + dbh, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock: %v", err) + } + t.Cleanup(func() { _ = dbh.Close() }) + return dbh, mock +} + +func sampleBundle() rescue.Bundle { + return rescue.Bundle{ + WorkspaceID: "ws-1", + OrgID: "org-9", + InstanceID: "i-abc", + Reason: "bootstrap_watcher", + Sections: []rescue.Section{ + {Name: "config.yaml", Content: "model: gpt-4", Redacted: true}, + {Name: "docker-ps", Content: "(no agent container)", Redacted: false}, + }, + } +} + +// TestPersist_InsertsRow asserts Persist issues one INSERT with the +// bundle fields and a JSON sections payload. +func TestPersist_InsertsRow(t *testing.T) { + dbh, mock := newMock(t) + b := sampleBundle() + + mock.ExpectExec(regexp.QuoteMeta(`INSERT INTO rescue_bundles`)). + WithArgs("ws-1", "org-9", "i-abc", "bootstrap_watcher", sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(1, 1)) + + if err := NewPostgres(dbh).Persist(context.Background(), b); err != nil { + t.Fatalf("Persist: %v", err) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} + +// TestClampSections: a section over maxSectionBytes is truncated + +// marker-suffixed; a small section is untouched. +func TestClampSections(t *testing.T) { + huge := strings.Repeat("x", maxSectionBytes+5000) + in := []rescue.Section{ + {Name: "container.logs", Content: huge, Redacted: true}, + {Name: "small", Content: "ok", Redacted: true}, + } + out := clampSections(in) + + if len(out[0].Content) > maxSectionBytes+len(truncationMarker) { + t.Errorf("clamped content len = %d, want <= %d", len(out[0].Content), maxSectionBytes+len(truncationMarker)) + } + if !strings.HasSuffix(out[0].Content, truncationMarker) { + t.Error("clamped section missing truncation marker suffix") + } + if out[1].Content != "ok" { + t.Errorf("small section was modified: %q", out[1].Content) + } +} + +// TestPersist_WritesClampedPayload: Persist marshals the clamped +// sections into the JSONB arg (the INSERT carries the truncation marker). +func TestPersist_WritesClampedPayload(t *testing.T) { + dbh, mock := newMock(t) + huge := strings.Repeat("x", maxSectionBytes+5000) + b := rescue.Bundle{ + WorkspaceID: "ws-1", + Sections: []rescue.Section{{Name: "container.logs", Content: huge, Redacted: true}}, + } + want, _ := json.Marshal(clampSections(b.Sections)) + + mock.ExpectExec(regexp.QuoteMeta(`INSERT INTO rescue_bundles`)). + WithArgs("ws-1", "", "", "", string(want)). + WillReturnResult(sqlmock.NewResult(1, 1)) + + if err := NewPostgres(dbh).Persist(context.Background(), b); err != nil { + t.Fatalf("Persist: %v", err) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet: %v", err) + } +} + +// TestGetLatest_ReturnsBundle: a found row decodes back into the bundle. +func TestGetLatest_ReturnsBundle(t *testing.T) { + dbh, mock := newMock(t) + ts := time.Date(2026, 5, 31, 12, 0, 0, 0, time.UTC) + secs, _ := json.Marshal([]rescue.Section{ + {Name: "config.yaml", Content: "redacted", Redacted: true}, + }) + + mock.ExpectQuery(regexp.QuoteMeta(`SELECT instance_id, reason, captured_at, sections`)). + WithArgs("ws-1", "org-9"). + WillReturnRows(sqlmock.NewRows([]string{"instance_id", "reason", "captured_at", "sections"}). + AddRow("i-abc", "bootstrap_watcher", ts, secs)) + + got, err := NewPostgres(dbh).GetLatest(context.Background(), "ws-1", "org-9") + if err != nil { + t.Fatalf("GetLatest: %v", err) + } + if got == nil { + t.Fatal("got nil, want a bundle") + } + if !got.CapturedAt.Equal(ts) { + t.Errorf("captured_at = %v, want %v", got.CapturedAt, ts) + } + if got.Bundle.InstanceID != "i-abc" || got.Bundle.Reason != "bootstrap_watcher" { + t.Errorf("bundle meta wrong: %+v", got.Bundle) + } + if len(got.Bundle.Sections) != 1 || got.Bundle.Sections[0].Name != "config.yaml" { + t.Errorf("sections decoded wrong: %+v", got.Bundle.Sections) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet: %v", err) + } +} + +// TestGetLatest_NoRowsReturnsNil: no bundle → (nil, nil), so the handler +// can 404 without treating it as an error. +func TestGetLatest_NoRowsReturnsNil(t *testing.T) { + dbh, mock := newMock(t) + mock.ExpectQuery(regexp.QuoteMeta(`SELECT instance_id, reason, captured_at, sections`)). + WithArgs("ws-none", "org-9"). + WillReturnError(sql.ErrNoRows) + + got, err := NewPostgres(dbh).GetLatest(context.Background(), "ws-none", "org-9") + if err != nil { + t.Fatalf("GetLatest err = %v, want nil for no-rows", err) + } + if got != nil { + t.Fatalf("got %+v, want nil for no-rows", got) + } +} + +// TestGetLatest_OrgScopingArg: the org id is passed as the $2 filter arg, +// so a row in a sibling org is excluded by the query itself. We assert +// the arg binding (the `($2 = ” OR org_id = $2)` predicate is in the +// SQL); a mismatched org → no row → nil (same as no-rows). +func TestGetLatest_OrgScopingArg(t *testing.T) { + dbh, mock := newMock(t) + // Tenant org-B asks for ws-1 (owned by org-9). The Postgres predicate + // filters it out → ErrNoRows → nil. We model that by scripting the + // query for ("ws-1","org-B") to return no rows. + mock.ExpectQuery(regexp.QuoteMeta(`AND ($2 = '' OR org_id = $2)`)). + WithArgs("ws-1", "org-B"). + WillReturnError(sql.ErrNoRows) + + got, err := NewPostgres(dbh).GetLatest(context.Background(), "ws-1", "org-B") + if err != nil { + t.Fatalf("GetLatest: %v", err) + } + if got != nil { + t.Fatal("sibling-org read returned a bundle; want nil") + } +} + +// TestGetLatest_QueryErrorPropagates: a real DB error (not ErrNoRows) +// surfaces as an error so the handler returns 503, not a false 404. +func TestGetLatest_QueryErrorPropagates(t *testing.T) { + dbh, mock := newMock(t) + mock.ExpectQuery(regexp.QuoteMeta(`SELECT instance_id, reason, captured_at, sections`)). + WithArgs("ws-1", "org-9"). + WillReturnError(errors.New("connection reset")) + + _, err := NewPostgres(dbh).GetLatest(context.Background(), "ws-1", "org-9") + if err == nil { + t.Fatal("want an error for a non-ErrNoRows DB failure") + } +} + +// TestNilDB: both methods return an error (never panic) when the db +// handle is nil — the degraded-boot guard the wiring relies on. +func TestNilDB(t *testing.T) { + p := NewPostgres(nil) + if err := p.Persist(context.Background(), sampleBundle()); err == nil { + t.Error("Persist(nil db) should error") + } + if _, err := p.GetLatest(context.Background(), "ws-1", "org-9"); err == nil { + t.Error("GetLatest(nil db) should error") + } +} diff --git a/workspace-server/internal/router/router.go b/workspace-server/internal/router/router.go index 524367df9..88393fa26 100644 --- a/workspace-server/internal/router/router.go +++ b/workspace-server/internal/router/router.go @@ -703,6 +703,14 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi wsAuth.PUT("/files/*path", tmplh.WriteFile) wsAuth.DELETE("/files/*path", tmplh.DeleteFile) + // Rescue read (RFC internal#742 Part 3) — latest post-mortem bundle + // for a boot-failed/terminated workspace, so "why won't my agent + // boot" is answerable without a live instance. Same WorkspaceAuth + // gate as /files/*; the handler org-scopes the store read by + // MOLECULE_ORG_ID so a sibling org cannot read another org's bundle. + rescueReadH := handlers.NewRescueReadHandler() + wsAuth.GET("/rescue", rescueReadH.GetRescue) + // Chat attachments — file upload (user → agent) and binary-safe // streaming download (agent → user). Namespaced under /chat/ so // the security model is obviously distinct from /files/* (which diff --git a/workspace-server/migrations/20260531000000_rescue_bundles.down.sql b/workspace-server/migrations/20260531000000_rescue_bundles.down.sql new file mode 100644 index 000000000..0b9c79ab9 --- /dev/null +++ b/workspace-server/migrations/20260531000000_rescue_bundles.down.sql @@ -0,0 +1,4 @@ +-- Reverse RFC internal#742 Part 3 rescue_bundles table. +-- Forensic-only table; dropping it loses post-mortem read history but +-- does not affect boot-failure semantics (capture still ships to Loki). +DROP TABLE IF EXISTS rescue_bundles; diff --git a/workspace-server/migrations/20260531000000_rescue_bundles.up.sql b/workspace-server/migrations/20260531000000_rescue_bundles.up.sql new file mode 100644 index 000000000..d05527681 --- /dev/null +++ b/workspace-server/migrations/20260531000000_rescue_bundles.up.sql @@ -0,0 +1,59 @@ +-- 20260531000000_rescue_bundles.up.sql — RFC internal#742 Part 3. +-- +-- A queryable, post-mortem-inspectable copy of the rescue bundle that +-- Part 2 (internal/rescue.Capture) collects off a boot-failed workspace +-- EC2 before the control plane reaps it. +-- +-- WHY a DB table (the Part 3 read-path decision): +-- Part 2 ships the bundle via internal/audit (audit.Emit), which is +-- stdout→Vector→Loki + a best-effort local JSONL on the tenant +-- container's EPHEMERAL rootfs — NOT a queryable store. Serving +-- GET /workspaces/:id/rescue from Loki would require giving the +-- tenant process a Loki *query* client + obs read creds, which it +-- deliberately does not have (and must not — RFC internal#742 keeps +-- obs read creds out of tenants). So Part 3 ALSO persists the +-- already-redacted bundle to this small per-tenant table on capture, +-- and the read endpoint serves the latest row. The Loki stream +-- remains the cross-tenant operator firehose; this table is the +-- tenant-local, org-scoped read surface that powers the future +-- canvas "Why did this fail?" panel. +-- +-- REDACTION: the `sections` payload written here is the SAME content +-- the Loki ship loop emits — i.e. already run through the SAFE-T1201 +-- secret-scan (handlers.redactSecrets) at capture time. This table +-- never holds raw tokens; the read endpoint returns the stored content +-- verbatim without re-redacting. +-- +-- ORG SCOPING: org_id is denormalized onto the row so the read handler +-- can filter by (workspace_id, org_id) and a row whose org doesn't +-- match the tenant's MOLECULE_ORG_ID is never returned — defense in +-- depth behind TenantGuard (which already 404s cross-org requests at +-- the routing layer). +-- +-- RETENTION: bounded by RescueVolumeGrace semantics on the capture +-- side; rows are small (a redacted forensic blob, capped at capture). +-- A future sweeper can prune rows past the grace window — out of scope +-- for Part 3; the table is append-only here. + +CREATE TABLE IF NOT EXISTS rescue_bundles ( + id BIGSERIAL PRIMARY KEY, + workspace_id TEXT NOT NULL, + org_id TEXT NOT NULL DEFAULT '', + instance_id TEXT NOT NULL DEFAULT '', + reason TEXT NOT NULL DEFAULT '', + captured_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + -- sections is the ordered, already-redacted bundle: + -- [{ "name": "config.yaml", "content": "...", "redacted": true }, ...] + -- Stored as JSONB so the read handler returns it as a structured map + -- and a future query can index into a single section if needed. + sections JSONB NOT NULL DEFAULT '[]'::jsonb +); + +-- Read hot path: "latest bundle for this workspace" — the only query +-- the GET /workspaces/:id/rescue endpoint runs. +-- SELECT ... WHERE workspace_id = $1 [AND org_id = $2] +-- ORDER BY captured_at DESC, id DESC LIMIT 1 +-- Partial-free composite index; (workspace_id, captured_at DESC) covers +-- the filter + ordering. id DESC tiebreaks same-timestamp captures. +CREATE INDEX IF NOT EXISTS idx_rescue_bundles_ws_captured + ON rescue_bundles (workspace_id, captured_at DESC, id DESC); -- 2.52.0 From 62b5f65208beed2a1513e5a05a91aa34e159bfd4 Mon Sep 17 00:00:00 2001 From: "Molecule AI Code Reviewer (2)" Date: Tue, 2 Jun 2026 20:55:38 +0000 Subject: [PATCH 3/3] fix(security): fail closed rescue reads without org --- .../internal/handlers/rescue_read.go | 15 ++++++++---- .../internal/handlers/rescue_read_test.go | 16 ++++++++----- .../internal/rescuestore/store.go | 16 ++++++------- .../internal/rescuestore/store_test.go | 24 ++++++++++++------- 4 files changed, 44 insertions(+), 27 deletions(-) diff --git a/workspace-server/internal/handlers/rescue_read.go b/workspace-server/internal/handlers/rescue_read.go index 421043eb6..9826753c0 100644 --- a/workspace-server/internal/handlers/rescue_read.go +++ b/workspace-server/internal/handlers/rescue_read.go @@ -103,11 +103,18 @@ func (h *RescueReadHandler) GetRescue(c *gin.Context) { return } - // org_id is the tenant's configured org (one tenant = one org). When - // unset (self-hosted / dev), pass "" so the store returns any row for - // the workspace; when set, the store requires org_id to match so a - // sibling org's row is never served. + // org_id is the tenant's configured org (one tenant = one org). + // Fail closed: an empty org_id disables org isolation and must not + // reach the store (#2020). orgID := os.Getenv("MOLECULE_ORG_ID") + if orgID == "" { + log.Printf("GetRescue: missing MOLECULE_ORG_ID for ws=%s", workspaceID) + c.JSON(http.StatusServiceUnavailable, gin.H{ + "error": "rescue org not configured", + "code": "platform_misconfigured", + }) + return + } stored, err := h.store.GetLatest(ctx, workspaceID, orgID) if err != nil { diff --git a/workspace-server/internal/handlers/rescue_read_test.go b/workspace-server/internal/handlers/rescue_read_test.go index dddde8419..63dd0dd02 100644 --- a/workspace-server/internal/handlers/rescue_read_test.go +++ b/workspace-server/internal/handlers/rescue_read_test.go @@ -175,16 +175,20 @@ func TestGetRescue_OrgScopingPassedToStore(t *testing.T) { } } -// TestGetRescue_EmptyOrgEnvPassesEmptyFilter — self-hosted / unset -// MOLECULE_ORG_ID passes "" so the store returns any row for the ws. -func TestGetRescue_EmptyOrgEnvPassesEmptyFilter(t *testing.T) { +// TestGetRescue_EmptyOrgEnvRejected — empty MOLECULE_ORG_ID is a +// fail-closed security violation (#2020). The handler must 503 before +// calling the store, so the org filter cannot be bypassed. +func TestGetRescue_EmptyOrgEnvRejected(t *testing.T) { fake := &fakeRescueStore{ret: sampleStored()} w := doRescueGet(t, "ws-1", "", fake) if fake.gotOrgID != "" { - t.Errorf("store got org_id = %q, want empty (unset MOLECULE_ORG_ID)", fake.gotOrgID) + t.Errorf("store was called with org_id = %q; want no call when env empty", fake.gotOrgID) } - if w.Code != http.StatusOK { - t.Fatalf("status = %d, want 200", w.Code) + if w.Code != http.StatusServiceUnavailable { + t.Fatalf("status = %d, want 503; body=%s", w.Code, w.Body.String()) + } + if !strings.Contains(w.Body.String(), "platform_misconfigured") { + t.Fatalf("body = %s, want platform_misconfigured code", w.Body.String()) } } diff --git a/workspace-server/internal/rescuestore/store.go b/workspace-server/internal/rescuestore/store.go index f18b01dfe..ff0208cfa 100644 --- a/workspace-server/internal/rescuestore/store.go +++ b/workspace-server/internal/rescuestore/store.go @@ -81,19 +81,17 @@ func (p *Postgres) Persist(ctx context.Context, b rescue.Bundle) error { return nil } -// GetLatest returns the newest bundle for workspaceID, optionally -// org-scoped. The (workspace_id, captured_at DESC, id DESC) index serves -// this directly. sql.ErrNoRows maps to (nil, nil) so the handler 404s. +// GetLatest returns the newest bundle for workspaceID, org-scoped. The +// (workspace_id, captured_at DESC, id DESC) index serves this directly. +// sql.ErrNoRows maps to (nil, nil) so the handler 404s. func (p *Postgres) GetLatest(ctx context.Context, workspaceID, orgID string) (*StoredBundle, error) { if p.db == nil { return nil, fmt.Errorf("rescuestore: nil db") } + if orgID == "" { + return nil, fmt.Errorf("rescuestore: org_id required") + } - // org_id filter is applied only when an org is configured. The - // `($2 = '' OR org_id = $2)` form keeps it a single prepared query: - // empty orgID (self-hosted / unset MOLECULE_ORG_ID) matches any row; - // a set orgID requires the row's org to match, so a bundle written - // under a different org is never returned. var ( instanceID string reason string @@ -104,7 +102,7 @@ func (p *Postgres) GetLatest(ctx context.Context, workspaceID, orgID string) (*S `SELECT instance_id, reason, captured_at, sections FROM rescue_bundles WHERE workspace_id = $1 - AND ($2 = '' OR org_id = $2) + AND org_id = $2 ORDER BY captured_at DESC, id DESC LIMIT 1`, workspaceID, orgID, diff --git a/workspace-server/internal/rescuestore/store_test.go b/workspace-server/internal/rescuestore/store_test.go index 3d6f62ec8..447b7fc62 100644 --- a/workspace-server/internal/rescuestore/store_test.go +++ b/workspace-server/internal/rescuestore/store_test.go @@ -154,16 +154,14 @@ func TestGetLatest_NoRowsReturnsNil(t *testing.T) { } } -// TestGetLatest_OrgScopingArg: the org id is passed as the $2 filter arg, -// so a row in a sibling org is excluded by the query itself. We assert -// the arg binding (the `($2 = ” OR org_id = $2)` predicate is in the -// SQL); a mismatched org → no row → nil (same as no-rows). +// TestGetLatest_OrgScopingArg: the org id is passed as the $2 filter arg +// with strict equality, so a row in a sibling org is excluded by the query +// itself. A mismatched org → no row → nil (same as no-rows). func TestGetLatest_OrgScopingArg(t *testing.T) { dbh, mock := newMock(t) - // Tenant org-B asks for ws-1 (owned by org-9). The Postgres predicate - // filters it out → ErrNoRows → nil. We model that by scripting the - // query for ("ws-1","org-B") to return no rows. - mock.ExpectQuery(regexp.QuoteMeta(`AND ($2 = '' OR org_id = $2)`)). + // Tenant org-B asks for ws-1 (owned by org-9). The strict predicate + // filters it out → ErrNoRows → nil. + mock.ExpectQuery(regexp.QuoteMeta(`AND org_id = $2`)). WithArgs("ws-1", "org-B"). WillReturnError(sql.ErrNoRows) @@ -176,6 +174,16 @@ func TestGetLatest_OrgScopingArg(t *testing.T) { } } +// TestGetLatest_EmptyOrgIDRejected: an empty orgID must fail closed with +// an error rather than disabling the org filter (#2020). +func TestGetLatest_EmptyOrgIDRejected(t *testing.T) { + dbh, _ := newMock(t) + _, err := NewPostgres(dbh).GetLatest(context.Background(), "ws-1", "") + if err == nil { + t.Fatal("GetLatest(empty orgID) should error") + } +} + // TestGetLatest_QueryErrorPropagates: a real DB error (not ErrNoRows) // surfaces as an error so the handler returns 503, not a false 404. func TestGetLatest_QueryErrorPropagates(t *testing.T) { -- 2.52.0