forked from molecule-ai/molecule-core
fix(scheduler): #115 — skip cron fire when workspace is busy
Closes #115. The Security Auditor hourly cron (and likely others) hit a ~36% miss rate because the platform's A2A proxy rejected fires with "workspace agent busy — retry after a short backoff" while the agent was still executing the prior audit. That error was recorded as a hard failure and polluted last_error. New behaviour: Before fireSchedule calls into the A2A proxy, it reads workspaces.active_tasks for the target. If >0, it: - Advances next_run_at to the next cron slot (cron keeps ticking) - Bumps run_count - Sets last_status='skipped' + last_error=<reason> - Inserts a cron_run activity_logs row with status='skipped' + error_detail - Broadcasts CRON_SKIPPED for canvas + operators Effect: busy-collision ceases to be an error. The history surface now distinguishes "ran and failed" from "skipped because busy". Operators can tell the difference at a glance, and the liveness view doesn't stall waiting for the next ticker cycle. Pairs with #149 (dedicated heartbeat pulse) and #152 problem B (error_detail surfaced in history) for a coherent scheduler story. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
187e8c30ed
commit
fb942fbb0c
@ -222,6 +222,27 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
|
||||
}
|
||||
}()
|
||||
|
||||
// #115 concurrency-aware skip — before firing check if the target
|
||||
// workspace is already executing a task. If so, skip this tick instead
|
||||
// of colliding (which used to surface as "workspace agent busy" errors
|
||||
// and register as a hard fail). advance next_run_at so the next cron
|
||||
// slot gets a fresh chance; log a skipped cron_run row so history shows
|
||||
// the gap instead of a silent miss. COALESCE guards against NULL.
|
||||
var activeTasks int
|
||||
if err := db.DB.QueryRowContext(ctx,
|
||||
`SELECT COALESCE(active_tasks, 0) FROM workspaces WHERE id = $1`,
|
||||
sched.WorkspaceID,
|
||||
).Scan(&activeTasks); err == nil && activeTasks > 0 {
|
||||
wsID := sched.WorkspaceID
|
||||
if len(wsID) > 12 {
|
||||
wsID = wsID[:12]
|
||||
}
|
||||
log.Printf("Scheduler: skipping '%s' on busy workspace %s (active_tasks=%d)",
|
||||
sched.Name, wsID, activeTasks)
|
||||
s.recordSkipped(ctx, sched, activeTasks)
|
||||
return
|
||||
}
|
||||
|
||||
fireCtx, cancel := context.WithTimeout(ctx, fireTimeout)
|
||||
defer cancel()
|
||||
|
||||
@ -308,6 +329,56 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
|
||||
}
|
||||
}
|
||||
|
||||
// recordSkipped advances next_run_at and logs a cron_run activity entry
|
||||
// with status='skipped' when the target workspace was already busy.
|
||||
// Issue #115 — replaces the previous "busy → fire → fail → retry next
|
||||
// tick" loop with "busy → skip → advance → try next slot". Keeps the
|
||||
// history surface honest (a skip is not an error) and stops filling
|
||||
// last_error with noise.
|
||||
func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, activeTasks int) {
|
||||
reason := fmt.Sprintf("skipped: workspace busy (active_tasks=%d)", activeTasks)
|
||||
|
||||
nextRun, nextErr := ComputeNextRun(sched.CronExpr, sched.Timezone, time.Now())
|
||||
var nextRunPtr *time.Time
|
||||
if nextErr == nil {
|
||||
nextRunPtr = &nextRun
|
||||
}
|
||||
|
||||
// Advance next_run_at + bump run_count so the liveness view reflects
|
||||
// that we're still ticking. last_status='skipped', last_error carries
|
||||
// the reason for operators debugging via the schedule history API.
|
||||
_, _ = db.DB.ExecContext(ctx, `
|
||||
UPDATE workspace_schedules
|
||||
SET last_run_at = now(),
|
||||
next_run_at = $2,
|
||||
run_count = run_count + 1,
|
||||
last_status = 'skipped',
|
||||
last_error = $3,
|
||||
updated_at = now()
|
||||
WHERE id = $1
|
||||
`, sched.ID, nextRunPtr, reason)
|
||||
|
||||
cronMeta, _ := json.Marshal(map[string]interface{}{
|
||||
"schedule_id": sched.ID,
|
||||
"schedule_name": sched.Name,
|
||||
"cron_expr": sched.CronExpr,
|
||||
"skipped": true,
|
||||
"active_tasks": activeTasks,
|
||||
})
|
||||
_, _ = db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at)
|
||||
VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, 'skipped', $4, now())
|
||||
`, sched.WorkspaceID, "Cron skipped: "+sched.Name, string(cronMeta), reason)
|
||||
|
||||
if s.broadcaster != nil {
|
||||
_ = s.broadcaster.RecordAndBroadcast(ctx, "CRON_SKIPPED", sched.WorkspaceID, map[string]interface{}{
|
||||
"schedule_id": sched.ID,
|
||||
"schedule_name": sched.Name,
|
||||
"reason": reason,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func truncate(s string, maxLen int) string {
|
||||
if len(s) <= maxLen {
|
||||
return s
|
||||
|
||||
Loading…
Reference in New Issue
Block a user