feat(requests): P1 — unified requests/inbox data model + endpoints (RFC) #2525

Merged
claude-ceo-assistant merged 1 commits from feat/unified-requests-inbox-p1 into main 2026-06-10 14:14:22 +00:00
9 changed files with 1744 additions and 8 deletions
+102
View File
@@ -0,0 +1,102 @@
# RFC: Unified Requests / Inbox subsystem (Tasks + Approvals)
**Status:** proposed (awaiting CTO sign-off) · **Author:** CEO-assistant · **Date:** 2026-06-10
## 1. Motivation
Agents need to ask **users or other agents** to do something (task) or to approve
something (approval), **asynchronously**, with a clear **inbox**, action buttons, a
clarification thread, and accountability (who responded). Today only a thin approval
primitive exists (`create_approval`/`decide_approval` + one-way `notify_user`). This
RFC generalizes that into **one** requests subsystem with `kind ∈ {task, approval}`.
## 2. Data model (unified)
**`requests`**
- `id` uuid
- `kind` enum: `task` | `approval`
- `requester_type` (`user`|`agent`), `requester_id`, `org_id`
- `recipient_type` (`user`|`agent`), `recipient_id` ← user OR agent
- `title` text, `detail` markdown
- `status` enum: `pending` | `info_requested` | `done` | `rejected` | `approved` | `cancelled`
- `responder_type` (`user`|`agent`), `responder_id` ← nullable until acted; **who acted**
- `priority` smallint (optional), `created_at`, `updated_at`, `responded_at`
**`request_messages`** (the "More Info / chat about this" thread)
- `id`, `request_id` fk, `author_type`, `author_id`, `body`, `created_at`
## 3. Lifecycle / actions
| kind | buttons | transitions |
|---|---|---|
| task | **Done · Reject · More Info** | pending→done / pending→rejected / pending↔info_requested |
| approval | **Approve · Reject · More Info** | pending→approved / pending→rejected / pending↔info_requested |
Terminal: `done`/`approved`/`rejected`/`cancelled`. **More Info** sets `info_requested`
and appends a `request_messages` row ("chat about this"); either side replies until it
resolves to a terminal action. Every terminal action stamps `responder_id`+`responder_type`.
## 4. Async semantics (non-blocking — core requirement)
- Creating a request returns immediately with `request_id`; the **requester never blocks**
and keeps working.
- The response is delivered **asynchronously**: a signal is posted to the requester's
activity/inbox (an `a2a_receive`-style event) carrying `{request_id, status, responder_id,
thread}`. The requester picks it up on its **next tick** — never sits and waits.
- Recipient **agents** receive/poll their pending inbox; recipient **users** see the
Tasks/Approvals tabs (live via the existing WebSocket).
## 5. Control-plane API
- `POST /workspaces/{id}/requests` — create (requester = that workspace) `{kind, recipient_type, recipient_id, title, detail, priority?}`
- `GET /requests?recipient_type=&recipient_id=&status=` — inbox list; org-scoped variant powers the "all agents' incoming" tab view
- `GET /requests/{id}`
- `POST /requests/{id}/respond` `{action: done|rejected|approved}``responder_id` taken from the **authenticated principal** (user session or agent token), not the body
- `POST /requests/{id}/messages` `{body}` — More-Info thread
- `POST /requests/{id}/cancel` — requester withdraws
## 6. MCP tools (agent-facing; unify with existing)
- `create_request(recipient, kind, title, detail)` — supersedes `create_approval` (which stays as a `kind=approval` alias during deprecation)
- `list_inbox(status?)` — the calling agent's **incoming** requests
- `respond_request(request_id, action, message?)` — agent responds to a request addressed to it (done/reject/approve, or open More-Info)
- `add_request_message(request_id, body)` — thread reply
- `check_requests()` — status of requests the agent **sent** (the async pickup)
- User-side actions flow through the canvas UI → `/respond` (captures the user id).
## 7. Canvas UI (Tasks + Approvals tabs — already scaffolded)
- Each tab lists items of that kind, **grouped by requesting agent**, newest first: agent
name + avatar, title, detail preview, age, status badge.
- Buttons per the table in §3. Click Done/Approve/Reject → `POST /respond` with the
logged-in user → optimistic update + toast.
- **More Info** expands an inline thread panel ("chat about this"): message list + input →
`POST /messages`; status → `info_requested`; requester replies appear in-thread.
- "All agents' incoming" = org-scoped list. Live updates over the existing WS.
## 8. Idle-nudge sweep
- A CP periodic worker: for each agent workspace that is **idle** (`active_tasks=0`,
`status=online`) **and** has ≥1 `pending`/`info_requested` request where it is the
recipient, older than threshold → inject a **nudge** (notify/tick: "you have M unhandled
inbox items …") so it processes them via `respond_request`. Covers the "agent forgot /
didn't use the proper MCP tool" case.
- User-addressed requests pending past threshold → UI badge (+ optional channel notify).
- **Anti-spam:** rate-limit (≤1 nudge per request per hour); stop once the agent acts.
## 9. Responder identity / multi-user (forward-looking)
`responder_id`+`responder_type` stamped on every terminal action; UI shows "Approved by
<name>". Schema already supports multiple users with roles in one space (a request can be
fulfilled by a different person than it was shown to). Per-role routing/permissions is
**out of scope for v1** but the model doesn't preclude it.
## 10. Approvals migration
New `requests` table; **idempotent migration** copies existing `approvals` rows in as
`kind=approval`. `create_approval`/`decide_approval`/`list_pending_approvals` become thin
shims over the requests endpoints for a deprecation window, then removed.
## 11. Phasing (each phase = SOP PR → 2-genuine → merge)
- **P0** RFC sign-off (this doc)
- **P1** CP: `requests`+`request_messages` tables, migration, endpoints, tests (molecule-controlplane)
- **P2** MCP tools + approval shims (core/runtime)
- **P3** Canvas UI: Tasks/Approvals tabs render + buttons + More-Info thread + WS live (app/canvas)
- **P4** idle-nudge worker + user-pending notifications
- **P5** deprecate old approval path; docs
## 12. Recommended defaults on open points (flag if you disagree)
- (a) **Keep approval shims** for one deprecation window (not a hard cut) — safer for in-flight callers.
- (b) Nudge: **idle + pending > 10 min → nudge, max 1/hr per request.**
- (c) v1 = **single recipient per request** (fan-out later).
- (d) v1 = **any user in the org may respond** to a user-addressed request (role-gating later).
+18 -8
View File
@@ -41,8 +41,8 @@ type EventType string
// scan-friendly as it grows.
const (
// Chat / agent messaging — surfaces in canvas chat panels.
EventAgentMessage EventType = "AGENT_MESSAGE"
EventA2AResponse EventType = "A2A_RESPONSE"
EventAgentMessage EventType = "AGENT_MESSAGE"
EventA2AResponse EventType = "A2A_RESPONSE"
EventActivityLogged EventType = "ACTIVITY_LOGGED"
EventChannelMessage EventType = "CHANNEL_MESSAGE"
@@ -59,11 +59,11 @@ const (
EventWorkspaceHeartbeat EventType = "WORKSPACE_HEARTBEAT"
// Agent assignment + identity.
EventAgentAssigned EventType = "AGENT_ASSIGNED"
EventAgentReplaced EventType = "AGENT_REPLACED"
EventAgentRemoved EventType = "AGENT_REMOVED"
EventAgentMoved EventType = "AGENT_MOVED"
EventAgentCardUpdated EventType = "AGENT_CARD_UPDATED"
EventAgentAssigned EventType = "AGENT_ASSIGNED"
EventAgentReplaced EventType = "AGENT_REPLACED"
EventAgentRemoved EventType = "AGENT_REMOVED"
EventAgentMoved EventType = "AGENT_MOVED"
EventAgentCardUpdated EventType = "AGENT_CARD_UPDATED"
// Delegation lifecycle.
EventDelegationSent EventType = "DELEGATION_SENT"
@@ -72,7 +72,7 @@ const (
EventDelegationFailed EventType = "DELEGATION_FAILED"
// Task progression + scheduler.
EventTaskUpdated EventType = "TASK_UPDATED"
EventTaskUpdated EventType = "TASK_UPDATED"
EventCronExecuted EventType = "CRON_EXECUTED"
EventCronSkipped EventType = "CRON_SKIPPED"
@@ -84,6 +84,13 @@ const (
EventUserTaskRequested EventType = "USER_TASK_REQUESTED"
EventUserTaskResolved EventType = "USER_TASK_RESOLVED"
// Requests — the unified Tasks + Approvals inbox (RFC P1). REQUEST_CREATED
// pokes a recipient agent's inbox; REQUEST_RESPONDED is the async signal-back
// to the requester; REQUEST_MESSAGE is a More-Info thread reply.
EventRequestCreated EventType = "REQUEST_CREATED"
EventRequestResponded EventType = "REQUEST_RESPONDED"
EventRequestMessage EventType = "REQUEST_MESSAGE"
// Auth / credentials.
EventExternalCredentialsRotated EventType = "EXTERNAL_CREDENTIALS_ROTATED"
)
@@ -115,6 +122,9 @@ var AllEventTypes = []EventType{
EventDelegationSent,
EventDelegationStatus,
EventExternalCredentialsRotated,
EventRequestCreated,
EventRequestMessage,
EventRequestResponded,
EventTaskUpdated,
EventUserTaskRequested,
EventUserTaskResolved,
@@ -40,6 +40,9 @@ func TestAllEventTypes_IsSnapshot(t *testing.T) {
"DELEGATION_SENT",
"DELEGATION_STATUS",
"EXTERNAL_CREDENTIALS_ROTATED",
"REQUEST_CREATED",
"REQUEST_MESSAGE",
"REQUEST_RESPONDED",
"TASK_UPDATED",
"USER_TASK_REQUESTED",
"USER_TASK_RESOLVED",
@@ -0,0 +1,526 @@
package handlers
// RequestStore is the SSOT for the unified "requests" primitive — the Tasks +
// Approvals inbox (RFC P1, docs/design/rfc-unified-requests-inbox.md). It
// generalizes UserTaskStore (agent→user worklist asks) and the inline
// approval_requests SQL in approvals.go into ONE store keyed by kind ∈
// {task, approval}, where requester and recipient may each be a user OR an
// agent.
//
// Every surface that mutates or reads `requests` — the REST handlers in
// requests.go AND (in a later phase) the MCP request tools — MUST route through
// this store rather than re-implement the SQL + status-enum validation +
// REQUEST_* broadcast inline. Two copies of one contract drift silently; this
// is the same consolidation rationale as UserTaskStore / AgentMessageWriter.
//
// The store owns persistence + validation + the event broadcast. HTTP-specific
// concerns (gin binding, status codes) stay in the handler. Construct per call
// via NewRequestStore over the live global db.DB so the test harness's db.DB
// swap is observed — mirroring UserTaskStore.
import (
"context"
"database/sql"
"errors"
"fmt"
"log"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/events"
)
// ErrRequestNotFound is returned by Get/Respond/RequestInfo/Cancel/AddMessage
// when no row matches — the request does not exist, or (for the mutating
// actions) is already terminal. Callers translate to HTTP 404.
var ErrRequestNotFound = errors.New("request: not found")
// ErrInvalidRequestAction is returned when a respond action is outside the
// done/rejected/approved set, or is incompatible with the request's kind
// (approval→approved/rejected, task→done/rejected). Callers translate to 400.
var ErrInvalidRequestAction = errors.New("request: invalid action for this request kind")
// ErrInvalidRequestKind is returned when Create is given a kind outside
// task/approval. Callers translate to HTTP 400.
var ErrInvalidRequestKind = errors.New("request: kind must be 'task' or 'approval'")
// ErrInvalidRequestParty is returned when a requester_type / recipient_type /
// author_type is outside the user/agent enum. Callers translate to HTTP 400.
var ErrInvalidRequestParty = errors.New("request: type must be 'user' or 'agent'")
// CreateRequestInput is the set of fields Create needs. requester_* identifies
// who raised it (for a per-workspace POST that is the calling agent); recipient_*
// is who must act. Detail/OrgID/Priority are optional (zero values → NULL).
type CreateRequestInput struct {
Kind string
RequesterType string
RequesterID string
OrgID string
RecipientType string
RecipientID string
Title string
Detail string
Priority *int
}
// RequestRow is one request as returned by the list + get methods.
// WorkspaceName is non-empty only for ListPendingForOrg rows whose party is an
// agent (decorated via a LEFT JOIN on workspaces); the inbox/outgoing lists
// leave it empty.
type RequestRow struct {
ID string `json:"id"`
Kind string `json:"kind"`
RequesterType string `json:"requester_type"`
RequesterID string `json:"requester_id"`
OrgID *string `json:"org_id"`
RecipientType string `json:"recipient_type"`
RecipientID string `json:"recipient_id"`
Title string `json:"title"`
Detail *string `json:"detail"`
Status string `json:"status"`
ResponderType *string `json:"responder_type"`
ResponderID *string `json:"responder_id"`
Priority *int `json:"priority"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
RespondedAt *string `json:"responded_at"`
WorkspaceName *string `json:"workspace_name,omitempty"`
}
// RequestMessageRow is one row of a request's More-Info thread.
type RequestMessageRow struct {
ID string `json:"id"`
RequestID string `json:"request_id"`
AuthorType string `json:"author_type"`
AuthorID string `json:"author_id"`
Body string `json:"body"`
CreatedAt string `json:"created_at"`
}
// RequestStore persists + broadcasts request mutations. Takes
// events.EventEmitter (not the concrete *Broadcaster) so tests can substitute
// a fake emitter, mirroring UserTaskStore.
type RequestStore struct {
db *sql.DB
broadcaster events.EventEmitter
}
// NewRequestStore binds the store to a DB pool + the platform broadcaster.
func NewRequestStore(db *sql.DB, broadcaster events.EventEmitter) *RequestStore {
return &RequestStore{db: db, broadcaster: broadcaster}
}
// requestColumns is the canonical SELECT projection for a single request row,
// shared by Get / ListInbox / ListOutgoing so the Scan order can't drift.
const requestColumns = `id, kind, requester_type, requester_id, org_id,
recipient_type, recipient_id, title, detail, status,
responder_type, responder_id, priority, created_at, updated_at, responded_at`
// scanRequest reads one row in requestColumns order into a RequestRow.
func scanRequest(rows *sql.Rows) (RequestRow, error) {
var r RequestRow
err := rows.Scan(
&r.ID, &r.Kind, &r.RequesterType, &r.RequesterID, &r.OrgID,
&r.RecipientType, &r.RecipientID, &r.Title, &r.Detail, &r.Status,
&r.ResponderType, &r.ResponderID, &r.Priority, &r.CreatedAt, &r.UpdatedAt, &r.RespondedAt,
)
return r, err
}
func validParty(t string) bool { return t == "user" || t == "agent" }
// broadcastTarget picks the workspace id to anchor a REQUEST_* event on. Events
// are workspace-scoped (structure_events.workspace_id), so we anchor on the
// agent party when there is one — the requester (so its canvas/inbox is
// signalled on a response) or, lacking that, the recipient. A user-only request
// has no workspace anchor; we skip the broadcast rather than insert a bad row.
func broadcastTarget(requesterType, requesterID, recipientType, recipientID string) string {
if requesterType == "agent" && requesterID != "" {
return requesterID
}
if recipientType == "agent" && recipientID != "" {
return recipientID
}
return ""
}
// Create inserts a new pending request and broadcasts REQUEST_CREATED (anchored
// on the recipient agent if any, so an agent recipient's inbox is signalled).
// Returns the new request id. Validates kind + party enums up front.
func (s *RequestStore) Create(ctx context.Context, in CreateRequestInput) (string, error) {
if in.Kind != "task" && in.Kind != "approval" {
return "", ErrInvalidRequestKind
}
if !validParty(in.RequesterType) || !validParty(in.RecipientType) {
return "", ErrInvalidRequestParty
}
var detailArg interface{}
if in.Detail != "" {
detailArg = in.Detail
}
var orgArg interface{}
if in.OrgID != "" {
orgArg = in.OrgID
}
var priorityArg interface{}
if in.Priority != nil {
priorityArg = *in.Priority
}
var requestID string
err := s.db.QueryRowContext(ctx, `
INSERT INTO requests (
kind, requester_type, requester_id, org_id,
recipient_type, recipient_id, title, detail, priority
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
RETURNING id
`, in.Kind, in.RequesterType, in.RequesterID, orgArg,
in.RecipientType, in.RecipientID, in.Title, detailArg, priorityArg).Scan(&requestID)
if err != nil {
return "", fmt.Errorf("request: create: %w", err)
}
// Anchor REQUEST_CREATED on the recipient agent (so its inbox is poked) if
// the recipient is an agent; else on the requester. A user→user request has
// no agent to signal — skip the broadcast.
target := broadcastTarget(in.RecipientType, in.RecipientID, in.RequesterType, in.RequesterID)
if target != "" {
if err := s.broadcaster.RecordAndBroadcast(ctx, string(events.EventRequestCreated), target, map[string]interface{}{
"request_id": requestID,
"kind": in.Kind,
"recipient_type": in.RecipientType,
"recipient_id": in.RecipientID,
"title": in.Title,
}); err != nil {
log.Printf("request: failed to broadcast created for %s: %v", target, err)
}
}
return requestID, nil
}
// Get returns a single request by id, or ErrRequestNotFound.
func (s *RequestStore) Get(ctx context.Context, id string) (RequestRow, error) {
rows, err := s.db.QueryContext(ctx, `SELECT `+requestColumns+` FROM requests WHERE id = $1`, id)
if err != nil {
return RequestRow{}, fmt.Errorf("request: get: %w", err)
}
defer rows.Close()
if !rows.Next() {
if err := rows.Err(); err != nil {
return RequestRow{}, fmt.Errorf("request: get: %w", err)
}
return RequestRow{}, ErrRequestNotFound
}
r, err := scanRequest(rows)
if err != nil {
return RequestRow{}, fmt.Errorf("request: get scan: %w", err)
}
return r, nil
}
// Messages returns a request's More-Info thread, oldest first.
func (s *RequestStore) Messages(ctx context.Context, requestID string) ([]RequestMessageRow, error) {
rows, err := s.db.QueryContext(ctx, `
SELECT id, request_id, author_type, author_id, body, created_at
FROM request_messages WHERE request_id = $1
ORDER BY created_at ASC LIMIT 200
`, requestID)
if err != nil {
return nil, fmt.Errorf("request: messages: %w", err)
}
defer rows.Close()
msgs := make([]RequestMessageRow, 0)
for rows.Next() {
var m RequestMessageRow
if rows.Scan(&m.ID, &m.RequestID, &m.AuthorType, &m.AuthorID, &m.Body, &m.CreatedAt) != nil {
continue
}
msgs = append(msgs, m)
}
if err := rows.Err(); err != nil {
log.Printf("request: messages rows.Err request=%s: %v", requestID, err)
}
return msgs, nil
}
// ListInbox returns the requests addressed TO a recipient (recipient_type +
// recipient_id), newest first, optionally filtered by status. status "" = all.
func (s *RequestStore) ListInbox(ctx context.Context, recipientType, recipientID, status string) ([]RequestRow, error) {
q := `SELECT ` + requestColumns + ` FROM requests
WHERE recipient_type = $1 AND recipient_id = $2`
args := []interface{}{recipientType, recipientID}
if status != "" {
q += ` AND status = $3`
args = append(args, status)
}
q += ` ORDER BY created_at DESC LIMIT 50`
return s.queryRequests(ctx, "list inbox", q, args)
}
// ListOutgoing returns the requests a requester RAISED (requester_type +
// requester_id), newest first — the async pickup of responses.
func (s *RequestStore) ListOutgoing(ctx context.Context, requesterType, requesterID, status string) ([]RequestRow, error) {
q := `SELECT ` + requestColumns + ` FROM requests
WHERE requester_type = $1 AND requester_id = $2`
args := []interface{}{requesterType, requesterID}
if status != "" {
q += ` AND status = $3`
args = append(args, status)
}
q += ` ORDER BY created_at DESC LIMIT 50`
return s.queryRequests(ctx, "list outgoing", q, args)
}
// ListPendingForOrg powers the cross-org "all incoming" canvas view. Returns
// pending + info_requested requests in an org, newest first, decorated with the
// requesting/responding agent's workspace name via a LEFT JOIN (NULL for a user
// party). kind "" = both tabs; "task"/"approval" filters one.
func (s *RequestStore) ListPendingForOrg(ctx context.Context, orgID, kind string) ([]RequestRow, error) {
// LEFT JOIN workspaces on the requester id when it is an agent, to surface a
// human-readable name in the org view. recipient name is left to the caller.
q := `SELECT ` +
`r.id, r.kind, r.requester_type, r.requester_id, r.org_id,
r.recipient_type, r.recipient_id, r.title, r.detail, r.status,
r.responder_type, r.responder_id, r.priority, r.created_at, r.updated_at, r.responded_at,
w.name
FROM requests r
LEFT JOIN workspaces w
ON r.requester_type = 'agent' AND w.id = r.requester_id::uuid
WHERE r.status IN ('pending', 'info_requested')`
args := []interface{}{}
if orgID != "" {
args = append(args, orgID)
q += fmt.Sprintf(" AND r.org_id = $%d", len(args))
}
if kind != "" {
args = append(args, kind)
q += fmt.Sprintf(" AND r.kind = $%d", len(args))
}
q += ` ORDER BY r.created_at DESC LIMIT 50`
rows, err := s.db.QueryContext(ctx, q, args...)
if err != nil {
return nil, fmt.Errorf("request: list pending org: %w", err)
}
defer rows.Close()
out := make([]RequestRow, 0)
for rows.Next() {
var r RequestRow
if rows.Scan(
&r.ID, &r.Kind, &r.RequesterType, &r.RequesterID, &r.OrgID,
&r.RecipientType, &r.RecipientID, &r.Title, &r.Detail, &r.Status,
&r.ResponderType, &r.ResponderID, &r.Priority, &r.CreatedAt, &r.UpdatedAt, &r.RespondedAt,
&r.WorkspaceName,
) != nil {
continue
}
out = append(out, r)
}
if err := rows.Err(); err != nil {
log.Printf("request: list pending org rows.Err org=%s: %v", orgID, err)
}
return out, nil
}
// queryRequests runs a SELECT in requestColumns order and scans into RequestRows.
func (s *RequestStore) queryRequests(ctx context.Context, op, q string, args []interface{}) ([]RequestRow, error) {
rows, err := s.db.QueryContext(ctx, q, args...)
if err != nil {
return nil, fmt.Errorf("request: %s: %w", op, err)
}
defer rows.Close()
out := make([]RequestRow, 0)
for rows.Next() {
r, scanErr := scanRequest(rows)
if scanErr != nil {
continue
}
out = append(out, r)
}
if err := rows.Err(); err != nil {
log.Printf("request: %s rows.Err: %v", op, err)
}
return out, nil
}
// actionToStatus maps a terminal respond action to the status it sets, gated by
// the request's kind: approval accepts approved/rejected, task accepts
// done/rejected. Returns ErrInvalidRequestAction on any other combination.
func actionToStatus(kind, action string) (string, error) {
switch kind {
case "approval":
if action == "approved" || action == "rejected" {
return action, nil
}
case "task":
if action == "done" || action == "rejected" {
return action, nil
}
}
return "", ErrInvalidRequestAction
}
// Respond applies a terminal action (done/rejected/approved, validated against
// the request's kind), stamps responder + responded_at, and broadcasts
// REQUEST_RESPONDED so the requester/canvas picks it up asynchronously. The
// requester is NOT blocked — it reads this via ListOutgoing on its next tick.
// responderType defaults to "user" (the canvas path); responderID defaults to
// "human" when empty. Only acts on a non-terminal (pending/info_requested) row.
func (s *RequestStore) Respond(ctx context.Context, id, action, responderType, responderID string) (RequestRow, error) {
if responderType == "" {
responderType = "user"
}
if !validParty(responderType) {
return RequestRow{}, ErrInvalidRequestParty
}
if responderID == "" {
responderID = "human"
}
// Look the request up first so we can validate action↔kind compatibility and
// know who to signal (the requester).
req, err := s.Get(ctx, id)
if err != nil {
return RequestRow{}, err
}
status, err := actionToStatus(req.Kind, action)
if err != nil {
return RequestRow{}, err
}
result, err := s.db.ExecContext(ctx, `
UPDATE requests
SET status = $1, responder_type = $2, responder_id = $3,
responded_at = now(), updated_at = now()
WHERE id = $4 AND status IN ('pending', 'info_requested')
`, status, responderType, responderID, id)
if err != nil {
return RequestRow{}, fmt.Errorf("request: respond: %w", err)
}
n, err := result.RowsAffected()
if err != nil {
return RequestRow{}, fmt.Errorf("request: respond RowsAffected: %w", err)
}
if n == 0 {
return RequestRow{}, ErrRequestNotFound
}
// Signal the requester (the agent that raised it) so its inbox/canvas picks
// up the resolution asynchronously.
target := broadcastTarget(req.RequesterType, req.RequesterID, req.RecipientType, req.RecipientID)
if target != "" {
if err := s.broadcaster.RecordAndBroadcast(ctx, string(events.EventRequestResponded), target, map[string]interface{}{
"request_id": id,
"status": status,
"responder_type": responderType,
"responder_id": responderID,
}); err != nil {
log.Printf("request: failed to broadcast responded for %s: %v", target, err)
}
}
req.Status = status
req.ResponderType = &responderType
req.ResponderID = &responderID
return req, nil
}
// RequestInfo flips a request to 'info_requested' (the "More Info" transition)
// without a terminal action. Used when the recipient asks the requester for
// clarification. Only acts on a non-terminal row.
func (s *RequestStore) RequestInfo(ctx context.Context, id string) error {
result, err := s.db.ExecContext(ctx, `
UPDATE requests SET status = 'info_requested', updated_at = now()
WHERE id = $1 AND status IN ('pending', 'info_requested')
`, id)
if err != nil {
return fmt.Errorf("request: request-info: %w", err)
}
n, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("request: request-info RowsAffected: %w", err)
}
if n == 0 {
return ErrRequestNotFound
}
return nil
}
// AddMessage appends a row to the More-Info thread and broadcasts
// REQUEST_MESSAGE. When the author is the request's RECIPIENT (i.e. the party
// being asked is asking back for clarification), it also flips the request to
// 'info_requested' so the requester knows it's their turn. Returns the new
// message id.
func (s *RequestStore) AddMessage(ctx context.Context, id, authorType, authorID, body string) (string, error) {
if !validParty(authorType) {
return "", ErrInvalidRequestParty
}
req, err := s.Get(ctx, id)
if err != nil {
return "", err
}
var messageID string
err = s.db.QueryRowContext(ctx, `
INSERT INTO request_messages (request_id, author_type, author_id, body)
VALUES ($1, $2, $3, $4)
RETURNING id
`, id, authorType, authorID, body).Scan(&messageID)
if err != nil {
return "", fmt.Errorf("request: add message: %w", err)
}
// If the author is the recipient (the one being asked), this message is a
// "please clarify" — flip to info_requested so the requester is prompted.
// Only flip a non-terminal request; a closed request keeps its terminal
// status even if a late note is appended.
if authorType == req.RecipientType && authorID == req.RecipientID {
if _, err := s.db.ExecContext(ctx, `
UPDATE requests SET status = 'info_requested', updated_at = now()
WHERE id = $1 AND status IN ('pending', 'info_requested')
`, id); err != nil {
log.Printf("request: failed to flip info_requested on message for %s: %v", id, err)
}
}
// Signal both ends via the requester anchor (the canvas thread listens there).
target := broadcastTarget(req.RequesterType, req.RequesterID, req.RecipientType, req.RecipientID)
if target != "" {
if err := s.broadcaster.RecordAndBroadcast(ctx, string(events.EventRequestMessage), target, map[string]interface{}{
"request_id": id,
"message_id": messageID,
"author_type": authorType,
"author_id": authorID,
}); err != nil {
log.Printf("request: failed to broadcast message for %s: %v", id, err)
}
}
return messageID, nil
}
// Cancel withdraws a request (the requester changed its mind). Sets status
// 'cancelled' + updated_at. Only acts on a non-terminal row; returns
// ErrRequestNotFound when missing or already terminal.
func (s *RequestStore) Cancel(ctx context.Context, id string) error {
result, err := s.db.ExecContext(ctx, `
UPDATE requests SET status = 'cancelled', updated_at = now()
WHERE id = $1 AND status IN ('pending', 'info_requested')
`, id)
if err != nil {
return fmt.Errorf("request: cancel: %w", err)
}
n, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("request: cancel RowsAffected: %w", err)
}
if n == 0 {
return ErrRequestNotFound
}
return nil
}
@@ -0,0 +1,376 @@
package handlers
import (
"errors"
"log"
"net/http"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/events"
"github.com/gin-gonic/gin"
)
// RequestsHandler serves the unified "requests" inbox — the Tasks + Approvals
// primitive (RFC P1, docs/design/rfc-unified-requests-inbox.md). It generalizes
// UserTasksHandler (agent→user asks) and ApprovalsHandler (the gate) into one
// surface keyed by kind ∈ {task, approval}, where requester and recipient may
// each be a user OR an agent. Responding is asynchronous: the requester is
// never blocked; a REQUEST_RESPONDED event signals it to pick the answer up on
// its next tick.
type RequestsHandler struct {
broadcaster *events.Broadcaster
}
// --- OpenAPI doc shapes (used by swaggo; the handlers emit gin.H inline) ---
// CreateRequestBody is the body of POST /workspaces/{id}/requests. requester is
// the calling workspace (agent); only the recipient + content are supplied.
type CreateRequestBody struct {
Kind string `json:"kind" binding:"required" enums:"task,approval"`
RecipientType string `json:"recipient_type" binding:"required" enums:"user,agent"`
RecipientID string `json:"recipient_id"`
Title string `json:"title" binding:"required"`
Detail string `json:"detail"`
Priority *int `json:"priority"`
}
// CreateRequestResponse is returned by POST /workspaces/{id}/requests.
type CreateRequestResponse struct {
RequestID string `json:"request_id"`
Status string `json:"status"`
}
// RespondRequestBody is the body of POST /requests/{requestId}/respond. The
// responder identity is taken from the body for now (P1); the canvas path
// defaults responder_type to 'user'. action is validated against the kind.
type RespondRequestBody struct {
Action string `json:"action" binding:"required" enums:"done,rejected,approved"`
ResponderType string `json:"responder_type" enums:"user,agent"`
ResponderID string `json:"responder_id"`
}
// AddRequestMessageBody is the body of POST /requests/{requestId}/messages —
// the More-Info thread. When the author is the recipient, the request flips to
// info_requested.
type AddRequestMessageBody struct {
Body string `json:"body" binding:"required"`
AuthorType string `json:"author_type" binding:"required" enums:"user,agent"`
AuthorID string `json:"author_id"`
}
// RequestMutationResponse is the {status, request_id} echo returned by the
// respond / cancel / messages endpoints.
type RequestMutationResponse struct {
Status string `json:"status"`
RequestID string `json:"request_id"`
}
// RequestWithThread is the GET /requests/{requestId} shape — the request plus
// its More-Info thread.
type RequestWithThread struct {
Request RequestRow `json:"request"`
Messages []RequestMessageRow `json:"messages"`
}
func NewRequestsHandler(b *events.Broadcaster) *RequestsHandler {
return &RequestsHandler{broadcaster: b}
}
// store builds a RequestStore over the live global db.DB per request — same
// rationale as UserTasksHandler.store(): the test harness swaps db.DB under us.
func (h *RequestsHandler) store() *RequestStore {
return NewRequestStore(db.DB, h.broadcaster)
}
// Create handles POST /workspaces/:id/requests — the calling workspace (an
// agent) raises a task/approval addressed to a user or another agent.
//
// @Summary Raise a request (task or approval)
// @Tags requests
// @Accept json
// @Produce json
// @Param id path string true "Requester workspace ID"
// @Param body body CreateRequestBody true "Request fields"
// @Success 201 {object} CreateRequestResponse
// @Failure 400 {object} ErrorResponse
// @Failure 500 {object} ErrorResponse
// @Router /workspaces/{id}/requests [post]
// @Security BearerAuth && OrgSlugAuth
func (h *RequestsHandler) Create(c *gin.Context) {
workspaceID := c.Param("id")
ctx := c.Request.Context()
var body CreateRequestBody
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
return
}
// Decorate the request with its org anchor for the cross-org pending view.
// The workspaces table has NO org_id column — an "org" is the parent_id-chain
// root resolved by orgRootID (org_scope.go). Best-effort: a missing root
// leaves org_id NULL (the org view simply won't surface it), never blocks
// creation.
orgID, err := orgRootID(ctx, db.DB, workspaceID)
if err != nil {
log.Printf("requests: failed to resolve org root for workspace=%s: %v", workspaceID, err)
orgID = ""
}
requestID, err := h.store().Create(ctx, CreateRequestInput{
Kind: body.Kind,
RequesterType: "agent",
RequesterID: workspaceID,
OrgID: orgID,
RecipientType: body.RecipientType,
RecipientID: body.RecipientID,
Title: body.Title,
Detail: body.Detail,
Priority: body.Priority,
})
if err != nil {
if errors.Is(err, ErrInvalidRequestKind) || errors.Is(err, ErrInvalidRequestParty) {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
log.Printf("Create request error workspace=%s: %v", workspaceID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create request"})
return
}
c.JSON(http.StatusCreated, gin.H{"request_id": requestID, "status": "pending"})
}
// ListInbox handles GET /workspaces/:id/requests/inbox?status= — requests
// addressed TO this workspace (the agent's incoming).
//
// @Summary List a workspace's incoming requests (inbox)
// @Tags requests
// @Produce json
// @Param id path string true "Recipient workspace ID"
// @Param status query string false "Filter by status"
// @Success 200 {array} RequestRow
// @Failure 500 {object} ErrorResponse
// @Router /workspaces/{id}/requests/inbox [get]
// @Security BearerAuth && OrgSlugAuth
func (h *RequestsHandler) ListInbox(c *gin.Context) {
workspaceID := c.Param("id")
ctx := c.Request.Context()
rows, err := h.store().ListInbox(ctx, "agent", workspaceID, c.Query("status"))
if err != nil {
log.Printf("List inbox error workspace=%s: %v", workspaceID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"})
return
}
c.JSON(http.StatusOK, rows)
}
// ListOutgoing handles GET /workspaces/:id/requests?status= — the requests this
// workspace RAISED (the async pickup of responses).
//
// @Summary List a workspace's outgoing requests
// @Tags requests
// @Produce json
// @Param id path string true "Requester workspace ID"
// @Param status query string false "Filter by status"
// @Success 200 {array} RequestRow
// @Failure 500 {object} ErrorResponse
// @Router /workspaces/{id}/requests [get]
// @Security BearerAuth && OrgSlugAuth
func (h *RequestsHandler) ListOutgoing(c *gin.Context) {
workspaceID := c.Param("id")
ctx := c.Request.Context()
rows, err := h.store().ListOutgoing(ctx, "agent", workspaceID, c.Query("status"))
if err != nil {
log.Printf("List outgoing error workspace=%s: %v", workspaceID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"})
return
}
c.JSON(http.StatusOK, rows)
}
// Get handles GET /requests/:requestId — a single request plus its More-Info
// thread.
//
// @Summary Get a request with its message thread
// @Tags requests
// @Produce json
// @Param requestId path string true "Request ID"
// @Success 200 {object} RequestWithThread
// @Failure 404 {object} ErrorResponse
// @Failure 500 {object} ErrorResponse
// @Router /requests/{requestId} [get]
// @Security BearerAuth
func (h *RequestsHandler) Get(c *gin.Context) {
requestID := c.Param("requestId")
ctx := c.Request.Context()
s := h.store()
req, err := s.Get(ctx, requestID)
if err != nil {
if errors.Is(err, ErrRequestNotFound) {
c.JSON(http.StatusNotFound, gin.H{"error": "request not found"})
return
}
log.Printf("Get request error request=%s: %v", requestID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"})
return
}
msgs, err := s.Messages(ctx, requestID)
if err != nil {
log.Printf("Get request messages error request=%s: %v", requestID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"})
return
}
c.JSON(http.StatusOK, gin.H{"request": req, "messages": msgs})
}
// Respond handles POST /requests/:requestId/respond — a terminal action
// (done/rejected/approved), validated against the request's kind. responder
// identity comes from the body; the canvas/admin path defaults to 'user'.
//
// @Summary Respond to a request (done / rejected / approved)
// @Tags requests
// @Accept json
// @Produce json
// @Param requestId path string true "Request ID"
// @Param body body RespondRequestBody true "Response"
// @Success 200 {object} RequestMutationResponse
// @Failure 400 {object} ErrorResponse
// @Failure 404 {object} ErrorResponse
// @Failure 500 {object} ErrorResponse
// @Router /requests/{requestId}/respond [post]
// @Security BearerAuth
func (h *RequestsHandler) Respond(c *gin.Context) {
requestID := c.Param("requestId")
ctx := c.Request.Context()
var body RespondRequestBody
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
return
}
if _, err := h.store().Respond(ctx, requestID, body.Action, body.ResponderType, body.ResponderID); err != nil {
if errors.Is(err, ErrRequestNotFound) {
c.JSON(http.StatusNotFound, gin.H{"error": "request not found or already resolved"})
return
}
if errors.Is(err, ErrInvalidRequestAction) || errors.Is(err, ErrInvalidRequestParty) {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
log.Printf("Respond request error request=%s: %v", requestID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update"})
return
}
c.JSON(http.StatusOK, gin.H{"status": body.Action, "request_id": requestID})
}
// AddMessage handles POST /requests/:requestId/messages — append to the
// More-Info thread. When the author is the recipient, the request flips to
// info_requested.
//
// @Summary Add a message to a request's More-Info thread
// @Tags requests
// @Accept json
// @Produce json
// @Param requestId path string true "Request ID"
// @Param body body AddRequestMessageBody true "Message"
// @Success 201 {object} RequestMutationResponse
// @Failure 400 {object} ErrorResponse
// @Failure 404 {object} ErrorResponse
// @Failure 500 {object} ErrorResponse
// @Router /requests/{requestId}/messages [post]
// @Security BearerAuth
func (h *RequestsHandler) AddMessage(c *gin.Context) {
requestID := c.Param("requestId")
ctx := c.Request.Context()
var body AddRequestMessageBody
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
return
}
messageID, err := h.store().AddMessage(ctx, requestID, body.AuthorType, body.AuthorID, body.Body)
if err != nil {
if errors.Is(err, ErrRequestNotFound) {
c.JSON(http.StatusNotFound, gin.H{"error": "request not found"})
return
}
if errors.Is(err, ErrInvalidRequestParty) {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
log.Printf("AddMessage request error request=%s: %v", requestID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to add message"})
return
}
c.JSON(http.StatusCreated, gin.H{"status": "created", "request_id": requestID, "message_id": messageID})
}
// Cancel handles POST /requests/:requestId/cancel — the requester withdraws.
//
// @Summary Cancel (withdraw) a request
// @Tags requests
// @Produce json
// @Param requestId path string true "Request ID"
// @Success 200 {object} RequestMutationResponse
// @Failure 404 {object} ErrorResponse
// @Failure 500 {object} ErrorResponse
// @Router /requests/{requestId}/cancel [post]
// @Security BearerAuth
func (h *RequestsHandler) Cancel(c *gin.Context) {
requestID := c.Param("requestId")
ctx := c.Request.Context()
if err := h.store().Cancel(ctx, requestID); err != nil {
if errors.Is(err, ErrRequestNotFound) {
c.JSON(http.StatusNotFound, gin.H{"error": "request not found or already resolved"})
return
}
log.Printf("Cancel request error request=%s: %v", requestID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to cancel"})
return
}
c.JSON(http.StatusOK, gin.H{"status": "cancelled", "request_id": requestID})
}
// ListPending handles GET /requests/pending?kind= — the cross-org pending view
// for the canvas Tasks/Approvals tabs. Cross-workspace, so AdminAuth-gated like
// /user-tasks/pending and /approvals/pending. ?kind=task|approval lets each tab
// query its own slice.
//
// @Summary List pending requests across the org (canvas tabs)
// @Tags requests
// @Produce json
// @Param kind query string false "Filter by kind" Enums(task, approval)
// @Param org_id query string false "Filter by org"
// @Success 200 {array} RequestRow
// @Failure 400 {object} ErrorResponse
// @Failure 500 {object} ErrorResponse
// @Router /requests/pending [get]
// @Security BearerAuth
func (h *RequestsHandler) ListPending(c *gin.Context) {
ctx := c.Request.Context()
kind := c.Query("kind")
if kind != "" && kind != "task" && kind != "approval" {
c.JSON(http.StatusBadRequest, gin.H{"error": "kind must be 'task' or 'approval'"})
return
}
rows, err := h.store().ListPendingForOrg(ctx, c.Query("org_id"), kind)
if err != nil {
log.Printf("ListPending requests error: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"})
return
}
c.JSON(http.StatusOK, rows)
}
@@ -0,0 +1,563 @@
package handlers
import (
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
// requestColumnNames mirrors the requestColumns SELECT projection order so the
// sqlmock rows line up with scanRequest.
var requestColumnNames = []string{
"id", "kind", "requester_type", "requester_id", "org_id",
"recipient_type", "recipient_id", "title", "detail", "status",
"responder_type", "responder_id", "priority", "created_at", "updated_at", "responded_at",
}
// oneRequestRow builds a single-row result set for a Get/List SELECT.
func oneRequestRow(id, kind, requesterID, recipientType, recipientID, status string) *sqlmock.Rows {
return sqlmock.NewRows(requestColumnNames).AddRow(
id, kind, "agent", requesterID, nil,
recipientType, recipientID, "Some title", nil, status,
nil, nil, nil, "2026-06-10T00:00:00Z", "2026-06-10T00:00:00Z", nil,
)
}
// ---------- Create ----------
func TestRequests_Create_Task(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewRequestsHandler(newTestBroadcaster())
// Create handler first resolves the org root (org_scope CTE).
mock.ExpectQuery("WITH RECURSIVE org_chain").
WithArgs("ws-1").
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow("org-root-1"))
// INSERT request → id
mock.ExpectQuery("INSERT INTO requests").
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("req-1"))
// REQUEST_CREATED broadcast (recipient is a user here → anchors on requester agent)
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
body := `{"kind":"task","recipient_type":"user","recipient_id":"","title":"Review the launch draft","detail":"posts/launch.md"}`
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Create(c)
if w.Code != http.StatusCreated {
t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if resp["request_id"] != "req-1" {
t.Errorf("expected request_id req-1, got %v", resp["request_id"])
}
if resp["status"] != "pending" {
t.Errorf("expected status pending, got %v", resp["status"])
}
}
func TestRequests_Create_Approval_AgentRecipient(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewRequestsHandler(newTestBroadcaster())
mock.ExpectQuery("WITH RECURSIVE org_chain").
WithArgs("ws-1").
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow("org-root-1"))
mock.ExpectQuery("INSERT INTO requests").
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("req-2"))
// Recipient is an agent → REQUEST_CREATED anchors on the recipient workspace.
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
body := `{"kind":"approval","recipient_type":"agent","recipient_id":"ws-2","title":"Approve deploy"}`
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Create(c)
if w.Code != http.StatusCreated {
t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String())
}
}
func TestRequests_Create_MissingTitle(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
handler := NewRequestsHandler(newTestBroadcaster())
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"kind":"task","recipient_type":"user"}`))
c.Request.Header.Set("Content-Type", "application/json")
handler.Create(c)
if w.Code != http.StatusBadRequest {
t.Errorf("expected 400 for missing title, got %d", w.Code)
}
}
func TestRequests_Create_InvalidKind(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewRequestsHandler(newTestBroadcaster())
// org root resolves, then Create rejects the kind before any INSERT.
mock.ExpectQuery("WITH RECURSIVE org_chain").
WithArgs("ws-1").
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow("org-root-1"))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"kind":"banana","recipient_type":"user","title":"x"}`))
c.Request.Header.Set("Content-Type", "application/json")
handler.Create(c)
if w.Code != http.StatusBadRequest {
t.Errorf("expected 400 for invalid kind, got %d: %s", w.Code, w.Body.String())
}
}
// ---------- Inbox vs Outgoing listing ----------
func TestRequests_ListInbox(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewRequestsHandler(newTestBroadcaster())
mock.ExpectQuery("FROM requests").
WithArgs("agent", "ws-2").
WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "pending"))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-2"}}
c.Request = httptest.NewRequest("GET", "/", nil)
handler.ListInbox(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if len(resp) != 1 || resp[0]["id"] != "req-1" {
t.Errorf("expected one inbox request req-1, got %v", resp)
}
}
func TestRequests_ListInbox_WithStatusFilter(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewRequestsHandler(newTestBroadcaster())
mock.ExpectQuery("FROM requests").
WithArgs("agent", "ws-2", "pending").
WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "pending"))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-2"}}
c.Request = httptest.NewRequest("GET", "/?status=pending", nil)
handler.ListInbox(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
}
func TestRequests_ListOutgoing(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewRequestsHandler(newTestBroadcaster())
mock.ExpectQuery("FROM requests").
WithArgs("agent", "ws-1").
WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "user", "", "done"))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
c.Request = httptest.NewRequest("GET", "/", nil)
handler.ListOutgoing(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if len(resp) != 1 || resp[0]["status"] != "done" {
t.Errorf("expected one outgoing request status done, got %v", resp)
}
}
// ---------- Get (with thread) ----------
func TestRequests_Get_WithThread(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewRequestsHandler(newTestBroadcaster())
mock.ExpectQuery("FROM requests WHERE id").
WithArgs("req-1").
WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "user", "", "info_requested"))
mock.ExpectQuery("FROM request_messages WHERE request_id").
WithArgs("req-1").
WillReturnRows(sqlmock.NewRows([]string{"id", "request_id", "author_type", "author_id", "body", "created_at"}).
AddRow("msg-1", "req-1", "user", "u-1", "what file?", "2026-06-10T01:00:00Z"))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "requestId", Value: "req-1"}}
c.Request = httptest.NewRequest("GET", "/", nil)
handler.Get(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if resp["request"] == nil || resp["messages"] == nil {
t.Errorf("expected request + messages keys, got %v", resp)
}
}
func TestRequests_Get_NotFound(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewRequestsHandler(newTestBroadcaster())
mock.ExpectQuery("FROM requests WHERE id").
WithArgs("nope").
WillReturnRows(sqlmock.NewRows(requestColumnNames))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "requestId", Value: "nope"}}
c.Request = httptest.NewRequest("GET", "/", nil)
handler.Get(c)
if w.Code != http.StatusNotFound {
t.Errorf("expected 404, got %d", w.Code)
}
}
// ---------- Respond (valid + invalid action-for-kind) ----------
func TestRequests_Respond_ApprovalApproved(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewRequestsHandler(newTestBroadcaster())
// Respond does Get first to validate action↔kind.
mock.ExpectQuery("FROM requests WHERE id").
WithArgs("req-1").
WillReturnRows(oneRequestRow("req-1", "approval", "ws-1", "user", "", "pending"))
mock.ExpectExec("UPDATE requests").
WillReturnResult(sqlmock.NewResult(0, 1))
// REQUEST_RESPONDED broadcast (anchored on requester agent ws-1).
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "requestId", Value: "req-1"}}
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"action":"approved","responder_id":"u-1"}`))
c.Request.Header.Set("Content-Type", "application/json")
handler.Respond(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if resp["status"] != "approved" {
t.Errorf("expected status approved, got %v", resp["status"])
}
}
func TestRequests_Respond_TaskDone(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewRequestsHandler(newTestBroadcaster())
mock.ExpectQuery("FROM requests WHERE id").
WithArgs("req-1").
WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "user", "", "pending"))
mock.ExpectExec("UPDATE requests").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "requestId", Value: "req-1"}}
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"action":"done"}`))
c.Request.Header.Set("Content-Type", "application/json")
handler.Respond(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
}
func TestRequests_Respond_InvalidActionForKind(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewRequestsHandler(newTestBroadcaster())
// kind=approval cannot be "done" — Get succeeds, actionToStatus rejects.
mock.ExpectQuery("FROM requests WHERE id").
WithArgs("req-1").
WillReturnRows(oneRequestRow("req-1", "approval", "ws-1", "user", "", "pending"))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "requestId", Value: "req-1"}}
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"action":"done"}`))
c.Request.Header.Set("Content-Type", "application/json")
handler.Respond(c)
if w.Code != http.StatusBadRequest {
t.Errorf("expected 400 for done-on-approval, got %d: %s", w.Code, w.Body.String())
}
}
func TestRequests_Respond_NotFound(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewRequestsHandler(newTestBroadcaster())
mock.ExpectQuery("FROM requests WHERE id").
WithArgs("nope").
WillReturnRows(sqlmock.NewRows(requestColumnNames))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "requestId", Value: "nope"}}
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"action":"done"}`))
c.Request.Header.Set("Content-Type", "application/json")
handler.Respond(c)
if w.Code != http.StatusNotFound {
t.Errorf("expected 404, got %d", w.Code)
}
}
// ---------- AddMessage → info_requested when recipient asks ----------
func TestRequests_AddMessage_RecipientFlipsInfoRequested(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewRequestsHandler(newTestBroadcaster())
// Author is the recipient agent ws-2 → flips to info_requested.
mock.ExpectQuery("FROM requests WHERE id").
WithArgs("req-1").
WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "pending"))
mock.ExpectQuery("INSERT INTO request_messages").
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("msg-1"))
// recipient-authored → status flip UPDATE
mock.ExpectExec("UPDATE requests SET status = 'info_requested'").
WillReturnResult(sqlmock.NewResult(0, 1))
// REQUEST_MESSAGE broadcast (anchored on requester ws-1)
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "requestId", Value: "req-1"}}
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"body":"which file?","author_type":"agent","author_id":"ws-2"}`))
c.Request.Header.Set("Content-Type", "application/json")
handler.AddMessage(c)
if w.Code != http.StatusCreated {
t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if resp["message_id"] != "msg-1" {
t.Errorf("expected message_id msg-1, got %v", resp["message_id"])
}
}
func TestRequests_AddMessage_RequesterDoesNotFlip(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewRequestsHandler(newTestBroadcaster())
// Author is the requester ws-1 (not recipient) → NO info_requested flip.
mock.ExpectQuery("FROM requests WHERE id").
WithArgs("req-1").
WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "info_requested"))
mock.ExpectQuery("INSERT INTO request_messages").
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("msg-2"))
// No status-flip UPDATE expected here.
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "requestId", Value: "req-1"}}
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"body":"posts/launch.md","author_type":"agent","author_id":"ws-1"}`))
c.Request.Header.Set("Content-Type", "application/json")
handler.AddMessage(c)
if w.Code != http.StatusCreated {
t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String())
}
}
// ---------- Cancel ----------
func TestRequests_Cancel_Success(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewRequestsHandler(newTestBroadcaster())
mock.ExpectExec("UPDATE requests SET status = 'cancelled'").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "requestId", Value: "req-1"}}
c.Request = httptest.NewRequest("POST", "/", nil)
handler.Cancel(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
}
func TestRequests_Cancel_NotFound(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewRequestsHandler(newTestBroadcaster())
mock.ExpectExec("UPDATE requests SET status = 'cancelled'").
WillReturnResult(sqlmock.NewResult(0, 0))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "requestId", Value: "gone"}}
c.Request = httptest.NewRequest("POST", "/", nil)
handler.Cancel(c)
if w.Code != http.StatusNotFound {
t.Errorf("expected 404, got %d", w.Code)
}
}
// ---------- ListPending (org view) + kind filter ----------
// pendingColumnNames mirrors ListPendingForOrg's projection (requestColumns + w.name).
var pendingColumnNames = append(append([]string{}, requestColumnNames...), "workspace_name")
func onePendingRow(id, kind string) *sqlmock.Rows {
return sqlmock.NewRows(pendingColumnNames).AddRow(
id, kind, "agent", "ws-1", "org-root-1",
"user", "", "Some title", nil, "pending",
nil, nil, nil, "2026-06-10T00:00:00Z", "2026-06-10T00:00:00Z", nil,
"Marketing Agent",
)
}
func TestRequests_ListPending_NoFilter(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewRequestsHandler(newTestBroadcaster())
mock.ExpectQuery("FROM requests r").
WillReturnRows(onePendingRow("req-1", "task").AddRow(
"req-2", "approval", "agent", "ws-2", "org-root-1",
"user", "", "Approve deploy", nil, "pending",
nil, nil, nil, "2026-06-10T00:00:00Z", "2026-06-10T00:00:00Z", nil,
"Ops Agent",
))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/", nil)
handler.ListPending(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if len(resp) != 2 {
t.Errorf("expected 2 pending, got %d (%v)", len(resp), resp)
}
if resp[0]["workspace_name"] != "Marketing Agent" {
t.Errorf("expected decorated workspace_name, got %v", resp[0]["workspace_name"])
}
}
func TestRequests_ListPending_KindFilter(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewRequestsHandler(newTestBroadcaster())
// ?kind=approval must add an arg.
mock.ExpectQuery("FROM requests r").
WithArgs("approval").
WillReturnRows(onePendingRow("req-2", "approval"))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/?kind=approval", nil)
handler.ListPending(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
}
func TestRequests_ListPending_InvalidKind(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
handler := NewRequestsHandler(newTestBroadcaster())
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/?kind=banana", nil)
handler.ListPending(c)
if w.Code != http.StatusBadRequest {
t.Errorf("expected 400 for invalid kind, got %d", w.Code)
}
}
@@ -348,6 +348,36 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// AdminAuth-gated exactly like /approvals/pending.
r.GET("/user-tasks/pending", middleware.AdminAuth(db.DB), uth.ListAll)
// Requests — the unified Tasks + Approvals inbox (RFC P1). Generalizes
// approvals + user-tasks into one model keyed by kind. Auth is split the
// same way: per-workspace create/list under wsAuth (an agent acts with
// its workspace token); the cross-org pending view + the
// /requests/:requestId/* action paths are AdminAuth-gated for the canvas
// user. Because an AGENT can also respond to a request addressed to it
// (using its own workspace token), the action verbs are ALSO registered
// under the wsAuth /workspaces/:id/requests/:requestId/* prefix — same
// dual-surface pattern the brief calls for (agent = workspace token,
// canvas user = admin token), no new auth mechanism.
rqh := handlers.NewRequestsHandler(broadcaster)
wsAuth.POST("/requests", rqh.Create)
wsAuth.GET("/requests", rqh.ListOutgoing)
wsAuth.GET("/requests/inbox", rqh.ListInbox)
// Agent-side action verbs (workspace-token auth).
wsAuth.GET("/requests/:requestId", rqh.Get)
wsAuth.POST("/requests/:requestId/respond", rqh.Respond)
wsAuth.POST("/requests/:requestId/messages", rqh.AddMessage)
wsAuth.POST("/requests/:requestId/cancel", rqh.Cancel)
// Cross-org pending view for the canvas Tasks/Approvals tabs — AdminAuth
// like /user-tasks/pending. ?kind=task|approval drives each tab.
r.GET("/requests/pending", middleware.AdminAuth(db.DB), rqh.ListPending)
// Canvas-user action verbs (admin auth). Same handlers; the responder
// defaults to 'user' on this path.
reqAdmin := r.Group("", middleware.AdminAuth(db.DB))
reqAdmin.GET("/requests/:requestId", rqh.Get)
reqAdmin.POST("/requests/:requestId/respond", rqh.Respond)
reqAdmin.POST("/requests/:requestId/messages", rqh.AddMessage)
reqAdmin.POST("/requests/:requestId/cancel", rqh.Cancel)
// (TeamHandler is gone — #2864.) The visual canvas Collapse
// button calls PATCH /workspaces/:id { collapsed: true/false }
// (presentational toggle on canvas_layouts), NOT the destructive
@@ -0,0 +1,4 @@
-- Drop child first (FK), then parent. Idempotent (IF EXISTS) so a re-run of the
-- down migration is a no-op rather than a crash.
DROP TABLE IF EXISTS request_messages;
DROP TABLE IF EXISTS requests;
@@ -0,0 +1,122 @@
-- requests: the unified Tasks + Approvals primitive (RFC P1). Generalizes
-- user_tasks (007-style worklist "asks") and approval_requests (the destructive
-- gate) into ONE inbox keyed by kind ∈ {task, approval}, with a requester and a
-- recipient that may each be a user OR an agent. See
-- docs/design/rfc-unified-requests-inbox.md.
--
-- recipient_id / requester_id are plain TEXT with NO foreign key on purpose: an
-- agent's id is a workspaces(id) UUID, but a user's id is not a workspaces row,
-- so a cross-type FK is impossible. ListPendingForOrg LEFT JOINs workspaces to
-- decorate a name when the party is an agent (a user party simply has no match).
--
-- The migration runner tracks applied filenames in schema_migrations, but a
-- partial-failure re-run would re-apply this file, so EVERYTHING here is
-- idempotent: CREATE TABLE/INDEX IF NOT EXISTS (no bare ALTER ADD CONSTRAINT,
-- which can't be IF NOT EXISTS and would crash-loop), and the historical
-- backfills use ON CONFLICT (id) DO NOTHING so re-runs are no-ops.
CREATE TABLE IF NOT EXISTS requests (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
kind TEXT NOT NULL CHECK (kind IN ('task', 'approval')),
requester_type TEXT NOT NULL CHECK (requester_type IN ('user', 'agent')),
requester_id TEXT NOT NULL,
org_id UUID,
recipient_type TEXT NOT NULL CHECK (recipient_type IN ('user', 'agent')),
recipient_id TEXT NOT NULL,
title TEXT NOT NULL,
detail TEXT,
status TEXT NOT NULL DEFAULT 'pending'
CHECK (status IN ('pending', 'info_requested', 'done', 'rejected', 'approved', 'cancelled')),
responder_type TEXT,
responder_id TEXT,
priority SMALLINT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
responded_at TIMESTAMPTZ
);
-- Inbox read: WHERE recipient_type=$1 AND recipient_id=$2 [AND status=$3]
-- ORDER BY created_at DESC. This is the recipient's "incoming" list (agent poll
-- or user Tasks/Approvals tab).
CREATE INDEX IF NOT EXISTS idx_requests_inbox
ON requests (recipient_type, recipient_id, status, created_at DESC);
-- Org pending view ("all agents' incoming" tab): WHERE org_id=$1 AND status...
CREATE INDEX IF NOT EXISTS idx_requests_org_pending
ON requests (org_id, status, created_at DESC);
-- Outgoing / async pickup: WHERE requester_type=$1 AND requester_id=$2 — the
-- requester reads back the requests it raised (check_requests).
CREATE INDEX IF NOT EXISTS idx_requests_outgoing
ON requests (requester_type, requester_id, created_at DESC);
-- request_messages: the "More Info / chat about this" thread on a request.
CREATE TABLE IF NOT EXISTS request_messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
request_id UUID NOT NULL REFERENCES requests(id) ON DELETE CASCADE,
author_type TEXT NOT NULL CHECK (author_type IN ('user', 'agent')),
author_id TEXT NOT NULL,
body TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Thread read: WHERE request_id=$1 ORDER BY created_at.
CREATE INDEX IF NOT EXISTS idx_request_messages_thread
ON request_messages (request_id, created_at);
-- Idempotent backfill — copy historical user_tasks into the unified inbox so
-- the Tasks tab shows pre-cutover items. user_tasks are always agent→user asks,
-- so requester_type='agent' (requester_id = the raising workspace), recipient is
-- the human user; recipient_id '' is the "the org's user" sentinel (user ids are
-- not modelled yet in this table's source — P1 keeps it empty, the CHECK permits
-- empty TEXT). 'dismissed' maps to the unified 'rejected'.
INSERT INTO requests (
id, kind, requester_type, requester_id, recipient_type, recipient_id,
title, detail, status, responder_id, created_at, responded_at
)
SELECT
id,
'task',
'agent',
workspace_id::text,
'user',
'',
title,
detail,
CASE status WHEN 'dismissed' THEN 'rejected' ELSE status END,
resolved_by,
created_at,
resolved_at
FROM user_tasks
ON CONFLICT (id) DO NOTHING;
-- Idempotent backfill — copy historical approval_requests in as kind='approval'.
-- approval_requests columns (007_approvals.sql): id, workspace_id, task_id,
-- action, reason, context, status IN (pending|approved|denied|escalated),
-- decided_by, decided_at, created_at. Map status: 'denied'→'rejected',
-- 'escalated'→'pending' (still awaiting a decision, just bubbled up). title is
-- the action; detail is the reason. requester = the raising workspace (agent),
-- recipient = the user who decides.
INSERT INTO requests (
id, kind, requester_type, requester_id, recipient_type, recipient_id,
title, detail, status, responder_id, created_at, responded_at
)
SELECT
id,
'approval',
'agent',
workspace_id::text,
'user',
'',
action,
reason,
CASE status
WHEN 'denied' THEN 'rejected'
WHEN 'escalated' THEN 'pending'
ELSE status
END,
decided_by,
created_at,
decided_at
FROM approval_requests
WHERE workspace_id IS NOT NULL
ON CONFLICT (id) DO NOTHING;