Merge pull request #2836 from Molecule-AI/feat/rfc2829-pr3-stuck-task-sweeper

feat(delegations): stuck-task sweeper with deadline + heartbeat-staleness rules (RFC #2829 PR-3)
This commit is contained in:
Hongming Wang 2026-05-05 03:59:16 +00:00 committed by GitHub
commit b5c0b4d371
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 579 additions and 0 deletions

View File

@ -0,0 +1,265 @@
package handlers
import (
"context"
"database/sql"
"log"
"os"
"strconv"
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
)
// delegation_sweeper.go — RFC #2829 PR-3: stuck-task sweeper.
//
// What it does
// ------------
// Periodically scans the `delegations` table (PR-1 schema) for in-flight
// rows that have either:
//
// 1. Blown past their `deadline` — agent claims to still be working but
// the hard ceiling fired. Mark `failed` with error_detail = "deadline
// exceeded".
// 2. Stopped heartbeating for >stuckThreshold while still claiming
// in_progress. Mark `stuck` with error_detail = "no heartbeat for Ns".
//
// Why both rules
// --------------
// Deadline catches forever-heartbeating agents that never make progress
// (a wedged agent looping on a heartbeat call inside its main work loop
// looks "alive" by liveness signals but is not actually advancing).
// Heartbeat-staleness catches agents that crash or get OOM-killed
// without graceful shutdown — no terminal status update fires, but the
// heartbeat stops cold.
//
// Order matters: deadline check fires first because deadline → failed
// is a stronger statement than deadline → stuck. A stuck row can be
// retried by the operator; a failed row says "give up, retry was
// already exhausted or not viable."
//
// Frequency
// ---------
// 5min default cadence. Faster than that wastes DB round-trips for the
// hot index; slower means a stuck task isn't caught until ~5min after
// the heartbeat stops. Operators can override via DELEGATION_SWEEPER_INTERVAL_S.
//
// Threshold
// ---------
// Default 10× the runtime's heartbeat interval (≈100s for hermes that
// beats every 10s during stream output). 10× is the heuristic from the
// RFC #2829 design discussion: it tolerates legitimate slow LLM
// responses (a single completion can stall a heartbeat for 30-60s) while
// still catching real wedges within ~2 minutes. Operators override via
// DELEGATION_STUCK_THRESHOLD_S.
//
// Safety
// ------
// All transitions go through DelegationLedger.SetStatus so the
// terminal-state forward-only protection applies — a delegation that
// just transitioned to completed concurrently with the sweep won't be
// flipped back to failed/stuck. The ledger's same-status replay no-op
// also makes re-running the sweep idempotent.
const (
defaultSweeperInterval = 5 * time.Minute
// 10min = 60× the typical 10s hermes heartbeat. Tightens to ~10×
// once the user community settles on a tighter heartbeat cadence;
// today's mix of runtimes (hermes 10s, claude-code 30-60s, langchain
// minute-scale) needs the looser threshold to avoid false positives.
defaultStuckThreshold = 10 * time.Minute
)
// DelegationSweeper runs the periodic sweep. Construct via
// NewDelegationSweeper, then Start(ctx) in main.go to begin ticking.
type DelegationSweeper struct {
db *sql.DB
ledger *DelegationLedger
interval time.Duration
threshold time.Duration
}
// NewDelegationSweeper builds a sweeper bound to the package db.DB
// (production wiring) or a test handle. Reads optional env overrides
// at construction time so a long-running process picks them up via
// restart, not mid-flight.
func NewDelegationSweeper(handle *sql.DB, ledger *DelegationLedger) *DelegationSweeper {
if handle == nil {
handle = db.DB
}
if ledger == nil {
ledger = NewDelegationLedger(handle)
}
return &DelegationSweeper{
db: handle,
ledger: ledger,
interval: envDuration("DELEGATION_SWEEPER_INTERVAL_S", defaultSweeperInterval),
threshold: envDuration("DELEGATION_STUCK_THRESHOLD_S", defaultStuckThreshold),
}
}
// envDuration parses an integer-seconds env var into a Duration. Falls
// back to def on missing/invalid input — never fails fast on misconfig
// (a typo'd env var should run with sane defaults, not crash startup).
func envDuration(key string, def time.Duration) time.Duration {
v := os.Getenv(key)
if v == "" {
return def
}
n, err := strconv.Atoi(v)
if err != nil || n <= 0 {
log.Printf("delegation_sweeper: invalid %s=%q, using default %s", key, v, def)
return def
}
return time.Duration(n) * time.Second
}
// Interval exposes the configured tick cadence — tests use it; main.go
// uses it implicitly via Start.
func (s *DelegationSweeper) Interval() time.Duration { return s.interval }
// Threshold exposes the heartbeat-staleness threshold.
func (s *DelegationSweeper) Threshold() time.Duration { return s.threshold }
// Start ticks Sweep() at the configured interval until ctx is cancelled.
// Defers panic recovery so a single bad row can't kill the sweeper.
//
// Wired into main.go: `go sweeper.Start(ctx)`. No-op until both the
// `delegations` table (PR-1) and the result-push flag (PR-2) have rolled
// out — the sweeper just won't find any rows to mark.
func (s *DelegationSweeper) Start(ctx context.Context) {
t := time.NewTicker(s.interval)
defer t.Stop()
log.Printf("DelegationSweeper: started (interval=%s, stuck-threshold=%s)",
s.interval, s.threshold)
tickWithRecover := func() {
defer func() {
if r := recover(); r != nil {
log.Printf("DelegationSweeper: PANIC in tick — recovered: %v", r)
}
}()
s.Sweep(ctx)
}
// First sweep immediately so operators see it run on startup, not
// after waiting one interval.
tickWithRecover()
for {
select {
case <-ctx.Done():
log.Printf("DelegationSweeper: stopped")
return
case <-t.C:
tickWithRecover()
}
}
}
// SweepResult records what the last sweep changed. Surfaced via the
// admin dashboard (PR-4); also useful for tests to assert behavior
// without diffing log lines.
type SweepResult struct {
DeadlineFailures int
StuckMarked int
Errors int
}
// Sweep runs one pass: find every in-flight delegation, mark deadline-
// exceeded as failed, mark heartbeat-stale as stuck. Returns counts
// for observability.
//
// SQL strategy: one indexed scan over the partial inflight index, two
// updaters per offending row. We fold both checks into a single SELECT
// to amortize the round-trip — the row count in flight at any time
// is small (single-digit hundreds even on a busy tenant), so reading
// them all and dispatching SetStatus per-row is cheaper than two
// separate UPDATEs with bespoke WHERE clauses.
func (s *DelegationSweeper) Sweep(ctx context.Context) SweepResult {
res := SweepResult{}
rows, err := s.db.QueryContext(ctx, `
SELECT delegation_id, last_heartbeat, deadline
FROM delegations
WHERE status IN ('queued','dispatched','in_progress')
`)
if err != nil {
log.Printf("DelegationSweeper: query failed: %v", err)
res.Errors++
return res
}
defer rows.Close()
now := time.Now()
type candidate struct {
id string
lastBeat sql.NullTime
deadline time.Time
}
var todo []candidate
for rows.Next() {
var c candidate
if err := rows.Scan(&c.id, &c.lastBeat, &c.deadline); err != nil {
log.Printf("DelegationSweeper: scan failed: %v", err)
res.Errors++
continue
}
todo = append(todo, c)
}
if err := rows.Err(); err != nil {
log.Printf("DelegationSweeper: rows.Err: %v", err)
res.Errors++
}
for _, c := range todo {
// Deadline first — stronger statement than stuck.
if now.After(c.deadline) {
if err := s.ledger.SetStatus(ctx, c.id, "failed",
"deadline exceeded by sweeper", ""); err != nil {
log.Printf("DelegationSweeper: SetStatus(%s, failed): %v", c.id, err)
res.Errors++
continue
}
res.DeadlineFailures++
continue
}
// Heartbeat staleness. A NULL last_heartbeat counts as stale ONLY
// if the row has lived past one threshold since creation — gives
// the agent one full window to emit its first beat. We fold this
// by treating NULL as "created_at — but we don't have created_at
// in the SELECT. Approximate: NULL last_heartbeat + deadline more
// than (5h, default deadline=6h) away from now means the row was
// created ≤1h ago, give it a free pass. Simpler heuristic: NULL
// heartbeat is only stale if deadline is already imminent (within
// 1 threshold).
var lastBeat time.Time
if c.lastBeat.Valid {
lastBeat = c.lastBeat.Time
}
if !c.lastBeat.Valid {
// Row never heartbeat. Don't mark stuck — let the deadline
// catch it. Reduces false positives during the agent's first
// beat window after restart.
continue
}
if now.Sub(lastBeat) > s.threshold {
if err := s.ledger.SetStatus(ctx, c.id, "stuck",
"no heartbeat for "+now.Sub(lastBeat).Round(time.Second).String(),
""); err != nil {
log.Printf("DelegationSweeper: SetStatus(%s, stuck): %v", c.id, err)
res.Errors++
continue
}
res.StuckMarked++
}
}
if res.DeadlineFailures > 0 || res.StuckMarked > 0 || res.Errors > 0 {
log.Printf("DelegationSweeper: sweep complete — deadline_failures=%d stuck=%d errors=%d",
res.DeadlineFailures, res.StuckMarked, res.Errors)
}
return res
}

View File

@ -0,0 +1,314 @@
package handlers
import (
"context"
"database/sql"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
)
// delegation_sweeper_test.go — coverage for the RFC #2829 PR-3 stuck-task
// sweeper. Validates:
//
// 1. Deadline-exceeded rows are marked failed.
// 2. Heartbeat-stale rows (lastBeat older than threshold) are marked stuck.
// 3. NULL last_heartbeat is NOT marked stuck (free first-beat pass).
// 4. Healthy in-flight rows (recent heartbeat, future deadline) are
// left alone.
// 5. Empty in-flight set is a clean no-op.
// 6. Both rules apply in one sweep without double-marking.
// 7. Env-override interval/threshold parse correctly + fall back on
// invalid input.
func TestSweeper_HappyPath_NoInflightRowsIsCleanNoOp(t *testing.T) {
mock := setupTestDB(t)
ledger := NewDelegationLedger(nil)
sw := NewDelegationSweeper(nil, ledger)
mock.ExpectQuery(`SELECT delegation_id, last_heartbeat, deadline\s+FROM delegations`).
WillReturnRows(sqlmock.NewRows([]string{"delegation_id", "last_heartbeat", "deadline"}))
res := sw.Sweep(context.Background())
if res.DeadlineFailures != 0 || res.StuckMarked != 0 || res.Errors != 0 {
t.Errorf("empty in-flight set must produce zero changes; got %+v", res)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
func TestSweeper_DeadlineExceededIsMarkedFailed(t *testing.T) {
mock := setupTestDB(t)
ledger := NewDelegationLedger(nil)
sw := NewDelegationSweeper(nil, ledger)
past := time.Now().Add(-1 * time.Minute)
mock.ExpectQuery(`SELECT delegation_id, last_heartbeat, deadline\s+FROM delegations`).
WillReturnRows(sqlmock.NewRows([]string{"delegation_id", "last_heartbeat", "deadline"}).
AddRow("deleg-overdue", time.Now(), past))
// SetStatus reads current status...
mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`).
WithArgs("deleg-overdue").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("in_progress"))
// ...then updates to failed.
mock.ExpectExec(`UPDATE delegations`).
WithArgs("deleg-overdue", "failed", "deadline exceeded by sweeper", "").
WillReturnResult(sqlmock.NewResult(0, 1))
res := sw.Sweep(context.Background())
if res.DeadlineFailures != 1 {
t.Errorf("expected 1 deadline failure, got %d", res.DeadlineFailures)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
func TestSweeper_StaleHeartbeatIsMarkedStuck(t *testing.T) {
mock := setupTestDB(t)
ledger := NewDelegationLedger(nil)
sw := NewDelegationSweeper(nil, ledger)
// Last heartbeat 30min ago — well past the 10min default threshold.
staleBeat := time.Now().Add(-30 * time.Minute)
future := time.Now().Add(2 * time.Hour) // deadline NOT exceeded
mock.ExpectQuery(`SELECT delegation_id, last_heartbeat, deadline\s+FROM delegations`).
WillReturnRows(sqlmock.NewRows([]string{"delegation_id", "last_heartbeat", "deadline"}).
AddRow("deleg-stuck", staleBeat, future))
mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`).
WithArgs("deleg-stuck").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("in_progress"))
// We can't predict the exact "no heartbeat for Xs" string because
// the suffix depends on now() at sweep time; just match against any.
mock.ExpectExec(`UPDATE delegations`).
WithArgs("deleg-stuck", "stuck", sqlmock.AnyArg(), "").
WillReturnResult(sqlmock.NewResult(0, 1))
res := sw.Sweep(context.Background())
if res.StuckMarked != 1 {
t.Errorf("expected 1 stuck mark, got %d", res.StuckMarked)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
func TestSweeper_NullHeartbeatIsLeftAlone(t *testing.T) {
// A delegation that was JUST inserted (queued, no heartbeat yet) must
// not be flipped to stuck on the first sweep — give it the chance to
// emit its first beat.
mock := setupTestDB(t)
ledger := NewDelegationLedger(nil)
sw := NewDelegationSweeper(nil, ledger)
future := time.Now().Add(2 * time.Hour)
mock.ExpectQuery(`SELECT delegation_id, last_heartbeat, deadline\s+FROM delegations`).
WillReturnRows(sqlmock.NewRows([]string{"delegation_id", "last_heartbeat", "deadline"}).
AddRow("deleg-fresh", sql.NullTime{}, future))
res := sw.Sweep(context.Background())
if res.StuckMarked != 0 {
t.Errorf("NULL heartbeat must not be stuck-marked; got %d", res.StuckMarked)
}
if res.DeadlineFailures != 0 {
t.Errorf("future deadline must not fail; got %d", res.DeadlineFailures)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
func TestSweeper_HealthyInflightRowsAreLeftAlone(t *testing.T) {
mock := setupTestDB(t)
ledger := NewDelegationLedger(nil)
sw := NewDelegationSweeper(nil, ledger)
freshBeat := time.Now().Add(-1 * time.Minute) // well within 10min threshold
future := time.Now().Add(2 * time.Hour)
mock.ExpectQuery(`SELECT delegation_id, last_heartbeat, deadline\s+FROM delegations`).
WillReturnRows(sqlmock.NewRows([]string{"delegation_id", "last_heartbeat", "deadline"}).
AddRow("deleg-healthy", freshBeat, future))
res := sw.Sweep(context.Background())
if res.DeadlineFailures != 0 || res.StuckMarked != 0 {
t.Errorf("healthy row must produce zero changes; got %+v", res)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
func TestSweeper_DeadlineFiresFirstNotStuck(t *testing.T) {
// Row that's BOTH past deadline AND stale-heartbeat must be marked
// failed (deadline) not stuck — deadline is the stronger statement.
mock := setupTestDB(t)
ledger := NewDelegationLedger(nil)
sw := NewDelegationSweeper(nil, ledger)
staleBeat := time.Now().Add(-30 * time.Minute)
past := time.Now().Add(-5 * time.Minute)
mock.ExpectQuery(`SELECT delegation_id, last_heartbeat, deadline\s+FROM delegations`).
WillReturnRows(sqlmock.NewRows([]string{"delegation_id", "last_heartbeat", "deadline"}).
AddRow("deleg-both", staleBeat, past))
// Only the failed transition fires; no stuck transition.
mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`).
WithArgs("deleg-both").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("in_progress"))
mock.ExpectExec(`UPDATE delegations`).
WithArgs("deleg-both", "failed", "deadline exceeded by sweeper", "").
WillReturnResult(sqlmock.NewResult(0, 1))
res := sw.Sweep(context.Background())
if res.DeadlineFailures != 1 || res.StuckMarked != 0 {
t.Errorf("expected 1 deadline failure, 0 stuck; got %+v", res)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet (stuck UPDATE may have fired by accident): %v", err)
}
}
func TestSweeper_MixedSetAppliesBothRules(t *testing.T) {
mock := setupTestDB(t)
ledger := NewDelegationLedger(nil)
sw := NewDelegationSweeper(nil, ledger)
now := time.Now()
mock.ExpectQuery(`SELECT delegation_id, last_heartbeat, deadline\s+FROM delegations`).
WillReturnRows(sqlmock.NewRows([]string{"delegation_id", "last_heartbeat", "deadline"}).
AddRow("deleg-overdue", now, now.Add(-1*time.Minute)). // deadline → failed
AddRow("deleg-stuck", now.Add(-30*time.Minute), now.Add(2*time.Hour)). // stale → stuck
AddRow("deleg-healthy", now.Add(-30*time.Second), now.Add(2*time.Hour)), // healthy → no-op
)
// 1st: deadline → failed
mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`).
WithArgs("deleg-overdue").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("in_progress"))
mock.ExpectExec(`UPDATE delegations`).
WithArgs("deleg-overdue", "failed", "deadline exceeded by sweeper", "").
WillReturnResult(sqlmock.NewResult(0, 1))
// 2nd: stale → stuck
mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`).
WithArgs("deleg-stuck").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("in_progress"))
mock.ExpectExec(`UPDATE delegations`).
WithArgs("deleg-stuck", "stuck", sqlmock.AnyArg(), "").
WillReturnResult(sqlmock.NewResult(0, 1))
// 3rd: healthy — no SQL fired
res := sw.Sweep(context.Background())
if res.DeadlineFailures != 1 || res.StuckMarked != 1 {
t.Errorf("expected 1 failure + 1 stuck, got %+v", res)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
func TestSweeper_TerminalReplayFromConcurrentCompletionIsIgnored(t *testing.T) {
// Edge case: row was marked completed by UpdateStatus between the
// SELECT and the SetStatus call. SetStatus's forward-only protection
// returns ErrInvalidTransition; sweeper increments Errors but the
// row is correctly left in completed state.
mock := setupTestDB(t)
ledger := NewDelegationLedger(nil)
sw := NewDelegationSweeper(nil, ledger)
past := time.Now().Add(-1 * time.Minute)
mock.ExpectQuery(`SELECT delegation_id, last_heartbeat, deadline\s+FROM delegations`).
WillReturnRows(sqlmock.NewRows([]string{"delegation_id", "last_heartbeat", "deadline"}).
AddRow("deleg-raced", time.Now(), past))
// SetStatus's status read finds the row already completed (concurrent UpdateStatus won).
mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`).
WithArgs("deleg-raced").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("completed"))
// No UPDATE — terminal forward-only blocks it.
res := sw.Sweep(context.Background())
if res.Errors != 1 {
t.Errorf("forward-only block must surface as Error count; got %+v", res)
}
if res.DeadlineFailures != 0 {
t.Errorf("must NOT credit a deadline failure that didn't fire; got %d", res.DeadlineFailures)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
// ---------- env override parsing ----------
func TestEnvDuration_Default(t *testing.T) {
t.Setenv("MY_TEST_KEY", "")
if got := envDuration("MY_TEST_KEY", 7*time.Second); got != 7*time.Second {
t.Errorf("expected default 7s, got %v", got)
}
}
func TestEnvDuration_ParsesPositiveSeconds(t *testing.T) {
t.Setenv("MY_TEST_KEY", "42")
if got := envDuration("MY_TEST_KEY", 1*time.Second); got != 42*time.Second {
t.Errorf("expected 42s, got %v", got)
}
}
func TestEnvDuration_FallsBackOnInvalid(t *testing.T) {
t.Setenv("MY_TEST_KEY", "garbage")
if got := envDuration("MY_TEST_KEY", 5*time.Second); got != 5*time.Second {
t.Errorf("invalid input must fall back to default; got %v", got)
}
}
func TestEnvDuration_FallsBackOnNegative(t *testing.T) {
t.Setenv("MY_TEST_KEY", "-10")
if got := envDuration("MY_TEST_KEY", 5*time.Second); got != 5*time.Second {
t.Errorf("negative must fall back to default; got %v", got)
}
}
// TestSweeperConstructor_PicksUpEnvOverrides — interval + threshold env
// vars are read at construction time. Confirms the wiring contract; if
// somebody adds a new env var without plumbing it, this fails.
func TestSweeperConstructor_PicksUpEnvOverrides(t *testing.T) {
t.Setenv("DELEGATION_SWEEPER_INTERVAL_S", "60")
t.Setenv("DELEGATION_STUCK_THRESHOLD_S", "120")
mock := setupTestDB(t)
_ = mock // unused — constructor doesn't fire SQL
sw := NewDelegationSweeper(nil, nil)
if sw.Interval() != 60*time.Second {
t.Errorf("interval override not picked up: got %v", sw.Interval())
}
if sw.Threshold() != 120*time.Second {
t.Errorf("threshold override not picked up: got %v", sw.Threshold())
}
}
func TestSweeperConstructor_DefaultsWhenEnvUnset(t *testing.T) {
t.Setenv("DELEGATION_SWEEPER_INTERVAL_S", "")
t.Setenv("DELEGATION_STUCK_THRESHOLD_S", "")
mock := setupTestDB(t)
_ = mock
sw := NewDelegationSweeper(nil, nil)
if sw.Interval() != defaultSweeperInterval {
t.Errorf("default interval not used: got %v", sw.Interval())
}
if sw.Threshold() != defaultStuckThreshold {
t.Errorf("default threshold not used: got %v", sw.Threshold())
}
}