feat(workspace-server): rescue capture on boot-failure (internal#742 Part 2) #2019

Open
devops-engineer wants to merge 1 commits from feat/rfc742-rescue-capture into main
10 changed files with 1022 additions and 29 deletions
+11
View File
@@ -349,6 +349,17 @@ func main() {
codexauth.StartCodexAuthRefresher(c, db.DB)
})
// RFC internal#742 Part 2: wire the boot-failure rescue capture into
// the provision-timeout sweep's failure verdict. When the sweep flips
// a stuck workspace to `failed`, this hook captures a forensic rescue
// bundle off the still-running (but boot-failed) EC2 and ships it to
// obs/Loki before the control plane reaps the instance. Best-effort +
// non-blocking (handlers.BootFailureRescueHook dispatches on its own
// goroutine + timeout). The handler-side boot-failure path
// (WorkspaceHandler.BootstrapFailed) wires its own capture inline.
registry.BootFailureRescueHook = handlers.BootFailureRescueHook
// Provision-timeout sweep — flips workspaces that have been stuck in
// status='provisioning' past the timeout window to 'failed' and emits
// WORKSPACE_PROVISION_TIMEOUT. Without this the UI banner is cosmetic
@@ -0,0 +1,158 @@
package handlers
// rescue_wiring.go — bridges the leaf internal/rescue package to the
// handlers package's EIC/SSH runner + secret redactor, and exposes the
// boot-failure rescue hook used by both boot-failure verdict paths
// (handlers.BootstrapFailed here, registry.sweepStuckProvisioning via
// an injected hook wired in main.go).
//
// Why the indirection: internal/rescue is a leaf so registry (which
// must NOT import handlers — that's an import cycle) can call it. The
// two heavy dependencies live here in handlers — `withEICTunnel`
// (the EIC keypair → push → tunnel → ssh dance) and `redactSecrets`
// (the SAFE-T1201 secret-scan) — so we inject them into rescue's
// package-level func vars at init().
//
// RFC internal#742 Part 2.
import (
"bytes"
"context"
"database/sql"
"fmt"
"os"
"os/exec"
"strings"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue"
)
func init() {
// Wire the leaf rescue package to handlers' EIC runner + redactor.
// Done in init() (not main.go) so the binding is present for any
// caller of rescue.Capture, including the registry sweeper hook and
// the handler path, without each call site re-wiring it.
rescue.RunRemote = rescueRunRemoteViaEIC
rescue.Redact = func(workspaceID, content string) string {
out, _ := redactSecrets(workspaceID, content)
return out
}
}
// rescueRunRemoteViaEIC runs a single shell command on the still-running
// (but boot-failed) workspace EC2 over an EIC tunnel and returns its
// combined stdout+stderr. Reuses the same `withEICTunnel` dance as the
// canvas file ops (ephemeral keypair → SendSSHPublicKey → open-tunnel →
// ssh) so the rescue path inherits every fix to the EIC mechanism (e.g.
// PR #2822's LogLevel=ERROR shim) for free.
//
// Combined output (2>&1) is intentional: a boot-failed box's most
// useful signal is often on stderr (a panic, a missing-file error), and
// the rescue bundle is a forensic blob, not a parsed value — we want
// everything the command emitted.
func rescueRunRemoteViaEIC(ctx context.Context, instanceID, command string) (string, error) {
var combined []byte
runErr := withEICTunnel(ctx, instanceID, func(s eicSSHSession) error {
sshCmd := exec.CommandContext(ctx, "ssh", s.sshArgs(command)...)
sshCmd.Env = os.Environ()
var buf bytes.Buffer
sshCmd.Stdout = &buf
sshCmd.Stderr = &buf
// A non-zero remote exit is NOT a transport error for the rescue
// path — each section command already falls back to an
// `|| echo '(...)'` marker, so a clean exit is expected. Only
// surface an error when ssh/tunnel itself failed AND produced no
// output to ship.
err := sshCmd.Run()
combined = buf.Bytes()
if err != nil && len(combined) == 0 {
return fmt.Errorf("rescue ssh exec: %w", err)
}
return nil
})
if runErr != nil {
return "", runErr
}
return strings.TrimRight(string(combined), "\n"), nil
}
// captureRescueBundle fires a best-effort, non-blocking rescue capture
// for a boot-failed workspace. It is the single entry point both
// boot-failure verdict paths funnel through.
//
// NON-BLOCKING: the actual collection runs in its own goroutine with
// its own timeout (rescue.CaptureTimeout), detached from the caller's
// request/sweep context so it can't add latency to — or be cancelled
// by — the failure-handling path that triggered it. We snapshot the
// identity into a fresh context.Background() for the same reason: a
// gin request context is cancelled the instant the HTTP handler
// returns, which would kill the EIC tunnel mid-collection.
//
// instanceID/orgID are resolved here (best-effort) so the two call
// sites only need the workspace id. A missing instance id → rescue.Capture
// no-ops (logged), so an early-failure workspace that never got an EC2
// is handled cleanly.
func captureRescueBundle(workspaceID, reason string) {
rescueDispatch(func() {
ctx := context.Background()
instanceID, err := rescueResolveInstanceID(ctx, workspaceID)
if err != nil {
// Best-effort: a resolve failure is logged inside Capture's
// caller chain; pass empty so Capture no-ops cleanly.
instanceID = ""
}
rescue.Capture(ctx, rescue.Input{
InstanceID: instanceID,
WorkspaceID: workspaceID,
OrgID: os.Getenv("MOLECULE_ORG_ID"),
Reason: reason,
})
})
}
// rescueDispatch runs the rescue collection off the request path. In
// production it's `go fn()` so the capture never blocks or adds latency
// to the boot-failure handler. Tests swap it for a synchronous runner so
// they can assert the capture fired (or didn't) deterministically
// without racing the goroutine.
var rescueDispatch = func(fn func()) { go fn() }
// BootFailureRescueHook is the registry-facing adapter wired into
// registry.BootFailureRescueHook from main.go. The registry sweeper
// already resolved the instance id (it's in the candidate row), so this
// path uses it directly rather than re-querying — symmetric with the
// captureRescueBundle handler path but skipping the lookup.
//
// Best-effort + non-blocking: dispatches the capture on its own
// goroutine with its own timeout, so the sweep loop is never slowed.
func BootFailureRescueHook(workspaceID, instanceID, reason string) {
go rescue.Capture(context.Background(), rescue.Input{
InstanceID: instanceID,
WorkspaceID: workspaceID,
OrgID: os.Getenv("MOLECULE_ORG_ID"),
Reason: reason,
})
}
// rescueResolveInstanceID looks up the EC2 instance id for a workspace.
// Package var so tests can stub it without a sqlmock. Mirrors
// provisioner.resolveInstanceID (same query) but lives here to keep the
// rescue wiring self-contained and avoid widening the provisioner
// surface.
var rescueResolveInstanceID = func(ctx context.Context, workspaceID string) (string, error) {
if db.DB == nil {
return "", nil // nil in unit tests
}
var instanceID sql.NullString
err := db.DB.QueryRowContext(ctx,
`SELECT instance_id FROM workspaces WHERE id = $1`, workspaceID,
).Scan(&instanceID)
if err != nil && err != sql.ErrNoRows {
return "", err
}
if !instanceID.Valid {
return "", nil
}
return instanceID.String, nil
}
@@ -0,0 +1,119 @@
package handlers
import (
"bytes"
"context"
"net/http"
"net/http/httptest"
"testing"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
// rescueTestHarness makes the otherwise-async rescue capture
// deterministic + observable for handler tests:
// - rescueDispatch runs synchronously (no goroutine race).
// - rescueResolveInstanceID returns a fixed instance id.
// - rescue.RunRemote / rescue.Redact are stubbed so no real EIC/SSH
// fires; runCalls counts how many remote-command collections ran,
// which is the proxy for "did the capture fire".
//
// All originals are restored on cleanup.
func rescueTestHarness(t *testing.T, instanceID string) (runCalls *int) {
t.Helper()
n := 0
runCalls = &n
prevDispatch := rescueDispatch
rescueDispatch = func(fn func()) { fn() } // synchronous
prevResolve := rescueResolveInstanceID
rescueResolveInstanceID = func(_ context.Context, _ string) (string, error) { return instanceID, nil }
prevRun, prevRedact := rescue.RunRemote, rescue.Redact
rescue.RunRemote = func(_ context.Context, _ string, _ string) (string, error) { n++; return "out", nil }
rescue.Redact = func(_ws, c string) string { return c }
t.Cleanup(func() {
rescueDispatch = prevDispatch
rescueResolveInstanceID = prevResolve
rescue.RunRemote = prevRun
rescue.Redact = prevRedact
})
return runCalls
}
// TestBootstrapFailed_FiresRescueOnFlip — the RFC internal#742 handler
// hook: when BootstrapFailed actually flips a workspace to `failed`
// (affected==1), the rescue capture fires against the resolved instance.
func TestBootstrapFailed_FiresRescueOnFlip(t *testing.T) {
h, mock := setupBootstrapHandler(t)
runCalls := rescueTestHarness(t, "i-failed01")
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-crashed", sqlmock.AnyArg(), models.StatusFailed).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(`INSERT INTO structure_events`).
WithArgs("WORKSPACE_PROVISION_FAILED", "ws-crashed", sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-crashed"}}
c.Request = httptest.NewRequest("POST", "/admin/workspaces/ws-crashed/bootstrap-failed",
bytes.NewBufferString(`{"error":"codex provider derivation failed","log_tail":"panic"}`))
c.Request.Header.Set("Content-Type", "application/json")
h.BootstrapFailed(c)
if w.Code != http.StatusOK {
t.Fatalf("want 200, got %d: %s", w.Code, w.Body.String())
}
if *runCalls != len(rescueBundleSectionCount()) {
t.Errorf("rescue capture ran %d remote commands, want %d (one per bundle section)", *runCalls, len(rescueBundleSectionCount()))
}
}
// TestBootstrapFailed_NoRescueOnNoChange — an already-transitioned
// workspace (affected==0: raced to online, or double-report) is NOT a
// boot-failure verdict here, so the rescue capture must NOT fire.
func TestBootstrapFailed_NoRescueOnNoChange(t *testing.T) {
h, mock := setupBootstrapHandler(t)
runCalls := rescueTestHarness(t, "i-online01")
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-online", sqlmock.AnyArg(), models.StatusFailed).
WillReturnResult(sqlmock.NewResult(0, 0)) // already transitioned
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-online"}}
c.Request = httptest.NewRequest("POST", "/admin/workspaces/ws-online/bootstrap-failed",
bytes.NewBufferString(`{"error":"late report","log_tail":""}`))
c.Request.Header.Set("Content-Type", "application/json")
h.BootstrapFailed(c)
if w.Code != http.StatusOK {
t.Fatalf("want 200, got %d", w.Code)
}
if *runCalls != 0 {
t.Errorf("rescue capture fired (%d cmds) on a no-change report; it must only fire on a real flip", *runCalls)
}
}
// rescueBundleSectionCount returns the production rescue bundle section
// list length by running a capture against a counting runner once. It's
// a small indirection so the handler test stays decoupled from the exact
// section set in internal/rescue (which has its own tests).
func rescueBundleSectionCount() []struct{} {
count := 0
prevRun, prevRedact := rescue.RunRemote, rescue.Redact
rescue.RunRemote = func(_ context.Context, _ string, _ string) (string, error) { count++; return "", nil }
rescue.Redact = func(_ws, c string) string { return c }
rescue.Capture(context.Background(), rescue.Input{InstanceID: "i-probe", WorkspaceID: "w", OrgID: "o"})
rescue.RunRemote = prevRun
rescue.Redact = prevRedact
return make([]struct{}, count)
}
@@ -91,6 +91,18 @@ func (h *WorkspaceHandler) BootstrapFailed(c *gin.Context) {
"log_tail": tail,
"source": "bootstrap_watcher",
})
// RFC internal#742 Part 2: this is one of the two boot-failure
// verdict points. We've just flipped a still-running (but
// unconfigured) workspace EC2 to `failed`; the control plane will
// reap the instance shortly. Capture a forensic rescue bundle off
// the live box NOW, before it's torn down, so a wedged workspace is
// post-mortem-inspectable. Best-effort + non-blocking: runs in its
// own goroutine with its own timeout, detached from this request's
// context (which is cancelled the instant this handler returns).
// Failure to capture never changes the boot-failure handling.
captureRescueBundle(id, "bootstrap_watcher")
log.Printf("BootstrapFailed: marked %s failed (tail=%d bytes, err=%q)", id, len(tail), errMsg)
c.JSON(http.StatusOK, gin.H{"ok": true})
}
@@ -10,8 +10,15 @@ import (
"github.com/DATA-DOG/go-sqlmock"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue"
)
// rescueVolumeGraceHours surfaces the rescue grace as whole hours for
// the retention-contract assertion (RFC internal#742 Part 2).
func rescueVolumeGraceHours() int {
return int(rescue.RescueVolumeGrace.Hours())
}
// fakeCPReaper is a hand-rolled CPOrphanReaper for the SaaS-mode
// sweeper tests. Records every Stop call so tests can assert which
// workspace IDs were re-issued.
@@ -97,6 +104,55 @@ func TestCPSweepOnce_NoOrphans(t *testing.T) {
}
}
// TestCPSweepOnce_DoesNotReapFailedWorkspace — RFC internal#742 Part 2
// volume-retention guarantee, molecule-core side.
//
// A boot-FAILED workspace (status='failed') must NOT be terminated by
// the platform's orphan sweeper: its instance + /configs data volume are
// retained through the rescue grace (rescue.RescueVolumeGrace) so a live
// rescue read is possible, distinct from the user-prune erase path. The
// sweeper reaps ONLY status='removed' (the explicit deprovision path),
// so a `failed` row is structurally excluded at the SELECT — it never
// reaches reaper.Stop. We assert the predicate filters to 'removed'
// (so the failed instance survives) and that no Stop fires for a DB
// whose only orphan-shaped row is `failed`.
//
// This is the "if the sweeper already keeps volumes by default, confirm
// + add a test asserting it" branch of the RFC: it does, by construction.
func TestCPSweepOnce_DoesNotReapFailedWorkspace(t *testing.T) {
mock := setupTestDB(t)
reaper := &fakeCPReaper{}
// The sweeper's SELECT carries `status = 'removed'`. A boot-failed
// workspace (status='failed') does not match that predicate, so the
// real DB returns it nowhere in this result set — modelled as the
// empty result the `removed`-only filter produces when the only
// instance-bearing row is `failed`. The regex pins the retention-
// critical predicate so a future widening to include 'failed' (which
// would terminate boot-failed boxes mid-rescue) fails this test.
mock.ExpectQuery(`(?s)WHERE status = 'removed'\s+AND instance_id IS NOT NULL`).
WithArgs(cpSweepLimit).
WillReturnRows(sqlmock.NewRows([]string{"id"})) // failed row excluded by predicate
cpSweepOnce(context.Background(), reaper)
if len(reaper.stopCalls) != 0 {
t.Fatalf("boot-failed workspace must be RETAINED (no terminate); got Stop calls %v", reaper.stopCalls)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// TestRescueVolumeGraceIsDistinctFromPrune documents that the rescue
// grace is its own contract (24h) and not coupled to any prune timing —
// the value is the SSOT the control-plane reaper must honour.
func TestRescueVolumeGraceIsDistinctFromPrune(t *testing.T) {
if rescueVolumeGraceHours() != 24 {
t.Errorf("rescue volume grace = %dh, want 24h (RFC internal#742)", rescueVolumeGraceHours())
}
}
// TestCPSweepOnce_MultipleOrphans — all rows in the batch get Stop'd
// independently; one failure doesn't block others.
func TestCPSweepOnce_MultipleOrphans(t *testing.T) {
@@ -92,6 +92,23 @@ func provisioningTimeoutFor(runtime string, lookup RuntimeTimeoutLookup) time.Du
return DefaultProvisioningTimeout
}
// BootFailureRescueHook, when wired, is invoked once per workspace the
// sweep flips from `provisioning` to `failed` — i.e. on the boot-failure
// verdict, BEFORE the control plane reaps the instance. It captures a
// forensic rescue bundle off the still-running (but boot-failed) EC2 and
// ships it to obs/Loki (RFC internal#742 Part 2). Wired in main.go to
// handlers.captureRescueBundle via a thin adapter; nil in tests + on
// self-hosted deploys (no rescue shipping there).
//
// Function-typed injection (not an import of handlers) keeps the
// existing handlers→registry import direction intact — registry must not
// import handlers.
//
// MUST be best-effort + non-blocking: the hook itself dispatches the
// capture on its own goroutine with its own timeout, so the sweep loop
// is never slowed or blocked by a hung EIC tunnel on the dead box.
var BootFailureRescueHook func(workspaceID, instanceID, reason string)
// StartProvisioningTimeoutSweep periodically scans for workspaces stuck in
// `status='provisioning'` past the timeout window, flips them to `failed`,
// and broadcasts a WORKSPACE_PROVISION_TIMEOUT event so the canvas can
@@ -144,7 +161,7 @@ func sweepStuckProvisioning(ctx context.Context, emitter ProvisionTimeoutEmitter
// flight, not historical) and the partial index on status keeps
// it fast.
rows, err := db.DB.QueryContext(ctx, `
SELECT id, COALESCE(runtime, ''), EXTRACT(EPOCH FROM (now() - updated_at))::int
SELECT id, COALESCE(runtime, ''), COALESCE(instance_id, ''), EXTRACT(EPOCH FROM (now() - updated_at))::int
FROM workspaces
WHERE status = 'provisioning'
`)
@@ -155,14 +172,15 @@ func sweepStuckProvisioning(ctx context.Context, emitter ProvisionTimeoutEmitter
defer rows.Close()
type candidate struct {
id string
runtime string
ageSec int
id string
runtime string
instanceID string
ageSec int
}
var ids []candidate
for rows.Next() {
var c candidate
if err := rows.Scan(&c.id, &c.runtime, &c.ageSec); err == nil {
if err := rows.Scan(&c.id, &c.runtime, &c.instanceID, &c.ageSec); err == nil {
ids = append(ids, c)
}
}
@@ -200,6 +218,19 @@ func sweepStuckProvisioning(ctx context.Context, emitter ProvisionTimeoutEmitter
continue
}
log.Printf("Provision-timeout sweep: %s (runtime=%q) stuck in provisioning > %s — marked failed", c.id, c.runtime, timeout)
// RFC internal#742 Part 2: this flip is a boot-failure verdict.
// The instance is still running (the CP reaps it shortly after);
// capture a forensic rescue bundle off it NOW, before teardown.
// Best-effort + non-blocking — the hook dispatches on its own
// goroutine + timeout, so a hung EIC tunnel on the dead box can't
// slow the sweep. Only fires on a real flip (affected==1), never
// on a race (affected==0) or a non-overdue row — guaranteeing it
// runs once per boot-failure verdict and never on a healthy row.
if BootFailureRescueHook != nil {
BootFailureRescueHook(c.id, c.instanceID, "provision_timeout_sweep")
}
// Emit as WORKSPACE_PROVISION_FAILED, not _TIMEOUT, because the
// canvas event handler only flips node state on the _FAILED case.
// A separate event type was considered but the UI reaction is
@@ -0,0 +1,130 @@
package registry
import (
"context"
"sync"
"testing"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
"github.com/DATA-DOG/go-sqlmock"
)
// rescueHookRecorder captures the args of every BootFailureRescueHook
// invocation so tests can assert the rescue capture fires exactly on the
// boot-failure verdict — and never on a healthy/raced row.
type rescueHookRecorder struct {
mu sync.Mutex
calls [][3]string // {workspaceID, instanceID, reason}
}
func (r *rescueHookRecorder) hook() func(workspaceID, instanceID, reason string) {
return func(workspaceID, instanceID, reason string) {
r.mu.Lock()
defer r.mu.Unlock()
r.calls = append(r.calls, [3]string{workspaceID, instanceID, reason})
}
}
func (r *rescueHookRecorder) count() int {
r.mu.Lock()
defer r.mu.Unlock()
return len(r.calls)
}
// withRescueHook installs a recorder as the package-level
// BootFailureRescueHook for the test's duration.
func withRescueHook(t *testing.T) *rescueHookRecorder {
t.Helper()
rec := &rescueHookRecorder{}
prev := BootFailureRescueHook
BootFailureRescueHook = rec.hook()
t.Cleanup(func() { BootFailureRescueHook = prev })
return rec
}
// TestSweep_RescueFiresOnBootFailureVerdict — the core RFC internal#742
// assertion: when the sweep flips a stuck workspace to `failed`, the
// rescue hook fires once with the workspace + instance id and the
// provision_timeout_sweep reason, BEFORE teardown.
func TestSweep_RescueFiresOnBootFailureVerdict(t *testing.T) {
mock := setupTestDB(t)
rec := withRescueHook(t)
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-stuck", "codex", "i-0badf00d", 700}))
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
WillReturnResult(sqlmock.NewResult(0, 1))
sweepStuckProvisioning(context.Background(), &fakeEmitter{}, nil)
if rec.count() != 1 {
t.Fatalf("rescue hook should fire once on a boot-failure flip, got %d", rec.count())
}
got := rec.calls[0]
if got[0] != "ws-stuck" || got[1] != "i-0badf00d" || got[2] != "provision_timeout_sweep" {
t.Errorf("rescue hook args = %v, want {ws-stuck i-0badf00d provision_timeout_sweep}", got)
}
}
// TestSweep_RescueDoesNotFireOnRace — affected==0 means the row raced to
// online/restart between SELECT and UPDATE. That is NOT a boot-failure
// verdict, so the rescue capture must NOT fire (we'd be snapshotting a
// healthy box that's about to come online).
func TestSweep_RescueDoesNotFireOnRace(t *testing.T) {
mock := setupTestDB(t)
rec := withRescueHook(t)
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-raced", "codex", "i-raced", 700}))
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-raced", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
WillReturnResult(sqlmock.NewResult(0, 0)) // raced — 0 rows
sweepStuckProvisioning(context.Background(), &fakeEmitter{}, nil)
if rec.count() != 0 {
t.Errorf("rescue hook must NOT fire on a raced flip (affected==0), got %d calls", rec.count())
}
}
// TestSweep_RescueDoesNotFireOnHealthyRow — a not-yet-overdue row is
// never flipped, so the rescue capture must not fire. Guards against the
// hook being attached above the age gate.
func TestSweep_RescueDoesNotFireOnHealthyRow(t *testing.T) {
mock := setupTestDB(t)
rec := withRescueHook(t)
// hermes at 11 min (660s) < 30 min hermes budget → not overdue, no flip.
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-healthy", "hermes", "i-healthy", 660}))
sweepStuckProvisioning(context.Background(), &fakeEmitter{}, nil)
if rec.count() != 0 {
t.Errorf("rescue hook must NOT fire on a non-overdue (healthy) row, got %d calls", rec.count())
}
}
// TestSweep_RescueNilHookIsSafe — on a deploy where the hook is unwired
// (self-hosted / no rescue shipping), the sweep must still flip + emit
// without panicking on the nil hook.
func TestSweep_RescueNilHookIsSafe(t *testing.T) {
mock := setupTestDB(t)
prev := BootFailureRescueHook
BootFailureRescueHook = nil
t.Cleanup(func() { BootFailureRescueHook = prev })
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-stuck", "codex", "i-x", 700}))
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
WillReturnResult(sqlmock.NewResult(0, 1))
emit := &fakeEmitter{}
sweepStuckProvisioning(context.Background(), emit, nil) // must not panic
if emit.count() != 1 {
t.Errorf("flip+emit must still happen with a nil rescue hook, got %d events", emit.count())
}
}
@@ -7,8 +7,8 @@ import (
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
"github.com/DATA-DOG/go-sqlmock"
)
// fakeEmitter records every RecordAndBroadcast call so tests can assert
@@ -42,12 +42,15 @@ func (f *fakeEmitter) count() int {
return len(f.events)
}
// candidateRows builds the new-shape query result (id, runtime, age_sec).
// Use this in every sweep test to match the runtime-aware SELECT.
func candidateRows(rows ...[3]any) *sqlmock.Rows {
r := sqlmock.NewRows([]string{"id", "runtime", "age_sec"})
// candidateRows builds the query result (id, runtime, instance_id,
// age_sec). instance_id was added for the RFC internal#742 rescue hook —
// it rides alongside runtime so the boot-failure capture can reach the
// still-running box. Tests that don't care about the rescue path pass
// "" for instance_id. Use this in every sweep test to match the SELECT.
func candidateRows(rows ...[4]any) *sqlmock.Rows {
r := sqlmock.NewRows([]string{"id", "runtime", "instance_id", "age_sec"})
for _, row := range rows {
r = r.AddRow(row[0], row[1], row[2])
r = r.AddRow(row[0], row[1], row[2], row[3])
}
return r
}
@@ -58,8 +61,8 @@ func TestSweepStuckProvisioning_FlipsOverdue(t *testing.T) {
mock := setupTestDB(t)
// claude-code workspace, 700s old > 600s default timeout → flipped.
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
WillReturnRows(candidateRows([3]any{"ws-stuck", "claude-code", 700}))
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-stuck", "claude-code", "i-stuck", 700}))
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
@@ -92,8 +95,8 @@ func TestSweepStuckProvisioning_HermesGets30MinSlack(t *testing.T) {
// 11 min = 660 sec. < HermesProvisioningTimeout (1800s).
// No UPDATE should fire — hermes still has time.
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
WillReturnRows(candidateRows([3]any{"ws-hermes-booting", "hermes", 660}))
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-hermes-booting", "hermes", "i-h1", 660}))
emit := &fakeEmitter{}
sweepStuckProvisioning(context.Background(), emit, nil)
@@ -114,8 +117,8 @@ func TestSweepStuckProvisioning_HermesPastDeadline(t *testing.T) {
mock := setupTestDB(t)
// 31 min = 1860 sec > HermesProvisioningTimeout (1800s).
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
WillReturnRows(candidateRows([3]any{"ws-hermes-stuck", "hermes", 1860}))
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-hermes-stuck", "hermes", "i-h2", 1860}))
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-hermes-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
WillReturnResult(sqlmock.NewResult(0, 1))
@@ -150,8 +153,8 @@ func TestSweepStuckProvisioning_HermesPastDeadline(t *testing.T) {
func TestSweepStuckProvisioning_ManifestOverrideSparesRow(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
WillReturnRows(candidateRows([3]any{"ws-claude-templated", "claude-code", 660}))
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-claude-templated", "claude-code", "i-ct", 660}))
// No ExpectExec — if the sweeper still flips the row, sqlmock will
// fail with an unexpected-query error.
@@ -183,8 +186,8 @@ func TestSweepStuckProvisioning_ManifestOverrideStillFlipsPastDeadline(t *testin
mock := setupTestDB(t)
// 21 min = 1260s > 1200s manifest override → flipped.
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
WillReturnRows(candidateRows([3]any{"ws-claude-truly-stuck", "claude-code", 1260}))
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-claude-truly-stuck", "claude-code", "i-cts", 1260}))
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-claude-truly-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
WillReturnResult(sqlmock.NewResult(0, 1))
@@ -221,8 +224,8 @@ func TestSweepStuckProvisioning_ManifestOverrideStillFlipsPastDeadline(t *testin
func TestSweepStuckProvisioning_RaceSafe(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
WillReturnRows(candidateRows([3]any{"ws-raced", "claude-code", 700}))
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-raced", "claude-code", "i-raced", 700}))
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-raced", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
@@ -244,7 +247,7 @@ func TestSweepStuckProvisioning_RaceSafe(t *testing.T) {
func TestSweepStuckProvisioning_NoStuck(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows())
emit := &fakeEmitter{}
@@ -265,10 +268,10 @@ func TestSweepStuckProvisioning_NoStuck(t *testing.T) {
func TestSweepStuckProvisioning_MultipleStuck(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows(
[3]any{"ws-claude-code", "claude-code", 700},
[3]any{"ws-hermes", "hermes", 1860},
[4]any{"ws-claude-code", "claude-code", "i-cc", 700},
[4]any{"ws-hermes", "hermes", "i-hh", 1860},
))
mock.ExpectExec(`UPDATE workspaces`).
@@ -292,8 +295,8 @@ func TestSweepStuckProvisioning_MultipleStuck(t *testing.T) {
func TestSweepStuckProvisioning_BroadcastFailureDoesNotCrash(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
WillReturnRows(candidateRows([3]any{"ws-stuck", "claude-code", 700}))
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
WillReturnRows(candidateRows([4]any{"ws-stuck", "claude-code", "i-stuck", 700}))
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
WillReturnResult(sqlmock.NewResult(0, 1))
+247
View File
@@ -0,0 +1,247 @@
// Package rescue captures a fixed post-mortem "rescue bundle" off a
// workspace EC2 whose boot FAILED — before the platform's sweeper /
// control-plane reaps the instance — and ships it to obs/Loki so a
// wedged workspace (e.g. the codex provider-derivation failure that
// motivated RFC internal#742) is inspectable instead of an
// uninspectable wall.
//
// Design constraints (RFC internal#742, Part 2):
//
// - BEST-EFFORT + NON-BLOCKING. Capture MUST NOT change boot-failure
// semantics or add latency to the failure path. Callers fire
// Capture in its own goroutine; Capture additionally bounds itself
// with CaptureTimeout so a hung EIC tunnel can't wedge the
// goroutine forever.
// - FIRES ON THE BOOT-FAILURE VERDICT ONLY. The two hook points are
// the provision-timeout sweep (registry.sweepStuckProvisioning) and
// the out-of-band bootstrap-watcher signal
// (handlers.WorkspaceHandler.BootstrapFailed). Normal teardown /
// deprovision / recreate / billing-suspend / hibernate paths do NOT
// call Capture — see the RFC's path enumeration.
// - REDACT BEFORE ANYTHING LEAVES THE BOX. Every collected section is
// run through the injected Redact func (wired to the existing
// handlers.redactSecrets secret-scan) before it is shipped. Raw
// tokens/keys never reach Loki.
//
// The package is a LEAF: it imports only internal/audit (the obs
// shipper) so it can be called from both handlers and registry without
// an import cycle (registry must not import handlers). The two heavy
// dependencies — the EIC/SSH remote-command runner and the redactor —
// are injected as package-level func vars, wired once at boot from the
// handlers package (which owns withEICTunnel + redactSecrets). Tests
// swap them for fakes.
package rescue
import (
"context"
"fmt"
"log"
"time"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/audit"
)
// CaptureTimeout bounds the whole bundle collection. The sweeper runs
// every 30s and the CP reap follows the failure verdict; 45s gives the
// EIC dance (~3-5s) plus six short remote commands (<2s each) generous
// headroom while still finishing well before the instance is torn down.
// Distinct from the per-op eicFileOpTimeout so a slow box that already
// failed to boot can't hang the capture goroutine indefinitely.
const CaptureTimeout = 45 * time.Second
// LokiKind is the Loki stream label value that tags every rescue
// record. Queryable as `kind="rescue"` (RFC internal#742 §Loki labels).
const LokiKind = "rescue"
// RescueVolumeGrace is how long a boot-failed workspace's /configs data
// volume (and its still-running instance) must be RETAINED past the
// boot-failure verdict so a live rescue read is possible — distinct from
// the user-requested prune path (cp#415), which is an explicit erase.
//
// In molecule-core (the tenant platform) the boot-failure verdict only
// flips workspaces.status to `failed`; it never issues a terminate. The
// platform's two reapers (registry.StartCPOrphanSweeper +
// handlers deprovision) act ONLY on status='removed', so a `failed`
// workspace's instance + /configs volume are retained here by
// construction — see TestCPSweepOnce_DoesNotReapFailedWorkspace. The
// time-bounded reap of the failed instance is the control plane's
// bootstrap-watcher concern; this constant is the SSOT for the grace
// the CP must honour (24h covers an operator's next-business-day
// post-mortem without leaking the volume indefinitely).
const RescueVolumeGrace = 24 * time.Hour
// rescueEventType is the audit event_type carried in the shipped
// record. The obs shipper (internal/audit) already maps event_type to a
// low-cardinality Loki label; "rescue.bundle" keeps the rescue stream
// trivially filterable alongside the existing audit taxonomy.
const rescueEventType = "rescue.bundle"
// RunRemote runs a single shell command on the still-running (but
// unconfigured) workspace EC2 over EIC/SSH and returns its combined
// output. Wired at boot to the handlers EIC runner
// (rescueRunRemoteViaEIC). nil until wired — Capture degrades to a
// logged no-op rather than panicking, so an operator who hasn't wired
// the hook still gets a clear signal instead of a crash on the failure
// path.
var RunRemote func(ctx context.Context, instanceID, command string) (string, error)
// Redact scrubs secret-shaped substrings from a collected section
// before it leaves the box. Wired at boot to handlers.redactSecrets.
// nil until wired — Capture refuses to ship un-redacted content if the
// redactor is missing (fails closed: logs + aborts rather than leaking
// raw config).
var Redact func(workspaceID, content string) string
// section is one labelled chunk of the rescue bundle: a human-readable
// name + the remote command that produces it.
type section struct {
name string
command string
}
// bundleSections is the FIXED set collected on every boot-failure
// rescue (RFC internal#742 §Build.1). Order is the post-mortem reading
// order: config first, then boot logs, then container state, then the
// resolved model/provider env that drove the codex derivation failure.
//
// - /configs/config.yaml + system-prompt.md: the managed config the
// runtime booted against (redacted; system-prompt can embed keys).
// - cloud-init-output.log tail: the user-data execution trace — where
// a wedged boot actually died.
// - docker ps -a: container state (did the agent container even
// start, exit-code, restart loop).
// - agent container logs: the runtime's own stderr (the codex
// provider-derivation panic lives here).
// - MODEL|PROVIDER|RUNTIME env: the resolved routing that motivated
// the RFC. `sudo cat` of the container env via docker inspect-style
// grep — see the command.
//
// All commands use `sudo -n` (the box's /configs is root-owned; ubuntu
// has passwordless sudo) and swallow missing-target stderr so a section
// that can't be produced ships as a short marker instead of failing the
// whole bundle. Kept as data (not inlined) so the redaction + ship loop
// is uniform and the set is reviewable in one place.
var bundleSections = []section{
{
name: "config.yaml",
command: "sudo -n cat /configs/config.yaml 2>/dev/null || echo '(/configs/config.yaml absent)'",
},
{
name: "system-prompt.md",
command: "sudo -n cat /configs/system-prompt.md 2>/dev/null || echo '(/configs/system-prompt.md absent)'",
},
{
name: "cloud-init-output.log.tail",
command: "sudo -n tail -200 /var/log/cloud-init-output.log 2>/dev/null || echo '(cloud-init-output.log absent)'",
},
{
name: "docker-ps",
command: "sudo -n docker ps -a 2>/dev/null || echo '(docker unavailable)'",
},
{
// The agent container is the first non-infra container; grab the
// most recently created one and tail its logs. `head -1` of
// `docker ps -a -q` is creation-ordered newest-first, which is
// the agent runtime on a workspace box.
name: "agent-container.logs.tail",
command: "cid=$(sudo -n docker ps -a -q 2>/dev/null | head -1); [ -n \"$cid\" ] && sudo -n docker logs --tail 200 \"$cid\" 2>&1 || echo '(no agent container)'",
},
{
// Resolved model/provider/runtime env from the agent container.
// `docker inspect` the env array and grep the routing keys. This
// is the field that pinpoints a provider-derivation failure.
name: "model-provider-runtime.env",
command: "cid=$(sudo -n docker ps -a -q 2>/dev/null | head -1); [ -n \"$cid\" ] && sudo -n docker inspect --format '{{range .Config.Env}}{{println .}}{{end}}' \"$cid\" 2>/dev/null | grep -E 'MODEL|PROVIDER|RUNTIME' || echo '(no env)'",
},
}
// Input is the identity of the failed workspace being rescued.
type Input struct {
InstanceID string // EC2 instance id of the still-running failed box
WorkspaceID string
OrgID string
// Reason is a short tag for WHY the rescue fired (e.g.
// "provision_timeout_sweep" or "bootstrap_watcher") — carried into
// the Loki record so an operator can correlate the bundle with the
// failure verdict that triggered it.
Reason string
}
// Capture collects the fixed rescue bundle off the failed instance,
// redacts each section, and ships it to Loki under
// {kind="rescue", org=<OrgID>, workspace_id=<WorkspaceID>}.
//
// BEST-EFFORT: every failure mode (missing wiring, EIC error, a single
// section that won't collect) is logged and does NOT propagate — Capture
// never returns an error and never panics, so the boot-failure handling
// at the call site is unaffected. The caller is expected to invoke this
// in its own goroutine; Capture additionally self-bounds with
// CaptureTimeout.
func Capture(ctx context.Context, in Input) {
defer func() {
// A logging helper on the failure path must never take the
// process down. Recover defensively — the redactor / shipper are
// injected and a future mis-wire shouldn't crash the sweeper.
if r := recover(); r != nil {
log.Printf("rescue: capture panicked for ws=%s instance=%s: %v", in.WorkspaceID, in.InstanceID, r)
}
}()
if in.InstanceID == "" {
// No live box to read — nothing to rescue (e.g. failure before
// any EC2 was launched). Not an error; just skip.
log.Printf("rescue: skip ws=%s — no instance_id (nothing to capture)", in.WorkspaceID)
return
}
if RunRemote == nil {
log.Printf("rescue: skip ws=%s instance=%s — RunRemote not wired (best-effort no-op)", in.WorkspaceID, in.InstanceID)
return
}
if Redact == nil {
// Fail CLOSED: without a redactor we could leak raw tokens to
// Loki. Abort rather than ship unredacted.
log.Printf("rescue: ABORT ws=%s instance=%s — Redact not wired; refusing to ship un-redacted bundle", in.WorkspaceID, in.InstanceID)
return
}
ctx, cancel := context.WithTimeout(ctx, CaptureTimeout)
defer cancel()
log.Printf("rescue: capturing bundle ws=%s instance=%s reason=%s", in.WorkspaceID, in.InstanceID, in.Reason)
collected := 0
for _, sec := range bundleSections {
raw, err := RunRemote(ctx, in.InstanceID, sec.command)
if err != nil {
// One section failing (e.g. ssh blip mid-collection) must not
// abort the rest — ship a marker for it and continue.
log.Printf("rescue: section %q failed for ws=%s: %v", sec.name, in.WorkspaceID, err)
ship(ctx, in, sec.name, fmt.Sprintf("(rescue: section collection failed: %v)", err), false)
continue
}
redacted := Redact(in.WorkspaceID, raw)
ship(ctx, in, sec.name, redacted, true)
collected++
}
log.Printf("rescue: shipped %d/%d sections ws=%s instance=%s kind=%s", collected, len(bundleSections), in.WorkspaceID, in.InstanceID, LokiKind)
}
// ship emits one rescue section to Loki via the audit shipper. The
// org / workspace_id / kind ride in the record body (queryable via
// LogQL `| json`); event_type ("rescue.bundle") is the low-cardinality
// Loki label the shipper already promotes. `redacted` records whether
// the content passed through the secret-scan, so an operator can tell a
// shipped-but-redacted section from a collection-failure marker.
func ship(ctx context.Context, in Input, name, content string, redacted bool) {
audit.Emit(ctx, rescueEventType, map[string]any{
"kind": LokiKind,
"org": in.OrgID,
"workspace_id": in.WorkspaceID,
"instance_id": in.InstanceID,
"reason": in.Reason,
"section": name,
"redacted": redacted,
"content": content,
})
}
@@ -0,0 +1,226 @@
package rescue
import (
"context"
"encoding/json"
"errors"
"os"
"path/filepath"
"strings"
"testing"
)
// withFakes swaps the injected RunRemote + Redact for the duration of a
// test and restores them after. Mirrors the provisioner test-fake
// pattern (package-var swap + t.Cleanup).
func withFakes(t *testing.T, run func(ctx context.Context, instanceID, cmd string) (string, error), redact func(ws, c string) string) {
t.Helper()
prevRun, prevRedact := RunRemote, Redact
RunRemote = run
Redact = redact
t.Cleanup(func() { RunRemote = prevRun; Redact = prevRedact })
}
// captureLoki points the audit shipper at a temp JSONL file and returns
// a reader that decodes the records the rescue ship() loop wrote. This
// is the same transport the production rescue stream uses (audit.Emit →
// Loki via the tenant Vector source), so asserting on it proves the
// shipper-reuse + labels end to end.
func captureLoki(t *testing.T) func() []map[string]any {
t.Helper()
dir := t.TempDir()
path := filepath.Join(dir, "audit.jsonl")
t.Setenv("MOLECULE_AUDIT_LOG_PATH", path)
return func() []map[string]any {
b, err := os.ReadFile(path)
if err != nil {
return nil
}
var out []map[string]any
for _, line := range strings.Split(strings.TrimSpace(string(b)), "\n") {
if line == "" {
continue
}
var rec map[string]any
if err := json.Unmarshal([]byte(line), &rec); err != nil {
t.Fatalf("bad audit jsonl line %q: %v", line, err)
}
out = append(out, rec)
}
return out
}
}
func fields(rec map[string]any) map[string]any {
f, _ := rec["fields"].(map[string]any)
return f
}
// TestCapture_ShipsAllSectionsWithRescueLabels is the happy path: a
// boot-failure capture collects every fixed section, runs each through
// the redactor, and ships it to Loki under {kind="rescue", org, ws}.
func TestCapture_ShipsAllSectionsWithRescueLabels(t *testing.T) {
readLoki := captureLoki(t)
var seenCmds []string
withFakes(t,
func(_ context.Context, instanceID, cmd string) (string, error) {
seenCmds = append(seenCmds, cmd)
return "OUTPUT for " + instanceID, nil
},
func(_ws, c string) string { return c }, // identity redactor
)
Capture(context.Background(), Input{
InstanceID: "i-abc123",
WorkspaceID: "ws-1",
OrgID: "org-9",
Reason: "provision_timeout_sweep",
})
recs := readLoki()
if len(recs) != len(bundleSections) {
t.Fatalf("want %d shipped sections, got %d", len(bundleSections), len(recs))
}
if len(seenCmds) != len(bundleSections) {
t.Fatalf("want %d remote commands run, got %d", len(bundleSections), len(seenCmds))
}
for _, rec := range recs {
if rec["event_type"] != rescueEventType {
t.Errorf("event_type = %v, want %q", rec["event_type"], rescueEventType)
}
// workspace_id is promoted to the top-level record position by
// the audit shipper.
if rec["workspace_id"] != "ws-1" {
t.Errorf("top-level workspace_id = %v, want ws-1", rec["workspace_id"])
}
f := fields(rec)
if f["kind"] != LokiKind {
t.Errorf("kind = %v, want %q", f["kind"], LokiKind)
}
if f["org"] != "org-9" {
t.Errorf("org = %v, want org-9", f["org"])
}
if f["instance_id"] != "i-abc123" {
t.Errorf("instance_id = %v, want i-abc123", f["instance_id"])
}
if f["redacted"] != true {
t.Errorf("redacted = %v, want true for a collected section", f["redacted"])
}
}
}
// TestCapture_Redacts proves the bundle is scrubbed before it leaves the
// box: a remote section that contains a secret-shaped token must ship
// with the token replaced, never raw.
func TestCapture_Redacts(t *testing.T) {
readLoki := captureLoki(t)
const secret = "sk-ant-SUPERSECRETTOKENVALUE0001"
withFakes(t,
func(_ context.Context, _ string, _ string) (string, error) {
return "ANTHROPIC_API_KEY=" + secret, nil
},
// redactor that mangles anything containing the secret shape
func(_ws, c string) string {
if strings.Contains(c, secret) {
return strings.ReplaceAll(c, secret, "[REDACTED]")
}
return c
},
)
Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-2", OrgID: "o"})
for _, rec := range readLoki() {
content, _ := fields(rec)["content"].(string)
if strings.Contains(content, secret) {
t.Fatalf("raw secret leaked to Loki in section %v: %q", fields(rec)["section"], content)
}
}
}
// TestCapture_SkipsWhenNoInstance: a failure with no provisioned EC2 has
// nothing to read — Capture must no-op (ship nothing) rather than dial a
// blank instance id.
func TestCapture_SkipsWhenNoInstance(t *testing.T) {
readLoki := captureLoki(t)
called := false
withFakes(t,
func(_ context.Context, _ string, _ string) (string, error) { called = true; return "", nil },
func(_ws, c string) string { return c },
)
Capture(context.Background(), Input{InstanceID: "", WorkspaceID: "ws-3", OrgID: "o"})
if called {
t.Error("RunRemote called for an empty instance id")
}
if recs := readLoki(); len(recs) != 0 {
t.Errorf("shipped %d records for an empty instance id, want 0", len(recs))
}
}
// TestCapture_FailsClosedWithoutRedactor: if the redactor is not wired,
// Capture must NOT ship anything (would leak raw config). Fail closed.
func TestCapture_FailsClosedWithoutRedactor(t *testing.T) {
readLoki := captureLoki(t)
prevRun, prevRedact := RunRemote, Redact
RunRemote = func(_ context.Context, _ string, _ string) (string, error) { return "raw config", nil }
Redact = nil
t.Cleanup(func() { RunRemote = prevRun; Redact = prevRedact })
Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-4", OrgID: "o"})
if recs := readLoki(); len(recs) != 0 {
t.Errorf("shipped %d records without a redactor wired, want 0 (fail closed)", len(recs))
}
}
// TestCapture_SectionFailureIsIsolated: one section's RunRemote error
// must not abort the rest — the failing section ships a marker and the
// others still ship.
func TestCapture_SectionFailureIsIsolated(t *testing.T) {
readLoki := captureLoki(t)
withFakes(t,
func(_ context.Context, _ string, cmd string) (string, error) {
if strings.Contains(cmd, "config.yaml") {
return "", errors.New("ssh blip")
}
return "ok", nil
},
func(_ws, c string) string { return c },
)
Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-5", OrgID: "o"})
recs := readLoki()
if len(recs) != len(bundleSections) {
t.Fatalf("want all %d sections shipped (incl. failure marker), got %d", len(bundleSections), len(recs))
}
var failureMarkers int
for _, rec := range recs {
if fields(rec)["redacted"] == false {
failureMarkers++
content, _ := fields(rec)["content"].(string)
if !strings.Contains(content, "section collection failed") {
t.Errorf("failure marker content = %q, want a collection-failed marker", content)
}
}
}
if failureMarkers != 1 {
t.Errorf("want exactly 1 failure marker, got %d", failureMarkers)
}
}
// TestCapture_NoWiringIsSafeNoOp: with RunRemote unwired (operator hasn't
// called the boot wiring), Capture must be a logged no-op, never a panic.
func TestCapture_NoWiringIsSafeNoOp(t *testing.T) {
readLoki := captureLoki(t)
prevRun, prevRedact := RunRemote, Redact
RunRemote = nil
Redact = func(_ws, c string) string { return c }
t.Cleanup(func() { RunRemote = prevRun; Redact = prevRedact })
Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-6", OrgID: "o"})
if recs := readLoki(); len(recs) != 0 {
t.Errorf("shipped %d records with RunRemote unwired, want 0", len(recs))
}
}