diff --git a/platform/internal/scheduler/scheduler.go b/platform/internal/scheduler/scheduler.go index 235eed4d..6be3015a 100644 --- a/platform/internal/scheduler/scheduler.go +++ b/platform/internal/scheduler/scheduler.go @@ -47,26 +47,77 @@ type scheduleRow struct { type Scheduler struct { proxy A2AProxy broadcaster Broadcaster + + // lastTickAt records the wall-clock time of the most recent tick + // (whether it fired schedules or not). Read by Healthy() and the + // /admin/scheduler/health endpoint to detect stuck-tick conditions. + // Atomic-ish via the mutex; tick rate is 30s so contention is trivial. + mu sync.RWMutex + lastTickAt time.Time } func New(proxy A2AProxy, broadcaster Broadcaster) *Scheduler { return &Scheduler{proxy: proxy, broadcaster: broadcaster} } +// LastTickAt returns the wall-clock time of the most recent successful tick. +// Returns the zero Time if Start() has never been called or no tick has +// completed since process start. +func (s *Scheduler) LastTickAt() time.Time { + s.mu.RLock() + defer s.mu.RUnlock() + return s.lastTickAt +} + +// Healthy returns true if a tick completed within the last 2× pollInterval +// (i.e. at most 1 missed tick is tolerated). Use from /health and from +// /admin/scheduler/health to surface scheduler liveness. +func (s *Scheduler) Healthy() bool { + last := s.LastTickAt() + if last.IsZero() { + return false + } + return time.Since(last) < 2*pollInterval +} + // Start runs the scheduler poll loop. Blocks until ctx is cancelled. +// +// Defends against panics inside tick() so a single bad row / bad cron +// expression / DB blip can't permanently kill the scheduler. Without +// this recover the goroutine dies and the only signal to the operator +// is "no crons firing" — which we observed as a 12+ hour silent outage +// on 2026-04-14 (issue #85). func (s *Scheduler) Start(ctx context.Context) { ticker := time.NewTicker(pollInterval) defer ticker.Stop() log.Printf("Scheduler: started (poll interval=%s)", pollInterval) + tickWithRecover := func() { + defer func() { + if r := recover(); r != nil { + log.Printf("Scheduler: PANIC in tick — recovered: %v (next tick in %s)", r, pollInterval) + } + }() + s.tick(ctx) + s.mu.Lock() + s.lastTickAt = time.Now() + s.mu.Unlock() + } + + // Mark a tick immediately on startup so Healthy() returns true before + // the first ticker fires (avoids spurious unhealthy on fresh start). + s.mu.Lock() + s.lastTickAt = time.Now() + s.mu.Unlock() + for { select { case <-ctx.Done(): log.Println("Scheduler: stopped") return case <-ticker.C: - s.tick(ctx) + tickWithRecover() } } } @@ -101,6 +152,12 @@ func (s *Scheduler) tick(ctx context.Context) { go func(s2 scheduleRow) { defer wg.Done() defer func() { <-sem }() + defer func() { + if r := recover(); r != nil { + log.Printf("Scheduler: PANIC firing '%s' on workspace %s — recovered: %v", + s2.Name, s2.WorkspaceID, r) + } + }() s.fireSchedule(ctx, s2) }(sched) }