Merge pull request #1731 from Molecule-AI/fix/scheduler-sweep-phantom-busy

feat(scheduler): sweepPhantomBusy — clear stuck active_tasks from crashed runs
This commit is contained in:
Hongming Wang 2026-04-22 20:03:00 -07:00 committed by GitHub
commit 3c785bc7f5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -17,10 +17,12 @@ import (
)
const (
pollInterval = 30 * time.Second
maxConcurrent = 10
batchLimit = 50
fireTimeout = 5 * time.Minute
pollInterval = 30 * time.Second
maxConcurrent = 10
batchLimit = 50
fireTimeout = 5 * time.Minute
phantomSweepInterval = 5 * time.Minute
phantomStaleThreshold = 10 * time.Minute
)
// A2AProxy is the interface the scheduler needs to send messages to workspaces.
@ -63,6 +65,7 @@ type Scheduler struct {
// Atomic-ish via the mutex; tick rate is 30s so contention is trivial.
mu sync.RWMutex
lastTickAt time.Time
lastSweepAt time.Time
tickInterval time.Duration // defaults to pollInterval; overridable in tests
}
@ -164,6 +167,7 @@ func (s *Scheduler) Start(ctx context.Context) {
return
case <-ticker.C:
tickWithRecover()
s.maybeSweepPhantomBusy(ctx)
supervised.Heartbeat("scheduler")
}
}
@ -565,6 +569,78 @@ func (s *Scheduler) repairNullNextRunAt(ctx context.Context) {
}
}
// maybeSweepPhantomBusy runs sweepPhantomBusy at most once every
// phantomSweepInterval (5 min). Called on every tick but gated by a timer
// so the DB query doesn't run on every 30s poll.
func (s *Scheduler) maybeSweepPhantomBusy(ctx context.Context) {
s.mu.RLock()
last := s.lastSweepAt
s.mu.RUnlock()
if time.Since(last) < phantomSweepInterval {
return
}
s.sweepPhantomBusy(ctx)
s.mu.Lock()
s.lastSweepAt = time.Now()
s.mu.Unlock()
}
// sweepPhantomBusy finds workspaces stuck with active_tasks > 0 but no
// recent activity_log entry (within phantomStaleThreshold). This happens
// when an agent errors out (MiniMax timeout, OOM, etc.) and the finally
// block fails to decrement active_tasks. Without this sweep the scheduler
// skips cron fires for those workspaces indefinitely ("workspace busy —
// retry"), requiring manual DB intervention.
//
// The query mirrors the manual fix that was being run every 30 min:
//
// UPDATE workspaces SET active_tasks = 0
// WHERE active_tasks > 0
// AND id NOT IN (SELECT DISTINCT workspace_id
// FROM activity_logs
// WHERE created_at > NOW() - INTERVAL '10 minutes')
func (s *Scheduler) sweepPhantomBusy(ctx context.Context) {
rows, err := db.DB.QueryContext(ctx, `
UPDATE workspaces
SET active_tasks = 0,
current_task = '',
updated_at = now()
WHERE active_tasks > 0
AND status != 'removed'
AND id NOT IN (
SELECT DISTINCT workspace_id
FROM activity_logs
WHERE created_at > NOW() - $1::interval
)
RETURNING id, name
`, fmt.Sprintf("%d minutes", int(phantomStaleThreshold.Minutes())))
if err != nil {
log.Printf("Scheduler: phantom-busy sweep query error: %v", err)
return
}
defer rows.Close()
count := 0
for rows.Next() {
var id, name string
if err := rows.Scan(&id, &name); err != nil {
log.Printf("Scheduler: phantom-busy sweep scan error: %v", err)
continue
}
log.Printf("Scheduler: phantom-busy sweep — reset %s (no activity in %d min)", name, int(phantomStaleThreshold.Minutes()))
count++
}
if err := rows.Err(); err != nil {
log.Printf("Scheduler: phantom-busy sweep rows error: %v", err)
}
if count > 0 {
log.Printf("Scheduler: phantom-busy sweep complete — reset %d workspace(s)", count)
}
}
// isEmptyResponse checks if an A2A response body indicates the agent
// produced no meaningful output. Catches "(no response generated)" from
// the workspace runtime + genuinely empty/null responses. Used by the