forked from molecule-ai/molecule-core
fix(scheduler): heartbeat at tick start + per-fire so liveness reflects work-in-progress
The first scheduler heartbeat (#95) only fired AFTER each tick completed. A tick that runs fireSchedule for 110+ seconds (long agent prompts) would make /admin/liveness report scheduler as stale even though it was actively working. Observed today: scheduler firing UIUX audit, last_tick_at lagged by 95s+ and incrementing. Three places now call Heartbeat: 1. Top of tick() — proves we're past the ticker.C wait 2. Inside each fire goroutine, before fireSchedule — ANY active fire keeps the heartbeat fresh 3. Inside each fire goroutine, after fireSchedule — captures the moment the per-fire work completes (The post-tick Heartbeat in Start() is still there as the "all idle" case.) Net result: /admin/liveness reports stale only if the scheduler genuinely isn't doing anything for >2× pollInterval, which is the actual signal we want.
This commit is contained in:
parent
76a36e8062
commit
80a6fa6db5
@ -80,7 +80,16 @@ func (s *Scheduler) Start(ctx context.Context) {
|
||||
// tick queries all due schedules and fires each in a goroutine.
|
||||
// Waits for all goroutines to finish before returning so the next tick
|
||||
// doesn't re-fire schedules whose next_run_at hasn't been updated yet.
|
||||
//
|
||||
// Heartbeat is called at three points to keep /admin/liveness fresh during
|
||||
// long-running fires (some prompts take minutes — without these heartbeats
|
||||
// the scheduler looks "stale" the whole time it's working):
|
||||
// - immediately on entering tick (proves we're past the ticker.C wait)
|
||||
// - inside each per-fire goroutine (every fire bumps the heartbeat)
|
||||
// - implicitly via the post-tick heartbeat in Start()
|
||||
func (s *Scheduler) tick(ctx context.Context) {
|
||||
supervised.Heartbeat("scheduler")
|
||||
|
||||
rows, err := db.DB.QueryContext(ctx, `
|
||||
SELECT id, workspace_id, name, cron_expr, timezone, prompt
|
||||
FROM workspace_schedules
|
||||
@ -107,7 +116,15 @@ func (s *Scheduler) tick(ctx context.Context) {
|
||||
go func(s2 scheduleRow) {
|
||||
defer wg.Done()
|
||||
defer func() { <-sem }()
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("Scheduler: PANIC firing '%s' on workspace %s — recovered: %v",
|
||||
s2.Name, s2.WorkspaceID, r)
|
||||
}
|
||||
}()
|
||||
supervised.Heartbeat("scheduler")
|
||||
s.fireSchedule(ctx, s2)
|
||||
supervised.Heartbeat("scheduler")
|
||||
}(sched)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user