Merge pull request #207 from Molecule-AI/fix/issue-115-scheduler-busy-skip

fix(scheduler): #115 — skip cron fire when workspace busy
This commit is contained in:
Hongming Wang 2026-04-15 11:13:20 -07:00 committed by GitHub
commit 519d478ea2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -222,6 +222,27 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
}
}()
// #115 concurrency-aware skip — before firing check if the target
// workspace is already executing a task. If so, skip this tick instead
// of colliding (which used to surface as "workspace agent busy" errors
// and register as a hard fail). Advance next_run_at so the next cron
// slot gets a fresh chance; log a skipped cron_run row so history shows
// the gap instead of a silent miss. COALESCE guards against NULL.
var activeTasks int
if err := db.DB.QueryRowContext(ctx,
`SELECT COALESCE(active_tasks, 0) FROM workspaces WHERE id = $1`,
sched.WorkspaceID,
).Scan(&activeTasks); err == nil && activeTasks > 0 {
wsID := sched.WorkspaceID
if len(wsID) > 12 {
wsID = wsID[:12]
}
log.Printf("Scheduler: skipping '%s' on busy workspace %s (active_tasks=%d)",
sched.Name, wsID, activeTasks)
s.recordSkipped(ctx, sched, activeTasks)
return
}
fireCtx, cancel := context.WithTimeout(ctx, fireTimeout)
defer cancel()
@ -308,6 +329,56 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
}
}
// recordSkipped records a cron tick that was skipped because the target
// workspace was already busy (issue #115). It replaces the previous
// "busy → fire → fail → retry next tick" loop with "busy → skip → advance →
// try next slot", so a skip shows up in history as a skip rather than as a
// failed run.
//
// It performs three best-effort side effects, in order:
//
//  1. Updates the workspace_schedules row for sched.ID: last_run_at and
//     updated_at become now(), run_count is incremented, last_status is set
//     to 'skipped', last_error carries the skip reason, and next_run_at is
//     set to the next slot computed from the cron expression and timezone.
//  2. Inserts an activity_logs row of type 'cron_run' with status 'skipped'
//     and a JSON request_body describing the schedule.
//  3. If a broadcaster is configured, emits a CRON_SKIPPED event.
//
// Parameters:
//   - ctx: context used for both database statements and the broadcast.
//   - sched: the schedule whose tick is being skipped.
//   - activeTasks: the active task count observed on the workspace; it is
//     embedded in the recorded reason and metadata.
//
// Nothing is returned. A failure in any step is logged and the remaining
// steps still run, so one failing write never hides the others.
func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, activeTasks int) {
	reason := fmt.Sprintf("skipped: workspace busy (active_tasks=%d)", activeTasks)

	// A nil pointer is sent to the database as NULL, so when the next slot
	// cannot be computed next_run_at is cleared. Log it: otherwise the
	// schedule's missing next_run_at has no visible cause.
	var nextRunPtr *time.Time
	nextRun, nextErr := ComputeNextRun(sched.CronExpr, sched.Timezone, time.Now())
	if nextErr != nil {
		log.Printf("Scheduler: skip of '%s': computing next run failed: %v", sched.Name, nextErr)
	} else {
		nextRunPtr = &nextRun
	}

	// Advance next_run_at + bump run_count so the liveness view reflects
	// that we're still ticking. last_status='skipped', last_error carries
	// the reason for operators debugging via the schedule history API.
	if _, updErr := db.DB.ExecContext(ctx, `
UPDATE workspace_schedules
SET last_run_at = now(),
next_run_at = $2,
run_count = run_count + 1,
last_status = 'skipped',
last_error = $3,
updated_at = now()
WHERE id = $1
`, sched.ID, nextRunPtr, reason); updErr != nil {
		log.Printf("Scheduler: skip of '%s': updating schedule row failed: %v", sched.Name, updErr)
	}

	cronMeta, metaErr := json.Marshal(map[string]interface{}{
		"schedule_id":   sched.ID,
		"schedule_name": sched.Name,
		"cron_expr":     sched.CronExpr,
		"skipped":       true,
		"active_tasks":  activeTasks,
	})
	if metaErr != nil {
		// An empty string is not valid jsonb and would make the INSERT
		// fail; fall back to an empty object so the skip is still logged.
		log.Printf("Scheduler: skip of '%s': encoding activity metadata failed: %v", sched.Name, metaErr)
		cronMeta = []byte("{}")
	}

	if _, insErr := db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at)
VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, 'skipped', $4, now())
`, sched.WorkspaceID, "Cron skipped: "+sched.Name, string(cronMeta), reason); insErr != nil {
		log.Printf("Scheduler: skip of '%s': inserting activity log failed: %v", sched.Name, insErr)
	}

	if s.broadcaster == nil {
		return
	}
	if bcErr := s.broadcaster.RecordAndBroadcast(ctx, "CRON_SKIPPED", sched.WorkspaceID, map[string]interface{}{
		"schedule_id":   sched.ID,
		"schedule_name": sched.Name,
		"reason":        reason,
	}); bcErr != nil {
		log.Printf("Scheduler: skip of '%s': broadcasting CRON_SKIPPED failed: %v", sched.Name, bcErr)
	}
}
func truncate(s string, maxLen int) string {
if len(s) <= maxLen {
return s