forked from molecule-ai/molecule-core
feat(delegations): stuck-task sweeper with deadline + heartbeat-staleness rules (RFC #2829 PR-3)
Periodically scans the `delegations` table (PR-1 schema) for in-flight rows that need terminal action:

1. Deadline-exceeded → marked `failed` with "deadline exceeded by sweeper"
2. Heartbeat-stale (no beat for longer than the stuck threshold, default 10min) → marked `stuck`

## Why both rules

Deadline catches forever-heartbeating wedged agents (the alive-but-not-advancing class: the agent loops on a heartbeat call inside its main loop). Heartbeat-staleness catches OOM-killed and crashed agents that stop cold without graceful shutdown. Either rule alone misses one of these classes.

## Order matters

Deadline is checked first. A row that is both deadline-exceeded and stale is marked `failed` (operator action: investigate + give up), not `stuck` (operator action: investigate + retry). The semantic difference matters.

## NULL heartbeat is a free pass

A delegation that has just been inserted but hasn't emitted its first heartbeat yet is NOT stuck-marked, which gives the agent its first beat window. The deadline catches true never-started rows naturally.

## Concurrent-completion safety

Sweep races with UpdateStatus on a delegation that just completed: the ledger's terminal forward-only protection (PR-1) returns ErrInvalidTransition, the sweeper logs it and counts it in Errors, and the row stays correctly in completed.

## Configuration

- DELEGATION_SWEEPER_INTERVAL_S: tick cadence (default 5min)
- DELEGATION_STUCK_THRESHOLD_S: heartbeat-staleness threshold (default 10min)

Both fall back gracefully on invalid input (a typo'd env var shouldn't crash startup). Both are read at construction time, so a long-running process picks up overrides via restart.

## Wiring

NOT wired into main.go in this PR; that ships separately so the sweeper can be enabled/disabled independently of the binary upgrade.

The sweeper is a standalone Sweep(ctx) callable + Start(ctx) ticker loop. Start wraps every tick in panic recovery, and each sweep is indexed-scan-cheap on the partial idx_delegations_inflight_heartbeat from PR-1.

## Coverage

13 unit tests against sqlmock-backed *sql.DB:

Sweep semantics (8 tests):
- empty in-flight set → clean no-op
- deadline → failed
- heartbeat-stale → stuck
- NULL heartbeat is left alone (first-beat free pass)
- healthy row → no-op
- both-rule row → marked failed (deadline wins)
- mixed set → both rules fire on the right rows
- concurrent-completion race → forward-only protection holds

Env override parsing (5 tests):
- default on missing env
- parses positive seconds
- falls back on garbage
- falls back on negative
- constructor picks up overrides; defaults when env unset

Refs RFC #2829.
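The rule ordering described in this commit message can be sketched as a pure decision function. This is a minimal illustration under stated assumptions, not the code in this commit: `classify`, `Action`, `demoNow`, `demoThreshold`, `inMin`, and `agoMin` are hypothetical names introduced here, and a nil `lastBeat` stands in for a NULL `last_heartbeat`.

```go
package main

import (
	"fmt"
	"time"
)

// Action is the verdict for one in-flight row (hypothetical type).
type Action string

const (
	ActionNone   Action = "none"
	ActionFailed Action = "failed"
	ActionStuck  Action = "stuck"
)

// classify applies the two rules in order. A nil lastBeat models a NULL
// last_heartbeat column (no beat emitted yet).
func classify(now, deadline time.Time, lastBeat *time.Time, threshold time.Duration) Action {
	if now.After(deadline) {
		return ActionFailed // deadline wins even when the heartbeat is also stale
	}
	if lastBeat == nil {
		return ActionNone // first-beat free pass; the deadline catches never-started rows
	}
	if now.Sub(*lastBeat) > threshold {
		return ActionStuck
	}
	return ActionNone
}

// Fixed reference values keep the example deterministic.
var demoNow = time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)

const demoThreshold = 10 * time.Minute

// inMin returns the time m minutes after demoNow (negative m is the past).
func inMin(m int) time.Time { return demoNow.Add(time.Duration(m) * time.Minute) }

// agoMin returns a pointer to the time m minutes before demoNow.
func agoMin(m int) *time.Time {
	t := inMin(-m)
	return &t
}

func main() {
	fmt.Println(classify(demoNow, inMin(-5), agoMin(30), demoThreshold))  // overdue and stale
	fmt.Println(classify(demoNow, inMin(120), agoMin(30), demoThreshold)) // stale only
	fmt.Println(classify(demoNow, inMin(120), nil, demoThreshold))        // never beat
	fmt.Println(classify(demoNow, inMin(120), agoMin(1), demoThreshold))  // healthy
}
```

Run as written, this prints `failed`, `stuck`, `none`, `none`. Keeping the decision separate from the SQL is only one possible layout; the commit itself inlines the same checks in `Sweep`.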
parent
b3b9a242d6
commit
02b325063b
265
workspace-server/internal/handlers/delegation_sweeper.go
Normal file
@@ -0,0 +1,265 @@
package handlers

import (
	"context"
	"database/sql"
	"log"
	"os"
	"strconv"
	"time"

	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
)

// delegation_sweeper.go: RFC #2829 PR-3, stuck-task sweeper.
//
// What it does
// ------------
// Periodically scans the `delegations` table (PR-1 schema) for in-flight
// rows that have either:
//
//  1. Blown past their `deadline`: the agent claims to still be working
//     but the hard ceiling fired. Mark `failed` with error_detail =
//     "deadline exceeded by sweeper".
//  2. Stopped heartbeating for >stuckThreshold while still in flight.
//     Mark `stuck` with error_detail = "no heartbeat for <duration>".
//
// Why both rules
// --------------
// Deadline catches forever-heartbeating agents that never make progress
// (a wedged agent looping on a heartbeat call inside its main work loop
// looks "alive" by liveness signals but is not actually advancing).
// Heartbeat-staleness catches agents that crash or get OOM-killed
// without graceful shutdown: no terminal status update fires, but the
// heartbeat stops cold.
//
// Order matters: the deadline check fires first because deadline → failed
// is a stronger statement than stale heartbeat → stuck. A stuck row can
// be retried by the operator; a failed row says "give up, retry was
// already exhausted or not viable."
//
// Frequency
// ---------
// 5min default cadence. Faster than that wastes DB round-trips on the
// hot index; slower means a stale row can sit for up to one full
// interval after it crosses the threshold before it is marked.
// Operators can override via DELEGATION_SWEEPER_INTERVAL_S.
//
// Threshold
// ---------
// Default 10min (see defaultStuckThreshold). The heuristic from the
// RFC #2829 design discussion is 10× the runtime's heartbeat interval
// (≈100s for hermes, which beats every 10s during stream output): it
// tolerates legitimate slow LLM responses (a single completion can stall
// a heartbeat for 30-60s) while still catching real wedges within
// ~2 minutes. The shipped default is looser because runtimes with slower
// heartbeats share the same threshold. Operators override via
// DELEGATION_STUCK_THRESHOLD_S.
//
// Safety
// ------
// All transitions go through DelegationLedger.SetStatus so the
// terminal-state forward-only protection applies: a delegation that
// just transitioned to completed concurrently with the sweep won't be
// flipped back to failed/stuck. The ledger's same-status replay no-op
// also makes re-running the sweep idempotent.

const (
	defaultSweeperInterval = 5 * time.Minute

	// 10min = 60× the typical 10s hermes heartbeat. Can tighten toward
	// ~10× once the user community settles on a tighter heartbeat cadence;
	// today's mix of runtimes (hermes 10s, claude-code 30-60s, langchain
	// minute-scale) needs the looser threshold to avoid false positives.
	defaultStuckThreshold = 10 * time.Minute
)

// DelegationSweeper runs the periodic sweep. Construct via
// NewDelegationSweeper, then Start(ctx) in main.go to begin ticking.
type DelegationSweeper struct {
	db        *sql.DB
	ledger    *DelegationLedger
	interval  time.Duration
	threshold time.Duration
}

// NewDelegationSweeper builds a sweeper bound to the package db.DB
// (production wiring) or a test handle. Reads optional env overrides
// at construction time so a long-running process picks them up via
// restart, not mid-flight.
func NewDelegationSweeper(handle *sql.DB, ledger *DelegationLedger) *DelegationSweeper {
	if handle == nil {
		handle = db.DB
	}
	if ledger == nil {
		ledger = NewDelegationLedger(handle)
	}
	return &DelegationSweeper{
		db:        handle,
		ledger:    ledger,
		interval:  envDuration("DELEGATION_SWEEPER_INTERVAL_S", defaultSweeperInterval),
		threshold: envDuration("DELEGATION_STUCK_THRESHOLD_S", defaultStuckThreshold),
	}
}

// envDuration parses an integer-seconds env var into a Duration. Falls
// back to def on missing/invalid input; never fails fast on misconfig
// (a typo'd env var should run with sane defaults, not crash startup).
func envDuration(key string, def time.Duration) time.Duration {
	v := os.Getenv(key)
	if v == "" {
		return def
	}
	n, err := strconv.Atoi(v)
	if err != nil || n <= 0 {
		log.Printf("delegation_sweeper: invalid %s=%q, using default %s", key, v, def)
		return def
	}
	return time.Duration(n) * time.Second
}

// Interval exposes the configured tick cadence. Tests use it; main.go
// uses it implicitly via Start.
func (s *DelegationSweeper) Interval() time.Duration { return s.interval }

// Threshold exposes the heartbeat-staleness threshold.
func (s *DelegationSweeper) Threshold() time.Duration { return s.threshold }

// Start ticks Sweep() at the configured interval until ctx is cancelled.
// Each tick runs under deferred panic recovery so a single bad row can't
// kill the sweeper.
//
// Intended wiring in main.go: `go sweeper.Start(ctx)` (not added in this
// PR). No-op until both the `delegations` table (PR-1) and the
// result-push flag (PR-2) have rolled out: the sweeper just won't find
// any rows to mark.
func (s *DelegationSweeper) Start(ctx context.Context) {
	t := time.NewTicker(s.interval)
	defer t.Stop()
	log.Printf("DelegationSweeper: started (interval=%s, stuck-threshold=%s)",
		s.interval, s.threshold)

	tickWithRecover := func() {
		defer func() {
			if r := recover(); r != nil {
				log.Printf("DelegationSweeper: PANIC in tick — recovered: %v", r)
			}
		}()
		s.Sweep(ctx)
	}

	// First sweep immediately so operators see it run on startup, not
	// after waiting one interval.
	tickWithRecover()

	for {
		select {
		case <-ctx.Done():
			log.Printf("DelegationSweeper: stopped")
			return
		case <-t.C:
			tickWithRecover()
		}
	}
}
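The loop shape used by Start (one immediate tick, then ticker-driven ticks, each under panic recovery) can be isolated as below. This is a hedged, standalone sketch: `safely`, `runEvery`, and `demoCancelled` are hypothetical names for illustration and are not part of this commit.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"
)

// safely runs fn and reports whether it panicked, so one bad tick cannot
// take down the loop that calls it.
func safely(fn func()) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("tick panicked, recovered: %v", r)
			panicked = true
		}
	}()
	fn()
	return false
}

// runEvery calls fn once immediately, then on every tick until ctx is done.
func runEvery(ctx context.Context, interval time.Duration, fn func()) {
	t := time.NewTicker(interval)
	defer t.Stop()

	safely(fn)
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			safely(fn)
		}
	}
}

// demoCancelled runs the loop with an already-cancelled context and returns
// how many times fn ran: only the immediate first tick.
func demoCancelled() int {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	calls := 0
	runEvery(ctx, time.Hour, func() { calls++ })
	return calls
}

func main() {
	fmt.Println(safely(func() { panic("boom") })) // the panic is contained
	fmt.Println(demoCancelled())
}
```

Note that, as in Start, the first tick runs even if the context is already cancelled; a caller that wants to skip it would need to check `ctx.Err()` before the first call.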

// SweepResult records what the last sweep changed. Surfaced via the
// admin dashboard (PR-4); also useful for tests to assert behavior
// without diffing log lines.
type SweepResult struct {
	DeadlineFailures int
	StuckMarked      int
	Errors           int
}

// Sweep runs one pass: find every in-flight delegation, mark
// deadline-exceeded rows as failed, mark heartbeat-stale rows as stuck.
// Returns counts for observability.
//
// SQL strategy: one indexed scan over the partial inflight index, then
// one SetStatus call (status read + UPDATE) per offending row. We fold
// both checks into a single SELECT to amortize the round-trip. The row
// count in flight at any time is small (single-digit hundreds even on a
// busy tenant), so reading them all and dispatching SetStatus per-row is
// cheaper than two separate UPDATEs with bespoke WHERE clauses.
func (s *DelegationSweeper) Sweep(ctx context.Context) SweepResult {
	res := SweepResult{}

	rows, err := s.db.QueryContext(ctx, `
		SELECT delegation_id, last_heartbeat, deadline
		FROM delegations
		WHERE status IN ('queued','dispatched','in_progress')
	`)
	if err != nil {
		log.Printf("DelegationSweeper: query failed: %v", err)
		res.Errors++
		return res
	}
	defer rows.Close()

	now := time.Now()
	type candidate struct {
		id       string
		lastBeat sql.NullTime
		deadline time.Time
	}
	var todo []candidate
	for rows.Next() {
		var c candidate
		if err := rows.Scan(&c.id, &c.lastBeat, &c.deadline); err != nil {
			log.Printf("DelegationSweeper: scan failed: %v", err)
			res.Errors++
			continue
		}
		todo = append(todo, c)
	}
	if err := rows.Err(); err != nil {
		log.Printf("DelegationSweeper: rows.Err: %v", err)
		res.Errors++
	}

	for _, c := range todo {
		// Deadline first: stronger statement than stuck.
		if now.After(c.deadline) {
			if err := s.ledger.SetStatus(ctx, c.id, "failed",
				"deadline exceeded by sweeper", ""); err != nil {
				log.Printf("DelegationSweeper: SetStatus(%s, failed): %v", c.id, err)
				res.Errors++
				continue
			}
			res.DeadlineFailures++
			continue
		}

		// Heartbeat staleness. A NULL last_heartbeat (the row has never
		// heartbeat) is never marked stuck: the agent gets its first-beat
		// window, and the deadline catches rows that never start. A
		// created_at-based grace window isn't possible here because
		// created_at is not in the SELECT. Skipping NULL also reduces
		// false positives during the agent's first beat window after
		// restart.
		if !c.lastBeat.Valid {
			continue
		}
		sinceBeat := now.Sub(c.lastBeat.Time)
		if sinceBeat > s.threshold {
			if err := s.ledger.SetStatus(ctx, c.id, "stuck",
				"no heartbeat for "+sinceBeat.Round(time.Second).String(),
				""); err != nil {
				log.Printf("DelegationSweeper: SetStatus(%s, stuck): %v", c.id, err)
				res.Errors++
				continue
			}
			res.StuckMarked++
		}
	}

	if res.DeadlineFailures > 0 || res.StuckMarked > 0 || res.Errors > 0 {
		log.Printf("DelegationSweeper: sweep complete — deadline_failures=%d stuck=%d errors=%d",
			res.DeadlineFailures, res.StuckMarked, res.Errors)
	}
	return res
}
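Sweep relies on the ledger's forward-only protection, whose implementation (PR-1) is not part of this diff. The sketch below illustrates the contract the sweeper depends on, using an in-memory stand-in. Everything here is an assumption for illustration: `memLedger`, `demoLedger`, the two-argument `SetStatus`, and the choice of `completed` and `failed` as the terminal states are hypothetical, and only the name `ErrInvalidTransition` comes from the commit message.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrInvalidTransition mirrors the sentinel named in the commit message; the
// real definition lives in the PR-1 ledger and is not shown in this diff.
var ErrInvalidTransition = errors.New("invalid transition")

// terminal lists the states assumed to be final for this sketch.
var terminal = map[string]bool{"completed": true, "failed": true}

// memLedger is a hypothetical in-memory stand-in for DelegationLedger.
type memLedger struct {
	status map[string]string
}

// SetStatus applies a forward-only transition:
//   - replaying the current status is a no-op,
//   - leaving a terminal state is rejected,
//   - anything else is applied.
func (l *memLedger) SetStatus(id, next string) error {
	cur, ok := l.status[id]
	if !ok {
		return fmt.Errorf("delegation %q not found", id)
	}
	if cur == next {
		return nil
	}
	if terminal[cur] {
		return ErrInvalidTransition
	}
	l.status[id] = next
	return nil
}

// demoLedger seeds one in-flight row and one row that already completed.
func demoLedger() *memLedger {
	return &memLedger{status: map[string]string{
		"deleg-live":  "in_progress",
		"deleg-raced": "completed",
	}}
}

func main() {
	l := demoLedger()
	fmt.Println(l.SetStatus("deleg-live", "failed"))  // applied
	fmt.Println(l.SetStatus("deleg-live", "failed"))  // same-status replay
	fmt.Println(l.SetStatus("deleg-raced", "failed")) // rejected
	fmt.Println(l.status["deleg-raced"])              // unchanged
}
```

With this contract, a sweep that loses the race against a concurrent completion sees an error for that row and leaves it in `completed`, which is the behavior the sweeper counts under Errors.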
314
workspace-server/internal/handlers/delegation_sweeper_test.go
Normal file
@@ -0,0 +1,314 @@
package handlers

import (
	"context"
	"testing"
	"time"

	"github.com/DATA-DOG/go-sqlmock"
)

// delegation_sweeper_test.go: coverage for the RFC #2829 PR-3 stuck-task
// sweeper. Validates:
//
//  1. Deadline-exceeded rows are marked failed.
//  2. Heartbeat-stale rows (lastBeat older than threshold) are marked stuck.
//  3. NULL last_heartbeat is NOT marked stuck (free first-beat pass).
//  4. Healthy in-flight rows (recent heartbeat, future deadline) are
//     left alone.
//  5. Empty in-flight set is a clean no-op.
//  6. Both rules apply in one sweep without double-marking.
//  7. Env-override interval/threshold parse correctly + fall back on
//     invalid input.

func TestSweeper_HappyPath_NoInflightRowsIsCleanNoOp(t *testing.T) {
	mock := setupTestDB(t)
	ledger := NewDelegationLedger(nil)
	sw := NewDelegationSweeper(nil, ledger)

	mock.ExpectQuery(`SELECT delegation_id, last_heartbeat, deadline\s+FROM delegations`).
		WillReturnRows(sqlmock.NewRows([]string{"delegation_id", "last_heartbeat", "deadline"}))

	res := sw.Sweep(context.Background())
	if res.DeadlineFailures != 0 || res.StuckMarked != 0 || res.Errors != 0 {
		t.Errorf("empty in-flight set must produce zero changes; got %+v", res)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet: %v", err)
	}
}

func TestSweeper_DeadlineExceededIsMarkedFailed(t *testing.T) {
	mock := setupTestDB(t)
	ledger := NewDelegationLedger(nil)
	sw := NewDelegationSweeper(nil, ledger)

	past := time.Now().Add(-1 * time.Minute)
	mock.ExpectQuery(`SELECT delegation_id, last_heartbeat, deadline\s+FROM delegations`).
		WillReturnRows(sqlmock.NewRows([]string{"delegation_id", "last_heartbeat", "deadline"}).
			AddRow("deleg-overdue", time.Now(), past))

	// SetStatus reads current status...
	mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`).
		WithArgs("deleg-overdue").
		WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("in_progress"))
	// ...then updates to failed.
	mock.ExpectExec(`UPDATE delegations`).
		WithArgs("deleg-overdue", "failed", "deadline exceeded by sweeper", "").
		WillReturnResult(sqlmock.NewResult(0, 1))

	res := sw.Sweep(context.Background())
	if res.DeadlineFailures != 1 {
		t.Errorf("expected 1 deadline failure, got %d", res.DeadlineFailures)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet: %v", err)
	}
}

func TestSweeper_StaleHeartbeatIsMarkedStuck(t *testing.T) {
	mock := setupTestDB(t)
	ledger := NewDelegationLedger(nil)
	sw := NewDelegationSweeper(nil, ledger)

	// Last heartbeat 30min ago, well past the 10min default threshold.
	staleBeat := time.Now().Add(-30 * time.Minute)
	future := time.Now().Add(2 * time.Hour) // deadline NOT exceeded

	mock.ExpectQuery(`SELECT delegation_id, last_heartbeat, deadline\s+FROM delegations`).
		WillReturnRows(sqlmock.NewRows([]string{"delegation_id", "last_heartbeat", "deadline"}).
			AddRow("deleg-stuck", staleBeat, future))

	mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`).
		WithArgs("deleg-stuck").
		WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("in_progress"))

	// We can't predict the exact "no heartbeat for Xs" string because
	// the suffix depends on now() at sweep time; just match against any.
	mock.ExpectExec(`UPDATE delegations`).
		WithArgs("deleg-stuck", "stuck", sqlmock.AnyArg(), "").
		WillReturnResult(sqlmock.NewResult(0, 1))

	res := sw.Sweep(context.Background())
	if res.StuckMarked != 1 {
		t.Errorf("expected 1 stuck mark, got %d", res.StuckMarked)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet: %v", err)
	}
}

func TestSweeper_NullHeartbeatIsLeftAlone(t *testing.T) {
	// A delegation that was JUST inserted (queued, no heartbeat yet) must
	// not be flipped to stuck on the first sweep; give it the chance to
	// emit its first beat.
	mock := setupTestDB(t)
	ledger := NewDelegationLedger(nil)
	sw := NewDelegationSweeper(nil, ledger)

	future := time.Now().Add(2 * time.Hour)
	// An untyped nil is the driver-level value for SQL NULL. A
	// sql.NullTime{} struct is not a driver value and would not scan
	// into the sweeper's sql.NullTime destination.
	mock.ExpectQuery(`SELECT delegation_id, last_heartbeat, deadline\s+FROM delegations`).
		WillReturnRows(sqlmock.NewRows([]string{"delegation_id", "last_heartbeat", "deadline"}).
			AddRow("deleg-fresh", nil, future))

	res := sw.Sweep(context.Background())
	if res.Errors != 0 {
		t.Errorf("NULL heartbeat row must scan cleanly; got %d errors", res.Errors)
	}
	if res.StuckMarked != 0 {
		t.Errorf("NULL heartbeat must not be stuck-marked; got %d", res.StuckMarked)
	}
	if res.DeadlineFailures != 0 {
		t.Errorf("future deadline must not fail; got %d", res.DeadlineFailures)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet: %v", err)
	}
}

func TestSweeper_HealthyInflightRowsAreLeftAlone(t *testing.T) {
	mock := setupTestDB(t)
	ledger := NewDelegationLedger(nil)
	sw := NewDelegationSweeper(nil, ledger)

	freshBeat := time.Now().Add(-1 * time.Minute) // well within 10min threshold
	future := time.Now().Add(2 * time.Hour)

	mock.ExpectQuery(`SELECT delegation_id, last_heartbeat, deadline\s+FROM delegations`).
		WillReturnRows(sqlmock.NewRows([]string{"delegation_id", "last_heartbeat", "deadline"}).
			AddRow("deleg-healthy", freshBeat, future))

	res := sw.Sweep(context.Background())
	if res.DeadlineFailures != 0 || res.StuckMarked != 0 {
		t.Errorf("healthy row must produce zero changes; got %+v", res)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet: %v", err)
	}
}

func TestSweeper_DeadlineFiresFirstNotStuck(t *testing.T) {
	// Row that's BOTH past deadline AND stale-heartbeat must be marked
	// failed (deadline) not stuck: deadline is the stronger statement.
	mock := setupTestDB(t)
	ledger := NewDelegationLedger(nil)
	sw := NewDelegationSweeper(nil, ledger)

	staleBeat := time.Now().Add(-30 * time.Minute)
	past := time.Now().Add(-5 * time.Minute)

	mock.ExpectQuery(`SELECT delegation_id, last_heartbeat, deadline\s+FROM delegations`).
		WillReturnRows(sqlmock.NewRows([]string{"delegation_id", "last_heartbeat", "deadline"}).
			AddRow("deleg-both", staleBeat, past))

	// Only the failed transition fires; no stuck transition.
	mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`).
		WithArgs("deleg-both").
		WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("in_progress"))
	mock.ExpectExec(`UPDATE delegations`).
		WithArgs("deleg-both", "failed", "deadline exceeded by sweeper", "").
		WillReturnResult(sqlmock.NewResult(0, 1))

	res := sw.Sweep(context.Background())
	if res.DeadlineFailures != 1 || res.StuckMarked != 0 {
		t.Errorf("expected 1 deadline failure, 0 stuck; got %+v", res)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet (stuck UPDATE may have fired by accident): %v", err)
	}
}

func TestSweeper_MixedSetAppliesBothRules(t *testing.T) {
	mock := setupTestDB(t)
	ledger := NewDelegationLedger(nil)
	sw := NewDelegationSweeper(nil, ledger)

	now := time.Now()
	mock.ExpectQuery(`SELECT delegation_id, last_heartbeat, deadline\s+FROM delegations`).
		WillReturnRows(sqlmock.NewRows([]string{"delegation_id", "last_heartbeat", "deadline"}).
			AddRow("deleg-overdue", now, now.Add(-1*time.Minute)).                   // deadline → failed
			AddRow("deleg-stuck", now.Add(-30*time.Minute), now.Add(2*time.Hour)).   // stale → stuck
			AddRow("deleg-healthy", now.Add(-30*time.Second), now.Add(2*time.Hour)), // healthy → no-op
		)

	// 1st: deadline → failed
	mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`).
		WithArgs("deleg-overdue").
		WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("in_progress"))
	mock.ExpectExec(`UPDATE delegations`).
		WithArgs("deleg-overdue", "failed", "deadline exceeded by sweeper", "").
		WillReturnResult(sqlmock.NewResult(0, 1))

	// 2nd: stale → stuck
	mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`).
		WithArgs("deleg-stuck").
		WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("in_progress"))
	mock.ExpectExec(`UPDATE delegations`).
		WithArgs("deleg-stuck", "stuck", sqlmock.AnyArg(), "").
		WillReturnResult(sqlmock.NewResult(0, 1))

	// 3rd: healthy, so no SQL fired

	res := sw.Sweep(context.Background())
	if res.DeadlineFailures != 1 || res.StuckMarked != 1 {
		t.Errorf("expected 1 failure + 1 stuck, got %+v", res)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet: %v", err)
	}
}

func TestSweeper_TerminalReplayFromConcurrentCompletionIsIgnored(t *testing.T) {
	// Edge case: row was marked completed by UpdateStatus between the
	// SELECT and the SetStatus call. SetStatus's forward-only protection
	// returns ErrInvalidTransition; sweeper increments Errors but the
	// row is correctly left in completed state.
	mock := setupTestDB(t)
	ledger := NewDelegationLedger(nil)
	sw := NewDelegationSweeper(nil, ledger)

	past := time.Now().Add(-1 * time.Minute)
	mock.ExpectQuery(`SELECT delegation_id, last_heartbeat, deadline\s+FROM delegations`).
		WillReturnRows(sqlmock.NewRows([]string{"delegation_id", "last_heartbeat", "deadline"}).
			AddRow("deleg-raced", time.Now(), past))

	// SetStatus's status read finds the row already completed (concurrent UpdateStatus won).
	mock.ExpectQuery(`SELECT status FROM delegations WHERE delegation_id = \$1`).
		WithArgs("deleg-raced").
		WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("completed"))
	// No UPDATE: terminal forward-only blocks it.

	res := sw.Sweep(context.Background())
	if res.Errors != 1 {
		t.Errorf("forward-only block must surface as Error count; got %+v", res)
	}
	if res.DeadlineFailures != 0 {
		t.Errorf("must NOT credit a deadline failure that didn't fire; got %d", res.DeadlineFailures)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet: %v", err)
	}
}

// ---------- env override parsing ----------

func TestEnvDuration_Default(t *testing.T) {
	t.Setenv("MY_TEST_KEY", "")
	if got := envDuration("MY_TEST_KEY", 7*time.Second); got != 7*time.Second {
		t.Errorf("expected default 7s, got %v", got)
	}
}

func TestEnvDuration_ParsesPositiveSeconds(t *testing.T) {
	t.Setenv("MY_TEST_KEY", "42")
	if got := envDuration("MY_TEST_KEY", 1*time.Second); got != 42*time.Second {
		t.Errorf("expected 42s, got %v", got)
	}
}

func TestEnvDuration_FallsBackOnInvalid(t *testing.T) {
	t.Setenv("MY_TEST_KEY", "garbage")
	if got := envDuration("MY_TEST_KEY", 5*time.Second); got != 5*time.Second {
		t.Errorf("invalid input must fall back to default; got %v", got)
	}
}

func TestEnvDuration_FallsBackOnNegative(t *testing.T) {
	t.Setenv("MY_TEST_KEY", "-10")
	if got := envDuration("MY_TEST_KEY", 5*time.Second); got != 5*time.Second {
		t.Errorf("negative must fall back to default; got %v", got)
	}
}

// TestSweeperConstructor_PicksUpEnvOverrides: interval + threshold env
// vars are read at construction time. Confirms the wiring contract; if
// somebody adds a new env var without plumbing it, this fails.
func TestSweeperConstructor_PicksUpEnvOverrides(t *testing.T) {
	t.Setenv("DELEGATION_SWEEPER_INTERVAL_S", "60")
	t.Setenv("DELEGATION_STUCK_THRESHOLD_S", "120")

	mock := setupTestDB(t)
	_ = mock // unused: the constructor doesn't fire SQL
	sw := NewDelegationSweeper(nil, nil)

	if sw.Interval() != 60*time.Second {
		t.Errorf("interval override not picked up: got %v", sw.Interval())
	}
	if sw.Threshold() != 120*time.Second {
		t.Errorf("threshold override not picked up: got %v", sw.Threshold())
	}
}

func TestSweeperConstructor_DefaultsWhenEnvUnset(t *testing.T) {
	t.Setenv("DELEGATION_SWEEPER_INTERVAL_S", "")
	t.Setenv("DELEGATION_STUCK_THRESHOLD_S", "")

	mock := setupTestDB(t)
	_ = mock
	sw := NewDelegationSweeper(nil, nil)

	if sw.Interval() != defaultSweeperInterval {
		t.Errorf("default interval not used: got %v", sw.Interval())
	}
	if sw.Threshold() != defaultStuckThreshold {
		t.Errorf("default threshold not used: got %v", sw.Threshold())
	}
}
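The four envDuration fallback cases above could also be expressed as one table. The sketch below assumes a hypothetical pure helper, `parseSecondsOr`, that takes the raw string instead of reading the environment; it is not part of this commit, and the `zero` row is an extra case implied by the `n <= 0` check rather than one of the tests in this file.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

const sec = time.Second

// parseSecondsOr is a hypothetical pure variant of envDuration: same
// fallback rules, but the raw value is passed in rather than read from
// the process environment.
func parseSecondsOr(v string, def time.Duration) time.Duration {
	if v == "" {
		return def
	}
	n, err := strconv.Atoi(v)
	if err != nil || n <= 0 {
		return def
	}
	return time.Duration(n) * time.Second
}

func main() {
	cases := []struct {
		name string
		in   string
		want time.Duration
	}{
		{"missing", "", 5 * sec},
		{"positive", "42", 42 * sec},
		{"garbage", "garbage", 5 * sec},
		{"negative", "-10", 5 * sec},
		{"zero", "0", 5 * sec},
	}
	for _, c := range cases {
		got := parseSecondsOr(c.in, 5*sec)
		fmt.Printf("%-8s ok=%v\n", c.name, got == c.want)
	}
}
```

A pure helper like this avoids `t.Setenv` in the parsing tests, at the cost of one more function; whether that trade is worthwhile here is a judgment call.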