feat(requests): P4 — idle-agent inbox nudge sweeper (RFC) #2526
@@ -469,6 +469,18 @@ func main() {
|
||||
delegSweeper := handlers.NewDelegationSweeper(nil, nil)
|
||||
go supervised.RunWithRecover(ctx, "delegation-sweeper", delegSweeper.Start)
|
||||
|
||||
// RFC unified-requests-inbox P4: idle-agent inbox-nudge sweeper. Pokes
|
||||
// an IDLE online agent that has unhandled `requests` inbox items (stale
|
||||
// >10min) with one A2A nudge so it re-checks its inbox, rate-limited to
|
||||
// <=1 nudge per request per hour via requests.last_nudged_at. No-op until
|
||||
// the P1 `requests` table (#2525) + the last_nudged_at column have rolled
|
||||
// out. Disable via REQUEST_NUDGE_SWEEPER_DISABLED=true; tune cadence via
|
||||
// REQUEST_NUDGE_SWEEPER_INTERVAL_S.
|
||||
if !strings.EqualFold(os.Getenv("REQUEST_NUDGE_SWEEPER_DISABLED"), "true") {
|
||||
nudgeSweeper := handlers.NewRequestNudgeSweeper(nil)
|
||||
go supervised.RunWithRecover(ctx, "request-nudge-sweeper", nudgeSweeper.Start)
|
||||
}
|
||||
|
||||
// Channel Manager — social channel integrations (Telegram, Slack, etc.)
|
||||
channelMgr := channels.NewManager(wh, broadcaster)
|
||||
go supervised.RunWithRecover(ctx, "channel-manager", channelMgr.Start)
|
||||
|
||||
@@ -0,0 +1,413 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"database/sql/driver"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// request_nudge_sweeper.go — RFC unified-requests-inbox Phase 4: idle-agent
|
||||
// inbox nudge sweeper.
|
||||
//
|
||||
// What it does
|
||||
// ------------
|
||||
// Periodically scans the unified `requests` table (P1 schema) for inbox items
|
||||
// addressed to an AGENT recipient that have been sitting unhandled for a while,
|
||||
// and whose recipient agent is currently IDLE and online. For each such agent
|
||||
// it enqueues ONE short A2A "nudge" message telling the agent to process its
|
||||
// inbox via list_inbox / respond_request, then stamps last_nudged_at on the
|
||||
// items the nudge covered so the same items aren't re-nudged for an hour.
|
||||
//
|
||||
// Why
|
||||
// ---
|
||||
// An agent that's busy will get to its inbox when it frees up; an agent that's
|
||||
// idle has nothing prompting it to re-check the inbox, so a request can sit
|
||||
// pending indefinitely. This worker closes that gap WITHOUT spamming: it only
|
||||
// fires for items that are already stale (created >10min ago), only for idle
|
||||
// recipients, and at most once per request per hour.
|
||||
//
|
||||
// Out of scope: USER recipients. Stale user-addressed items are surfaced by the
|
||||
// canvas Tasks/Approvals UI already — this worker never enqueues anything for a
|
||||
// user recipient.
|
||||
//
|
||||
// Delivery mechanism
|
||||
// ------------------
|
||||
// The nudge is delivered via the existing a2a-queue path (EnqueueA2A) — the
|
||||
// SAME mechanism the scheduler uses to deliver a cron tick to an agent. The
|
||||
// message is a normal `message/send` A2A body; the agent drains it from the
|
||||
// queue on its next heartbeat (registry.Heartbeat triggers drainQueue when the
|
||||
// workspace reports spare capacity). We do NOT write raw INSERTs into a2a_queue.
|
||||
//
|
||||
// On main vs. P1
|
||||
// --------------
|
||||
// This file compiles + tests on main, where the `requests` table does not yet
|
||||
// exist. It queries `requests` via RAW SQL (never imports P1's RequestStore),
|
||||
// so the build is independent of P1's Go symbols. At runtime the sweep query
|
||||
// simply finds no rows until the P1 migration + this PR's migration have both
|
||||
// rolled out. This PR MUST merge AFTER P1 (#2525).
|
||||
//
|
||||
// Frequency
|
||||
// ---------
|
||||
// 5min default cadence (REQUEST_NUDGE_SWEEPER_INTERVAL_S to override), matching
|
||||
// delegation_sweeper. Disable entirely via REQUEST_NUDGE_SWEEPER_INTERVAL_S=0?
|
||||
// No — envDuration treats <=0 as "use default". To disable, set
|
||||
// REQUEST_NUDGE_SWEEPER_DISABLED=true (checked in main.go wiring).
|
||||
|
||||
const (
|
||||
defaultRequestNudgeInterval = 5 * time.Minute
|
||||
|
||||
// staleAfter — an item must have been pending at least this long before
|
||||
// it's eligible for a nudge. Gives a freshly-created request a grace
|
||||
// window in which a still-active or just-freed agent picks it up on its
|
||||
// own, so we don't nudge items that are about to be handled anyway.
|
||||
requestNudgeStaleAfter = 10 * time.Minute
|
||||
|
||||
// reNudgeAfter — rate-limit: a given request is nudged at most once per
|
||||
// this window. Belt-and-suspenders with the queue idempotency key below.
|
||||
requestNudgeReNudgeAfter = time.Hour
|
||||
|
||||
// nudgeBatchLimit — bound the work per sweep. Caps both DB scan cost and
|
||||
// the number of agents we poke in one tick; the next tick picks up the
|
||||
// remainder. Generous enough that a normal backlog clears in one pass.
|
||||
requestNudgeBatchLimit = 200
|
||||
)
|
||||
|
||||
// enqueueFunc is the a2a-queue enqueue signature (package-level EnqueueA2A).
|
||||
// Injected as a field so tests can assert the nudge enqueue without mocking
|
||||
// EnqueueA2A's internal SQL (depth count, supersede). Production wiring uses
|
||||
// the real EnqueueA2A.
|
||||
type enqueueFunc func(
|
||||
ctx context.Context,
|
||||
workspaceID, callerID string,
|
||||
priority int,
|
||||
body []byte,
|
||||
method, idempotencyKey string,
|
||||
expiresAt *time.Time,
|
||||
) (id string, depth int, err error)
|
||||
|
||||
// RequestNudgeSweeper runs the periodic inbox-nudge sweep. Construct via
|
||||
// NewRequestNudgeSweeper, then Start(ctx) in main.go to begin ticking.
|
||||
type RequestNudgeSweeper struct {
|
||||
db *sql.DB
|
||||
interval time.Duration
|
||||
staleAfter time.Duration
|
||||
reNudgeWait time.Duration
|
||||
limit int
|
||||
enqueue enqueueFunc
|
||||
}
|
||||
|
||||
// NewRequestNudgeSweeper builds a sweeper bound to the package db.DB
|
||||
// (production wiring) or a test handle. Reads optional env overrides at
|
||||
// construction time so a long-running process picks them up via restart,
|
||||
// not mid-flight (mirrors NewDelegationSweeper).
|
||||
func NewRequestNudgeSweeper(handle *sql.DB) *RequestNudgeSweeper {
|
||||
if handle == nil {
|
||||
handle = db.DB
|
||||
}
|
||||
return &RequestNudgeSweeper{
|
||||
db: handle,
|
||||
interval: envDuration("REQUEST_NUDGE_SWEEPER_INTERVAL_S", defaultRequestNudgeInterval),
|
||||
staleAfter: requestNudgeStaleAfter,
|
||||
reNudgeWait: requestNudgeReNudgeAfter,
|
||||
limit: requestNudgeBatchLimit,
|
||||
enqueue: EnqueueA2A,
|
||||
}
|
||||
}
|
||||
|
||||
// Interval exposes the configured tick cadence — tests use it; main.go uses
|
||||
// it implicitly via Start.
|
||||
func (s *RequestNudgeSweeper) Interval() time.Duration { return s.interval }
|
||||
|
||||
// Start ticks Sweep() at the configured interval until ctx is cancelled.
|
||||
// Defers panic recovery so a single bad row can't kill the sweeper. Mirrors
|
||||
// DelegationSweeper.Start: first sweep fires immediately on startup.
|
||||
//
|
||||
// No-op until both the `requests` table (P1) and this PR's last_nudged_at
|
||||
// column have rolled out — the sweep query just finds no rows.
|
||||
func (s *RequestNudgeSweeper) Start(ctx context.Context) {
|
||||
t := time.NewTicker(s.interval)
|
||||
defer t.Stop()
|
||||
log.Printf("RequestNudgeSweeper: started (interval=%s, stale-after=%s, re-nudge-after=%s)",
|
||||
s.interval, s.staleAfter, s.reNudgeWait)
|
||||
|
||||
tickWithRecover := func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("RequestNudgeSweeper: PANIC in tick — recovered: %v", r)
|
||||
}
|
||||
}()
|
||||
s.Sweep(ctx)
|
||||
}
|
||||
|
||||
tickWithRecover()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Printf("RequestNudgeSweeper: stopped")
|
||||
return
|
||||
case <-t.C:
|
||||
tickWithRecover()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NudgeResult records what the last sweep did. Returned for observability and
|
||||
// so tests assert behavior without diffing log lines.
|
||||
type NudgeResult struct {
|
||||
AgentsNudged int // distinct idle agents poked this sweep
|
||||
ItemsCovered int // request rows whose last_nudged_at we stamped
|
||||
Errors int
|
||||
}
|
||||
|
||||
// Sweep runs one pass:
|
||||
//
|
||||
// 1. SELECT stale agent-recipient items whose recipient workspace is online
|
||||
// and idle (active_tasks=0), grouped by recipient. Each group = one agent
|
||||
// with N stale inbox items.
|
||||
// 2. For each idle agent: enqueue ONE A2A nudge, then stamp last_nudged_at on
|
||||
// exactly the items that group covered.
|
||||
//
|
||||
// SQL strategy: the SELECT joins requests → workspaces and aggregates the
|
||||
// covered request ids per recipient with array_agg, so one scan yields the
|
||||
// per-agent work list. The idle gate (status='online' AND
|
||||
// COALESCE(active_tasks,0)=0) lives in the JOIN's WHERE so an offline/busy
|
||||
// agent is never returned — we never even build a nudge for it.
|
||||
func (s *RequestNudgeSweeper) Sweep(ctx context.Context) NudgeResult {
|
||||
res := NudgeResult{}
|
||||
|
||||
// Group stale agent-recipient items by recipient. recipient_id::uuid casts
|
||||
// the TEXT recipient_id to the workspaces UUID PK for the join (requests
|
||||
// stores ids as TEXT — see P1 migration). Items that don't cast (a
|
||||
// malformed id) would error the cast; recipient_type='agent' rows are
|
||||
// always workspace UUIDs by construction, so this is safe in practice and
|
||||
// any bad row surfaces loudly as a query error rather than a silent skip.
|
||||
const sweepQuery = `
|
||||
SELECT r.recipient_id,
|
||||
array_agg(r.id::text) AS ids
|
||||
FROM requests r
|
||||
JOIN workspaces w ON w.id = r.recipient_id::uuid
|
||||
WHERE r.recipient_type = 'agent'
|
||||
AND r.status IN ('pending', 'info_requested')
|
||||
AND r.created_at < now() - ($1 * INTERVAL '1 second')
|
||||
AND (r.last_nudged_at IS NULL
|
||||
OR r.last_nudged_at < now() - ($2 * INTERVAL '1 second'))
|
||||
AND w.status = 'online'
|
||||
AND COALESCE(w.active_tasks, 0) = 0
|
||||
GROUP BY r.recipient_id
|
||||
LIMIT $3
|
||||
`
|
||||
|
||||
rows, err := s.db.QueryContext(ctx, sweepQuery,
|
||||
int(s.staleAfter.Seconds()), int(s.reNudgeWait.Seconds()), s.limit)
|
||||
if err != nil {
|
||||
log.Printf("RequestNudgeSweeper: sweep query failed: %v", err)
|
||||
res.Errors++
|
||||
return res
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
type group struct {
|
||||
recipientID string
|
||||
ids []string
|
||||
}
|
||||
var todo []group
|
||||
for rows.Next() {
|
||||
var g group
|
||||
// array_agg(text) comes back as a Postgres text[]; pq scans it into a
|
||||
// pq.StringArray. We avoid the pq dependency by scanning into a
|
||||
// driver-native []string via a small adapter type.
|
||||
var ids stringArray
|
||||
if err := rows.Scan(&g.recipientID, &ids); err != nil {
|
||||
log.Printf("RequestNudgeSweeper: scan failed: %v", err)
|
||||
res.Errors++
|
||||
continue
|
||||
}
|
||||
g.ids = ids
|
||||
if len(g.ids) == 0 {
|
||||
continue
|
||||
}
|
||||
todo = append(todo, g)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Printf("RequestNudgeSweeper: rows.Err: %v", err)
|
||||
res.Errors++
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
for _, g := range todo {
|
||||
if err := s.nudgeAgent(ctx, g.recipientID, g.ids, now); err != nil {
|
||||
log.Printf("RequestNudgeSweeper: nudge agent %s (%d items) failed: %v",
|
||||
g.recipientID, len(g.ids), err)
|
||||
res.Errors++
|
||||
continue
|
||||
}
|
||||
res.AgentsNudged++
|
||||
res.ItemsCovered += len(g.ids)
|
||||
}
|
||||
|
||||
if res.AgentsNudged > 0 || res.Errors > 0 {
|
||||
log.Printf("RequestNudgeSweeper: sweep complete — agents_nudged=%d items_covered=%d errors=%d",
|
||||
res.AgentsNudged, res.ItemsCovered, res.Errors)
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
// nudgeAgent enqueues one A2A nudge for the idle recipient agent covering the
|
||||
// given stale request ids, then stamps last_nudged_at on those ids. The
|
||||
// last_nudged_at UPDATE only fires after a successful enqueue so a failed
|
||||
// enqueue is retried next sweep (the items stay un-stamped, hence eligible).
|
||||
func (s *RequestNudgeSweeper) nudgeAgent(ctx context.Context, recipientID string, ids []string, now time.Time) error {
|
||||
body, err := buildNudgeBody(len(ids))
|
||||
if err != nil {
|
||||
return fmt.Errorf("build nudge body: %w", err)
|
||||
}
|
||||
|
||||
// Idempotency key bucketed to the current hour so two concurrent sweeper
|
||||
// boots collapse to one queued nudge per agent per hour at the queue layer
|
||||
// too — defense in depth on top of the last_nudged_at rate-limit. Empty
|
||||
// callerID = canvas/system-style enqueue (source_id NULL), matching the
|
||||
// scheduler's internal fire path.
|
||||
idemKey := fmt.Sprintf("inbox-nudge:%s:%d", recipientID, now.Truncate(time.Hour).Unix())
|
||||
|
||||
if _, _, err := s.enqueue(ctx, recipientID, "", PriorityInfo, body, "message/send", idemKey, nil); err != nil {
|
||||
return fmt.Errorf("enqueue nudge: %w", err)
|
||||
}
|
||||
|
||||
// Stamp last_nudged_at on exactly the items this nudge covered. ANY($1)
|
||||
// over the text[] of ids; cast id to text for the comparison so the
|
||||
// param type is unambiguous. Re-querying eligibility here would race with
|
||||
// the enqueue, so we trust the ids the sweep already gated.
|
||||
if _, err := s.db.ExecContext(ctx, `
|
||||
UPDATE requests
|
||||
SET last_nudged_at = now()
|
||||
WHERE id::text = ANY($1)
|
||||
`, stringArray(ids)); err != nil {
|
||||
return fmt.Errorf("stamp last_nudged_at: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// buildNudgeBody constructs the A2A `message/send` JSON-RPC body for the nudge.
|
||||
// Mirrors the scheduler's body shape (role=user, generated messageId, single
|
||||
// text part) so the receiving agent processes it like any other inbound turn.
|
||||
func buildNudgeBody(n int) ([]byte, error) {
|
||||
plural := "request"
|
||||
if n != 1 {
|
||||
plural = "requests"
|
||||
}
|
||||
text := fmt.Sprintf(
|
||||
"You have %d unhandled inbox %s awaiting your response. "+
|
||||
"Use list_inbox to see them and respond_request / add_request_message to act.",
|
||||
n, plural,
|
||||
)
|
||||
return json.Marshal(map[string]interface{}{
|
||||
"method": "message/send",
|
||||
"params": map[string]interface{}{
|
||||
"message": map[string]interface{}{
|
||||
"role": "user",
|
||||
"messageId": "inbox-nudge-" + uuid.New().String(),
|
||||
"parts": []map[string]interface{}{{"kind": "text", "text": text}},
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// stringArray scans a Postgres text[] into a []string and serializes a
|
||||
// []string into a Postgres array literal for parameter binding — a minimal
|
||||
// inline adapter so this file doesn't pull a new driver dependency just for
|
||||
// array_agg / ANY(text[]). Mirrors lib/pq's StringArray semantics for the
|
||||
// shapes we use (no NULL elements, no embedded special chars in UUID/text-id
|
||||
// values).
|
||||
type stringArray []string
|
||||
|
||||
// Scan implements sql.Scanner for a Postgres text[] value (delivered as
|
||||
// []byte or string like `{a,b,c}`).
|
||||
func (a *stringArray) Scan(src interface{}) error {
|
||||
if src == nil {
|
||||
*a = nil
|
||||
return nil
|
||||
}
|
||||
var s string
|
||||
switch v := src.(type) {
|
||||
case []byte:
|
||||
s = string(v)
|
||||
case string:
|
||||
s = v
|
||||
default:
|
||||
return fmt.Errorf("stringArray.Scan: unsupported source type %T", src)
|
||||
}
|
||||
*a = parsePGTextArray(s)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Value implements driver.Valuer, emitting a Postgres array literal `{a,b,c}`.
|
||||
// Used as the ANY($1) param in the last_nudged_at UPDATE. Each element is
|
||||
// double-quoted and backslash/quote-escaped so ids containing array-special
|
||||
// characters bind correctly.
|
||||
func (a stringArray) Value() (driver.Value, error) {
|
||||
if a == nil {
|
||||
return nil, nil
|
||||
}
|
||||
out := make([]byte, 0, len(a)*40+2)
|
||||
out = append(out, '{')
|
||||
for i, e := range a {
|
||||
if i > 0 {
|
||||
out = append(out, ',')
|
||||
}
|
||||
out = append(out, '"')
|
||||
for _, c := range []byte(e) {
|
||||
if c == '"' || c == '\\' {
|
||||
out = append(out, '\\')
|
||||
}
|
||||
out = append(out, c)
|
||||
}
|
||||
out = append(out, '"')
|
||||
}
|
||||
out = append(out, '}')
|
||||
return string(out), nil
|
||||
}
|
||||
|
||||
// parsePGTextArray parses a Postgres array literal `{a,"b c",d}` into a
|
||||
// []string. Handles the unquoted and double-quoted element forms emitted by
|
||||
// array_agg(text); element values here are UUIDs so the common path is the
|
||||
// simple comma split, but quoted handling is included for correctness.
|
||||
func parsePGTextArray(s string) []string {
|
||||
if len(s) < 2 || s[0] != '{' || s[len(s)-1] != '}' {
|
||||
return nil
|
||||
}
|
||||
body := s[1 : len(s)-1]
|
||||
if body == "" {
|
||||
return nil
|
||||
}
|
||||
var out []string
|
||||
var cur []byte
|
||||
inQuotes := false
|
||||
escaped := false
|
||||
for i := 0; i < len(body); i++ {
|
||||
c := body[i]
|
||||
switch {
|
||||
case escaped:
|
||||
cur = append(cur, c)
|
||||
escaped = false
|
||||
case c == '\\':
|
||||
escaped = true
|
||||
case c == '"':
|
||||
inQuotes = !inQuotes
|
||||
case c == ',' && !inQuotes:
|
||||
out = append(out, string(cur))
|
||||
cur = cur[:0]
|
||||
default:
|
||||
cur = append(cur, c)
|
||||
}
|
||||
}
|
||||
out = append(out, string(cur))
|
||||
return out
|
||||
}
|
||||
@@ -0,0 +1,271 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
)
|
||||
|
||||
// request_nudge_sweeper_test.go — coverage for the RFC unified-requests-inbox
|
||||
// Phase 4 idle-agent inbox-nudge sweeper. Validates:
|
||||
//
|
||||
// 1. A stale pending item for an IDLE online agent gets nudged + its
|
||||
// last_nudged_at stamped.
|
||||
// 2. A busy (active_tasks>0) or offline agent is NOT returned by the sweep
|
||||
// query (gated in SQL) → no nudge, no stamp.
|
||||
// 3. A recently-nudged item (<1h) is excluded by the query → no nudge.
|
||||
// 4. A user-recipient item is never selected → no nudge.
|
||||
// 5. Empty result set is a clean no-op.
|
||||
// 6. Env-override interval parses + falls back; defaults when unset.
|
||||
//
|
||||
// The idle/busy/offline/user-recipient/recently-nudged gates all live in the
|
||||
// single sweep SELECT (status='online', active_tasks=0, recipient_type='agent',
|
||||
// last_nudged_at filter). So at the sqlmock level those cases are expressed as
|
||||
// "the query returns the row" vs "the query returns no row" — the test pins
|
||||
// that a returned row drives exactly one enqueue + one stamp, and that an
|
||||
// empty result drives neither. The SQL predicates themselves are asserted by
|
||||
// the integration/real-DB harness, not sqlmock (which can't evaluate WHERE).
|
||||
|
||||
// newTestNudgeSweeper builds a sweeper on the sqlmock db.DB with a fake
|
||||
// enqueue that records calls, so tests assert the nudge without mocking
|
||||
// EnqueueA2A's internal SQL.
|
||||
type recordedEnqueue struct {
|
||||
workspaceID string
|
||||
idemKey string
|
||||
method string
|
||||
body []byte
|
||||
calls int
|
||||
}
|
||||
|
||||
func newTestNudgeSweeper(t *testing.T) (*RequestNudgeSweeper, *recordedEnqueue) {
|
||||
t.Helper()
|
||||
sw := NewRequestNudgeSweeper(nil) // binds to the sqlmock db.DB set by setupTestDB
|
||||
rec := &recordedEnqueue{}
|
||||
sw.enqueue = func(ctx context.Context, workspaceID, callerID string, priority int,
|
||||
body []byte, method, idempotencyKey string, expiresAt *time.Time) (string, int, error) {
|
||||
rec.calls++
|
||||
rec.workspaceID = workspaceID
|
||||
rec.idemKey = idempotencyKey
|
||||
rec.method = method
|
||||
rec.body = body
|
||||
return "queue-id-1", 1, nil
|
||||
}
|
||||
return sw, rec
|
||||
}
|
||||
|
||||
func TestNudgeSweeper_EmptyResultIsCleanNoOp(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
sw, rec := newTestNudgeSweeper(t)
|
||||
|
||||
mock.ExpectQuery(`SELECT r.recipient_id`).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"recipient_id", "ids"}))
|
||||
|
||||
res := sw.Sweep(context.Background())
|
||||
if res.AgentsNudged != 0 || res.ItemsCovered != 0 || res.Errors != 0 {
|
||||
t.Errorf("empty set must produce zero changes; got %+v", res)
|
||||
}
|
||||
if rec.calls != 0 {
|
||||
t.Errorf("no enqueue expected on empty set; got %d", rec.calls)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNudgeSweeper_StaleIdleAgentIsNudgedAndStamped(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
sw, rec := newTestNudgeSweeper(t)
|
||||
|
||||
const ws = "11111111-1111-1111-1111-111111111111"
|
||||
|
||||
// One idle online agent with two stale pending items.
|
||||
mock.ExpectQuery(`SELECT r.recipient_id`).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"recipient_id", "ids"}).
|
||||
AddRow(ws, "{req-a,req-b}"))
|
||||
|
||||
// After the (faked) enqueue, the two covered items get last_nudged_at set.
|
||||
mock.ExpectExec(`UPDATE requests\s+SET last_nudged_at = now\(\)`).
|
||||
WithArgs(sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 2))
|
||||
|
||||
res := sw.Sweep(context.Background())
|
||||
if res.AgentsNudged != 1 {
|
||||
t.Errorf("expected 1 agent nudged, got %d", res.AgentsNudged)
|
||||
}
|
||||
if res.ItemsCovered != 2 {
|
||||
t.Errorf("expected 2 items covered, got %d", res.ItemsCovered)
|
||||
}
|
||||
if res.Errors != 0 {
|
||||
t.Errorf("expected 0 errors, got %d", res.Errors)
|
||||
}
|
||||
if rec.calls != 1 {
|
||||
t.Fatalf("expected exactly 1 enqueue, got %d", rec.calls)
|
||||
}
|
||||
if rec.workspaceID != ws {
|
||||
t.Errorf("nudge enqueued to wrong workspace: got %q want %q", rec.workspaceID, ws)
|
||||
}
|
||||
if rec.method != "message/send" {
|
||||
t.Errorf("nudge method: got %q want message/send", rec.method)
|
||||
}
|
||||
if rec.idemKey == "" {
|
||||
t.Errorf("expected a non-empty hourly idempotency key")
|
||||
}
|
||||
// Body should mention the count (2 → plural "requests") and the tool names.
|
||||
bs := string(rec.body)
|
||||
if !strings.Contains(bs, "2 unhandled inbox requests") {
|
||||
t.Errorf("nudge body missing pluralized count: %s", bs)
|
||||
}
|
||||
if !strings.Contains(bs, "list_inbox") || !strings.Contains(bs, "respond_request") {
|
||||
t.Errorf("nudge body missing tool guidance: %s", bs)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestNudgeSweeper_SingularBodyForOneItem pins the "1 request" (singular) copy.
|
||||
func TestNudgeSweeper_SingularBodyForOneItem(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
sw, rec := newTestNudgeSweeper(t)
|
||||
const ws = "22222222-2222-2222-2222-222222222222"
|
||||
|
||||
mock.ExpectQuery(`SELECT r.recipient_id`).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"recipient_id", "ids"}).
|
||||
AddRow(ws, "{only-one}"))
|
||||
mock.ExpectExec(`UPDATE requests\s+SET last_nudged_at = now\(\)`).
|
||||
WithArgs(sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
res := sw.Sweep(context.Background())
|
||||
if res.ItemsCovered != 1 {
|
||||
t.Fatalf("expected 1 item covered, got %d", res.ItemsCovered)
|
||||
}
|
||||
if bs := string(rec.body); !strings.Contains(bs, "1 unhandled inbox request ") {
|
||||
t.Errorf("expected singular copy '1 unhandled inbox request'; got %s", bs)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestNudgeSweeper_BusyOfflineUserAndRecentlyNudgedAreGatedBySQL — the sweep
|
||||
// query excludes busy agents (active_tasks>0), offline agents (status!=online),
|
||||
// user-recipient items (recipient_type='agent' filter), and recently-nudged
|
||||
// items (last_nudged_at filter). Whatever the reason a candidate is ineligible,
|
||||
// it does not appear in the result set → no enqueue, no stamp. This pins the
|
||||
// "no row ⇒ no side effects" contract that those gates rely on.
|
||||
func TestNudgeSweeper_BusyOfflineUserAndRecentlyNudgedAreGatedBySQL(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
sw, rec := newTestNudgeSweeper(t)
|
||||
|
||||
// The real WHERE clause filtered all of {busy, offline, user-recipient,
|
||||
// recently-nudged} out, so the query yields zero rows.
|
||||
mock.ExpectQuery(`SELECT r.recipient_id`).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"recipient_id", "ids"}))
|
||||
|
||||
res := sw.Sweep(context.Background())
|
||||
if res.AgentsNudged != 0 {
|
||||
t.Errorf("ineligible candidates must not be nudged; got %d", res.AgentsNudged)
|
||||
}
|
||||
if rec.calls != 0 {
|
||||
t.Errorf("no enqueue expected for ineligible candidates; got %d", rec.calls)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet (an UPDATE/stamp fired for an ineligible candidate?): %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestNudgeSweeper_EnqueueFailureLeavesItemsUnstamped — if the enqueue fails,
|
||||
// the last_nudged_at UPDATE must NOT fire, so the items stay eligible and the
|
||||
// next sweep retries. Pins that the stamp is gated on a successful enqueue.
|
||||
func TestNudgeSweeper_EnqueueFailureLeavesItemsUnstamped(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
sw, _ := newTestNudgeSweeper(t)
|
||||
const ws = "33333333-3333-3333-3333-333333333333"
|
||||
|
||||
sw.enqueue = func(ctx context.Context, workspaceID, callerID string, priority int,
|
||||
body []byte, method, idempotencyKey string, expiresAt *time.Time) (string, int, error) {
|
||||
return "", 0, context.DeadlineExceeded
|
||||
}
|
||||
|
||||
mock.ExpectQuery(`SELECT r.recipient_id`).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"recipient_id", "ids"}).
|
||||
AddRow(ws, "{req-x}"))
|
||||
// No ExpectExec for the UPDATE — it must not fire after a failed enqueue.
|
||||
|
||||
res := sw.Sweep(context.Background())
|
||||
if res.Errors != 1 {
|
||||
t.Errorf("expected 1 error from failed enqueue, got %d", res.Errors)
|
||||
}
|
||||
if res.AgentsNudged != 0 {
|
||||
t.Errorf("failed enqueue must not count as nudged; got %d", res.AgentsNudged)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet (did the stamp UPDATE fire despite a failed enqueue?): %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- env override parsing ----------
|
||||
|
||||
func TestNudgeSweeperConstructor_PicksUpEnvOverride(t *testing.T) {
|
||||
t.Setenv("REQUEST_NUDGE_SWEEPER_INTERVAL_S", "90")
|
||||
mock := setupTestDB(t)
|
||||
_ = mock
|
||||
sw := NewRequestNudgeSweeper(nil)
|
||||
if sw.Interval() != 90*time.Second {
|
||||
t.Errorf("interval override not picked up: got %v", sw.Interval())
|
||||
}
|
||||
}
|
||||
|
||||
func TestNudgeSweeperConstructor_DefaultWhenEnvUnset(t *testing.T) {
|
||||
t.Setenv("REQUEST_NUDGE_SWEEPER_INTERVAL_S", "")
|
||||
mock := setupTestDB(t)
|
||||
_ = mock
|
||||
sw := NewRequestNudgeSweeper(nil)
|
||||
if sw.Interval() != defaultRequestNudgeInterval {
|
||||
t.Errorf("default interval not used: got %v", sw.Interval())
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- pg array adapter ----------
|
||||
|
||||
func TestStringArray_RoundTrip(t *testing.T) {
|
||||
// Scan a Postgres text[] literal, then Value() it back.
|
||||
var a stringArray
|
||||
if err := a.Scan("{aaa,bbb,ccc}"); err != nil {
|
||||
t.Fatalf("scan: %v", err)
|
||||
}
|
||||
if len(a) != 3 || a[0] != "aaa" || a[2] != "ccc" {
|
||||
t.Fatalf("parsed wrong: %#v", a)
|
||||
}
|
||||
v, err := a.Value()
|
||||
if err != nil {
|
||||
t.Fatalf("value: %v", err)
|
||||
}
|
||||
if v.(string) != `{"aaa","bbb","ccc"}` {
|
||||
t.Errorf("unexpected literal: %v", v)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStringArray_ScanNilAndEmpty(t *testing.T) {
|
||||
var a stringArray
|
||||
if err := a.Scan(nil); err != nil || a != nil {
|
||||
t.Errorf("nil scan should yield nil slice, no error; got %#v err=%v", a, err)
|
||||
}
|
||||
if err := a.Scan("{}"); err != nil || len(a) != 0 {
|
||||
t.Errorf("empty array literal should yield empty slice; got %#v err=%v", a, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStringArray_ScanQuotedElements(t *testing.T) {
|
||||
var a stringArray
|
||||
if err := a.Scan(`{"a b","c,d","e\"f"}`); err != nil {
|
||||
t.Fatalf("scan quoted: %v", err)
|
||||
}
|
||||
if len(a) != 3 || a[0] != "a b" || a[1] != "c,d" || a[2] != `e"f` {
|
||||
t.Fatalf("quoted parse wrong: %#v", a)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
-- Reverse of 20260610130000_requests_last_nudged.up.sql. IF EXISTS guards so
|
||||
-- the down migration is safe whether or not requests / the column / the index
|
||||
-- are present.
|
||||
DROP INDEX IF EXISTS idx_requests_nudge_candidates;
|
||||
|
||||
ALTER TABLE IF EXISTS requests
|
||||
DROP COLUMN IF EXISTS last_nudged_at;
|
||||
@@ -0,0 +1,34 @@
|
||||
-- requests.last_nudged_at — RFC unified-requests-inbox P4 (idle-agent nudge).
|
||||
--
|
||||
-- Phase 4 adds a periodic sweeper (request_nudge_sweeper.go) that pokes an
|
||||
-- IDLE agent which has unhandled inbox items so it doesn't forget to process
|
||||
-- them. The sweeper rate-limits itself to ≤1 nudge per request per hour by
|
||||
-- stamping last_nudged_at when it enqueues a nudge.
|
||||
--
|
||||
-- Idempotency / ordering safety
|
||||
-- -----------------------------
|
||||
-- The migration runner re-applies every *.up.sql on each boot, and on some
|
||||
-- orderings the P1 `requests` table may not exist yet on the box this runs
|
||||
-- against. Both statements are therefore fully idempotent AND guarded with
|
||||
-- IF EXISTS on the table so they no-op (rather than crash-loop) when requests
|
||||
-- is absent — the column gets added once P1's CREATE TABLE has landed.
|
||||
ALTER TABLE IF EXISTS requests
|
||||
ADD COLUMN IF NOT EXISTS last_nudged_at TIMESTAMPTZ;
|
||||
|
||||
-- Partial index supporting the sweep query's hot predicate: agent recipients
|
||||
-- in a nudge-eligible status, ordered by how long ago we last nudged them.
|
||||
-- Partial (only the statuses the sweeper scans) keeps it small; the COALESCE
|
||||
-- in the sweep's last_nudged_at filter still benefits because Postgres can
|
||||
-- range-scan this index for the recipient/status prefix. IF NOT EXISTS so the
|
||||
-- re-apply is a no-op; wrapped in a DO block guarded on table existence so it
|
||||
-- is skipped cleanly when requests isn't created yet.
|
||||
DO $$
|
||||
BEGIN
|
||||
IF EXISTS (
|
||||
SELECT 1 FROM information_schema.tables
|
||||
WHERE table_name = 'requests'
|
||||
) THEN
|
||||
CREATE INDEX IF NOT EXISTS idx_requests_nudge_candidates
|
||||
ON requests (recipient_type, status, recipient_id, last_nudged_at);
|
||||
END IF;
|
||||
END$$;
|
||||
Reference in New Issue
Block a user