test(delegations): extend integration suite with sweeper coverage (3 tests)

Real-Postgres tests for the RFC #2829 PR-3 sweeper. Validates:

  - Deadline-exceeded rows are marked failed with the expected
    error_detail
  - Stale-heartbeat in-flight rows are marked stuck (uses
    DELEGATION_STUCK_THRESHOLD_S env override for deterministic
    timing)
  - Healthy rows (fresh heartbeat + future deadline) are not touched
    — no false-positive against well-behaved delegations

These extend the gate added in the previous commit so the workflow
catches sweeper regressions, not just ledger-write ones. All 7
integration tests now pass; CI workflow runs them all.
This commit is contained in:
Hongming Wang 2026-05-05 02:54:05 -07:00
parent 9e678ccd5e
commit 3d2a50e2a2

View File

@@ -201,6 +201,121 @@ func TestIntegration_FailedTransitionCapturesErrorDetail(t *testing.T) {
}
}
// TestIntegration_Sweeper_DeadlineExceededIsMarkedFailed is the
// real-Postgres gate for the deadline branch of the RFC #2829 PR-3
// stuck-task sweeper. It creates a dispatched delegation, rewrites its
// deadline to one minute in the past, runs a single Sweep, and expects
// the row to end up `failed` with error_detail set to
// `deadline exceeded by sweeper`.
//
// The sqlmock unit tests only pin which SQL is issued; they cannot
// observe real ON CONFLICT / index-scan behavior on the partial
// inflight index. Running against real Postgres is meant to catch:
//   - deadline timestamp comparison being wrong under tz boundaries
//   - the partial index not actually serving the WHERE clause
//   - SetStatus's terminal forward-only protection not holding across
//     the sweep + concurrent-write race
func TestIntegration_Sweeper_DeadlineExceededIsMarkedFailed(t *testing.T) {
	db := integrationDB(t)
	t.Setenv("DELEGATION_LEDGER_WRITE", "1")

	const (
		delegationID = "integ-sweeper-deadline-1"
		callerID     = "11111111-1111-1111-1111-111111111111"
		calleeID     = "22222222-2222-2222-2222-222222222222"
	)
	ctx := context.Background()

	// Move the row out of queued before sweeping: queued→failed is a
	// permitted transition, but it would not exercise the in-flight
	// scan accurately.
	recordLedgerInsert(ctx, callerID, calleeID, delegationID, "task", "")
	recordLedgerStatus(ctx, delegationID, "dispatched", "", "")

	// Insert defaults the deadline to now+6h, so push it into the past
	// by hand. last_heartbeat is refreshed in the same statement.
	_, err := db.ExecContext(ctx,
		`UPDATE delegations SET deadline = now() - interval '1 minute', last_heartbeat = now() WHERE delegation_id = $1`, delegationID,
	)
	if err != nil {
		t.Fatalf("backdate deadline: %v", err)
	}

	sweepResult := NewDelegationSweeper(nil, nil).Sweep(ctx)
	if sweepResult.DeadlineFailures != 1 {
		t.Errorf("expected 1 deadline failure, got %+v", sweepResult)
	}

	// Verify what was actually persisted, not just the sweep counters.
	gotStatus, _, gotDetail := readLedgerRow(t, db, delegationID)
	if gotStatus != "failed" {
		t.Errorf("status: want failed, got %q", gotStatus)
	}
	if gotDetail != "deadline exceeded by sweeper" {
		t.Errorf("error_detail: %q", gotDetail)
	}
}
// TestIntegration_Sweeper_StaleHeartbeatIsMarkedStuck covers the
// heartbeat-staleness path. It inserts a delegation, advances it to
// in_progress, backdates last_heartbeat by 60 seconds (the stuck
// threshold is overridden to 10 via DELEGATION_STUCK_THRESHOLD_S), and
// expects a single Sweep to mark the row stuck with a non-empty
// error_detail.
func TestIntegration_Sweeper_StaleHeartbeatIsMarkedStuck(t *testing.T) {
	db := integrationDB(t)
	t.Setenv("DELEGATION_LEDGER_WRITE", "1")
	// A small threshold keeps the test deterministic and fast.
	t.Setenv("DELEGATION_STUCK_THRESHOLD_S", "10")

	const (
		delegationID = "integ-sweeper-stuck-1"
		callerID     = "11111111-1111-1111-1111-111111111111"
		calleeID     = "22222222-2222-2222-2222-222222222222"
	)
	ctx := context.Background()

	recordLedgerInsert(ctx, callerID, calleeID, delegationID, "task", "")
	for _, next := range []string{"dispatched", "in_progress"} {
		recordLedgerStatus(ctx, delegationID, next, "", "")
	}

	// Only the heartbeat is made stale; the deadline is left in the
	// future so the deadline check shouldn't fire.
	_, err := db.ExecContext(ctx,
		`UPDATE delegations SET last_heartbeat = now() - interval '60 seconds' WHERE delegation_id = $1`, delegationID,
	)
	if err != nil {
		t.Fatalf("backdate heartbeat: %v", err)
	}

	sweepResult := NewDelegationSweeper(nil, nil).Sweep(ctx)
	if sweepResult.StuckMarked != 1 {
		t.Errorf("expected 1 stuck mark, got %+v", sweepResult)
	}

	gotStatus, _, gotDetail := readLedgerRow(t, db, delegationID)
	if gotStatus != "stuck" {
		t.Errorf("status: want stuck, got %q", gotStatus)
	}
	// Only non-emptiness is asserted; the exact wording of the detail
	// message is not pinned here.
	if gotDetail == "" {
		t.Errorf("error_detail should mention 'no heartbeat for Xs'; got empty")
	}
}
// TestIntegration_Sweeper_HealthyRowsNotTouched is the negative
// control for the sweeper: a dispatched row whose heartbeat is fresh
// and whose deadline is still in the future must be left alone. It
// guards against the partial inflight index scan + per-row branching
// producing false positives on well-behaved delegations.
func TestIntegration_Sweeper_HealthyRowsNotTouched(t *testing.T) {
	db := integrationDB(t)
	t.Setenv("DELEGATION_LEDGER_WRITE", "1")

	const (
		delegationID = "integ-sweeper-healthy-1"
		callerID     = "11111111-1111-1111-1111-111111111111"
		calleeID     = "22222222-2222-2222-2222-222222222222"
	)
	ctx := context.Background()

	recordLedgerInsert(ctx, callerID, calleeID, delegationID, "task", "")
	recordLedgerStatus(ctx, delegationID, "dispatched", "", "")

	// Stamp the heartbeat with the current time so the row is fresh.
	_, err := db.ExecContext(ctx,
		`UPDATE delegations SET last_heartbeat = now() WHERE delegation_id = $1`, delegationID,
	)
	if err != nil {
		t.Fatalf("set heartbeat: %v", err)
	}

	sweepResult := NewDelegationSweeper(nil, nil).Sweep(ctx)
	// Both counters must be zero for the row to count as untouched.
	if !(sweepResult.DeadlineFailures == 0 && sweepResult.StuckMarked == 0) {
		t.Errorf("healthy row touched; result: %+v", sweepResult)
	}

	// The persisted status must still be the one set before the sweep.
	if gotStatus, _, _ := readLedgerRow(t, db, delegationID); gotStatus != "dispatched" {
		t.Errorf("status changed unexpectedly: %q", gotStatus)
	}
}
// TestIntegration_FullLifecycle_QueuedToDispatchedToCompleted — pins the
// happy-path lifecycle. INSERT lands the row at queued; SetStatus moves
// it through dispatched and into completed with preview. After the