Merge pull request #2487 from Molecule-AI/fix/provision-goroutine-observability

fix(provision): entry log + panic recovery on provision goroutines (#2486)
Hongming Wang 2026-05-02 03:05:56 +00:00 committed by GitHub
commit 47617a93ef
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 458 additions and 0 deletions

View File

@ -6,7 +6,9 @@ import (
"log"
"os"
"path/filepath"
"runtime/debug"
"strings"
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/crypto"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
@ -15,6 +17,40 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
)
// logProvisionPanic is the deferred recover at the top of every provision
// goroutine. Without it, a panic inside provisionWorkspaceOpts /
// provisionWorkspaceCP propagates up the goroutine stack and crashes the
// whole workspace-server process — taking every other tenant workspace
// down with it. With it, the panic is logged with a stack trace, the
// workspace is marked failed via markProvisionFailed (so the canvas
// surfaces a failure card immediately instead of leaving the spinner
// stuck on "provisioning" until the 10-min sweeper fires), and the rest
// of the process keeps serving.
//
// Issue #2486 added this after the symptom class — silent goroutine
// exit, no log, no failure mark — was observed in prod. Even if the
// root cause turns out not to be a panic, surfacing the panic class
// closes one branch of "what could have happened" cleanly.
//
// Method on *WorkspaceHandler (not free function) so the panic path can
// reuse markProvisionFailed and emit the WORKSPACE_PROVISION_FAILED
// broadcast — without the broadcast the canvas only learns of the
// failure when the next poll/refresh hits the DB.
func (h *WorkspaceHandler) logProvisionPanic(workspaceID, mode string) {
r := recover()
if r == nil {
return
}
log.Printf("Provisioner: PANIC during provision goroutine for %s (mode=%s): %v\nstack:\n%s",
workspaceID, mode, r, debug.Stack())
// Fresh context: the provision goroutine's ctx may have been the one
// panicking (timeout, cancelled). 10s is enough for the broadcast +
// single UPDATE inside markProvisionFailed.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
h.markProvisionFailed(ctx, workspaceID, fmt.Sprintf("provision panic: %v", r), nil)
}
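// For orientation, a hedged sketch of the markProvisionFailed shape the
// recovery above relies on. Illustrative only: the real implementation
// lives elsewhere in this handler and is not part of this diff, and the
// broadcaster field name, the argument order, and the elided fourth
// parameter are assumptions. The UPDATE shape, the db-failure log line,
// and the "error" field in the WORKSPACE_PROVISION_FAILED payload are
// the pieces the new tests pin.
//
//	func (h *WorkspaceHandler) markProvisionFailed(ctx context.Context, workspaceID, msg string, _ interface{}) {
//		// Persist the terminal state so the row leaves "provisioning".
//		_, err := db.DB.ExecContext(ctx,
//			`UPDATE workspaces SET status = $3, last_sample_error = $2, updated_at = now() WHERE id = $1`,
//			workspaceID, msg, models.StatusFailed)
//		if err != nil {
//			// Surfaced rather than swallowed; see TestLogProvisionPanic_PersistFailureLogged.
//			log.Printf("markProvisionFailed: db update failed for %s: %v", workspaceID, err)
//		}
//		// Tell the canvas now instead of waiting for the next poll or refresh.
//		_ = h.broadcaster.RecordAndBroadcast(ctx, workspaceID, "WORKSPACE_PROVISION_FAILED",
//			map[string]interface{}{"error": msg})
//	}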
// provisionWorkspace handles async container deployment with timeout.
func (h *WorkspaceHandler) provisionWorkspace(workspaceID, templatePath string, configFiles map[string][]byte, payload models.CreateWorkspacePayload) {
h.provisionWorkspaceOpts(workspaceID, templatePath, configFiles, payload, false)
@ -25,6 +61,14 @@ func (h *WorkspaceHandler) provisionWorkspace(workspaceID, templatePath string,
// that should NOT be persisted on CreateWorkspacePayload because they're
// request-scoped flags.
func (h *WorkspaceHandler) provisionWorkspaceOpts(workspaceID, templatePath string, configFiles map[string][]byte, payload models.CreateWorkspacePayload, resetClaudeSession bool) {
// Entry log — distinguishes "goroutine never started" from "started but
// exited via an unlogged path" when debugging stuck-in-provisioning
// rows. Issue #2486: 7 claude-code workspaces stuck in provisioning had
// neither a prepare-failed nor start-failed nor success log line, so an
// operator couldn't tell whether the goroutine ran at all.
log.Printf("Provisioner: goroutine entered for %s (runtime=%s, mode=docker)", workspaceID, payload.Runtime)
defer h.logProvisionPanic(workspaceID, "docker")
ctx, cancel := context.WithTimeout(context.Background(), provisioner.ProvisionTimeout)
defer cancel()
@ -640,6 +684,14 @@ func loadWorkspaceSecrets(ctx context.Context, workspaceID string) (map[string]s
// share so the next mint added can't be silently forgotten on one
// side.
func (h *WorkspaceHandler) provisionWorkspaceCP(workspaceID, templatePath string, configFiles map[string][]byte, payload models.CreateWorkspacePayload) {
// Entry log + panic recovery — see provisionWorkspaceOpts for rationale.
// Issue #2486: 7 claude-code workspaces stuck in provisioning produced
// none of the four documented exit-path log lines, leaving operators
// unable to distinguish "goroutine never started" from "started but
// returned via an unlogged path."
log.Printf("CPProvisioner: goroutine entered for %s (runtime=%s, mode=cp)", workspaceID, payload.Runtime)
defer h.logProvisionPanic(workspaceID, "cp")
ctx, cancel := context.WithTimeout(context.Background(), provisioner.ProvisionTimeout)
defer cancel()

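// Launch-site sketch (illustrative only; the real call sites live in the
// create and reset handlers, outside this diff): both provision variants
// are fanned out on fresh goroutines, which is why an unrecovered panic
// in either one would take down the whole workspace-server rather than a
// single request. The resetClaudeSession flag is passed positionally
// because it is request-scoped and must not be persisted on
// CreateWorkspacePayload. The ws variable below is hypothetical.
//
//	go h.provisionWorkspaceOpts(ws.ID, templatePath, configFiles, payload, true)
//	go h.provisionWorkspaceCP(ws.ID, templatePath, configFiles, payload)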
View File

@ -0,0 +1,235 @@
package handlers
import (
"bytes"
"context"
"fmt"
"log"
"strings"
"sync"
"sync/atomic"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
)
// Issue #2486 reproduction harness: 7 simultaneous claude-code provisions
// against the SAME workspace-server (Director Pattern fan-out). On the
// hongming prod tenant this produced ZERO log lines from any of the four
// documented exit paths in provisionWorkspaceCP — operators couldn't tell
// whether the goroutines ran. This test closes the visibility gap by
// pinning that:
//
// 1. Every provision goroutine produces ONE entry log line ("CPProvisioner:
// goroutine entered for ws-N").
// 2. Every goroutine reaches its registered exit path (cpProv.Start),
// i.e. the stub records all 7 workspace IDs.
//
// If the silent-drop class is present in current head code, this test
// fails because either (a) the entry-log count is < 7 (meaning one or
// more goroutines reached the goroutine boundary but never produced
// the entry-log line — entry log renamed/removed, or log writer
// hijacked), or (b) the recorder count is < 7 (meaning a goroutine
// entered but exited before reaching cpProv.Start, via some unlogged
// path).
//
// Result on staging head as of 2026-05-02: PASSES — meaning the
// silent-drop seen in the prod incident is NOT reproducible against
// current head with stub CP. Possibilities: (i) bug already fixed
// upstream of the tenant's stale build (sha 76c604fb, 725 commits
// behind), (ii) bug requires real-CP-side rate-limiting we don't
// model here, (iii) bug requires a DB-layer interaction (lock
// contention, deadlock) the sqlmock doesn't model.
//
// Even when this passes today, it stays as a regression gate: any
// future refactor that re-introduces silent goroutine swallow in the
// CP provision path trips it.
// recordingCPProv implements provisioner.CPProvisionerAPI and records
// every Start() invocation in a thread-safe slice so a concurrent
// burst can be verified post-hoc.
type recordingCPProv struct {
mu sync.Mutex
startedWS []string
// startErr controls what Start() returns. nil → success. Non-nil →
// error path; provisionWorkspaceCP marks failed + returns.
startErr error
}
func (r *recordingCPProv) Start(_ context.Context, cfg provisioner.WorkspaceConfig) (string, error) {
r.mu.Lock()
r.startedWS = append(r.startedWS, cfg.WorkspaceID)
r.mu.Unlock()
if r.startErr != nil {
return "", r.startErr
}
return "i-stubbed-" + cfg.WorkspaceID[:8], nil
}
func (r *recordingCPProv) Stop(_ context.Context, _ string) error {
panic("recordingCPProv.Stop not expected in concurrent-repro test")
}
func (r *recordingCPProv) GetConsoleOutput(_ context.Context, _ string) (string, error) {
panic("recordingCPProv.GetConsoleOutput not expected in concurrent-repro test")
}
func (r *recordingCPProv) IsRunning(_ context.Context, _ string) (bool, error) {
panic("recordingCPProv.IsRunning not expected in concurrent-repro test")
}
func (r *recordingCPProv) startedSet() map[string]struct{} {
r.mu.Lock()
defer r.mu.Unlock()
out := make(map[string]struct{}, len(r.startedWS))
for _, id := range r.startedWS {
out[id] = struct{}{}
}
return out
}
// TestProvisionWorkspaceCP_ConcurrentBurst_NoSilentDrop is the
// repro harness for issue #2486. See file-level comment.
func TestProvisionWorkspaceCP_ConcurrentBurst_NoSilentDrop(t *testing.T) {
const numWorkspaces = 7
mock := setupTestDB(t)
// Every goroutine runs prepareProvisionContext → mintWorkspaceSecrets
// → cpProv.Start (stubbed to fail) → markProvisionFailed. The DB
// shape per goroutine: 2 SELECTs + 1 UPDATE. Order between
// goroutines is non-deterministic so use MatchExpectationsInOrder
// false.
mock.MatchExpectationsInOrder(false)
for i := 0; i < numWorkspaces; i++ {
mock.ExpectQuery(`SELECT key, encrypted_value, encryption_version FROM global_secrets`).
WillReturnRows(sqlmock.NewRows([]string{"key", "encrypted_value", "encryption_version"}))
mock.ExpectQuery(`SELECT key, encrypted_value, encryption_version FROM workspace_secrets`).
WithArgs(sqlmock.AnyArg()).
WillReturnRows(sqlmock.NewRows([]string{"key", "encrypted_value", "encryption_version"}))
mock.ExpectExec(`UPDATE workspaces SET status =`).
WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
}
// Capture every log line so we can count entry-log occurrences.
var logBuf bytes.Buffer
var logMu sync.Mutex
prev := log.Writer()
log.SetOutput(&safeWriter{buf: &logBuf, mu: &logMu})
defer log.SetOutput(prev)
// stubFailing-shaped behaviour but recording-capable. Failure is
// fine — we're not testing the success path, only that every
// goroutine entered AND reached the recorded Start() call.
rec := &recordingCPProv{startErr: fmt.Errorf("simulated CP rejection")}
// Concurrent-safe broadcaster — captureBroadcaster (used by sequential
// tests in workspace_provision_test.go) writes lastData unguarded.
// Under -race + 7 fan-out goroutines that's a real data race; this
// stub serializes via mutex and only counts (we don't need the
// payload for any assertion below).
bcast := &concurrentSafeBroadcaster{}
handler := NewWorkspaceHandler(bcast, nil, "http://localhost:8080", t.TempDir())
handler.SetCPProvisioner(rec)
var wg sync.WaitGroup
var enteredCount int64
for i := 0; i < numWorkspaces; i++ {
wg.Add(1)
// Use an ID at least 8 characters long so the cfg.WorkspaceID[:8]
// slice in the stub has enough to read.
wsID := fmt.Sprintf("ws-fan-%016d", i)
go func() {
defer wg.Done()
atomic.AddInt64(&enteredCount, 1)
handler.provisionWorkspaceCP(wsID, "", nil, models.CreateWorkspacePayload{
Name: wsID,
Tier: 1,
Runtime: "claude-code",
})
}()
}
wg.Wait()
if got := atomic.LoadInt64(&enteredCount); got != numWorkspaces {
t.Fatalf("test setup bug: expected %d goroutines to enter, got %d", numWorkspaces, got)
}
// Assertion 1: every goroutine produced an entry log. Without the
// fix in this PR (#2487), there's NO entry log so this assertion
// is what closes the visibility gap.
logMu.Lock()
logged := logBuf.String()
logMu.Unlock()
entryCount := strings.Count(logged, "CPProvisioner: goroutine entered for")
if entryCount != numWorkspaces {
t.Errorf("entry log fired %d times, want %d. Either (a) a goroutine never reached the entry log or (b) the entry log was removed/renamed.\nlog dump:\n%s",
entryCount, numWorkspaces, logged)
}
// Assertion 2: every goroutine's Start() call was recorded by the
// stub — no silent drop between entry log and the registered exit
// path (cpProv.Start).
started := rec.startedSet()
if len(started) != numWorkspaces {
t.Errorf("stub CPProvisioner saw %d distinct Start() calls, want %d. SILENT-DROP CLASS: a goroutine entered but never reached Start(). seen=%v",
len(started), numWorkspaces, started)
}
// Assertion 3: every entry-log line names a distinct workspace —
// guards against a future refactor that hard-codes a single ID
// and double-logs.
for i := 0; i < numWorkspaces; i++ {
want := fmt.Sprintf("CPProvisioner: goroutine entered for ws-fan-%016d", i)
if !strings.Contains(logged, want) {
t.Errorf("missing entry log for ws-fan-%016d. log dump:\n%s", i, logged)
}
}
if err := mock.ExpectationsWereMet(); err != nil {
// Soft-fail: under concurrency some queries may have been
// re-ordered relative to the (non-strict) expectation set,
// which sqlmock can sometimes flag. Surface as t.Logf rather
// than t.Errorf so the assertion above (concrete observable
// behaviour) remains the primary gate.
t.Logf("sqlmock expectations note (non-fatal under concurrent fan-out): %v", err)
}
}
// safeWriter serializes log writes from concurrent goroutines so the
// captured buffer isn't a torn-write mess. Without this the log lines
// from 7 concurrent goroutines interleave at byte boundaries and the
// strings.Count assertion above gets unreliable.
type safeWriter struct {
buf *bytes.Buffer
mu *sync.Mutex
}
func (w *safeWriter) Write(p []byte) (int, error) {
w.mu.Lock()
defer w.mu.Unlock()
return w.buf.Write(p)
}
// concurrentSafeBroadcaster is a thread-safe events.EventEmitter stub
// for the 7-goroutine fan-out test. captureBroadcaster (the canonical
// sequential-test stub in workspace_provision_test.go) writes its
// lastData field without synchronization — under -race that's a true
// data race when 7 markProvisionFailed calls run concurrently. This
// stub only counts (no payload retention) and serializes via mutex.
type concurrentSafeBroadcaster struct {
mu sync.Mutex
count int
}
func (b *concurrentSafeBroadcaster) BroadcastOnly(_ string, _ string, _ interface{}) {}
func (b *concurrentSafeBroadcaster) RecordAndBroadcast(_ context.Context, _, _ string, _ interface{}) error {
b.mu.Lock()
b.count++
b.mu.Unlock()
return nil
}
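// For contrast, captureBroadcaster (the sequential stub referenced above)
// is defined in workspace_provision_test.go and is not part of this diff.
// A hedged sketch of its rough shape, reconstructed from how the panic
// tests read cap.lastData; field and parameter names are assumptions. The
// unguarded lastData write is exactly what makes it a data race under the
// 7-goroutine fan-out with -race:
//
//	type captureBroadcaster struct {
//		lastData map[string]interface{} // no mutex: fine sequentially, racy concurrently
//	}
//
//	func (c *captureBroadcaster) BroadcastOnly(_ string, _ string, _ interface{}) {}
//
//	func (c *captureBroadcaster) RecordAndBroadcast(_ context.Context, _, _ string, data interface{}) error {
//		c.lastData, _ = data.(map[string]interface{})
//		return nil
//	}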

View File

@ -0,0 +1,171 @@
package handlers
import (
"bytes"
"database/sql"
"log"
"strings"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
)
// Pin the issue #2486 contract: a panic inside the provision goroutine must
// (1) not propagate (the deferred recover swallows it), (2) log the panic
// with a stack trace so an operator can see what blew up, and (3) mark the
// workspace `failed` AND broadcast WORKSPACE_PROVISION_FAILED so the canvas
// flips the spinner to a failure card immediately — not after the 10-min
// sweeper.
//
// Helper: newPanicTestHandler wires a captureBroadcaster + handler so each
// test exercises the real markProvisionFailed path. The broadcaster capture
// is what proves assertion (3) — without it, the panic recovery would mark
// the row failed in the DB but the canvas wouldn't learn until next refresh.
func newPanicTestHandler() (*WorkspaceHandler, *captureBroadcaster) {
cap := &captureBroadcaster{}
return NewWorkspaceHandler(cap, nil, "http://localhost:8080", ""), cap
}
// captureLog swaps log output to a buffer for the test and restores the
// previous writer on cleanup. Capturing `prev` BEFORE SetOutput is
// load-bearing — `log.Writer()` evaluated at defer-fire time would
// return the buffer (not the original writer) and never restore it,
// poisoning subsequent tests in the package.
func captureLog(t *testing.T) *bytes.Buffer {
t.Helper()
var buf bytes.Buffer
prev := log.Writer()
log.SetOutput(&buf)
t.Cleanup(func() { log.SetOutput(prev) })
return &buf
}
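// For contrast, the broken variant the comment above warns about, as a
// hedged illustration rather than code from this package: reading
// log.Writer() inside the cleanup closure evaluates it at cleanup time,
// after SetOutput has already swapped in the buffer, so the "restore"
// just re-installs the buffer and every later test in the package keeps
// logging into it.
//
//	log.SetOutput(&buf)
//	t.Cleanup(func() { log.SetOutput(log.Writer()) }) // restores the buffer, not the original writer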
func TestLogProvisionPanic_NoOpWhenNoPanic(t *testing.T) {
// Sanity: the deferred recover must be silent when nothing panicked.
// Otherwise every successful provision would emit a spurious panic log.
buf := captureLog(t)
h, cap := newPanicTestHandler()
func() {
defer h.logProvisionPanic("ws-no-panic", "cp")
// no panic
}()
if buf.Len() != 0 {
t.Fatalf("expected no log output when no panic, got: %q", buf.String())
}
if cap.lastData != nil {
t.Fatalf("expected no broadcast when no panic, got: %v", cap.lastData)
}
}
func TestLogProvisionPanic_RecoversAndMarksFailed(t *testing.T) {
// Wire a sqlmock so markProvisionFailed's UPDATE has somewhere to land
// without needing a real Postgres. The mock asserts the SQL shape +
// args so a future refactor of the persist call doesn't silently
// stop marking the row failed.
mockDB, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("sqlmock.New: %v", err)
}
defer mockDB.Close()
prevDB := db.DB
db.DB = mockDB
defer func() { db.DB = prevDB }()
// markProvisionFailed issues:
// UPDATE workspaces SET status = $3, last_sample_error = $2, updated_at = now() WHERE id = $1
// with args (workspaceID, msg, models.StatusFailed).
mock.ExpectExec(`UPDATE workspaces SET status`).
WithArgs("ws-panic", sqlmock.AnyArg(), sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
buf := captureLog(t)
h, cap := newPanicTestHandler()
// Exercise: a function that defers logProvisionPanic + then panics.
// The recover MUST swallow the panic — if it propagates, the test
// process crashes and the panic message bubbles up as a Go test
// failure rather than the assertion below.
didNotPanic := true
func() {
defer func() {
// If logProvisionPanic re-raised, this catches it for the
// test. We assert below that it did NOT re-raise.
if r := recover(); r != nil {
didNotPanic = false
}
}()
defer h.logProvisionPanic("ws-panic", "cp")
panic("simulated provision panic for #2486 regression")
}()
if !didNotPanic {
t.Fatal("logProvisionPanic re-raised the panic — the recover() arm did not swallow it")
}
logged := buf.String()
if !strings.Contains(logged, "PANIC during provision goroutine for ws-panic") {
t.Errorf("missing panic-class log line; got: %q", logged)
}
if !strings.Contains(logged, "simulated provision panic for #2486 regression") {
t.Errorf("panic value not logged; got: %q", logged)
}
if !strings.Contains(logged, "stack:") {
t.Errorf("missing stack trace marker; got: %q", logged)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sql expectations: %v — UPDATE workspaces … status=failed was not issued", err)
}
// Canvas-broadcast assertion: the panic recovery MUST route through
// markProvisionFailed, which fires WORKSPACE_PROVISION_FAILED. Without
// this, the canvas spinner stays on "provisioning" until the sweeper
// or a poll — defeating the immediate-feedback purpose of this gate.
if cap.lastData == nil {
t.Fatal("expected broadcaster.RecordAndBroadcast to be called by panic recovery, got nil — canvas would not see the failure")
}
if errMsg, ok := cap.lastData["error"].(string); !ok || !strings.Contains(errMsg, "provision panic:") {
t.Errorf("broadcast payload missing/wrong 'error' field; got: %v", cap.lastData)
}
}
func TestLogProvisionPanic_PersistFailureLogged(t *testing.T) {
// Defense-in-depth: if the panic-mark UPDATE itself fails, log it
// rather than swallow silently. Otherwise an operator sees the
// panic-class log line but no persistent-failure row, leaving the
// workspace in `provisioning` with a misleading "we recovered" log.
mockDB, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("sqlmock.New: %v", err)
}
defer mockDB.Close()
prevDB := db.DB
db.DB = mockDB
defer func() { db.DB = prevDB }()
mock.ExpectExec(`UPDATE workspaces SET status`).
WithArgs("ws-panic-persist-fail", sqlmock.AnyArg(), sqlmock.AnyArg()).
WillReturnError(sql.ErrConnDone)
buf := captureLog(t)
h, _ := newPanicTestHandler()
func() {
defer h.logProvisionPanic("ws-panic-persist-fail", "docker")
panic("simulated panic with DB unavailable")
}()
logged := buf.String()
// markProvisionFailed logs `markProvisionFailed: db update failed for <id>: <err>`
// when its UPDATE fails. That's the line that proves we surfaced the
// persist failure rather than swallowing it.
if !strings.Contains(logged, "markProvisionFailed: db update failed for ws-panic-persist-fail") {
t.Errorf("expected markProvisionFailed db-update-failure log line; got: %q", logged)
}
}