diff --git a/workspace-server/internal/scheduler/scheduler.go b/workspace-server/internal/scheduler/scheduler.go index 4ae82247..fc9f6e81 100644 --- a/workspace-server/internal/scheduler/scheduler.go +++ b/workspace-server/internal/scheduler/scheduler.go @@ -17,10 +17,12 @@ import ( ) const ( - pollInterval = 30 * time.Second - maxConcurrent = 10 - batchLimit = 50 - fireTimeout = 5 * time.Minute + pollInterval = 30 * time.Second + maxConcurrent = 10 + batchLimit = 50 + fireTimeout = 5 * time.Minute + phantomSweepInterval = 5 * time.Minute + phantomStaleThreshold = 10 * time.Minute ) // A2AProxy is the interface the scheduler needs to send messages to workspaces. @@ -63,6 +65,7 @@ type Scheduler struct { // Atomic-ish via the mutex; tick rate is 30s so contention is trivial. mu sync.RWMutex lastTickAt time.Time + lastSweepAt time.Time tickInterval time.Duration // defaults to pollInterval; overridable in tests } @@ -164,6 +167,7 @@ func (s *Scheduler) Start(ctx context.Context) { return case <-ticker.C: tickWithRecover() + s.maybeSweepPhantomBusy(ctx) supervised.Heartbeat("scheduler") } } @@ -565,6 +569,78 @@ func (s *Scheduler) repairNullNextRunAt(ctx context.Context) { } } +// maybeSweepPhantomBusy runs sweepPhantomBusy at most once every +// phantomSweepInterval (5 min). Called on every tick but gated by a timer +// so the DB query doesn't run on every 30s poll. +func (s *Scheduler) maybeSweepPhantomBusy(ctx context.Context) { + s.mu.RLock() + last := s.lastSweepAt + s.mu.RUnlock() + + if time.Since(last) < phantomSweepInterval { + return + } + + s.sweepPhantomBusy(ctx) + + s.mu.Lock() + s.lastSweepAt = time.Now() + s.mu.Unlock() +} + +// sweepPhantomBusy finds workspaces stuck with active_tasks > 0 but no +// recent activity_log entry (within phantomStaleThreshold). This happens +// when an agent errors out (MiniMax timeout, OOM, etc.) and the finally +// block fails to decrement active_tasks. Without this sweep the scheduler +// skips cron fires for those workspaces indefinitely ("workspace busy — +// retry"), requiring manual DB intervention. +// +// The query mirrors the manual fix that was being run every 30 min: +// +// UPDATE workspaces SET active_tasks = 0 +// WHERE active_tasks > 0 +// AND id NOT IN (SELECT DISTINCT workspace_id +// FROM activity_logs +// WHERE created_at > NOW() - INTERVAL '10 minutes') +func (s *Scheduler) sweepPhantomBusy(ctx context.Context) { + rows, err := db.DB.QueryContext(ctx, ` + UPDATE workspaces + SET active_tasks = 0, + current_task = '', + updated_at = now() + WHERE active_tasks > 0 + AND status != 'removed' + AND id NOT IN ( + SELECT DISTINCT workspace_id + FROM activity_logs + WHERE created_at > NOW() - $1::interval + ) + RETURNING id, name + `, fmt.Sprintf("%d minutes", int(phantomStaleThreshold.Minutes()))) + if err != nil { + log.Printf("Scheduler: phantom-busy sweep query error: %v", err) + return + } + defer rows.Close() + + count := 0 + for rows.Next() { + var id, name string + if err := rows.Scan(&id, &name); err != nil { + log.Printf("Scheduler: phantom-busy sweep scan error: %v", err) + continue + } + log.Printf("Scheduler: phantom-busy sweep — reset %s (no activity in %d min)", name, int(phantomStaleThreshold.Minutes())) + count++ + } + if err := rows.Err(); err != nil { + log.Printf("Scheduler: phantom-busy sweep rows error: %v", err) + } + if count > 0 { + log.Printf("Scheduler: phantom-busy sweep complete — reset %d workspace(s)", count) + } +} + // isEmptyResponse checks if an A2A response body indicates the agent // produced no meaningful output. Catches "(no response generated)" from // the workspace runtime + genuinely empty/null responses. Used by the