diff --git a/workspace-server/cmd/server/main.go b/workspace-server/cmd/server/main.go index 48785ddce..afece59f3 100644 --- a/workspace-server/cmd/server/main.go +++ b/workspace-server/cmd/server/main.go @@ -469,6 +469,18 @@ func main() { delegSweeper := handlers.NewDelegationSweeper(nil, nil) go supervised.RunWithRecover(ctx, "delegation-sweeper", delegSweeper.Start) + // RFC unified-requests-inbox P4: idle-agent inbox-nudge sweeper. Pokes + // an IDLE online agent that has unhandled `requests` inbox items (stale + // >10min) with one A2A nudge so it re-checks its inbox, rate-limited to + // <=1 nudge per request per hour via requests.last_nudged_at. No-op until + // the P1 `requests` table (#2525) + the last_nudged_at column have rolled + // out. Disable via REQUEST_NUDGE_SWEEPER_DISABLED=true; tune cadence via + // REQUEST_NUDGE_SWEEPER_INTERVAL_S. + if !strings.EqualFold(os.Getenv("REQUEST_NUDGE_SWEEPER_DISABLED"), "true") { + nudgeSweeper := handlers.NewRequestNudgeSweeper(nil) + go supervised.RunWithRecover(ctx, "request-nudge-sweeper", nudgeSweeper.Start) + } + // Channel Manager — social channel integrations (Telegram, Slack, etc.) channelMgr := channels.NewManager(wh, broadcaster) go supervised.RunWithRecover(ctx, "channel-manager", channelMgr.Start) diff --git a/workspace-server/internal/handlers/request_nudge_sweeper.go b/workspace-server/internal/handlers/request_nudge_sweeper.go new file mode 100644 index 000000000..17d738721 --- /dev/null +++ b/workspace-server/internal/handlers/request_nudge_sweeper.go @@ -0,0 +1,413 @@ +package handlers + +import ( + "context" + "database/sql" + "database/sql/driver" + "encoding/json" + "fmt" + "log" + "time" + + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db" + "github.com/google/uuid" +) + +// request_nudge_sweeper.go — RFC unified-requests-inbox Phase 4: idle-agent +// inbox nudge sweeper. 
+// +// What it does +// ------------ +// Periodically scans the unified `requests` table (P1 schema) for inbox items +// addressed to an AGENT recipient that have been sitting unhandled for a while, +// and whose recipient agent is currently IDLE and online. For each such agent +// it enqueues ONE short A2A "nudge" message telling the agent to process its +// inbox via list_inbox / respond_request, then stamps last_nudged_at on the +// items the nudge covered so the same items aren't re-nudged for an hour. +// +// Why +// --- +// An agent that's busy will get to its inbox when it frees up; an agent that's +// idle has nothing prompting it to re-check the inbox, so a request can sit +// pending indefinitely. This worker closes that gap WITHOUT spamming: it only +// fires for items that are already stale (created >10min ago), only for idle +// recipients, and at most once per request per hour. +// +// Out of scope: USER recipients. Stale user-addressed items are surfaced by the +// canvas Tasks/Approvals UI already — this worker never enqueues anything for a +// user recipient. +// +// Delivery mechanism +// ------------------ +// The nudge is delivered via the existing a2a-queue path (EnqueueA2A) — the +// SAME mechanism the scheduler uses to deliver a cron tick to an agent. The +// message is a normal `message/send` A2A body; the agent drains it from the +// queue on its next heartbeat (registry.Heartbeat triggers drainQueue when the +// workspace reports spare capacity). We do NOT write raw INSERTs into a2a_queue. +// +// On main vs. P1 +// -------------- +// This file compiles + tests on main, where the `requests` table does not yet +// exist. It queries `requests` via RAW SQL (never imports P1's RequestStore), +// so the build is independent of P1's Go symbols. At runtime the sweep query +// simply finds no rows until the P1 migration + this PR's migration have both +// rolled out. This PR MUST merge AFTER P1 (#2525). 
+// +// Frequency +// --------- +// 5min default cadence (REQUEST_NUDGE_SWEEPER_INTERVAL_S to override), matching +// delegation_sweeper. An interval of 0 does NOT disable the sweeper: +// envDuration treats <=0 as "use default". To disable it, set +// REQUEST_NUDGE_SWEEPER_DISABLED=true (checked in main.go wiring). + +const ( + defaultRequestNudgeInterval = 5 * time.Minute + + // requestNudgeStaleAfter — an item must have been pending at least this + // long before it's eligible for a nudge. Gives a freshly-created request a + // grace window in which a still-active or just-freed agent picks it up on + // its own, so we don't nudge items that are about to be handled anyway. + requestNudgeStaleAfter = 10 * time.Minute + + // requestNudgeReNudgeAfter — rate-limit: a given request is nudged at most + // once per this window. Belt-and-suspenders with the queue idempotency key below. + requestNudgeReNudgeAfter = time.Hour + + // requestNudgeBatchLimit — bound the work per sweep. Used as the query's + // LIMIT, applied after GROUP BY: caps agents poked per tick (not rows + // scanned); the next tick picks up the rest. Generous for a normal backlog. + requestNudgeBatchLimit = 200 +) + +// enqueueFunc is the a2a-queue enqueue signature (package-level EnqueueA2A). +// Injected as a field so tests can assert the nudge enqueue without mocking +// EnqueueA2A's internal SQL (depth count, supersede). Production wiring uses +// the real EnqueueA2A. +type enqueueFunc func( + ctx context.Context, + workspaceID, callerID string, + priority int, + body []byte, + method, idempotencyKey string, + expiresAt *time.Time, +) (id string, depth int, err error) + +// RequestNudgeSweeper runs the periodic inbox-nudge sweep. Construct via +// NewRequestNudgeSweeper, then Start(ctx) in main.go to begin ticking. 
+type RequestNudgeSweeper struct { + db *sql.DB + interval time.Duration + staleAfter time.Duration + reNudgeWait time.Duration + limit int + enqueue enqueueFunc +} + +// NewRequestNudgeSweeper builds a sweeper bound to the package db.DB +// (production wiring) or a test handle. Reads optional env overrides at +// construction time so a long-running process picks them up via restart, +// not mid-flight (mirrors NewDelegationSweeper). +func NewRequestNudgeSweeper(handle *sql.DB) *RequestNudgeSweeper { + if handle == nil { + handle = db.DB + } + return &RequestNudgeSweeper{ + db: handle, + interval: envDuration("REQUEST_NUDGE_SWEEPER_INTERVAL_S", defaultRequestNudgeInterval), + staleAfter: requestNudgeStaleAfter, + reNudgeWait: requestNudgeReNudgeAfter, + limit: requestNudgeBatchLimit, + enqueue: EnqueueA2A, + } +} + +// Interval exposes the configured tick cadence — tests use it; main.go uses +// it implicitly via Start. +func (s *RequestNudgeSweeper) Interval() time.Duration { return s.interval } + +// Start ticks Sweep() at the configured interval until ctx is cancelled. +// Defers panic recovery so a single bad row can't kill the sweeper. Mirrors +// DelegationSweeper.Start: first sweep fires immediately on startup. +// +// No-op until both the `requests` table (P1) and this PR's last_nudged_at +// column have rolled out — the sweep query just finds no rows. 
+func (s *RequestNudgeSweeper) Start(ctx context.Context) { + t := time.NewTicker(s.interval) + defer t.Stop() + log.Printf("RequestNudgeSweeper: started (interval=%s, stale-after=%s, re-nudge-after=%s)", + s.interval, s.staleAfter, s.reNudgeWait) + + tickWithRecover := func() { + defer func() { + if r := recover(); r != nil { + log.Printf("RequestNudgeSweeper: PANIC in tick — recovered: %v", r) + } + }() + s.Sweep(ctx) + } + + tickWithRecover() + + for { + select { + case <-ctx.Done(): + log.Printf("RequestNudgeSweeper: stopped") + return + case <-t.C: + tickWithRecover() + } + } +} + +// NudgeResult records what the last sweep did. Returned for observability and +// so tests assert behavior without diffing log lines. +type NudgeResult struct { + AgentsNudged int // distinct idle agents poked this sweep + ItemsCovered int // request rows whose last_nudged_at we stamped + Errors int +} + +// Sweep runs one pass: +// +// 1. SELECT stale agent-recipient items whose recipient workspace is online +// and idle (active_tasks=0), grouped by recipient. Each group = one agent +// with N stale inbox items. +// 2. For each idle agent: enqueue ONE A2A nudge, then stamp last_nudged_at on +// exactly the items that group covered. +// +// SQL strategy: the SELECT joins requests → workspaces and aggregates the +// covered request ids per recipient with array_agg, so one scan yields the +// per-agent work list. The idle gate (status='online' AND +// COALESCE(active_tasks,0)=0) lives in the JOIN's WHERE so an offline/busy +// agent is never returned — we never even build a nudge for it. +func (s *RequestNudgeSweeper) Sweep(ctx context.Context) NudgeResult { + res := NudgeResult{} + + // Group stale agent-recipient items by recipient. recipient_id::uuid casts + // the TEXT recipient_id to the workspaces UUID PK for the join (requests + // stores ids as TEXT — see P1 migration). 
Items that don't cast (a + // malformed id) would error the cast; recipient_type='agent' rows are + // always workspace UUIDs by construction, so this is safe in practice and + // any bad row surfaces loudly as a query error rather than a silent skip. + const sweepQuery = ` + SELECT r.recipient_id, + array_agg(r.id::text) AS ids + FROM requests r + JOIN workspaces w ON w.id = r.recipient_id::uuid + WHERE r.recipient_type = 'agent' + AND r.status IN ('pending', 'info_requested') + AND r.created_at < now() - ($1 * INTERVAL '1 second') + AND (r.last_nudged_at IS NULL + OR r.last_nudged_at < now() - ($2 * INTERVAL '1 second')) + AND w.status = 'online' + AND COALESCE(w.active_tasks, 0) = 0 + GROUP BY r.recipient_id + LIMIT $3 + ` + + rows, err := s.db.QueryContext(ctx, sweepQuery, + int(s.staleAfter.Seconds()), int(s.reNudgeWait.Seconds()), s.limit) + if err != nil { + log.Printf("RequestNudgeSweeper: sweep query failed: %v", err) + res.Errors++ + return res + } + defer rows.Close() + + type group struct { + recipientID string + ids []string + } + var todo []group + for rows.Next() { + var g group + // array_agg(text) comes back as a Postgres text[]; pq scans it into a + // pq.StringArray. We avoid the pq dependency by scanning into a + // driver-native []string via a small adapter type. 
+ var ids stringArray + if err := rows.Scan(&g.recipientID, &ids); err != nil { + log.Printf("RequestNudgeSweeper: scan failed: %v", err) + res.Errors++ + continue + } + g.ids = ids + if len(g.ids) == 0 { + continue + } + todo = append(todo, g) + } + if err := rows.Err(); err != nil { + log.Printf("RequestNudgeSweeper: rows.Err: %v", err) + res.Errors++ + } + + now := time.Now() + for _, g := range todo { + if err := s.nudgeAgent(ctx, g.recipientID, g.ids, now); err != nil { + log.Printf("RequestNudgeSweeper: nudge agent %s (%d items) failed: %v", + g.recipientID, len(g.ids), err) + res.Errors++ + continue + } + res.AgentsNudged++ + res.ItemsCovered += len(g.ids) + } + + if res.AgentsNudged > 0 || res.Errors > 0 { + log.Printf("RequestNudgeSweeper: sweep complete — agents_nudged=%d items_covered=%d errors=%d", + res.AgentsNudged, res.ItemsCovered, res.Errors) + } + return res +} + +// nudgeAgent enqueues one A2A nudge for the idle recipient agent covering the +// given stale request ids, then stamps last_nudged_at on those ids. The +// last_nudged_at UPDATE only fires after a successful enqueue so a failed +// enqueue is retried next sweep (the items stay un-stamped, hence eligible). +func (s *RequestNudgeSweeper) nudgeAgent(ctx context.Context, recipientID string, ids []string, now time.Time) error { + body, err := buildNudgeBody(len(ids)) + if err != nil { + return fmt.Errorf("build nudge body: %w", err) + } + + // Idempotency key bucketed to the current hour so two concurrent sweeper + // boots collapse to one queued nudge per agent per hour at the queue layer + // too — defense in depth on top of the last_nudged_at rate-limit. Empty + // callerID = canvas/system-style enqueue (source_id NULL), matching the + // scheduler's internal fire path. 
+ idemKey := fmt.Sprintf("inbox-nudge:%s:%d", recipientID, now.Truncate(time.Hour).Unix()) + + if _, _, err := s.enqueue(ctx, recipientID, "", PriorityInfo, body, "message/send", idemKey, nil); err != nil { + return fmt.Errorf("enqueue nudge: %w", err) + } + + // Stamp last_nudged_at on exactly the items this nudge covered. ANY($1) + // over the text[] of ids; cast id to text for the comparison so the + // param type is unambiguous. Re-querying eligibility here would race with + // the enqueue, so we trust the ids the sweep already gated. + if _, err := s.db.ExecContext(ctx, ` + UPDATE requests + SET last_nudged_at = now() + WHERE id::text = ANY($1) + `, stringArray(ids)); err != nil { + return fmt.Errorf("stamp last_nudged_at: %w", err) + } + return nil +} + +// buildNudgeBody constructs the A2A `message/send` JSON-RPC body for the nudge. +// Mirrors the scheduler's body shape (role=user, generated messageId, single +// text part) so the receiving agent processes it like any other inbound turn. +func buildNudgeBody(n int) ([]byte, error) { + plural := "request" + if n != 1 { + plural = "requests" + } + text := fmt.Sprintf( + "You have %d unhandled inbox %s awaiting your response. "+ + "Use list_inbox to see them and respond_request / add_request_message to act.", + n, plural, + ) + return json.Marshal(map[string]interface{}{ + "method": "message/send", + "params": map[string]interface{}{ + "message": map[string]interface{}{ + "role": "user", + "messageId": "inbox-nudge-" + uuid.New().String(), + "parts": []map[string]interface{}{{"kind": "text", "text": text}}, + }, + }, + }) +} + +// stringArray scans a Postgres text[] into a []string and serializes a +// []string into a Postgres array literal for parameter binding — a minimal +// inline adapter so this file doesn't pull a new driver dependency just for +// array_agg / ANY(text[]). 
Mirrors lib/pq's StringArray semantics for the +// shapes we use (no NULL elements, no embedded special chars in UUID/text-id +// values). +type stringArray []string + +// Scan implements sql.Scanner for a Postgres text[] value (delivered as +// []byte or string like `{a,b,c}`). A nil source yields a nil slice; any +// other source type is an error. A value that is not wrapped in braces is +// NOT reported as an error — it parses to a nil slice. +func (a *stringArray) Scan(src interface{}) error { + if src == nil { + *a = nil + return nil + } + var s string + switch v := src.(type) { + case []byte: + s = string(v) + case string: + s = v + default: + return fmt.Errorf("stringArray.Scan: unsupported source type %T", src) + } + *a = parsePGTextArray(s) + return nil +} + +// Value implements driver.Valuer, emitting a Postgres array literal `{a,b,c}`. +// Used as the ANY($1) param in the last_nudged_at UPDATE. Each element is +// double-quoted and backslash/quote-escaped so ids containing array-special +// characters bind correctly. A nil receiver returns a nil driver.Value; an +// empty non-nil receiver returns `{}`. +func (a stringArray) Value() (driver.Value, error) { + if a == nil { + return nil, nil + } + out := make([]byte, 0, len(a)*40+2) + out = append(out, '{') + for i, e := range a { + if i > 0 { + out = append(out, ',') + } + out = append(out, '"') + for _, c := range []byte(e) { + if c == '"' || c == '\\' { + out = append(out, '\\') + } + out = append(out, c) + } + out = append(out, '"') + } + out = append(out, '}') + return string(out), nil +} + +// parsePGTextArray parses a Postgres array literal `{a,"b c",d}` into a +// []string. Handles the unquoted and double-quoted element forms emitted by +// array_agg(text); element values here are UUIDs so the common path is the +// simple comma split, but quoted handling is included for correctness. +// Returns nil for `{}` and for input that is not wrapped in braces. An +// unquoted NULL element is not special-cased: it comes back as "NULL". 
+func parsePGTextArray(s string) []string { + if len(s) < 2 || s[0] != '{' || s[len(s)-1] != '}' { + return nil + } + body := s[1 : len(s)-1] + if body == "" { + return nil + } + var out []string + var cur []byte + inQuotes := false + escaped := false + for i := 0; i < len(body); i++ { + c := body[i] + switch { + case escaped: + cur = append(cur, c) + escaped = false + case c == '\\': + escaped = true + case c == '"': + inQuotes = !inQuotes + case c == ',' && !inQuotes: + out = append(out, string(cur)) + cur = cur[:0] + default: + cur = append(cur, c) + } + } + out = append(out, string(cur)) + return out +} diff --git a/workspace-server/internal/handlers/request_nudge_sweeper_test.go b/workspace-server/internal/handlers/request_nudge_sweeper_test.go new file mode 100644 index 000000000..c55738b46 --- /dev/null +++ b/workspace-server/internal/handlers/request_nudge_sweeper_test.go @@ -0,0 +1,271 @@ +package handlers + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" +) + +// request_nudge_sweeper_test.go — coverage for the RFC unified-requests-inbox +// Phase 4 idle-agent inbox-nudge sweeper. Validates: +// +// 1. A stale pending item for an IDLE online agent gets nudged + its +// last_nudged_at stamped. +// 2. A busy (active_tasks>0) or offline agent is NOT returned by the sweep +// query (gated in SQL) → no nudge, no stamp. +// 3. A recently-nudged item (<1h) is excluded by the query → no nudge. +// 4. A user-recipient item is never selected → no nudge. +// 5. Empty result set is a clean no-op. +// 6. Env-override interval parses + falls back; defaults when unset. +// +// The idle/busy/offline/user-recipient/recently-nudged gates all live in the +// single sweep SELECT (status='online', active_tasks=0, recipient_type='agent', +// last_nudged_at filter). 
So at the sqlmock level those cases are expressed as +// "the query returns the row" vs "the query returns no row" — the test pins +// that a returned row drives exactly one enqueue + one stamp, and that an +// empty result drives neither. The SQL predicates themselves are asserted by +// the integration/real-DB harness, not sqlmock (which can't evaluate WHERE). + +// newTestNudgeSweeper builds a sweeper on the sqlmock db.DB with a fake +// enqueue that records calls, so tests assert the nudge without mocking +// EnqueueA2A's internal SQL. +type recordedEnqueue struct { + workspaceID string + idemKey string + method string + body []byte + calls int +} + +func newTestNudgeSweeper(t *testing.T) (*RequestNudgeSweeper, *recordedEnqueue) { + t.Helper() + sw := NewRequestNudgeSweeper(nil) // binds to the sqlmock db.DB set by setupTestDB + rec := &recordedEnqueue{} + sw.enqueue = func(ctx context.Context, workspaceID, callerID string, priority int, + body []byte, method, idempotencyKey string, expiresAt *time.Time) (string, int, error) { + rec.calls++ + rec.workspaceID = workspaceID + rec.idemKey = idempotencyKey + rec.method = method + rec.body = body + return "queue-id-1", 1, nil + } + return sw, rec +} + +func TestNudgeSweeper_EmptyResultIsCleanNoOp(t *testing.T) { + mock := setupTestDB(t) + sw, rec := newTestNudgeSweeper(t) + + mock.ExpectQuery(`SELECT r.recipient_id`). 
+ WillReturnRows(sqlmock.NewRows([]string{"recipient_id", "ids"})) + + res := sw.Sweep(context.Background()) + if res.AgentsNudged != 0 || res.ItemsCovered != 0 || res.Errors != 0 { + t.Errorf("empty set must produce zero changes; got %+v", res) + } + if rec.calls != 0 { + t.Errorf("no enqueue expected on empty set; got %d", rec.calls) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +func TestNudgeSweeper_StaleIdleAgentIsNudgedAndStamped(t *testing.T) { + mock := setupTestDB(t) + sw, rec := newTestNudgeSweeper(t) + + const ws = "11111111-1111-1111-1111-111111111111" + + // One idle online agent with two stale pending items. + mock.ExpectQuery(`SELECT r.recipient_id`). + WillReturnRows(sqlmock.NewRows([]string{"recipient_id", "ids"}). + AddRow(ws, "{req-a,req-b}")) + + // After the (faked) enqueue, the two covered items get last_nudged_at set. + mock.ExpectExec(`UPDATE requests\s+SET last_nudged_at = now\(\)`). + WithArgs(sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 2)) + + res := sw.Sweep(context.Background()) + if res.AgentsNudged != 1 { + t.Errorf("expected 1 agent nudged, got %d", res.AgentsNudged) + } + if res.ItemsCovered != 2 { + t.Errorf("expected 2 items covered, got %d", res.ItemsCovered) + } + if res.Errors != 0 { + t.Errorf("expected 0 errors, got %d", res.Errors) + } + if rec.calls != 1 { + t.Fatalf("expected exactly 1 enqueue, got %d", rec.calls) + } + if rec.workspaceID != ws { + t.Errorf("nudge enqueued to wrong workspace: got %q want %q", rec.workspaceID, ws) + } + if rec.method != "message/send" { + t.Errorf("nudge method: got %q want message/send", rec.method) + } + if rec.idemKey == "" { + t.Errorf("expected a non-empty hourly idempotency key") + } + // Body should mention the count (2 → plural "requests") and the tool names. 
+ bs := string(rec.body) + if !strings.Contains(bs, "2 unhandled inbox requests") { + t.Errorf("nudge body missing pluralized count: %s", bs) + } + if !strings.Contains(bs, "list_inbox") || !strings.Contains(bs, "respond_request") { + t.Errorf("nudge body missing tool guidance: %s", bs) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +// TestNudgeSweeper_SingularBodyForOneItem pins the "1 request" (singular) copy. +func TestNudgeSweeper_SingularBodyForOneItem(t *testing.T) { + mock := setupTestDB(t) + sw, rec := newTestNudgeSweeper(t) + const ws = "22222222-2222-2222-2222-222222222222" + + mock.ExpectQuery(`SELECT r.recipient_id`). + WillReturnRows(sqlmock.NewRows([]string{"recipient_id", "ids"}). + AddRow(ws, "{only-one}")) + mock.ExpectExec(`UPDATE requests\s+SET last_nudged_at = now\(\)`). + WithArgs(sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 1)) + + res := sw.Sweep(context.Background()) + if res.ItemsCovered != 1 { + t.Fatalf("expected 1 item covered, got %d", res.ItemsCovered) + } + if bs := string(rec.body); !strings.Contains(bs, "1 unhandled inbox request ") { + t.Errorf("expected singular copy '1 unhandled inbox request'; got %s", bs) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +// TestNudgeSweeper_BusyOfflineUserAndRecentlyNudgedAreGatedBySQL — the sweep +// query excludes busy agents (active_tasks>0), offline agents (status!=online), +// user-recipient items (recipient_type='agent' filter), and recently-nudged +// items (last_nudged_at filter). Whatever the reason a candidate is ineligible, +// it does not appear in the result set → no enqueue, no stamp. This pins the +// "no row ⇒ no side effects" contract that those gates rely on. 
+func TestNudgeSweeper_BusyOfflineUserAndRecentlyNudgedAreGatedBySQL(t *testing.T) { + mock := setupTestDB(t) + sw, rec := newTestNudgeSweeper(t) + + // The real WHERE clause filtered all of {busy, offline, user-recipient, + // recently-nudged} out, so the query yields zero rows. + mock.ExpectQuery(`SELECT r.recipient_id`). + WillReturnRows(sqlmock.NewRows([]string{"recipient_id", "ids"})) + + res := sw.Sweep(context.Background()) + if res.AgentsNudged != 0 { + t.Errorf("ineligible candidates must not be nudged; got %d", res.AgentsNudged) + } + if rec.calls != 0 { + t.Errorf("no enqueue expected for ineligible candidates; got %d", rec.calls) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet (an UPDATE/stamp fired for an ineligible candidate?): %v", err) + } +} + +// TestNudgeSweeper_EnqueueFailureLeavesItemsUnstamped — if the enqueue fails, +// the last_nudged_at UPDATE must NOT fire, so the items stay eligible and the +// next sweep retries. Pins that the stamp is gated on a successful enqueue. +func TestNudgeSweeper_EnqueueFailureLeavesItemsUnstamped(t *testing.T) { + mock := setupTestDB(t) + sw, _ := newTestNudgeSweeper(t) + const ws = "33333333-3333-3333-3333-333333333333" + + sw.enqueue = func(ctx context.Context, workspaceID, callerID string, priority int, + body []byte, method, idempotencyKey string, expiresAt *time.Time) (string, int, error) { + return "", 0, context.DeadlineExceeded + } + + mock.ExpectQuery(`SELECT r.recipient_id`). + WillReturnRows(sqlmock.NewRows([]string{"recipient_id", "ids"}). + AddRow(ws, "{req-x}")) + // No ExpectExec for the UPDATE — it must not fire after a failed enqueue. 
+ + res := sw.Sweep(context.Background()) + if res.Errors != 1 { + t.Errorf("expected 1 error from failed enqueue, got %d", res.Errors) + } + if res.AgentsNudged != 0 { + t.Errorf("failed enqueue must not count as nudged; got %d", res.AgentsNudged) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet (did the stamp UPDATE fire despite a failed enqueue?): %v", err) + } +} + +// ---------- env override parsing ---------- + +func TestNudgeSweeperConstructor_PicksUpEnvOverride(t *testing.T) { + t.Setenv("REQUEST_NUDGE_SWEEPER_INTERVAL_S", "90") + mock := setupTestDB(t) + _ = mock + sw := NewRequestNudgeSweeper(nil) + if sw.Interval() != 90*time.Second { + t.Errorf("interval override not picked up: got %v", sw.Interval()) + } +} + +func TestNudgeSweeperConstructor_DefaultWhenEnvUnset(t *testing.T) { + t.Setenv("REQUEST_NUDGE_SWEEPER_INTERVAL_S", "") + mock := setupTestDB(t) + _ = mock + sw := NewRequestNudgeSweeper(nil) + if sw.Interval() != defaultRequestNudgeInterval { + t.Errorf("default interval not used: got %v", sw.Interval()) + } +} + +// ---------- pg array adapter ---------- + +func TestStringArray_RoundTrip(t *testing.T) { + // Scan a Postgres text[] literal, then Value() it back. 
+ var a stringArray + if err := a.Scan("{aaa,bbb,ccc}"); err != nil { + t.Fatalf("scan: %v", err) + } + if len(a) != 3 || a[0] != "aaa" || a[2] != "ccc" { + t.Fatalf("parsed wrong: %#v", a) + } + v, err := a.Value() + if err != nil { + t.Fatalf("value: %v", err) + } + if v.(string) != `{"aaa","bbb","ccc"}` { + t.Errorf("unexpected literal: %v", v) + } +} + +func TestStringArray_ScanNilAndEmpty(t *testing.T) { + var a stringArray + if err := a.Scan(nil); err != nil || a != nil { + t.Errorf("nil scan should yield nil slice, no error; got %#v err=%v", a, err) + } + if err := a.Scan("{}"); err != nil || len(a) != 0 { + t.Errorf("empty array literal should yield empty slice; got %#v err=%v", a, err) + } +} + +func TestStringArray_ScanQuotedElements(t *testing.T) { + var a stringArray + if err := a.Scan(`{"a b","c,d","e\"f"}`); err != nil { + t.Fatalf("scan quoted: %v", err) + } + if len(a) != 3 || a[0] != "a b" || a[1] != "c,d" || a[2] != `e"f` { + t.Fatalf("quoted parse wrong: %#v", a) + } +} diff --git a/workspace-server/migrations/20260610130000_requests_last_nudged.down.sql b/workspace-server/migrations/20260610130000_requests_last_nudged.down.sql new file mode 100644 index 000000000..f54eb46fd --- /dev/null +++ b/workspace-server/migrations/20260610130000_requests_last_nudged.down.sql @@ -0,0 +1,7 @@ +-- Reverse of 20260610130000_requests_last_nudged.up.sql. IF EXISTS guards so +-- the down migration is safe whether or not requests / the column / the index +-- are present. 
+DROP INDEX IF EXISTS idx_requests_nudge_candidates; + +ALTER TABLE IF EXISTS requests + DROP COLUMN IF EXISTS last_nudged_at; diff --git a/workspace-server/migrations/20260610130000_requests_last_nudged.up.sql b/workspace-server/migrations/20260610130000_requests_last_nudged.up.sql new file mode 100644 index 000000000..96f14d631 --- /dev/null +++ b/workspace-server/migrations/20260610130000_requests_last_nudged.up.sql @@ -0,0 +1,34 @@ +-- requests.last_nudged_at — RFC unified-requests-inbox P4 (idle-agent nudge). +-- +-- Phase 4 adds a periodic sweeper (request_nudge_sweeper.go) that pokes an +-- IDLE agent which has unhandled inbox items so it doesn't forget to process +-- them. The sweeper rate-limits itself to ≤1 nudge per request per hour by +-- stamping last_nudged_at when it enqueues a nudge. +-- +-- Idempotency / ordering safety +-- ----------------------------- +-- The migration runner re-applies every *.up.sql on each boot, and on some +-- orderings the P1 `requests` table may not exist yet on the box this runs +-- against. Both statements are therefore fully idempotent AND guarded with +-- IF EXISTS on the table so they no-op (rather than crash-loop) when requests +-- is absent — the column gets added once P1's CREATE TABLE has landed. +ALTER TABLE IF EXISTS requests + ADD COLUMN IF NOT EXISTS last_nudged_at TIMESTAMPTZ; + +-- Index supporting the sweep query's hot predicate: agent recipients in a +-- nudge-eligible status, with last_nudged_at as the trailing key column. The +-- sweep filters last_nudged_at with `IS NULL OR < cutoff` (it does not use +-- COALESCE); that filter is applied on top of the recipient_type/status +-- prefix, which Postgres can range-scan. IF NOT EXISTS so the +-- re-apply is a no-op; wrapped in a DO block guarded on table existence so it +-- is skipped cleanly when requests isn't created yet. 
+DO $$
+BEGIN
+    -- to_regclass follows search_path like the statement below (the old
+    -- information_schema check matched any schema). Partial: sweep-gated rows only.
+    IF to_regclass('requests') IS NOT NULL THEN
+        CREATE INDEX IF NOT EXISTS idx_requests_nudge_candidates
+            ON requests (recipient_type, status, recipient_id, last_nudged_at)
+            WHERE recipient_type = 'agent' AND status IN ('pending', 'info_requested');
+    END IF;
+END$$;