Merge pull request #90 from Molecule-AI/fix/scheduler-watchdog-recover

fix(scheduler): recover from panics + add liveness watchdog (#85)
This commit is contained in:
Hongming Wang 2026-04-14 20:30:31 -07:00 committed by GitHub
commit c02bfb4257
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -47,26 +47,77 @@ type scheduleRow struct {
// Scheduler polls for due schedules and fires them via the A2A proxy,
// broadcasting results through the Broadcaster.
type Scheduler struct {
	proxy       A2AProxy
	broadcaster Broadcaster
	// lastTickAt records the wall-clock time of the most recent tick that
	// returned without panicking (whether it fired schedules or not), and
	// is also set once when Start() begins, before the first ticker fire.
	// Read by Healthy() and the /admin/scheduler/health endpoint to detect
	// stuck-tick conditions. Guarded by mu rather than an atomic; the tick
	// rate is 30s so contention is trivial.
	mu         sync.RWMutex
	lastTickAt time.Time
}
// New constructs a Scheduler wired to the given proxy and broadcaster.
// The zero lastTickAt means Start has not yet been called.
func New(proxy A2AProxy, broadcaster Broadcaster) *Scheduler {
	s := &Scheduler{
		proxy:       proxy,
		broadcaster: broadcaster,
	}
	return s
}
// LastTickAt reports when the scheduler last recorded liveness: either a
// tick that completed without panicking, or the initial mark Start() makes
// on entry before the first ticker fire. The zero Time means Start() has
// never been called.
func (s *Scheduler) LastTickAt() time.Time {
	s.mu.RLock()
	last := s.lastTickAt
	s.mu.RUnlock()
	return last
}
// Healthy reports whether the scheduler has shown liveness within the last
// 2×pollInterval — i.e. at most one missed tick is tolerated. Intended for
// /health and /admin/scheduler/health to surface scheduler liveness.
func (s *Scheduler) Healthy() bool {
	if last := s.LastTickAt(); !last.IsZero() {
		return time.Since(last) < 2*pollInterval
	}
	return false
}
// Start runs the scheduler poll loop and blocks until ctx is cancelled.
//
// Each tick is wrapped in a recover so that a panic from a single bad
// row / bad cron expression / DB blip cannot permanently kill the
// scheduler goroutine. Without this, the goroutine dies and the only
// operator-visible signal is "no crons firing" — observed as a 12+ hour
// silent outage on 2026-04-14 (issue #85).
//
// Note lastTickAt only advances when tick returns normally, so a tick
// that panics on every attempt will eventually read as unhealthy.
func (s *Scheduler) Start(ctx context.Context) {
	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()

	log.Printf("Scheduler: started (poll interval=%s)", pollInterval)

	// markTick stamps lastTickAt with the current time under the lock.
	markTick := func() {
		s.mu.Lock()
		s.lastTickAt = time.Now()
		s.mu.Unlock()
	}

	// safeTick runs one tick, converting any panic into a log line so the
	// poll loop survives to the next ticker fire.
	safeTick := func() {
		defer func() {
			if r := recover(); r != nil {
				log.Printf("Scheduler: PANIC in tick — recovered: %v (next tick in %s)", r, pollInterval)
			}
		}()
		s.tick(ctx)
		markTick() // reached only when tick returns without panicking
	}

	// Record liveness immediately so Healthy() does not report a spurious
	// unhealthy state between process start and the first ticker fire.
	markTick()

	for {
		select {
		case <-ctx.Done():
			log.Println("Scheduler: stopped")
			return
		case <-ticker.C:
			safeTick()
		}
	}
}
@ -101,6 +152,12 @@ func (s *Scheduler) tick(ctx context.Context) {
// Fire each due schedule on its own goroutine, bounded by sem.
go func(s2 scheduleRow) {
	defer wg.Done()
	// Free a slot in sem; presumably the send that acquired it happens
	// before this goroutine is launched — acquire side is outside this hunk.
	defer func() { <-sem }()
	// Recover inside the goroutine itself: a panic while firing one
	// schedule must not crash the process. (recover only works in a
	// deferred func on the same goroutine, so the recover in Start's
	// loop cannot catch panics raised here.)
	defer func() {
		if r := recover(); r != nil {
			log.Printf("Scheduler: PANIC firing '%s' on workspace %s — recovered: %v",
				s2.Name, s2.WorkspaceID, r)
		}
	}()
	s.fireSchedule(ctx, s2)
}(sched)
}