Merge pull request #1128 from Molecule-AI/staging

staging → main: details crash + preflight + provision sweeper
This commit is contained in:
Hongming Wang 2026-04-20 15:40:12 -07:00 committed by GitHub
commit 0e70871cfa
10 changed files with 564 additions and 4 deletions

View File

@@ -78,17 +78,17 @@ export function WorkspaceUsage({ workspaceId }: WorkspaceUsageProps) {
    <>
      <StatRow
        label="Input tokens"
-        value={`${metrics.input_tokens.toLocaleString()} tokens`}
+        value={`${(metrics.input_tokens ?? 0).toLocaleString()} tokens`}
        testId="usage-input-tokens"
      />
      <StatRow
        label="Output tokens"
-        value={`${metrics.output_tokens.toLocaleString()} tokens`}
+        value={`${(metrics.output_tokens ?? 0).toLocaleString()} tokens`}
        testId="usage-output-tokens"
      />
      <StatRow
        label="Estimated cost"
-        value={`$${parseFloat(metrics.estimated_cost_usd).toFixed(6)}`}
+        value={`$${parseFloat(metrics.estimated_cost_usd ?? "0").toFixed(6)}`}
        testId="usage-estimated-cost"
      />
    </>

View File

@@ -154,6 +154,16 @@ describe("BudgetSection — stats row", () => {
    await renderLoaded(budgetResponse({ budget_remaining: null }));
    expect(screen.queryByTestId("budget-remaining")).toBeNull();
  });
+
+  it("does not crash when budget_used is missing from the response", async () => {
+    // The backend may return a partial shape for a workspace stuck in
+    // provisioning. Regression: previously this threw
+    // "Cannot read properties of undefined (reading 'toLocaleString')"
+    // and crashed the whole Details tab.
+    // eslint-disable-next-line @typescript-eslint/no-explicit-any
+    await renderLoaded({ budget_limit: 1000, budget_remaining: null } as any);
+    expect(screen.getByTestId("budget-used-value").textContent).toBe("0");
+  });
});
// ── Progress bar ──────────────────────────────────────────────────────────────

View File

@@ -84,6 +84,20 @@ describe("WorkspaceUsage", () => {
    });
  });
+
+  it("does not crash when token/cost fields are missing", async () => {
+    // Regression: Details tab crashed with
+    // "Cannot read properties of undefined (reading 'toLocaleString')"
+    // when a workspace stuck in provisioning returned partial metrics.
+    // eslint-disable-next-line @typescript-eslint/no-explicit-any
+    mockGet.mockResolvedValue({} as any);
+    render(<WorkspaceUsage workspaceId="ws-1" />);
+    await waitFor(() => {
+      expect(screen.getByTestId("usage-input-tokens").textContent).toContain("0");
+      expect(screen.getByTestId("usage-output-tokens").textContent).toContain("0");
+      expect(screen.getByTestId("usage-estimated-cost").textContent).toBe("Estimated cost$0.000000");
+    });
+  });

  it("displays estimated cost formatted as $X.XXXXXX after load", async () => {
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    mockGet.mockResolvedValue(METRICS_RESPONSE as any);

View File

@@ -171,7 +171,7 @@ export function BudgetSection({ workspaceId }: Props) {
      <div className="flex items-baseline justify-between" data-testid="budget-stats-row">
        <span className="text-xs text-zinc-400">Credits used</span>
        <span className="text-xs font-mono text-zinc-300">
-          <span data-testid="budget-used-value">{budget.budget_used.toLocaleString()}</span>
+          <span data-testid="budget-used-value">{(budget.budget_used ?? 0).toLocaleString()}</span>
          <span className="text-zinc-500 mx-1">/</span>
          <span data-testid="budget-limit-value">
            {budget.budget_limit != null

View File

@@ -191,6 +191,15 @@ func main() {
		})
	}
+
+	// Provision-timeout sweep — flips workspaces that have been stuck in
+	// status='provisioning' past the timeout window to 'failed' and emits
+	// WORKSPACE_PROVISION_TIMEOUT. Without this the UI's timeout banner is
+	// purely cosmetic and state goes incoherent (e.g. the user sees "Retry"
+	// after 15 min while the backend still thinks provisioning is in progress).
+	go supervised.RunWithRecover(ctx, "provision-timeout-sweep", func(c context.Context) {
+		registry.StartProvisioningTimeoutSweep(c, broadcaster, registry.DefaultProvisionSweepInterval)
+	})

	// Cron Scheduler — fires A2A messages to workspaces on user-defined schedules
	cronSched := scheduler.New(wh, broadcaster)
	go supervised.RunWithRecover(ctx, "scheduler", cronSched.Start)
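supervised.RunWithRecover is not part of this diff. For reviewers unfamiliar with the helper, here is a minimal sketch of the panic-recovering supervisor pattern it presumably implements; the function name, backoff, and restart policy below are illustrative assumptions, not the actual implementation.

package supervised

import (
	"context"
	"log"
	"time"
)

// runWithRecover runs fn, recovers and logs any panic, then restarts fn
// after a short pause until ctx is cancelled, so a crashing background
// loop (sweeper, scheduler) cannot take the whole process down.
// Hypothetical sketch; the real supervised.RunWithRecover may differ.
func runWithRecover(ctx context.Context, name string, fn func(context.Context)) {
	for ctx.Err() == nil {
		func() {
			defer func() {
				if r := recover(); r != nil {
					log.Printf("%s: recovered from panic: %v", name, r)
				}
			}()
			fn(ctx)
		}()
		select {
		case <-ctx.Done():
			return
		case <-time.After(time.Second): // pause before restarting fn
		}
	}
}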

View File

@@ -0,0 +1,70 @@
package handlers

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// requiredEnvSchema is the subset of config.yaml we read to decide which env
// vars must be present before a container launch. It maps the YAML path
// `runtime_config.required_env: [...]`, which is the same shape the workspace
// adapter's preflight reads inside the container (workspace/preflight.py).
//
// Mirroring the check server-side lets us fail fast with a readable error
// instead of letting the container crash-loop and the workspace sit in
// `provisioning` until a sweeper or the user intervenes.
type requiredEnvSchema struct {
	RuntimeConfig struct {
		RequiredEnv []string `yaml:"required_env"`
	} `yaml:"runtime_config"`
}

// missingRequiredEnv returns the list of env var names declared in the
// workspace's config.yaml under `runtime_config.required_env` that are NOT
// present (or are empty) in the assembled envVars map. Returns an empty
// slice when the config declares no requirements or when all are satisfied.
//
// A parse failure returns no missing vars — config.yaml shape is enforced by
// the in-container preflight, and the server's job here is only to catch the
// common "forgot to add the OAuth token secret" footgun, not to be a second
// config validator.
func missingRequiredEnv(configFiles map[string][]byte, envVars map[string]string) []string {
	if len(configFiles) == 0 {
		return nil
	}
	raw, ok := configFiles["config.yaml"]
	if !ok || len(raw) == 0 {
		return nil
	}
	var schema requiredEnvSchema
	if err := yaml.Unmarshal(raw, &schema); err != nil {
		return nil
	}
	if len(schema.RuntimeConfig.RequiredEnv) == 0 {
		return nil
	}
	var missing []string
	for _, name := range schema.RuntimeConfig.RequiredEnv {
		if v, ok := envVars[name]; !ok || v == "" {
			missing = append(missing, name)
		}
	}
	return missing
}

// formatMissingEnvError builds the user-facing message for a provision
// failure caused by unset required env vars. Kept stable because it's
// rendered verbatim in the canvas Events tab and Details banner.
func formatMissingEnvError(missing []string) string {
	if len(missing) == 1 {
		return fmt.Sprintf(
			"missing required env var %q — add it under Config → Env Vars (or as a Global secret) and retry",
			missing[0],
		)
	}
	return fmt.Sprintf(
		"missing required env vars %q — add them under Config → Env Vars (or as Global secrets) and retry",
		missing,
	)
}
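For reference, a runnable sketch of the YAML-to-struct mapping the schema performs; the type is re-declared locally for illustration and mirrors the fixtures in the test file below.

package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// Re-declared for illustration: same shape as requiredEnvSchema above.
type schema struct {
	RuntimeConfig struct {
		RequiredEnv []string `yaml:"required_env"`
	} `yaml:"runtime_config"`
}

func main() {
	raw := []byte(`
runtime: claude-code
runtime_config:
  required_env:
    - CLAUDE_CODE_OAUTH_TOKEN
`)
	var s schema
	if err := yaml.Unmarshal(raw, &s); err != nil {
		panic(err)
	}
	fmt.Println(s.RuntimeConfig.RequiredEnv) // [CLAUDE_CODE_OAUTH_TOKEN]
}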

View File

@@ -0,0 +1,124 @@
package handlers

import (
	"strings"
	"testing"
)

func TestMissingRequiredEnv_NoConfig(t *testing.T) {
	// Zero configFiles → nothing to validate → no missing.
	if got := missingRequiredEnv(nil, map[string]string{}); got != nil {
		t.Errorf("nil configFiles: got %v, want nil", got)
	}
	if got := missingRequiredEnv(map[string][]byte{}, map[string]string{}); got != nil {
		t.Errorf("empty configFiles: got %v, want nil", got)
	}
}

func TestMissingRequiredEnv_NoConfigYaml(t *testing.T) {
	// A map without config.yaml → no schema → no missing.
	files := map[string][]byte{
		"other.txt": []byte("irrelevant"),
	}
	if got := missingRequiredEnv(files, map[string]string{}); got != nil {
		t.Errorf("no config.yaml: got %v, want nil", got)
	}
}

func TestMissingRequiredEnv_NoRequiredEnvInYaml(t *testing.T) {
	// config.yaml without runtime_config.required_env → no missing.
	// Mirrors the default config emitted by ensureDefaultConfig (see the
	// #1028 comment in workspace_provision.go about why required_env is
	// intentionally omitted for auto-generated configs).
	yml := `
name: example
runtime: langgraph
runtime_config:
  timeout: 0
`
	files := map[string][]byte{"config.yaml": []byte(yml)}
	if got := missingRequiredEnv(files, map[string]string{}); got != nil {
		t.Errorf("no required_env in YAML: got %v, want nil", got)
	}
}

func TestMissingRequiredEnv_AllSatisfied(t *testing.T) {
	yml := `
runtime: claude-code
runtime_config:
  required_env:
    - CLAUDE_CODE_OAUTH_TOKEN
    - ANTHROPIC_API_KEY
`
	files := map[string][]byte{"config.yaml": []byte(yml)}
	env := map[string]string{
		"CLAUDE_CODE_OAUTH_TOKEN": "sk-ant-oat01-...",
		"ANTHROPIC_API_KEY":       "sk-ant-api-...",
	}
	if got := missingRequiredEnv(files, env); got != nil {
		t.Errorf("all set: got %v, want nil", got)
	}
}

func TestMissingRequiredEnv_OneMissing(t *testing.T) {
	// Reproduces the reported issue: the Claude Code Agent config declares
	// CLAUDE_CODE_OAUTH_TOKEN required but the tenant only has
	// ANTHROPIC_API_KEY set globally.
	yml := `
runtime: claude-code
runtime_config:
  required_env:
    - CLAUDE_CODE_OAUTH_TOKEN
    - ANTHROPIC_API_KEY
`
	files := map[string][]byte{"config.yaml": []byte(yml)}
	env := map[string]string{
		"ANTHROPIC_API_KEY": "sk-ant-...",
	}
	got := missingRequiredEnv(files, env)
	if len(got) != 1 || got[0] != "CLAUDE_CODE_OAUTH_TOKEN" {
		t.Errorf("expected [CLAUDE_CODE_OAUTH_TOKEN], got %v", got)
	}
}

func TestMissingRequiredEnv_EmptyStringCountsAsMissing(t *testing.T) {
	// A secret row with an empty value is effectively unset; the in-container
	// preflight treats empty string as missing, so the server must match.
	yml := `
runtime_config:
  required_env: [FOO]
`
	files := map[string][]byte{"config.yaml": []byte(yml)}
	env := map[string]string{"FOO": ""}
	got := missingRequiredEnv(files, env)
	if len(got) != 1 || got[0] != "FOO" {
		t.Errorf("expected [FOO], got %v", got)
	}
}

func TestMissingRequiredEnv_MalformedYamlReturnsNil(t *testing.T) {
	// Malformed YAML should not panic and should not block provisioning —
	// the in-container preflight is the source of truth for config.yaml
	// shape, and we don't want to double-fail on parse quirks.
	files := map[string][]byte{"config.yaml": []byte("{ not: valid: yaml: [[")}
	if got := missingRequiredEnv(files, map[string]string{}); got != nil {
		t.Errorf("malformed YAML: got %v, want nil", got)
	}
}

func TestFormatMissingEnvError_Single(t *testing.T) {
	msg := formatMissingEnvError([]string{"CLAUDE_CODE_OAUTH_TOKEN"})
	if !strings.Contains(msg, "CLAUDE_CODE_OAUTH_TOKEN") {
		t.Errorf("message should name the var: %q", msg)
	}
	if !strings.Contains(msg, "retry") {
		t.Errorf("message should tell the user how to fix it: %q", msg)
	}
}

func TestFormatMissingEnvError_Multiple(t *testing.T) {
	msg := formatMissingEnvError([]string{"A", "B"})
	if !strings.Contains(msg, "A") || !strings.Contains(msg, "B") {
		t.Errorf("message should name both vars: %q", msg)
	}
}

View File

@@ -114,6 +114,27 @@ func (h *WorkspaceHandler) provisionWorkspaceOpts(workspaceID, templatePath stri
		return
	}
+
+	// Preflight: refuse to launch when config.yaml declares required env vars
+	// that are not set. Without this, a missing CLAUDE_CODE_OAUTH_TOKEN (or
+	// similar) crashes the in-container preflight, the container never calls
+	// /registry/register, and the workspace sits in `provisioning` until a
+	// sweeper flips it or the user retries. Failing fast here gives the user
+	// an immediate, actionable error in the Events tab.
+	if missing := missingRequiredEnv(configFiles, envVars); len(missing) > 0 {
+		msg := formatMissingEnvError(missing)
+		log.Printf("Provisioner: %s (workspace=%s)", msg, workspaceID)
+		if _, dbErr := db.DB.ExecContext(ctx,
+			`UPDATE workspaces SET status = 'failed', last_sample_error = $2, updated_at = now() WHERE id = $1`,
+			workspaceID, msg); dbErr != nil {
+			log.Printf("Provisioner: failed to mark workspace %s as failed: %v", workspaceID, dbErr)
+		}
+		h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_PROVISION_FAILED", workspaceID, map[string]interface{}{
+			"error":   msg,
+			"missing": missing,
+		})
+		return
+	}

	cfg := h.buildProvisionerConfig(workspaceID, templatePath, configFiles, payload, envVars, pluginsPath, awarenessNamespace)
	cfg.ResetClaudeSession = resetClaudeSession // #12

View File

@@ -0,0 +1,137 @@
package registry

import (
	"context"
	"log"
	"os"
	"strconv"
	"time"

	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
)

// ProvisionTimeoutEmitter is the narrow broadcaster dependency the sweeper
// needs. Defined locally so the registry package stays event-bus agnostic
// (same pattern as OfflineHandler in healthsweep.go).
type ProvisionTimeoutEmitter interface {
	RecordAndBroadcast(ctx context.Context, eventType string, workspaceID string, payload interface{}) error
}

// DefaultProvisioningTimeout is how long a workspace may sit in
// status='provisioning' before the sweeper flips it to 'failed'. The
// container-launch path has its own 3-minute context timeout
// (provisioner.ProvisionTimeout), but that only bounds the docker API call —
// a container that started but crashes before /registry/register never
// triggers that path and would sit in provisioning forever. 10 minutes
// covers a pathological image-pull + user-data run on a cold EC2 worker
// while still flipping the workspace well before the "15+ minute" stuck
// state users see in production.
const DefaultProvisioningTimeout = 10 * time.Minute

// DefaultProvisionSweepInterval is how often the sweeper polls. Same cadence
// as the hibernation monitor — cheap, and bounded by the provisioning-state
// query, which hits the primary key / status partial index.
const DefaultProvisionSweepInterval = 30 * time.Second

// provisioningTimeout reads the override from the environment, falling back
// to the default. The env var is expressed in seconds so operators can tune
// it with a normal container restart, no code change needed.
func provisioningTimeout() time.Duration {
	if v := os.Getenv("PROVISION_TIMEOUT_SECONDS"); v != "" {
		if n, err := strconv.Atoi(v); err == nil && n > 0 {
			return time.Duration(n) * time.Second
		}
	}
	return DefaultProvisioningTimeout
}

// StartProvisioningTimeoutSweep periodically scans for workspaces stuck in
// `status='provisioning'` past the timeout window, flips them to `failed`,
// and broadcasts a WORKSPACE_PROVISION_TIMEOUT event so the canvas can
// render a fail-state instead of the indefinite, cosmetic "Provisioning
// Timeout" banner.
//
// The sweep is idempotent: the UPDATE's WHERE clause re-checks both status
// and age under the row lock, so a workspace that raced to `online` or was
// restarted while the sweep was scanning will not get flipped.
func StartProvisioningTimeoutSweep(ctx context.Context, emitter ProvisionTimeoutEmitter, interval time.Duration) {
	if emitter == nil {
		log.Println("Provision-timeout sweep: emitter is nil — skipping (no one to broadcast to)")
		return
	}
	if interval <= 0 {
		interval = DefaultProvisionSweepInterval
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	log.Printf("Provision-timeout sweep: started (interval=%s, timeout=%s)", interval, provisioningTimeout())
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			sweepStuckProvisioning(ctx, emitter)
		}
	}
}

// sweepStuckProvisioning is one tick of the sweeper. It stays unexported and
// is exercised directly by the in-package tests; all age math happens in SQL
// (now() vs updated_at), so tests can drive it deterministically by seeding
// updated_at rather than manipulating the clock.
func sweepStuckProvisioning(ctx context.Context, emitter ProvisionTimeoutEmitter) {
	timeout := provisioningTimeout()
	timeoutSec := int(timeout / time.Second)
	// Read candidates first so the event broadcast can include each id. The
	// subsequent UPDATE re-checks the predicate to stay race-safe against
	// concurrent restart / register paths that write updated_at.
	rows, err := db.DB.QueryContext(ctx, `
		SELECT id FROM workspaces
		WHERE status = 'provisioning'
		  AND updated_at < now() - ($1 || ' seconds')::interval
	`, timeoutSec)
	if err != nil {
		log.Printf("Provision-timeout sweep: query error: %v", err)
		return
	}
	defer rows.Close()
	var ids []string
	for rows.Next() {
		var id string
		if err := rows.Scan(&id); err == nil {
			ids = append(ids, id)
		}
	}
	for _, id := range ids {
		msg := "provisioning timed out — container never reported online. Check the workspace's required env vars and retry."
		res, err := db.DB.ExecContext(ctx, `
			UPDATE workspaces
			SET status = 'failed',
			    last_sample_error = $2,
			    updated_at = now()
			WHERE id = $1
			  AND status = 'provisioning'
			  AND updated_at < now() - ($3 || ' seconds')::interval
		`, id, msg, timeoutSec)
		if err != nil {
			log.Printf("Provision-timeout sweep: failed to flip %s to failed: %v", id, err)
			continue
		}
		affected, _ := res.RowsAffected()
		if affected == 0 {
			// Raced with restart / register — no harm, just skip.
			continue
		}
		log.Printf("Provision-timeout sweep: %s stuck in provisioning > %s — marked failed", id, timeout)
		if emitErr := emitter.RecordAndBroadcast(ctx, "WORKSPACE_PROVISION_TIMEOUT", id, map[string]interface{}{
			"error":        msg,
			"timeout_secs": timeoutSec,
		}); emitErr != nil {
			log.Printf("Provision-timeout sweep: broadcast failed for %s: %v", id, emitErr)
		}
	}
}
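Since Go interfaces are satisfied implicitly, the wiring in main.go needs no adapter. A compile-time assertion can pin the contract; the concrete broadcaster type and package below are assumptions for illustration, not names from this diff.

package events // illustrative placement, type name assumed

import "github.com/Molecule-AI/molecule-monorepo/platform/internal/registry"

// Compile-time check: if *Broadcaster's method set drifts from
// registry.ProvisionTimeoutEmitter, this line becomes a build error.
var _ registry.ProvisionTimeoutEmitter = (*Broadcaster)(nil)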

View File

@@ -0,0 +1,175 @@
package registry

import (
	"context"
	"errors"
	"sync"
	"testing"

	"github.com/DATA-DOG/go-sqlmock"
)

// fakeEmitter records every RecordAndBroadcast call so tests can assert
// payload shape + emission count. The sweeper itself is single-goroutine,
// but the mutex keeps the fake safe for concurrent use if the suite ever
// fans out.
type fakeEmitter struct {
	mu     sync.Mutex
	events []emittedEvent
	fail   bool
}

type emittedEvent struct {
	Type        string
	WorkspaceID string
	Payload     interface{}
}

func (f *fakeEmitter) RecordAndBroadcast(_ context.Context, eventType string, workspaceID string, payload interface{}) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.events = append(f.events, emittedEvent{eventType, workspaceID, payload})
	if f.fail {
		return errors.New("broadcast boom")
	}
	return nil
}

func (f *fakeEmitter) count() int {
	f.mu.Lock()
	defer f.mu.Unlock()
	return len(f.events)
}

// TestSweepStuckProvisioning_FlipsOverdue verifies the happy path: a stuck
// provisioning workspace gets flipped to failed AND an event is broadcast.
func TestSweepStuckProvisioning_FlipsOverdue(t *testing.T) {
	mock := setupTestDB(t)
	mock.ExpectQuery(`SELECT id FROM workspaces`).
		WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-stuck"))
	mock.ExpectExec(`UPDATE workspaces`).
		WithArgs("ws-stuck", sqlmock.AnyArg(), sqlmock.AnyArg()).
		WillReturnResult(sqlmock.NewResult(0, 1))
	emit := &fakeEmitter{}
	sweepStuckProvisioning(context.Background(), emit)
	if emit.count() != 1 {
		t.Fatalf("expected 1 event, got %d", emit.count())
	}
	if emit.events[0].Type != "WORKSPACE_PROVISION_TIMEOUT" {
		t.Errorf("event type = %q, want WORKSPACE_PROVISION_TIMEOUT", emit.events[0].Type)
	}
	if emit.events[0].WorkspaceID != "ws-stuck" {
		t.Errorf("workspace id = %q, want ws-stuck", emit.events[0].WorkspaceID)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet expectations: %v", err)
	}
}

// TestSweepStuckProvisioning_RaceSafe covers the case where the UPDATE
// affects 0 rows because the workspace flipped to online (or got restarted)
// between the SELECT and the UPDATE. We should skip the event, not emit a
// false WORKSPACE_PROVISION_TIMEOUT.
func TestSweepStuckProvisioning_RaceSafe(t *testing.T) {
	mock := setupTestDB(t)
	mock.ExpectQuery(`SELECT id FROM workspaces`).
		WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-raced"))
	mock.ExpectExec(`UPDATE workspaces`).
		WithArgs("ws-raced", sqlmock.AnyArg(), sqlmock.AnyArg()).
		WillReturnResult(sqlmock.NewResult(0, 0)) // 0 rows — raced
	emit := &fakeEmitter{}
	sweepStuckProvisioning(context.Background(), emit)
	if emit.count() != 0 {
		t.Errorf("expected 0 events on race, got %d", emit.count())
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet expectations: %v", err)
	}
}

// TestSweepStuckProvisioning_NoStuck verifies that an empty candidate list
// produces no events and no update queries.
func TestSweepStuckProvisioning_NoStuck(t *testing.T) {
	mock := setupTestDB(t)
	mock.ExpectQuery(`SELECT id FROM workspaces`).
		WillReturnRows(sqlmock.NewRows([]string{"id"}))
	emit := &fakeEmitter{}
	sweepStuckProvisioning(context.Background(), emit)
	if emit.count() != 0 {
		t.Errorf("expected 0 events when nothing stuck, got %d", emit.count())
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet expectations: %v", err)
	}
}

// TestSweepStuckProvisioning_MultipleStuck covers the realistic case where
// both agents (claude-code + hermes) are stuck — both should get flipped
// and both should get events.
func TestSweepStuckProvisioning_MultipleStuck(t *testing.T) {
	mock := setupTestDB(t)
	mock.ExpectQuery(`SELECT id FROM workspaces`).
		WillReturnRows(sqlmock.NewRows([]string{"id"}).
			AddRow("ws-claude-code").
			AddRow("ws-hermes"))
	mock.ExpectExec(`UPDATE workspaces`).
		WithArgs("ws-claude-code", sqlmock.AnyArg(), sqlmock.AnyArg()).
		WillReturnResult(sqlmock.NewResult(0, 1))
	mock.ExpectExec(`UPDATE workspaces`).
		WithArgs("ws-hermes", sqlmock.AnyArg(), sqlmock.AnyArg()).
		WillReturnResult(sqlmock.NewResult(0, 1))
	emit := &fakeEmitter{}
	sweepStuckProvisioning(context.Background(), emit)
	if emit.count() != 2 {
		t.Fatalf("expected 2 events, got %d", emit.count())
	}
}

// TestSweepStuckProvisioning_BroadcastFailureDoesNotCrash ensures the
// sweeper tolerates a broadcast error (Redis hiccup) — the DB row is
// already flipped, so the state stays coherent.
func TestSweepStuckProvisioning_BroadcastFailureDoesNotCrash(t *testing.T) {
	mock := setupTestDB(t)
	mock.ExpectQuery(`SELECT id FROM workspaces`).
		WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-stuck"))
	mock.ExpectExec(`UPDATE workspaces`).
		WithArgs("ws-stuck", sqlmock.AnyArg(), sqlmock.AnyArg()).
		WillReturnResult(sqlmock.NewResult(0, 1))
	emit := &fakeEmitter{fail: true}
	// Must not panic.
	sweepStuckProvisioning(context.Background(), emit)
}

// TestProvisioningTimeout_EnvOverride verifies that the
// PROVISION_TIMEOUT_SECONDS env var takes effect when set to a positive
// integer, and that we fall back to the default otherwise.
func TestProvisioningTimeout_EnvOverride(t *testing.T) {
	t.Setenv("PROVISION_TIMEOUT_SECONDS", "60")
	if got := provisioningTimeout(); got.Seconds() != 60 {
		t.Errorf("override: got %v, want 60s", got)
	}
	t.Setenv("PROVISION_TIMEOUT_SECONDS", "")
	if got := provisioningTimeout(); got != DefaultProvisioningTimeout {
		t.Errorf("default: got %v, want %v", got, DefaultProvisioningTimeout)
	}
	t.Setenv("PROVISION_TIMEOUT_SECONDS", "not-a-number")
	if got := provisioningTimeout(); got != DefaultProvisioningTimeout {
		t.Errorf("bad override: got %v, want default %v", got, DefaultProvisioningTimeout)
	}
}
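setupTestDB is referenced above but is not part of this diff. For context, a minimal sketch of what such a helper typically looks like with go-sqlmock, under the assumption that db.DB is a package-level *sql.DB the tests can swap; the real helper may differ.

package registry

import (
	"testing"

	"github.com/DATA-DOG/go-sqlmock"
	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
)

// setupTestDB (hypothetical sketch) swaps the package-level DB handle for a
// sqlmock-backed one and restores the original when the test finishes.
func setupTestDB(t *testing.T) sqlmock.Sqlmock {
	t.Helper()
	mockDB, mock, err := sqlmock.New()
	if err != nil {
		t.Fatalf("sqlmock.New: %v", err)
	}
	prev := db.DB
	db.DB = mockDB
	t.Cleanup(func() {
		db.DB = prev
		mockDB.Close()
	})
	return mock
}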