diff --git a/workspace-server/internal/handlers/delegation_ledger_integration_test.go b/workspace-server/internal/handlers/delegation_ledger_integration_test.go index 44b510a0..c493bcb8 100644 --- a/workspace-server/internal/handlers/delegation_ledger_integration_test.go +++ b/workspace-server/internal/handlers/delegation_ledger_integration_test.go @@ -201,6 +201,121 @@ func TestIntegration_FailedTransitionCapturesErrorDetail(t *testing.T) { } } +// TestIntegration_Sweeper_DeadlineExceededIsMarkedFailed — real-Postgres +// gate for the RFC #2829 PR-3 stuck-task sweeper. Inserts a row with a +// past deadline, runs Sweep, asserts the row is now `failed` with +// `deadline exceeded by sweeper` in error_detail. +// +// sqlmock unit tests pinned the SQL fired but couldn't observe the +// real ON CONFLICT / index-scan behavior on the partial inflight +// index. Real Postgres catches: +// - deadline timestamp comparison is correct under tz boundaries +// - the partial index actually serves the WHERE clause +// - SetStatus's terminal forward-only protection holds across the +// sweep + concurrent-write race +func TestIntegration_Sweeper_DeadlineExceededIsMarkedFailed(t *testing.T) { + conn := integrationDB(t) + t.Setenv("DELEGATION_LEDGER_WRITE", "1") + + id := "integ-sweeper-deadline-1" + caller := "11111111-1111-1111-1111-111111111111" + callee := "22222222-2222-2222-2222-222222222222" + + // Insert + transition to dispatched (otherwise queued→failed is + // allowed but doesn't exercise the in-flight scan accurately). + recordLedgerInsert(context.Background(), caller, callee, id, "task", "") + recordLedgerStatus(context.Background(), id, "dispatched", "", "") + + // Force the deadline into the past — Insert defaults to now+6h, so + // we override. + if _, err := conn.ExecContext(context.Background(), + `UPDATE delegations SET deadline = now() - interval '1 minute', last_heartbeat = now() WHERE delegation_id = $1`, id, + ); err != nil { + t.Fatalf("backdate deadline: %v", err) + } + + res := NewDelegationSweeper(nil, nil).Sweep(context.Background()) + if res.DeadlineFailures != 1 { + t.Errorf("expected 1 deadline failure, got %+v", res) + } + status, _, errDet := readLedgerRow(t, conn, id) + if status != "failed" { + t.Errorf("status: want failed, got %q", status) + } + if errDet != "deadline exceeded by sweeper" { + t.Errorf("error_detail: %q", errDet) + } +} + +// TestIntegration_Sweeper_StaleHeartbeatIsMarkedStuck — heartbeat +// staleness path. Insert + dispatch + backdate last_heartbeat past the +// 10× threshold; Sweep should mark the row stuck. +func TestIntegration_Sweeper_StaleHeartbeatIsMarkedStuck(t *testing.T) { + conn := integrationDB(t) + t.Setenv("DELEGATION_LEDGER_WRITE", "1") + // Tighten threshold so the test is deterministic + fast. + t.Setenv("DELEGATION_STUCK_THRESHOLD_S", "10") + + id := "integ-sweeper-stuck-1" + caller := "11111111-1111-1111-1111-111111111111" + callee := "22222222-2222-2222-2222-222222222222" + + recordLedgerInsert(context.Background(), caller, callee, id, "task", "") + recordLedgerStatus(context.Background(), id, "dispatched", "", "") + recordLedgerStatus(context.Background(), id, "in_progress", "", "") + + // Backdate last_heartbeat past the 10s threshold; deadline still in + // future so deadline check shouldn't fire. + if _, err := conn.ExecContext(context.Background(), + `UPDATE delegations SET last_heartbeat = now() - interval '60 seconds' WHERE delegation_id = $1`, id, + ); err != nil { + t.Fatalf("backdate heartbeat: %v", err) + } + + res := NewDelegationSweeper(nil, nil).Sweep(context.Background()) + if res.StuckMarked != 1 { + t.Errorf("expected 1 stuck mark, got %+v", res) + } + status, _, errDet := readLedgerRow(t, conn, id) + if status != "stuck" { + t.Errorf("status: want stuck, got %q", status) + } + if errDet == "" { + t.Errorf("error_detail should mention 'no heartbeat for Xs'; got empty") + } +} + +// TestIntegration_Sweeper_HealthyRowsNotTouched — sanity: rows with a +// fresh heartbeat AND a future deadline are left alone. Confirms the +// partial inflight index scan + per-row branching don't false-positive +// against well-behaved delegations. +func TestIntegration_Sweeper_HealthyRowsNotTouched(t *testing.T) { + conn := integrationDB(t) + t.Setenv("DELEGATION_LEDGER_WRITE", "1") + + id := "integ-sweeper-healthy-1" + caller := "11111111-1111-1111-1111-111111111111" + callee := "22222222-2222-2222-2222-222222222222" + + recordLedgerInsert(context.Background(), caller, callee, id, "task", "") + recordLedgerStatus(context.Background(), id, "dispatched", "", "") + // Fresh heartbeat = now() + if _, err := conn.ExecContext(context.Background(), + `UPDATE delegations SET last_heartbeat = now() WHERE delegation_id = $1`, id, + ); err != nil { + t.Fatalf("set heartbeat: %v", err) + } + + res := NewDelegationSweeper(nil, nil).Sweep(context.Background()) + if res.DeadlineFailures != 0 || res.StuckMarked != 0 { + t.Errorf("healthy row touched; result: %+v", res) + } + status, _, _ := readLedgerRow(t, conn, id) + if status != "dispatched" { + t.Errorf("status changed unexpectedly: %q", status) + } +} + // TestIntegration_FullLifecycle_QueuedToDispatchedToCompleted — pins the // happy-path lifecycle. INSERT lands the row at queued; SetStatus moves // it through dispatched and into completed with preview. After the