diff --git a/workspace-server/internal/handlers/delegation_sweeper.go b/workspace-server/internal/handlers/delegation_sweeper.go new file mode 100644 index 00000000..8ac673c4 --- /dev/null +++ b/workspace-server/internal/handlers/delegation_sweeper.go @@ -0,0 +1,265 @@ +package handlers + +import ( + "context" + "database/sql" + "log" + "os" + "strconv" + "time" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" +) + +// delegation_sweeper.go — RFC #2829 PR-3: stuck-task sweeper. +// +// What it does +// ------------ +// Periodically scans the `delegations` table (PR-1 schema) for in-flight +// rows (status queued, dispatched, or in_progress) that have either: +// +// 1. Blown past their `deadline` — agent claims to still be working but +// the hard ceiling fired. Mark `failed` with error_detail = "deadline +// exceeded by sweeper". +// 2. Stopped heartbeating for longer than the stuck threshold while still +// in flight. Mark `stuck` with error_detail = "no heartbeat for +// <duration>". +// +// Why both rules +// -------------- +// Deadline catches forever-heartbeating agents that never make progress +// (a wedged agent looping on a heartbeat call inside its main work loop +// looks "alive" by liveness signals but is not actually advancing). +// Heartbeat-staleness catches agents that crash or get OOM-killed +// without graceful shutdown — no terminal status update fires, but the +// heartbeat stops cold. +// +// Order matters: deadline check fires first because deadline → failed +// is a stronger statement than stale heartbeat → stuck. A stuck row can +// be retried by the operator; a failed row says "give up, retry was +// already exhausted or not viable." +// +// Frequency +// --------- +// 5min default cadence. Faster than that wastes DB round-trips for the +// hot index; slower delays detection, since a stuck task is only flagged +// on the first sweep after its heartbeat age crosses the threshold (up to +// one interval later). Operators can override via DELEGATION_SWEEPER_INTERVAL_S. 
//
// Threshold
// ---------
// Default 10 minutes (defaultStuckThreshold), i.e. 60× the 10s hermes
// heartbeat. The RFC #2829 design discussion proposed 10× the runtime's
// heartbeat interval as the heuristic: it tolerates legitimate slow LLM
// responses (a single completion can stall a heartbeat for 30-60s) while
// still catching real wedges quickly. The shipped default is looser than
// that because slower-beating runtimes share the same sweeper (see the
// comment on defaultStuckThreshold). Operators override via
// DELEGATION_STUCK_THRESHOLD_S.
//
// Safety
// ------
// All transitions go through DelegationLedger.SetStatus so the
// terminal-state forward-only protection applies — a delegation that
// just transitioned to completed concurrently with the sweep won't be
// flipped back to failed/stuck. The ledger's same-status replay no-op
// also makes re-running the sweep idempotent.

const (
	// Tick cadence used when DELEGATION_SWEEPER_INTERVAL_S is unset or invalid.
	defaultSweeperInterval = 5 * time.Minute

	// 10min = 60× the typical 10s hermes heartbeat. Tightens to ~10×
	// once the user community settles on a tighter heartbeat cadence;
	// today's mix of runtimes (hermes 10s, claude-code 30-60s, langchain
	// minute-scale) needs the looser threshold to avoid false positives.
	defaultStuckThreshold = 10 * time.Minute
)

// DelegationSweeper runs the periodic sweep. Construct via
// NewDelegationSweeper, then Start(ctx) in main.go to begin ticking.
type DelegationSweeper struct {
	db        *sql.DB           // handle the in-flight scan query runs against
	ledger    *DelegationLedger // every status transition goes through ledger.SetStatus
	interval  time.Duration     // tick cadence used by Start
	threshold time.Duration     // heartbeat age beyond which a row is marked stuck
}

// NewDelegationSweeper builds a sweeper bound to the package db.DB
// (production wiring) or a test handle. Reads optional env overrides
// at construction time so a long-running process picks them up via
// restart, not mid-flight.
+func NewDelegationSweeper(handle *sql.DB, ledger *DelegationLedger) *DelegationSweeper { + if handle == nil { + handle = db.DB + } + if ledger == nil { + ledger = NewDelegationLedger(handle) + } + return &DelegationSweeper{ + db: handle, + ledger: ledger, + interval: envDuration("DELEGATION_SWEEPER_INTERVAL_S", defaultSweeperInterval), + threshold: envDuration("DELEGATION_STUCK_THRESHOLD_S", defaultStuckThreshold), + } +} + +// envDuration parses an integer-seconds env var into a Duration. Falls +// back to def on missing/invalid input — never fails fast on misconfig +// (a typo'd env var should run with sane defaults, not crash startup). +func envDuration(key string, def time.Duration) time.Duration { + v := os.Getenv(key) + if v == "" { + return def + } + n, err := strconv.Atoi(v) + if err != nil || n <= 0 { + log.Printf("delegation_sweeper: invalid %s=%q, using default %s", key, v, def) + return def + } + return time.Duration(n) * time.Second +} + +// Interval exposes the configured tick cadence — tests use it; main.go +// uses it implicitly via Start. +func (s *DelegationSweeper) Interval() time.Duration { return s.interval } + +// Threshold exposes the heartbeat-staleness threshold. +func (s *DelegationSweeper) Threshold() time.Duration { return s.threshold } + +// Start ticks Sweep() at the configured interval until ctx is cancelled. +// Defers panic recovery so a single bad row can't kill the sweeper. +// +// Wired into main.go: `go sweeper.Start(ctx)`. No-op until both the +// `delegations` table (PR-1) and the result-push flag (PR-2) have rolled +// out — the sweeper just won't find any rows to mark. 
+func (s *DelegationSweeper) Start(ctx context.Context) { + t := time.NewTicker(s.interval) + defer t.Stop() + log.Printf("DelegationSweeper: started (interval=%s, stuck-threshold=%s)", + s.interval, s.threshold) + + tickWithRecover := func() { + defer func() { + if r := recover(); r != nil { + log.Printf("DelegationSweeper: PANIC in tick — recovered: %v", r) + } + }() + s.Sweep(ctx) + } + + // First sweep immediately so operators see it run on startup, not + // after waiting one interval. + tickWithRecover() + + for { + select { + case <-ctx.Done(): + log.Printf("DelegationSweeper: stopped") + return + case <-t.C: + tickWithRecover() + } + } +} + +// SweepResult records what the last sweep changed. Surfaced via the +// admin dashboard (PR-4); also useful for tests to assert behavior +// without diffing log lines. +type SweepResult struct { + DeadlineFailures int + StuckMarked int + Errors int +} + +// Sweep runs one pass: find every in-flight delegation, mark deadline- +// exceeded as failed, mark heartbeat-stale as stuck. Returns counts +// for observability. +// +// SQL strategy: one indexed scan over the partial inflight index, two +// updaters per offending row. We fold both checks into a single SELECT +// to amortize the round-trip — the row count in flight at any time +// is small (single-digit hundreds even on a busy tenant), so reading +// them all and dispatching SetStatus per-row is cheaper than two +// separate UPDATEs with bespoke WHERE clauses. 
+func (s *DelegationSweeper) Sweep(ctx context.Context) SweepResult { + res := SweepResult{} + + rows, err := s.db.QueryContext(ctx, ` + SELECT delegation_id, last_heartbeat, deadline + FROM delegations + WHERE status IN ('queued','dispatched','in_progress') + `) + if err != nil { + log.Printf("DelegationSweeper: query failed: %v", err) + res.Errors++ + return res + } + defer rows.Close() + + now := time.Now() + type candidate struct { + id string + lastBeat sql.NullTime + deadline time.Time + } + var todo []candidate + for rows.Next() { + var c candidate + if err := rows.Scan(&c.id, &c.lastBeat, &c.deadline); err != nil { + log.Printf("DelegationSweeper: scan failed: %v", err) + res.Errors++ + continue + } + todo = append(todo, c) + } + if err := rows.Err(); err != nil { + log.Printf("DelegationSweeper: rows.Err: %v", err) + res.Errors++ + } + + for _, c := range todo { + // Deadline first — stronger statement than stuck. + if now.After(c.deadline) { + if err := s.ledger.SetStatus(ctx, c.id, "failed", + "deadline exceeded by sweeper", ""); err != nil { + log.Printf("DelegationSweeper: SetStatus(%s, failed): %v", c.id, err) + res.Errors++ + continue + } + res.DeadlineFailures++ + continue + } + + // Heartbeat staleness. A NULL last_heartbeat counts as stale ONLY + // if the row has lived past one threshold since creation — gives + // the agent one full window to emit its first beat. We fold this + // by treating NULL as "created_at — but we don't have created_at + // in the SELECT. Approximate: NULL last_heartbeat + deadline more + // than (5h, default deadline=6h) away from now means the row was + // created ≤1h ago, give it a free pass. Simpler heuristic: NULL + // heartbeat is only stale if deadline is already imminent (within + // 1 threshold). + var lastBeat time.Time + if c.lastBeat.Valid { + lastBeat = c.lastBeat.Time + } + if !c.lastBeat.Valid { + // Row never heartbeat. Don't mark stuck — let the deadline + // catch it. 
Reduces false positives during the agent's first + // beat window after restart. + continue + } + if now.Sub(lastBeat) > s.threshold { + if err := s.ledger.SetStatus(ctx, c.id, "stuck", + "no heartbeat for "+now.Sub(lastBeat).Round(time.Second).String(), + ""); err != nil { + log.Printf("DelegationSweeper: SetStatus(%s, stuck): %v", c.id, err) + res.Errors++ + continue + } + res.StuckMarked++ + } + } + + if res.DeadlineFailures > 0 || res.StuckMarked > 0 || res.Errors > 0 { + log.Printf("DelegationSweeper: sweep complete — deadline_failures=%d stuck=%d errors=%d", + res.DeadlineFailures, res.StuckMarked, res.Errors) + } + return res +} diff --git a/workspace-server/internal/handlers/delegation_sweeper_test.go b/workspace-server/internal/handlers/delegation_sweeper_test.go new file mode 100644 index 00000000..3f93fb31 --- /dev/null +++ b/workspace-server/internal/handlers/delegation_sweeper_test.go @@ -0,0 +1,314 @@ +package handlers + +import ( + "context" + "database/sql" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" +) + +// delegation_sweeper_test.go — coverage for the RFC #2829 PR-3 stuck-task +// sweeper. Validates: +// +// 1. Deadline-exceeded rows are marked failed. +// 2. Heartbeat-stale rows (lastBeat older than threshold) are marked stuck. +// 3. NULL last_heartbeat is NOT marked stuck (free first-beat pass). +// 4. Healthy in-flight rows (recent heartbeat, future deadline) are +// left alone. +// 5. Empty in-flight set is a clean no-op. +// 6. Both rules apply in one sweep without double-marking. +// 7. Env-override interval/threshold parse correctly + fall back on +// invalid input. + +func TestSweeper_HappyPath_NoInflightRowsIsCleanNoOp(t *testing.T) { + mock := setupTestDB(t) + ledger := NewDelegationLedger(nil) + sw := NewDelegationSweeper(nil, ledger) + + mock.ExpectQuery(`SELECT delegation_id, last_heartbeat, deadline\s+FROM delegations`). 
+ WillReturnRows(sqlmock.NewRows([]string{"delegation_id", "last_heartbeat", "deadline"})) + + res := sw.Sweep(context.Background()) + if res.DeadlineFailures != 0 || res.StuckMarked != 0 || res.Errors != 0 { + t.Errorf("empty in-flight set must produce zero changes; got %+v", res) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +func TestSweeper_DeadlineExceededIsMarkedFailed(t *testing.T) { + mock := setupTestDB(t) + ledger := NewDelegationLedger(nil) + sw := NewDelegationSweeper(nil, ledger) + + past := time.Now().Add(-1 * time.Minute) + mock.ExpectQuery(`SELECT delegation_id, last_heartbeat, deadline\s+FROM delegations`). + WillReturnRows(sqlmock.NewRows([]string{"delegation_id", "last_heartbeat", "deadline"}). + AddRow("deleg-overdue", time.Now(), past)) + + // SetStatus reads current status... + mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`). + WithArgs("deleg-overdue"). + WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("in_progress")) + // ...then updates to failed. + mock.ExpectExec(`UPDATE delegations`). + WithArgs("deleg-overdue", "failed", "deadline exceeded by sweeper", ""). + WillReturnResult(sqlmock.NewResult(0, 1)) + + res := sw.Sweep(context.Background()) + if res.DeadlineFailures != 1 { + t.Errorf("expected 1 deadline failure, got %d", res.DeadlineFailures) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +func TestSweeper_StaleHeartbeatIsMarkedStuck(t *testing.T) { + mock := setupTestDB(t) + ledger := NewDelegationLedger(nil) + sw := NewDelegationSweeper(nil, ledger) + + // Last heartbeat 30min ago — well past the 10min default threshold. + staleBeat := time.Now().Add(-30 * time.Minute) + future := time.Now().Add(2 * time.Hour) // deadline NOT exceeded + + mock.ExpectQuery(`SELECT delegation_id, last_heartbeat, deadline\s+FROM delegations`). 
+ WillReturnRows(sqlmock.NewRows([]string{"delegation_id", "last_heartbeat", "deadline"}). + AddRow("deleg-stuck", staleBeat, future)) + + mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`). + WithArgs("deleg-stuck"). + WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("in_progress")) + + // We can't predict the exact "no heartbeat for Xs" string because + // the suffix depends on now() at sweep time; just match against any. + mock.ExpectExec(`UPDATE delegations`). + WithArgs("deleg-stuck", "stuck", sqlmock.AnyArg(), ""). + WillReturnResult(sqlmock.NewResult(0, 1)) + + res := sw.Sweep(context.Background()) + if res.StuckMarked != 1 { + t.Errorf("expected 1 stuck mark, got %d", res.StuckMarked) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +func TestSweeper_NullHeartbeatIsLeftAlone(t *testing.T) { + // A delegation that was JUST inserted (queued, no heartbeat yet) must + // not be flipped to stuck on the first sweep — give it the chance to + // emit its first beat. + mock := setupTestDB(t) + ledger := NewDelegationLedger(nil) + sw := NewDelegationSweeper(nil, ledger) + + future := time.Now().Add(2 * time.Hour) + mock.ExpectQuery(`SELECT delegation_id, last_heartbeat, deadline\s+FROM delegations`). + WillReturnRows(sqlmock.NewRows([]string{"delegation_id", "last_heartbeat", "deadline"}). 
+ AddRow("deleg-fresh", sql.NullTime{}, future)) + + res := sw.Sweep(context.Background()) + if res.StuckMarked != 0 { + t.Errorf("NULL heartbeat must not be stuck-marked; got %d", res.StuckMarked) + } + if res.DeadlineFailures != 0 { + t.Errorf("future deadline must not fail; got %d", res.DeadlineFailures) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +func TestSweeper_HealthyInflightRowsAreLeftAlone(t *testing.T) { + mock := setupTestDB(t) + ledger := NewDelegationLedger(nil) + sw := NewDelegationSweeper(nil, ledger) + + freshBeat := time.Now().Add(-1 * time.Minute) // well within 10min threshold + future := time.Now().Add(2 * time.Hour) + + mock.ExpectQuery(`SELECT delegation_id, last_heartbeat, deadline\s+FROM delegations`). + WillReturnRows(sqlmock.NewRows([]string{"delegation_id", "last_heartbeat", "deadline"}). + AddRow("deleg-healthy", freshBeat, future)) + + res := sw.Sweep(context.Background()) + if res.DeadlineFailures != 0 || res.StuckMarked != 0 { + t.Errorf("healthy row must produce zero changes; got %+v", res) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +func TestSweeper_DeadlineFiresFirstNotStuck(t *testing.T) { + // Row that's BOTH past deadline AND stale-heartbeat must be marked + // failed (deadline) not stuck — deadline is the stronger statement. + mock := setupTestDB(t) + ledger := NewDelegationLedger(nil) + sw := NewDelegationSweeper(nil, ledger) + + staleBeat := time.Now().Add(-30 * time.Minute) + past := time.Now().Add(-5 * time.Minute) + + mock.ExpectQuery(`SELECT delegation_id, last_heartbeat, deadline\s+FROM delegations`). + WillReturnRows(sqlmock.NewRows([]string{"delegation_id", "last_heartbeat", "deadline"}). + AddRow("deleg-both", staleBeat, past)) + + // Only the failed transition fires; no stuck transition. + mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`). + WithArgs("deleg-both"). 
+ WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("in_progress")) + mock.ExpectExec(`UPDATE delegations`). + WithArgs("deleg-both", "failed", "deadline exceeded by sweeper", ""). + WillReturnResult(sqlmock.NewResult(0, 1)) + + res := sw.Sweep(context.Background()) + if res.DeadlineFailures != 1 || res.StuckMarked != 0 { + t.Errorf("expected 1 deadline failure, 0 stuck; got %+v", res) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet (stuck UPDATE may have fired by accident): %v", err) + } +} + +func TestSweeper_MixedSetAppliesBothRules(t *testing.T) { + mock := setupTestDB(t) + ledger := NewDelegationLedger(nil) + sw := NewDelegationSweeper(nil, ledger) + + now := time.Now() + mock.ExpectQuery(`SELECT delegation_id, last_heartbeat, deadline\s+FROM delegations`). + WillReturnRows(sqlmock.NewRows([]string{"delegation_id", "last_heartbeat", "deadline"}). + AddRow("deleg-overdue", now, now.Add(-1*time.Minute)). // deadline → failed + AddRow("deleg-stuck", now.Add(-30*time.Minute), now.Add(2*time.Hour)). // stale → stuck + AddRow("deleg-healthy", now.Add(-30*time.Second), now.Add(2*time.Hour)), // healthy → no-op + ) + + // 1st: deadline → failed + mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`). + WithArgs("deleg-overdue"). + WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("in_progress")) + mock.ExpectExec(`UPDATE delegations`). + WithArgs("deleg-overdue", "failed", "deadline exceeded by sweeper", ""). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // 2nd: stale → stuck + mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`). + WithArgs("deleg-stuck"). + WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("in_progress")) + mock.ExpectExec(`UPDATE delegations`). + WithArgs("deleg-stuck", "stuck", sqlmock.AnyArg(), ""). 
+ WillReturnResult(sqlmock.NewResult(0, 1)) + + // 3rd: healthy — no SQL fired + + res := sw.Sweep(context.Background()) + if res.DeadlineFailures != 1 || res.StuckMarked != 1 { + t.Errorf("expected 1 failure + 1 stuck, got %+v", res) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +func TestSweeper_TerminalReplayFromConcurrentCompletionIsIgnored(t *testing.T) { + // Edge case: row was marked completed by UpdateStatus between the + // SELECT and the SetStatus call. SetStatus's forward-only protection + // returns ErrInvalidTransition; sweeper increments Errors but the + // row is correctly left in completed state. + mock := setupTestDB(t) + ledger := NewDelegationLedger(nil) + sw := NewDelegationSweeper(nil, ledger) + + past := time.Now().Add(-1 * time.Minute) + mock.ExpectQuery(`SELECT delegation_id, last_heartbeat, deadline\s+FROM delegations`). + WillReturnRows(sqlmock.NewRows([]string{"delegation_id", "last_heartbeat", "deadline"}). + AddRow("deleg-raced", time.Now(), past)) + + // SetStatus's status read finds the row already completed (concurrent UpdateStatus won). + mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`). + WithArgs("deleg-raced"). + WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("completed")) + // No UPDATE — terminal forward-only blocks it. 
+ + res := sw.Sweep(context.Background()) + if res.Errors != 1 { + t.Errorf("forward-only block must surface as Error count; got %+v", res) + } + if res.DeadlineFailures != 0 { + t.Errorf("must NOT credit a deadline failure that didn't fire; got %d", res.DeadlineFailures) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +// ---------- env override parsing ---------- + +func TestEnvDuration_Default(t *testing.T) { + t.Setenv("MY_TEST_KEY", "") + if got := envDuration("MY_TEST_KEY", 7*time.Second); got != 7*time.Second { + t.Errorf("expected default 7s, got %v", got) + } +} + +func TestEnvDuration_ParsesPositiveSeconds(t *testing.T) { + t.Setenv("MY_TEST_KEY", "42") + if got := envDuration("MY_TEST_KEY", 1*time.Second); got != 42*time.Second { + t.Errorf("expected 42s, got %v", got) + } +} + +func TestEnvDuration_FallsBackOnInvalid(t *testing.T) { + t.Setenv("MY_TEST_KEY", "garbage") + if got := envDuration("MY_TEST_KEY", 5*time.Second); got != 5*time.Second { + t.Errorf("invalid input must fall back to default; got %v", got) + } +} + +func TestEnvDuration_FallsBackOnNegative(t *testing.T) { + t.Setenv("MY_TEST_KEY", "-10") + if got := envDuration("MY_TEST_KEY", 5*time.Second); got != 5*time.Second { + t.Errorf("negative must fall back to default; got %v", got) + } +} + +// TestSweeperConstructor_PicksUpEnvOverrides — interval + threshold env +// vars are read at construction time. Confirms the wiring contract; if +// somebody adds a new env var without plumbing it, this fails. 
+func TestSweeperConstructor_PicksUpEnvOverrides(t *testing.T) { + t.Setenv("DELEGATION_SWEEPER_INTERVAL_S", "60") + t.Setenv("DELEGATION_STUCK_THRESHOLD_S", "120") + + mock := setupTestDB(t) + _ = mock // unused — constructor doesn't fire SQL + sw := NewDelegationSweeper(nil, nil) + + if sw.Interval() != 60*time.Second { + t.Errorf("interval override not picked up: got %v", sw.Interval()) + } + if sw.Threshold() != 120*time.Second { + t.Errorf("threshold override not picked up: got %v", sw.Threshold()) + } +} + +func TestSweeperConstructor_DefaultsWhenEnvUnset(t *testing.T) { + t.Setenv("DELEGATION_SWEEPER_INTERVAL_S", "") + t.Setenv("DELEGATION_STUCK_THRESHOLD_S", "") + + mock := setupTestDB(t) + _ = mock + sw := NewDelegationSweeper(nil, nil) + + if sw.Interval() != defaultSweeperInterval { + t.Errorf("default interval not used: got %v", sw.Interval()) + } + if sw.Threshold() != defaultStuckThreshold { + t.Errorf("default threshold not used: got %v", sw.Threshold()) + } +}