Merge pull request #1032 from Molecule-AI/fix/scheduler-advance-next-run-1029

fix(scheduler): advance next_run_at on panic to prevent stuck schedules (#1029)
This commit is contained in:
molecule-ai[bot] 2026-04-21 00:59:32 +00:00 committed by GitHub
commit a5a495c804
3 changed files with 65 additions and 33 deletions

View File

@ -807,9 +807,11 @@ func (h *WorkspaceHandler) Delete(c *gin.Context) {
pq.Array(allIDs)); err != nil {
log.Printf("Delete token revocation error for %s: %v", id, err)
}
// Disable schedules for removed workspaces (#1027)
// #1027: cascade-disable all schedules for the deleted workspaces so
// the scheduler never fires a cron into a removed container.
if _, err := db.DB.ExecContext(ctx,
`UPDATE workspace_schedules SET enabled = false WHERE workspace_id = ANY($1::uuid[])`,
`UPDATE workspace_schedules SET enabled = false, updated_at = now()
WHERE workspace_id = ANY($1::uuid[]) AND enabled = true`,
pq.Array(allIDs)); err != nil {
log.Printf("Delete schedule disable error for %s: %v", id, err)
}

View File

@ -242,7 +242,7 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
defer func() {
if r := recover(); r != nil {
log.Printf("Scheduler: panic recovered in fireSchedule for '%s' (%s): %v",
sched.Name, sched.ID, r)
sched.Name, sched.ID, r)
// Always advance next_run_at even on panic so the schedule doesn't get
// stuck re-firing the same panicking schedule indefinitely (#1029).
if nextTime, err := ComputeNextRun(sched.CronExpr, sched.Timezone, time.Now()); err == nil {

View File

@ -276,6 +276,12 @@ func TestFireSchedule_ComputeNextRunError(t *testing.T) {
mock.ExpectQuery(`SELECT COALESCE`).
WillReturnRows(sqlmock.NewRows([]string{"coalesce"}).AddRow(0))
// #795 consecutive_empty_runs reset — successProxy returns {"ok":true}
// which is non-empty, so the counter is reset to 0.
mock.ExpectExec(`UPDATE workspace_schedules`).
WithArgs(sched.ID).
WillReturnResult(sqlmock.NewResult(0, 1))
// UPDATE must fire — COALESCE($2, next_run_at) keeps existing value when $2 is nil.
// AnyArg for $2 because it will be nil (ComputeNextRun failed).
mock.ExpectExec(`UPDATE workspace_schedules`).
@ -453,14 +459,19 @@ func TestRecordSkipped_shortWorkspaceIDNoPanic(t *testing.T) {
// ── Panic-recovery next_run_at advancement (#1029) ──────────────────────────
//
// Issue #1029: when fireSchedule panics, the deferred recover must advance
// next_run_at to the next cron window. Without this fix the schedule's
// next_run_at stays in the past and fires on every 30-second tick.
// Issue #1029: when fireSchedule panics (e.g. a nil-pointer in the A2A proxy
// or a bad JSON marshal), the deferred recover must advance next_run_at to the
// next cron window. Without this fix the schedule's next_run_at stays in the
// past and fires on every 30-second tick — a tight retry loop that amplifies
// the original failure.
const testCronPanic = "0 * * * *"
// TestPanicRecovery_AdvancesNextRunAt verifies that recover issues an UPDATE
// to advance next_run_at when the proxy panics. panic -> recover -> advance.
// TestPanicRecovery_AdvancesNextRunAt verifies that the recover block in
// fireSchedule issues an UPDATE to advance next_run_at when the proxy panics.
//
// This is the core invariant of the #1029 fix: panic → recover → advance.
// The test calls fireSchedule directly (not via tick) so the sqlmock
// expectations are precise — we know exactly which queries fire and in what
// order.
func TestPanicRecovery_AdvancesNextRunAt(t *testing.T) {
mock := setupTestDB(t)
@ -468,33 +479,42 @@ func TestPanicRecovery_AdvancesNextRunAt(t *testing.T) {
ID: "aaa11111-1111-1111-1111-111111111111",
WorkspaceID: "bbb22222-2222-2222-2222-222222222222",
Name: "panic-advance-test",
CronExpr: testCronPanic,
CronExpr: "0 * * * *", // every hour — valid expr so ComputeNextRun succeeds
Timezone: "UTC",
Prompt: "trigger panic",
}
// fireSchedule checks active_tasks first.
// 1. fireSchedule first checks active_tasks on the workspace.
// Return 0 so the fire proceeds (not skipped).
mock.ExpectQuery(`SELECT COALESCE`).
WillReturnRows(sqlmock.NewRows([]string{"coalesce"}).AddRow(0))
// panicProxy causes ProxyA2ARequest to panic.
// Deferred recover calls ComputeNextRun, then ExecContext for next_run_at.
// 2. ProxyA2ARequest panics (panicProxy).
// The deferred recover catches it and calls:
// ComputeNextRun(cronExpr, tz, time.Now())
// db.DB.ExecContext(ctx, `UPDATE workspace_schedules SET next_run_at = $1 ... WHERE id = $2`, nextTime, sched.ID)
//
// We expect this UPDATE with the schedule ID as arg 2.
mock.ExpectExec(`UPDATE workspace_schedules SET next_run_at`).
WithArgs(sqlmock.AnyArg(), sched.ID).
WillReturnResult(sqlmock.NewResult(0, 1))
s := New(&panicProxy{}, nil)
// fireSchedule must not propagate the panic — the recover catches it.
s.fireSchedule(context.Background(), sched)
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet DB expectations: %v
"+
"Panic-recovery defer must advance next_run_at via UPDATE (#1029)", err)
t.Errorf("unmet DB expectations: %v\n"+
"The panic-recovery defer must advance next_run_at via UPDATE (#1029)", err)
}
}
// TestFireSchedule_NormalSuccess_AdvancesNextRunAt regression guard: normal
// fireSchedule completion must still advance next_run_at via the post-fire UPDATE.
// TestFireSchedule_NormalSuccess_AdvancesNextRunAt is a regression guard for
// the happy path: when fireSchedule completes without error, next_run_at must
// be advanced as part of the normal UPDATE (not via the panic path).
//
// This ensures the #1029 panic-recovery change didn't accidentally break the
// normal flow where both the proxy call and the post-fire UPDATE succeed.
func TestFireSchedule_NormalSuccess_AdvancesNextRunAt(t *testing.T) {
mock := setupTestDB(t)
@ -502,26 +522,28 @@ func TestFireSchedule_NormalSuccess_AdvancesNextRunAt(t *testing.T) {
ID: "ccc33333-3333-3333-3333-333333333333",
WorkspaceID: "ddd44444-4444-4444-4444-444444444444",
Name: "normal-advance-test",
CronExpr: "30 * * * *",
CronExpr: "30 * * * *", // every hour at :30
Timezone: "UTC",
Prompt: "do work",
}
// active_tasks check -> workspace idle
// 1. active_tasks check → workspace idle
mock.ExpectQuery(`SELECT COALESCE`).
WillReturnRows(sqlmock.NewRows([]string{"coalesce"}).AddRow(0))
// #795 consecutive_empty_runs reset
// 2. #795 consecutive_empty_runs reset — successProxy returns {"ok":true}
// which is non-empty, so the counter is reset to 0.
mock.ExpectExec(`UPDATE workspace_schedules`).
WithArgs(sched.ID).
WillReturnResult(sqlmock.NewResult(0, 1))
// Normal post-fire UPDATE
// 3. Normal UPDATE after successful proxy call.
// Args: $1=sched.ID, $2=nextRunPtr (computed time), $3=lastStatus, $4=lastError
mock.ExpectExec(`UPDATE workspace_schedules`).
WithArgs(sched.ID, sqlmock.AnyArg(), "ok", "").
WillReturnResult(sqlmock.NewResult(0, 1))
// activity_logs INSERT
// 4. activity_logs INSERT
mock.ExpectExec(`INSERT INTO activity_logs`).
WithArgs(sched.WorkspaceID, sqlmock.AnyArg(), sqlmock.AnyArg(), "ok", "").
WillReturnResult(sqlmock.NewResult(0, 1))
@ -530,14 +552,21 @@ func TestFireSchedule_NormalSuccess_AdvancesNextRunAt(t *testing.T) {
s.fireSchedule(context.Background(), sched)
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet DB expectations: %v
"+
t.Errorf("unmet DB expectations: %v\n"+
"Normal fire must still advance next_run_at via the post-fire UPDATE", err)
}
}
// TestRecordSkipped_AdvancesNextRunAt verifies recordSkipped advances
// next_run_at so the schedule doesn't re-fire on the very next tick.
// TestRecordSkipped_AdvancesNextRunAt verifies that when a workspace is busy
// and the cron fire is skipped, recordSkipped advances next_run_at so the
// schedule doesn't re-fire on the very next tick.
//
// This is the third leg of the #1029 invariant: fire, panic, AND skip must
// all advance next_run_at.
//
// We call recordSkipped directly rather than going through fireSchedule
// because #969 added a deferral loop (poll every 10s for up to 2 min) that
// makes end-to-end testing via fireSchedule impractical with sqlmock.
func TestRecordSkipped_AdvancesNextRunAt(t *testing.T) {
mock := setupTestDB(t)
@ -545,27 +574,28 @@ func TestRecordSkipped_AdvancesNextRunAt(t *testing.T) {
ID: "eee55555-5555-5555-5555-555555555555",
WorkspaceID: "fff66666-6666-6666-6666-666666666666",
Name: "skipped-advance-test",
CronExpr: "15 * * * *",
CronExpr: "15 * * * *", // every hour at :15
Timezone: "UTC",
Prompt: "skipped work",
}
// recordSkipped UPDATE
// 1. recordSkipped UPDATE — must include next_run_at ($2) and reason ($3).
mock.ExpectExec(`UPDATE workspace_schedules`).
WithArgs(sched.ID, sqlmock.AnyArg(), sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
// activity_logs INSERT
// 2. activity_logs INSERT for the skip event
mock.ExpectExec(`INSERT INTO activity_logs`).
WithArgs(sched.WorkspaceID, sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
s := New(&successProxy{}, nil)
// Call recordSkipped directly — simulates the skip path when workspace is busy.
s.recordSkipped(context.Background(), sched, 2)
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet DB expectations: %v
"+
t.Errorf("unmet DB expectations: %v\n"+
"recordSkipped must advance next_run_at when workspace is busy (#1029)", err)
}
}
// trigger CI