From 7dc9d837921c30191ead294a488cbc0a75b58ea9 Mon Sep 17 00:00:00 2001
From: rabbitblood
Date: Tue, 14 Apr 2026 19:32:01 -0700
Subject: [PATCH] fix(scheduler): recover from panics + add liveness watchdog
 (#85)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The scheduler died silently on 2026-04-14 14:21 UTC and stayed dead for
12+ hours. A platform restart didn't recover it.

Root cause: the tick() and fireSchedule() goroutines have no panic
recovery. A single bad row, bad cron expression, DB blip, or transient
panic anywhere in the chain permanently kills the scheduler goroutine —
and the only signal to an operator is "no crons firing", which is
invisible if you're not watching.

Specifically:

    func (s *Scheduler) Start(ctx context.Context) {
        for {
            select {
            case <-ticker.C:
                s.tick(ctx) // <- if this panics, the for-loop exits forever
            }
        }
    }

And inside tick:

    go func(s2 scheduleRow) {
        defer wg.Done()
        defer func() { <-sem }()
        s.fireSchedule(ctx, s2) // <- panic here propagates up
    }(sched)
    // ... after the launch loop:
    wg.Wait()

This patch adds two recover() guards, each in a deferred closure:

1. In Start's tick wrapper — a panic in tick() (DB scan, cron parse, row
   processing) is logged and the next tick fires normally.
2. In each fireSchedule goroutine — a single bad workspace can't take
   the rest of the batch down.

Plus a liveness watchdog:

- Scheduler now records `lastTickAt` after each successful tick.
- New methods `LastTickAt()` and `Healthy()` (true if the last tick was
  within 2× pollInterval = 60s).
- Initialised at Start so Healthy() returns true on a fresh process.

A test sketch pinning the Healthy() semantics follows the diff below.

Endpoint plumbing for /admin/scheduler/health is a follow-up — it needs
the scheduler instance threaded through router.Setup(). Documented on
#85; a rough handler sketch also follows the diff below.

Closes the silent-outage failure mode of #85. The other proposed fixes
(force-kill on /restart hang, active_tasks watchdog) are separate
concerns tracked in #85's comments.
---
 platform/internal/scheduler/scheduler.go | 59 +++++++++++++++++++++++-
 1 file changed, 58 insertions(+), 1 deletion(-)

diff --git a/platform/internal/scheduler/scheduler.go b/platform/internal/scheduler/scheduler.go
index 235eed4d..6be3015a 100644
--- a/platform/internal/scheduler/scheduler.go
+++ b/platform/internal/scheduler/scheduler.go
@@ -47,26 +47,77 @@ type scheduleRow struct {
 type Scheduler struct {
     proxy       A2AProxy
     broadcaster Broadcaster
+
+    // lastTickAt records the wall-clock time of the most recent tick
+    // (whether it fired schedules or not). Read by Healthy() and the
+    // /admin/scheduler/health endpoint to detect stuck-tick conditions.
+    // Guarded by mu; the tick rate is 30s, so contention is negligible.
+    mu         sync.RWMutex
+    lastTickAt time.Time
 }
 
 func New(proxy A2AProxy, broadcaster Broadcaster) *Scheduler {
     return &Scheduler{proxy: proxy, broadcaster: broadcaster}
 }
 
+// LastTickAt returns the wall-clock time of the most recent successful tick
+// (or of Start() itself, before the first tick completes). Returns the zero
+// Time if Start() has never been called.
+func (s *Scheduler) LastTickAt() time.Time {
+    s.mu.RLock()
+    defer s.mu.RUnlock()
+    return s.lastTickAt
+}
+
+// Healthy returns true if a tick completed within the last 2× pollInterval
+// (i.e. at most 1 missed tick is tolerated). Use from /health and from
+// /admin/scheduler/health to surface scheduler liveness.
+func (s *Scheduler) Healthy() bool {
+    last := s.LastTickAt()
+    if last.IsZero() {
+        return false
+    }
+    return time.Since(last) < 2*pollInterval
+}
+
 // Start runs the scheduler poll loop. Blocks until ctx is cancelled.
+//
+// Defends against panics inside tick() so a single bad row / bad cron
+// expression / DB blip can't permanently kill the scheduler. Without
+// this recover the goroutine dies and the only signal to the operator
+// is "no crons firing" — which we observed as a 12+ hour silent outage
+// on 2026-04-14 (issue #85).
 func (s *Scheduler) Start(ctx context.Context) {
     ticker := time.NewTicker(pollInterval)
     defer ticker.Stop()
     log.Printf("Scheduler: started (poll interval=%s)", pollInterval)
 
+    tickWithRecover := func() {
+        defer func() {
+            if r := recover(); r != nil {
+                log.Printf("Scheduler: PANIC in tick — recovered: %v (next tick in %s)", r, pollInterval)
+            }
+        }()
+        s.tick(ctx)
+        s.mu.Lock()
+        s.lastTickAt = time.Now()
+        s.mu.Unlock()
+    }
+
+    // Mark a tick immediately on startup so Healthy() returns true before
+    // the first ticker fires (avoids spurious unhealthy on fresh start).
+    s.mu.Lock()
+    s.lastTickAt = time.Now()
+    s.mu.Unlock()
+
     for {
         select {
         case <-ctx.Done():
             log.Println("Scheduler: stopped")
             return
         case <-ticker.C:
-            s.tick(ctx)
+            tickWithRecover()
         }
     }
 }
 
@@ -101,6 +152,12 @@ func (s *Scheduler) tick(ctx context.Context) {
         go func(s2 scheduleRow) {
             defer wg.Done()
             defer func() { <-sem }()
+            defer func() {
+                if r := recover(); r != nil {
+                    log.Printf("Scheduler: PANIC firing '%s' on workspace %s — recovered: %v",
+                        s2.Name, s2.WorkspaceID, r)
+                }
+            }()
             s.fireSchedule(ctx, s2)
         }(sched)
     }
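
Reviewer notes (sketches, not part of the patch):

Once the scheduler instance is threaded through router.Setup(), the
/admin/scheduler/health handler could look roughly like the sketch
below. This is a minimal sketch only: the handler name, the JSON field
names, and the `admin` package home are assumptions, not existing API.
The import path is inferred from the file path in the diff.

    package admin // hypothetical home; wherever router.Setup() lives

    import (
        "encoding/json"
        "net/http"
        "time"

        "platform/internal/scheduler" // path assumed from the diff header
    )

    // schedulerHealthHandler reports scheduler liveness via the new
    // Healthy()/LastTickAt() methods. It returns 503 when unhealthy so
    // load balancers and probes can alert on a silently dead scheduler.
    func schedulerHealthHandler(s *scheduler.Scheduler) http.HandlerFunc {
        return func(w http.ResponseWriter, r *http.Request) {
            healthy := s.Healthy()
            w.Header().Set("Content-Type", "application/json")
            if !healthy {
                w.WriteHeader(http.StatusServiceUnavailable)
            }
            _ = json.NewEncoder(w).Encode(map[string]any{
                "healthy":      healthy,
                "last_tick_at": s.LastTickAt().UTC().Format(time.RFC3339Nano),
            })
        }
    }

Wiring it up would then be a one-liner wherever routes are registered,
e.g. mux.Handle("/admin/scheduler/health", schedulerHealthHandler(sched)).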
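
The watchdog semantics are easy to pin down with a package-local test.
A minimal sketch, assuming A2AProxy and Broadcaster are interfaces (so
nil is an acceptable stand-in for a unit test) and that pollInterval
remains a package constant:

    package scheduler

    import (
        "testing"
        "time"
    )

    func TestHealthyReflectsTickRecency(t *testing.T) {
        s := New(nil, nil)

        // No Start(), no tick: lastTickAt is the zero Time.
        if s.Healthy() {
            t.Fatal("expected unhealthy before any tick")
        }

        // A fresh tick is within 2× pollInterval.
        s.mu.Lock()
        s.lastTickAt = time.Now()
        s.mu.Unlock()
        if !s.Healthy() {
            t.Fatal("expected healthy right after a tick")
        }

        // Two or more missed ticks flip it back to unhealthy.
        s.mu.Lock()
        s.lastTickAt = time.Now().Add(-3 * pollInterval)
        s.mu.Unlock()
        if s.Healthy() {
            t.Fatal("expected unhealthy after missed ticks")
        }
    }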