fix(provision): fail-closed on instance_id persist failure to prevent EC2 orphan #2392

Merged
devops-engineer merged 6 commits from fix/ec2-orphan-instance-id-persist-failure into main 2026-06-07 22:39:43 +00:00
2 changed files with 209 additions and 7 deletions
@@ -19,6 +19,15 @@ import (
"gopkg.in/yaml.v3"
)
// instanceIDPersistRetryAttempts caps total instance_id UPDATE attempts
// (initial + retries). 3 catches transient DB blips without stalling the
// provision goroutine past the context timeout.
var instanceIDPersistRetryAttempts = 3
// instanceIDPersistRetryBaseDelay is the sleep before the first retry; the
// delay doubles before each further retry. The retry loop only sleeps
// BETWEEN attempts (never after the final one), so with 3 attempts the
// waits are 100ms then 200ms: 300ms of backoff in total, not counting the
// time spent inside the UPDATE calls themselves. Kept as a var (not a
// const) because tests shrink it to keep the retry path fast.
var instanceIDPersistRetryBaseDelay = 100 * time.Millisecond
// logProvisionPanic is the deferred recover at the top of every provision
// goroutine. Without it, a panic inside provisionWorkspaceOpts /
// provisionWorkspaceCP propagates up the goroutine stack and crashes the
@@ -1393,13 +1402,38 @@ func (h *WorkspaceHandler) provisionWorkspaceCP(workspaceID, templatePath string
// Persist the backing instance id so later operations (terminal via
// EIC+SSH, live logs, debug introspection) can resolve workspace → EC2
// without re-asking CP on every request.
if _, err := db.DB.ExecContext(ctx,
`UPDATE workspaces SET instance_id = $2, updated_at = now() WHERE id = $1`,
workspaceID, machineID); err != nil {
// Non-fatal: provisioning succeeded, the workspace will still run.
// The row stays without instance_id — terminal falls back to the
// "CP-provisioned but unreachable" error, not a silent failure.
log.Printf("CPProvisioner: persist instance_id failed for %s: %v", workspaceID, err)
//
// Bounded retry with exponential backoff: a transient DB blip must not
// orphan a healthy running instance. If all retries fail, mark the
// workspace failed and record the instance_id in the broadcast event +
// last_sample_error so an operator/reaper can reconcile later. The live
// EC2 is NOT terminated — it may contain valuable state. (#1)
var persistErr error
delay := instanceIDPersistRetryBaseDelay
for attempt := 1; attempt <= instanceIDPersistRetryAttempts; attempt++ {
_, persistErr = db.DB.ExecContext(ctx,
`UPDATE workspaces SET instance_id = $2, updated_at = now() WHERE id = $1`,
workspaceID, machineID)
if persistErr == nil {
if attempt > 1 {
log.Printf("CPProvisioner: instance_id persist for %s succeeded on attempt %d", workspaceID, attempt)
}
break
}
if attempt < instanceIDPersistRetryAttempts {
time.Sleep(delay)
delay *= 2
}
}
if persistErr != nil {
log.Printf("CPProvisioner: CRITICAL persist instance_id failed for %s after %d attempts: %v — EC2 instance %s is RUNNING but UNTRACKED. Operator must manually reconcile or remove the workspace to trigger orphan cleanup.", workspaceID, instanceIDPersistRetryAttempts, persistErr, machineID)
// Server-only log already captures the raw error above; broadcast gets
// safe fields only (no client-visible DB error). Security: RC 9378.
h.markProvisionFailed(ctx, workspaceID, "instance_id persist failed after retry — EC2 untracked", map[string]interface{}{
"instance_id": machineID,
"attempts": instanceIDPersistRetryAttempts,
})
return
}
log.Printf("CPProvisioner: workspace %s started as machine %s via control plane", workspaceID, machineID)
@@ -12,6 +12,7 @@ import (
"path/filepath"
"strings"
"testing"
"time"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/memory/contract"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
@@ -1760,6 +1761,173 @@ func (m *mockResolver) Fetch(_ context.Context, _, _ string) (string, error) {
return m.fetchName, m.fetchErr
}
// TestProvisionWorkspaceCP_InstanceIDPersistFail_MarksFailed asserts that
// when cpProv.Start succeeds but the DB UPDATE for instance_id fails on ALL
// retry attempts, the handler marks the workspace failed WITHOUT terminating
// the live EC2. The orphaned instance_id is recorded in the broadcast event
// for operator reconciliation. Regression test for ticket #1.
//
// The sqlmock expectations are verified explicitly after the call so the
// test fails if the handler issues fewer than instanceIDPersistRetryAttempts
// UPDATEs or never writes the failed status.
func TestProvisionWorkspaceCP_InstanceIDPersistFail_MarksFailed(t *testing.T) {
	// Shrink retry backoff so the test doesn't stall.
	prevDelay := instanceIDPersistRetryBaseDelay
	instanceIDPersistRetryBaseDelay = 1 * time.Millisecond
	t.Cleanup(func() { instanceIDPersistRetryBaseDelay = prevDelay })

	t.Setenv("MOLECULE_LLM_BASE_URL", "https://api.example.test/api/v1/internal/llm/openai/v1")
	t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "tenant-admin-token")
	t.Setenv("MOLECULE_DEPLOY_MODE", "self-hosted")

	mock := setupTestDB(t)
	mock.ExpectQuery(`SELECT key, encrypted_value, encryption_version FROM global_secrets`).
		WillReturnRows(sqlmock.NewRows([]string{"key", "encrypted_value", "encryption_version"}))
	mock.ExpectQuery(`SELECT key, encrypted_value, encryption_version FROM workspace_secrets`).
		WithArgs("ws-cp-orphan").
		WillReturnRows(sqlmock.NewRows([]string{"key", "encrypted_value", "encryption_version"}))

	// mintWorkspaceSecrets: revoke + issue auth token + inbound secret
	mock.ExpectExec(`UPDATE workspace_auth_tokens SET revoked_at`).
		WithArgs("ws-cp-orphan").
		WillReturnResult(sqlmock.NewResult(0, 0))
	mock.ExpectExec(`INSERT INTO workspace_auth_tokens`).
		WithArgs("ws-cp-orphan", sqlmock.AnyArg(), sqlmock.AnyArg()).
		WillReturnResult(sqlmock.NewResult(1, 1))
	mock.ExpectExec(`UPDATE workspaces SET platform_inbound_secret`).
		WithArgs(sqlmock.AnyArg(), "ws-cp-orphan").
		WillReturnResult(sqlmock.NewResult(0, 1))

	// All 3 retry attempts fail.
	for i := 0; i < instanceIDPersistRetryAttempts; i++ {
		mock.ExpectExec(`UPDATE workspaces SET instance_id =`).
			WithArgs("ws-cp-orphan", "i-12345").
			WillReturnError(fmt.Errorf("connection reset by peer"))
	}

	// markProvisionFailed updates status to failed.
	mock.ExpectExec(`UPDATE workspaces SET status =`).
		WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()).
		WillReturnResult(sqlmock.NewResult(0, 1))

	// Named bcast rather than cap so the builtin cap() is not shadowed.
	bcast := &captureBroadcaster{}
	stub := &stubInstanceIDPersistFailCPProv{instanceID: "i-12345"}
	handler := NewWorkspaceHandler(bcast, nil, "http://localhost:8080", t.TempDir())
	handler.SetCPProvisioner(stub)

	handler.provisionWorkspaceCP("ws-cp-orphan", "/nonexistent/template", nil, models.CreateWorkspacePayload{
		Name:    "ws-cp-orphan",
		Tier:    1,
		Runtime: "claude-code",
	})

	// Unmet expectations are silent unless checked: without this, a handler
	// that gave up after a single attempt (or skipped the status UPDATE)
	// would still pass, because the broadcast "attempts" field only echoes
	// the configured value.
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet sqlmock expectations: %v", err)
	}

	if bcast.lastData == nil {
		t.Fatal("expected RecordAndBroadcast to capture data on persist failure; got nothing")
	}
	if got := bcast.lastData["error"]; got != "instance_id persist failed after retry — EC2 untracked" {
		t.Errorf("broadcast error message = %q, want 'instance_id persist failed after retry — EC2 untracked'", got)
	}
	if got := bcast.lastData["instance_id"]; got != "i-12345" {
		t.Errorf("broadcast instance_id = %v, want 'i-12345'", got)
	}
	if got := bcast.lastData["attempts"]; got != instanceIDPersistRetryAttempts {
		t.Errorf("broadcast attempts = %v, want %d", got, instanceIDPersistRetryAttempts)
	}

	// Security: RC 9378 — raw DB error must NEVER be client-visible in broadcast/WS/SSE.
	for _, key := range []string{"detail", "db_error", "raw_error"} {
		if val, has := bcast.lastData[key]; has {
			t.Errorf("broadcast must NOT contain raw DB error under key %q; got %v", key, val)
		}
	}
	// Also verify no raw error string leaked into any broadcast field.
	for key, val := range bcast.lastData {
		if s, ok := val.(string); ok && strings.Contains(s, "connection reset by peer") {
			t.Errorf("broadcast field %q contains raw DB error leak: %q", key, s)
		}
	}

	if stub.stopCalls != 0 {
		t.Errorf("Stop called %d times; want 0 (live instance must NOT be terminated)", stub.stopCalls)
	}
}
// TestProvisionWorkspaceCP_InstanceIDPersistFail_RetrySucceeds asserts that a
// transient DB blip on the first attempt is recovered by the bounded retry:
// the second UPDATE succeeds and the workspace proceeds to online normally.
//
// The sqlmock expectations are verified explicitly after the call so the
// test proves the second UPDATE was actually issued, rather than only
// observing that no failure broadcast was captured.
func TestProvisionWorkspaceCP_InstanceIDPersistFail_RetrySucceeds(t *testing.T) {
	// Shrink retry backoff so the test doesn't stall; restore it afterwards.
	prevDelay := instanceIDPersistRetryBaseDelay
	instanceIDPersistRetryBaseDelay = 1 * time.Millisecond
	t.Cleanup(func() { instanceIDPersistRetryBaseDelay = prevDelay })

	t.Setenv("MOLECULE_LLM_BASE_URL", "https://api.example.test/api/v1/internal/llm/openai/v1")
	t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "tenant-admin-token")
	t.Setenv("MOLECULE_DEPLOY_MODE", "self-hosted")

	mock := setupTestDB(t)
	mock.ExpectQuery(`SELECT key, encrypted_value, encryption_version FROM global_secrets`).
		WillReturnRows(sqlmock.NewRows([]string{"key", "encrypted_value", "encryption_version"}))
	mock.ExpectQuery(`SELECT key, encrypted_value, encryption_version FROM workspace_secrets`).
		WithArgs("ws-cp-retry-ok").
		WillReturnRows(sqlmock.NewRows([]string{"key", "encrypted_value", "encryption_version"}))

	// mintWorkspaceSecrets: revoke + issue auth token + inbound secret
	mock.ExpectExec(`UPDATE workspace_auth_tokens SET revoked_at`).
		WithArgs("ws-cp-retry-ok").
		WillReturnResult(sqlmock.NewResult(0, 0))
	mock.ExpectExec(`INSERT INTO workspace_auth_tokens`).
		WithArgs("ws-cp-retry-ok", sqlmock.AnyArg(), sqlmock.AnyArg()).
		WillReturnResult(sqlmock.NewResult(1, 1))
	mock.ExpectExec(`UPDATE workspaces SET platform_inbound_secret`).
		WithArgs(sqlmock.AnyArg(), "ws-cp-retry-ok").
		WillReturnResult(sqlmock.NewResult(0, 1))

	// First attempt fails, second succeeds.
	mock.ExpectExec(`UPDATE workspaces SET instance_id =`).
		WithArgs("ws-cp-retry-ok", "i-retry-ok").
		WillReturnError(fmt.Errorf("connection reset by peer"))
	mock.ExpectExec(`UPDATE workspaces SET instance_id =`).
		WithArgs("ws-cp-retry-ok", "i-retry-ok").
		WillReturnResult(sqlmock.NewResult(0, 1))

	// Named bcast rather than cap so the builtin cap() is not shadowed.
	bcast := &captureBroadcaster{}
	stub := &stubInstanceIDPersistFailCPProv{instanceID: "i-retry-ok"}
	handler := NewWorkspaceHandler(bcast, nil, "http://localhost:8080", t.TempDir())
	handler.SetCPProvisioner(stub)

	handler.provisionWorkspaceCP("ws-cp-retry-ok", "/nonexistent/template", nil, models.CreateWorkspacePayload{
		Name:    "ws-cp-retry-ok",
		Tier:    1,
		Runtime: "claude-code",
	})

	// Unmet expectations are silent unless checked: this is what proves the
	// retry (second UPDATE) really ran instead of the handler moving on
	// after the first failure.
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet sqlmock expectations: %v", err)
	}

	// No failure broadcast should have fired.
	if bcast.lastData != nil {
		t.Fatalf("expected NO failure broadcast on retry success; got %v", bcast.lastData)
	}
	if stub.stopCalls != 0 {
		t.Errorf("Stop called %d times; want 0", stub.stopCalls)
	}
}
// stubInstanceIDPersistFailCPProv implements CPProvisionerAPI for the
// instance-id-persist-failure tests. Start always succeeds and returns the
// configured instanceID; every teardown entry point (Stop and StopAndPrune)
// increments stopCalls, so a test asserting stopCalls == 0 proves the live
// instance was left alone whichever termination path the handler might use.
// The counter is a plain int and is not safe for concurrent use.
type stubInstanceIDPersistFailCPProv struct {
	instanceID string // value handed back by Start
	stopCalls  int    // number of Stop + StopAndPrune invocations
}

// Start reports a successful launch and returns the configured instance id.
func (s *stubInstanceIDPersistFailCPProv) Start(_ context.Context, _ provisioner.WorkspaceConfig) (string, error) {
	return s.instanceID, nil
}

// Stop records a termination request and reports success.
func (s *stubInstanceIDPersistFailCPProv) Stop(_ context.Context, _ string) error {
	s.stopCalls++
	return nil
}

// StopAndPrune records a termination request and reports success. It is
// counted in stopCalls as well: it also tears the instance down, and leaving
// it uncounted would let a handler that terminates via this method pass the
// "must NOT be terminated" assertions.
func (s *stubInstanceIDPersistFailCPProv) StopAndPrune(_ context.Context, _ string) error {
	s.stopCalls++
	return nil
}

// GetConsoleOutput returns empty console output and no error.
func (s *stubInstanceIDPersistFailCPProv) GetConsoleOutput(_ context.Context, _ string) (string, error) {
	return "", nil
}

// IsRunning always reports the instance as running.
func (s *stubInstanceIDPersistFailCPProv) IsRunning(_ context.Context, _ string) (bool, error) {
	return true, nil
}
// TestRuntimeUsesAnthropicNativeProxy_CaseAndWhitespace proves the
// strings.EqualFold hardening: the runtime check now matches "claude-code"
// case-insensitively (and after trimming whitespace) instead of relying on