feat(admin-schedules): orphan monitor + cleaner endpoints (internal#2006 backstops) #2008

Merged
hongming merged 1 commits from feat/schedule-orphan-monitor-cleaner into main 2026-05-29 09:38:34 +00:00
3 changed files with 190 additions and 0 deletions
@@ -148,6 +148,125 @@ func (h *AdminSchedulesHealthHandler) Health(c *gin.Context) {
c.JSON(http.StatusOK, entries)
}
// orphanScheduleEntry is one row in the Orphans response.
type orphanScheduleEntry struct {
WorkspaceID string `json:"workspace_id"`
WorkspaceStatus string `json:"workspace_status"` // "removed" | "missing"
ScheduleID string `json:"schedule_id"`
ScheduleName string `json:"schedule_name"`
Source string `json:"source"`
Enabled bool `json:"enabled"`
CronExpr string `json:"cron_expr"`
}
// Orphans handles GET /admin/schedules/orphans — the monitor surface for
// internal#2006. Health (above) reports only LIVE workspaces' schedules, so a
// schedule left on a removed/recreated workspace silently stops firing and
// never appears there. This endpoint lists exactly those orphans (workspace
// removed OR missing) so an operator/monitor can alert. Returns 200 + JSON
// array (empty when none). Auth via adminAuth() in router.go.
func (h *AdminSchedulesHealthHandler) Orphans(c *gin.Context) {
ctx := c.Request.Context()
rows, err := db.DB.QueryContext(ctx, `
SELECT s.workspace_id,
CASE WHEN w.id IS NULL THEN 'missing' ELSE 'removed' END AS ws_status,
s.id, s.name, COALESCE(s.source, ''), s.enabled, s.cron_expr
FROM workspace_schedules s
LEFT JOIN workspaces w ON w.id = s.workspace_id
WHERE w.id IS NULL OR w.status = 'removed'
ORDER BY s.name ASC
`)
if err != nil {
log.Printf("AdminSchedulesOrphans: query error: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to query orphans"})
return
}
defer rows.Close()
out := make([]orphanScheduleEntry, 0)
for rows.Next() {
var e orphanScheduleEntry
if err := rows.Scan(&e.WorkspaceID, &e.WorkspaceStatus, &e.ScheduleID, &e.ScheduleName, &e.Source, &e.Enabled, &e.CronExpr); err != nil {
log.Printf("AdminSchedulesOrphans: scan error: %v", err)
continue
}
out = append(out, e)
}
if err := rows.Err(); err != nil {
log.Printf("AdminSchedulesOrphans: rows iteration error: %v", err)
}
c.JSON(http.StatusOK, out)
}
// ReapOrphans handles POST /admin/schedules/reap-orphans — the orphan cleaner
// (internal#2006). For every schedule bound to a removed/nonexistent workspace
// it re-points runtime-created schedules onto the live successor agent (matched
// by role+parent, falling back to name+parent) when one exists and doesn't
// already carry a same-named schedule; schedules with no live successor are
// disabled (enabled=false) so the scheduler stops firing into a dead workspace.
// Idempotent: re-running with no orphans is a no-op. Returns a summary count.
// Auth is enforced by the adminAuth() middleware registered in router.go.
func (h *AdminSchedulesHealthHandler) ReapOrphans(c *gin.Context) {
ctx := c.Request.Context()
// 1. Re-point runtime schedules onto a live successor (same role+parent,
// else same name+parent). Skip names already present on the successor.
repointed, err := db.DB.ExecContext(ctx, `
WITH orphan AS (
SELECT s.id, s.name, s.workspace_id, prev.role AS role, prev.parent_id AS parent_id
FROM workspace_schedules s
JOIN workspaces prev ON prev.id = s.workspace_id
WHERE prev.status = 'removed' AND s.source = 'runtime'
),
successor AS (
SELECT o.id AS schedule_id, o.name AS schedule_name,
(
SELECT w.id FROM workspaces w
WHERE w.status != 'removed'
AND w.parent_id IS NOT DISTINCT FROM o.parent_id
AND ((o.role IS NOT NULL AND w.role = o.role))
ORDER BY w.updated_at DESC NULLS LAST LIMIT 1
) AS live_id
FROM orphan o
)
UPDATE workspace_schedules s
SET workspace_id = su.live_id, updated_at = now()
FROM successor su
WHERE s.id = su.schedule_id
AND su.live_id IS NOT NULL
AND NOT EXISTS (
SELECT 1 FROM workspace_schedules t
WHERE t.workspace_id = su.live_id AND t.name = su.schedule_name
)
`)
if err != nil {
log.Printf("ReapOrphans: re-point error: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "re-point failed"})
return
}
repointedN, _ := repointed.RowsAffected()
// 2. Disable any remaining schedules still bound to a removed/missing
// workspace (no live successor, or template schedules on a dead row).
disabled, err := db.DB.ExecContext(ctx, `
UPDATE workspace_schedules s
SET enabled = false, updated_at = now()
WHERE s.enabled = true
AND NOT EXISTS (
SELECT 1 FROM workspaces w
WHERE w.id = s.workspace_id AND w.status != 'removed'
)
`)
if err != nil {
log.Printf("ReapOrphans: disable error: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "disable failed"})
return
}
disabledN, _ := disabled.RowsAffected()
log.Printf("ReapOrphans: re-pointed %d, disabled %d orphaned schedule(s)", repointedN, disabledN)
c.JSON(http.StatusOK, gin.H{"repointed": repointedN, "disabled": disabledN})
}
// classifyScheduleStatus returns the health status string for a schedule.
// - "never_run" — last_run_at is NULL (schedule has never fired)
// - "stale" — now - last_run_at > staleThreshold (and threshold > 0)
@@ -444,3 +444,72 @@ func TestAdminSchedulesHealth_ResponseFields(t *testing.T) {
t.Fatalf("unmet expectations: %v", err)
}
}
// ==================== Orphans + ReapOrphans (internal#2006) ====================
// TestAdminSchedulesOrphans verifies the monitor surface lists schedules bound
// to a removed/missing workspace (the recreate-orphan failure mode).
func TestAdminSchedulesOrphans(t *testing.T) {
mock := setupTestDB(t)
handler := NewAdminSchedulesHealthHandler()
mock.ExpectQuery(`LEFT JOIN workspaces`).
WillReturnRows(sqlmock.NewRows([]string{
"workspace_id", "ws_status", "id", "name", "source", "enabled", "cron_expr",
}).AddRow("dead-ws", "removed", "sched-1", "minimax-autonomous-tick", "runtime", false, "*/5 * * * *"))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/admin/schedules/orphans", nil)
handler.Orphans(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []orphanScheduleEntry
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("parse response: %v", err)
}
if len(resp) != 1 {
t.Fatalf("expected 1 orphan, got %d", len(resp))
}
if resp[0].ScheduleName != "minimax-autonomous-tick" || resp[0].WorkspaceStatus != "removed" || resp[0].Source != "runtime" {
t.Errorf("unexpected orphan entry: %+v", resp[0])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// TestReapOrphans verifies the cleaner re-points runtime schedules onto a live
// successor then disables any remaining dead-bound schedules, returning counts.
func TestReapOrphans(t *testing.T) {
mock := setupTestDB(t)
handler := NewAdminSchedulesHealthHandler()
mock.ExpectExec(`UPDATE workspace_schedules s\s+SET workspace_id`).
WillReturnResult(sqlmock.NewResult(0, 2))
mock.ExpectExec(`UPDATE workspace_schedules s\s+SET enabled = false`).
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/admin/schedules/reap-orphans", nil)
handler.ReapOrphans(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]int64
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("parse response: %v", err)
}
if resp["repointed"] != 2 || resp["disabled"] != 1 {
t.Errorf("expected repointed=2 disabled=1, got %+v", resp)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
@@ -539,6 +539,8 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
{
asHealth := handlers.NewAdminSchedulesHealthHandler()
r.GET("/admin/schedules/health", middleware.AdminAuth(db.DB), asHealth.Health)
r.GET("/admin/schedules/orphans", middleware.AdminAuth(db.DB), asHealth.Orphans)
r.POST("/admin/schedules/reap-orphans", middleware.AdminAuth(db.DB), asHealth.ReapOrphans)
}
// Admin — stale a2a_queue cleanup (issue #1947). Marks queued items older