feat(requests): P4 — idle-agent inbox nudge sweeper (RFC) #2526

Merged
agent-reviewer merged 1 commits from feat/unified-requests-inbox-p4-nudge into main 2026-06-10 13:43:00 +00:00
5 changed files with 737 additions and 0 deletions
+12
View File
@@ -469,6 +469,18 @@ func main() {
delegSweeper := handlers.NewDelegationSweeper(nil, nil)
go supervised.RunWithRecover(ctx, "delegation-sweeper", delegSweeper.Start)
// RFC unified-requests-inbox P4: idle-agent inbox-nudge sweeper. Pokes
// an IDLE online agent that has unhandled `requests` inbox items (stale
// >10min) with one A2A nudge so it re-checks its inbox, rate-limited to
// <=1 nudge per request per hour via requests.last_nudged_at. No-op until
// the P1 `requests` table (#2525) + the last_nudged_at column have rolled
// out. Disable via REQUEST_NUDGE_SWEEPER_DISABLED=true; tune cadence via
// REQUEST_NUDGE_SWEEPER_INTERVAL_S.
if !strings.EqualFold(os.Getenv("REQUEST_NUDGE_SWEEPER_DISABLED"), "true") {
nudgeSweeper := handlers.NewRequestNudgeSweeper(nil)
go supervised.RunWithRecover(ctx, "request-nudge-sweeper", nudgeSweeper.Start)
}
// Channel Manager — social channel integrations (Telegram, Slack, etc.)
channelMgr := channels.NewManager(wh, broadcaster)
go supervised.RunWithRecover(ctx, "channel-manager", channelMgr.Start)
@@ -0,0 +1,413 @@
package handlers
import (
"context"
"database/sql"
"database/sql/driver"
"encoding/json"
"fmt"
"log"
"time"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
"github.com/google/uuid"
)
// request_nudge_sweeper.go — RFC unified-requests-inbox Phase 4: idle-agent
// inbox nudge sweeper.
//
// What it does
// ------------
// Periodically scans the unified `requests` table (P1 schema) for inbox items
// addressed to an AGENT recipient that have been sitting unhandled for a while,
// and whose recipient agent is currently IDLE and online. For each such agent
// it enqueues ONE short A2A "nudge" message telling the agent to process its
// inbox via list_inbox / respond_request, then stamps last_nudged_at on the
// items the nudge covered so the same items aren't re-nudged for an hour.
//
// Why
// ---
// An agent that's busy will get to its inbox when it frees up; an agent that's
// idle has nothing prompting it to re-check the inbox, so a request can sit
// pending indefinitely. This worker closes that gap WITHOUT spamming: it only
// fires for items that are already stale (created >10min ago), only for idle
// recipients, and at most once per request per hour.
//
// Out of scope: USER recipients. Stale user-addressed items are surfaced by the
// canvas Tasks/Approvals UI already — this worker never enqueues anything for a
// user recipient.
//
// Delivery mechanism
// ------------------
// The nudge is delivered via the existing a2a-queue path (EnqueueA2A) — the
// SAME mechanism the scheduler uses to deliver a cron tick to an agent. The
// message is a normal `message/send` A2A body; the agent drains it from the
// queue on its next heartbeat (registry.Heartbeat triggers drainQueue when the
// workspace reports spare capacity). We do NOT write raw INSERTs into a2a_queue.
//
// On main vs. P1
// --------------
// This file compiles + tests on main, where the `requests` table does not yet
// exist. It queries `requests` via RAW SQL (never imports P1's RequestStore),
// so the build is independent of P1's Go symbols. At runtime the sweep query
// simply finds no rows until the P1 migration + this PR's migration have both
// rolled out. This PR MUST merge AFTER P1 (#2525).
//
// Frequency
// ---------
// 5min default cadence (REQUEST_NUDGE_SWEEPER_INTERVAL_S to override), matching
// delegation_sweeper. Disable entirely via REQUEST_NUDGE_SWEEPER_INTERVAL_S=0?
// No — envDuration treats <=0 as "use default". To disable, set
// REQUEST_NUDGE_SWEEPER_DISABLED=true (checked in main.go wiring).
const (
defaultRequestNudgeInterval = 5 * time.Minute
// staleAfter — an item must have been pending at least this long before
// it's eligible for a nudge. Gives a freshly-created request a grace
// window in which a still-active or just-freed agent picks it up on its
// own, so we don't nudge items that are about to be handled anyway.
requestNudgeStaleAfter = 10 * time.Minute
// reNudgeAfter — rate-limit: a given request is nudged at most once per
// this window. Belt-and-suspenders with the queue idempotency key below.
requestNudgeReNudgeAfter = time.Hour
// nudgeBatchLimit — bound the work per sweep. Caps both DB scan cost and
// the number of agents we poke in one tick; the next tick picks up the
// remainder. Generous enough that a normal backlog clears in one pass.
requestNudgeBatchLimit = 200
)
// enqueueFunc is the a2a-queue enqueue signature (package-level EnqueueA2A).
// Injected as a field so tests can assert the nudge enqueue without mocking
// EnqueueA2A's internal SQL (depth count, supersede). Production wiring uses
// the real EnqueueA2A.
type enqueueFunc func(
ctx context.Context,
workspaceID, callerID string,
priority int,
body []byte,
method, idempotencyKey string,
expiresAt *time.Time,
) (id string, depth int, err error)
// RequestNudgeSweeper runs the periodic inbox-nudge sweep. Construct via
// NewRequestNudgeSweeper, then Start(ctx) in main.go to begin ticking.
type RequestNudgeSweeper struct {
db *sql.DB
interval time.Duration
staleAfter time.Duration
reNudgeWait time.Duration
limit int
enqueue enqueueFunc
}
// NewRequestNudgeSweeper builds a sweeper bound to the package db.DB
// (production wiring) or a test handle. Reads optional env overrides at
// construction time so a long-running process picks them up via restart,
// not mid-flight (mirrors NewDelegationSweeper).
func NewRequestNudgeSweeper(handle *sql.DB) *RequestNudgeSweeper {
if handle == nil {
handle = db.DB
}
return &RequestNudgeSweeper{
db: handle,
interval: envDuration("REQUEST_NUDGE_SWEEPER_INTERVAL_S", defaultRequestNudgeInterval),
staleAfter: requestNudgeStaleAfter,
reNudgeWait: requestNudgeReNudgeAfter,
limit: requestNudgeBatchLimit,
enqueue: EnqueueA2A,
}
}
// Interval exposes the configured tick cadence — tests use it; main.go uses
// it implicitly via Start.
func (s *RequestNudgeSweeper) Interval() time.Duration { return s.interval }
// Start ticks Sweep() at the configured interval until ctx is cancelled.
// Defers panic recovery so a single bad row can't kill the sweeper. Mirrors
// DelegationSweeper.Start: first sweep fires immediately on startup.
//
// No-op until both the `requests` table (P1) and this PR's last_nudged_at
// column have rolled out — the sweep query just finds no rows.
func (s *RequestNudgeSweeper) Start(ctx context.Context) {
t := time.NewTicker(s.interval)
defer t.Stop()
log.Printf("RequestNudgeSweeper: started (interval=%s, stale-after=%s, re-nudge-after=%s)",
s.interval, s.staleAfter, s.reNudgeWait)
tickWithRecover := func() {
defer func() {
if r := recover(); r != nil {
log.Printf("RequestNudgeSweeper: PANIC in tick — recovered: %v", r)
}
}()
s.Sweep(ctx)
}
tickWithRecover()
for {
select {
case <-ctx.Done():
log.Printf("RequestNudgeSweeper: stopped")
return
case <-t.C:
tickWithRecover()
}
}
}
// NudgeResult records what the last sweep did. Returned for observability and
// so tests assert behavior without diffing log lines.
type NudgeResult struct {
AgentsNudged int // distinct idle agents poked this sweep
ItemsCovered int // request rows whose last_nudged_at we stamped
Errors int
}
// Sweep runs one pass:
//
// 1. SELECT stale agent-recipient items whose recipient workspace is online
// and idle (active_tasks=0), grouped by recipient. Each group = one agent
// with N stale inbox items.
// 2. For each idle agent: enqueue ONE A2A nudge, then stamp last_nudged_at on
// exactly the items that group covered.
//
// SQL strategy: the SELECT joins requests → workspaces and aggregates the
// covered request ids per recipient with array_agg, so one scan yields the
// per-agent work list. The idle gate (status='online' AND
// COALESCE(active_tasks,0)=0) lives in the JOIN's WHERE so an offline/busy
// agent is never returned — we never even build a nudge for it.
func (s *RequestNudgeSweeper) Sweep(ctx context.Context) NudgeResult {
res := NudgeResult{}
// Group stale agent-recipient items by recipient. recipient_id::uuid casts
// the TEXT recipient_id to the workspaces UUID PK for the join (requests
// stores ids as TEXT — see P1 migration). Items that don't cast (a
// malformed id) would error the cast; recipient_type='agent' rows are
// always workspace UUIDs by construction, so this is safe in practice and
// any bad row surfaces loudly as a query error rather than a silent skip.
const sweepQuery = `
SELECT r.recipient_id,
array_agg(r.id::text) AS ids
FROM requests r
JOIN workspaces w ON w.id = r.recipient_id::uuid
WHERE r.recipient_type = 'agent'
AND r.status IN ('pending', 'info_requested')
AND r.created_at < now() - ($1 * INTERVAL '1 second')
AND (r.last_nudged_at IS NULL
OR r.last_nudged_at < now() - ($2 * INTERVAL '1 second'))
AND w.status = 'online'
AND COALESCE(w.active_tasks, 0) = 0
GROUP BY r.recipient_id
LIMIT $3
`
rows, err := s.db.QueryContext(ctx, sweepQuery,
int(s.staleAfter.Seconds()), int(s.reNudgeWait.Seconds()), s.limit)
if err != nil {
log.Printf("RequestNudgeSweeper: sweep query failed: %v", err)
res.Errors++
return res
}
defer rows.Close()
type group struct {
recipientID string
ids []string
}
var todo []group
for rows.Next() {
var g group
// array_agg(text) comes back as a Postgres text[]; pq scans it into a
// pq.StringArray. We avoid the pq dependency by scanning into a
// driver-native []string via a small adapter type.
var ids stringArray
if err := rows.Scan(&g.recipientID, &ids); err != nil {
log.Printf("RequestNudgeSweeper: scan failed: %v", err)
res.Errors++
continue
}
g.ids = ids
if len(g.ids) == 0 {
continue
}
todo = append(todo, g)
}
if err := rows.Err(); err != nil {
log.Printf("RequestNudgeSweeper: rows.Err: %v", err)
res.Errors++
}
now := time.Now()
for _, g := range todo {
if err := s.nudgeAgent(ctx, g.recipientID, g.ids, now); err != nil {
log.Printf("RequestNudgeSweeper: nudge agent %s (%d items) failed: %v",
g.recipientID, len(g.ids), err)
res.Errors++
continue
}
res.AgentsNudged++
res.ItemsCovered += len(g.ids)
}
if res.AgentsNudged > 0 || res.Errors > 0 {
log.Printf("RequestNudgeSweeper: sweep complete — agents_nudged=%d items_covered=%d errors=%d",
res.AgentsNudged, res.ItemsCovered, res.Errors)
}
return res
}
// nudgeAgent enqueues one A2A nudge for the idle recipient agent covering the
// given stale request ids, then stamps last_nudged_at on those ids. The
// last_nudged_at UPDATE only fires after a successful enqueue so a failed
// enqueue is retried next sweep (the items stay un-stamped, hence eligible).
func (s *RequestNudgeSweeper) nudgeAgent(ctx context.Context, recipientID string, ids []string, now time.Time) error {
body, err := buildNudgeBody(len(ids))
if err != nil {
return fmt.Errorf("build nudge body: %w", err)
}
// Idempotency key bucketed to the current hour so two concurrent sweeper
// boots collapse to one queued nudge per agent per hour at the queue layer
// too — defense in depth on top of the last_nudged_at rate-limit. Empty
// callerID = canvas/system-style enqueue (source_id NULL), matching the
// scheduler's internal fire path.
idemKey := fmt.Sprintf("inbox-nudge:%s:%d", recipientID, now.Truncate(time.Hour).Unix())
if _, _, err := s.enqueue(ctx, recipientID, "", PriorityInfo, body, "message/send", idemKey, nil); err != nil {
return fmt.Errorf("enqueue nudge: %w", err)
}
// Stamp last_nudged_at on exactly the items this nudge covered. ANY($1)
// over the text[] of ids; cast id to text for the comparison so the
// param type is unambiguous. Re-querying eligibility here would race with
// the enqueue, so we trust the ids the sweep already gated.
if _, err := s.db.ExecContext(ctx, `
UPDATE requests
SET last_nudged_at = now()
WHERE id::text = ANY($1)
`, stringArray(ids)); err != nil {
return fmt.Errorf("stamp last_nudged_at: %w", err)
}
return nil
}
// buildNudgeBody constructs the A2A `message/send` JSON-RPC body for the nudge.
// Mirrors the scheduler's body shape (role=user, generated messageId, single
// text part) so the receiving agent processes it like any other inbound turn.
func buildNudgeBody(n int) ([]byte, error) {
plural := "request"
if n != 1 {
plural = "requests"
}
text := fmt.Sprintf(
"You have %d unhandled inbox %s awaiting your response. "+
"Use list_inbox to see them and respond_request / add_request_message to act.",
n, plural,
)
return json.Marshal(map[string]interface{}{
"method": "message/send",
"params": map[string]interface{}{
"message": map[string]interface{}{
"role": "user",
"messageId": "inbox-nudge-" + uuid.New().String(),
"parts": []map[string]interface{}{{"kind": "text", "text": text}},
},
},
})
}
// stringArray scans a Postgres text[] into a []string and serializes a
// []string into a Postgres array literal for parameter binding — a minimal
// inline adapter so this file doesn't pull a new driver dependency just for
// array_agg / ANY(text[]). Mirrors lib/pq's StringArray semantics for the
// shapes we use (no NULL elements, no embedded special chars in UUID/text-id
// values).
type stringArray []string
// Scan implements sql.Scanner for a Postgres text[] value (delivered as
// []byte or string like `{a,b,c}`).
func (a *stringArray) Scan(src interface{}) error {
if src == nil {
*a = nil
return nil
}
var s string
switch v := src.(type) {
case []byte:
s = string(v)
case string:
s = v
default:
return fmt.Errorf("stringArray.Scan: unsupported source type %T", src)
}
*a = parsePGTextArray(s)
return nil
}
// Value implements driver.Valuer, emitting a Postgres array literal `{a,b,c}`.
// Used as the ANY($1) param in the last_nudged_at UPDATE. Each element is
// double-quoted and backslash/quote-escaped so ids containing array-special
// characters bind correctly.
func (a stringArray) Value() (driver.Value, error) {
if a == nil {
return nil, nil
}
out := make([]byte, 0, len(a)*40+2)
out = append(out, '{')
for i, e := range a {
if i > 0 {
out = append(out, ',')
}
out = append(out, '"')
for _, c := range []byte(e) {
if c == '"' || c == '\\' {
out = append(out, '\\')
}
out = append(out, c)
}
out = append(out, '"')
}
out = append(out, '}')
return string(out), nil
}
// parsePGTextArray parses a Postgres array literal `{a,"b c",d}` into a
// []string. Handles the unquoted and double-quoted element forms emitted by
// array_agg(text); element values here are UUIDs so the common path is the
// simple comma split, but quoted handling is included for correctness.
func parsePGTextArray(s string) []string {
if len(s) < 2 || s[0] != '{' || s[len(s)-1] != '}' {
return nil
}
body := s[1 : len(s)-1]
if body == "" {
return nil
}
var out []string
var cur []byte
inQuotes := false
escaped := false
for i := 0; i < len(body); i++ {
c := body[i]
switch {
case escaped:
cur = append(cur, c)
escaped = false
case c == '\\':
escaped = true
case c == '"':
inQuotes = !inQuotes
case c == ',' && !inQuotes:
out = append(out, string(cur))
cur = cur[:0]
default:
cur = append(cur, c)
}
}
out = append(out, string(cur))
return out
}
@@ -0,0 +1,271 @@
package handlers
import (
"context"
"strings"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
)
// request_nudge_sweeper_test.go — coverage for the RFC unified-requests-inbox
// Phase 4 idle-agent inbox-nudge sweeper. Validates:
//
// 1. A stale pending item for an IDLE online agent gets nudged + its
// last_nudged_at stamped.
// 2. A busy (active_tasks>0) or offline agent is NOT returned by the sweep
// query (gated in SQL) → no nudge, no stamp.
// 3. A recently-nudged item (<1h) is excluded by the query → no nudge.
// 4. A user-recipient item is never selected → no nudge.
// 5. Empty result set is a clean no-op.
// 6. Env-override interval parses + falls back; defaults when unset.
//
// The idle/busy/offline/user-recipient/recently-nudged gates all live in the
// single sweep SELECT (status='online', active_tasks=0, recipient_type='agent',
// last_nudged_at filter). So at the sqlmock level those cases are expressed as
// "the query returns the row" vs "the query returns no row" — the test pins
// that a returned row drives exactly one enqueue + one stamp, and that an
// empty result drives neither. The SQL predicates themselves are asserted by
// the integration/real-DB harness, not sqlmock (which can't evaluate WHERE).
// newTestNudgeSweeper builds a sweeper on the sqlmock db.DB with a fake
// enqueue that records calls, so tests assert the nudge without mocking
// EnqueueA2A's internal SQL.
type recordedEnqueue struct {
workspaceID string
idemKey string
method string
body []byte
calls int
}
func newTestNudgeSweeper(t *testing.T) (*RequestNudgeSweeper, *recordedEnqueue) {
t.Helper()
sw := NewRequestNudgeSweeper(nil) // binds to the sqlmock db.DB set by setupTestDB
rec := &recordedEnqueue{}
sw.enqueue = func(ctx context.Context, workspaceID, callerID string, priority int,
body []byte, method, idempotencyKey string, expiresAt *time.Time) (string, int, error) {
rec.calls++
rec.workspaceID = workspaceID
rec.idemKey = idempotencyKey
rec.method = method
rec.body = body
return "queue-id-1", 1, nil
}
return sw, rec
}
func TestNudgeSweeper_EmptyResultIsCleanNoOp(t *testing.T) {
mock := setupTestDB(t)
sw, rec := newTestNudgeSweeper(t)
mock.ExpectQuery(`SELECT r.recipient_id`).
WillReturnRows(sqlmock.NewRows([]string{"recipient_id", "ids"}))
res := sw.Sweep(context.Background())
if res.AgentsNudged != 0 || res.ItemsCovered != 0 || res.Errors != 0 {
t.Errorf("empty set must produce zero changes; got %+v", res)
}
if rec.calls != 0 {
t.Errorf("no enqueue expected on empty set; got %d", rec.calls)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
func TestNudgeSweeper_StaleIdleAgentIsNudgedAndStamped(t *testing.T) {
mock := setupTestDB(t)
sw, rec := newTestNudgeSweeper(t)
const ws = "11111111-1111-1111-1111-111111111111"
// One idle online agent with two stale pending items.
mock.ExpectQuery(`SELECT r.recipient_id`).
WillReturnRows(sqlmock.NewRows([]string{"recipient_id", "ids"}).
AddRow(ws, "{req-a,req-b}"))
// After the (faked) enqueue, the two covered items get last_nudged_at set.
mock.ExpectExec(`UPDATE requests\s+SET last_nudged_at = now\(\)`).
WithArgs(sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 2))
res := sw.Sweep(context.Background())
if res.AgentsNudged != 1 {
t.Errorf("expected 1 agent nudged, got %d", res.AgentsNudged)
}
if res.ItemsCovered != 2 {
t.Errorf("expected 2 items covered, got %d", res.ItemsCovered)
}
if res.Errors != 0 {
t.Errorf("expected 0 errors, got %d", res.Errors)
}
if rec.calls != 1 {
t.Fatalf("expected exactly 1 enqueue, got %d", rec.calls)
}
if rec.workspaceID != ws {
t.Errorf("nudge enqueued to wrong workspace: got %q want %q", rec.workspaceID, ws)
}
if rec.method != "message/send" {
t.Errorf("nudge method: got %q want message/send", rec.method)
}
if rec.idemKey == "" {
t.Errorf("expected a non-empty hourly idempotency key")
}
// Body should mention the count (2 → plural "requests") and the tool names.
bs := string(rec.body)
if !strings.Contains(bs, "2 unhandled inbox requests") {
t.Errorf("nudge body missing pluralized count: %s", bs)
}
if !strings.Contains(bs, "list_inbox") || !strings.Contains(bs, "respond_request") {
t.Errorf("nudge body missing tool guidance: %s", bs)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
// TestNudgeSweeper_SingularBodyForOneItem pins the "1 request" (singular) copy.
func TestNudgeSweeper_SingularBodyForOneItem(t *testing.T) {
mock := setupTestDB(t)
sw, rec := newTestNudgeSweeper(t)
const ws = "22222222-2222-2222-2222-222222222222"
mock.ExpectQuery(`SELECT r.recipient_id`).
WillReturnRows(sqlmock.NewRows([]string{"recipient_id", "ids"}).
AddRow(ws, "{only-one}"))
mock.ExpectExec(`UPDATE requests\s+SET last_nudged_at = now\(\)`).
WithArgs(sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
res := sw.Sweep(context.Background())
if res.ItemsCovered != 1 {
t.Fatalf("expected 1 item covered, got %d", res.ItemsCovered)
}
if bs := string(rec.body); !strings.Contains(bs, "1 unhandled inbox request ") {
t.Errorf("expected singular copy '1 unhandled inbox request'; got %s", bs)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
// TestNudgeSweeper_BusyOfflineUserAndRecentlyNudgedAreGatedBySQL — the sweep
// query excludes busy agents (active_tasks>0), offline agents (status!=online),
// user-recipient items (recipient_type='agent' filter), and recently-nudged
// items (last_nudged_at filter). Whatever the reason a candidate is ineligible,
// it does not appear in the result set → no enqueue, no stamp. This pins the
// "no row ⇒ no side effects" contract that those gates rely on.
func TestNudgeSweeper_BusyOfflineUserAndRecentlyNudgedAreGatedBySQL(t *testing.T) {
mock := setupTestDB(t)
sw, rec := newTestNudgeSweeper(t)
// The real WHERE clause filtered all of {busy, offline, user-recipient,
// recently-nudged} out, so the query yields zero rows.
mock.ExpectQuery(`SELECT r.recipient_id`).
WillReturnRows(sqlmock.NewRows([]string{"recipient_id", "ids"}))
res := sw.Sweep(context.Background())
if res.AgentsNudged != 0 {
t.Errorf("ineligible candidates must not be nudged; got %d", res.AgentsNudged)
}
if rec.calls != 0 {
t.Errorf("no enqueue expected for ineligible candidates; got %d", rec.calls)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet (an UPDATE/stamp fired for an ineligible candidate?): %v", err)
}
}
// TestNudgeSweeper_EnqueueFailureLeavesItemsUnstamped — if the enqueue fails,
// the last_nudged_at UPDATE must NOT fire, so the items stay eligible and the
// next sweep retries. Pins that the stamp is gated on a successful enqueue.
func TestNudgeSweeper_EnqueueFailureLeavesItemsUnstamped(t *testing.T) {
mock := setupTestDB(t)
sw, _ := newTestNudgeSweeper(t)
const ws = "33333333-3333-3333-3333-333333333333"
sw.enqueue = func(ctx context.Context, workspaceID, callerID string, priority int,
body []byte, method, idempotencyKey string, expiresAt *time.Time) (string, int, error) {
return "", 0, context.DeadlineExceeded
}
mock.ExpectQuery(`SELECT r.recipient_id`).
WillReturnRows(sqlmock.NewRows([]string{"recipient_id", "ids"}).
AddRow(ws, "{req-x}"))
// No ExpectExec for the UPDATE — it must not fire after a failed enqueue.
res := sw.Sweep(context.Background())
if res.Errors != 1 {
t.Errorf("expected 1 error from failed enqueue, got %d", res.Errors)
}
if res.AgentsNudged != 0 {
t.Errorf("failed enqueue must not count as nudged; got %d", res.AgentsNudged)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet (did the stamp UPDATE fire despite a failed enqueue?): %v", err)
}
}
// ---------- env override parsing ----------
func TestNudgeSweeperConstructor_PicksUpEnvOverride(t *testing.T) {
t.Setenv("REQUEST_NUDGE_SWEEPER_INTERVAL_S", "90")
mock := setupTestDB(t)
_ = mock
sw := NewRequestNudgeSweeper(nil)
if sw.Interval() != 90*time.Second {
t.Errorf("interval override not picked up: got %v", sw.Interval())
}
}
func TestNudgeSweeperConstructor_DefaultWhenEnvUnset(t *testing.T) {
t.Setenv("REQUEST_NUDGE_SWEEPER_INTERVAL_S", "")
mock := setupTestDB(t)
_ = mock
sw := NewRequestNudgeSweeper(nil)
if sw.Interval() != defaultRequestNudgeInterval {
t.Errorf("default interval not used: got %v", sw.Interval())
}
}
// ---------- pg array adapter ----------
func TestStringArray_RoundTrip(t *testing.T) {
// Scan a Postgres text[] literal, then Value() it back.
var a stringArray
if err := a.Scan("{aaa,bbb,ccc}"); err != nil {
t.Fatalf("scan: %v", err)
}
if len(a) != 3 || a[0] != "aaa" || a[2] != "ccc" {
t.Fatalf("parsed wrong: %#v", a)
}
v, err := a.Value()
if err != nil {
t.Fatalf("value: %v", err)
}
if v.(string) != `{"aaa","bbb","ccc"}` {
t.Errorf("unexpected literal: %v", v)
}
}
func TestStringArray_ScanNilAndEmpty(t *testing.T) {
var a stringArray
if err := a.Scan(nil); err != nil || a != nil {
t.Errorf("nil scan should yield nil slice, no error; got %#v err=%v", a, err)
}
if err := a.Scan("{}"); err != nil || len(a) != 0 {
t.Errorf("empty array literal should yield empty slice; got %#v err=%v", a, err)
}
}
func TestStringArray_ScanQuotedElements(t *testing.T) {
var a stringArray
if err := a.Scan(`{"a b","c,d","e\"f"}`); err != nil {
t.Fatalf("scan quoted: %v", err)
}
if len(a) != 3 || a[0] != "a b" || a[1] != "c,d" || a[2] != `e"f` {
t.Fatalf("quoted parse wrong: %#v", a)
}
}
@@ -0,0 +1,7 @@
-- Reverse of 20260610130000_requests_last_nudged.up.sql. IF EXISTS guards so
-- the down migration is safe whether or not requests / the column / the index
-- are present.
DROP INDEX IF EXISTS idx_requests_nudge_candidates;
ALTER TABLE IF EXISTS requests
DROP COLUMN IF EXISTS last_nudged_at;
@@ -0,0 +1,34 @@
-- requests.last_nudged_at — RFC unified-requests-inbox P4 (idle-agent nudge).
--
-- Phase 4 adds a periodic sweeper (request_nudge_sweeper.go) that pokes an
-- IDLE agent which has unhandled inbox items so it doesn't forget to process
-- them. The sweeper rate-limits itself to ≤1 nudge per request per hour by
-- stamping last_nudged_at when it enqueues a nudge.
--
-- Idempotency / ordering safety
-- -----------------------------
-- The migration runner re-applies every *.up.sql on each boot, and on some
-- orderings the P1 `requests` table may not exist yet on the box this runs
-- against. Both statements are therefore fully idempotent AND guarded with
-- IF EXISTS on the table so they no-op (rather than crash-loop) when requests
-- is absent — the column gets added once P1's CREATE TABLE has landed.
ALTER TABLE IF EXISTS requests
ADD COLUMN IF NOT EXISTS last_nudged_at TIMESTAMPTZ;
-- Partial index supporting the sweep query's hot predicate: agent recipients
-- in a nudge-eligible status, ordered by how long ago we last nudged them.
-- Partial (only the statuses the sweeper scans) keeps it small; the COALESCE
-- in the sweep's last_nudged_at filter still benefits because Postgres can
-- range-scan this index for the recipient/status prefix. IF NOT EXISTS so the
-- re-apply is a no-op; wrapped in a DO block guarded on table existence so it
-- is skipped cleanly when requests isn't created yet.
DO $$
BEGIN
IF EXISTS (
SELECT 1 FROM information_schema.tables
WHERE table_name = 'requests'
) THEN
CREATE INDEX IF NOT EXISTS idx_requests_nudge_candidates
ON requests (recipient_type, status, recipient_id, last_nudged_at);
END IF;
END$$;