From 1c3b4ff3215a1291bad80457e7a7c5790feb5dd7 Mon Sep 17 00:00:00 2001 From: hongming-codex-laptop Date: Thu, 14 May 2026 09:17:19 -0700 Subject: [PATCH 1/3] fix(handlers): synchronize async DB users in race tests --- workspace-server/go.mod | 3 + .../internal/handlers/a2a_proxy.go | 42 +++++----- .../internal/handlers/a2a_proxy_helpers.go | 26 +++--- .../handlers/a2a_proxy_preflight_test.go | 5 +- .../internal/handlers/a2a_proxy_test.go | 33 +++++--- .../internal/handlers/handlers_test.go | 5 ++ .../internal/handlers/org_helpers.go | 84 ++++++++++++++++--- .../internal/handlers/restart_signals.go | 4 +- .../internal/handlers/restart_signals_test.go | 1 + .../internal/handlers/workspace.go | 14 ++++ .../handlers/workspace_dispatchers.go | 8 +- .../handlers/workspace_provision_auto_test.go | 3 + 12 files changed, 163 insertions(+), 65 deletions(-) diff --git a/workspace-server/go.mod b/workspace-server/go.mod index ca1b7459..5c82f02b 100644 --- a/workspace-server/go.mod +++ b/workspace-server/go.mod @@ -18,6 +18,7 @@ require ( github.com/opencontainers/image-spec v1.1.1 github.com/redis/go-redis/v9 v9.19.0 github.com/robfig/cron/v3 v3.0.1 + github.com/stretchr/testify v1.11.1 go.moleculesai.app/plugin/gh-identity v0.0.0-20260509010445-788988195fce golang.org/x/crypto v0.50.0 gopkg.in/yaml.v3 v3.0.1 @@ -33,6 +34,7 @@ require ( github.com/containerd/errdefs v1.0.0 // indirect github.com/containerd/errdefs/pkg v0.3.0 // indirect github.com/containerd/log v0.1.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/distribution/reference v0.6.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect @@ -58,6 +60,7 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/pelletier/go-toml/v2 v2.2.4 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/quic-go/qpack v0.6.0 // indirect github.com/quic-go/quic-go v0.59.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index 5737b156..8fbef20c 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -97,28 +97,28 @@ const maxProxyResponseBody = 10 << 20 // // Timeout model — three independent budgets, none of which gets in each other's way: // -// 1. Client.Timeout — DELIBERATELY UNSET. Client.Timeout is a hard wall on -// the entire request including streamed body reads, and would pre-empt -// legitimate slow cold-start flows (Claude Code first-token over OAuth -// can take 30-60s on boot; long-running agent synthesis can stream -// tokens for minutes). Total-request budget is enforced per-request -// via context deadline (canvas = idle-only, agent-to-agent = 30 min ceiling). +// 1. Client.Timeout — DELIBERATELY UNSET. Client.Timeout is a hard wall on +// the entire request including streamed body reads, and would pre-empt +// legitimate slow cold-start flows (Claude Code first-token over OAuth +// can take 30-60s on boot; long-running agent synthesis can stream +// tokens for minutes). Total-request budget is enforced per-request +// via context deadline (canvas = idle-only, agent-to-agent = 30 min ceiling). // -// 2. Transport.DialContext — 10s connect timeout. When a workspace's EC2 -// black-holes TCP connects (instance terminated mid-flight, security group -// flipped, NACL bug), the OS default is 75s on Linux / 21s on macOS — long -// enough that Cloudflare's ~100s edge timeout can fire first and surface -// a generic 502 page to canvas. 10s is well above realistic intra-region -// latencies and well below CF's edge timeout. +// 2. Transport.DialContext — 10s connect timeout. When a workspace's EC2 +// black-holes TCP connects (instance terminated mid-flight, security group +// flipped, NACL bug), the OS default is 75s on Linux / 21s on macOS — long +// enough that Cloudflare's ~100s edge timeout can fire first and surface +// a generic 502 page to canvas. 10s is well above realistic intra-region +// latencies and well below CF's edge timeout. // -// 3. Transport.ResponseHeaderTimeout — 180s default. From request-body-end -// to response-headers-start. Configurable via -// A2A_PROXY_RESPONSE_HEADER_TIMEOUT (envx.Duration). Covers cold-start -// first-byte (30-60s OAuth flow above) with enough room for Opus agent -// turns (big context + internal delegate_task round-trips routinely exceed -// the old 60s ceiling). Body streaming after headers is governed by the -// per-request context deadline, NOT this timeout — so multi-minute agent -// responses still work fine. +// 3. Transport.ResponseHeaderTimeout — 180s default. From request-body-end +// to response-headers-start. Configurable via +// A2A_PROXY_RESPONSE_HEADER_TIMEOUT (envx.Duration). Covers cold-start +// first-byte (30-60s OAuth flow above) with enough room for Opus agent +// turns (big context + internal delegate_task round-trips routinely exceed +// the old 60s ceiling). Body streaming after headers is governed by the +// per-request context deadline, NOT this timeout — so multi-minute agent +// responses still work fine. // // The point of (2) and (3) is to surface a *structured* 503 from // handleA2ADispatchError when the workspace agent is unreachable, so canvas @@ -645,7 +645,7 @@ func (h *WorkspaceHandler) resolveAgentURL(ctx context.Context, workspaceID stri // the caller can retry once the workspace is back online (~10s). if status == "hibernated" { log.Printf("ProxyA2A: waking hibernated workspace %s", workspaceID) - go h.RestartByID(workspaceID) + h.goAsync(func() { h.RestartByID(workspaceID) }) return "", &proxyA2AError{ Status: http.StatusServiceUnavailable, Headers: map[string]string{"Retry-After": "15"}, diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index c3ff562e..3d4fc4dd 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -194,7 +194,7 @@ func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspace } db.ClearWorkspaceKeys(ctx, workspaceID) h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceOffline), workspaceID, map[string]interface{}{}) - go h.RestartByID(workspaceID) + h.goAsync(func() { h.RestartByID(workspaceID) }) return true } @@ -241,7 +241,7 @@ func (h *WorkspaceHandler) preflightContainerHealth(ctx context.Context, workspa } db.ClearWorkspaceKeys(ctx, workspaceID) h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceOffline), workspaceID, map[string]interface{}{}) - go h.RestartByID(workspaceID) + h.goAsync(func() { h.RestartByID(workspaceID) }) return &proxyA2AError{ Status: http.StatusServiceUnavailable, Response: gin.H{ @@ -262,8 +262,8 @@ func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, calle errWsName = workspaceID } summary := "A2A request to " + errWsName + " failed: " + errMsg - go func(parent context.Context) { - logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second) + h.goAsync(func() { + logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second) defer cancel() LogActivity(logCtx, h.broadcaster, ActivityParams{ WorkspaceID: workspaceID, @@ -277,7 +277,7 @@ func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, calle Status: "error", ErrorDetail: &errMsg, }) - }(ctx) + }) } // logA2ASuccess records a successful A2A round-trip and (for canvas-initiated @@ -298,19 +298,19 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle // silent workspaces. Only update when callerID is a real workspace (not // canvas, not a system caller) and the target returned 2xx/3xx. if callerID != "" && !isSystemCaller(callerID) && statusCode < 400 { - go func() { + h.goAsync(func() { bgCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if _, err := db.DB.ExecContext(bgCtx, `UPDATE workspaces SET last_outbound_at = NOW() WHERE id = $1`, callerID); err != nil { log.Printf("last_outbound_at update failed for %s: %v", callerID, err) } - }() + }) } summary := a2aMethod + " → " + wsNameForLog toolTrace := extractToolTrace(respBody) - go func(parent context.Context) { - logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second) + h.goAsync(func() { + logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second) defer cancel() LogActivity(logCtx, h.broadcaster, ActivityParams{ WorkspaceID: workspaceID, @@ -325,7 +325,7 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle DurationMs: &durationMs, Status: logStatus, }) - }(ctx) + }) if callerID == "" && statusCode < 400 { h.broadcaster.BroadcastOnly(workspaceID, string(events.EventA2AResponse), map[string]interface{}{ @@ -510,8 +510,8 @@ func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID, wsName = workspaceID } summary := a2aMethod + " → " + wsName + " (queued for poll)" - go func(parent context.Context) { - logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second) + h.goAsync(func() { + logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second) defer cancel() LogActivity(logCtx, h.broadcaster, ActivityParams{ WorkspaceID: workspaceID, @@ -523,7 +523,7 @@ func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID, RequestBody: json.RawMessage(body), Status: "ok", }) - }(ctx) + }) } // readUsageMap extracts input_tokens / output_tokens from the "usage" key of m. diff --git a/workspace-server/internal/handlers/a2a_proxy_preflight_test.go b/workspace-server/internal/handlers/a2a_proxy_preflight_test.go index fedd18db..1e146965 100644 --- a/workspace-server/internal/handlers/a2a_proxy_preflight_test.go +++ b/workspace-server/internal/handlers/a2a_proxy_preflight_test.go @@ -54,6 +54,7 @@ func TestPreflight_ContainerRunning_ReturnsNil(t *testing.T) { _ = setupTestDB(t) stub := &preflightLocalProv{running: true, err: nil} h := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + waitForHandlerAsyncBeforeDBCleanup(t, h) h.provisioner = stub if err := h.preflightContainerHealth(context.Background(), "ws-running-123"); err != nil { @@ -186,8 +187,8 @@ func TestProxyA2A_Preflight_RoutesThroughProvisionerSSOT(t *testing.T) { } var ( - callsIsRunning bool - callsContainerInspectRaw bool + callsIsRunning bool + callsContainerInspectRaw bool callsRunningContainerNameDirect bool ) ast.Inspect(fn.Body, func(n ast.Node) bool { diff --git a/workspace-server/internal/handlers/a2a_proxy_test.go b/workspace-server/internal/handlers/a2a_proxy_test.go index 7fa22dac..3cf95462 100644 --- a/workspace-server/internal/handlers/a2a_proxy_test.go +++ b/workspace-server/internal/handlers/a2a_proxy_test.go @@ -262,6 +262,7 @@ func TestProxyA2A_Upstream502_TriggersContainerDeadCheck(t *testing.T) { allowLoopbackForTest(t) broadcaster := newTestBroadcaster() handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + waitForHandlerAsyncBeforeDBCleanup(t, handler) cp := &fakeCPProv{running: false} handler.SetCPProvisioner(cp) @@ -324,6 +325,7 @@ func TestProxyA2A_Upstream502_AliveAgent_PropagatesAsIs(t *testing.T) { allowLoopbackForTest(t) broadcaster := newTestBroadcaster() handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + waitForHandlerAsyncBeforeDBCleanup(t, handler) cp := &fakeCPProv{running: true} handler.SetCPProvisioner(cp) @@ -513,6 +515,7 @@ func TestProxyA2A_AllowedSelf_SkipsAccessCheck(t *testing.T) { allowLoopbackForTest(t) broadcaster := newTestBroadcaster() handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + waitForHandlerAsyncBeforeDBCleanup(t, handler) agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") @@ -661,18 +664,18 @@ func TestProxyA2A_CallerIDDerivedFromBearer(t *testing.T) { // (column order: workspace_id, activity_type, source_id, target_id, ...) mock.ExpectExec("INSERT INTO activity_logs"). WithArgs( - "ws-target", // $1 workspace_id - "a2a_receive", // $2 activity_type - sqlmock.AnyArg(), // $3 source_id — *string("ws-caller"), checked below - sqlmock.AnyArg(), // $4 target_id - sqlmock.AnyArg(), // $5 method - sqlmock.AnyArg(), // $6 summary - sqlmock.AnyArg(), // $7 request_body - sqlmock.AnyArg(), // $8 response_body - sqlmock.AnyArg(), // $9 tool_trace - sqlmock.AnyArg(), // $10 duration_ms - sqlmock.AnyArg(), // $11 status - sqlmock.AnyArg(), // $12 error_detail + "ws-target", // $1 workspace_id + "a2a_receive", // $2 activity_type + sqlmock.AnyArg(), // $3 source_id — *string("ws-caller"), checked below + sqlmock.AnyArg(), // $4 target_id + sqlmock.AnyArg(), // $5 method + sqlmock.AnyArg(), // $6 summary + sqlmock.AnyArg(), // $7 request_body + sqlmock.AnyArg(), // $8 response_body + sqlmock.AnyArg(), // $9 tool_trace + sqlmock.AnyArg(), // $10 duration_ms + sqlmock.AnyArg(), // $11 status + sqlmock.AnyArg(), // $12 error_detail ). WillReturnResult(sqlmock.NewResult(0, 1)) @@ -1716,7 +1719,6 @@ func TestDispatchA2A_RejectsUnsafeURL(t *testing.T) { } } - // --- handleA2ADispatchError --- func TestHandleA2ADispatchError_ContextDeadline(t *testing.T) { @@ -1803,6 +1805,7 @@ func TestMaybeMarkContainerDead_CPOnly_NotRunning(t *testing.T) { mock := setupTestDB(t) setupTestRedis(t) handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + waitForHandlerAsyncBeforeDBCleanup(t, handler) cp := &fakeCPProv{running: false} handler.SetCPProvisioner(cp) @@ -1955,6 +1958,7 @@ func TestLogA2AFailure_Smoke(t *testing.T) { mock := setupTestDB(t) setupTestRedis(t) handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + waitForHandlerAsyncBeforeDBCleanup(t, handler) // Sync workspace-name lookup (called in the caller goroutine). mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`). @@ -1973,6 +1977,7 @@ func TestLogA2AFailure_EmptyNameFallback(t *testing.T) { mock := setupTestDB(t) setupTestRedis(t) handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + waitForHandlerAsyncBeforeDBCleanup(t, handler) // Empty name from DB → summary uses the workspaceID as the name. mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`). @@ -1989,6 +1994,7 @@ func TestLogA2ASuccess_Smoke(t *testing.T) { mock := setupTestDB(t) setupTestRedis(t) handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + waitForHandlerAsyncBeforeDBCleanup(t, handler) mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`). WithArgs("ws-ok"). @@ -2005,6 +2011,7 @@ func TestLogA2ASuccess_ErrorStatus(t *testing.T) { mock := setupTestDB(t) setupTestRedis(t) handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + waitForHandlerAsyncBeforeDBCleanup(t, handler) mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`). WithArgs("ws-err"). diff --git a/workspace-server/internal/handlers/handlers_test.go b/workspace-server/internal/handlers/handlers_test.go index c0684d96..847a3e9a 100644 --- a/workspace-server/internal/handlers/handlers_test.go +++ b/workspace-server/internal/handlers/handlers_test.go @@ -62,6 +62,11 @@ func setupTestDB(t *testing.T) sqlmock.Sqlmock { return mock } +func waitForHandlerAsyncBeforeDBCleanup(t *testing.T, h *WorkspaceHandler) { + t.Helper() + t.Cleanup(h.waitAsyncForTest) +} + // setupTestRedis creates a miniredis instance and assigns it to the global db.RDB. func setupTestRedis(t *testing.T) *miniredis.Miniredis { t.Helper() diff --git a/workspace-server/internal/handlers/org_helpers.go b/workspace-server/internal/handlers/org_helpers.go index b41ae7e6..3dd569f7 100644 --- a/workspace-server/internal/handlers/org_helpers.go +++ b/workspace-server/internal/handlers/org_helpers.go @@ -15,6 +15,7 @@ import ( "gopkg.in/yaml.v3" ) + // resolvePromptRef reads a prompt body from either an inline string or a // file ref relative to the workspace's files_dir. Inline always wins when // both are non-empty (caller-provided inline is more authoritative than a @@ -78,21 +79,84 @@ func hasUnresolvedVarRef(original, expanded string) bool { } // expandWithEnv expands ${VAR} and $VAR references in s using the env map. -// Falls back to the platform process env if a var isn't in the map. +// Falls back to the platform process env only when the whole value is a +// single variable reference; embedded process-env expansion is too broad for +// imported org YAML because host variables such as HOME are not template data. func expandWithEnv(s string, env map[string]string) string { - return os.Expand(s, func(key string) string { - if len(key) == 0 { - return "$" + if s == "" { + return "" + } + var b strings.Builder + for i := 0; i < len(s); { + if s[i] != '$' { + b.WriteByte(s[i]) + i++ + continue } - c := key[0] - if !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || c == '_') { - return "$" + key // not a valid shell identifier — return literally + + if i+1 >= len(s) { + b.WriteByte('$') + i++ + continue } - if v, ok := env[key]; ok { - return v + + if s[i+1] == '{' { + end := strings.IndexByte(s[i+2:], '}') + if end < 0 { + b.WriteByte('$') + i++ + continue + } + end += i + 2 + key := s[i+2 : end] + ref := s[i : end+1] + b.WriteString(expandEnvRef(key, ref, s, env)) + i = end + 1 + continue } + + if !isEnvIdentStart(s[i+1]) { + b.WriteByte('$') + i++ + continue + } + j := i + 2 + for j < len(s) && isEnvIdentPart(s[j]) { + j++ + } + key := s[i+1 : j] + ref := s[i:j] + b.WriteString(expandEnvRef(key, ref, s, env)) + i = j + } + return b.String() +} + +func expandEnvRef(key, ref, whole string, env map[string]string) string { + if key == "" { + return "$" + } + if !isEnvIdentStart(key[0]) { + return "$" + key + } + if v, ok := env[key]; ok { + return v + } + if ref == whole { return os.Getenv(key) - }) + } + if os.Getenv(key) != "" { + return ref + } + return "" +} + +func isEnvIdentStart(c byte) bool { + return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || c == '_' +} + +func isEnvIdentPart(c byte) bool { + return isEnvIdentStart(c) || (c >= '0' && c <= '9') } // loadWorkspaceEnv reads the org root .env and the workspace-specific .env diff --git a/workspace-server/internal/handlers/restart_signals.go b/workspace-server/internal/handlers/restart_signals.go index a947a560..7c4c900a 100644 --- a/workspace-server/internal/handlers/restart_signals.go +++ b/workspace-server/internal/handlers/restart_signals.go @@ -58,7 +58,7 @@ func (h *WorkspaceHandler) gracefulPreRestart(ctx context.Context, workspaceID s // Non-blocking send — don't stall the restart cycle. // Run in a detached goroutine so the caller (runRestartCycle) can // proceed to stopForRestart without waiting. - go func() { + h.goAsync(func() { signalCtx, cancel := context.WithTimeout(context.Background(), restartSignalTimeout) defer cancel() @@ -109,7 +109,7 @@ func (h *WorkspaceHandler) gracefulPreRestart(ctx context.Context, workspaceID s } else { log.Printf("A2AGracefulRestart: %s returned status %d — proceeding with stop", workspaceID, resp.StatusCode) } - }() + }) } // resolveAgentURLForRestartSignal returns the routable URL for the workspace diff --git a/workspace-server/internal/handlers/restart_signals_test.go b/workspace-server/internal/handlers/restart_signals_test.go index be0b7077..23205436 100644 --- a/workspace-server/internal/handlers/restart_signals_test.go +++ b/workspace-server/internal/handlers/restart_signals_test.go @@ -271,6 +271,7 @@ func TestGracefulPreRestart_URLResolutionError(t *testing.T) { WorkspaceHandler: newHandlerWithTestDeps(t), errToReturn: context.DeadlineExceeded, } + waitForHandlerAsyncBeforeDBCleanup(t, hWrapper.WorkspaceHandler) hWrapper.gracefulPreRestart(context.Background(), "ws-url-err-111") time.Sleep(200 * time.Millisecond) diff --git a/workspace-server/internal/handlers/workspace.go b/workspace-server/internal/handlers/workspace.go index b674836b..a6ae9835 100644 --- a/workspace-server/internal/handlers/workspace.go +++ b/workspace-server/internal/handlers/workspace.go @@ -15,6 +15,7 @@ import ( "os" "path/filepath" "strings" + "sync" "time" "github.com/Molecule-AI/molecule-monorepo/platform/internal/crypto" @@ -73,6 +74,19 @@ type WorkspaceHandler struct { // memory plugin). main.go sets this to plugin.DeleteNamespace // when MEMORY_PLUGIN_URL is configured. namespaceCleanupFn func(ctx context.Context, workspaceID string) + asyncWG sync.WaitGroup +} + +func (h *WorkspaceHandler) goAsync(fn func()) { + h.asyncWG.Add(1) + go func() { + defer h.asyncWG.Done() + fn() + }() +} + +func (h *WorkspaceHandler) waitAsyncForTest() { + h.asyncWG.Wait() } func NewWorkspaceHandler(b events.EventEmitter, p *provisioner.Provisioner, platformURL, configsDir string) *WorkspaceHandler { diff --git a/workspace-server/internal/handlers/workspace_dispatchers.go b/workspace-server/internal/handlers/workspace_dispatchers.go index 3df25877..03f8e579 100644 --- a/workspace-server/internal/handlers/workspace_dispatchers.go +++ b/workspace-server/internal/handlers/workspace_dispatchers.go @@ -111,11 +111,11 @@ func (h *WorkspaceHandler) provisionWorkspaceAuto(workspaceID, templatePath stri "sync": false, }) if h.cpProv != nil { - go h.provisionWorkspaceCP(workspaceID, templatePath, configFiles, payload) + h.goAsync(func() { h.provisionWorkspaceCP(workspaceID, templatePath, configFiles, payload) }) return true } if h.provisioner != nil { - go h.provisionWorkspace(workspaceID, templatePath, configFiles, payload) + h.goAsync(func() { h.provisionWorkspace(workspaceID, templatePath, configFiles, payload) }) return true } // No backend wired — mark failed so the workspace doesn't linger in @@ -275,13 +275,13 @@ func (h *WorkspaceHandler) RestartWorkspaceAutoOpts(ctx context.Context, workspa if h.cpProv != nil { h.cpStopWithRetry(ctx, workspaceID, "RestartWorkspaceAuto") // resetClaudeSession is Docker-only — CP has no session state to clear. - go h.provisionWorkspaceCP(workspaceID, templatePath, configFiles, payload) + h.goAsync(func() { h.provisionWorkspaceCP(workspaceID, templatePath, configFiles, payload) }) return true } if h.provisioner != nil { // Docker.Stop has no retry — see docstring rationale. h.provisioner.Stop(ctx, workspaceID) - go h.provisionWorkspaceOpts(workspaceID, templatePath, configFiles, payload, resetClaudeSession) + h.goAsync(func() { h.provisionWorkspaceOpts(workspaceID, templatePath, configFiles, payload, resetClaudeSession) }) return true } // No backend wired — same shape as provisionWorkspaceAuto's no-backend diff --git a/workspace-server/internal/handlers/workspace_provision_auto_test.go b/workspace-server/internal/handlers/workspace_provision_auto_test.go index 779f673d..aae10ca3 100644 --- a/workspace-server/internal/handlers/workspace_provision_auto_test.go +++ b/workspace-server/internal/handlers/workspace_provision_auto_test.go @@ -144,6 +144,7 @@ func TestProvisionWorkspaceAuto_RoutesToCPWhenSet(t *testing.T) { rec := &trackingCPProv{startErr: errors.New("simulated CP rejection")} bcast := &concurrentSafeBroadcaster{} h := NewWorkspaceHandler(bcast, nil, "http://localhost:8080", t.TempDir()) + waitForHandlerAsyncBeforeDBCleanup(t, h) h.SetCPProvisioner(rec) wsID := "ws-routes-to-cp-0123456789abcdef" @@ -595,6 +596,7 @@ func TestRestartWorkspaceAuto_RoutesToCPWhenSet(t *testing.T) { // Mock DB so cpStopWithRetry can run without a real Postgres. mock := setupTestDB(t) + waitForHandlerAsyncBeforeDBCleanup(t, h) mock.MatchExpectationsInOrder(false) // provisionWorkspaceCP runs in the goroutine and will hit secrets // SELECTs + UPDATE workspace as failed (we make CP Start return @@ -670,6 +672,7 @@ func TestRestartWorkspaceAuto_RoutesToDockerWhenOnlyDocker(t *testing.T) { bcast := &concurrentSafeBroadcaster{} h := NewWorkspaceHandler(bcast, nil, "http://localhost:8080", t.TempDir()) + waitForHandlerAsyncBeforeDBCleanup(t, h) stub := &stoppingLocalProv{} h.provisioner = stub -- 2.45.2 From 096faa25623dc7c9531fe62c416617d68bf00f5d Mon Sep 17 00:00:00 2001 From: hongming-codex-laptop Date: Thu, 14 May 2026 09:23:33 -0700 Subject: [PATCH 2/3] fix(provisioner): seed configs before container start --- .../internal/provisioner/provisioner.go | 30 ++++++++++--------- .../internal/provisioner/provisioner_test.go | 18 +++++++++++ 2 files changed, 34 insertions(+), 14 deletions(-) diff --git a/workspace-server/internal/provisioner/provisioner.go b/workspace-server/internal/provisioner/provisioner.go index d50ad06b..4c19c204 100644 --- a/workspace-server/internal/provisioner/provisioner.go +++ b/workspace-server/internal/provisioner/provisioner.go @@ -481,6 +481,22 @@ func (p *Provisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string, e return "", fmt.Errorf("failed to create container: %w", err) } + // Seed /configs before the entrypoint starts. molecule-runtime reads + // /configs/config.yaml immediately; post-start copy races fast runtimes + // into a FileNotFoundError crash loop. + if cfg.TemplatePath != "" { + if err := p.CopyTemplateToContainer(ctx, resp.ID, cfg.TemplatePath); err != nil { + _ = p.cli.ContainerRemove(ctx, resp.ID, container.RemoveOptions{Force: true}) + return "", fmt.Errorf("failed to copy template to container %s before start: %w", name, err) + } + } + if len(cfg.ConfigFiles) > 0 { + if err := p.WriteFilesToContainer(ctx, resp.ID, cfg.ConfigFiles); err != nil { + _ = p.cli.ContainerRemove(ctx, resp.ID, container.RemoveOptions{Force: true}) + return "", fmt.Errorf("failed to write config files to container %s before start: %w", name, err) + } + } + if err := p.cli.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil { // Clean up created container on start failure _ = p.cli.ContainerRemove(ctx, resp.ID, container.RemoveOptions{Force: true}) @@ -496,20 +512,6 @@ func (p *Provisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string, e // /configs and /workspace, then drops to agent via gosu). No per-start // chown needed here. - // Copy template files into /configs if TemplatePath is set - if cfg.TemplatePath != "" { - if err := p.CopyTemplateToContainer(ctx, resp.ID, cfg.TemplatePath); err != nil { - log.Printf("Provisioner: warning — failed to copy template to container %s: %v", name, err) - } - } - - // Write generated config files into /configs if ConfigFiles is set - if len(cfg.ConfigFiles) > 0 { - if err := p.WriteFilesToContainer(ctx, resp.ID, cfg.ConfigFiles); err != nil { - log.Printf("Provisioner: warning — failed to write config files to container %s: %v", name, err) - } - } - // Resolve the host-mapped port. Retry inspect up to 3 times if Docker hasn't // bound the ephemeral port yet (rare race under heavy load). hostURL := InternalURL(cfg.WorkspaceID) // fallback to Docker-internal diff --git a/workspace-server/internal/provisioner/provisioner_test.go b/workspace-server/internal/provisioner/provisioner_test.go index 8d4a20f0..287b13a5 100644 --- a/workspace-server/internal/provisioner/provisioner_test.go +++ b/workspace-server/internal/provisioner/provisioner_test.go @@ -62,6 +62,24 @@ func TestValidateConfigSource_TemplateIsDirName(t *testing.T) { } } +func TestStartSeedsConfigsBeforeContainerStart(t *testing.T) { + src, err := os.ReadFile("provisioner.go") + if err != nil { + t.Fatalf("read provisioner.go: %v", err) + } + text := string(src) + copyTemplate := strings.Index(text, "p.CopyTemplateToContainer(ctx, resp.ID, cfg.TemplatePath)") + writeFiles := strings.Index(text, "p.WriteFilesToContainer(ctx, resp.ID, cfg.ConfigFiles)") + start := strings.Index(text, "p.cli.ContainerStart(ctx, resp.ID, container.StartOptions{})") + + if copyTemplate < 0 || writeFiles < 0 || start < 0 { + t.Fatalf("expected Start to copy template, write config files, and start container") + } + if !(copyTemplate < start && writeFiles < start) { + t.Fatalf("config seeding must happen before ContainerStart: copyTemplate=%d writeFiles=%d start=%d", copyTemplate, writeFiles, start) + } +} + // baseHostConfig returns a fresh HostConfig with typical pre-tier binds, // mimicking what Start() builds before calling ApplyTierConfig. func baseHostConfig(pluginsPath string) *container.HostConfig { -- 2.45.2 From 19fce4d400d4b8922130ad7518386d52d6dac98f Mon Sep 17 00:00:00 2001 From: hongming-codex-laptop Date: Thu, 14 May 2026 09:27:58 -0700 Subject: [PATCH 3/3] fix(handlers): keep embedded missing env refs literal --- .../internal/handlers/org_helpers.go | 5 +-- .../handlers/org_helpers_pure_test.go | 32 +++++++++++-------- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/workspace-server/internal/handlers/org_helpers.go b/workspace-server/internal/handlers/org_helpers.go index 3dd569f7..1a88e99b 100644 --- a/workspace-server/internal/handlers/org_helpers.go +++ b/workspace-server/internal/handlers/org_helpers.go @@ -145,10 +145,7 @@ func expandEnvRef(key, ref, whole string, env map[string]string) string { if ref == whole { return os.Getenv(key) } - if os.Getenv(key) != "" { - return ref - } - return "" + return ref } func isEnvIdentStart(c byte) bool { diff --git a/workspace-server/internal/handlers/org_helpers_pure_test.go b/workspace-server/internal/handlers/org_helpers_pure_test.go index ccdc9345..34296abd 100644 --- a/workspace-server/internal/handlers/org_helpers_pure_test.go +++ b/workspace-server/internal/handlers/org_helpers_pure_test.go @@ -104,8 +104,8 @@ func TestHasUnresolvedVarRef_Resolved(t *testing.T) { // documents this design choice; callers who need empty=resolved should // pre-process the output before calling hasUnresolvedVarRef. {"${VAR}", "", true}, - {"${VAR}", "value", false}, // var replaced - {"$VAR", "value", false}, // bare var replaced + {"${VAR}", "value", false}, // var replaced + {"$VAR", "value", false}, // bare var replaced {"prefix${VAR}suffix", "prefixvaluesuffix", false}, {"${A}${B}", "ab", false}, // FOO=FOO and BAR=BAR — both vars found and replaced. Expanded output @@ -125,14 +125,14 @@ func TestHasUnresolvedVarRef_Resolved(t *testing.T) { func TestHasUnresolvedVarRef_Unresolved(t *testing.T) { // Expansion left the refs intact → unresolved. cases := []struct { - orig string + orig string expanded string }{ - {"${VAR}", "${VAR}"}, // untouched - {"$VAR", "$VAR"}, // bare untouched + {"${VAR}", "${VAR}"}, // untouched + {"$VAR", "$VAR"}, // bare untouched {"prefix${VAR}suffix", "prefix${VAR}suffix"}, - {"${A}${B}", "${A}${B}"}, // both unresolved - {"${FOO}", ""}, // empty result with var ref in original + {"${A}${B}", "${A}${B}"}, // both unresolved + {"${FOO}", ""}, // empty result with var ref in original } for _, tc := range cases { t.Run(tc.orig, func(t *testing.T) { @@ -205,8 +205,8 @@ func TestMergeCategoryRouting_WorkspaceOverrides(t *testing.T) { "ui": {"Frontend Engineer"}, } ws := map[string][]string{ - "security": {"SRE Team"}, // narrows - "ui": {}, // drops + "security": {"SRE Team"}, // narrows + "ui": {}, // drops "infra": {"Platform Team"}, // adds } r := mergeCategoryRouting(defaults, ws) @@ -462,8 +462,14 @@ func TestExpandWithEnv_LiteralDollar(t *testing.T) { func TestExpandWithEnv_PartiallyPresent(t *testing.T) { env := map[string]string{"SET": "yes"} result := expandWithEnv("${SET} and ${NOT_SET}", env) - // ${SET} resolved; ${NOT_SET} -> "" via empty fallback. - assert.Equal(t, "yes and ", result) + assert.Equal(t, "yes and ${NOT_SET}", result) +} + +func TestExpandWithEnv_EmbeddedMissingProcessEnvStaysLiteral(t *testing.T) { + t.Setenv("MOL_TEST_EMBEDDED_MISSING", "") + + result := expandWithEnv("prefix/${MOL_TEST_EMBEDDED_MISSING}/suffix", map[string]string{}) + assert.Equal(t, "prefix/${MOL_TEST_EMBEDDED_MISSING}/suffix", result) } // POSIX identifier guard regression tests (CWE-78 fix). @@ -576,8 +582,8 @@ func TestRenderCategoryRoutingYAML_SingleCategory(t *testing.T) { func TestRenderCategoryRoutingYAML_MultipleCategoriesSorted(t *testing.T) { routing := map[string][]string{ - "zebra": {"RoleZ"}, - "alpha": {"RoleA"}, + "zebra": {"RoleZ"}, + "alpha": {"RoleA"}, "middleware": {"RoleM"}, } result, err := renderCategoryRoutingYAML(routing) -- 2.45.2