feat(workspace-server): rescue capture on boot-failure (internal#742 Part 2) #2019
@@ -349,6 +349,17 @@ func main() {
|
||||
codexauth.StartCodexAuthRefresher(c, db.DB)
|
||||
})
|
||||
|
||||
// RFC internal#742 Part 2: wire the boot-failure rescue capture into
|
||||
// the provision-timeout sweep's failure verdict. When the sweep flips
|
||||
// a stuck workspace to `failed`, this hook captures a forensic rescue
|
||||
// bundle off the still-running (but boot-failed) EC2 and ships it to
|
||||
// obs/Loki before the control plane reaps the instance. Best-effort +
|
||||
// non-blocking (handlers.BootFailureRescueHook dispatches on its own
|
||||
// goroutine + timeout). The handler-side boot-failure path
|
||||
// (WorkspaceHandler.BootstrapFailed) wires its own capture inline.
|
||||
registry.BootFailureRescueHook = handlers.BootFailureRescueHook
|
||||
|
||||
|
||||
// Provision-timeout sweep — flips workspaces that have been stuck in
|
||||
// status='provisioning' past the timeout window to 'failed' and emits
|
||||
// WORKSPACE_PROVISION_TIMEOUT. Without this the UI banner is cosmetic
|
||||
|
||||
@@ -0,0 +1,158 @@
|
||||
package handlers
|
||||
|
||||
// rescue_wiring.go — bridges the leaf internal/rescue package to the
|
||||
// handlers package's EIC/SSH runner + secret redactor, and exposes the
|
||||
// boot-failure rescue hook used by both boot-failure verdict paths
|
||||
// (handlers.BootstrapFailed here, registry.sweepStuckProvisioning via
|
||||
// an injected hook wired in main.go).
|
||||
//
|
||||
// Why the indirection: internal/rescue is a leaf so registry (which
|
||||
// must NOT import handlers — that's an import cycle) can call it. The
|
||||
// two heavy dependencies live here in handlers — `withEICTunnel`
|
||||
// (the EIC keypair → push → tunnel → ssh dance) and `redactSecrets`
|
||||
// (the SAFE-T1201 secret-scan) — so we inject them into rescue's
|
||||
// package-level func vars at init().
|
||||
//
|
||||
// RFC internal#742 Part 2.
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strings"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue"
|
||||
)
|
||||
|
||||
func init() {
|
||||
// Wire the leaf rescue package to handlers' EIC runner + redactor.
|
||||
// Done in init() (not main.go) so the binding is present for any
|
||||
// caller of rescue.Capture, including the registry sweeper hook and
|
||||
// the handler path, without each call site re-wiring it.
|
||||
rescue.RunRemote = rescueRunRemoteViaEIC
|
||||
rescue.Redact = func(workspaceID, content string) string {
|
||||
out, _ := redactSecrets(workspaceID, content)
|
||||
return out
|
||||
}
|
||||
}
|
||||
|
||||
// rescueRunRemoteViaEIC runs a single shell command on the still-running
|
||||
// (but boot-failed) workspace EC2 over an EIC tunnel and returns its
|
||||
// combined stdout+stderr. Reuses the same `withEICTunnel` dance as the
|
||||
// canvas file ops (ephemeral keypair → SendSSHPublicKey → open-tunnel →
|
||||
// ssh) so the rescue path inherits every fix to the EIC mechanism (e.g.
|
||||
// PR #2822's LogLevel=ERROR shim) for free.
|
||||
//
|
||||
// Combined output (2>&1) is intentional: a boot-failed box's most
|
||||
// useful signal is often on stderr (a panic, a missing-file error), and
|
||||
// the rescue bundle is a forensic blob, not a parsed value — we want
|
||||
// everything the command emitted.
|
||||
func rescueRunRemoteViaEIC(ctx context.Context, instanceID, command string) (string, error) {
|
||||
var combined []byte
|
||||
runErr := withEICTunnel(ctx, instanceID, func(s eicSSHSession) error {
|
||||
sshCmd := exec.CommandContext(ctx, "ssh", s.sshArgs(command)...)
|
||||
sshCmd.Env = os.Environ()
|
||||
var buf bytes.Buffer
|
||||
sshCmd.Stdout = &buf
|
||||
sshCmd.Stderr = &buf
|
||||
// A non-zero remote exit is NOT a transport error for the rescue
|
||||
// path — each section command already falls back to an
|
||||
// `|| echo '(...)'` marker, so a clean exit is expected. Only
|
||||
// surface an error when ssh/tunnel itself failed AND produced no
|
||||
// output to ship.
|
||||
err := sshCmd.Run()
|
||||
combined = buf.Bytes()
|
||||
if err != nil && len(combined) == 0 {
|
||||
return fmt.Errorf("rescue ssh exec: %w", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if runErr != nil {
|
||||
return "", runErr
|
||||
}
|
||||
return strings.TrimRight(string(combined), "\n"), nil
|
||||
}
|
||||
|
||||
// captureRescueBundle fires a best-effort, non-blocking rescue capture
|
||||
// for a boot-failed workspace. It is the single entry point both
|
||||
// boot-failure verdict paths funnel through.
|
||||
//
|
||||
// NON-BLOCKING: the actual collection runs in its own goroutine with
|
||||
// its own timeout (rescue.CaptureTimeout), detached from the caller's
|
||||
// request/sweep context so it can't add latency to — or be cancelled
|
||||
// by — the failure-handling path that triggered it. We snapshot the
|
||||
// identity into a fresh context.Background() for the same reason: a
|
||||
// gin request context is cancelled the instant the HTTP handler
|
||||
// returns, which would kill the EIC tunnel mid-collection.
|
||||
//
|
||||
// instanceID/orgID are resolved here (best-effort) so the two call
|
||||
// sites only need the workspace id. A missing instance id → rescue.Capture
|
||||
// no-ops (logged), so an early-failure workspace that never got an EC2
|
||||
// is handled cleanly.
|
||||
func captureRescueBundle(workspaceID, reason string) {
|
||||
rescueDispatch(func() {
|
||||
ctx := context.Background()
|
||||
instanceID, err := rescueResolveInstanceID(ctx, workspaceID)
|
||||
if err != nil {
|
||||
// Best-effort: a resolve failure is logged inside Capture's
|
||||
// caller chain; pass empty so Capture no-ops cleanly.
|
||||
instanceID = ""
|
||||
}
|
||||
rescue.Capture(ctx, rescue.Input{
|
||||
InstanceID: instanceID,
|
||||
WorkspaceID: workspaceID,
|
||||
OrgID: os.Getenv("MOLECULE_ORG_ID"),
|
||||
Reason: reason,
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// rescueDispatch runs the rescue collection off the request path. In
|
||||
// production it's `go fn()` so the capture never blocks or adds latency
|
||||
// to the boot-failure handler. Tests swap it for a synchronous runner so
|
||||
// they can assert the capture fired (or didn't) deterministically
|
||||
// without racing the goroutine.
|
||||
var rescueDispatch = func(fn func()) { go fn() }
|
||||
|
||||
// BootFailureRescueHook is the registry-facing adapter wired into
|
||||
// registry.BootFailureRescueHook from main.go. The registry sweeper
|
||||
// already resolved the instance id (it's in the candidate row), so this
|
||||
// path uses it directly rather than re-querying — symmetric with the
|
||||
// captureRescueBundle handler path but skipping the lookup.
|
||||
//
|
||||
// Best-effort + non-blocking: dispatches the capture on its own
|
||||
// goroutine with its own timeout, so the sweep loop is never slowed.
|
||||
func BootFailureRescueHook(workspaceID, instanceID, reason string) {
|
||||
go rescue.Capture(context.Background(), rescue.Input{
|
||||
InstanceID: instanceID,
|
||||
WorkspaceID: workspaceID,
|
||||
OrgID: os.Getenv("MOLECULE_ORG_ID"),
|
||||
Reason: reason,
|
||||
})
|
||||
}
|
||||
|
||||
// rescueResolveInstanceID looks up the EC2 instance id for a workspace.
|
||||
// Package var so tests can stub it without a sqlmock. Mirrors
|
||||
// provisioner.resolveInstanceID (same query) but lives here to keep the
|
||||
// rescue wiring self-contained and avoid widening the provisioner
|
||||
// surface.
|
||||
var rescueResolveInstanceID = func(ctx context.Context, workspaceID string) (string, error) {
|
||||
if db.DB == nil {
|
||||
return "", nil // nil in unit tests
|
||||
}
|
||||
var instanceID sql.NullString
|
||||
err := db.DB.QueryRowContext(ctx,
|
||||
`SELECT instance_id FROM workspaces WHERE id = $1`, workspaceID,
|
||||
).Scan(&instanceID)
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
return "", err
|
||||
}
|
||||
if !instanceID.Valid {
|
||||
return "", nil
|
||||
}
|
||||
return instanceID.String, nil
|
||||
}
|
||||
@@ -0,0 +1,119 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue"
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// rescueTestHarness makes the otherwise-async rescue capture
|
||||
// deterministic + observable for handler tests:
|
||||
// - rescueDispatch runs synchronously (no goroutine race).
|
||||
// - rescueResolveInstanceID returns a fixed instance id.
|
||||
// - rescue.RunRemote / rescue.Redact are stubbed so no real EIC/SSH
|
||||
// fires; runCalls counts how many remote-command collections ran,
|
||||
// which is the proxy for "did the capture fire".
|
||||
//
|
||||
// All originals are restored on cleanup.
|
||||
func rescueTestHarness(t *testing.T, instanceID string) (runCalls *int) {
|
||||
t.Helper()
|
||||
n := 0
|
||||
runCalls = &n
|
||||
|
||||
prevDispatch := rescueDispatch
|
||||
rescueDispatch = func(fn func()) { fn() } // synchronous
|
||||
prevResolve := rescueResolveInstanceID
|
||||
rescueResolveInstanceID = func(_ context.Context, _ string) (string, error) { return instanceID, nil }
|
||||
prevRun, prevRedact := rescue.RunRemote, rescue.Redact
|
||||
rescue.RunRemote = func(_ context.Context, _ string, _ string) (string, error) { n++; return "out", nil }
|
||||
rescue.Redact = func(_ws, c string) string { return c }
|
||||
|
||||
t.Cleanup(func() {
|
||||
rescueDispatch = prevDispatch
|
||||
rescueResolveInstanceID = prevResolve
|
||||
rescue.RunRemote = prevRun
|
||||
rescue.Redact = prevRedact
|
||||
})
|
||||
return runCalls
|
||||
}
|
||||
|
||||
// TestBootstrapFailed_FiresRescueOnFlip — the RFC internal#742 handler
|
||||
// hook: when BootstrapFailed actually flips a workspace to `failed`
|
||||
// (affected==1), the rescue capture fires against the resolved instance.
|
||||
func TestBootstrapFailed_FiresRescueOnFlip(t *testing.T) {
|
||||
h, mock := setupBootstrapHandler(t)
|
||||
runCalls := rescueTestHarness(t, "i-failed01")
|
||||
|
||||
mock.ExpectExec(`UPDATE workspaces`).
|
||||
WithArgs("ws-crashed", sqlmock.AnyArg(), models.StatusFailed).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectExec(`INSERT INTO structure_events`).
|
||||
WithArgs("WORKSPACE_PROVISION_FAILED", "ws-crashed", sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-crashed"}}
|
||||
c.Request = httptest.NewRequest("POST", "/admin/workspaces/ws-crashed/bootstrap-failed",
|
||||
bytes.NewBufferString(`{"error":"codex provider derivation failed","log_tail":"panic"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
h.BootstrapFailed(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("want 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if *runCalls != len(rescueBundleSectionCount()) {
|
||||
t.Errorf("rescue capture ran %d remote commands, want %d (one per bundle section)", *runCalls, len(rescueBundleSectionCount()))
|
||||
}
|
||||
}
|
||||
|
||||
// TestBootstrapFailed_NoRescueOnNoChange — an already-transitioned
|
||||
// workspace (affected==0: raced to online, or double-report) is NOT a
|
||||
// boot-failure verdict here, so the rescue capture must NOT fire.
|
||||
func TestBootstrapFailed_NoRescueOnNoChange(t *testing.T) {
|
||||
h, mock := setupBootstrapHandler(t)
|
||||
runCalls := rescueTestHarness(t, "i-online01")
|
||||
|
||||
mock.ExpectExec(`UPDATE workspaces`).
|
||||
WithArgs("ws-online", sqlmock.AnyArg(), models.StatusFailed).
|
||||
WillReturnResult(sqlmock.NewResult(0, 0)) // already transitioned
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-online"}}
|
||||
c.Request = httptest.NewRequest("POST", "/admin/workspaces/ws-online/bootstrap-failed",
|
||||
bytes.NewBufferString(`{"error":"late report","log_tail":""}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
h.BootstrapFailed(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("want 200, got %d", w.Code)
|
||||
}
|
||||
if *runCalls != 0 {
|
||||
t.Errorf("rescue capture fired (%d cmds) on a no-change report; it must only fire on a real flip", *runCalls)
|
||||
}
|
||||
}
|
||||
|
||||
// rescueBundleSectionCount returns the production rescue bundle section
|
||||
// list length by running a capture against a counting runner once. It's
|
||||
// a small indirection so the handler test stays decoupled from the exact
|
||||
// section set in internal/rescue (which has its own tests).
|
||||
func rescueBundleSectionCount() []struct{} {
|
||||
count := 0
|
||||
prevRun, prevRedact := rescue.RunRemote, rescue.Redact
|
||||
rescue.RunRemote = func(_ context.Context, _ string, _ string) (string, error) { count++; return "", nil }
|
||||
rescue.Redact = func(_ws, c string) string { return c }
|
||||
rescue.Capture(context.Background(), rescue.Input{InstanceID: "i-probe", WorkspaceID: "w", OrgID: "o"})
|
||||
rescue.RunRemote = prevRun
|
||||
rescue.Redact = prevRedact
|
||||
return make([]struct{}, count)
|
||||
}
|
||||
@@ -91,6 +91,18 @@ func (h *WorkspaceHandler) BootstrapFailed(c *gin.Context) {
|
||||
"log_tail": tail,
|
||||
"source": "bootstrap_watcher",
|
||||
})
|
||||
|
||||
// RFC internal#742 Part 2: this is one of the two boot-failure
|
||||
// verdict points. We've just flipped a still-running (but
|
||||
// unconfigured) workspace EC2 to `failed`; the control plane will
|
||||
// reap the instance shortly. Capture a forensic rescue bundle off
|
||||
// the live box NOW, before it's torn down, so a wedged workspace is
|
||||
// post-mortem-inspectable. Best-effort + non-blocking: runs in its
|
||||
// own goroutine with its own timeout, detached from this request's
|
||||
// context (which is cancelled the instant this handler returns).
|
||||
// Failure to capture never changes the boot-failure handling.
|
||||
captureRescueBundle(id, "bootstrap_watcher")
|
||||
|
||||
log.Printf("BootstrapFailed: marked %s failed (tail=%d bytes, err=%q)", id, len(tail), errMsg)
|
||||
c.JSON(http.StatusOK, gin.H{"ok": true})
|
||||
}
|
||||
|
||||
@@ -10,8 +10,15 @@ import (
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/rescue"
|
||||
)
|
||||
|
||||
// rescueVolumeGraceHours surfaces the rescue grace as whole hours for
|
||||
// the retention-contract assertion (RFC internal#742 Part 2).
|
||||
func rescueVolumeGraceHours() int {
|
||||
return int(rescue.RescueVolumeGrace.Hours())
|
||||
}
|
||||
|
||||
// fakeCPReaper is a hand-rolled CPOrphanReaper for the SaaS-mode
|
||||
// sweeper tests. Records every Stop call so tests can assert which
|
||||
// workspace IDs were re-issued.
|
||||
@@ -97,6 +104,55 @@ func TestCPSweepOnce_NoOrphans(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestCPSweepOnce_DoesNotReapFailedWorkspace — RFC internal#742 Part 2
|
||||
// volume-retention guarantee, molecule-core side.
|
||||
//
|
||||
// A boot-FAILED workspace (status='failed') must NOT be terminated by
|
||||
// the platform's orphan sweeper: its instance + /configs data volume are
|
||||
// retained through the rescue grace (rescue.RescueVolumeGrace) so a live
|
||||
// rescue read is possible, distinct from the user-prune erase path. The
|
||||
// sweeper reaps ONLY status='removed' (the explicit deprovision path),
|
||||
// so a `failed` row is structurally excluded at the SELECT — it never
|
||||
// reaches reaper.Stop. We assert the predicate filters to 'removed'
|
||||
// (so the failed instance survives) and that no Stop fires for a DB
|
||||
// whose only orphan-shaped row is `failed`.
|
||||
//
|
||||
// This is the "if the sweeper already keeps volumes by default, confirm
|
||||
// + add a test asserting it" branch of the RFC: it does, by construction.
|
||||
func TestCPSweepOnce_DoesNotReapFailedWorkspace(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
reaper := &fakeCPReaper{}
|
||||
|
||||
// The sweeper's SELECT carries `status = 'removed'`. A boot-failed
|
||||
// workspace (status='failed') does not match that predicate, so the
|
||||
// real DB returns it nowhere in this result set — modelled as the
|
||||
// empty result the `removed`-only filter produces when the only
|
||||
// instance-bearing row is `failed`. The regex pins the retention-
|
||||
// critical predicate so a future widening to include 'failed' (which
|
||||
// would terminate boot-failed boxes mid-rescue) fails this test.
|
||||
mock.ExpectQuery(`(?s)WHERE status = 'removed'\s+AND instance_id IS NOT NULL`).
|
||||
WithArgs(cpSweepLimit).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"})) // failed row excluded by predicate
|
||||
|
||||
cpSweepOnce(context.Background(), reaper)
|
||||
|
||||
if len(reaper.stopCalls) != 0 {
|
||||
t.Fatalf("boot-failed workspace must be RETAINED (no terminate); got Stop calls %v", reaper.stopCalls)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRescueVolumeGraceIsDistinctFromPrune documents that the rescue
|
||||
// grace is its own contract (24h) and not coupled to any prune timing —
|
||||
// the value is the SSOT the control-plane reaper must honour.
|
||||
func TestRescueVolumeGraceIsDistinctFromPrune(t *testing.T) {
|
||||
if rescueVolumeGraceHours() != 24 {
|
||||
t.Errorf("rescue volume grace = %dh, want 24h (RFC internal#742)", rescueVolumeGraceHours())
|
||||
}
|
||||
}
|
||||
|
||||
// TestCPSweepOnce_MultipleOrphans — all rows in the batch get Stop'd
|
||||
// independently; one failure doesn't block others.
|
||||
func TestCPSweepOnce_MultipleOrphans(t *testing.T) {
|
||||
|
||||
@@ -92,6 +92,23 @@ func provisioningTimeoutFor(runtime string, lookup RuntimeTimeoutLookup) time.Du
|
||||
return DefaultProvisioningTimeout
|
||||
}
|
||||
|
||||
// BootFailureRescueHook, when wired, is invoked once per workspace the
|
||||
// sweep flips from `provisioning` to `failed` — i.e. on the boot-failure
|
||||
// verdict, BEFORE the control plane reaps the instance. It captures a
|
||||
// forensic rescue bundle off the still-running (but boot-failed) EC2 and
|
||||
// ships it to obs/Loki (RFC internal#742 Part 2). Wired in main.go to
|
||||
// handlers.captureRescueBundle via a thin adapter; nil in tests + on
|
||||
// self-hosted deploys (no rescue shipping there).
|
||||
//
|
||||
// Function-typed injection (not an import of handlers) keeps the
|
||||
// existing handlers→registry import direction intact — registry must not
|
||||
// import handlers.
|
||||
//
|
||||
// MUST be best-effort + non-blocking: the hook itself dispatches the
|
||||
// capture on its own goroutine with its own timeout, so the sweep loop
|
||||
// is never slowed or blocked by a hung EIC tunnel on the dead box.
|
||||
var BootFailureRescueHook func(workspaceID, instanceID, reason string)
|
||||
|
||||
// StartProvisioningTimeoutSweep periodically scans for workspaces stuck in
|
||||
// `status='provisioning'` past the timeout window, flips them to `failed`,
|
||||
// and broadcasts a WORKSPACE_PROVISION_TIMEOUT event so the canvas can
|
||||
@@ -144,7 +161,7 @@ func sweepStuckProvisioning(ctx context.Context, emitter ProvisionTimeoutEmitter
|
||||
// flight, not historical) and the partial index on status keeps
|
||||
// it fast.
|
||||
rows, err := db.DB.QueryContext(ctx, `
|
||||
SELECT id, COALESCE(runtime, ''), EXTRACT(EPOCH FROM (now() - updated_at))::int
|
||||
SELECT id, COALESCE(runtime, ''), COALESCE(instance_id, ''), EXTRACT(EPOCH FROM (now() - updated_at))::int
|
||||
FROM workspaces
|
||||
WHERE status = 'provisioning'
|
||||
`)
|
||||
@@ -155,14 +172,15 @@ func sweepStuckProvisioning(ctx context.Context, emitter ProvisionTimeoutEmitter
|
||||
defer rows.Close()
|
||||
|
||||
type candidate struct {
|
||||
id string
|
||||
runtime string
|
||||
ageSec int
|
||||
id string
|
||||
runtime string
|
||||
instanceID string
|
||||
ageSec int
|
||||
}
|
||||
var ids []candidate
|
||||
for rows.Next() {
|
||||
var c candidate
|
||||
if err := rows.Scan(&c.id, &c.runtime, &c.ageSec); err == nil {
|
||||
if err := rows.Scan(&c.id, &c.runtime, &c.instanceID, &c.ageSec); err == nil {
|
||||
ids = append(ids, c)
|
||||
}
|
||||
}
|
||||
@@ -200,6 +218,19 @@ func sweepStuckProvisioning(ctx context.Context, emitter ProvisionTimeoutEmitter
|
||||
continue
|
||||
}
|
||||
log.Printf("Provision-timeout sweep: %s (runtime=%q) stuck in provisioning > %s — marked failed", c.id, c.runtime, timeout)
|
||||
|
||||
// RFC internal#742 Part 2: this flip is a boot-failure verdict.
|
||||
// The instance is still running (the CP reaps it shortly after);
|
||||
// capture a forensic rescue bundle off it NOW, before teardown.
|
||||
// Best-effort + non-blocking — the hook dispatches on its own
|
||||
// goroutine + timeout, so a hung EIC tunnel on the dead box can't
|
||||
// slow the sweep. Only fires on a real flip (affected==1), never
|
||||
// on a race (affected==0) or a non-overdue row — guaranteeing it
|
||||
// runs once per boot-failure verdict and never on a healthy row.
|
||||
if BootFailureRescueHook != nil {
|
||||
BootFailureRescueHook(c.id, c.instanceID, "provision_timeout_sweep")
|
||||
}
|
||||
|
||||
// Emit as WORKSPACE_PROVISION_FAILED, not _TIMEOUT, because the
|
||||
// canvas event handler only flips node state on the _FAILED case.
|
||||
// A separate event type was considered but the UI reaction is
|
||||
|
||||
@@ -0,0 +1,130 @@
|
||||
package registry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
)
|
||||
|
||||
// rescueHookRecorder captures the args of every BootFailureRescueHook
|
||||
// invocation so tests can assert the rescue capture fires exactly on the
|
||||
// boot-failure verdict — and never on a healthy/raced row.
|
||||
type rescueHookRecorder struct {
|
||||
mu sync.Mutex
|
||||
calls [][3]string // {workspaceID, instanceID, reason}
|
||||
}
|
||||
|
||||
func (r *rescueHookRecorder) hook() func(workspaceID, instanceID, reason string) {
|
||||
return func(workspaceID, instanceID, reason string) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
r.calls = append(r.calls, [3]string{workspaceID, instanceID, reason})
|
||||
}
|
||||
}
|
||||
|
||||
func (r *rescueHookRecorder) count() int {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
return len(r.calls)
|
||||
}
|
||||
|
||||
// withRescueHook installs a recorder as the package-level
|
||||
// BootFailureRescueHook for the test's duration.
|
||||
func withRescueHook(t *testing.T) *rescueHookRecorder {
|
||||
t.Helper()
|
||||
rec := &rescueHookRecorder{}
|
||||
prev := BootFailureRescueHook
|
||||
BootFailureRescueHook = rec.hook()
|
||||
t.Cleanup(func() { BootFailureRescueHook = prev })
|
||||
return rec
|
||||
}
|
||||
|
||||
// TestSweep_RescueFiresOnBootFailureVerdict — the core RFC internal#742
|
||||
// assertion: when the sweep flips a stuck workspace to `failed`, the
|
||||
// rescue hook fires once with the workspace + instance id and the
|
||||
// provision_timeout_sweep reason, BEFORE teardown.
|
||||
func TestSweep_RescueFiresOnBootFailureVerdict(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
rec := withRescueHook(t)
|
||||
|
||||
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
|
||||
WillReturnRows(candidateRows([4]any{"ws-stuck", "codex", "i-0badf00d", 700}))
|
||||
mock.ExpectExec(`UPDATE workspaces`).
|
||||
WithArgs("ws-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
sweepStuckProvisioning(context.Background(), &fakeEmitter{}, nil)
|
||||
|
||||
if rec.count() != 1 {
|
||||
t.Fatalf("rescue hook should fire once on a boot-failure flip, got %d", rec.count())
|
||||
}
|
||||
got := rec.calls[0]
|
||||
if got[0] != "ws-stuck" || got[1] != "i-0badf00d" || got[2] != "provision_timeout_sweep" {
|
||||
t.Errorf("rescue hook args = %v, want {ws-stuck i-0badf00d provision_timeout_sweep}", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestSweep_RescueDoesNotFireOnRace — affected==0 means the row raced to
|
||||
// online/restart between SELECT and UPDATE. That is NOT a boot-failure
|
||||
// verdict, so the rescue capture must NOT fire (we'd be snapshotting a
|
||||
// healthy box that's about to come online).
|
||||
func TestSweep_RescueDoesNotFireOnRace(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
rec := withRescueHook(t)
|
||||
|
||||
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
|
||||
WillReturnRows(candidateRows([4]any{"ws-raced", "codex", "i-raced", 700}))
|
||||
mock.ExpectExec(`UPDATE workspaces`).
|
||||
WithArgs("ws-raced", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
|
||||
WillReturnResult(sqlmock.NewResult(0, 0)) // raced — 0 rows
|
||||
|
||||
sweepStuckProvisioning(context.Background(), &fakeEmitter{}, nil)
|
||||
|
||||
if rec.count() != 0 {
|
||||
t.Errorf("rescue hook must NOT fire on a raced flip (affected==0), got %d calls", rec.count())
|
||||
}
|
||||
}
|
||||
|
||||
// TestSweep_RescueDoesNotFireOnHealthyRow — a not-yet-overdue row is
|
||||
// never flipped, so the rescue capture must not fire. Guards against the
|
||||
// hook being attached above the age gate.
|
||||
func TestSweep_RescueDoesNotFireOnHealthyRow(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
rec := withRescueHook(t)
|
||||
|
||||
// hermes at 11 min (660s) < 30 min hermes budget → not overdue, no flip.
|
||||
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
|
||||
WillReturnRows(candidateRows([4]any{"ws-healthy", "hermes", "i-healthy", 660}))
|
||||
|
||||
sweepStuckProvisioning(context.Background(), &fakeEmitter{}, nil)
|
||||
|
||||
if rec.count() != 0 {
|
||||
t.Errorf("rescue hook must NOT fire on a non-overdue (healthy) row, got %d calls", rec.count())
|
||||
}
|
||||
}
|
||||
|
||||
// TestSweep_RescueNilHookIsSafe — on a deploy where the hook is unwired
|
||||
// (self-hosted / no rescue shipping), the sweep must still flip + emit
|
||||
// without panicking on the nil hook.
|
||||
func TestSweep_RescueNilHookIsSafe(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
prev := BootFailureRescueHook
|
||||
BootFailureRescueHook = nil
|
||||
t.Cleanup(func() { BootFailureRescueHook = prev })
|
||||
|
||||
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
|
||||
WillReturnRows(candidateRows([4]any{"ws-stuck", "codex", "i-x", 700}))
|
||||
mock.ExpectExec(`UPDATE workspaces`).
|
||||
WithArgs("ws-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
emit := &fakeEmitter{}
|
||||
sweepStuckProvisioning(context.Background(), emit, nil) // must not panic
|
||||
|
||||
if emit.count() != 1 {
|
||||
t.Errorf("flip+emit must still happen with a nil rescue hook, got %d events", emit.count())
|
||||
}
|
||||
}
|
||||
@@ -7,8 +7,8 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
)
|
||||
|
||||
// fakeEmitter records every RecordAndBroadcast call so tests can assert
|
||||
@@ -42,12 +42,15 @@ func (f *fakeEmitter) count() int {
|
||||
return len(f.events)
|
||||
}
|
||||
|
||||
// candidateRows builds the new-shape query result (id, runtime, age_sec).
|
||||
// Use this in every sweep test to match the runtime-aware SELECT.
|
||||
func candidateRows(rows ...[3]any) *sqlmock.Rows {
|
||||
r := sqlmock.NewRows([]string{"id", "runtime", "age_sec"})
|
||||
// candidateRows builds the query result (id, runtime, instance_id,
|
||||
// age_sec). instance_id was added for the RFC internal#742 rescue hook —
|
||||
// it rides alongside runtime so the boot-failure capture can reach the
|
||||
// still-running box. Tests that don't care about the rescue path pass
|
||||
// "" for instance_id. Use this in every sweep test to match the SELECT.
|
||||
func candidateRows(rows ...[4]any) *sqlmock.Rows {
|
||||
r := sqlmock.NewRows([]string{"id", "runtime", "instance_id", "age_sec"})
|
||||
for _, row := range rows {
|
||||
r = r.AddRow(row[0], row[1], row[2])
|
||||
r = r.AddRow(row[0], row[1], row[2], row[3])
|
||||
}
|
||||
return r
|
||||
}
|
||||
@@ -58,8 +61,8 @@ func TestSweepStuckProvisioning_FlipsOverdue(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
// claude-code workspace, 700s old > 600s default timeout → flipped.
|
||||
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
|
||||
WillReturnRows(candidateRows([3]any{"ws-stuck", "claude-code", 700}))
|
||||
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
|
||||
WillReturnRows(candidateRows([4]any{"ws-stuck", "claude-code", "i-stuck", 700}))
|
||||
|
||||
mock.ExpectExec(`UPDATE workspaces`).
|
||||
WithArgs("ws-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
|
||||
@@ -92,8 +95,8 @@ func TestSweepStuckProvisioning_HermesGets30MinSlack(t *testing.T) {
|
||||
|
||||
// 11 min = 660 sec. < HermesProvisioningTimeout (1800s).
|
||||
// No UPDATE should fire — hermes still has time.
|
||||
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
|
||||
WillReturnRows(candidateRows([3]any{"ws-hermes-booting", "hermes", 660}))
|
||||
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
|
||||
WillReturnRows(candidateRows([4]any{"ws-hermes-booting", "hermes", "i-h1", 660}))
|
||||
|
||||
emit := &fakeEmitter{}
|
||||
sweepStuckProvisioning(context.Background(), emit, nil)
|
||||
@@ -114,8 +117,8 @@ func TestSweepStuckProvisioning_HermesPastDeadline(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
// 31 min = 1860 sec > HermesProvisioningTimeout (1800s).
|
||||
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
|
||||
WillReturnRows(candidateRows([3]any{"ws-hermes-stuck", "hermes", 1860}))
|
||||
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
|
||||
WillReturnRows(candidateRows([4]any{"ws-hermes-stuck", "hermes", "i-h2", 1860}))
|
||||
mock.ExpectExec(`UPDATE workspaces`).
|
||||
WithArgs("ws-hermes-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
@@ -150,8 +153,8 @@ func TestSweepStuckProvisioning_HermesPastDeadline(t *testing.T) {
|
||||
func TestSweepStuckProvisioning_ManifestOverrideSparesRow(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
|
||||
WillReturnRows(candidateRows([3]any{"ws-claude-templated", "claude-code", 660}))
|
||||
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
|
||||
WillReturnRows(candidateRows([4]any{"ws-claude-templated", "claude-code", "i-ct", 660}))
|
||||
|
||||
// No ExpectExec — if the sweeper still flips the row, sqlmock will
|
||||
// fail with an unexpected-query error.
|
||||
@@ -183,8 +186,8 @@ func TestSweepStuckProvisioning_ManifestOverrideStillFlipsPastDeadline(t *testin
|
||||
mock := setupTestDB(t)
|
||||
|
||||
// 21 min = 1260s > 1200s manifest override → flipped.
|
||||
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
|
||||
WillReturnRows(candidateRows([3]any{"ws-claude-truly-stuck", "claude-code", 1260}))
|
||||
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
|
||||
WillReturnRows(candidateRows([4]any{"ws-claude-truly-stuck", "claude-code", "i-cts", 1260}))
|
||||
mock.ExpectExec(`UPDATE workspaces`).
|
||||
WithArgs("ws-claude-truly-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
@@ -221,8 +224,8 @@ func TestSweepStuckProvisioning_ManifestOverrideStillFlipsPastDeadline(t *testin
|
||||
func TestSweepStuckProvisioning_RaceSafe(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
|
||||
WillReturnRows(candidateRows([3]any{"ws-raced", "claude-code", 700}))
|
||||
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
|
||||
WillReturnRows(candidateRows([4]any{"ws-raced", "claude-code", "i-raced", 700}))
|
||||
|
||||
mock.ExpectExec(`UPDATE workspaces`).
|
||||
WithArgs("ws-raced", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
|
||||
@@ -244,7 +247,7 @@ func TestSweepStuckProvisioning_RaceSafe(t *testing.T) {
|
||||
func TestSweepStuckProvisioning_NoStuck(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
|
||||
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
|
||||
WillReturnRows(candidateRows())
|
||||
|
||||
emit := &fakeEmitter{}
|
||||
@@ -265,10 +268,10 @@ func TestSweepStuckProvisioning_NoStuck(t *testing.T) {
|
||||
func TestSweepStuckProvisioning_MultipleStuck(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
|
||||
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
|
||||
WillReturnRows(candidateRows(
|
||||
[3]any{"ws-claude-code", "claude-code", 700},
|
||||
[3]any{"ws-hermes", "hermes", 1860},
|
||||
[4]any{"ws-claude-code", "claude-code", "i-cc", 700},
|
||||
[4]any{"ws-hermes", "hermes", "i-hh", 1860},
|
||||
))
|
||||
|
||||
mock.ExpectExec(`UPDATE workspaces`).
|
||||
@@ -292,8 +295,8 @@ func TestSweepStuckProvisioning_MultipleStuck(t *testing.T) {
|
||||
func TestSweepStuckProvisioning_BroadcastFailureDoesNotCrash(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), EXTRACT`).
|
||||
WillReturnRows(candidateRows([3]any{"ws-stuck", "claude-code", 700}))
|
||||
mock.ExpectQuery(`SELECT id, COALESCE\(runtime, ''\), COALESCE\(instance_id, ''\), EXTRACT`).
|
||||
WillReturnRows(candidateRows([4]any{"ws-stuck", "claude-code", "i-stuck", 700}))
|
||||
mock.ExpectExec(`UPDATE workspaces`).
|
||||
WithArgs("ws-stuck", sqlmock.AnyArg(), sqlmock.AnyArg(), models.StatusFailed).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
@@ -0,0 +1,247 @@
|
||||
// Package rescue captures a fixed post-mortem "rescue bundle" off a
|
||||
// workspace EC2 whose boot FAILED — before the platform's sweeper /
|
||||
// control-plane reaps the instance — and ships it to obs/Loki so a
|
||||
// wedged workspace (e.g. the codex provider-derivation failure that
|
||||
// motivated RFC internal#742) is inspectable instead of an
|
||||
// uninspectable wall.
|
||||
//
|
||||
// Design constraints (RFC internal#742, Part 2):
|
||||
//
|
||||
// - BEST-EFFORT + NON-BLOCKING. Capture MUST NOT change boot-failure
|
||||
// semantics or add latency to the failure path. Callers fire
|
||||
// Capture in its own goroutine; Capture additionally bounds itself
|
||||
// with CaptureTimeout so a hung EIC tunnel can't wedge the
|
||||
// goroutine forever.
|
||||
// - FIRES ON THE BOOT-FAILURE VERDICT ONLY. The two hook points are
|
||||
// the provision-timeout sweep (registry.sweepStuckProvisioning) and
|
||||
// the out-of-band bootstrap-watcher signal
|
||||
// (handlers.WorkspaceHandler.BootstrapFailed). Normal teardown /
|
||||
// deprovision / recreate / billing-suspend / hibernate paths do NOT
|
||||
// call Capture — see the RFC's path enumeration.
|
||||
// - REDACT BEFORE ANYTHING LEAVES THE BOX. Every collected section is
|
||||
// run through the injected Redact func (wired to the existing
|
||||
// handlers.redactSecrets secret-scan) before it is shipped. Raw
|
||||
// tokens/keys never reach Loki.
|
||||
//
|
||||
// The package is a LEAF: it imports only internal/audit (the obs
|
||||
// shipper) so it can be called from both handlers and registry without
|
||||
// an import cycle (registry must not import handlers). The two heavy
|
||||
// dependencies — the EIC/SSH remote-command runner and the redactor —
|
||||
// are injected as package-level func vars, wired once at boot from the
|
||||
// handlers package (which owns withEICTunnel + redactSecrets). Tests
|
||||
// swap them for fakes.
|
||||
package rescue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/audit"
|
||||
)
|
||||
|
||||
// CaptureTimeout bounds the whole bundle collection. The sweeper runs
|
||||
// every 30s and the CP reap follows the failure verdict; 45s gives the
|
||||
// EIC dance (~3-5s) plus six short remote commands (<2s each) generous
|
||||
// headroom while still finishing well before the instance is torn down.
|
||||
// Distinct from the per-op eicFileOpTimeout so a slow box that already
|
||||
// failed to boot can't hang the capture goroutine indefinitely.
|
||||
const CaptureTimeout = 45 * time.Second
|
||||
|
||||
// LokiKind is the Loki stream label value that tags every rescue
|
||||
// record. Queryable as `kind="rescue"` (RFC internal#742 §Loki labels).
|
||||
const LokiKind = "rescue"
|
||||
|
||||
// RescueVolumeGrace is how long a boot-failed workspace's /configs data
|
||||
// volume (and its still-running instance) must be RETAINED past the
|
||||
// boot-failure verdict so a live rescue read is possible — distinct from
|
||||
// the user-requested prune path (cp#415), which is an explicit erase.
|
||||
//
|
||||
// In molecule-core (the tenant platform) the boot-failure verdict only
|
||||
// flips workspaces.status to `failed`; it never issues a terminate. The
|
||||
// platform's two reapers (registry.StartCPOrphanSweeper +
|
||||
// handlers deprovision) act ONLY on status='removed', so a `failed`
|
||||
// workspace's instance + /configs volume are retained here by
|
||||
// construction — see TestCPSweepOnce_DoesNotReapFailedWorkspace. The
|
||||
// time-bounded reap of the failed instance is the control plane's
|
||||
// bootstrap-watcher concern; this constant is the SSOT for the grace
|
||||
// the CP must honour (24h covers an operator's next-business-day
|
||||
// post-mortem without leaking the volume indefinitely).
|
||||
const RescueVolumeGrace = 24 * time.Hour
|
||||
|
||||
// rescueEventType is the audit event_type carried in the shipped
|
||||
// record. The obs shipper (internal/audit) already maps event_type to a
|
||||
// low-cardinality Loki label; "rescue.bundle" keeps the rescue stream
|
||||
// trivially filterable alongside the existing audit taxonomy.
|
||||
const rescueEventType = "rescue.bundle"
|
||||
|
||||
// RunRemote runs a single shell command on the still-running (but
|
||||
// unconfigured) workspace EC2 over EIC/SSH and returns its combined
|
||||
// output. Wired at boot to the handlers EIC runner
|
||||
// (rescueRunRemoteViaEIC). nil until wired — Capture degrades to a
|
||||
// logged no-op rather than panicking, so an operator who hasn't wired
|
||||
// the hook still gets a clear signal instead of a crash on the failure
|
||||
// path.
|
||||
var RunRemote func(ctx context.Context, instanceID, command string) (string, error)
|
||||
|
||||
// Redact scrubs secret-shaped substrings from a collected section
|
||||
// before it leaves the box. Wired at boot to handlers.redactSecrets.
|
||||
// nil until wired — Capture refuses to ship un-redacted content if the
|
||||
// redactor is missing (fails closed: logs + aborts rather than leaking
|
||||
// raw config).
|
||||
var Redact func(workspaceID, content string) string
|
||||
|
||||
// section is one labelled chunk of the rescue bundle: a human-readable
|
||||
// name + the remote command that produces it.
|
||||
type section struct {
|
||||
name string
|
||||
command string
|
||||
}
|
||||
|
||||
// bundleSections is the FIXED set collected on every boot-failure
|
||||
// rescue (RFC internal#742 §Build.1). Order is the post-mortem reading
|
||||
// order: config first, then boot logs, then container state, then the
|
||||
// resolved model/provider env that drove the codex derivation failure.
|
||||
//
|
||||
// - /configs/config.yaml + system-prompt.md: the managed config the
|
||||
// runtime booted against (redacted; system-prompt can embed keys).
|
||||
// - cloud-init-output.log tail: the user-data execution trace — where
|
||||
// a wedged boot actually died.
|
||||
// - docker ps -a: container state (did the agent container even
|
||||
// start, exit-code, restart loop).
|
||||
// - agent container logs: the runtime's own stderr (the codex
|
||||
// provider-derivation panic lives here).
|
||||
// - MODEL|PROVIDER|RUNTIME env: the resolved routing that motivated
|
||||
// the RFC. `sudo cat` of the container env via docker inspect-style
|
||||
// grep — see the command.
|
||||
//
|
||||
// All commands use `sudo -n` (the box's /configs is root-owned; ubuntu
|
||||
// has passwordless sudo) and swallow missing-target stderr so a section
|
||||
// that can't be produced ships as a short marker instead of failing the
|
||||
// whole bundle. Kept as data (not inlined) so the redaction + ship loop
|
||||
// is uniform and the set is reviewable in one place.
|
||||
var bundleSections = []section{
|
||||
{
|
||||
name: "config.yaml",
|
||||
command: "sudo -n cat /configs/config.yaml 2>/dev/null || echo '(/configs/config.yaml absent)'",
|
||||
},
|
||||
{
|
||||
name: "system-prompt.md",
|
||||
command: "sudo -n cat /configs/system-prompt.md 2>/dev/null || echo '(/configs/system-prompt.md absent)'",
|
||||
},
|
||||
{
|
||||
name: "cloud-init-output.log.tail",
|
||||
command: "sudo -n tail -200 /var/log/cloud-init-output.log 2>/dev/null || echo '(cloud-init-output.log absent)'",
|
||||
},
|
||||
{
|
||||
name: "docker-ps",
|
||||
command: "sudo -n docker ps -a 2>/dev/null || echo '(docker unavailable)'",
|
||||
},
|
||||
{
|
||||
// The agent container is the first non-infra container; grab the
|
||||
// most recently created one and tail its logs. `head -1` of
|
||||
// `docker ps -a -q` is creation-ordered newest-first, which is
|
||||
// the agent runtime on a workspace box.
|
||||
name: "agent-container.logs.tail",
|
||||
command: "cid=$(sudo -n docker ps -a -q 2>/dev/null | head -1); [ -n \"$cid\" ] && sudo -n docker logs --tail 200 \"$cid\" 2>&1 || echo '(no agent container)'",
|
||||
},
|
||||
{
|
||||
// Resolved model/provider/runtime env from the agent container.
|
||||
// `docker inspect` the env array and grep the routing keys. This
|
||||
// is the field that pinpoints a provider-derivation failure.
|
||||
name: "model-provider-runtime.env",
|
||||
command: "cid=$(sudo -n docker ps -a -q 2>/dev/null | head -1); [ -n \"$cid\" ] && sudo -n docker inspect --format '{{range .Config.Env}}{{println .}}{{end}}' \"$cid\" 2>/dev/null | grep -E 'MODEL|PROVIDER|RUNTIME' || echo '(no env)'",
|
||||
},
|
||||
}
|
||||
|
||||
// Input is the identity of the failed workspace being rescued.
|
||||
type Input struct {
|
||||
InstanceID string // EC2 instance id of the still-running failed box
|
||||
WorkspaceID string
|
||||
OrgID string
|
||||
// Reason is a short tag for WHY the rescue fired (e.g.
|
||||
// "provision_timeout_sweep" or "bootstrap_watcher") — carried into
|
||||
// the Loki record so an operator can correlate the bundle with the
|
||||
// failure verdict that triggered it.
|
||||
Reason string
|
||||
}
|
||||
|
||||
// Capture collects the fixed rescue bundle off the failed instance,
|
||||
// redacts each section, and ships it to Loki under
|
||||
// {kind="rescue", org=<OrgID>, workspace_id=<WorkspaceID>}.
|
||||
//
|
||||
// BEST-EFFORT: every failure mode (missing wiring, EIC error, a single
|
||||
// section that won't collect) is logged and does NOT propagate — Capture
|
||||
// never returns an error and never panics, so the boot-failure handling
|
||||
// at the call site is unaffected. The caller is expected to invoke this
|
||||
// in its own goroutine; Capture additionally self-bounds with
|
||||
// CaptureTimeout.
|
||||
func Capture(ctx context.Context, in Input) {
|
||||
defer func() {
|
||||
// A logging helper on the failure path must never take the
|
||||
// process down. Recover defensively — the redactor / shipper are
|
||||
// injected and a future mis-wire shouldn't crash the sweeper.
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("rescue: capture panicked for ws=%s instance=%s: %v", in.WorkspaceID, in.InstanceID, r)
|
||||
}
|
||||
}()
|
||||
|
||||
if in.InstanceID == "" {
|
||||
// No live box to read — nothing to rescue (e.g. failure before
|
||||
// any EC2 was launched). Not an error; just skip.
|
||||
log.Printf("rescue: skip ws=%s — no instance_id (nothing to capture)", in.WorkspaceID)
|
||||
return
|
||||
}
|
||||
if RunRemote == nil {
|
||||
log.Printf("rescue: skip ws=%s instance=%s — RunRemote not wired (best-effort no-op)", in.WorkspaceID, in.InstanceID)
|
||||
return
|
||||
}
|
||||
if Redact == nil {
|
||||
// Fail CLOSED: without a redactor we could leak raw tokens to
|
||||
// Loki. Abort rather than ship unredacted.
|
||||
log.Printf("rescue: ABORT ws=%s instance=%s — Redact not wired; refusing to ship un-redacted bundle", in.WorkspaceID, in.InstanceID)
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, CaptureTimeout)
|
||||
defer cancel()
|
||||
|
||||
log.Printf("rescue: capturing bundle ws=%s instance=%s reason=%s", in.WorkspaceID, in.InstanceID, in.Reason)
|
||||
|
||||
collected := 0
|
||||
for _, sec := range bundleSections {
|
||||
raw, err := RunRemote(ctx, in.InstanceID, sec.command)
|
||||
if err != nil {
|
||||
// One section failing (e.g. ssh blip mid-collection) must not
|
||||
// abort the rest — ship a marker for it and continue.
|
||||
log.Printf("rescue: section %q failed for ws=%s: %v", sec.name, in.WorkspaceID, err)
|
||||
ship(ctx, in, sec.name, fmt.Sprintf("(rescue: section collection failed: %v)", err), false)
|
||||
continue
|
||||
}
|
||||
redacted := Redact(in.WorkspaceID, raw)
|
||||
ship(ctx, in, sec.name, redacted, true)
|
||||
collected++
|
||||
}
|
||||
|
||||
log.Printf("rescue: shipped %d/%d sections ws=%s instance=%s kind=%s", collected, len(bundleSections), in.WorkspaceID, in.InstanceID, LokiKind)
|
||||
}
|
||||
|
||||
// ship emits one rescue section to Loki via the audit shipper. The
|
||||
// org / workspace_id / kind ride in the record body (queryable via
|
||||
// LogQL `| json`); event_type ("rescue.bundle") is the low-cardinality
|
||||
// Loki label the shipper already promotes. `redacted` records whether
|
||||
// the content passed through the secret-scan, so an operator can tell a
|
||||
// shipped-but-redacted section from a collection-failure marker.
|
||||
func ship(ctx context.Context, in Input, name, content string, redacted bool) {
|
||||
audit.Emit(ctx, rescueEventType, map[string]any{
|
||||
"kind": LokiKind,
|
||||
"org": in.OrgID,
|
||||
"workspace_id": in.WorkspaceID,
|
||||
"instance_id": in.InstanceID,
|
||||
"reason": in.Reason,
|
||||
"section": name,
|
||||
"redacted": redacted,
|
||||
"content": content,
|
||||
})
|
||||
}
|
||||
@@ -0,0 +1,226 @@
|
||||
package rescue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// withFakes swaps the injected RunRemote + Redact for the duration of a
|
||||
// test and restores them after. Mirrors the provisioner test-fake
|
||||
// pattern (package-var swap + t.Cleanup).
|
||||
func withFakes(t *testing.T, run func(ctx context.Context, instanceID, cmd string) (string, error), redact func(ws, c string) string) {
|
||||
t.Helper()
|
||||
prevRun, prevRedact := RunRemote, Redact
|
||||
RunRemote = run
|
||||
Redact = redact
|
||||
t.Cleanup(func() { RunRemote = prevRun; Redact = prevRedact })
|
||||
}
|
||||
|
||||
// captureLoki points the audit shipper at a temp JSONL file and returns
|
||||
// a reader that decodes the records the rescue ship() loop wrote. This
|
||||
// is the same transport the production rescue stream uses (audit.Emit →
|
||||
// Loki via the tenant Vector source), so asserting on it proves the
|
||||
// shipper-reuse + labels end to end.
|
||||
func captureLoki(t *testing.T) func() []map[string]any {
|
||||
t.Helper()
|
||||
dir := t.TempDir()
|
||||
path := filepath.Join(dir, "audit.jsonl")
|
||||
t.Setenv("MOLECULE_AUDIT_LOG_PATH", path)
|
||||
return func() []map[string]any {
|
||||
b, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
var out []map[string]any
|
||||
for _, line := range strings.Split(strings.TrimSpace(string(b)), "\n") {
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
var rec map[string]any
|
||||
if err := json.Unmarshal([]byte(line), &rec); err != nil {
|
||||
t.Fatalf("bad audit jsonl line %q: %v", line, err)
|
||||
}
|
||||
out = append(out, rec)
|
||||
}
|
||||
return out
|
||||
}
|
||||
}
|
||||
|
||||
func fields(rec map[string]any) map[string]any {
|
||||
f, _ := rec["fields"].(map[string]any)
|
||||
return f
|
||||
}
|
||||
|
||||
// TestCapture_ShipsAllSectionsWithRescueLabels is the happy path: a
|
||||
// boot-failure capture collects every fixed section, runs each through
|
||||
// the redactor, and ships it to Loki under {kind="rescue", org, ws}.
|
||||
func TestCapture_ShipsAllSectionsWithRescueLabels(t *testing.T) {
|
||||
readLoki := captureLoki(t)
|
||||
var seenCmds []string
|
||||
withFakes(t,
|
||||
func(_ context.Context, instanceID, cmd string) (string, error) {
|
||||
seenCmds = append(seenCmds, cmd)
|
||||
return "OUTPUT for " + instanceID, nil
|
||||
},
|
||||
func(_ws, c string) string { return c }, // identity redactor
|
||||
)
|
||||
|
||||
Capture(context.Background(), Input{
|
||||
InstanceID: "i-abc123",
|
||||
WorkspaceID: "ws-1",
|
||||
OrgID: "org-9",
|
||||
Reason: "provision_timeout_sweep",
|
||||
})
|
||||
|
||||
recs := readLoki()
|
||||
if len(recs) != len(bundleSections) {
|
||||
t.Fatalf("want %d shipped sections, got %d", len(bundleSections), len(recs))
|
||||
}
|
||||
if len(seenCmds) != len(bundleSections) {
|
||||
t.Fatalf("want %d remote commands run, got %d", len(bundleSections), len(seenCmds))
|
||||
}
|
||||
for _, rec := range recs {
|
||||
if rec["event_type"] != rescueEventType {
|
||||
t.Errorf("event_type = %v, want %q", rec["event_type"], rescueEventType)
|
||||
}
|
||||
// workspace_id is promoted to the top-level record position by
|
||||
// the audit shipper.
|
||||
if rec["workspace_id"] != "ws-1" {
|
||||
t.Errorf("top-level workspace_id = %v, want ws-1", rec["workspace_id"])
|
||||
}
|
||||
f := fields(rec)
|
||||
if f["kind"] != LokiKind {
|
||||
t.Errorf("kind = %v, want %q", f["kind"], LokiKind)
|
||||
}
|
||||
if f["org"] != "org-9" {
|
||||
t.Errorf("org = %v, want org-9", f["org"])
|
||||
}
|
||||
if f["instance_id"] != "i-abc123" {
|
||||
t.Errorf("instance_id = %v, want i-abc123", f["instance_id"])
|
||||
}
|
||||
if f["redacted"] != true {
|
||||
t.Errorf("redacted = %v, want true for a collected section", f["redacted"])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestCapture_Redacts proves the bundle is scrubbed before it leaves the
|
||||
// box: a remote section that contains a secret-shaped token must ship
|
||||
// with the token replaced, never raw.
|
||||
func TestCapture_Redacts(t *testing.T) {
|
||||
readLoki := captureLoki(t)
|
||||
const secret = "sk-ant-SUPERSECRETTOKENVALUE0001"
|
||||
withFakes(t,
|
||||
func(_ context.Context, _ string, _ string) (string, error) {
|
||||
return "ANTHROPIC_API_KEY=" + secret, nil
|
||||
},
|
||||
// redactor that mangles anything containing the secret shape
|
||||
func(_ws, c string) string {
|
||||
if strings.Contains(c, secret) {
|
||||
return strings.ReplaceAll(c, secret, "[REDACTED]")
|
||||
}
|
||||
return c
|
||||
},
|
||||
)
|
||||
|
||||
Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-2", OrgID: "o"})
|
||||
|
||||
for _, rec := range readLoki() {
|
||||
content, _ := fields(rec)["content"].(string)
|
||||
if strings.Contains(content, secret) {
|
||||
t.Fatalf("raw secret leaked to Loki in section %v: %q", fields(rec)["section"], content)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestCapture_SkipsWhenNoInstance: a failure with no provisioned EC2 has
|
||||
// nothing to read — Capture must no-op (ship nothing) rather than dial a
|
||||
// blank instance id.
|
||||
func TestCapture_SkipsWhenNoInstance(t *testing.T) {
|
||||
readLoki := captureLoki(t)
|
||||
called := false
|
||||
withFakes(t,
|
||||
func(_ context.Context, _ string, _ string) (string, error) { called = true; return "", nil },
|
||||
func(_ws, c string) string { return c },
|
||||
)
|
||||
Capture(context.Background(), Input{InstanceID: "", WorkspaceID: "ws-3", OrgID: "o"})
|
||||
if called {
|
||||
t.Error("RunRemote called for an empty instance id")
|
||||
}
|
||||
if recs := readLoki(); len(recs) != 0 {
|
||||
t.Errorf("shipped %d records for an empty instance id, want 0", len(recs))
|
||||
}
|
||||
}
|
||||
|
||||
// TestCapture_FailsClosedWithoutRedactor: if the redactor is not wired,
|
||||
// Capture must NOT ship anything (would leak raw config). Fail closed.
|
||||
func TestCapture_FailsClosedWithoutRedactor(t *testing.T) {
|
||||
readLoki := captureLoki(t)
|
||||
prevRun, prevRedact := RunRemote, Redact
|
||||
RunRemote = func(_ context.Context, _ string, _ string) (string, error) { return "raw config", nil }
|
||||
Redact = nil
|
||||
t.Cleanup(func() { RunRemote = prevRun; Redact = prevRedact })
|
||||
|
||||
Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-4", OrgID: "o"})
|
||||
|
||||
if recs := readLoki(); len(recs) != 0 {
|
||||
t.Errorf("shipped %d records without a redactor wired, want 0 (fail closed)", len(recs))
|
||||
}
|
||||
}
|
||||
|
||||
// TestCapture_SectionFailureIsIsolated: one section's RunRemote error
|
||||
// must not abort the rest — the failing section ships a marker and the
|
||||
// others still ship.
|
||||
func TestCapture_SectionFailureIsIsolated(t *testing.T) {
|
||||
readLoki := captureLoki(t)
|
||||
withFakes(t,
|
||||
func(_ context.Context, _ string, cmd string) (string, error) {
|
||||
if strings.Contains(cmd, "config.yaml") {
|
||||
return "", errors.New("ssh blip")
|
||||
}
|
||||
return "ok", nil
|
||||
},
|
||||
func(_ws, c string) string { return c },
|
||||
)
|
||||
|
||||
Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-5", OrgID: "o"})
|
||||
|
||||
recs := readLoki()
|
||||
if len(recs) != len(bundleSections) {
|
||||
t.Fatalf("want all %d sections shipped (incl. failure marker), got %d", len(bundleSections), len(recs))
|
||||
}
|
||||
var failureMarkers int
|
||||
for _, rec := range recs {
|
||||
if fields(rec)["redacted"] == false {
|
||||
failureMarkers++
|
||||
content, _ := fields(rec)["content"].(string)
|
||||
if !strings.Contains(content, "section collection failed") {
|
||||
t.Errorf("failure marker content = %q, want a collection-failed marker", content)
|
||||
}
|
||||
}
|
||||
}
|
||||
if failureMarkers != 1 {
|
||||
t.Errorf("want exactly 1 failure marker, got %d", failureMarkers)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCapture_NoWiringIsSafeNoOp: with RunRemote unwired (operator hasn't
|
||||
// called the boot wiring), Capture must be a logged no-op, never a panic.
|
||||
func TestCapture_NoWiringIsSafeNoOp(t *testing.T) {
|
||||
readLoki := captureLoki(t)
|
||||
prevRun, prevRedact := RunRemote, Redact
|
||||
RunRemote = nil
|
||||
Redact = func(_ws, c string) string { return c }
|
||||
t.Cleanup(func() { RunRemote = prevRun; Redact = prevRedact })
|
||||
|
||||
Capture(context.Background(), Input{InstanceID: "i-x", WorkspaceID: "ws-6", OrgID: "o"})
|
||||
|
||||
if recs := readLoki(); len(recs) != 0 {
|
||||
t.Errorf("shipped %d records with RunRemote unwired, want 0", len(recs))
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user