Merge pull request #2266 from Molecule-AI/fix/restart-race-pending-flag
fix(restart): coalesce concurrent restart requests via pending flag
commit 3623e975ec
@@ -14,8 +14,33 @@ import (
 	"github.com/gin-gonic/gin"
 )

-// restartMu prevents concurrent RestartByID calls for the same workspace
-var restartMu sync.Map // map[workspaceID]*sync.Mutex
+// restartState coalesces concurrent RestartByID calls for one workspace.
+//
+// The naive "one mutex per workspace, TryLock+drop" pattern caused a real
+// data-loss bug: SetSecret + SetModel both fire `go restartFunc(...)` from
+// the HTTP handler, and both writes commit before either restart goroutine
+// gets to load workspace_secrets. If the second goroutine arrives while the
+// first holds the mutex, TryLock returns false and the second is silently
+// skipped. The first goroutine's loadWorkspaceSecrets ran before the second
+// write committed, so the new container boots without that env var. Surfaced
+// as the "No LLM provider configured" hermes error when MODEL_PROVIDER landed
+// after the API-key write but lost its restart to the mutex.
+//
+// The fix is the pending-flag / coalescing pattern: any restart request that
+// arrives while one is in flight sets the pending flag and returns. The
+// in-flight runner, on completion, checks the flag and runs another cycle.
+// This collapses N concurrent requests into at most 2 sequential restarts
+// (the current one + one more that picks up everything written during it),
+// while guaranteeing the final container always sees the latest secrets.
+type restartState struct {
+	mu      sync.Mutex
+	running bool // true while a restart cycle is in flight
+	pending bool // set by any caller that arrived during the in-flight cycle
+}
+
+// restartStates is a per-workspace map of *restartState. Each workspace gets
+// its own entry so unrelated workspaces don't serialize on each other.
+var restartStates sync.Map // map[workspaceID]*restartState

 // isParentPaused checks if any ancestor of the workspace is paused.
 func isParentPaused(ctx context.Context, workspaceID string) (bool, string) {
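The failure mode described in the comment above is easy to reproduce outside the workspace code. Below is a minimal, self-contained sketch; every name in it (latest, booted, restart) is an illustrative stand-in, not code from this repo. Two writes commit, each fires a restart, and the TryLock gate silently drops the second one, so the last "container" boots with stale state:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

func main() {
	var mu sync.Mutex
	var latest atomic.Int32 // stands in for the workspace_secrets table
	var booted atomic.Int32 // what the last "container" actually booted with

	restart := func() {
		if !mu.TryLock() {
			return // silently dropped; this is the bug
		}
		defer mu.Unlock()
		booted.Store(latest.Load())       // analogous to loadWorkspaceSecrets
		time.Sleep(20 * time.Millisecond) // analogous to stop + provision
	}

	latest.Store(1)                  // write #1 commits (think: API key)
	go restart()                     // restart #1 grabs the mutex
	time.Sleep(5 * time.Millisecond) // restart #1 has already loaded latest=1
	latest.Store(2)                  // write #2 commits (think: MODEL_PROVIDER)
	go restart()                     // TryLock fails → restart #2 is dropped

	time.Sleep(50 * time.Millisecond)
	fmt.Println("booted with:", booted.Load()) // prints 1; write #2 never lands
}

The interleaving is timing-dependent (the sleeps just make it reliable here), which is exactly why the drop was easy to miss: most of the time the two restarts do not overlap and everything looks fine.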
@@ -294,21 +319,63 @@ func (h *WorkspaceHandler) HibernateWorkspace(ctx context.Context, workspaceID s
 	log.Printf("Hibernate: workspace %s (%s) is now hibernated", wsName, workspaceID)
 }

-// RestartByID restarts a workspace by ID — for programmatic use (e.g., auto-restart after secret change).
+// RestartByID restarts a workspace by ID — for programmatic use (e.g.,
+// auto-restart after secret change). Calls that arrive while one is in flight
+// are coalesced via the pending-flag pattern (see restartState above): the
+// in-flight runner picks up the pending request after its current cycle
+// completes, so writes that committed mid-restart are guaranteed to land.
 func (h *WorkspaceHandler) RestartByID(workspaceID string) {
 	if h.provisioner == nil {
 		return
 	}
+	coalesceRestart(workspaceID, func() { h.runRestartCycle(workspaceID) })
+}

-	// Per-workspace mutex — skip if already restarting (last-write-wins)
-	mu, _ := restartMu.LoadOrStore(workspaceID, &sync.Mutex{})
-	wsMu := mu.(*sync.Mutex)
-	if !wsMu.TryLock() {
-		log.Printf("Auto-restart: skipping %s — restart already in progress", workspaceID)
-		return
-	}
-	defer wsMu.Unlock()
+// coalesceRestart implements the pending-flag gate around an arbitrary cycle
+// function. Extracted from RestartByID for direct unit testing — the cycle
+// function in production is `runRestartCycle`, but tests pass a counter to
+// verify the coalescing math (N concurrent requests → ≤2 cycles).
+func coalesceRestart(workspaceID string, cycle func()) {
+	sv, _ := restartStates.LoadOrStore(workspaceID, &restartState{})
+	state := sv.(*restartState)
+
+	// Mark a restart as wanted. If one is already running, return — that
+	// runner will see pending=true on its next loop iteration and run
+	// another cycle that picks up our request's effects. NOT dropped.
+	state.mu.Lock()
+	state.pending = true
+	if state.running {
+		state.mu.Unlock()
+		log.Printf("Auto-restart: %s — coalescing with in-flight cycle (pending=true)", workspaceID)
+		return
+	}
+	state.running = true
+	state.mu.Unlock()
+
+	// Drain pending requests. Each iteration re-loads workspace_secrets
+	// inside provisionWorkspace, so any writes that committed since the
+	// last cycle are picked up. Continues until no pending request was
+	// observed at the top of an iteration.
+	for {
+		state.mu.Lock()
+		if !state.pending {
+			state.running = false
+			state.mu.Unlock()
+			return
+		}
+		state.pending = false
+		state.mu.Unlock()
+
+		cycle()
+	}
+}
+
+// runRestartCycle does the actual stop+provision work for one restart
+// iteration. Synchronous (waits for provisionWorkspace to complete) so the
+// outer pending-flag loop in coalesceRestart can correctly coalesce — if this
+// returned before the new container was up, the loop would race the
+// in-progress provision goroutine on the next iteration's Stop call.
+func (h *WorkspaceHandler) runRestartCycle(workspaceID string) {
 	ctx := context.Background()

 	var wsName, status, dbRuntime string
@@ -354,7 +421,12 @@ func (h *WorkspaceHandler) RestartByID(workspaceID string) {
 	restartData := loadRestartContextData(ctx, workspaceID)

 	// On auto-restart, do NOT re-apply templates — preserve existing config volume.
-	go h.provisionWorkspace(workspaceID, "", nil, payload)
+	// SYNCHRONOUS provisionWorkspace: returns when the new container is up
+	// (or has failed). The outer loop relies on this to know when it's safe
+	// to start another restart cycle without racing this one's Stop call.
+	h.provisionWorkspace(workspaceID, "", nil, payload)
+	// sendRestartContext is a one-way notification to the new container; safe
+	// to fire async — the next restart cycle won't depend on it completing.
 	go h.sendRestartContext(workspaceID, restartData)
 }
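For orientation, here is a hedged sketch of the caller side that the restartState comment refers to. The real SetSecret/SetModel handlers are not part of this diff; the SecretsHandler type, its saveSecret field, and the response shapes below are assumptions. Only the final `go h.restartFunc(workspaceID)` line reflects the pattern the commit message actually describes:

package handlers

import "github.com/gin-gonic/gin"

// SecretsHandler is a hypothetical stand-in for the real handler type.
// restartFunc is wired to something like WorkspaceHandler.RestartByID.
type SecretsHandler struct {
	saveSecret  func(c *gin.Context, workspaceID string) error // hypothetical persistence hook
	restartFunc func(workspaceID string)
}

func (h *SecretsHandler) SetSecret(c *gin.Context) {
	workspaceID := c.Param("id")
	if err := h.saveSecret(c, workspaceID); err != nil {
		c.JSON(500, gin.H{"error": err.Error()})
		return
	}
	// Fire-and-forget restart after the write commits. Two such handlers
	// racing each other is the normal case; with the old TryLock gate the
	// second restart could be silently dropped, while coalesceRestart at
	// worst folds it into the in-flight cycle's follow-up run.
	go h.restartFunc(workspaceID)
	c.JSON(200, gin.H{"status": "ok"})
}

This is why the coalescing has to live inside the restart path rather than in the handlers: the handlers cannot know about each other, and serializing them would block HTTP responses on container provisioning.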
@@ -0,0 +1,235 @@
+package handlers
+
+import (
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+// resetRestartStatesFor clears any state for the given workspaceID. Tests
+// must call this between scenarios because restartStates is a package-level
+// sync.Map shared across all RestartByID callers (incl. other tests in the
+// suite).
+func resetRestartStatesFor(workspaceID string) {
+	restartStates.Delete(workspaceID)
+}
+
+// TestCoalesceRestart_SingleCallRunsOneCycle is the baseline: no concurrency,
+// one cycle. If this fails, the gate logic is broken at its simplest path.
+func TestCoalesceRestart_SingleCallRunsOneCycle(t *testing.T) {
+	const wsID = "test-coalesce-single"
+	resetRestartStatesFor(wsID)
+
+	var calls atomic.Int32
+	coalesceRestart(wsID, func() { calls.Add(1) })
+
+	if got := calls.Load(); got != 1 {
+		t.Errorf("expected 1 cycle, got %d", got)
+	}
+}
+
+// TestCoalesceRestart_ConcurrentCallsCoalesce verifies the bug fix: N
+// concurrent requests during a slow cycle collapse to at most 2 cycles
+// (the in-flight one + one more that picks up everyone who arrived
+// during it). Reproduces the SetSecret + SetModel race that was
+// silently dropping the second restart.
+func TestCoalesceRestart_ConcurrentCallsCoalesce(t *testing.T) {
+	const wsID = "test-coalesce-concurrent"
+	resetRestartStatesFor(wsID)
+
+	var calls atomic.Int32
+	cycleStarted := make(chan struct{}, 1)
+	cycleProceed := make(chan struct{})
+	cycle := func() {
+		n := calls.Add(1)
+		if n == 1 {
+			// First cycle blocks on cycleProceed, so we can fire the
+			// "concurrent" requests during it. Subsequent cycles run
+			// to completion immediately.
+			cycleStarted <- struct{}{}
+			<-cycleProceed
+		}
+	}
+
+	// Kick off the first request in a goroutine so we can observe its
+	// in-flight state.
+	done := make(chan struct{})
+	go func() {
+		coalesceRestart(wsID, cycle)
+		close(done)
+	}()
+
+	// Wait for the first cycle to actually be running before firing
+	// the concurrent batch — otherwise we might call coalesceRestart
+	// before running=true is set, defeating the test.
+	<-cycleStarted
+
+	// Fire 5 concurrent requests during the in-flight cycle. Each
+	// should set pending=true and return immediately (no cycle run
+	// from the goroutine's POV — the loop in goroutine #1 will pick
+	// up the pending flag).
+	const concurrentCount = 5
+	var wg sync.WaitGroup
+	for i := 0; i < concurrentCount; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			coalesceRestart(wsID, cycle)
+		}()
+	}
+	wg.Wait()
+
+	// At this point, exactly 1 cycle is running (the first one). The 5
+	// concurrent requests all set pending=true and returned without
+	// running their own cycle.
+	if got := calls.Load(); got != 1 {
+		t.Errorf("expected 1 in-flight cycle so far, got %d (concurrent calls did not coalesce)", got)
+	}
+
+	// Release the first cycle. The goroutine's loop should pick up
+	// pending=true and run exactly ONE more cycle, then exit.
+	close(cycleProceed)
+	<-done
+
+	// Total cycles: 2 (the original + one coalesced follow-up for
+	// all 5 concurrent requests). Anything more = wasted restarts;
+	// anything less = lost requests.
+	if got := calls.Load(); got != 2 {
+		t.Errorf("expected exactly 2 cycles total (1 original + 1 coalesced follow-up for 5 concurrent calls), got %d", got)
+	}
+}
+
+// TestCoalesceRestart_SequentialCallsRunSeparately verifies the gate
+// doesn't over-coalesce: when calls don't overlap, each gets its own
+// cycle. Important — the bug was dropping calls; the fix shouldn't
+// over-correct by collapsing distinct sequential restarts.
+func TestCoalesceRestart_SequentialCallsRunSeparately(t *testing.T) {
+	const wsID = "test-coalesce-sequential"
+	resetRestartStatesFor(wsID)
+
+	var calls atomic.Int32
+	for i := 0; i < 3; i++ {
+		coalesceRestart(wsID, func() { calls.Add(1) })
+	}
+
+	if got := calls.Load(); got != 3 {
+		t.Errorf("expected 3 cycles for 3 sequential calls, got %d", got)
+	}
+}
+
+// TestCoalesceRestart_RequestDuringCyclePickedUp is the targeted
+// reproduction of the original bug. One request is in flight; one more
+// arrives during it. The fix must run a follow-up cycle so the second
+// request's effects (e.g. a model write that committed after the first
+// restart's secrets-load) are guaranteed to land in the next container.
+func TestCoalesceRestart_RequestDuringCyclePickedUp(t *testing.T) {
+	const wsID = "test-coalesce-pickup"
+	resetRestartStatesFor(wsID)
+
+	var calls atomic.Int32
+	cycleStarted := make(chan struct{}, 1)
+	cycleProceed := make(chan struct{})
+
+	cycle := func() {
+		n := calls.Add(1)
+		if n == 1 {
+			cycleStarted <- struct{}{}
+			<-cycleProceed
+		}
+	}
+
+	done := make(chan struct{})
+	go func() {
+		coalesceRestart(wsID, cycle)
+		close(done)
+	}()
+
+	<-cycleStarted
+	// One concurrent request — mimics SetModel arriving during
+	// SetSecret's restart.
+	coalesceRestart(wsID, cycle)
+	close(cycleProceed)
+	<-done
+
+	if got := calls.Load(); got != 2 {
+		t.Errorf("expected 2 cycles (1 original + 1 picked-up pending), got %d — the pending request was dropped, which is the original bug", got)
+	}
+}
+
+// TestCoalesceRestart_StateClearedAfterDrain ensures the running flag
+// is reset to false when the loop exits with pending=false, so a
+// later restart request starts a fresh cycle instead of being
+// permanently coalesced into nothing. Defends against a future edit
+// that forgets to clear running.
+func TestCoalesceRestart_StateClearedAfterDrain(t *testing.T) {
+	const wsID = "test-coalesce-state-clear"
+	resetRestartStatesFor(wsID)
+
+	// First call: runs one cycle, drains, sets running=false.
+	var calls1 atomic.Int32
+	coalesceRestart(wsID, func() { calls1.Add(1) })
+
+	// Second call (later, no overlap): must run its own cycle.
+	var calls2 atomic.Int32
+	coalesceRestart(wsID, func() { calls2.Add(1) })
+
+	if got := calls1.Load(); got != 1 {
+		t.Errorf("first call: expected 1 cycle, got %d", got)
+	}
+	if got := calls2.Load(); got != 1 {
+		t.Errorf("second call: expected 1 cycle (state should reset between drains), got %d", got)
+	}
+}
+
+// TestCoalesceRestart_DifferentWorkspacesDoNotSerialize verifies the
+// per-workspace state map: an in-flight restart for ws A must not
+// block restarts for ws B. Important for performance — without this,
+// unrelated workspaces would queue behind each other.
+func TestCoalesceRestart_DifferentWorkspacesDoNotSerialize(t *testing.T) {
+	const wsA = "test-coalesce-ws-a"
+	const wsB = "test-coalesce-ws-b"
+	resetRestartStatesFor(wsA)
+	resetRestartStatesFor(wsB)
+
+	aStarted := make(chan struct{}, 1)
+	aProceed := make(chan struct{})
+	var aCycles atomic.Int32
+	var bCycles atomic.Int32
+
+	doneA := make(chan struct{})
+	go func() {
+		coalesceRestart(wsA, func() {
+			aCycles.Add(1)
+			aStarted <- struct{}{}
+			<-aProceed
+		})
+		close(doneA)
+	}()
+
+	<-aStarted
+	// While A is in flight, B's restart must be free to run.
+	bDone := make(chan struct{})
+	go func() {
+		coalesceRestart(wsB, func() { bCycles.Add(1) })
+		close(bDone)
+	}()
+
+	select {
+	case <-bDone:
+		// Correct — B did not block on A.
+	case <-time.After(2 * time.Second):
+		t.Fatal("B's restart blocked on A's — per-workspace state isolation is broken")
+	}
+
+	close(aProceed)
+	<-doneA
+
+	if got := aCycles.Load(); got != 1 {
+		t.Errorf("A: expected 1 cycle, got %d", got)
+	}
+	if got := bCycles.Load(); got != 1 {
+		t.Errorf("B: expected 1 cycle, got %d", got)
+	}
+}
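Because the suite exercises real goroutine interleavings, it is worth running with the race detector enabled. The exact package path is not shown in this diff, so the command below uses ./... as a safe default:

go test -race -run TestCoalesceRestart ./...

The -run filter keeps it to this suite, and the 2-second timeout in the cross-workspace test keeps a broken isolation case from hanging the run.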