forked from molecule-ai/molecule-core
fix(scheduler): defer cron fires when workspace busy instead of skipping (#969)
Previously, the scheduler skipped cron fires entirely when a workspace had active_tasks > 0 (#115). This caused permanent cron misses for workspaces kept perpetually busy by the 5-min Orchestrator pulse — work crons (pick-up-work, PR review) were skipped every fire because the agent was always processing a delegation. Measured impact on Dev Lead: 17 context-deadline-exceeded timeouts in 2 hours, ~30% of inter-agent messages silently dropped. Fix: when workspace is busy, poll every 10s for up to 2 minutes waiting for idle. If idle within the window, fire normally. If still busy after 2 min, fall back to the original skip behavior. This is a minimal, safe change: - No new goroutines or channels - Same fire path once idle - Bounded wait (2 min max, won't block the scheduler pool) - Falls back to skip if workspace never becomes idle Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
04292f419c
commit
a3d30c1ece
@ -241,21 +241,41 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
|
||||
}
|
||||
}()
|
||||
|
||||
// #115 concurrency-aware skip — before firing check if the target
|
||||
// workspace is already executing a task. If so, skip this tick instead
|
||||
// of colliding (which used to surface as "workspace agent busy" errors
|
||||
// and register as a hard fail). advance next_run_at so the next cron
|
||||
// slot gets a fresh chance; log a skipped cron_run row so history shows
|
||||
// the gap instead of a silent miss. COALESCE guards against NULL.
|
||||
// #969 concurrency-aware queue — when the target workspace is busy,
|
||||
// defer the fire instead of skipping. Polls every 10s for up to 2 min
|
||||
// waiting for the workspace to become idle. If still busy after 2 min,
|
||||
// falls back to the original skip behavior.
|
||||
//
|
||||
// This replaces the #115 "skip when busy" pattern which caused crons
|
||||
// to permanently miss when workspaces were perpetually busy from the
|
||||
// Orchestrator pulse delegation chain (~30% message drop rate on Dev Lead).
|
||||
var activeTasks int
|
||||
if err := db.DB.QueryRowContext(ctx,
|
||||
`SELECT COALESCE(active_tasks, 0) FROM workspaces WHERE id = $1`,
|
||||
sched.WorkspaceID,
|
||||
).Scan(&activeTasks); err == nil && activeTasks > 0 {
|
||||
log.Printf("Scheduler: skipping '%s' on busy workspace %s (active_tasks=%d)",
|
||||
log.Printf("Scheduler: '%s' workspace %s busy (active_tasks=%d), deferring up to 2 min",
|
||||
sched.Name, short(sched.WorkspaceID, 12), activeTasks)
|
||||
s.recordSkipped(ctx, sched, activeTasks)
|
||||
return
|
||||
// Poll every 10s for up to 2 minutes
|
||||
waited := false
|
||||
for i := 0; i < 12; i++ {
|
||||
time.Sleep(10 * time.Second)
|
||||
if err := db.DB.QueryRowContext(ctx,
|
||||
`SELECT COALESCE(active_tasks, 0) FROM workspaces WHERE id = $1`,
|
||||
sched.WorkspaceID,
|
||||
).Scan(&activeTasks); err != nil || activeTasks == 0 {
|
||||
waited = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !waited && activeTasks > 0 {
|
||||
log.Printf("Scheduler: skipping '%s' on busy workspace %s after 2 min wait (active_tasks=%d)",
|
||||
sched.Name, short(sched.WorkspaceID, 12), activeTasks)
|
||||
s.recordSkipped(ctx, sched, activeTasks)
|
||||
return
|
||||
}
|
||||
log.Printf("Scheduler: '%s' workspace %s now idle after deferral, firing",
|
||||
sched.Name, short(sched.WorkspaceID, 12))
|
||||
}
|
||||
|
||||
fireCtx, cancel := context.WithTimeout(ctx, fireTimeout)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user