From 69d9b4e38d8eb646507d70cb1fcb15f16bc3d052 Mon Sep 17 00:00:00 2001 From: core-be Date: Fri, 15 May 2026 17:53:36 -0700 Subject: [PATCH] fix(handlers): drain detached async goroutines before test db.DB swap (data race) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause: platform/internal/db.DB is a swappable package global. setupTestDB (+ peer test helpers) saves/restores it via t.Cleanup, but production code spawns fire-and-forget goroutines (maybeMarkContainerDead/preflightContainerHealth -> RestartByID -> runRestartCycle, logA2ASuccess/logA2AFailure activity logging, gracefulPreRestart, sendRestartContext) that read db.DB. These detached goroutines outlive the test that triggered them and race the db.DB pointer write in a LATER test's cleanup — WARNING: DATA RACE on platform/internal/db.DB, surfaced deterministically by PR#1240's expanded A2A test corpus on staging (a sibling of the mc#664/mc#774 Phase-3-masked handler-test family). Pre-existing since be5fbb5a (2026-05-07); NOT introduced by #1240/#1250. Fix: - Convert the leaked raw `go ...` restart/a2a-logging goroutines to the existing tracked h.goAsync (asyncWG) — matches the already-correct site at a2a_proxy.go:648 and goAsync's documented intent. - Wire the never-connected test-drain half: a newHandlerHook (nil in prod, zero cost) lets the test harness register every handler; setupTestDB's cleanup now drains all tracked async goroutines BEFORE restoring db.DB, eliminating the race window. Verified: full `go test -race -timeout <duration> ./...` (CI step) green, 0 races, 0 failures; the 8 originally-failing tests pass -race -count=5. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../internal/handlers/a2a_proxy_helpers.go | 31 ++++++++----- .../internal/handlers/handlers_test.go | 43 ++++++++++++++++++- .../internal/handlers/restart_signals.go | 10 +++-- .../internal/handlers/workspace.go | 12 ++++++ .../internal/handlers/workspace_restart.go | 10 +++-- 5 files changed, 87 insertions(+), 19 deletions(-) diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index c3ff562ea..5c1d3c2ba 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -194,7 +194,12 @@ func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspace } db.ClearWorkspaceKeys(ctx, workspaceID) h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceOffline), workspaceID, map[string]interface{}{}) - go h.RestartByID(workspaceID) + // Tracked via goAsync (not bare `go`) so the asyncWG can be drained + // before a test swaps the global db.DB. runRestartCycle reads db.DB + // before its provisioner gate, so an untracked detached goroutine + // races setupTestDB's t.Cleanup db.DB restore. Matches the already- + // correct site at a2a_proxy.go:648. + h.goAsync(func() { h.RestartByID(workspaceID) }) return true } @@ -241,7 +246,10 @@ func (h *WorkspaceHandler) preflightContainerHealth(ctx context.Context, workspa } db.ClearWorkspaceKeys(ctx, workspaceID) h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceOffline), workspaceID, map[string]interface{}{}) - go h.RestartByID(workspaceID) + // Tracked via goAsync (see maybeMarkContainerDead): preflight's + // detached restart must be drainable so it doesn't race the global + // db.DB swap in test cleanup. 
+ h.goAsync(func() { h.RestartByID(workspaceID) }) return &proxyA2AError{ Status: http.StatusServiceUnavailable, Response: gin.H{ @@ -262,7 +270,8 @@ func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, calle errWsName = workspaceID } summary := "A2A request to " + errWsName + " failed: " + errMsg - go func(parent context.Context) { + parent := ctx + h.goAsync(func() { logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second) defer cancel() LogActivity(logCtx, h.broadcaster, ActivityParams{ @@ -277,7 +286,7 @@ func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, calle Status: "error", ErrorDetail: &errMsg, }) - }(ctx) + }) } // logA2ASuccess records a successful A2A round-trip and (for canvas-initiated @@ -298,18 +307,19 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle // silent workspaces. Only update when callerID is a real workspace (not // canvas, not a system caller) and the target returned 2xx/3xx. 
if callerID != "" && !isSystemCaller(callerID) && statusCode < 400 { - go func() { + h.goAsync(func() { bgCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if _, err := db.DB.ExecContext(bgCtx, `UPDATE workspaces SET last_outbound_at = NOW() WHERE id = $1`, callerID); err != nil { log.Printf("last_outbound_at update failed for %s: %v", callerID, err) } - }() + }) } summary := a2aMethod + " → " + wsNameForLog toolTrace := extractToolTrace(respBody) - go func(parent context.Context) { + parent := ctx + h.goAsync(func() { logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second) defer cancel() LogActivity(logCtx, h.broadcaster, ActivityParams{ @@ -325,7 +335,7 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle DurationMs: &durationMs, Status: logStatus, }) - }(ctx) + }) if callerID == "" && statusCode < 400 { h.broadcaster.BroadcastOnly(workspaceID, string(events.EventA2AResponse), map[string]interface{}{ @@ -510,7 +520,8 @@ func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID, wsName = workspaceID } summary := a2aMethod + " → " + wsName + " (queued for poll)" - go func(parent context.Context) { + parent := ctx + h.goAsync(func() { logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second) defer cancel() LogActivity(logCtx, h.broadcaster, ActivityParams{ @@ -523,7 +534,7 @@ func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID, RequestBody: json.RawMessage(body), Status: "ok", }) - }(ctx) + }) } // readUsageMap extracts input_tokens / output_tokens from the "usage" key of m. 
diff --git a/workspace-server/internal/handlers/handlers_test.go b/workspace-server/internal/handlers/handlers_test.go index 958858f02..18d8cf502 100644 --- a/workspace-server/internal/handlers/handlers_test.go +++ b/workspace-server/internal/handlers/handlers_test.go @@ -8,6 +8,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "sync" "testing" "time" @@ -22,8 +23,39 @@ import ( "github.com/redis/go-redis/v9" ) +// liveTestHandlers tracks every WorkspaceHandler built during the test +// binary's lifetime so setupTestDB can drain their in-flight goAsync +// goroutines (notably the detached RestartByID restart cycle, which +// reads the global db.DB) BEFORE restoring db.DB. Without this drain a +// fire-and-forget restart goroutine spawned by one test outlives that +// test and races the db.DB swap in a later test's t.Cleanup — the +// 0x...d548 data race on platform/internal/db.DB. +var ( + liveTestHandlersMu sync.Mutex + liveTestHandlers []*WorkspaceHandler +) + func init() { gin.SetMode(gin.TestMode) + newHandlerHook = func(h *WorkspaceHandler) { + liveTestHandlersMu.Lock() + liveTestHandlers = append(liveTestHandlers, h) + liveTestHandlersMu.Unlock() + } +} + +// drainTestAsync waits for every tracked handler's goAsync goroutines to +// finish. Called from setupTestDB's cleanup before db.DB is restored so +// no detached restart/provision goroutine is mid-read of db.DB when the +// pointer is swapped. +func drainTestAsync() { + liveTestHandlersMu.Lock() + handlers := make([]*WorkspaceHandler, len(liveTestHandlers)) + copy(handlers, liveTestHandlers) + liveTestHandlersMu.Unlock() + for _, h := range handlers { + h.waitAsyncForTest() + } } // setupTestDB creates a sqlmock DB and assigns it to the global db.DB. @@ -37,7 +69,16 @@ func setupTestDB(t *testing.T) sqlmock.Sqlmock { } prevDB := db.DB db.DB = mockDB - t.Cleanup(func() { db.DB = prevDB; mockDB.Close() }) + t.Cleanup(func() { + // Drain detached async goroutines (e.g. 
goAsync(RestartByID), + // which reads db.DB in runRestartCycle before its provisioner + // gate) BEFORE swapping db.DB back. Doing the restore first + // would let an in-flight restart goroutine read db.DB while + // this line writes it — the data race this guards against. + drainTestAsync() + db.DB = prevDB + mockDB.Close() + }) // Disable SSRF checks for the duration of this test only. Restore // the previous state via t.Cleanup so that TestIsSafeURL_* tests diff --git a/workspace-server/internal/handlers/restart_signals.go b/workspace-server/internal/handlers/restart_signals.go index a947a560b..88ff480a3 100644 --- a/workspace-server/internal/handlers/restart_signals.go +++ b/workspace-server/internal/handlers/restart_signals.go @@ -56,9 +56,11 @@ const ( // (an externally routable address) is used directly. func (h *WorkspaceHandler) gracefulPreRestart(ctx context.Context, workspaceID string) { // Non-blocking send — don't stall the restart cycle. - // Run in a detached goroutine so the caller (runRestartCycle) can - // proceed to stopForRestart without waiting. - go func() { + // Run in a tracked async goroutine (goAsync, not bare `go`) so the + // caller (runRestartCycle) can proceed to stopForRestart without + // waiting, while the test harness can still drain it before swapping + // the global db.DB (resolveAgentURLForRestartSignal reads db.DB). 
+ h.goAsync(func() { signalCtx, cancel := context.WithTimeout(context.Background(), restartSignalTimeout) defer cancel() @@ -109,7 +111,7 @@ func (h *WorkspaceHandler) gracefulPreRestart(ctx context.Context, workspaceID s } else { log.Printf("A2AGracefulRestart: %s returned status %d — proceeding with stop", workspaceID, resp.StatusCode) } - }() + }) } // resolveAgentURLForRestartSignal returns the routable URL for the workspace diff --git a/workspace-server/internal/handlers/workspace.go b/workspace-server/internal/handlers/workspace.go index b3651d2a1..b310fe6f7 100644 --- a/workspace-server/internal/handlers/workspace.go +++ b/workspace-server/internal/handlers/workspace.go @@ -80,6 +80,15 @@ type WorkspaceHandler struct { asyncWG sync.WaitGroup } +// newHandlerHook, when non-nil, is invoked for every WorkspaceHandler +// created via NewWorkspaceHandler. It is nil in production (zero cost); +// the test harness sets it so setupTestDB can drain every handler's +// in-flight async goroutines before swapping the global db.DB. Without +// this, a detached restart goroutine (maybeMarkContainerDead -> +// goAsync(RestartByID) -> runRestartCycle reads db.DB) races the +// db.DB restore in another test's t.Cleanup. +var newHandlerHook func(*WorkspaceHandler) + func (h *WorkspaceHandler) goAsync(fn func()) { h.asyncWG.Add(1) go func() { @@ -108,6 +117,9 @@ func NewWorkspaceHandler(b events.EventEmitter, p *provisioner.Provisioner, plat if p != nil { h.provisioner = p } + if newHandlerHook != nil { + newHandlerHook(h) + } return h } diff --git a/workspace-server/internal/handlers/workspace_restart.go b/workspace-server/internal/handlers/workspace_restart.go index 985b9ca56..c2ab5828e 100644 --- a/workspace-server/internal/handlers/workspace_restart.go +++ b/workspace-server/internal/handlers/workspace_restart.go @@ -237,10 +237,10 @@ func (h *WorkspaceHandler) Restart(c *gin.Context) { // the silent-drop bugs PRs #2811/#2824 closed). 
RestartWorkspaceAuto // enforces CP-FIRST ordering matching the other dispatchers — see // docs/architecture/backends.md. - go func() { + h.goAsync(func() { h.RestartWorkspaceAutoOpts(context.Background(), id, templatePath, configFiles, payload, resetClaudeSession) - }() - go h.sendRestartContext(id, restartData) + }) + h.goAsync(func() { h.sendRestartContext(id, restartData) }) c.JSON(http.StatusOK, gin.H{"status": "provisioning", "config_dir": configLabel, "reset_session": resetClaudeSession}) } @@ -610,7 +610,9 @@ func (h *WorkspaceHandler) runRestartCycle(workspaceID string) { h.provisionWorkspaceAutoSync(workspaceID, "", nil, payload) // sendRestartContext is a one-way notification to the new container; safe // to fire async — the next restart cycle won't depend on it completing. - go h.sendRestartContext(workspaceID, restartData) + // Tracked via goAsync so the test harness can drain it before the + // global db.DB swap (sendRestartContext reads db.DB). + h.goAsync(func() { h.sendRestartContext(workspaceID, restartData) }) } // Pause handles POST /workspaces/:id/pause -- 2.52.0