Merge pull request #671 from Molecule-AI/feat/issue-618-admin-schedules-health

feat(platform): GET /admin/schedules/health — cross-workspace cron firing status (#618)
This commit is contained in:
Hongming Wang 2026-04-17 03:47:44 -07:00 committed by GitHub
commit b4590023cd
3 changed files with 619 additions and 0 deletions

View File

@ -0,0 +1,163 @@
package handlers
import (
"log"
"net/http"
"time"
"github.com/gin-gonic/gin"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/scheduler"
)
// AdminSchedulesHealthHandler serves GET /admin/schedules/health — a cross-workspace
// schedule monitoring view gated behind AdminAuth. Unlike the per-workspace
// GET /workspaces/:id/schedules/health (which requires caller identity + CanCommunicate),
// this endpoint is intended for operators and automated audit agents that hold a
// global admin bearer token. Issue #618.
type AdminSchedulesHealthHandler struct{}
// NewAdminSchedulesHealthHandler returns an AdminSchedulesHealthHandler.
func NewAdminSchedulesHealthHandler() *AdminSchedulesHealthHandler {
return &AdminSchedulesHealthHandler{}
}
// adminScheduleHealth is the per-schedule entry in the health response.
type adminScheduleHealth struct {
WorkspaceID string `json:"workspace_id"`
WorkspaceName string `json:"workspace_name"`
ScheduleID string `json:"schedule_id"`
ScheduleName string `json:"schedule_name"`
CronExpr string `json:"cron_expr"`
LastRunAt *time.Time `json:"last_run_at"`
ExpectedNextRun *time.Time `json:"expected_next_run"`
Status string `json:"status"` // "ok" | "stale" | "never_run"
StaleThresholdSeconds int64 `json:"stale_threshold_seconds"`
}
// computeStaleThreshold returns 2× the cron interval for the given expression
// and timezone. The interval is approximated as the gap between two consecutive
// scheduled fire times computed from now.
//
// Exported as a package-level function so it can be unit-tested independently
// from the handler.
func computeStaleThreshold(cronExpr, tz string, now time.Time) (time.Duration, error) {
t1, err := scheduler.ComputeNextRun(cronExpr, tz, now)
if err != nil {
return 0, err
}
t2, err := scheduler.ComputeNextRun(cronExpr, tz, t1)
if err != nil {
return 0, err
}
return 2 * t2.Sub(t1), nil
}
// Health handles GET /admin/schedules/health.
//
// It joins workspace_schedules with workspaces and, for each schedule, computes:
// - status: "never_run" (last_run_at IS NULL),
// "stale" (now - last_run_at > 2 × cron interval), or
// "ok" (recently run).
// - stale_threshold_seconds: 2 × the cron interval derived from cron_expr.
// - expected_next_run: the next_run_at value stored by the scheduler.
//
// Returns 200 with a JSON array (empty if no schedules exist), 500 on DB error.
// Auth is enforced by the adminAuth() middleware registered in router.go.
func (h *AdminSchedulesHealthHandler) Health(c *gin.Context) {
ctx := c.Request.Context()
now := time.Now()
rows, err := db.DB.QueryContext(ctx, `
SELECT
w.id AS workspace_id,
w.name AS workspace_name,
s.id AS schedule_id,
s.name AS schedule_name,
s.cron_expr,
s.timezone,
s.last_run_at,
s.next_run_at
FROM workspace_schedules s
JOIN workspaces w ON w.id = s.workspace_id
WHERE w.status != 'removed'
ORDER BY w.name ASC, s.name ASC
`)
if err != nil {
log.Printf("AdminSchedulesHealth: query error: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to query schedules"})
return
}
defer rows.Close()
entries := make([]adminScheduleHealth, 0)
for rows.Next() {
var (
workspaceID string
workspaceName string
scheduleID string
scheduleName string
cronExpr string
timezone string
lastRunAt *time.Time
nextRunAt *time.Time
)
if err := rows.Scan(
&workspaceID, &workspaceName,
&scheduleID, &scheduleName,
&cronExpr, &timezone,
&lastRunAt, &nextRunAt,
); err != nil {
log.Printf("AdminSchedulesHealth: scan error: %v", err)
continue
}
// Compute stale threshold = 2 × cron interval.
// On parse failure (malformed cron_expr in DB) we report 0 and still
// classify the row — a bad cron_expr itself is worth surfacing in the
// health view rather than silently skipping the row.
staleThreshold, cronErr := computeStaleThreshold(cronExpr, timezone, now)
var staleThresholdSeconds int64
if cronErr == nil {
staleThresholdSeconds = int64(staleThreshold.Seconds())
} else {
log.Printf("AdminSchedulesHealth: cron parse error for schedule %s (%q): %v",
scheduleID, cronExpr, cronErr)
}
// Classify schedule status.
status := classifyScheduleStatus(lastRunAt, staleThreshold, now)
entries = append(entries, adminScheduleHealth{
WorkspaceID: workspaceID,
WorkspaceName: workspaceName,
ScheduleID: scheduleID,
ScheduleName: scheduleName,
CronExpr: cronExpr,
LastRunAt: lastRunAt,
ExpectedNextRun: nextRunAt,
Status: status,
StaleThresholdSeconds: staleThresholdSeconds,
})
}
if err := rows.Err(); err != nil {
log.Printf("AdminSchedulesHealth: rows iteration error: %v", err)
}
c.JSON(http.StatusOK, entries)
}
// classifyScheduleStatus returns the health status string for a schedule.
// - "never_run" — last_run_at is NULL (schedule has never fired)
// - "stale" — now - last_run_at > staleThreshold (and threshold > 0)
// - "ok" — recently run within the expected window
func classifyScheduleStatus(lastRunAt *time.Time, staleThreshold time.Duration, now time.Time) string {
if lastRunAt == nil {
return "never_run"
}
if staleThreshold > 0 && now.Sub(*lastRunAt) > staleThreshold {
return "stale"
}
return "ok"
}

View File

@ -0,0 +1,446 @@
package handlers
import (
"database/sql"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
sqlmock "github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
// adminHealthCols is the column set returned by the admin schedules health SELECT.
var adminHealthCols = []string{
"workspace_id", "workspace_name",
"schedule_id", "schedule_name",
"cron_expr", "timezone",
"last_run_at", "next_run_at",
}
// ==================== computeStaleThreshold unit tests ====================
// TestComputeStaleThreshold_FiveMinuteCron verifies that "*/5 * * * *" produces
// a 600 s (2 × 5 min) stale threshold.
func TestComputeStaleThreshold_FiveMinuteCron(t *testing.T) {
threshold, err := computeStaleThreshold("*/5 * * * *", "UTC", time.Now())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
const want = 600 * time.Second
if threshold != want {
t.Errorf("expected %v, got %v", want, threshold)
}
}
// TestComputeStaleThreshold_HourlyCron verifies that "0 * * * *" produces
// a 7200 s (2 h) stale threshold.
func TestComputeStaleThreshold_HourlyCron(t *testing.T) {
threshold, err := computeStaleThreshold("0 * * * *", "UTC", time.Now())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
const want = 2 * time.Hour
if threshold != want {
t.Errorf("expected %v, got %v", want, threshold)
}
}
// TestComputeStaleThreshold_DailyCron verifies that "0 9 * * *" (09:00 UTC daily)
// produces a 48 h (2 × 24 h) stale threshold.
func TestComputeStaleThreshold_DailyCron(t *testing.T) {
threshold, err := computeStaleThreshold("0 9 * * *", "UTC", time.Now())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
const want = 48 * time.Hour
if threshold != want {
t.Errorf("expected %v, got %v", want, threshold)
}
}
// TestComputeStaleThreshold_InvalidCron verifies that a malformed cron expression
// returns an error rather than silently returning zero.
func TestComputeStaleThreshold_InvalidCron(t *testing.T) {
_, err := computeStaleThreshold("not-a-cron", "UTC", time.Now())
if err == nil {
t.Error("expected error for invalid cron expression, got nil")
}
}
// TestComputeStaleThreshold_InvalidTimezone verifies that an unknown timezone
// returns an error.
func TestComputeStaleThreshold_InvalidTimezone(t *testing.T) {
_, err := computeStaleThreshold("*/5 * * * *", "Not/ATimezone", time.Now())
if err == nil {
t.Error("expected error for invalid timezone, got nil")
}
}
// ==================== classifyScheduleStatus unit tests ====================
// TestClassifyScheduleStatus_NeverRun verifies nil last_run_at → "never_run".
func TestClassifyScheduleStatus_NeverRun(t *testing.T) {
status := classifyScheduleStatus(nil, 10*time.Minute, time.Now())
if status != "never_run" {
t.Errorf("expected never_run, got %q", status)
}
}
// TestClassifyScheduleStatus_Stale verifies that a run older than the threshold
// produces "stale".
func TestClassifyScheduleStatus_Stale(t *testing.T) {
now := time.Now()
lastRun := now.Add(-11 * time.Minute) // older than 10-min threshold
status := classifyScheduleStatus(&lastRun, 10*time.Minute, now)
if status != "stale" {
t.Errorf("expected stale, got %q", status)
}
}
// TestClassifyScheduleStatus_OK verifies that a run within the threshold → "ok".
func TestClassifyScheduleStatus_OK(t *testing.T) {
now := time.Now()
lastRun := now.Add(-4 * time.Minute) // within 10-min threshold
status := classifyScheduleStatus(&lastRun, 10*time.Minute, now)
if status != "ok" {
t.Errorf("expected ok, got %q", status)
}
}
// TestClassifyScheduleStatus_ZeroThreshold_NeverStale verifies that when
// the threshold is 0 (cron parse failed), a run is never classified as stale
// — we degrade gracefully rather than false-alarming.
func TestClassifyScheduleStatus_ZeroThreshold_NeverStale(t *testing.T) {
now := time.Now()
lastRun := now.Add(-365 * 24 * time.Hour) // very old run
status := classifyScheduleStatus(&lastRun, 0, now)
if status != "ok" {
t.Errorf("expected ok (zero threshold = no stale detection), got %q", status)
}
}
// ==================== AdminSchedulesHealthHandler integration tests ====================
// TestAdminSchedulesHealth_Empty verifies that 200 + empty array is returned
// when no schedules exist.
func TestAdminSchedulesHealth_Empty(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewAdminSchedulesHealthHandler()
mock.ExpectQuery(`SELECT\s+w\.id`).
WillReturnRows(sqlmock.NewRows(adminHealthCols))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/admin/schedules/health", nil)
handler.Health(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []adminScheduleHealth
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("parse response: %v", err)
}
if len(resp) != 0 {
t.Errorf("expected empty array, got %d entries", len(resp))
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// TestAdminSchedulesHealth_NeverRun verifies that a schedule with last_run_at=NULL
// is classified as "never_run" and that stale_threshold_seconds is computed
// correctly from the cron expression.
func TestAdminSchedulesHealth_NeverRun(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewAdminSchedulesHealthHandler()
nextRun := time.Now().Add(5 * time.Minute)
mock.ExpectQuery(`SELECT\s+w\.id`).
WillReturnRows(sqlmock.NewRows(adminHealthCols).AddRow(
"ws-aaa", "Alpha WS",
"sched-1", "hourly",
"0 * * * *", "UTC",
nil, &nextRun,
))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/admin/schedules/health", nil)
handler.Health(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []adminScheduleHealth
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("parse response: %v", err)
}
if len(resp) != 1 {
t.Fatalf("expected 1 entry, got %d", len(resp))
}
if resp[0].Status != "never_run" {
t.Errorf("expected status=never_run, got %q", resp[0].Status)
}
if resp[0].LastRunAt != nil {
t.Errorf("expected last_run_at=nil, got %v", resp[0].LastRunAt)
}
// "0 * * * *" → interval = 1 h → stale_threshold = 2 h = 7200 s
if resp[0].StaleThresholdSeconds != 7200 {
t.Errorf("expected stale_threshold_seconds=7200 for hourly cron, got %d",
resp[0].StaleThresholdSeconds)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// TestAdminSchedulesHealth_StaleDetection verifies that a schedule whose
// last_run_at is older than 2× its cron interval is classified as "stale".
func TestAdminSchedulesHealth_StaleDetection(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewAdminSchedulesHealthHandler()
// "*/5 * * * *" (every 5 min). Stale threshold = 2 × 5 min = 10 min.
// Set last_run_at to 15 minutes ago → stale.
lastRun := time.Now().Add(-15 * time.Minute)
nextRun := time.Now().Add(5 * time.Minute)
mock.ExpectQuery(`SELECT\s+w\.id`).
WillReturnRows(sqlmock.NewRows(adminHealthCols).AddRow(
"ws-bbb", "Beta WS",
"sched-2", "every5min",
"*/5 * * * *", "UTC",
&lastRun, &nextRun,
))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/admin/schedules/health", nil)
handler.Health(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []adminScheduleHealth
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("parse response: %v", err)
}
if len(resp) != 1 {
t.Fatalf("expected 1 entry, got %d", len(resp))
}
if resp[0].Status != "stale" {
t.Errorf("expected status=stale (last run 15m ago, threshold 10m), got %q",
resp[0].Status)
}
// Stale threshold = 2 × 5 min = 600 s
if resp[0].StaleThresholdSeconds != 600 {
t.Errorf("expected stale_threshold_seconds=600, got %d",
resp[0].StaleThresholdSeconds)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// TestAdminSchedulesHealth_OKStatus verifies that a recently-run schedule
// (within 2× its cron interval) is classified as "ok".
func TestAdminSchedulesHealth_OKStatus(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewAdminSchedulesHealthHandler()
// "*/30 * * * *" (every 30 min). Stale threshold = 2 × 30 min = 60 min.
// last_run_at = 20 min ago → ok.
lastRun := time.Now().Add(-20 * time.Minute)
nextRun := time.Now().Add(10 * time.Minute)
mock.ExpectQuery(`SELECT\s+w\.id`).
WillReturnRows(sqlmock.NewRows(adminHealthCols).AddRow(
"ws-ccc", "Gamma WS",
"sched-3", "every30min",
"*/30 * * * *", "UTC",
&lastRun, &nextRun,
))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/admin/schedules/health", nil)
handler.Health(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []adminScheduleHealth
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("parse response: %v", err)
}
if len(resp) != 1 {
t.Fatalf("expected 1 entry, got %d", len(resp))
}
if resp[0].Status != "ok" {
t.Errorf("expected status=ok (20m ago, threshold 60m), got %q", resp[0].Status)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// TestAdminSchedulesHealth_DBError verifies that a DB failure returns 500, not a panic.
func TestAdminSchedulesHealth_DBError(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewAdminSchedulesHealthHandler()
mock.ExpectQuery(`SELECT\s+w\.id`).
WillReturnError(sql.ErrConnDone)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/admin/schedules/health", nil)
handler.Health(c)
if w.Code != http.StatusInternalServerError {
t.Fatalf("expected 500 on DB error, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// TestAdminSchedulesHealth_MultipleWorkspaces verifies that schedules from
// multiple workspaces are all returned in order with correct workspace metadata
// and individual status classifications.
func TestAdminSchedulesHealth_MultipleWorkspaces(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewAdminSchedulesHealthHandler()
now := time.Now()
recentRun := now.Add(-1 * time.Minute) // within 2h threshold → ok
nextRun := now.Add(59 * time.Minute)
mock.ExpectQuery(`SELECT\s+w\.id`).
WillReturnRows(sqlmock.NewRows(adminHealthCols).
AddRow("ws-1", "WS One", "s1", "hourly-1", "0 * * * *", "UTC",
&recentRun, &nextRun).
AddRow("ws-2", "WS Two", "s2", "hourly-2", "0 * * * *", "America/New_York",
nil, &nextRun))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/admin/schedules/health", nil)
handler.Health(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []adminScheduleHealth
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("parse response: %v", err)
}
if len(resp) != 2 {
t.Fatalf("expected 2 entries, got %d", len(resp))
}
// First entry: ws-1, recently run within threshold → ok
if resp[0].WorkspaceID != "ws-1" {
t.Errorf("expected ws-1 first, got %q", resp[0].WorkspaceID)
}
if resp[0].WorkspaceName != "WS One" {
t.Errorf("expected workspace_name=WS One, got %q", resp[0].WorkspaceName)
}
if resp[0].Status != "ok" {
t.Errorf("expected ok for ws-1 schedule, got %q", resp[0].Status)
}
// Second entry: ws-2, never run
if resp[1].WorkspaceID != "ws-2" {
t.Errorf("expected ws-2 second, got %q", resp[1].WorkspaceID)
}
if resp[1].Status != "never_run" {
t.Errorf("expected never_run for ws-2 schedule, got %q", resp[1].Status)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// TestAdminSchedulesHealth_ResponseFields verifies that all required fields
// (workspace_id, workspace_name, schedule_id, schedule_name, cron_expr,
// last_run_at, expected_next_run, status, stale_threshold_seconds) are
// present in the JSON response.
func TestAdminSchedulesHealth_ResponseFields(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewAdminSchedulesHealthHandler()
lastRun := time.Now().Add(-1 * time.Minute)
nextRun := time.Now().Add(4 * time.Minute)
mock.ExpectQuery(`SELECT\s+w\.id`).
WillReturnRows(sqlmock.NewRows(adminHealthCols).AddRow(
"ws-fields", "Fields WS",
"sched-fields", "test-schedule",
"*/5 * * * *", "UTC",
&lastRun, &nextRun,
))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/admin/schedules/health", nil)
handler.Health(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
// Parse as raw map to check field presence
var rawResp []map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &rawResp); err != nil {
t.Fatalf("parse response: %v", err)
}
if len(rawResp) != 1 {
t.Fatalf("expected 1 entry, got %d", len(rawResp))
}
requiredFields := []string{
"workspace_id", "workspace_name",
"schedule_id", "schedule_name",
"cron_expr", "last_run_at", "expected_next_run",
"status", "stale_threshold_seconds",
}
entry := rawResp[0]
for _, field := range requiredFields {
if _, ok := entry[field]; !ok {
t.Errorf("response missing required field %q", field)
}
}
if entry["workspace_id"] != "ws-fields" {
t.Errorf("workspace_id mismatch: %v", entry["workspace_id"])
}
if entry["schedule_name"] != "test-schedule" {
t.Errorf("schedule_name mismatch: %v", entry["schedule_name"])
}
if entry["cron_expr"] != "*/5 * * * *" {
t.Errorf("cron_expr mismatch: %v", entry["cron_expr"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}

View File

@ -320,6 +320,16 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
adminAuth.DELETE("/admin/secrets/:key", sechGlobal.DeleteGlobal)
}
// Admin — cross-workspace schedule health monitoring (issue #618).
// Lets cron-audit agents and operators detect silent schedule failures
// across all workspaces without holding individual workspace bearer tokens.
// AdminAuth mirrors the /admin/liveness gate — fail-open on fresh install,
// strict bearer-only once any token exists.
{
asHealth := handlers.NewAdminSchedulesHealthHandler()
r.GET("/admin/schedules/health", middleware.AdminAuth(db.DB), asHealth.Health)
}
// Admin — test token minting (issue #6). Hidden in production via TestTokensEnabled().
// AdminAuth is a second defence-in-depth layer: on a fresh install with no tokens yet,
// AdminAuth is fail-open (HasAnyLiveTokenGlobal == 0), so the bootstrap still works.