forked from molecule-ai/molecule-core
fix(scheduler): prevent NULL next_run_at from permanently dropping schedules (#722)
Three bugs caused enabled schedules to silently disappear from the fire query
(which requires next_run_at IS NOT NULL AND next_run_at <= now()):
Bug 1 - fireSchedule() and recordSkipped(): when ComputeNextRun returned an
error, nextRunPtr stayed nil and UPDATE SET next_run_at = $2 wrote NULL.
Fix: change to COALESCE($2, next_run_at) so the existing DB value is preserved
when $2 is NULL, and log the error explicitly.
Bug 2 - org importer (handlers/org.go): nextRun, _ := ComputeNextRun(...)
silently discarded the error. A bad cron expression would pass time.Time{}
(zero value) to the INSERT. Fix: surface the error, log it, and skip the
schedule INSERT via continue.
Bug 3 - no startup repair: schedules already NULL'd by the pre-fix binary
would never recover. Fix: Start() now calls repairNullNextRunAt() once on
boot, recomputing next_run_at for every enabled schedule with a NULL value.
Tests: TestFireSchedule_ComputeNextRunError, TestRecordSkipped_ComputeNextRunError,
TestRepairNullNextRunAt_RepairsRows, TestRepairNullNextRunAt_DBError_NoPanic,
TestOrgImport_ScheduleComputeError (all pass).
Fixes #722
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
c53bf6eebd
commit
b83ddc7dff
@ -643,7 +643,14 @@ func (h *OrgHandler) createWorkspaceTree(ws OrgWorkspace, parentID *string, defa
|
||||
log.Printf("Org import: schedule '%s' on %s has empty prompt (neither prompt nor prompt_file set) — skipping insert", sched.Name, ws.Name)
|
||||
continue
|
||||
}
|
||||
nextRun, _ := scheduler.ComputeNextRun(sched.CronExpr, tz, time.Now())
|
||||
// #722: surface the error rather than silently using time.Time{} (zero)
|
||||
// which lib/pq stores as 0001-01-01 and may confuse the fire query.
|
||||
nextRun, nextRunErr := scheduler.ComputeNextRun(sched.CronExpr, tz, time.Now())
|
||||
if nextRunErr != nil {
|
||||
log.Printf("Org import: invalid cron expression for schedule '%s' on %s: %v — skipping insert",
|
||||
sched.Name, ws.Name, nextRunErr)
|
||||
continue
|
||||
}
|
||||
if _, err := db.DB.ExecContext(context.Background(), orgImportScheduleSQL,
|
||||
id, sched.Name, sched.CronExpr, tz, prompt, enabled, nextRun); err != nil {
|
||||
log.Printf("Org import: failed to upsert schedule '%s' for %s: %v", sched.Name, ws.Name, err)
|
||||
|
||||
@ -3,8 +3,11 @@ package handlers
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/scheduler"
|
||||
)
|
||||
|
||||
func TestOrgDefaults_InitialPrompt_YAMLParsing(t *testing.T) {
|
||||
@ -602,3 +605,48 @@ func TestPlugins_BackwardCompat(t *testing.T) {
|
||||
t.Fatalf("got %v, want %v", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
// ── TestOrgImport_ScheduleComputeError (#722 Bug 2) ───────────────────────────
|
||||
//
|
||||
// The org importer previously used `nextRun, _ := scheduler.ComputeNextRun(...)`,
|
||||
// discarding the error and passing time.Time{} (zero value) to the INSERT.
|
||||
// After fix #722 it surfaces the error and skips the INSERT via `continue`.
|
||||
//
|
||||
// This test verifies that the inputs an org.yaml schedule can supply (bad cron
|
||||
// expression, invalid timezone) DO cause ComputeNextRun to return a non-nil
|
||||
// error — confirming that the fix is meaningful and the skip path is reachable.
|
||||
|
||||
func TestOrgImport_ScheduleComputeError(t *testing.T) {
|
||||
now := time.Now()
|
||||
cases := []struct {
|
||||
name string
|
||||
cronExpr string
|
||||
tz string
|
||||
}{
|
||||
{
|
||||
name: "invalid cron expression",
|
||||
cronExpr: "not-a-cron-expr",
|
||||
tz: "UTC",
|
||||
},
|
||||
{
|
||||
name: "invalid timezone",
|
||||
cronExpr: "0 9 * * 1",
|
||||
tz: "Not/A/Valid/Timezone",
|
||||
},
|
||||
{
|
||||
name: "both invalid",
|
||||
cronExpr: "every monday",
|
||||
tz: "Moon/Far_Side",
|
||||
},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
_, err := scheduler.ComputeNextRun(tc.cronExpr, tc.tz, now)
|
||||
if err == nil {
|
||||
t.Errorf("ComputeNextRun(%q, %q) returned nil error — "+
|
||||
"org importer would silently insert zero next_run_at; #722 fix requires non-nil",
|
||||
tc.cronExpr, tc.tz)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -112,6 +112,11 @@ func (s *Scheduler) Start(ctx context.Context) {
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
// #722 — startup repair: find any enabled schedule whose next_run_at was
|
||||
// NULL'd by the pre-fix bug and recompute it now. Without this pass those
|
||||
// schedules would never fire again even after the binary is updated.
|
||||
s.repairNullNextRunAt(ctx)
|
||||
|
||||
// Heartbeat + initial lastTickAt so /admin/liveness and Healthy() both
|
||||
// pass during the first 30s interval after startup.
|
||||
supervised.Heartbeat("scheduler")
|
||||
@ -279,12 +284,19 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
|
||||
var nextRunPtr *time.Time
|
||||
if nextErr == nil {
|
||||
nextRunPtr = &nextRun
|
||||
} else {
|
||||
// #722: if ComputeNextRun fails, keep the existing next_run_at so the
|
||||
// schedule is not silently removed from the fire query (NULL next_run_at
|
||||
// is excluded by the tick WHERE clause). COALESCE($2, next_run_at) does
|
||||
// this: when $2 is NULL the DB column value is preserved as-is.
|
||||
log.Printf("Scheduler: ComputeNextRun error for '%s' (%s) — preserving existing next_run_at: %v",
|
||||
sched.Name, sched.ID, nextErr)
|
||||
}
|
||||
|
||||
_, err := db.DB.ExecContext(ctx, `
|
||||
UPDATE workspace_schedules
|
||||
SET last_run_at = now(),
|
||||
next_run_at = $2,
|
||||
next_run_at = COALESCE($2, next_run_at),
|
||||
run_count = run_count + 1,
|
||||
last_status = $3,
|
||||
last_error = $4,
|
||||
@ -334,6 +346,11 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active
|
||||
var nextRunPtr *time.Time
|
||||
if nextErr == nil {
|
||||
nextRunPtr = &nextRun
|
||||
} else {
|
||||
// #722: same guard as in fireSchedule — preserve existing next_run_at
|
||||
// rather than writing NULL when the cron expression cannot be parsed.
|
||||
log.Printf("Scheduler: ComputeNextRun error in recordSkipped for '%s' (%s) — preserving existing next_run_at: %v",
|
||||
sched.Name, sched.ID, nextErr)
|
||||
}
|
||||
|
||||
// Advance next_run_at + bump run_count so the liveness view reflects
|
||||
@ -342,7 +359,7 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active
|
||||
_, _ = db.DB.ExecContext(ctx, `
|
||||
UPDATE workspace_schedules
|
||||
SET last_run_at = now(),
|
||||
next_run_at = $2,
|
||||
next_run_at = COALESCE($2, next_run_at),
|
||||
run_count = run_count + 1,
|
||||
last_status = 'skipped',
|
||||
last_error = $3,
|
||||
@ -371,6 +388,60 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active
|
||||
}
|
||||
}
|
||||
|
||||
// repairNullNextRunAt is called once during Start() to recompute next_run_at
|
||||
// for any enabled schedule where it is NULL — a state left by the pre-#722 bug
|
||||
// where a ComputeNextRun error caused an UPDATE that wrote NULL.
|
||||
// Without this repair those schedules would never appear in the tick query
|
||||
// (which requires next_run_at IS NOT NULL) even after the binary is patched.
|
||||
func (s *Scheduler) repairNullNextRunAt(ctx context.Context) {
|
||||
rows, err := db.DB.QueryContext(ctx, `
|
||||
SELECT id, cron_expr, timezone
|
||||
FROM workspace_schedules
|
||||
WHERE enabled = true AND next_run_at IS NULL
|
||||
`)
|
||||
if err != nil {
|
||||
log.Printf("Scheduler: startup repair query error: %v", err)
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
type repairRow struct {
|
||||
ID string
|
||||
CronExpr string
|
||||
Timezone string
|
||||
}
|
||||
|
||||
var repaired, failed int
|
||||
for rows.Next() {
|
||||
var r repairRow
|
||||
if err := rows.Scan(&r.ID, &r.CronExpr, &r.Timezone); err != nil {
|
||||
log.Printf("Scheduler: startup repair scan error: %v", err)
|
||||
continue
|
||||
}
|
||||
nextRun, err := ComputeNextRun(r.CronExpr, r.Timezone, time.Now())
|
||||
if err != nil {
|
||||
log.Printf("Scheduler: startup repair: cannot compute next_run_at for schedule %s (%s): %v — leaving NULL",
|
||||
r.ID, r.CronExpr, err)
|
||||
failed++
|
||||
continue
|
||||
}
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
UPDATE workspace_schedules SET next_run_at = $2, updated_at = now() WHERE id = $1
|
||||
`, r.ID, nextRun); err != nil {
|
||||
log.Printf("Scheduler: startup repair: update failed for schedule %s: %v", r.ID, err)
|
||||
failed++
|
||||
} else {
|
||||
repaired++
|
||||
}
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Printf("Scheduler: startup repair rows error: %v", err)
|
||||
}
|
||||
if repaired > 0 || failed > 0 {
|
||||
log.Printf("Scheduler: startup repair: %d schedule(s) repaired, %d skipped (bad cron/tz)", repaired, failed)
|
||||
}
|
||||
}
|
||||
|
||||
func truncate(s string, maxLen int) string {
|
||||
if len(s) <= maxLen {
|
||||
return s
|
||||
|
||||
@ -2,6 +2,7 @@ package scheduler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -10,6 +11,9 @@ import (
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
)
|
||||
|
||||
// errDBDown is a sentinel error used by tests to simulate a DB connection failure.
|
||||
var errDBDown = sql.ErrConnDone
|
||||
|
||||
// setupTestDB replaces the global db.DB with a sqlmock and returns the mock
|
||||
// handle. The real DB is restored (by closing the mock conn) via t.Cleanup.
|
||||
func setupTestDB(t *testing.T) sqlmock.Sqlmock {
|
||||
@ -237,6 +241,142 @@ func TestRecordSkipped_writesSkippedStatus(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// ── successProxy ─────────────────────────────────────────────────────────────
|
||||
|
||||
// successProxy is a test double whose ProxyA2ARequest always returns HTTP 200
|
||||
// with no error, simulating a healthy A2A round-trip.
|
||||
type successProxy struct{}
|
||||
|
||||
func (p *successProxy) ProxyA2ARequest(
|
||||
_ context.Context, _ string, _ []byte, _ string, _ bool,
|
||||
) (int, []byte, error) {
|
||||
return 200, []byte(`{"ok":true}`), nil
|
||||
}
|
||||
|
||||
// ── TestFireSchedule_ComputeNextRunError (#722 Bug 1) ─────────────────────────
|
||||
//
|
||||
// When ComputeNextRun fails (bad cron expression), fireSchedule must NOT write
|
||||
// NULL to next_run_at — it must use COALESCE so the existing DB value is kept.
|
||||
// Proof: the UPDATE ExecContext must still be called (schedule not abandoned)
|
||||
// and sqlmock satisfies all expectations (no unexpected SQL).
|
||||
|
||||
func TestFireSchedule_ComputeNextRunError(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
sched := scheduleRow{
|
||||
ID: "11111111-dead-beef-0000-000000000001",
|
||||
WorkspaceID: "22222222-dead-beef-0000-000000000002",
|
||||
Name: "bad-cron-job",
|
||||
CronExpr: "not-a-valid-cron", // guaranteed to fail ComputeNextRun
|
||||
Timezone: "UTC",
|
||||
Prompt: "do something",
|
||||
}
|
||||
|
||||
// active_tasks check → 0 (workspace is idle; proceed to fire)
|
||||
mock.ExpectQuery(`SELECT COALESCE`).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"coalesce"}).AddRow(0))
|
||||
|
||||
// UPDATE must fire — COALESCE($2, next_run_at) keeps existing value when $2 is nil.
|
||||
// AnyArg for $2 because it will be nil (ComputeNextRun failed).
|
||||
mock.ExpectExec(`UPDATE workspace_schedules`).
|
||||
WithArgs(sched.ID, sqlmock.AnyArg(), "ok", "").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// activity_logs INSERT always fires
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).
|
||||
WithArgs(sched.WorkspaceID, sqlmock.AnyArg(), sqlmock.AnyArg(), "ok", "").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
s := New(&successProxy{}, nil)
|
||||
s.fireSchedule(context.Background(), sched)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet DB expectations — schedule update was skipped or next_run_at not preserved: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ── TestRecordSkipped_ComputeNextRunError (#722 Bug 1 — skipped path) ─────────
|
||||
//
|
||||
// Same invariant as TestFireSchedule_ComputeNextRunError but for the
|
||||
// recordSkipped path: a bad cron expression must not NULL out next_run_at.
|
||||
|
||||
func TestRecordSkipped_ComputeNextRunError(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
sched := scheduleRow{
|
||||
ID: "33333333-dead-beef-0000-000000000003",
|
||||
WorkspaceID: "44444444-dead-beef-0000-000000000004",
|
||||
Name: "bad-cron-skip",
|
||||
CronExpr: "not-a-valid-cron",
|
||||
Timezone: "UTC",
|
||||
Prompt: "skipped task",
|
||||
}
|
||||
|
||||
mock.ExpectExec(`UPDATE workspace_schedules`).
|
||||
WithArgs(sched.ID, sqlmock.AnyArg(), sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).
|
||||
WithArgs(sched.WorkspaceID, sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
s := New(nil, nil)
|
||||
s.recordSkipped(context.Background(), sched, 2)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet DB expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ── TestRepairNullNextRunAt_RepairsRows (#722 Bug 3) ──────────────────────────
|
||||
//
|
||||
// repairNullNextRunAt must SELECT enabled schedules with NULL next_run_at,
|
||||
// compute the next fire time, and UPDATE each row.
|
||||
|
||||
func TestRepairNullNextRunAt_RepairsRows(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
// Two schedules whose next_run_at is NULL and whose cron exprs are valid.
|
||||
mock.ExpectQuery(`SELECT id, cron_expr, timezone`).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "cron_expr", "timezone"}).
|
||||
AddRow("sched-repair-01", "0 * * * *", "UTC").
|
||||
AddRow("sched-repair-02", "30 9 * * 1", "America/New_York"))
|
||||
|
||||
// Expect one UPDATE per repaired row.
|
||||
mock.ExpectExec(`UPDATE workspace_schedules`).
|
||||
WithArgs("sched-repair-01", sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectExec(`UPDATE workspace_schedules`).
|
||||
WithArgs("sched-repair-02", sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
s := New(nil, nil)
|
||||
s.repairNullNextRunAt(context.Background())
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet DB expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ── TestRepairNullNextRunAt_DBError_NoPanic (#722 Bug 3) ──────────────────────
|
||||
//
|
||||
// A DB error from the SELECT must be logged but must not panic — the scheduler
|
||||
// startup should proceed normally.
|
||||
|
||||
func TestRepairNullNextRunAt_DBError_NoPanic(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
mock.ExpectQuery(`SELECT id, cron_expr, timezone`).
|
||||
WillReturnError(errDBDown)
|
||||
|
||||
s := New(nil, nil)
|
||||
// Must not panic:
|
||||
s.repairNullNextRunAt(context.Background())
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet DB expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ── TestRecordSkipped_shortWorkspaceIDNoPanic ─────────────────────────────────
|
||||
// Guards against the short() regression: recordSkipped must not panic if
|
||||
// WorkspaceID is unexpectedly shorter than the 12-char prefix used in logs.
|
||||
|
||||
Loading…
Reference in New Issue
Block a user