From fb942fbb0c62c49d270f9555cd2569c097bdedfb Mon Sep 17 00:00:00 2001
From: Hongming Wang
Date: Wed, 15 Apr 2026 11:13:15 -0700
Subject: [PATCH] =?UTF-8?q?fix(scheduler):=20#115=20=E2=80=94=20skip=20cro?=
 =?UTF-8?q?n=20fire=20when=20workspace=20is=20busy?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Closes #115.

The Security Auditor hourly cron (and likely others) hit a ~36% miss
rate because the platform's A2A proxy rejected fires with "workspace
agent busy — retry after a short backoff" while the agent was still
executing the prior audit. That error was recorded as a hard failure
and polluted last_error.

New behaviour: Before fireSchedule calls into the A2A proxy, it reads
workspaces.active_tasks for the target. If >0, it:
- Advances next_run_at to the next cron slot (cron keeps ticking)
- Bumps run_count
- Sets last_status='skipped' + last_error=<reason>
- Inserts a cron_run activity_logs row with status='skipped' + error_detail
- Broadcasts CRON_SKIPPED for canvas + operators

Effect: busy-collision ceases to be an error. The history surface now
distinguishes "ran and failed" from "skipped because busy". Operators
can tell the difference at a glance, and the liveness view doesn't
stall waiting for the next ticker cycle.

Pairs with #149 (dedicated heartbeat pulse) and #152 problem B
(error_detail surfaced in history) for a coherent scheduler story.
Co-Authored-By: Claude Opus 4.6 (1M context) --- platform/internal/scheduler/scheduler.go | 71 ++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/platform/internal/scheduler/scheduler.go b/platform/internal/scheduler/scheduler.go index cfb596eb..43285f47 100644 --- a/platform/internal/scheduler/scheduler.go +++ b/platform/internal/scheduler/scheduler.go @@ -222,6 +222,27 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { } }() + // #115 concurrency-aware skip — before firing check if the target + // workspace is already executing a task. If so, skip this tick instead + // of colliding (which used to surface as "workspace agent busy" errors + // and register as a hard fail). advance next_run_at so the next cron + // slot gets a fresh chance; log a skipped cron_run row so history shows + // the gap instead of a silent miss. COALESCE guards against NULL. + var activeTasks int + if err := db.DB.QueryRowContext(ctx, + `SELECT COALESCE(active_tasks, 0) FROM workspaces WHERE id = $1`, + sched.WorkspaceID, + ).Scan(&activeTasks); err == nil && activeTasks > 0 { + wsID := sched.WorkspaceID + if len(wsID) > 12 { + wsID = wsID[:12] + } + log.Printf("Scheduler: skipping '%s' on busy workspace %s (active_tasks=%d)", + sched.Name, wsID, activeTasks) + s.recordSkipped(ctx, sched, activeTasks) + return + } + fireCtx, cancel := context.WithTimeout(ctx, fireTimeout) defer cancel() @@ -308,6 +329,56 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { } } +// recordSkipped advances next_run_at and logs a cron_run activity entry +// with status='skipped' when the target workspace was already busy. +// Issue #115 — replaces the previous "busy → fire → fail → retry next +// tick" loop with "busy → skip → advance → try next slot". Keeps the +// history surface honest (a skip is not an error) and stops filling +// last_error with noise. 
+// Every write below is best-effort: a failure is logged and the remaining
+// steps still run; nothing is reported back to the caller.
+func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, activeTasks int) {
+	reason := fmt.Sprintf("skipped: workspace busy (active_tasks=%d)", activeTasks)
+
+	// When the next slot cannot be computed the pointer stays nil and is
+	// passed to the UPDATE as-is (presumably stored as NULL — confirm against
+	// the driver). Log it so the cleared next_run_at has a traceable cause.
+	var nextRunPtr *time.Time
+	nextRun, nextErr := ComputeNextRun(sched.CronExpr, sched.Timezone, time.Now())
+	if nextErr == nil {
+		nextRunPtr = &nextRun
+	} else {
+		log.Printf("Scheduler: '%s' skipped but next run could not be computed: %v",
+			sched.Name, nextErr)
+	}
+
+	// Advance next_run_at + bump run_count so the liveness view reflects
+	// that we're still ticking. last_status='skipped', last_error carries
+	// the reason for operators debugging via the schedule history API.
+	if _, err := db.DB.ExecContext(ctx, `
+		UPDATE workspace_schedules
+		SET last_run_at = now(),
+		    next_run_at = $2,
+		    run_count = run_count + 1,
+		    last_status = 'skipped',
+		    last_error = $3,
+		    updated_at = now()
+		WHERE id = $1
+	`, sched.ID, nextRunPtr, reason); err != nil {
+		// Without this log a failed UPDATE leaves the row un-advanced and
+		// nobody can tell why.
+		log.Printf("Scheduler: '%s' skip not recorded on schedule row: %v", sched.Name, err)
+	}
+
+	cronMeta, metaErr := json.Marshal(map[string]interface{}{
+		"schedule_id":   sched.ID,
+		"schedule_name": sched.Name,
+		"cron_expr":     sched.CronExpr,
+		"skipped":       true,
+		"active_tasks":  activeTasks,
+	})
+	if metaErr != nil {
+		// An empty string is not valid input for the $3::jsonb cast below;
+		// fall back to an empty object so the history row is still written.
+		log.Printf("Scheduler: '%s' skip metadata could not be encoded: %v", sched.Name, metaErr)
+		cronMeta = []byte("{}")
+	}
+	if _, err := db.DB.ExecContext(ctx, `
+		INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at)
+		VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, 'skipped', $4, now())
+	`, sched.WorkspaceID, "Cron skipped: "+sched.Name, string(cronMeta), reason); err != nil {
+		log.Printf("Scheduler: '%s' skip not written to activity_logs: %v", sched.Name, err)
+	}
+
+	if s.broadcaster != nil {
+		// Result deliberately discarded, as before.
+		// NOTE(review): log it too if RecordAndBroadcast returns an error.
+		_ = s.broadcaster.RecordAndBroadcast(ctx, "CRON_SKIPPED", sched.WorkspaceID, map[string]interface{}{
+			"schedule_id":   sched.ID,
+			"schedule_name": sched.Name,
+			"reason":        reason,
+		})
+	}
+}
+
 func truncate(s string, maxLen int) string {
 	if len(s) <= maxLen {
 		return s