From 59e5b20724424d0f42721d0b4bd85c2033604653 Mon Sep 17 00:00:00 2001 From: devops-engineer Date: Wed, 10 Jun 2026 10:23:24 +0000 Subject: [PATCH] =?UTF-8?q?feat(requests):=20P1=20=E2=80=94=20unified=20re?= =?UTF-8?q?quests/inbox=20data=20model=20+=20endpoints=20(RFC)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements Phase 1 of the unified-requests-inbox RFC: a single `requests` subsystem that generalizes `user_tasks` (agent→user worklist asks) and `approval_requests` (the destructive-action gate) into one inbox keyed by `kind ∈ {task, approval}`, where requester and recipient may each be a user OR an agent. Responding is asynchronous — the requester never blocks; a REQUEST_RESPONDED event signals it to pick the answer up on its next tick. Schema (migrations/20260610120000_requests.{up,down}.sql, idempotent): - `requests`: kind, requester_{type,id}, org_id, recipient_{type,id}, title, detail, status (pending|info_requested|done|rejected|approved|cancelled), responder_{type,id}, priority, created/updated/responded_at. recipient_id / requester_id are plain TEXT with NO FK (a party may be a user, not a workspaces row). Indexes for inbox, org-pending, and outgoing reads. - `request_messages`: the More-Info / "chat about this" thread (FK → requests, ON DELETE CASCADE), indexed by (request_id, created_at). - Idempotent backfill (ON CONFLICT (id) DO NOTHING) copies historical user_tasks (kind=task; dismissed→rejected) and approval_requests (kind=approval; denied→rejected, escalated→pending) into the unified inbox so the tabs show pre-cutover items. Store (internal/handlers/request_store.go): RequestStore mirrors UserTaskStore — per-request over global db.DB, events.EventEmitter for testability, sentinel errors. Create / Get / Messages / ListInbox / ListOutgoing / ListPendingForOrg (LEFT JOIN workspaces for the agent name) / Respond (validates action↔kind: approval→approved|rejected, task→done|rejected) / RequestInfo / AddMessage (flips info_requested when the recipient asks back) / Cancel. Mutations broadcast REQUEST_CREATED / REQUEST_RESPONDED / REQUEST_MESSAGE anchored on the agent party so the canvas/inbox is signalled. Handler (internal/handlers/requests.go) + routes (internal/router/router.go), mirroring the approvals/user-tasks auth split: - wsAuth (workspace token): POST /workspaces/:id/requests, GET .../requests (outgoing), GET .../requests/inbox, plus agent-side .../requests/:requestId/{get,respond,messages,cancel}. - AdminAuth (canvas user): GET /requests/pending?kind=task|approval (the tabs), and /requests/:requestId/{get,respond,messages,cancel}. The org_id anchor is resolved via orgRootID (org_scope.go) — the workspaces table has no org_id column. Events (internal/events/types.go): adds REQUEST_CREATED, REQUEST_RESPONDED, REQUEST_MESSAGE to the taxonomy + AllEventTypes; drift snapshot (types_test.go) updated. Canvas TS mirror is a later phase (not touched here). Tests (internal/handlers/requests_test.go): 20 cases via the existing sqlmock harness — create (task + approval, agent recipient), inbox vs outgoing, get+thread, respond (valid + invalid action-for-kind + not-found), More-Info message → info_requested (recipient flips, requester doesn't), cancel, org pending list + kind filter, recipient routing. This is P1 of the unified-requests RFC. Follow-ons: P2 MCP tools + approval shims, P3 canvas Tasks/Approvals tabs UI, P4 idle-nudge worker. Not included here. RFC: docs/design/rfc-unified-requests-inbox.md Co-Authored-By: Claude Opus 4.8 (1M context) --- docs/design/rfc-unified-requests-inbox.md | 102 ++++ workspace-server/internal/events/types.go | 26 +- .../internal/events/types_test.go | 3 + .../internal/handlers/request_store.go | 526 ++++++++++++++++ .../internal/handlers/requests.go | 376 ++++++++++++ .../internal/handlers/requests_test.go | 563 ++++++++++++++++++ workspace-server/internal/router/router.go | 30 + .../20260610120000_requests.down.sql | 4 + .../migrations/20260610120000_requests.up.sql | 122 ++++ 9 files changed, 1744 insertions(+), 8 deletions(-) create mode 100644 docs/design/rfc-unified-requests-inbox.md create mode 100644 workspace-server/internal/handlers/request_store.go create mode 100644 workspace-server/internal/handlers/requests.go create mode 100644 workspace-server/internal/handlers/requests_test.go create mode 100644 workspace-server/migrations/20260610120000_requests.down.sql create mode 100644 workspace-server/migrations/20260610120000_requests.up.sql diff --git a/docs/design/rfc-unified-requests-inbox.md b/docs/design/rfc-unified-requests-inbox.md new file mode 100644 index 000000000..06655b4d6 --- /dev/null +++ b/docs/design/rfc-unified-requests-inbox.md @@ -0,0 +1,102 @@ +# RFC: Unified Requests / Inbox subsystem (Tasks + Approvals) + +**Status:** proposed (awaiting CTO sign-off) · **Author:** CEO-assistant · **Date:** 2026-06-10 + +## 1. Motivation +Agents need to ask **users or other agents** to do something (task) or to approve +something (approval), **asynchronously**, with a clear **inbox**, action buttons, a +clarification thread, and accountability (who responded). Today only a thin approval +primitive exists (`create_approval`/`decide_approval` + one-way `notify_user`). This +RFC generalizes that into **one** requests subsystem with `kind ∈ {task, approval}`. + +## 2. Data model (unified) +**`requests`** +- `id` uuid +- `kind` enum: `task` | `approval` +- `requester_type` (`user`|`agent`), `requester_id`, `org_id` +- `recipient_type` (`user`|`agent`), `recipient_id` ← user OR agent +- `title` text, `detail` markdown +- `status` enum: `pending` | `info_requested` | `done` | `rejected` | `approved` | `cancelled` +- `responder_type` (`user`|`agent`), `responder_id` ← nullable until acted; **who acted** +- `priority` smallint (optional), `created_at`, `updated_at`, `responded_at` + +**`request_messages`** (the "More Info / chat about this" thread) +- `id`, `request_id` fk, `author_type`, `author_id`, `body`, `created_at` + +## 3. Lifecycle / actions +| kind | buttons | transitions | +|---|---|---| +| task | **Done · Reject · More Info** | pending→done / pending→rejected / pending↔info_requested | +| approval | **Approve · Reject · More Info** | pending→approved / pending→rejected / pending↔info_requested | + +Terminal: `done`/`approved`/`rejected`/`cancelled`. **More Info** sets `info_requested` +and appends a `request_messages` row ("chat about this"); either side replies until it +resolves to a terminal action. Every terminal action stamps `responder_id`+`responder_type`. + +## 4. Async semantics (non-blocking — core requirement) +- Creating a request returns immediately with `request_id`; the **requester never blocks** + and keeps working. +- The response is delivered **asynchronously**: a signal is posted to the requester's + activity/inbox (an `a2a_receive`-style event) carrying `{request_id, status, responder_id, + thread}`. The requester picks it up on its **next tick** — never sits and waits. +- Recipient **agents** receive/poll their pending inbox; recipient **users** see the + Tasks/Approvals tabs (live via the existing WebSocket). + +## 5. Control-plane API +- `POST /workspaces/{id}/requests` — create (requester = that workspace) `{kind, recipient_type, recipient_id, title, detail, priority?}` +- `GET /requests?recipient_type=&recipient_id=&status=` — inbox list; org-scoped variant powers the "all agents' incoming" tab view +- `GET /requests/{id}` +- `POST /requests/{id}/respond` `{action: done|rejected|approved}` — `responder_id` taken from the **authenticated principal** (user session or agent token), not the body +- `POST /requests/{id}/messages` `{body}` — More-Info thread +- `POST /requests/{id}/cancel` — requester withdraws + +## 6. MCP tools (agent-facing; unify with existing) +- `create_request(recipient, kind, title, detail)` — supersedes `create_approval` (which stays as a `kind=approval` alias during deprecation) +- `list_inbox(status?)` — the calling agent's **incoming** requests +- `respond_request(request_id, action, message?)` — agent responds to a request addressed to it (done/reject/approve, or open More-Info) +- `add_request_message(request_id, body)` — thread reply +- `check_requests()` — status of requests the agent **sent** (the async pickup) +- User-side actions flow through the canvas UI → `/respond` (captures the user id). + +## 7. Canvas UI (Tasks + Approvals tabs — already scaffolded) +- Each tab lists items of that kind, **grouped by requesting agent**, newest first: agent + name + avatar, title, detail preview, age, status badge. +- Buttons per the table in §3. Click Done/Approve/Reject → `POST /respond` with the + logged-in user → optimistic update + toast. +- **More Info** expands an inline thread panel ("chat about this"): message list + input → + `POST /messages`; status → `info_requested`; requester replies appear in-thread. +- "All agents' incoming" = org-scoped list. Live updates over the existing WS. + +## 8. Idle-nudge sweep +- A CP periodic worker: for each agent workspace that is **idle** (`active_tasks=0`, + `status=online`) **and** has ≥1 `pending`/`info_requested` request where it is the + recipient, older than threshold → inject a **nudge** (notify/tick: "you have M unhandled + inbox items …") so it processes them via `respond_request`. Covers the "agent forgot / + didn't use the proper MCP tool" case. +- User-addressed requests pending past threshold → UI badge (+ optional channel notify). +- **Anti-spam:** rate-limit (≤1 nudge per request per hour); stop once the agent acts. + +## 9. Responder identity / multi-user (forward-looking) +`responder_id`+`responder_type` stamped on every terminal action; UI shows "Approved by +". Schema already supports multiple users with roles in one space (a request can be +fulfilled by a different person than it was shown to). Per-role routing/permissions is +**out of scope for v1** but the model doesn't preclude it. + +## 10. Approvals migration +New `requests` table; **idempotent migration** copies existing `approvals` rows in as +`kind=approval`. `create_approval`/`decide_approval`/`list_pending_approvals` become thin +shims over the requests endpoints for a deprecation window, then removed. + +## 11. Phasing (each phase = SOP PR → 2-genuine → merge) +- **P0** RFC sign-off (this doc) +- **P1** CP: `requests`+`request_messages` tables, migration, endpoints, tests (molecule-controlplane) +- **P2** MCP tools + approval shims (core/runtime) +- **P3** Canvas UI: Tasks/Approvals tabs render + buttons + More-Info thread + WS live (app/canvas) +- **P4** idle-nudge worker + user-pending notifications +- **P5** deprecate old approval path; docs + +## 12. Recommended defaults on open points (flag if you disagree) +- (a) **Keep approval shims** for one deprecation window (not a hard cut) — safer for in-flight callers. +- (b) Nudge: **idle + pending > 10 min → nudge, max 1/hr per request.** +- (c) v1 = **single recipient per request** (fan-out later). +- (d) v1 = **any user in the org may respond** to a user-addressed request (role-gating later). diff --git a/workspace-server/internal/events/types.go b/workspace-server/internal/events/types.go index 48395b48a..abc760e93 100644 --- a/workspace-server/internal/events/types.go +++ b/workspace-server/internal/events/types.go @@ -41,8 +41,8 @@ type EventType string // scan-friendly as it grows. const ( // Chat / agent messaging — surfaces in canvas chat panels. - EventAgentMessage EventType = "AGENT_MESSAGE" - EventA2AResponse EventType = "A2A_RESPONSE" + EventAgentMessage EventType = "AGENT_MESSAGE" + EventA2AResponse EventType = "A2A_RESPONSE" EventActivityLogged EventType = "ACTIVITY_LOGGED" EventChannelMessage EventType = "CHANNEL_MESSAGE" @@ -59,11 +59,11 @@ const ( EventWorkspaceHeartbeat EventType = "WORKSPACE_HEARTBEAT" // Agent assignment + identity. - EventAgentAssigned EventType = "AGENT_ASSIGNED" - EventAgentReplaced EventType = "AGENT_REPLACED" - EventAgentRemoved EventType = "AGENT_REMOVED" - EventAgentMoved EventType = "AGENT_MOVED" - EventAgentCardUpdated EventType = "AGENT_CARD_UPDATED" + EventAgentAssigned EventType = "AGENT_ASSIGNED" + EventAgentReplaced EventType = "AGENT_REPLACED" + EventAgentRemoved EventType = "AGENT_REMOVED" + EventAgentMoved EventType = "AGENT_MOVED" + EventAgentCardUpdated EventType = "AGENT_CARD_UPDATED" // Delegation lifecycle. EventDelegationSent EventType = "DELEGATION_SENT" @@ -72,7 +72,7 @@ const ( EventDelegationFailed EventType = "DELEGATION_FAILED" // Task progression + scheduler. - EventTaskUpdated EventType = "TASK_UPDATED" + EventTaskUpdated EventType = "TASK_UPDATED" EventCronExecuted EventType = "CRON_EXECUTED" EventCronSkipped EventType = "CRON_SKIPPED" @@ -84,6 +84,13 @@ const ( EventUserTaskRequested EventType = "USER_TASK_REQUESTED" EventUserTaskResolved EventType = "USER_TASK_RESOLVED" + // Requests — the unified Tasks + Approvals inbox (RFC P1). REQUEST_CREATED + // pokes a recipient agent's inbox; REQUEST_RESPONDED is the async signal-back + // to the requester; REQUEST_MESSAGE is a More-Info thread reply. + EventRequestCreated EventType = "REQUEST_CREATED" + EventRequestResponded EventType = "REQUEST_RESPONDED" + EventRequestMessage EventType = "REQUEST_MESSAGE" + // Auth / credentials. EventExternalCredentialsRotated EventType = "EXTERNAL_CREDENTIALS_ROTATED" ) @@ -115,6 +122,9 @@ var AllEventTypes = []EventType{ EventDelegationSent, EventDelegationStatus, EventExternalCredentialsRotated, + EventRequestCreated, + EventRequestMessage, + EventRequestResponded, EventTaskUpdated, EventUserTaskRequested, EventUserTaskResolved, diff --git a/workspace-server/internal/events/types_test.go b/workspace-server/internal/events/types_test.go index f1ee61dba..219e34867 100644 --- a/workspace-server/internal/events/types_test.go +++ b/workspace-server/internal/events/types_test.go @@ -40,6 +40,9 @@ func TestAllEventTypes_IsSnapshot(t *testing.T) { "DELEGATION_SENT", "DELEGATION_STATUS", "EXTERNAL_CREDENTIALS_ROTATED", + "REQUEST_CREATED", + "REQUEST_MESSAGE", + "REQUEST_RESPONDED", "TASK_UPDATED", "USER_TASK_REQUESTED", "USER_TASK_RESOLVED", diff --git a/workspace-server/internal/handlers/request_store.go b/workspace-server/internal/handlers/request_store.go new file mode 100644 index 000000000..2be454d03 --- /dev/null +++ b/workspace-server/internal/handlers/request_store.go @@ -0,0 +1,526 @@ +package handlers + +// RequestStore is the SSOT for the unified "requests" primitive — the Tasks + +// Approvals inbox (RFC P1, docs/design/rfc-unified-requests-inbox.md). It +// generalizes UserTaskStore (agent→user worklist asks) and the inline +// approval_requests SQL in approvals.go into ONE store keyed by kind ∈ +// {task, approval}, where requester and recipient may each be a user OR an +// agent. +// +// Every surface that mutates or reads `requests` — the REST handlers in +// requests.go AND (in a later phase) the MCP request tools — MUST route through +// this store rather than re-implement the SQL + status-enum validation + +// REQUEST_* broadcast inline. Two copies of one contract drift silently; this +// is the same consolidation rationale as UserTaskStore / AgentMessageWriter. +// +// The store owns persistence + validation + the event broadcast. HTTP-specific +// concerns (gin binding, status codes) stay in the handler. Construct per call +// via NewRequestStore over the live global db.DB so the test harness's db.DB +// swap is observed — mirroring UserTaskStore. + +import ( + "context" + "database/sql" + "errors" + "fmt" + "log" + + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/events" +) + +// ErrRequestNotFound is returned by Get/Respond/RequestInfo/Cancel/AddMessage +// when no row matches — the request does not exist, or (for the mutating +// actions) is already terminal. Callers translate to HTTP 404. +var ErrRequestNotFound = errors.New("request: not found") + +// ErrInvalidRequestAction is returned when a respond action is outside the +// done/rejected/approved set, or is incompatible with the request's kind +// (approval→approved/rejected, task→done/rejected). Callers translate to 400. +var ErrInvalidRequestAction = errors.New("request: invalid action for this request kind") + +// ErrInvalidRequestKind is returned when Create is given a kind outside +// task/approval. Callers translate to HTTP 400. +var ErrInvalidRequestKind = errors.New("request: kind must be 'task' or 'approval'") + +// ErrInvalidRequestParty is returned when a requester_type / recipient_type / +// author_type is outside the user/agent enum. Callers translate to HTTP 400. +var ErrInvalidRequestParty = errors.New("request: type must be 'user' or 'agent'") + +// CreateRequestInput is the set of fields Create needs. requester_* identifies +// who raised it (for a per-workspace POST that is the calling agent); recipient_* +// is who must act. Detail/OrgID/Priority are optional (zero values → NULL). +type CreateRequestInput struct { + Kind string + RequesterType string + RequesterID string + OrgID string + RecipientType string + RecipientID string + Title string + Detail string + Priority *int +} + +// RequestRow is one request as returned by the list + get methods. +// WorkspaceName is non-empty only for ListPendingForOrg rows whose party is an +// agent (decorated via a LEFT JOIN on workspaces); the inbox/outgoing lists +// leave it empty. +type RequestRow struct { + ID string `json:"id"` + Kind string `json:"kind"` + RequesterType string `json:"requester_type"` + RequesterID string `json:"requester_id"` + OrgID *string `json:"org_id"` + RecipientType string `json:"recipient_type"` + RecipientID string `json:"recipient_id"` + Title string `json:"title"` + Detail *string `json:"detail"` + Status string `json:"status"` + ResponderType *string `json:"responder_type"` + ResponderID *string `json:"responder_id"` + Priority *int `json:"priority"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` + RespondedAt *string `json:"responded_at"` + WorkspaceName *string `json:"workspace_name,omitempty"` +} + +// RequestMessageRow is one row of a request's More-Info thread. +type RequestMessageRow struct { + ID string `json:"id"` + RequestID string `json:"request_id"` + AuthorType string `json:"author_type"` + AuthorID string `json:"author_id"` + Body string `json:"body"` + CreatedAt string `json:"created_at"` +} + +// RequestStore persists + broadcasts request mutations. Takes +// events.EventEmitter (not the concrete *Broadcaster) so tests can substitute +// a fake emitter, mirroring UserTaskStore. +type RequestStore struct { + db *sql.DB + broadcaster events.EventEmitter +} + +// NewRequestStore binds the store to a DB pool + the platform broadcaster. +func NewRequestStore(db *sql.DB, broadcaster events.EventEmitter) *RequestStore { + return &RequestStore{db: db, broadcaster: broadcaster} +} + +// requestColumns is the canonical SELECT projection for a single request row, +// shared by Get / ListInbox / ListOutgoing so the Scan order can't drift. +const requestColumns = `id, kind, requester_type, requester_id, org_id, + recipient_type, recipient_id, title, detail, status, + responder_type, responder_id, priority, created_at, updated_at, responded_at` + +// scanRequest reads one row in requestColumns order into a RequestRow. +func scanRequest(rows *sql.Rows) (RequestRow, error) { + var r RequestRow + err := rows.Scan( + &r.ID, &r.Kind, &r.RequesterType, &r.RequesterID, &r.OrgID, + &r.RecipientType, &r.RecipientID, &r.Title, &r.Detail, &r.Status, + &r.ResponderType, &r.ResponderID, &r.Priority, &r.CreatedAt, &r.UpdatedAt, &r.RespondedAt, + ) + return r, err +} + +func validParty(t string) bool { return t == "user" || t == "agent" } + +// broadcastTarget picks the workspace id to anchor a REQUEST_* event on. Events +// are workspace-scoped (structure_events.workspace_id), so we anchor on the +// agent party when there is one — the requester (so its canvas/inbox is +// signalled on a response) or, lacking that, the recipient. A user-only request +// has no workspace anchor; we skip the broadcast rather than insert a bad row. +func broadcastTarget(requesterType, requesterID, recipientType, recipientID string) string { + if requesterType == "agent" && requesterID != "" { + return requesterID + } + if recipientType == "agent" && recipientID != "" { + return recipientID + } + return "" +} + +// Create inserts a new pending request and broadcasts REQUEST_CREATED (anchored +// on the recipient agent if any, so an agent recipient's inbox is signalled). +// Returns the new request id. Validates kind + party enums up front. +func (s *RequestStore) Create(ctx context.Context, in CreateRequestInput) (string, error) { + if in.Kind != "task" && in.Kind != "approval" { + return "", ErrInvalidRequestKind + } + if !validParty(in.RequesterType) || !validParty(in.RecipientType) { + return "", ErrInvalidRequestParty + } + + var detailArg interface{} + if in.Detail != "" { + detailArg = in.Detail + } + var orgArg interface{} + if in.OrgID != "" { + orgArg = in.OrgID + } + var priorityArg interface{} + if in.Priority != nil { + priorityArg = *in.Priority + } + + var requestID string + err := s.db.QueryRowContext(ctx, ` + INSERT INTO requests ( + kind, requester_type, requester_id, org_id, + recipient_type, recipient_id, title, detail, priority + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + RETURNING id + `, in.Kind, in.RequesterType, in.RequesterID, orgArg, + in.RecipientType, in.RecipientID, in.Title, detailArg, priorityArg).Scan(&requestID) + if err != nil { + return "", fmt.Errorf("request: create: %w", err) + } + + // Anchor REQUEST_CREATED on the recipient agent (so its inbox is poked) if + // the recipient is an agent; else on the requester. A user→user request has + // no agent to signal — skip the broadcast. + target := broadcastTarget(in.RecipientType, in.RecipientID, in.RequesterType, in.RequesterID) + if target != "" { + if err := s.broadcaster.RecordAndBroadcast(ctx, string(events.EventRequestCreated), target, map[string]interface{}{ + "request_id": requestID, + "kind": in.Kind, + "recipient_type": in.RecipientType, + "recipient_id": in.RecipientID, + "title": in.Title, + }); err != nil { + log.Printf("request: failed to broadcast created for %s: %v", target, err) + } + } + + return requestID, nil +} + +// Get returns a single request by id, or ErrRequestNotFound. +func (s *RequestStore) Get(ctx context.Context, id string) (RequestRow, error) { + rows, err := s.db.QueryContext(ctx, `SELECT `+requestColumns+` FROM requests WHERE id = $1`, id) + if err != nil { + return RequestRow{}, fmt.Errorf("request: get: %w", err) + } + defer rows.Close() + if !rows.Next() { + if err := rows.Err(); err != nil { + return RequestRow{}, fmt.Errorf("request: get: %w", err) + } + return RequestRow{}, ErrRequestNotFound + } + r, err := scanRequest(rows) + if err != nil { + return RequestRow{}, fmt.Errorf("request: get scan: %w", err) + } + return r, nil +} + +// Messages returns a request's More-Info thread, oldest first. +func (s *RequestStore) Messages(ctx context.Context, requestID string) ([]RequestMessageRow, error) { + rows, err := s.db.QueryContext(ctx, ` + SELECT id, request_id, author_type, author_id, body, created_at + FROM request_messages WHERE request_id = $1 + ORDER BY created_at ASC LIMIT 200 + `, requestID) + if err != nil { + return nil, fmt.Errorf("request: messages: %w", err) + } + defer rows.Close() + + msgs := make([]RequestMessageRow, 0) + for rows.Next() { + var m RequestMessageRow + if rows.Scan(&m.ID, &m.RequestID, &m.AuthorType, &m.AuthorID, &m.Body, &m.CreatedAt) != nil { + continue + } + msgs = append(msgs, m) + } + if err := rows.Err(); err != nil { + log.Printf("request: messages rows.Err request=%s: %v", requestID, err) + } + return msgs, nil +} + +// ListInbox returns the requests addressed TO a recipient (recipient_type + +// recipient_id), newest first, optionally filtered by status. status "" = all. +func (s *RequestStore) ListInbox(ctx context.Context, recipientType, recipientID, status string) ([]RequestRow, error) { + q := `SELECT ` + requestColumns + ` FROM requests + WHERE recipient_type = $1 AND recipient_id = $2` + args := []interface{}{recipientType, recipientID} + if status != "" { + q += ` AND status = $3` + args = append(args, status) + } + q += ` ORDER BY created_at DESC LIMIT 50` + return s.queryRequests(ctx, "list inbox", q, args) +} + +// ListOutgoing returns the requests a requester RAISED (requester_type + +// requester_id), newest first — the async pickup of responses. +func (s *RequestStore) ListOutgoing(ctx context.Context, requesterType, requesterID, status string) ([]RequestRow, error) { + q := `SELECT ` + requestColumns + ` FROM requests + WHERE requester_type = $1 AND requester_id = $2` + args := []interface{}{requesterType, requesterID} + if status != "" { + q += ` AND status = $3` + args = append(args, status) + } + q += ` ORDER BY created_at DESC LIMIT 50` + return s.queryRequests(ctx, "list outgoing", q, args) +} + +// ListPendingForOrg powers the cross-org "all incoming" canvas view. Returns +// pending + info_requested requests in an org, newest first, decorated with the +// requesting/responding agent's workspace name via a LEFT JOIN (NULL for a user +// party). kind "" = both tabs; "task"/"approval" filters one. +func (s *RequestStore) ListPendingForOrg(ctx context.Context, orgID, kind string) ([]RequestRow, error) { + // LEFT JOIN workspaces on the requester id when it is an agent, to surface a + // human-readable name in the org view. recipient name is left to the caller. + q := `SELECT ` + + `r.id, r.kind, r.requester_type, r.requester_id, r.org_id, + r.recipient_type, r.recipient_id, r.title, r.detail, r.status, + r.responder_type, r.responder_id, r.priority, r.created_at, r.updated_at, r.responded_at, + w.name + FROM requests r + LEFT JOIN workspaces w + ON r.requester_type = 'agent' AND w.id = r.requester_id::uuid + WHERE r.status IN ('pending', 'info_requested')` + args := []interface{}{} + if orgID != "" { + args = append(args, orgID) + q += fmt.Sprintf(" AND r.org_id = $%d", len(args)) + } + if kind != "" { + args = append(args, kind) + q += fmt.Sprintf(" AND r.kind = $%d", len(args)) + } + q += ` ORDER BY r.created_at DESC LIMIT 50` + + rows, err := s.db.QueryContext(ctx, q, args...) + if err != nil { + return nil, fmt.Errorf("request: list pending org: %w", err) + } + defer rows.Close() + + out := make([]RequestRow, 0) + for rows.Next() { + var r RequestRow + if rows.Scan( + &r.ID, &r.Kind, &r.RequesterType, &r.RequesterID, &r.OrgID, + &r.RecipientType, &r.RecipientID, &r.Title, &r.Detail, &r.Status, + &r.ResponderType, &r.ResponderID, &r.Priority, &r.CreatedAt, &r.UpdatedAt, &r.RespondedAt, + &r.WorkspaceName, + ) != nil { + continue + } + out = append(out, r) + } + if err := rows.Err(); err != nil { + log.Printf("request: list pending org rows.Err org=%s: %v", orgID, err) + } + return out, nil +} + +// queryRequests runs a SELECT in requestColumns order and scans into RequestRows. +func (s *RequestStore) queryRequests(ctx context.Context, op, q string, args []interface{}) ([]RequestRow, error) { + rows, err := s.db.QueryContext(ctx, q, args...) + if err != nil { + return nil, fmt.Errorf("request: %s: %w", op, err) + } + defer rows.Close() + + out := make([]RequestRow, 0) + for rows.Next() { + r, scanErr := scanRequest(rows) + if scanErr != nil { + continue + } + out = append(out, r) + } + if err := rows.Err(); err != nil { + log.Printf("request: %s rows.Err: %v", op, err) + } + return out, nil +} + +// actionToStatus maps a terminal respond action to the status it sets, gated by +// the request's kind: approval accepts approved/rejected, task accepts +// done/rejected. Returns ErrInvalidRequestAction on any other combination. +func actionToStatus(kind, action string) (string, error) { + switch kind { + case "approval": + if action == "approved" || action == "rejected" { + return action, nil + } + case "task": + if action == "done" || action == "rejected" { + return action, nil + } + } + return "", ErrInvalidRequestAction +} + +// Respond applies a terminal action (done/rejected/approved, validated against +// the request's kind), stamps responder + responded_at, and broadcasts +// REQUEST_RESPONDED so the requester/canvas picks it up asynchronously. The +// requester is NOT blocked — it reads this via ListOutgoing on its next tick. +// responderType defaults to "user" (the canvas path); responderID defaults to +// "human" when empty. Only acts on a non-terminal (pending/info_requested) row. +func (s *RequestStore) Respond(ctx context.Context, id, action, responderType, responderID string) (RequestRow, error) { + if responderType == "" { + responderType = "user" + } + if !validParty(responderType) { + return RequestRow{}, ErrInvalidRequestParty + } + if responderID == "" { + responderID = "human" + } + + // Look the request up first so we can validate action↔kind compatibility and + // know who to signal (the requester). + req, err := s.Get(ctx, id) + if err != nil { + return RequestRow{}, err + } + status, err := actionToStatus(req.Kind, action) + if err != nil { + return RequestRow{}, err + } + + result, err := s.db.ExecContext(ctx, ` + UPDATE requests + SET status = $1, responder_type = $2, responder_id = $3, + responded_at = now(), updated_at = now() + WHERE id = $4 AND status IN ('pending', 'info_requested') + `, status, responderType, responderID, id) + if err != nil { + return RequestRow{}, fmt.Errorf("request: respond: %w", err) + } + n, err := result.RowsAffected() + if err != nil { + return RequestRow{}, fmt.Errorf("request: respond RowsAffected: %w", err) + } + if n == 0 { + return RequestRow{}, ErrRequestNotFound + } + + // Signal the requester (the agent that raised it) so its inbox/canvas picks + // up the resolution asynchronously. + target := broadcastTarget(req.RequesterType, req.RequesterID, req.RecipientType, req.RecipientID) + if target != "" { + if err := s.broadcaster.RecordAndBroadcast(ctx, string(events.EventRequestResponded), target, map[string]interface{}{ + "request_id": id, + "status": status, + "responder_type": responderType, + "responder_id": responderID, + }); err != nil { + log.Printf("request: failed to broadcast responded for %s: %v", target, err) + } + } + + req.Status = status + req.ResponderType = &responderType + req.ResponderID = &responderID + return req, nil +} + +// RequestInfo flips a request to 'info_requested' (the "More Info" transition) +// without a terminal action. Used when the recipient asks the requester for +// clarification. Only acts on a non-terminal row. +func (s *RequestStore) RequestInfo(ctx context.Context, id string) error { + result, err := s.db.ExecContext(ctx, ` + UPDATE requests SET status = 'info_requested', updated_at = now() + WHERE id = $1 AND status IN ('pending', 'info_requested') + `, id) + if err != nil { + return fmt.Errorf("request: request-info: %w", err) + } + n, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("request: request-info RowsAffected: %w", err) + } + if n == 0 { + return ErrRequestNotFound + } + return nil +} + +// AddMessage appends a row to the More-Info thread and broadcasts +// REQUEST_MESSAGE. When the author is the request's RECIPIENT (i.e. the party +// being asked is asking back for clarification), it also flips the request to +// 'info_requested' so the requester knows it's their turn. Returns the new +// message id. +func (s *RequestStore) AddMessage(ctx context.Context, id, authorType, authorID, body string) (string, error) { + if !validParty(authorType) { + return "", ErrInvalidRequestParty + } + + req, err := s.Get(ctx, id) + if err != nil { + return "", err + } + + var messageID string + err = s.db.QueryRowContext(ctx, ` + INSERT INTO request_messages (request_id, author_type, author_id, body) + VALUES ($1, $2, $3, $4) + RETURNING id + `, id, authorType, authorID, body).Scan(&messageID) + if err != nil { + return "", fmt.Errorf("request: add message: %w", err) + } + + // If the author is the recipient (the one being asked), this message is a + // "please clarify" — flip to info_requested so the requester is prompted. + // Only flip a non-terminal request; a closed request keeps its terminal + // status even if a late note is appended. + if authorType == req.RecipientType && authorID == req.RecipientID { + if _, err := s.db.ExecContext(ctx, ` + UPDATE requests SET status = 'info_requested', updated_at = now() + WHERE id = $1 AND status IN ('pending', 'info_requested') + `, id); err != nil { + log.Printf("request: failed to flip info_requested on message for %s: %v", id, err) + } + } + + // Signal both ends via the requester anchor (the canvas thread listens there). + target := broadcastTarget(req.RequesterType, req.RequesterID, req.RecipientType, req.RecipientID) + if target != "" { + if err := s.broadcaster.RecordAndBroadcast(ctx, string(events.EventRequestMessage), target, map[string]interface{}{ + "request_id": id, + "message_id": messageID, + "author_type": authorType, + "author_id": authorID, + }); err != nil { + log.Printf("request: failed to broadcast message for %s: %v", id, err) + } + } + + return messageID, nil +} + +// Cancel withdraws a request (the requester changed its mind). Sets status +// 'cancelled' + updated_at. Only acts on a non-terminal row; returns +// ErrRequestNotFound when missing or already terminal. +func (s *RequestStore) Cancel(ctx context.Context, id string) error { + result, err := s.db.ExecContext(ctx, ` + UPDATE requests SET status = 'cancelled', updated_at = now() + WHERE id = $1 AND status IN ('pending', 'info_requested') + `, id) + if err != nil { + return fmt.Errorf("request: cancel: %w", err) + } + n, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("request: cancel RowsAffected: %w", err) + } + if n == 0 { + return ErrRequestNotFound + } + return nil +} diff --git a/workspace-server/internal/handlers/requests.go b/workspace-server/internal/handlers/requests.go new file mode 100644 index 000000000..779b7c300 --- /dev/null +++ b/workspace-server/internal/handlers/requests.go @@ -0,0 +1,376 @@ +package handlers + +import ( + "errors" + "log" + "net/http" + + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db" + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/events" + "github.com/gin-gonic/gin" +) + +// RequestsHandler serves the unified "requests" inbox — the Tasks + Approvals +// primitive (RFC P1, docs/design/rfc-unified-requests-inbox.md). It generalizes +// UserTasksHandler (agent→user asks) and ApprovalsHandler (the gate) into one +// surface keyed by kind ∈ {task, approval}, where requester and recipient may +// each be a user OR an agent. Responding is asynchronous: the requester is +// never blocked; a REQUEST_RESPONDED event signals it to pick the answer up on +// its next tick. +type RequestsHandler struct { + broadcaster *events.Broadcaster +} + +// --- OpenAPI doc shapes (used by swaggo; the handlers emit gin.H inline) --- + +// CreateRequestBody is the body of POST /workspaces/{id}/requests. requester is +// the calling workspace (agent); only the recipient + content are supplied. +type CreateRequestBody struct { + Kind string `json:"kind" binding:"required" enums:"task,approval"` + RecipientType string `json:"recipient_type" binding:"required" enums:"user,agent"` + RecipientID string `json:"recipient_id"` + Title string `json:"title" binding:"required"` + Detail string `json:"detail"` + Priority *int `json:"priority"` +} + +// CreateRequestResponse is returned by POST /workspaces/{id}/requests. +type CreateRequestResponse struct { + RequestID string `json:"request_id"` + Status string `json:"status"` +} + +// RespondRequestBody is the body of POST /requests/{requestId}/respond. The +// responder identity is taken from the body for now (P1); the canvas path +// defaults responder_type to 'user'. action is validated against the kind. +type RespondRequestBody struct { + Action string `json:"action" binding:"required" enums:"done,rejected,approved"` + ResponderType string `json:"responder_type" enums:"user,agent"` + ResponderID string `json:"responder_id"` +} + +// AddRequestMessageBody is the body of POST /requests/{requestId}/messages — +// the More-Info thread. When the author is the recipient, the request flips to +// info_requested. +type AddRequestMessageBody struct { + Body string `json:"body" binding:"required"` + AuthorType string `json:"author_type" binding:"required" enums:"user,agent"` + AuthorID string `json:"author_id"` +} + +// RequestMutationResponse is the {status, request_id} echo returned by the +// respond / cancel / messages endpoints. +type RequestMutationResponse struct { + Status string `json:"status"` + RequestID string `json:"request_id"` +} + +// RequestWithThread is the GET /requests/{requestId} shape — the request plus +// its More-Info thread. +type RequestWithThread struct { + Request RequestRow `json:"request"` + Messages []RequestMessageRow `json:"messages"` +} + +func NewRequestsHandler(b *events.Broadcaster) *RequestsHandler { + return &RequestsHandler{broadcaster: b} +} + +// store builds a RequestStore over the live global db.DB per request — same +// rationale as UserTasksHandler.store(): the test harness swaps db.DB under us. +func (h *RequestsHandler) store() *RequestStore { + return NewRequestStore(db.DB, h.broadcaster) +} + +// Create handles POST /workspaces/:id/requests — the calling workspace (an +// agent) raises a task/approval addressed to a user or another agent. +// +// @Summary Raise a request (task or approval) +// @Tags requests +// @Accept json +// @Produce json +// @Param id path string true "Requester workspace ID" +// @Param body body CreateRequestBody true "Request fields" +// @Success 201 {object} CreateRequestResponse +// @Failure 400 {object} ErrorResponse +// @Failure 500 {object} ErrorResponse +// @Router /workspaces/{id}/requests [post] +// @Security BearerAuth && OrgSlugAuth +func (h *RequestsHandler) Create(c *gin.Context) { + workspaceID := c.Param("id") + ctx := c.Request.Context() + + var body CreateRequestBody + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"}) + return + } + + // Decorate the request with its org anchor for the cross-org pending view. + // The workspaces table has NO org_id column — an "org" is the parent_id-chain + // root resolved by orgRootID (org_scope.go). Best-effort: a missing root + // leaves org_id NULL (the org view simply won't surface it), never blocks + // creation. + orgID, err := orgRootID(ctx, db.DB, workspaceID) + if err != nil { + log.Printf("requests: failed to resolve org root for workspace=%s: %v", workspaceID, err) + orgID = "" + } + + requestID, err := h.store().Create(ctx, CreateRequestInput{ + Kind: body.Kind, + RequesterType: "agent", + RequesterID: workspaceID, + OrgID: orgID, + RecipientType: body.RecipientType, + RecipientID: body.RecipientID, + Title: body.Title, + Detail: body.Detail, + Priority: body.Priority, + }) + if err != nil { + if errors.Is(err, ErrInvalidRequestKind) || errors.Is(err, ErrInvalidRequestParty) { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + log.Printf("Create request error workspace=%s: %v", workspaceID, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create request"}) + return + } + + c.JSON(http.StatusCreated, gin.H{"request_id": requestID, "status": "pending"}) +} + +// ListInbox handles GET /workspaces/:id/requests/inbox?status= — requests +// addressed TO this workspace (the agent's incoming). +// +// @Summary List a workspace's incoming requests (inbox) +// @Tags requests +// @Produce json +// @Param id path string true "Recipient workspace ID" +// @Param status query string false "Filter by status" +// @Success 200 {array} RequestRow +// @Failure 500 {object} ErrorResponse +// @Router /workspaces/{id}/requests/inbox [get] +// @Security BearerAuth && OrgSlugAuth +func (h *RequestsHandler) ListInbox(c *gin.Context) { + workspaceID := c.Param("id") + ctx := c.Request.Context() + + rows, err := h.store().ListInbox(ctx, "agent", workspaceID, c.Query("status")) + if err != nil { + log.Printf("List inbox error workspace=%s: %v", workspaceID, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"}) + return + } + c.JSON(http.StatusOK, rows) +} + +// ListOutgoing handles GET /workspaces/:id/requests?status= — the requests this +// workspace RAISED (the async pickup of responses). +// +// @Summary List a workspace's outgoing requests +// @Tags requests +// @Produce json +// @Param id path string true "Requester workspace ID" +// @Param status query string false "Filter by status" +// @Success 200 {array} RequestRow +// @Failure 500 {object} ErrorResponse +// @Router /workspaces/{id}/requests [get] +// @Security BearerAuth && OrgSlugAuth +func (h *RequestsHandler) ListOutgoing(c *gin.Context) { + workspaceID := c.Param("id") + ctx := c.Request.Context() + + rows, err := h.store().ListOutgoing(ctx, "agent", workspaceID, c.Query("status")) + if err != nil { + log.Printf("List outgoing error workspace=%s: %v", workspaceID, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"}) + return + } + c.JSON(http.StatusOK, rows) +} + +// Get handles GET /requests/:requestId — a single request plus its More-Info +// thread. +// +// @Summary Get a request with its message thread +// @Tags requests +// @Produce json +// @Param requestId path string true "Request ID" +// @Success 200 {object} RequestWithThread +// @Failure 404 {object} ErrorResponse +// @Failure 500 {object} ErrorResponse +// @Router /requests/{requestId} [get] +// @Security BearerAuth +func (h *RequestsHandler) Get(c *gin.Context) { + requestID := c.Param("requestId") + ctx := c.Request.Context() + + s := h.store() + req, err := s.Get(ctx, requestID) + if err != nil { + if errors.Is(err, ErrRequestNotFound) { + c.JSON(http.StatusNotFound, gin.H{"error": "request not found"}) + return + } + log.Printf("Get request error request=%s: %v", requestID, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"}) + return + } + msgs, err := s.Messages(ctx, requestID) + if err != nil { + log.Printf("Get request messages error request=%s: %v", requestID, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"}) + return + } + c.JSON(http.StatusOK, gin.H{"request": req, "messages": msgs}) +} + +// Respond handles POST /requests/:requestId/respond — a terminal action +// (done/rejected/approved), validated against the request's kind. responder +// identity comes from the body; the canvas/admin path defaults to 'user'. +// +// @Summary Respond to a request (done / rejected / approved) +// @Tags requests +// @Accept json +// @Produce json +// @Param requestId path string true "Request ID" +// @Param body body RespondRequestBody true "Response" +// @Success 200 {object} RequestMutationResponse +// @Failure 400 {object} ErrorResponse +// @Failure 404 {object} ErrorResponse +// @Failure 500 {object} ErrorResponse +// @Router /requests/{requestId}/respond [post] +// @Security BearerAuth +func (h *RequestsHandler) Respond(c *gin.Context) { + requestID := c.Param("requestId") + ctx := c.Request.Context() + + var body RespondRequestBody + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"}) + return + } + + if _, err := h.store().Respond(ctx, requestID, body.Action, body.ResponderType, body.ResponderID); err != nil { + if errors.Is(err, ErrRequestNotFound) { + c.JSON(http.StatusNotFound, gin.H{"error": "request not found or already resolved"}) + return + } + if errors.Is(err, ErrInvalidRequestAction) || errors.Is(err, ErrInvalidRequestParty) { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + log.Printf("Respond request error request=%s: %v", requestID, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update"}) + return + } + + c.JSON(http.StatusOK, gin.H{"status": body.Action, "request_id": requestID}) +} + +// AddMessage handles POST /requests/:requestId/messages — append to the +// More-Info thread. When the author is the recipient, the request flips to +// info_requested. +// +// @Summary Add a message to a request's More-Info thread +// @Tags requests +// @Accept json +// @Produce json +// @Param requestId path string true "Request ID" +// @Param body body AddRequestMessageBody true "Message" +// @Success 201 {object} RequestMutationResponse +// @Failure 400 {object} ErrorResponse +// @Failure 404 {object} ErrorResponse +// @Failure 500 {object} ErrorResponse +// @Router /requests/{requestId}/messages [post] +// @Security BearerAuth +func (h *RequestsHandler) AddMessage(c *gin.Context) { + requestID := c.Param("requestId") + ctx := c.Request.Context() + + var body AddRequestMessageBody + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"}) + return + } + + messageID, err := h.store().AddMessage(ctx, requestID, body.AuthorType, body.AuthorID, body.Body) + if err != nil { + if errors.Is(err, ErrRequestNotFound) { + c.JSON(http.StatusNotFound, gin.H{"error": "request not found"}) + return + } + if errors.Is(err, ErrInvalidRequestParty) { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + log.Printf("AddMessage request error request=%s: %v", requestID, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to add message"}) + return + } + + c.JSON(http.StatusCreated, gin.H{"status": "created", "request_id": requestID, "message_id": messageID}) +} + +// Cancel handles POST /requests/:requestId/cancel — the requester withdraws. +// +// @Summary Cancel (withdraw) a request +// @Tags requests +// @Produce json +// @Param requestId path string true "Request ID" +// @Success 200 {object} RequestMutationResponse +// @Failure 404 {object} ErrorResponse +// @Failure 500 {object} ErrorResponse +// @Router /requests/{requestId}/cancel [post] +// @Security BearerAuth +func (h *RequestsHandler) Cancel(c *gin.Context) { + requestID := c.Param("requestId") + ctx := c.Request.Context() + + if err := h.store().Cancel(ctx, requestID); err != nil { + if errors.Is(err, ErrRequestNotFound) { + c.JSON(http.StatusNotFound, gin.H{"error": "request not found or already resolved"}) + return + } + log.Printf("Cancel request error request=%s: %v", requestID, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to cancel"}) + return + } + + c.JSON(http.StatusOK, gin.H{"status": "cancelled", "request_id": requestID}) +} + +// ListPending handles GET /requests/pending?kind= — the cross-org pending view +// for the canvas Tasks/Approvals tabs. Cross-workspace, so AdminAuth-gated like +// /user-tasks/pending and /approvals/pending. ?kind=task|approval lets each tab +// query its own slice. +// +// @Summary List pending requests across the org (canvas tabs) +// @Tags requests +// @Produce json +// @Param kind query string false "Filter by kind" Enums(task, approval) +// @Param org_id query string false "Filter by org" +// @Success 200 {array} RequestRow +// @Failure 400 {object} ErrorResponse +// @Failure 500 {object} ErrorResponse +// @Router /requests/pending [get] +// @Security BearerAuth +func (h *RequestsHandler) ListPending(c *gin.Context) { + ctx := c.Request.Context() + + kind := c.Query("kind") + if kind != "" && kind != "task" && kind != "approval" { + c.JSON(http.StatusBadRequest, gin.H{"error": "kind must be 'task' or 'approval'"}) + return + } + + rows, err := h.store().ListPendingForOrg(ctx, c.Query("org_id"), kind) + if err != nil { + log.Printf("ListPending requests error: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"}) + return + } + c.JSON(http.StatusOK, rows) +} diff --git a/workspace-server/internal/handlers/requests_test.go b/workspace-server/internal/handlers/requests_test.go new file mode 100644 index 000000000..2e7101c4d --- /dev/null +++ b/workspace-server/internal/handlers/requests_test.go @@ -0,0 +1,563 @@ +package handlers + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/gin-gonic/gin" +) + +// requestColumnNames mirrors the requestColumns SELECT projection order so the +// sqlmock rows line up with scanRequest. +var requestColumnNames = []string{ + "id", "kind", "requester_type", "requester_id", "org_id", + "recipient_type", "recipient_id", "title", "detail", "status", + "responder_type", "responder_id", "priority", "created_at", "updated_at", "responded_at", +} + +// oneRequestRow builds a single-row result set for a Get/List SELECT. +func oneRequestRow(id, kind, requesterID, recipientType, recipientID, status string) *sqlmock.Rows { + return sqlmock.NewRows(requestColumnNames).AddRow( + id, kind, "agent", requesterID, nil, + recipientType, recipientID, "Some title", nil, status, + nil, nil, nil, "2026-06-10T00:00:00Z", "2026-06-10T00:00:00Z", nil, + ) +} + +// ---------- Create ---------- + +func TestRequests_Create_Task(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewRequestsHandler(newTestBroadcaster()) + + // Create handler first resolves the org root (org_scope CTE). + mock.ExpectQuery("WITH RECURSIVE org_chain"). + WithArgs("ws-1"). + WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow("org-root-1")) + // INSERT request → id + mock.ExpectQuery("INSERT INTO requests"). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("req-1")) + // REQUEST_CREATED broadcast (recipient is a user here → anchors on requester agent) + mock.ExpectExec("INSERT INTO structure_events"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-1"}} + body := `{"kind":"task","recipient_type":"user","recipient_id":"","title":"Review the launch draft","detail":"posts/launch.md"}` + c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Create(c) + + if w.Code != http.StatusCreated { + t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String()) + } + var resp map[string]interface{} + json.Unmarshal(w.Body.Bytes(), &resp) + if resp["request_id"] != "req-1" { + t.Errorf("expected request_id req-1, got %v", resp["request_id"]) + } + if resp["status"] != "pending" { + t.Errorf("expected status pending, got %v", resp["status"]) + } +} + +func TestRequests_Create_Approval_AgentRecipient(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewRequestsHandler(newTestBroadcaster()) + + mock.ExpectQuery("WITH RECURSIVE org_chain"). + WithArgs("ws-1"). + WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow("org-root-1")) + mock.ExpectQuery("INSERT INTO requests"). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("req-2")) + // Recipient is an agent → REQUEST_CREATED anchors on the recipient workspace. + mock.ExpectExec("INSERT INTO structure_events"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-1"}} + body := `{"kind":"approval","recipient_type":"agent","recipient_id":"ws-2","title":"Approve deploy"}` + c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Create(c) + + if w.Code != http.StatusCreated { + t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestRequests_Create_MissingTitle(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + handler := NewRequestsHandler(newTestBroadcaster()) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-1"}} + c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"kind":"task","recipient_type":"user"}`)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Create(c) + + if w.Code != http.StatusBadRequest { + t.Errorf("expected 400 for missing title, got %d", w.Code) + } +} + +func TestRequests_Create_InvalidKind(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewRequestsHandler(newTestBroadcaster()) + + // org root resolves, then Create rejects the kind before any INSERT. + mock.ExpectQuery("WITH RECURSIVE org_chain"). + WithArgs("ws-1"). + WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow("org-root-1")) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-1"}} + c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"kind":"banana","recipient_type":"user","title":"x"}`)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Create(c) + + if w.Code != http.StatusBadRequest { + t.Errorf("expected 400 for invalid kind, got %d: %s", w.Code, w.Body.String()) + } +} + +// ---------- Inbox vs Outgoing listing ---------- + +func TestRequests_ListInbox(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewRequestsHandler(newTestBroadcaster()) + + mock.ExpectQuery("FROM requests"). + WithArgs("agent", "ws-2"). + WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "pending")) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-2"}} + c.Request = httptest.NewRequest("GET", "/", nil) + + handler.ListInbox(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + var resp []map[string]interface{} + json.Unmarshal(w.Body.Bytes(), &resp) + if len(resp) != 1 || resp[0]["id"] != "req-1" { + t.Errorf("expected one inbox request req-1, got %v", resp) + } +} + +func TestRequests_ListInbox_WithStatusFilter(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewRequestsHandler(newTestBroadcaster()) + + mock.ExpectQuery("FROM requests"). + WithArgs("agent", "ws-2", "pending"). + WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "pending")) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-2"}} + c.Request = httptest.NewRequest("GET", "/?status=pending", nil) + + handler.ListInbox(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestRequests_ListOutgoing(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewRequestsHandler(newTestBroadcaster()) + + mock.ExpectQuery("FROM requests"). + WithArgs("agent", "ws-1"). + WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "user", "", "done")) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-1"}} + c.Request = httptest.NewRequest("GET", "/", nil) + + handler.ListOutgoing(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + var resp []map[string]interface{} + json.Unmarshal(w.Body.Bytes(), &resp) + if len(resp) != 1 || resp[0]["status"] != "done" { + t.Errorf("expected one outgoing request status done, got %v", resp) + } +} + +// ---------- Get (with thread) ---------- + +func TestRequests_Get_WithThread(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewRequestsHandler(newTestBroadcaster()) + + mock.ExpectQuery("FROM requests WHERE id"). + WithArgs("req-1"). + WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "user", "", "info_requested")) + mock.ExpectQuery("FROM request_messages WHERE request_id"). + WithArgs("req-1"). + WillReturnRows(sqlmock.NewRows([]string{"id", "request_id", "author_type", "author_id", "body", "created_at"}). + AddRow("msg-1", "req-1", "user", "u-1", "what file?", "2026-06-10T01:00:00Z")) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "requestId", Value: "req-1"}} + c.Request = httptest.NewRequest("GET", "/", nil) + + handler.Get(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + var resp map[string]interface{} + json.Unmarshal(w.Body.Bytes(), &resp) + if resp["request"] == nil || resp["messages"] == nil { + t.Errorf("expected request + messages keys, got %v", resp) + } +} + +func TestRequests_Get_NotFound(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewRequestsHandler(newTestBroadcaster()) + + mock.ExpectQuery("FROM requests WHERE id"). + WithArgs("nope"). + WillReturnRows(sqlmock.NewRows(requestColumnNames)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "requestId", Value: "nope"}} + c.Request = httptest.NewRequest("GET", "/", nil) + + handler.Get(c) + + if w.Code != http.StatusNotFound { + t.Errorf("expected 404, got %d", w.Code) + } +} + +// ---------- Respond (valid + invalid action-for-kind) ---------- + +func TestRequests_Respond_ApprovalApproved(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewRequestsHandler(newTestBroadcaster()) + + // Respond does Get first to validate action↔kind. + mock.ExpectQuery("FROM requests WHERE id"). + WithArgs("req-1"). + WillReturnRows(oneRequestRow("req-1", "approval", "ws-1", "user", "", "pending")) + mock.ExpectExec("UPDATE requests"). + WillReturnResult(sqlmock.NewResult(0, 1)) + // REQUEST_RESPONDED broadcast (anchored on requester agent ws-1). + mock.ExpectExec("INSERT INTO structure_events"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "requestId", Value: "req-1"}} + c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"action":"approved","responder_id":"u-1"}`)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Respond(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + var resp map[string]interface{} + json.Unmarshal(w.Body.Bytes(), &resp) + if resp["status"] != "approved" { + t.Errorf("expected status approved, got %v", resp["status"]) + } +} + +func TestRequests_Respond_TaskDone(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewRequestsHandler(newTestBroadcaster()) + + mock.ExpectQuery("FROM requests WHERE id"). + WithArgs("req-1"). + WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "user", "", "pending")) + mock.ExpectExec("UPDATE requests"). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("INSERT INTO structure_events"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "requestId", Value: "req-1"}} + c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"action":"done"}`)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Respond(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestRequests_Respond_InvalidActionForKind(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewRequestsHandler(newTestBroadcaster()) + + // kind=approval cannot be "done" — Get succeeds, actionToStatus rejects. + mock.ExpectQuery("FROM requests WHERE id"). + WithArgs("req-1"). + WillReturnRows(oneRequestRow("req-1", "approval", "ws-1", "user", "", "pending")) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "requestId", Value: "req-1"}} + c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"action":"done"}`)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Respond(c) + + if w.Code != http.StatusBadRequest { + t.Errorf("expected 400 for done-on-approval, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestRequests_Respond_NotFound(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewRequestsHandler(newTestBroadcaster()) + + mock.ExpectQuery("FROM requests WHERE id"). + WithArgs("nope"). + WillReturnRows(sqlmock.NewRows(requestColumnNames)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "requestId", Value: "nope"}} + c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"action":"done"}`)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Respond(c) + + if w.Code != http.StatusNotFound { + t.Errorf("expected 404, got %d", w.Code) + } +} + +// ---------- AddMessage → info_requested when recipient asks ---------- + +func TestRequests_AddMessage_RecipientFlipsInfoRequested(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewRequestsHandler(newTestBroadcaster()) + + // Author is the recipient agent ws-2 → flips to info_requested. + mock.ExpectQuery("FROM requests WHERE id"). + WithArgs("req-1"). + WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "pending")) + mock.ExpectQuery("INSERT INTO request_messages"). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("msg-1")) + // recipient-authored → status flip UPDATE + mock.ExpectExec("UPDATE requests SET status = 'info_requested'"). + WillReturnResult(sqlmock.NewResult(0, 1)) + // REQUEST_MESSAGE broadcast (anchored on requester ws-1) + mock.ExpectExec("INSERT INTO structure_events"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "requestId", Value: "req-1"}} + c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"body":"which file?","author_type":"agent","author_id":"ws-2"}`)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.AddMessage(c) + + if w.Code != http.StatusCreated { + t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String()) + } + var resp map[string]interface{} + json.Unmarshal(w.Body.Bytes(), &resp) + if resp["message_id"] != "msg-1" { + t.Errorf("expected message_id msg-1, got %v", resp["message_id"]) + } +} + +func TestRequests_AddMessage_RequesterDoesNotFlip(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewRequestsHandler(newTestBroadcaster()) + + // Author is the requester ws-1 (not recipient) → NO info_requested flip. + mock.ExpectQuery("FROM requests WHERE id"). + WithArgs("req-1"). + WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "info_requested")) + mock.ExpectQuery("INSERT INTO request_messages"). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("msg-2")) + // No status-flip UPDATE expected here. + mock.ExpectExec("INSERT INTO structure_events"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "requestId", Value: "req-1"}} + c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"body":"posts/launch.md","author_type":"agent","author_id":"ws-1"}`)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.AddMessage(c) + + if w.Code != http.StatusCreated { + t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String()) + } +} + +// ---------- Cancel ---------- + +func TestRequests_Cancel_Success(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewRequestsHandler(newTestBroadcaster()) + + mock.ExpectExec("UPDATE requests SET status = 'cancelled'"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "requestId", Value: "req-1"}} + c.Request = httptest.NewRequest("POST", "/", nil) + + handler.Cancel(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestRequests_Cancel_NotFound(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewRequestsHandler(newTestBroadcaster()) + + mock.ExpectExec("UPDATE requests SET status = 'cancelled'"). + WillReturnResult(sqlmock.NewResult(0, 0)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "requestId", Value: "gone"}} + c.Request = httptest.NewRequest("POST", "/", nil) + + handler.Cancel(c) + + if w.Code != http.StatusNotFound { + t.Errorf("expected 404, got %d", w.Code) + } +} + +// ---------- ListPending (org view) + kind filter ---------- + +// pendingColumnNames mirrors ListPendingForOrg's projection (requestColumns + w.name). +var pendingColumnNames = append(append([]string{}, requestColumnNames...), "workspace_name") + +func onePendingRow(id, kind string) *sqlmock.Rows { + return sqlmock.NewRows(pendingColumnNames).AddRow( + id, kind, "agent", "ws-1", "org-root-1", + "user", "", "Some title", nil, "pending", + nil, nil, nil, "2026-06-10T00:00:00Z", "2026-06-10T00:00:00Z", nil, + "Marketing Agent", + ) +} + +func TestRequests_ListPending_NoFilter(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewRequestsHandler(newTestBroadcaster()) + + mock.ExpectQuery("FROM requests r"). + WillReturnRows(onePendingRow("req-1", "task").AddRow( + "req-2", "approval", "agent", "ws-2", "org-root-1", + "user", "", "Approve deploy", nil, "pending", + nil, nil, nil, "2026-06-10T00:00:00Z", "2026-06-10T00:00:00Z", nil, + "Ops Agent", + )) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/", nil) + + handler.ListPending(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + var resp []map[string]interface{} + json.Unmarshal(w.Body.Bytes(), &resp) + if len(resp) != 2 { + t.Errorf("expected 2 pending, got %d (%v)", len(resp), resp) + } + if resp[0]["workspace_name"] != "Marketing Agent" { + t.Errorf("expected decorated workspace_name, got %v", resp[0]["workspace_name"]) + } +} + +func TestRequests_ListPending_KindFilter(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewRequestsHandler(newTestBroadcaster()) + + // ?kind=approval must add an arg. + mock.ExpectQuery("FROM requests r"). + WithArgs("approval"). + WillReturnRows(onePendingRow("req-2", "approval")) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/?kind=approval", nil) + + handler.ListPending(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestRequests_ListPending_InvalidKind(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + handler := NewRequestsHandler(newTestBroadcaster()) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/?kind=banana", nil) + + handler.ListPending(c) + + if w.Code != http.StatusBadRequest { + t.Errorf("expected 400 for invalid kind, got %d", w.Code) + } +} diff --git a/workspace-server/internal/router/router.go b/workspace-server/internal/router/router.go index 460cf4bb7..f43c4efcd 100644 --- a/workspace-server/internal/router/router.go +++ b/workspace-server/internal/router/router.go @@ -348,6 +348,36 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi // AdminAuth-gated exactly like /approvals/pending. r.GET("/user-tasks/pending", middleware.AdminAuth(db.DB), uth.ListAll) + // Requests — the unified Tasks + Approvals inbox (RFC P1). Generalizes + // approvals + user-tasks into one model keyed by kind. Auth is split the + // same way: per-workspace create/list under wsAuth (an agent acts with + // its workspace token); the cross-org pending view + the + // /requests/:requestId/* action paths are AdminAuth-gated for the canvas + // user. Because an AGENT can also respond to a request addressed to it + // (using its own workspace token), the action verbs are ALSO registered + // under the wsAuth /workspaces/:id/requests/:requestId/* prefix — same + // dual-surface pattern the brief calls for (agent = workspace token, + // canvas user = admin token), no new auth mechanism. + rqh := handlers.NewRequestsHandler(broadcaster) + wsAuth.POST("/requests", rqh.Create) + wsAuth.GET("/requests", rqh.ListOutgoing) + wsAuth.GET("/requests/inbox", rqh.ListInbox) + // Agent-side action verbs (workspace-token auth). + wsAuth.GET("/requests/:requestId", rqh.Get) + wsAuth.POST("/requests/:requestId/respond", rqh.Respond) + wsAuth.POST("/requests/:requestId/messages", rqh.AddMessage) + wsAuth.POST("/requests/:requestId/cancel", rqh.Cancel) + // Cross-org pending view for the canvas Tasks/Approvals tabs — AdminAuth + // like /user-tasks/pending. ?kind=task|approval drives each tab. + r.GET("/requests/pending", middleware.AdminAuth(db.DB), rqh.ListPending) + // Canvas-user action verbs (admin auth). Same handlers; the responder + // defaults to 'user' on this path. + reqAdmin := r.Group("", middleware.AdminAuth(db.DB)) + reqAdmin.GET("/requests/:requestId", rqh.Get) + reqAdmin.POST("/requests/:requestId/respond", rqh.Respond) + reqAdmin.POST("/requests/:requestId/messages", rqh.AddMessage) + reqAdmin.POST("/requests/:requestId/cancel", rqh.Cancel) + // (TeamHandler is gone — #2864.) The visual canvas Collapse // button calls PATCH /workspaces/:id { collapsed: true/false } // (presentational toggle on canvas_layouts), NOT the destructive diff --git a/workspace-server/migrations/20260610120000_requests.down.sql b/workspace-server/migrations/20260610120000_requests.down.sql new file mode 100644 index 000000000..a6ed94831 --- /dev/null +++ b/workspace-server/migrations/20260610120000_requests.down.sql @@ -0,0 +1,4 @@ +-- Drop child first (FK), then parent. Idempotent (IF EXISTS) so a re-run of the +-- down migration is a no-op rather than a crash. +DROP TABLE IF EXISTS request_messages; +DROP TABLE IF EXISTS requests; diff --git a/workspace-server/migrations/20260610120000_requests.up.sql b/workspace-server/migrations/20260610120000_requests.up.sql new file mode 100644 index 000000000..e180108d8 --- /dev/null +++ b/workspace-server/migrations/20260610120000_requests.up.sql @@ -0,0 +1,122 @@ +-- requests: the unified Tasks + Approvals primitive (RFC P1). Generalizes +-- user_tasks (007-style worklist "asks") and approval_requests (the destructive +-- gate) into ONE inbox keyed by kind ∈ {task, approval}, with a requester and a +-- recipient that may each be a user OR an agent. See +-- docs/design/rfc-unified-requests-inbox.md. +-- +-- recipient_id / requester_id are plain TEXT with NO foreign key on purpose: an +-- agent's id is a workspaces(id) UUID, but a user's id is not a workspaces row, +-- so a cross-type FK is impossible. ListPendingForOrg LEFT JOINs workspaces to +-- decorate a name when the party is an agent (a user party simply has no match). +-- +-- The migration runner tracks applied filenames in schema_migrations, but a +-- partial-failure re-run would re-apply this file, so EVERYTHING here is +-- idempotent: CREATE TABLE/INDEX IF NOT EXISTS (no bare ALTER ADD CONSTRAINT, +-- which can't be IF NOT EXISTS and would crash-loop), and the historical +-- backfills use ON CONFLICT (id) DO NOTHING so re-runs are no-ops. +CREATE TABLE IF NOT EXISTS requests ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + kind TEXT NOT NULL CHECK (kind IN ('task', 'approval')), + requester_type TEXT NOT NULL CHECK (requester_type IN ('user', 'agent')), + requester_id TEXT NOT NULL, + org_id UUID, + recipient_type TEXT NOT NULL CHECK (recipient_type IN ('user', 'agent')), + recipient_id TEXT NOT NULL, + title TEXT NOT NULL, + detail TEXT, + status TEXT NOT NULL DEFAULT 'pending' + CHECK (status IN ('pending', 'info_requested', 'done', 'rejected', 'approved', 'cancelled')), + responder_type TEXT, + responder_id TEXT, + priority SMALLINT, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + responded_at TIMESTAMPTZ +); + +-- Inbox read: WHERE recipient_type=$1 AND recipient_id=$2 [AND status=$3] +-- ORDER BY created_at DESC. This is the recipient's "incoming" list (agent poll +-- or user Tasks/Approvals tab). +CREATE INDEX IF NOT EXISTS idx_requests_inbox + ON requests (recipient_type, recipient_id, status, created_at DESC); + +-- Org pending view ("all agents' incoming" tab): WHERE org_id=$1 AND status... +CREATE INDEX IF NOT EXISTS idx_requests_org_pending + ON requests (org_id, status, created_at DESC); + +-- Outgoing / async pickup: WHERE requester_type=$1 AND requester_id=$2 — the +-- requester reads back the requests it raised (check_requests). +CREATE INDEX IF NOT EXISTS idx_requests_outgoing + ON requests (requester_type, requester_id, created_at DESC); + +-- request_messages: the "More Info / chat about this" thread on a request. +CREATE TABLE IF NOT EXISTS request_messages ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + request_id UUID NOT NULL REFERENCES requests(id) ON DELETE CASCADE, + author_type TEXT NOT NULL CHECK (author_type IN ('user', 'agent')), + author_id TEXT NOT NULL, + body TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +-- Thread read: WHERE request_id=$1 ORDER BY created_at. +CREATE INDEX IF NOT EXISTS idx_request_messages_thread + ON request_messages (request_id, created_at); + +-- Idempotent backfill — copy historical user_tasks into the unified inbox so +-- the Tasks tab shows pre-cutover items. user_tasks are always agent→user asks, +-- so requester_type='agent' (requester_id = the raising workspace), recipient is +-- the human user; recipient_id '' is the "the org's user" sentinel (user ids are +-- not modelled yet in this table's source — P1 keeps it empty, the CHECK permits +-- empty TEXT). 'dismissed' maps to the unified 'rejected'. +INSERT INTO requests ( + id, kind, requester_type, requester_id, recipient_type, recipient_id, + title, detail, status, responder_id, created_at, responded_at +) +SELECT + id, + 'task', + 'agent', + workspace_id::text, + 'user', + '', + title, + detail, + CASE status WHEN 'dismissed' THEN 'rejected' ELSE status END, + resolved_by, + created_at, + resolved_at +FROM user_tasks +ON CONFLICT (id) DO NOTHING; + +-- Idempotent backfill — copy historical approval_requests in as kind='approval'. +-- approval_requests columns (007_approvals.sql): id, workspace_id, task_id, +-- action, reason, context, status IN (pending|approved|denied|escalated), +-- decided_by, decided_at, created_at. Map status: 'denied'→'rejected', +-- 'escalated'→'pending' (still awaiting a decision, just bubbled up). title is +-- the action; detail is the reason. requester = the raising workspace (agent), +-- recipient = the user who decides. +INSERT INTO requests ( + id, kind, requester_type, requester_id, recipient_type, recipient_id, + title, detail, status, responder_id, created_at, responded_at +) +SELECT + id, + 'approval', + 'agent', + workspace_id::text, + 'user', + '', + action, + reason, + CASE status + WHEN 'denied' THEN 'rejected' + WHEN 'escalated' THEN 'pending' + ELSE status + END, + decided_by, + created_at, + decided_at +FROM approval_requests +WHERE workspace_id IS NOT NULL +ON CONFLICT (id) DO NOTHING; -- 2.52.0