diff --git a/platform/internal/handlers/org.go b/platform/internal/handlers/org.go index 583565b5..d74ab01f 100644 --- a/platform/internal/handlers/org.go +++ b/platform/internal/handlers/org.go @@ -643,7 +643,14 @@ func (h *OrgHandler) createWorkspaceTree(ws OrgWorkspace, parentID *string, defa log.Printf("Org import: schedule '%s' on %s has empty prompt (neither prompt nor prompt_file set) — skipping insert", sched.Name, ws.Name) continue } - nextRun, _ := scheduler.ComputeNextRun(sched.CronExpr, tz, time.Now()) + // #722: surface the error rather than silently using time.Time{} (zero) + // which lib/pq stores as 0001-01-01 and may confuse the fire query. + nextRun, nextRunErr := scheduler.ComputeNextRun(sched.CronExpr, tz, time.Now()) + if nextRunErr != nil { + log.Printf("Org import: invalid cron expression for schedule '%s' on %s: %v — skipping insert", + sched.Name, ws.Name, nextRunErr) + continue + } if _, err := db.DB.ExecContext(context.Background(), orgImportScheduleSQL, id, sched.Name, sched.CronExpr, tz, prompt, enabled, nextRun); err != nil { log.Printf("Org import: failed to upsert schedule '%s' for %s: %v", sched.Name, ws.Name, err) diff --git a/platform/internal/handlers/org_test.go b/platform/internal/handlers/org_test.go index a1e133a7..556f1611 100644 --- a/platform/internal/handlers/org_test.go +++ b/platform/internal/handlers/org_test.go @@ -3,8 +3,11 @@ package handlers import ( "strings" "testing" + "time" "gopkg.in/yaml.v3" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/scheduler" ) func TestOrgDefaults_InitialPrompt_YAMLParsing(t *testing.T) { @@ -602,3 +605,48 @@ func TestPlugins_BackwardCompat(t *testing.T) { t.Fatalf("got %v, want %v", got, want) } } + +// ── TestOrgImport_ScheduleComputeError (#722 Bug 2) ─────────────────────────── +// +// The org importer previously used `nextRun, _ := scheduler.ComputeNextRun(...)`, +// discarding the error and passing time.Time{} (zero value) to the INSERT. +// After fix #722 it surfaces the error and skips the INSERT via `continue`. +// +// This test verifies that the inputs an org.yaml schedule can supply (bad cron +// expression, invalid timezone) DO cause ComputeNextRun to return a non-nil +// error — confirming that the fix is meaningful and the skip path is reachable. + +func TestOrgImport_ScheduleComputeError(t *testing.T) { + now := time.Now() + cases := []struct { + name string + cronExpr string + tz string + }{ + { + name: "invalid cron expression", + cronExpr: "not-a-cron-expr", + tz: "UTC", + }, + { + name: "invalid timezone", + cronExpr: "0 9 * * 1", + tz: "Not/A/Valid/Timezone", + }, + { + name: "both invalid", + cronExpr: "every monday", + tz: "Moon/Far_Side", + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + _, err := scheduler.ComputeNextRun(tc.cronExpr, tc.tz, now) + if err == nil { + t.Errorf("ComputeNextRun(%q, %q) returned nil error — "+ + "org importer would silently insert zero next_run_at; #722 fix requires non-nil", + tc.cronExpr, tc.tz) + } + }) + } +} diff --git a/platform/internal/scheduler/scheduler.go b/platform/internal/scheduler/scheduler.go index 8839fe0e..ee9c0cc5 100644 --- a/platform/internal/scheduler/scheduler.go +++ b/platform/internal/scheduler/scheduler.go @@ -112,6 +112,11 @@ func (s *Scheduler) Start(ctx context.Context) { s.mu.Unlock() } + // #722 — startup repair: find any enabled schedule whose next_run_at was + // NULL'd by the pre-fix bug and recompute it now. Without this pass those + // schedules would never fire again even after the binary is updated. + s.repairNullNextRunAt(ctx) + // Heartbeat + initial lastTickAt so /admin/liveness and Healthy() both // pass during the first 30s interval after startup. supervised.Heartbeat("scheduler") @@ -279,12 +284,19 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) { var nextRunPtr *time.Time if nextErr == nil { nextRunPtr = &nextRun + } else { + // #722: if ComputeNextRun fails, keep the existing next_run_at so the + // schedule is not silently removed from the fire query (NULL next_run_at + // is excluded by the tick WHERE clause). COALESCE($2, next_run_at) does + // this: when $2 is NULL the DB column value is preserved as-is. + log.Printf("Scheduler: ComputeNextRun error for '%s' (%s) — preserving existing next_run_at: %v", + sched.Name, sched.ID, nextErr) } _, err := db.DB.ExecContext(ctx, ` UPDATE workspace_schedules SET last_run_at = now(), - next_run_at = $2, + next_run_at = COALESCE($2, next_run_at), run_count = run_count + 1, last_status = $3, last_error = $4, @@ -334,6 +346,11 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active var nextRunPtr *time.Time if nextErr == nil { nextRunPtr = &nextRun + } else { + // #722: same guard as in fireSchedule — preserve existing next_run_at + // rather than writing NULL when the cron expression cannot be parsed. + log.Printf("Scheduler: ComputeNextRun error in recordSkipped for '%s' (%s) — preserving existing next_run_at: %v", + sched.Name, sched.ID, nextErr) } // Advance next_run_at + bump run_count so the liveness view reflects @@ -342,7 +359,7 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active _, _ = db.DB.ExecContext(ctx, ` UPDATE workspace_schedules SET last_run_at = now(), - next_run_at = $2, + next_run_at = COALESCE($2, next_run_at), run_count = run_count + 1, last_status = 'skipped', last_error = $3, @@ -371,6 +388,60 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active } } +// repairNullNextRunAt is called once during Start() to recompute next_run_at +// for any enabled schedule where it is NULL — a state left by the pre-#722 bug +// where a ComputeNextRun error caused an UPDATE that wrote NULL. +// Without this repair those schedules would never appear in the tick query +// (which requires next_run_at IS NOT NULL) even after the binary is patched. +func (s *Scheduler) repairNullNextRunAt(ctx context.Context) { + rows, err := db.DB.QueryContext(ctx, ` + SELECT id, cron_expr, timezone + FROM workspace_schedules + WHERE enabled = true AND next_run_at IS NULL + `) + if err != nil { + log.Printf("Scheduler: startup repair query error: %v", err) + return + } + defer rows.Close() + + type repairRow struct { + ID string + CronExpr string + Timezone string + } + + var repaired, failed int + for rows.Next() { + var r repairRow + if err := rows.Scan(&r.ID, &r.CronExpr, &r.Timezone); err != nil { + log.Printf("Scheduler: startup repair scan error: %v", err) + continue + } + nextRun, err := ComputeNextRun(r.CronExpr, r.Timezone, time.Now()) + if err != nil { + log.Printf("Scheduler: startup repair: cannot compute next_run_at for schedule %s (%s): %v — leaving NULL", + r.ID, r.CronExpr, err) + failed++ + continue + } + if _, err := db.DB.ExecContext(ctx, ` + UPDATE workspace_schedules SET next_run_at = $2, updated_at = now() WHERE id = $1 + `, r.ID, nextRun); err != nil { + log.Printf("Scheduler: startup repair: update failed for schedule %s: %v", r.ID, err) + failed++ + } else { + repaired++ + } + } + if err := rows.Err(); err != nil { + log.Printf("Scheduler: startup repair rows error: %v", err) + } + if repaired > 0 || failed > 0 { + log.Printf("Scheduler: startup repair: %d schedule(s) repaired, %d skipped (bad cron/tz)", repaired, failed) + } +} + func truncate(s string, maxLen int) string { if len(s) <= maxLen { return s diff --git a/platform/internal/scheduler/scheduler_test.go b/platform/internal/scheduler/scheduler_test.go index b3e58e9a..c7fe9ed2 100644 --- a/platform/internal/scheduler/scheduler_test.go +++ b/platform/internal/scheduler/scheduler_test.go @@ -2,6 +2,7 @@ package scheduler import ( "context" + "database/sql" "testing" "time" @@ -10,6 +11,9 @@ import ( "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" ) +// errDBDown is a sentinel error used by tests to simulate a DB connection failure. +var errDBDown = sql.ErrConnDone + // setupTestDB replaces the global db.DB with a sqlmock and returns the mock // handle. The real DB is restored (by closing the mock conn) via t.Cleanup. func setupTestDB(t *testing.T) sqlmock.Sqlmock { @@ -237,6 +241,142 @@ func TestRecordSkipped_writesSkippedStatus(t *testing.T) { } } +// ── successProxy ───────────────────────────────────────────────────────────── + +// successProxy is a test double whose ProxyA2ARequest always returns HTTP 200 +// with no error, simulating a healthy A2A round-trip. +type successProxy struct{} + +func (p *successProxy) ProxyA2ARequest( + _ context.Context, _ string, _ []byte, _ string, _ bool, +) (int, []byte, error) { + return 200, []byte(`{"ok":true}`), nil +} + +// ── TestFireSchedule_ComputeNextRunError (#722 Bug 1) ───────────────────────── +// +// When ComputeNextRun fails (bad cron expression), fireSchedule must NOT write +// NULL to next_run_at — it must use COALESCE so the existing DB value is kept. +// Proof: the UPDATE ExecContext must still be called (schedule not abandoned) +// and sqlmock satisfies all expectations (no unexpected SQL). + +func TestFireSchedule_ComputeNextRunError(t *testing.T) { + mock := setupTestDB(t) + + sched := scheduleRow{ + ID: "11111111-dead-beef-0000-000000000001", + WorkspaceID: "22222222-dead-beef-0000-000000000002", + Name: "bad-cron-job", + CronExpr: "not-a-valid-cron", // guaranteed to fail ComputeNextRun + Timezone: "UTC", + Prompt: "do something", + } + + // active_tasks check → 0 (workspace is idle; proceed to fire) + mock.ExpectQuery(`SELECT COALESCE`). + WillReturnRows(sqlmock.NewRows([]string{"coalesce"}).AddRow(0)) + + // UPDATE must fire — COALESCE($2, next_run_at) keeps existing value when $2 is nil. + // AnyArg for $2 because it will be nil (ComputeNextRun failed). + mock.ExpectExec(`UPDATE workspace_schedules`). + WithArgs(sched.ID, sqlmock.AnyArg(), "ok", ""). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // activity_logs INSERT always fires + mock.ExpectExec(`INSERT INTO activity_logs`). + WithArgs(sched.WorkspaceID, sqlmock.AnyArg(), sqlmock.AnyArg(), "ok", ""). + WillReturnResult(sqlmock.NewResult(0, 1)) + + s := New(&successProxy{}, nil) + s.fireSchedule(context.Background(), sched) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet DB expectations — schedule update was skipped or next_run_at not preserved: %v", err) + } +} + +// ── TestRecordSkipped_ComputeNextRunError (#722 Bug 1 — skipped path) ───────── +// +// Same invariant as TestFireSchedule_ComputeNextRunError but for the +// recordSkipped path: a bad cron expression must not NULL out next_run_at. + +func TestRecordSkipped_ComputeNextRunError(t *testing.T) { + mock := setupTestDB(t) + + sched := scheduleRow{ + ID: "33333333-dead-beef-0000-000000000003", + WorkspaceID: "44444444-dead-beef-0000-000000000004", + Name: "bad-cron-skip", + CronExpr: "not-a-valid-cron", + Timezone: "UTC", + Prompt: "skipped task", + } + + mock.ExpectExec(`UPDATE workspace_schedules`). + WithArgs(sched.ID, sqlmock.AnyArg(), sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(`INSERT INTO activity_logs`). + WithArgs(sched.WorkspaceID, sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 1)) + + s := New(nil, nil) + s.recordSkipped(context.Background(), sched, 2) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet DB expectations: %v", err) + } +} + +// ── TestRepairNullNextRunAt_RepairsRows (#722 Bug 3) ────────────────────────── +// +// repairNullNextRunAt must SELECT enabled schedules with NULL next_run_at, +// compute the next fire time, and UPDATE each row. + +func TestRepairNullNextRunAt_RepairsRows(t *testing.T) { + mock := setupTestDB(t) + + // Two schedules whose next_run_at is NULL and whose cron exprs are valid. + mock.ExpectQuery(`SELECT id, cron_expr, timezone`). + WillReturnRows(sqlmock.NewRows([]string{"id", "cron_expr", "timezone"}). + AddRow("sched-repair-01", "0 * * * *", "UTC"). + AddRow("sched-repair-02", "30 9 * * 1", "America/New_York")) + + // Expect one UPDATE per repaired row. + mock.ExpectExec(`UPDATE workspace_schedules`). + WithArgs("sched-repair-01", sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(`UPDATE workspace_schedules`). + WithArgs("sched-repair-02", sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 1)) + + s := New(nil, nil) + s.repairNullNextRunAt(context.Background()) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet DB expectations: %v", err) + } +} + +// ── TestRepairNullNextRunAt_DBError_NoPanic (#722 Bug 3) ────────────────────── +// +// A DB error from the SELECT must be logged but must not panic — the scheduler +// startup should proceed normally. + +func TestRepairNullNextRunAt_DBError_NoPanic(t *testing.T) { + mock := setupTestDB(t) + + mock.ExpectQuery(`SELECT id, cron_expr, timezone`). + WillReturnError(errDBDown) + + s := New(nil, nil) + // Must not panic: + s.repairNullNextRunAt(context.Background()) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet DB expectations: %v", err) + } +} + // ── TestRecordSkipped_shortWorkspaceIDNoPanic ───────────────────────────────── // Guards against the short() regression: recordSkipped must not panic if // WorkspaceID is unexpectedly shorter than the 12-char prefix used in logs.