fix(provision): fail-closed on instance_id persist failure to prevent EC2 orphan #2392
@@ -19,6 +19,15 @@ import (
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
// instanceIDPersistRetryAttempts caps total instance_id UPDATE attempts
|
||||
// (initial + retries). 3 catches transient DB blips without stalling the
|
||||
// provision goroutine past the context timeout.
|
||||
var instanceIDPersistRetryAttempts = 3
|
||||
|
||||
// instanceIDPersistRetryBaseDelay is the first-retry backoff. Doubles each
|
||||
// attempt: 100ms → 200ms → 400ms. Total stall ≤ 700ms.
|
||||
var instanceIDPersistRetryBaseDelay = 100 * time.Millisecond
|
||||
|
||||
// logProvisionPanic is the deferred recover at the top of every provision
|
||||
// goroutine. Without it, a panic inside provisionWorkspaceOpts /
|
||||
// provisionWorkspaceCP propagates up the goroutine stack and crashes the
|
||||
@@ -1393,13 +1402,38 @@ func (h *WorkspaceHandler) provisionWorkspaceCP(workspaceID, templatePath string
|
||||
// Persist the backing instance id so later operations (terminal via
|
||||
// EIC+SSH, live logs, debug introspection) can resolve workspace → EC2
|
||||
// without re-asking CP on every request.
|
||||
if _, err := db.DB.ExecContext(ctx,
|
||||
`UPDATE workspaces SET instance_id = $2, updated_at = now() WHERE id = $1`,
|
||||
workspaceID, machineID); err != nil {
|
||||
// Non-fatal: provisioning succeeded, the workspace will still run.
|
||||
// The row stays without instance_id — terminal falls back to the
|
||||
// "CP-provisioned but unreachable" error, not a silent failure.
|
||||
log.Printf("CPProvisioner: persist instance_id failed for %s: %v", workspaceID, err)
|
||||
//
|
||||
// Bounded retry with exponential backoff: a transient DB blip must not
|
||||
// orphan a healthy running instance. If all retries fail, mark the
|
||||
// workspace failed and record the instance_id in the broadcast event +
|
||||
// last_sample_error so an operator/reaper can reconcile later. The live
|
||||
// EC2 is NOT terminated — it may contain valuable state. (#1)
|
||||
var persistErr error
|
||||
delay := instanceIDPersistRetryBaseDelay
|
||||
for attempt := 1; attempt <= instanceIDPersistRetryAttempts; attempt++ {
|
||||
_, persistErr = db.DB.ExecContext(ctx,
|
||||
`UPDATE workspaces SET instance_id = $2, updated_at = now() WHERE id = $1`,
|
||||
workspaceID, machineID)
|
||||
if persistErr == nil {
|
||||
if attempt > 1 {
|
||||
log.Printf("CPProvisioner: instance_id persist for %s succeeded on attempt %d", workspaceID, attempt)
|
||||
}
|
||||
break
|
||||
}
|
||||
if attempt < instanceIDPersistRetryAttempts {
|
||||
time.Sleep(delay)
|
||||
delay *= 2
|
||||
}
|
||||
}
|
||||
if persistErr != nil {
|
||||
log.Printf("CPProvisioner: CRITICAL persist instance_id failed for %s after %d attempts: %v — EC2 instance %s is RUNNING but UNTRACKED. Operator must manually reconcile or remove the workspace to trigger orphan cleanup.", workspaceID, instanceIDPersistRetryAttempts, persistErr, machineID)
|
||||
// Server-only log already captures the raw error above; broadcast gets
|
||||
// safe fields only (no client-visible DB error). Security: RC 9378.
|
||||
h.markProvisionFailed(ctx, workspaceID, "instance_id persist failed after retry — EC2 untracked", map[string]interface{}{
|
||||
"instance_id": machineID,
|
||||
"attempts": instanceIDPersistRetryAttempts,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("CPProvisioner: workspace %s started as machine %s via control plane", workspaceID, machineID)
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/memory/contract"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
|
||||
@@ -1760,6 +1761,173 @@ func (m *mockResolver) Fetch(_ context.Context, _, _ string) (string, error) {
|
||||
return m.fetchName, m.fetchErr
|
||||
}
|
||||
|
||||
// TestProvisionWorkspaceCP_InstanceIDPersistFail_MarksFailed asserts that
|
||||
// when cpProv.Start succeeds but the DB UPDATE for instance_id fails on ALL
|
||||
// retry attempts, the handler marks the workspace failed WITHOUT terminating
|
||||
// the live EC2. The orphaned instance_id is recorded in the broadcast event
|
||||
// for operator reconciliation. Regression test for ticket #1.
|
||||
func TestProvisionWorkspaceCP_InstanceIDPersistFail_MarksFailed(t *testing.T) {
|
||||
// Shrink retry backoff so the test doesn't stall.
|
||||
prevDelay := instanceIDPersistRetryBaseDelay
|
||||
instanceIDPersistRetryBaseDelay = 1 * time.Millisecond
|
||||
t.Cleanup(func() { instanceIDPersistRetryBaseDelay = prevDelay })
|
||||
|
||||
t.Setenv("MOLECULE_LLM_BASE_URL", "https://api.example.test/api/v1/internal/llm/openai/v1")
|
||||
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "tenant-admin-token")
|
||||
t.Setenv("MOLECULE_DEPLOY_MODE", "self-hosted")
|
||||
|
||||
mock := setupTestDB(t)
|
||||
|
||||
mock.ExpectQuery(`SELECT key, encrypted_value, encryption_version FROM global_secrets`).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"key", "encrypted_value", "encryption_version"}))
|
||||
mock.ExpectQuery(`SELECT key, encrypted_value, encryption_version FROM workspace_secrets`).
|
||||
WithArgs("ws-cp-orphan").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"key", "encrypted_value", "encryption_version"}))
|
||||
|
||||
// mintWorkspaceSecrets: revoke + issue auth token + inbound secret
|
||||
mock.ExpectExec(`UPDATE workspace_auth_tokens SET revoked_at`).
|
||||
WithArgs("ws-cp-orphan").
|
||||
WillReturnResult(sqlmock.NewResult(0, 0))
|
||||
mock.ExpectExec(`INSERT INTO workspace_auth_tokens`).
|
||||
WithArgs("ws-cp-orphan", sqlmock.AnyArg(), sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
mock.ExpectExec(`UPDATE workspaces SET platform_inbound_secret`).
|
||||
WithArgs(sqlmock.AnyArg(), "ws-cp-orphan").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// All 3 retry attempts fail.
|
||||
for i := 0; i < instanceIDPersistRetryAttempts; i++ {
|
||||
mock.ExpectExec(`UPDATE workspaces SET instance_id =`).
|
||||
WithArgs("ws-cp-orphan", "i-12345").
|
||||
WillReturnError(fmt.Errorf("connection reset by peer"))
|
||||
}
|
||||
|
||||
// markProvisionFailed updates status to failed.
|
||||
mock.ExpectExec(`UPDATE workspaces SET status =`).
|
||||
WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
cap := &captureBroadcaster{}
|
||||
stub := &stubInstanceIDPersistFailCPProv{instanceID: "i-12345"}
|
||||
handler := NewWorkspaceHandler(cap, nil, "http://localhost:8080", t.TempDir())
|
||||
handler.SetCPProvisioner(stub)
|
||||
|
||||
handler.provisionWorkspaceCP("ws-cp-orphan", "/nonexistent/template", nil, models.CreateWorkspacePayload{
|
||||
Name: "ws-cp-orphan",
|
||||
Tier: 1,
|
||||
Runtime: "claude-code",
|
||||
})
|
||||
|
||||
if cap.lastData == nil {
|
||||
t.Fatal("expected RecordAndBroadcast to capture data on persist failure; got nothing")
|
||||
}
|
||||
if got := cap.lastData["error"]; got != "instance_id persist failed after retry — EC2 untracked" {
|
||||
t.Errorf("broadcast error message = %q, want 'instance_id persist failed after retry — EC2 untracked'", got)
|
||||
}
|
||||
if got := cap.lastData["instance_id"]; got != "i-12345" {
|
||||
t.Errorf("broadcast instance_id = %v, want 'i-12345'", got)
|
||||
}
|
||||
if got := cap.lastData["attempts"]; got != instanceIDPersistRetryAttempts {
|
||||
t.Errorf("broadcast attempts = %v, want %d", got, instanceIDPersistRetryAttempts)
|
||||
}
|
||||
// Security: RC 9378 — raw DB error must NEVER be client-visible in broadcast/WS/SSE.
|
||||
for _, key := range []string{"detail", "db_error", "raw_error"} {
|
||||
if val, has := cap.lastData[key]; has {
|
||||
t.Errorf("broadcast must NOT contain raw DB error under key %q; got %v", key, val)
|
||||
}
|
||||
}
|
||||
// Also verify no raw error string leaked into any broadcast field.
|
||||
for key, val := range cap.lastData {
|
||||
if s, ok := val.(string); ok && strings.Contains(s, "connection reset by peer") {
|
||||
t.Errorf("broadcast field %q contains raw DB error leak: %q", key, s)
|
||||
}
|
||||
}
|
||||
if stub.stopCalls != 0 {
|
||||
t.Errorf("Stop called %d times; want 0 (live instance must NOT be terminated)", stub.stopCalls)
|
||||
}
|
||||
}
|
||||
|
||||
// TestProvisionWorkspaceCP_InstanceIDPersistFail_RetrySucceeds asserts that a
|
||||
// transient DB blip on the first attempt is recovered by the bounded retry:
|
||||
// the second UPDATE succeeds and the workspace proceeds to online normally.
|
||||
func TestProvisionWorkspaceCP_InstanceIDPersistFail_RetrySucceeds(t *testing.T) {
|
||||
prevDelay := instanceIDPersistRetryBaseDelay
|
||||
instanceIDPersistRetryBaseDelay = 1 * time.Millisecond
|
||||
t.Cleanup(func() { instanceIDPersistRetryBaseDelay = prevDelay })
|
||||
|
||||
t.Setenv("MOLECULE_LLM_BASE_URL", "https://api.example.test/api/v1/internal/llm/openai/v1")
|
||||
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "tenant-admin-token")
|
||||
t.Setenv("MOLECULE_DEPLOY_MODE", "self-hosted")
|
||||
|
||||
mock := setupTestDB(t)
|
||||
|
||||
mock.ExpectQuery(`SELECT key, encrypted_value, encryption_version FROM global_secrets`).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"key", "encrypted_value", "encryption_version"}))
|
||||
mock.ExpectQuery(`SELECT key, encrypted_value, encryption_version FROM workspace_secrets`).
|
||||
WithArgs("ws-cp-retry-ok").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"key", "encrypted_value", "encryption_version"}))
|
||||
|
||||
// mintWorkspaceSecrets: revoke + issue auth token + inbound secret
|
||||
mock.ExpectExec(`UPDATE workspace_auth_tokens SET revoked_at`).
|
||||
WithArgs("ws-cp-retry-ok").
|
||||
WillReturnResult(sqlmock.NewResult(0, 0))
|
||||
mock.ExpectExec(`INSERT INTO workspace_auth_tokens`).
|
||||
WithArgs("ws-cp-retry-ok", sqlmock.AnyArg(), sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
mock.ExpectExec(`UPDATE workspaces SET platform_inbound_secret`).
|
||||
WithArgs(sqlmock.AnyArg(), "ws-cp-retry-ok").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// First attempt fails, second succeeds.
|
||||
mock.ExpectExec(`UPDATE workspaces SET instance_id =`).
|
||||
WithArgs("ws-cp-retry-ok", "i-retry-ok").
|
||||
WillReturnError(fmt.Errorf("connection reset by peer"))
|
||||
mock.ExpectExec(`UPDATE workspaces SET instance_id =`).
|
||||
WithArgs("ws-cp-retry-ok", "i-retry-ok").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
cap := &captureBroadcaster{}
|
||||
stub := &stubInstanceIDPersistFailCPProv{instanceID: "i-retry-ok"}
|
||||
handler := NewWorkspaceHandler(cap, nil, "http://localhost:8080", t.TempDir())
|
||||
handler.SetCPProvisioner(stub)
|
||||
|
||||
handler.provisionWorkspaceCP("ws-cp-retry-ok", "/nonexistent/template", nil, models.CreateWorkspacePayload{
|
||||
Name: "ws-cp-retry-ok",
|
||||
Tier: 1,
|
||||
Runtime: "claude-code",
|
||||
})
|
||||
|
||||
// No failure broadcast should have fired.
|
||||
if cap.lastData != nil {
|
||||
t.Fatalf("expected NO failure broadcast on retry success; got %v", cap.lastData)
|
||||
}
|
||||
if stub.stopCalls != 0 {
|
||||
t.Errorf("Stop called %d times; want 0", stub.stopCalls)
|
||||
}
|
||||
}
|
||||
|
||||
// stubInstanceIDPersistFailCPProv implements CPProvisionerAPI for the
|
||||
// instance-id-persist-failure tests.
|
||||
type stubInstanceIDPersistFailCPProv struct {
|
||||
instanceID string
|
||||
stopCalls int
|
||||
}
|
||||
|
||||
func (s *stubInstanceIDPersistFailCPProv) Start(_ context.Context, _ provisioner.WorkspaceConfig) (string, error) {
|
||||
return s.instanceID, nil
|
||||
}
|
||||
func (s *stubInstanceIDPersistFailCPProv) Stop(_ context.Context, _ string) error {
|
||||
s.stopCalls++
|
||||
return nil
|
||||
}
|
||||
func (s *stubInstanceIDPersistFailCPProv) StopAndPrune(_ context.Context, _ string) error { return nil }
|
||||
func (s *stubInstanceIDPersistFailCPProv) GetConsoleOutput(_ context.Context, _ string) (string, error) {
|
||||
return "", nil
|
||||
}
|
||||
func (s *stubInstanceIDPersistFailCPProv) IsRunning(_ context.Context, _ string) (bool, error) {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// TestRuntimeUsesAnthropicNativeProxy_CaseAndWhitespace proves the
|
||||
// strings.EqualFold hardening: the runtime check now matches "claude-code"
|
||||
// case-insensitively (and after trimming whitespace) instead of relying on
|
||||
|
||||
Reference in New Issue
Block a user