feat(requests): P1 — unified requests/inbox data model + endpoints (RFC) #2525
@@ -0,0 +1,102 @@
|
||||
# RFC: Unified Requests / Inbox subsystem (Tasks + Approvals)
|
||||
|
||||
**Status:** proposed (awaiting CTO sign-off) · **Author:** CEO-assistant · **Date:** 2026-06-10
|
||||
|
||||
## 1. Motivation
|
||||
Agents need to ask **users or other agents** to do something (task) or to approve
|
||||
something (approval), **asynchronously**, with a clear **inbox**, action buttons, a
|
||||
clarification thread, and accountability (who responded). Today only a thin approval
|
||||
primitive exists (`create_approval`/`decide_approval` + one-way `notify_user`). This
|
||||
RFC generalizes that into **one** requests subsystem with `kind ∈ {task, approval}`.
|
||||
|
||||
## 2. Data model (unified)
|
||||
**`requests`**
|
||||
- `id` uuid
|
||||
- `kind` enum: `task` | `approval`
|
||||
- `requester_type` (`user`|`agent`), `requester_id`, `org_id`
|
||||
- `recipient_type` (`user`|`agent`), `recipient_id` ← user OR agent
|
||||
- `title` text, `detail` markdown
|
||||
- `status` enum: `pending` | `info_requested` | `done` | `rejected` | `approved` | `cancelled`
|
||||
- `responder_type` (`user`|`agent`), `responder_id` ← nullable until acted; **who acted**
|
||||
- `priority` smallint (optional), `created_at`, `updated_at`, `responded_at`
|
||||
|
||||
**`request_messages`** (the "More Info / chat about this" thread)
|
||||
- `id`, `request_id` fk, `author_type`, `author_id`, `body`, `created_at`
|
||||
|
||||
## 3. Lifecycle / actions
|
||||
| kind | buttons | transitions |
|
||||
|---|---|---|
|
||||
| task | **Done · Reject · More Info** | pending→done / pending→rejected / pending↔info_requested |
|
||||
| approval | **Approve · Reject · More Info** | pending→approved / pending→rejected / pending↔info_requested |
|
||||
|
||||
Terminal: `done`/`approved`/`rejected`/`cancelled`. **More Info** sets `info_requested`
|
||||
and appends a `request_messages` row ("chat about this"); either side replies until it
|
||||
resolves to a terminal action. Every terminal action stamps `responder_id`+`responder_type`.
|
||||
|
||||
## 4. Async semantics (non-blocking — core requirement)
|
||||
- Creating a request returns immediately with `request_id`; the **requester never blocks**
|
||||
and keeps working.
|
||||
- The response is delivered **asynchronously**: a signal is posted to the requester's
|
||||
activity/inbox (an `a2a_receive`-style event) carrying `{request_id, status, responder_id,
|
||||
thread}`. The requester picks it up on its **next tick** — never sits and waits.
|
||||
- Recipient **agents** receive/poll their pending inbox; recipient **users** see the
|
||||
Tasks/Approvals tabs (live via the existing WebSocket).
|
||||
|
||||
## 5. Control-plane API
|
||||
- `POST /workspaces/{id}/requests` — create (requester = that workspace) `{kind, recipient_type, recipient_id, title, detail, priority?}`
|
||||
- `GET /requests?recipient_type=&recipient_id=&status=` — inbox list; org-scoped variant powers the "all agents' incoming" tab view
|
||||
- `GET /requests/{id}`
|
||||
- `POST /requests/{id}/respond` `{action: done|rejected|approved}` — `responder_id` taken from the **authenticated principal** (user session or agent token), not the body
|
||||
- `POST /requests/{id}/messages` `{body}` — More-Info thread
|
||||
- `POST /requests/{id}/cancel` — requester withdraws
|
||||
|
||||
## 6. MCP tools (agent-facing; unify with existing)
|
||||
- `create_request(recipient, kind, title, detail)` — supersedes `create_approval` (which stays as a `kind=approval` alias during deprecation)
|
||||
- `list_inbox(status?)` — the calling agent's **incoming** requests
|
||||
- `respond_request(request_id, action, message?)` — agent responds to a request addressed to it (done/reject/approve, or open More-Info)
|
||||
- `add_request_message(request_id, body)` — thread reply
|
||||
- `check_requests()` — status of requests the agent **sent** (the async pickup)
|
||||
- User-side actions flow through the canvas UI → `/respond` (captures the user id).
|
||||
|
||||
## 7. Canvas UI (Tasks + Approvals tabs — already scaffolded)
|
||||
- Each tab lists items of that kind, **grouped by requesting agent**, newest first: agent
|
||||
name + avatar, title, detail preview, age, status badge.
|
||||
- Buttons per the table in §3. Click Done/Approve/Reject → `POST /respond` with the
|
||||
logged-in user → optimistic update + toast.
|
||||
- **More Info** expands an inline thread panel ("chat about this"): message list + input →
|
||||
`POST /messages`; status → `info_requested`; requester replies appear in-thread.
|
||||
- "All agents' incoming" = org-scoped list. Live updates over the existing WS.
|
||||
|
||||
## 8. Idle-nudge sweep
|
||||
- A CP periodic worker: for each agent workspace that is **idle** (`active_tasks=0`,
|
||||
`status=online`) **and** has ≥1 `pending`/`info_requested` request where it is the
|
||||
recipient, older than threshold → inject a **nudge** (notify/tick: "you have M unhandled
|
||||
inbox items …") so it processes them via `respond_request`. Covers the "agent forgot /
|
||||
didn't use the proper MCP tool" case.
|
||||
- User-addressed requests pending past threshold → UI badge (+ optional channel notify).
|
||||
- **Anti-spam:** rate-limit (≤1 nudge per request per hour); stop once the agent acts.
|
||||
|
||||
## 9. Responder identity / multi-user (forward-looking)
|
||||
`responder_id`+`responder_type` stamped on every terminal action; UI shows "Approved by
|
||||
<name>". Schema already supports multiple users with roles in one space (a request can be
|
||||
fulfilled by a different person than it was shown to). Per-role routing/permissions is
|
||||
**out of scope for v1** but the model doesn't preclude it.
|
||||
|
||||
## 10. Approvals migration
|
||||
New `requests` table; **idempotent migration** copies existing `approvals` rows in as
|
||||
`kind=approval`. `create_approval`/`decide_approval`/`list_pending_approvals` become thin
|
||||
shims over the requests endpoints for a deprecation window, then removed.
|
||||
|
||||
## 11. Phasing (each phase = SOP PR → 2-genuine → merge)
|
||||
- **P0** RFC sign-off (this doc)
|
||||
- **P1** CP: `requests`+`request_messages` tables, migration, endpoints, tests (molecule-controlplane)
|
||||
- **P2** MCP tools + approval shims (core/runtime)
|
||||
- **P3** Canvas UI: Tasks/Approvals tabs render + buttons + More-Info thread + WS live (app/canvas)
|
||||
- **P4** idle-nudge worker + user-pending notifications
|
||||
- **P5** deprecate old approval path; docs
|
||||
|
||||
## 12. Recommended defaults on open points (flag if you disagree)
|
||||
- (a) **Keep approval shims** for one deprecation window (not a hard cut) — safer for in-flight callers.
|
||||
- (b) Nudge: **idle + pending > 10 min → nudge, max 1/hr per request.**
|
||||
- (c) v1 = **single recipient per request** (fan-out later).
|
||||
- (d) v1 = **any user in the org may respond** to a user-addressed request (role-gating later).
|
||||
@@ -41,8 +41,8 @@ type EventType string
|
||||
// scan-friendly as it grows.
|
||||
const (
|
||||
// Chat / agent messaging — surfaces in canvas chat panels.
|
||||
EventAgentMessage EventType = "AGENT_MESSAGE"
|
||||
EventA2AResponse EventType = "A2A_RESPONSE"
|
||||
EventAgentMessage EventType = "AGENT_MESSAGE"
|
||||
EventA2AResponse EventType = "A2A_RESPONSE"
|
||||
EventActivityLogged EventType = "ACTIVITY_LOGGED"
|
||||
EventChannelMessage EventType = "CHANNEL_MESSAGE"
|
||||
|
||||
@@ -59,11 +59,11 @@ const (
|
||||
EventWorkspaceHeartbeat EventType = "WORKSPACE_HEARTBEAT"
|
||||
|
||||
// Agent assignment + identity.
|
||||
EventAgentAssigned EventType = "AGENT_ASSIGNED"
|
||||
EventAgentReplaced EventType = "AGENT_REPLACED"
|
||||
EventAgentRemoved EventType = "AGENT_REMOVED"
|
||||
EventAgentMoved EventType = "AGENT_MOVED"
|
||||
EventAgentCardUpdated EventType = "AGENT_CARD_UPDATED"
|
||||
EventAgentAssigned EventType = "AGENT_ASSIGNED"
|
||||
EventAgentReplaced EventType = "AGENT_REPLACED"
|
||||
EventAgentRemoved EventType = "AGENT_REMOVED"
|
||||
EventAgentMoved EventType = "AGENT_MOVED"
|
||||
EventAgentCardUpdated EventType = "AGENT_CARD_UPDATED"
|
||||
|
||||
// Delegation lifecycle.
|
||||
EventDelegationSent EventType = "DELEGATION_SENT"
|
||||
@@ -72,7 +72,7 @@ const (
|
||||
EventDelegationFailed EventType = "DELEGATION_FAILED"
|
||||
|
||||
// Task progression + scheduler.
|
||||
EventTaskUpdated EventType = "TASK_UPDATED"
|
||||
EventTaskUpdated EventType = "TASK_UPDATED"
|
||||
EventCronExecuted EventType = "CRON_EXECUTED"
|
||||
EventCronSkipped EventType = "CRON_SKIPPED"
|
||||
|
||||
@@ -84,6 +84,13 @@ const (
|
||||
EventUserTaskRequested EventType = "USER_TASK_REQUESTED"
|
||||
EventUserTaskResolved EventType = "USER_TASK_RESOLVED"
|
||||
|
||||
// Requests — the unified Tasks + Approvals inbox (RFC P1). REQUEST_CREATED
|
||||
// pokes a recipient agent's inbox; REQUEST_RESPONDED is the async signal-back
|
||||
// to the requester; REQUEST_MESSAGE is a More-Info thread reply.
|
||||
EventRequestCreated EventType = "REQUEST_CREATED"
|
||||
EventRequestResponded EventType = "REQUEST_RESPONDED"
|
||||
EventRequestMessage EventType = "REQUEST_MESSAGE"
|
||||
|
||||
// Auth / credentials.
|
||||
EventExternalCredentialsRotated EventType = "EXTERNAL_CREDENTIALS_ROTATED"
|
||||
)
|
||||
@@ -115,6 +122,9 @@ var AllEventTypes = []EventType{
|
||||
EventDelegationSent,
|
||||
EventDelegationStatus,
|
||||
EventExternalCredentialsRotated,
|
||||
EventRequestCreated,
|
||||
EventRequestMessage,
|
||||
EventRequestResponded,
|
||||
EventTaskUpdated,
|
||||
EventUserTaskRequested,
|
||||
EventUserTaskResolved,
|
||||
|
||||
@@ -40,6 +40,9 @@ func TestAllEventTypes_IsSnapshot(t *testing.T) {
|
||||
"DELEGATION_SENT",
|
||||
"DELEGATION_STATUS",
|
||||
"EXTERNAL_CREDENTIALS_ROTATED",
|
||||
"REQUEST_CREATED",
|
||||
"REQUEST_MESSAGE",
|
||||
"REQUEST_RESPONDED",
|
||||
"TASK_UPDATED",
|
||||
"USER_TASK_REQUESTED",
|
||||
"USER_TASK_RESOLVED",
|
||||
|
||||
@@ -0,0 +1,526 @@
|
||||
package handlers
|
||||
|
||||
// RequestStore is the SSOT for the unified "requests" primitive — the Tasks +
|
||||
// Approvals inbox (RFC P1, docs/design/rfc-unified-requests-inbox.md). It
|
||||
// generalizes UserTaskStore (agent→user worklist asks) and the inline
|
||||
// approval_requests SQL in approvals.go into ONE store keyed by kind ∈
|
||||
// {task, approval}, where requester and recipient may each be a user OR an
|
||||
// agent.
|
||||
//
|
||||
// Every surface that mutates or reads `requests` — the REST handlers in
|
||||
// requests.go AND (in a later phase) the MCP request tools — MUST route through
|
||||
// this store rather than re-implement the SQL + status-enum validation +
|
||||
// REQUEST_* broadcast inline. Two copies of one contract drift silently; this
|
||||
// is the same consolidation rationale as UserTaskStore / AgentMessageWriter.
|
||||
//
|
||||
// The store owns persistence + validation + the event broadcast. HTTP-specific
|
||||
// concerns (gin binding, status codes) stay in the handler. Construct per call
|
||||
// via NewRequestStore over the live global db.DB so the test harness's db.DB
|
||||
// swap is observed — mirroring UserTaskStore.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/events"
|
||||
)
|
||||
|
||||
// ErrRequestNotFound is returned by Get/Respond/RequestInfo/Cancel/AddMessage
|
||||
// when no row matches — the request does not exist, or (for the mutating
|
||||
// actions) is already terminal. Callers translate to HTTP 404.
|
||||
var ErrRequestNotFound = errors.New("request: not found")
|
||||
|
||||
// ErrInvalidRequestAction is returned when a respond action is outside the
|
||||
// done/rejected/approved set, or is incompatible with the request's kind
|
||||
// (approval→approved/rejected, task→done/rejected). Callers translate to 400.
|
||||
var ErrInvalidRequestAction = errors.New("request: invalid action for this request kind")
|
||||
|
||||
// ErrInvalidRequestKind is returned when Create is given a kind outside
|
||||
// task/approval. Callers translate to HTTP 400.
|
||||
var ErrInvalidRequestKind = errors.New("request: kind must be 'task' or 'approval'")
|
||||
|
||||
// ErrInvalidRequestParty is returned when a requester_type / recipient_type /
|
||||
// author_type is outside the user/agent enum. Callers translate to HTTP 400.
|
||||
var ErrInvalidRequestParty = errors.New("request: type must be 'user' or 'agent'")
|
||||
|
||||
// CreateRequestInput is the set of fields Create needs. requester_* identifies
|
||||
// who raised it (for a per-workspace POST that is the calling agent); recipient_*
|
||||
// is who must act. Detail/OrgID/Priority are optional (zero values → NULL).
|
||||
type CreateRequestInput struct {
|
||||
Kind string
|
||||
RequesterType string
|
||||
RequesterID string
|
||||
OrgID string
|
||||
RecipientType string
|
||||
RecipientID string
|
||||
Title string
|
||||
Detail string
|
||||
Priority *int
|
||||
}
|
||||
|
||||
// RequestRow is one request as returned by the list + get methods.
|
||||
// WorkspaceName is non-empty only for ListPendingForOrg rows whose party is an
|
||||
// agent (decorated via a LEFT JOIN on workspaces); the inbox/outgoing lists
|
||||
// leave it empty.
|
||||
type RequestRow struct {
|
||||
ID string `json:"id"`
|
||||
Kind string `json:"kind"`
|
||||
RequesterType string `json:"requester_type"`
|
||||
RequesterID string `json:"requester_id"`
|
||||
OrgID *string `json:"org_id"`
|
||||
RecipientType string `json:"recipient_type"`
|
||||
RecipientID string `json:"recipient_id"`
|
||||
Title string `json:"title"`
|
||||
Detail *string `json:"detail"`
|
||||
Status string `json:"status"`
|
||||
ResponderType *string `json:"responder_type"`
|
||||
ResponderID *string `json:"responder_id"`
|
||||
Priority *int `json:"priority"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
UpdatedAt string `json:"updated_at"`
|
||||
RespondedAt *string `json:"responded_at"`
|
||||
WorkspaceName *string `json:"workspace_name,omitempty"`
|
||||
}
|
||||
|
||||
// RequestMessageRow is one row of a request's More-Info thread.
|
||||
type RequestMessageRow struct {
|
||||
ID string `json:"id"`
|
||||
RequestID string `json:"request_id"`
|
||||
AuthorType string `json:"author_type"`
|
||||
AuthorID string `json:"author_id"`
|
||||
Body string `json:"body"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
}
|
||||
|
||||
// RequestStore persists + broadcasts request mutations. Takes
|
||||
// events.EventEmitter (not the concrete *Broadcaster) so tests can substitute
|
||||
// a fake emitter, mirroring UserTaskStore.
|
||||
type RequestStore struct {
|
||||
db *sql.DB
|
||||
broadcaster events.EventEmitter
|
||||
}
|
||||
|
||||
// NewRequestStore binds the store to a DB pool + the platform broadcaster.
|
||||
func NewRequestStore(db *sql.DB, broadcaster events.EventEmitter) *RequestStore {
|
||||
return &RequestStore{db: db, broadcaster: broadcaster}
|
||||
}
|
||||
|
||||
// requestColumns is the canonical SELECT projection for a single request row,
|
||||
// shared by Get / ListInbox / ListOutgoing so the Scan order can't drift.
|
||||
const requestColumns = `id, kind, requester_type, requester_id, org_id,
|
||||
recipient_type, recipient_id, title, detail, status,
|
||||
responder_type, responder_id, priority, created_at, updated_at, responded_at`
|
||||
|
||||
// scanRequest reads one row in requestColumns order into a RequestRow.
|
||||
func scanRequest(rows *sql.Rows) (RequestRow, error) {
|
||||
var r RequestRow
|
||||
err := rows.Scan(
|
||||
&r.ID, &r.Kind, &r.RequesterType, &r.RequesterID, &r.OrgID,
|
||||
&r.RecipientType, &r.RecipientID, &r.Title, &r.Detail, &r.Status,
|
||||
&r.ResponderType, &r.ResponderID, &r.Priority, &r.CreatedAt, &r.UpdatedAt, &r.RespondedAt,
|
||||
)
|
||||
return r, err
|
||||
}
|
||||
|
||||
func validParty(t string) bool { return t == "user" || t == "agent" }
|
||||
|
||||
// broadcastTarget picks the workspace id to anchor a REQUEST_* event on. Events
|
||||
// are workspace-scoped (structure_events.workspace_id), so we anchor on the
|
||||
// agent party when there is one — the requester (so its canvas/inbox is
|
||||
// signalled on a response) or, lacking that, the recipient. A user-only request
|
||||
// has no workspace anchor; we skip the broadcast rather than insert a bad row.
|
||||
func broadcastTarget(requesterType, requesterID, recipientType, recipientID string) string {
|
||||
if requesterType == "agent" && requesterID != "" {
|
||||
return requesterID
|
||||
}
|
||||
if recipientType == "agent" && recipientID != "" {
|
||||
return recipientID
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// Create inserts a new pending request and broadcasts REQUEST_CREATED (anchored
|
||||
// on the recipient agent if any, so an agent recipient's inbox is signalled).
|
||||
// Returns the new request id. Validates kind + party enums up front.
|
||||
func (s *RequestStore) Create(ctx context.Context, in CreateRequestInput) (string, error) {
|
||||
if in.Kind != "task" && in.Kind != "approval" {
|
||||
return "", ErrInvalidRequestKind
|
||||
}
|
||||
if !validParty(in.RequesterType) || !validParty(in.RecipientType) {
|
||||
return "", ErrInvalidRequestParty
|
||||
}
|
||||
|
||||
var detailArg interface{}
|
||||
if in.Detail != "" {
|
||||
detailArg = in.Detail
|
||||
}
|
||||
var orgArg interface{}
|
||||
if in.OrgID != "" {
|
||||
orgArg = in.OrgID
|
||||
}
|
||||
var priorityArg interface{}
|
||||
if in.Priority != nil {
|
||||
priorityArg = *in.Priority
|
||||
}
|
||||
|
||||
var requestID string
|
||||
err := s.db.QueryRowContext(ctx, `
|
||||
INSERT INTO requests (
|
||||
kind, requester_type, requester_id, org_id,
|
||||
recipient_type, recipient_id, title, detail, priority
|
||||
)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
||||
RETURNING id
|
||||
`, in.Kind, in.RequesterType, in.RequesterID, orgArg,
|
||||
in.RecipientType, in.RecipientID, in.Title, detailArg, priorityArg).Scan(&requestID)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("request: create: %w", err)
|
||||
}
|
||||
|
||||
// Anchor REQUEST_CREATED on the recipient agent (so its inbox is poked) if
|
||||
// the recipient is an agent; else on the requester. A user→user request has
|
||||
// no agent to signal — skip the broadcast.
|
||||
target := broadcastTarget(in.RecipientType, in.RecipientID, in.RequesterType, in.RequesterID)
|
||||
if target != "" {
|
||||
if err := s.broadcaster.RecordAndBroadcast(ctx, string(events.EventRequestCreated), target, map[string]interface{}{
|
||||
"request_id": requestID,
|
||||
"kind": in.Kind,
|
||||
"recipient_type": in.RecipientType,
|
||||
"recipient_id": in.RecipientID,
|
||||
"title": in.Title,
|
||||
}); err != nil {
|
||||
log.Printf("request: failed to broadcast created for %s: %v", target, err)
|
||||
}
|
||||
}
|
||||
|
||||
return requestID, nil
|
||||
}
|
||||
|
||||
// Get returns a single request by id, or ErrRequestNotFound.
|
||||
func (s *RequestStore) Get(ctx context.Context, id string) (RequestRow, error) {
|
||||
rows, err := s.db.QueryContext(ctx, `SELECT `+requestColumns+` FROM requests WHERE id = $1`, id)
|
||||
if err != nil {
|
||||
return RequestRow{}, fmt.Errorf("request: get: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
if !rows.Next() {
|
||||
if err := rows.Err(); err != nil {
|
||||
return RequestRow{}, fmt.Errorf("request: get: %w", err)
|
||||
}
|
||||
return RequestRow{}, ErrRequestNotFound
|
||||
}
|
||||
r, err := scanRequest(rows)
|
||||
if err != nil {
|
||||
return RequestRow{}, fmt.Errorf("request: get scan: %w", err)
|
||||
}
|
||||
return r, nil
|
||||
}
|
||||
|
||||
// Messages returns a request's More-Info thread, oldest first.
|
||||
func (s *RequestStore) Messages(ctx context.Context, requestID string) ([]RequestMessageRow, error) {
|
||||
rows, err := s.db.QueryContext(ctx, `
|
||||
SELECT id, request_id, author_type, author_id, body, created_at
|
||||
FROM request_messages WHERE request_id = $1
|
||||
ORDER BY created_at ASC LIMIT 200
|
||||
`, requestID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("request: messages: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
msgs := make([]RequestMessageRow, 0)
|
||||
for rows.Next() {
|
||||
var m RequestMessageRow
|
||||
if rows.Scan(&m.ID, &m.RequestID, &m.AuthorType, &m.AuthorID, &m.Body, &m.CreatedAt) != nil {
|
||||
continue
|
||||
}
|
||||
msgs = append(msgs, m)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Printf("request: messages rows.Err request=%s: %v", requestID, err)
|
||||
}
|
||||
return msgs, nil
|
||||
}
|
||||
|
||||
// ListInbox returns the requests addressed TO a recipient (recipient_type +
|
||||
// recipient_id), newest first, optionally filtered by status. status "" = all.
|
||||
func (s *RequestStore) ListInbox(ctx context.Context, recipientType, recipientID, status string) ([]RequestRow, error) {
|
||||
q := `SELECT ` + requestColumns + ` FROM requests
|
||||
WHERE recipient_type = $1 AND recipient_id = $2`
|
||||
args := []interface{}{recipientType, recipientID}
|
||||
if status != "" {
|
||||
q += ` AND status = $3`
|
||||
args = append(args, status)
|
||||
}
|
||||
q += ` ORDER BY created_at DESC LIMIT 50`
|
||||
return s.queryRequests(ctx, "list inbox", q, args)
|
||||
}
|
||||
|
||||
// ListOutgoing returns the requests a requester RAISED (requester_type +
|
||||
// requester_id), newest first — the async pickup of responses.
|
||||
func (s *RequestStore) ListOutgoing(ctx context.Context, requesterType, requesterID, status string) ([]RequestRow, error) {
|
||||
q := `SELECT ` + requestColumns + ` FROM requests
|
||||
WHERE requester_type = $1 AND requester_id = $2`
|
||||
args := []interface{}{requesterType, requesterID}
|
||||
if status != "" {
|
||||
q += ` AND status = $3`
|
||||
args = append(args, status)
|
||||
}
|
||||
q += ` ORDER BY created_at DESC LIMIT 50`
|
||||
return s.queryRequests(ctx, "list outgoing", q, args)
|
||||
}
|
||||
|
||||
// ListPendingForOrg powers the cross-org "all incoming" canvas view. Returns
|
||||
// pending + info_requested requests in an org, newest first, decorated with the
|
||||
// requesting/responding agent's workspace name via a LEFT JOIN (NULL for a user
|
||||
// party). kind "" = both tabs; "task"/"approval" filters one.
|
||||
func (s *RequestStore) ListPendingForOrg(ctx context.Context, orgID, kind string) ([]RequestRow, error) {
|
||||
// LEFT JOIN workspaces on the requester id when it is an agent, to surface a
|
||||
// human-readable name in the org view. recipient name is left to the caller.
|
||||
q := `SELECT ` +
|
||||
`r.id, r.kind, r.requester_type, r.requester_id, r.org_id,
|
||||
r.recipient_type, r.recipient_id, r.title, r.detail, r.status,
|
||||
r.responder_type, r.responder_id, r.priority, r.created_at, r.updated_at, r.responded_at,
|
||||
w.name
|
||||
FROM requests r
|
||||
LEFT JOIN workspaces w
|
||||
ON r.requester_type = 'agent' AND w.id = r.requester_id::uuid
|
||||
WHERE r.status IN ('pending', 'info_requested')`
|
||||
args := []interface{}{}
|
||||
if orgID != "" {
|
||||
args = append(args, orgID)
|
||||
q += fmt.Sprintf(" AND r.org_id = $%d", len(args))
|
||||
}
|
||||
if kind != "" {
|
||||
args = append(args, kind)
|
||||
q += fmt.Sprintf(" AND r.kind = $%d", len(args))
|
||||
}
|
||||
q += ` ORDER BY r.created_at DESC LIMIT 50`
|
||||
|
||||
rows, err := s.db.QueryContext(ctx, q, args...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("request: list pending org: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
out := make([]RequestRow, 0)
|
||||
for rows.Next() {
|
||||
var r RequestRow
|
||||
if rows.Scan(
|
||||
&r.ID, &r.Kind, &r.RequesterType, &r.RequesterID, &r.OrgID,
|
||||
&r.RecipientType, &r.RecipientID, &r.Title, &r.Detail, &r.Status,
|
||||
&r.ResponderType, &r.ResponderID, &r.Priority, &r.CreatedAt, &r.UpdatedAt, &r.RespondedAt,
|
||||
&r.WorkspaceName,
|
||||
) != nil {
|
||||
continue
|
||||
}
|
||||
out = append(out, r)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Printf("request: list pending org rows.Err org=%s: %v", orgID, err)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// queryRequests runs a SELECT in requestColumns order and scans into RequestRows.
|
||||
func (s *RequestStore) queryRequests(ctx context.Context, op, q string, args []interface{}) ([]RequestRow, error) {
|
||||
rows, err := s.db.QueryContext(ctx, q, args...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("request: %s: %w", op, err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
out := make([]RequestRow, 0)
|
||||
for rows.Next() {
|
||||
r, scanErr := scanRequest(rows)
|
||||
if scanErr != nil {
|
||||
continue
|
||||
}
|
||||
out = append(out, r)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Printf("request: %s rows.Err: %v", op, err)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// actionToStatus maps a terminal respond action to the status it sets, gated by
|
||||
// the request's kind: approval accepts approved/rejected, task accepts
|
||||
// done/rejected. Returns ErrInvalidRequestAction on any other combination.
|
||||
func actionToStatus(kind, action string) (string, error) {
|
||||
switch kind {
|
||||
case "approval":
|
||||
if action == "approved" || action == "rejected" {
|
||||
return action, nil
|
||||
}
|
||||
case "task":
|
||||
if action == "done" || action == "rejected" {
|
||||
return action, nil
|
||||
}
|
||||
}
|
||||
return "", ErrInvalidRequestAction
|
||||
}
|
||||
|
||||
// Respond applies a terminal action (done/rejected/approved, validated against
|
||||
// the request's kind), stamps responder + responded_at, and broadcasts
|
||||
// REQUEST_RESPONDED so the requester/canvas picks it up asynchronously. The
|
||||
// requester is NOT blocked — it reads this via ListOutgoing on its next tick.
|
||||
// responderType defaults to "user" (the canvas path); responderID defaults to
|
||||
// "human" when empty. Only acts on a non-terminal (pending/info_requested) row.
|
||||
func (s *RequestStore) Respond(ctx context.Context, id, action, responderType, responderID string) (RequestRow, error) {
|
||||
if responderType == "" {
|
||||
responderType = "user"
|
||||
}
|
||||
if !validParty(responderType) {
|
||||
return RequestRow{}, ErrInvalidRequestParty
|
||||
}
|
||||
if responderID == "" {
|
||||
responderID = "human"
|
||||
}
|
||||
|
||||
// Look the request up first so we can validate action↔kind compatibility and
|
||||
// know who to signal (the requester).
|
||||
req, err := s.Get(ctx, id)
|
||||
if err != nil {
|
||||
return RequestRow{}, err
|
||||
}
|
||||
status, err := actionToStatus(req.Kind, action)
|
||||
if err != nil {
|
||||
return RequestRow{}, err
|
||||
}
|
||||
|
||||
result, err := s.db.ExecContext(ctx, `
|
||||
UPDATE requests
|
||||
SET status = $1, responder_type = $2, responder_id = $3,
|
||||
responded_at = now(), updated_at = now()
|
||||
WHERE id = $4 AND status IN ('pending', 'info_requested')
|
||||
`, status, responderType, responderID, id)
|
||||
if err != nil {
|
||||
return RequestRow{}, fmt.Errorf("request: respond: %w", err)
|
||||
}
|
||||
n, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
return RequestRow{}, fmt.Errorf("request: respond RowsAffected: %w", err)
|
||||
}
|
||||
if n == 0 {
|
||||
return RequestRow{}, ErrRequestNotFound
|
||||
}
|
||||
|
||||
// Signal the requester (the agent that raised it) so its inbox/canvas picks
|
||||
// up the resolution asynchronously.
|
||||
target := broadcastTarget(req.RequesterType, req.RequesterID, req.RecipientType, req.RecipientID)
|
||||
if target != "" {
|
||||
if err := s.broadcaster.RecordAndBroadcast(ctx, string(events.EventRequestResponded), target, map[string]interface{}{
|
||||
"request_id": id,
|
||||
"status": status,
|
||||
"responder_type": responderType,
|
||||
"responder_id": responderID,
|
||||
}); err != nil {
|
||||
log.Printf("request: failed to broadcast responded for %s: %v", target, err)
|
||||
}
|
||||
}
|
||||
|
||||
req.Status = status
|
||||
req.ResponderType = &responderType
|
||||
req.ResponderID = &responderID
|
||||
return req, nil
|
||||
}
|
||||
|
||||
// RequestInfo flips a request to 'info_requested' (the "More Info" transition)
|
||||
// without a terminal action. Used when the recipient asks the requester for
|
||||
// clarification. Only acts on a non-terminal row.
|
||||
func (s *RequestStore) RequestInfo(ctx context.Context, id string) error {
|
||||
result, err := s.db.ExecContext(ctx, `
|
||||
UPDATE requests SET status = 'info_requested', updated_at = now()
|
||||
WHERE id = $1 AND status IN ('pending', 'info_requested')
|
||||
`, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("request: request-info: %w", err)
|
||||
}
|
||||
n, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
return fmt.Errorf("request: request-info RowsAffected: %w", err)
|
||||
}
|
||||
if n == 0 {
|
||||
return ErrRequestNotFound
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddMessage appends a row to the More-Info thread and broadcasts
|
||||
// REQUEST_MESSAGE. When the author is the request's RECIPIENT (i.e. the party
|
||||
// being asked is asking back for clarification), it also flips the request to
|
||||
// 'info_requested' so the requester knows it's their turn. Returns the new
|
||||
// message id.
|
||||
func (s *RequestStore) AddMessage(ctx context.Context, id, authorType, authorID, body string) (string, error) {
|
||||
if !validParty(authorType) {
|
||||
return "", ErrInvalidRequestParty
|
||||
}
|
||||
|
||||
req, err := s.Get(ctx, id)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
var messageID string
|
||||
err = s.db.QueryRowContext(ctx, `
|
||||
INSERT INTO request_messages (request_id, author_type, author_id, body)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
RETURNING id
|
||||
`, id, authorType, authorID, body).Scan(&messageID)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("request: add message: %w", err)
|
||||
}
|
||||
|
||||
// If the author is the recipient (the one being asked), this message is a
|
||||
// "please clarify" — flip to info_requested so the requester is prompted.
|
||||
// Only flip a non-terminal request; a closed request keeps its terminal
|
||||
// status even if a late note is appended.
|
||||
if authorType == req.RecipientType && authorID == req.RecipientID {
|
||||
if _, err := s.db.ExecContext(ctx, `
|
||||
UPDATE requests SET status = 'info_requested', updated_at = now()
|
||||
WHERE id = $1 AND status IN ('pending', 'info_requested')
|
||||
`, id); err != nil {
|
||||
log.Printf("request: failed to flip info_requested on message for %s: %v", id, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Signal both ends via the requester anchor (the canvas thread listens there).
|
||||
target := broadcastTarget(req.RequesterType, req.RequesterID, req.RecipientType, req.RecipientID)
|
||||
if target != "" {
|
||||
if err := s.broadcaster.RecordAndBroadcast(ctx, string(events.EventRequestMessage), target, map[string]interface{}{
|
||||
"request_id": id,
|
||||
"message_id": messageID,
|
||||
"author_type": authorType,
|
||||
"author_id": authorID,
|
||||
}); err != nil {
|
||||
log.Printf("request: failed to broadcast message for %s: %v", id, err)
|
||||
}
|
||||
}
|
||||
|
||||
return messageID, nil
|
||||
}
|
||||
|
||||
// Cancel withdraws a request (the requester changed its mind). Sets status
|
||||
// 'cancelled' + updated_at. Only acts on a non-terminal row; returns
|
||||
// ErrRequestNotFound when missing or already terminal.
|
||||
func (s *RequestStore) Cancel(ctx context.Context, id string) error {
|
||||
result, err := s.db.ExecContext(ctx, `
|
||||
UPDATE requests SET status = 'cancelled', updated_at = now()
|
||||
WHERE id = $1 AND status IN ('pending', 'info_requested')
|
||||
`, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("request: cancel: %w", err)
|
||||
}
|
||||
n, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
return fmt.Errorf("request: cancel RowsAffected: %w", err)
|
||||
}
|
||||
if n == 0 {
|
||||
return ErrRequestNotFound
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,376 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"log"
|
||||
"net/http"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/events"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// RequestsHandler serves the unified "requests" inbox — the Tasks + Approvals
|
||||
// primitive (RFC P1, docs/design/rfc-unified-requests-inbox.md). It generalizes
|
||||
// UserTasksHandler (agent→user asks) and ApprovalsHandler (the gate) into one
|
||||
// surface keyed by kind ∈ {task, approval}, where requester and recipient may
|
||||
// each be a user OR an agent. Responding is asynchronous: the requester is
|
||||
// never blocked; a REQUEST_RESPONDED event signals it to pick the answer up on
|
||||
// its next tick.
|
||||
type RequestsHandler struct {
|
||||
broadcaster *events.Broadcaster
|
||||
}
|
||||
|
||||
// --- OpenAPI doc shapes (used by swaggo; the handlers emit gin.H inline) ---
|
||||
|
||||
// CreateRequestBody is the body of POST /workspaces/{id}/requests. requester is
|
||||
// the calling workspace (agent); only the recipient + content are supplied.
|
||||
type CreateRequestBody struct {
|
||||
Kind string `json:"kind" binding:"required" enums:"task,approval"`
|
||||
RecipientType string `json:"recipient_type" binding:"required" enums:"user,agent"`
|
||||
RecipientID string `json:"recipient_id"`
|
||||
Title string `json:"title" binding:"required"`
|
||||
Detail string `json:"detail"`
|
||||
Priority *int `json:"priority"`
|
||||
}
|
||||
|
||||
// CreateRequestResponse is returned by POST /workspaces/{id}/requests.
|
||||
type CreateRequestResponse struct {
|
||||
RequestID string `json:"request_id"`
|
||||
Status string `json:"status"`
|
||||
}
|
||||
|
||||
// RespondRequestBody is the body of POST /requests/{requestId}/respond. The
|
||||
// responder identity is taken from the body for now (P1); the canvas path
|
||||
// defaults responder_type to 'user'. action is validated against the kind.
|
||||
type RespondRequestBody struct {
|
||||
Action string `json:"action" binding:"required" enums:"done,rejected,approved"`
|
||||
ResponderType string `json:"responder_type" enums:"user,agent"`
|
||||
ResponderID string `json:"responder_id"`
|
||||
}
|
||||
|
||||
// AddRequestMessageBody is the body of POST /requests/{requestId}/messages —
|
||||
// the More-Info thread. When the author is the recipient, the request flips to
|
||||
// info_requested.
|
||||
type AddRequestMessageBody struct {
|
||||
Body string `json:"body" binding:"required"`
|
||||
AuthorType string `json:"author_type" binding:"required" enums:"user,agent"`
|
||||
AuthorID string `json:"author_id"`
|
||||
}
|
||||
|
||||
// RequestMutationResponse is the {status, request_id} echo returned by the
|
||||
// respond / cancel / messages endpoints.
|
||||
type RequestMutationResponse struct {
|
||||
Status string `json:"status"`
|
||||
RequestID string `json:"request_id"`
|
||||
}
|
||||
|
||||
// RequestWithThread is the GET /requests/{requestId} shape — the request plus
|
||||
// its More-Info thread.
|
||||
type RequestWithThread struct {
|
||||
Request RequestRow `json:"request"`
|
||||
Messages []RequestMessageRow `json:"messages"`
|
||||
}
|
||||
|
||||
func NewRequestsHandler(b *events.Broadcaster) *RequestsHandler {
|
||||
return &RequestsHandler{broadcaster: b}
|
||||
}
|
||||
|
||||
// store builds a RequestStore over the live global db.DB per request — same
|
||||
// rationale as UserTasksHandler.store(): the test harness swaps db.DB under us.
|
||||
func (h *RequestsHandler) store() *RequestStore {
|
||||
return NewRequestStore(db.DB, h.broadcaster)
|
||||
}
|
||||
|
||||
// Create handles POST /workspaces/:id/requests — the calling workspace (an
|
||||
// agent) raises a task/approval addressed to a user or another agent.
|
||||
//
|
||||
// @Summary Raise a request (task or approval)
|
||||
// @Tags requests
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param id path string true "Requester workspace ID"
|
||||
// @Param body body CreateRequestBody true "Request fields"
|
||||
// @Success 201 {object} CreateRequestResponse
|
||||
// @Failure 400 {object} ErrorResponse
|
||||
// @Failure 500 {object} ErrorResponse
|
||||
// @Router /workspaces/{id}/requests [post]
|
||||
// @Security BearerAuth && OrgSlugAuth
|
||||
func (h *RequestsHandler) Create(c *gin.Context) {
|
||||
workspaceID := c.Param("id")
|
||||
ctx := c.Request.Context()
|
||||
|
||||
var body CreateRequestBody
|
||||
if err := c.ShouldBindJSON(&body); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
|
||||
return
|
||||
}
|
||||
|
||||
// Decorate the request with its org anchor for the cross-org pending view.
|
||||
// The workspaces table has NO org_id column — an "org" is the parent_id-chain
|
||||
// root resolved by orgRootID (org_scope.go). Best-effort: a missing root
|
||||
// leaves org_id NULL (the org view simply won't surface it), never blocks
|
||||
// creation.
|
||||
orgID, err := orgRootID(ctx, db.DB, workspaceID)
|
||||
if err != nil {
|
||||
log.Printf("requests: failed to resolve org root for workspace=%s: %v", workspaceID, err)
|
||||
orgID = ""
|
||||
}
|
||||
|
||||
requestID, err := h.store().Create(ctx, CreateRequestInput{
|
||||
Kind: body.Kind,
|
||||
RequesterType: "agent",
|
||||
RequesterID: workspaceID,
|
||||
OrgID: orgID,
|
||||
RecipientType: body.RecipientType,
|
||||
RecipientID: body.RecipientID,
|
||||
Title: body.Title,
|
||||
Detail: body.Detail,
|
||||
Priority: body.Priority,
|
||||
})
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrInvalidRequestKind) || errors.Is(err, ErrInvalidRequestParty) {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
log.Printf("Create request error workspace=%s: %v", workspaceID, err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create request"})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusCreated, gin.H{"request_id": requestID, "status": "pending"})
|
||||
}
|
||||
|
||||
// ListInbox handles GET /workspaces/:id/requests/inbox?status= — requests
|
||||
// addressed TO this workspace (the agent's incoming).
|
||||
//
|
||||
// @Summary List a workspace's incoming requests (inbox)
|
||||
// @Tags requests
|
||||
// @Produce json
|
||||
// @Param id path string true "Recipient workspace ID"
|
||||
// @Param status query string false "Filter by status"
|
||||
// @Success 200 {array} RequestRow
|
||||
// @Failure 500 {object} ErrorResponse
|
||||
// @Router /workspaces/{id}/requests/inbox [get]
|
||||
// @Security BearerAuth && OrgSlugAuth
|
||||
func (h *RequestsHandler) ListInbox(c *gin.Context) {
|
||||
workspaceID := c.Param("id")
|
||||
ctx := c.Request.Context()
|
||||
|
||||
rows, err := h.store().ListInbox(ctx, "agent", workspaceID, c.Query("status"))
|
||||
if err != nil {
|
||||
log.Printf("List inbox error workspace=%s: %v", workspaceID, err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, rows)
|
||||
}
|
||||
|
||||
// ListOutgoing handles GET /workspaces/:id/requests?status= — the requests this
|
||||
// workspace RAISED (the async pickup of responses).
|
||||
//
|
||||
// @Summary List a workspace's outgoing requests
|
||||
// @Tags requests
|
||||
// @Produce json
|
||||
// @Param id path string true "Requester workspace ID"
|
||||
// @Param status query string false "Filter by status"
|
||||
// @Success 200 {array} RequestRow
|
||||
// @Failure 500 {object} ErrorResponse
|
||||
// @Router /workspaces/{id}/requests [get]
|
||||
// @Security BearerAuth && OrgSlugAuth
|
||||
func (h *RequestsHandler) ListOutgoing(c *gin.Context) {
|
||||
workspaceID := c.Param("id")
|
||||
ctx := c.Request.Context()
|
||||
|
||||
rows, err := h.store().ListOutgoing(ctx, "agent", workspaceID, c.Query("status"))
|
||||
if err != nil {
|
||||
log.Printf("List outgoing error workspace=%s: %v", workspaceID, err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, rows)
|
||||
}
|
||||
|
||||
// Get handles GET /requests/:requestId — a single request plus its More-Info
|
||||
// thread.
|
||||
//
|
||||
// @Summary Get a request with its message thread
|
||||
// @Tags requests
|
||||
// @Produce json
|
||||
// @Param requestId path string true "Request ID"
|
||||
// @Success 200 {object} RequestWithThread
|
||||
// @Failure 404 {object} ErrorResponse
|
||||
// @Failure 500 {object} ErrorResponse
|
||||
// @Router /requests/{requestId} [get]
|
||||
// @Security BearerAuth
|
||||
func (h *RequestsHandler) Get(c *gin.Context) {
|
||||
requestID := c.Param("requestId")
|
||||
ctx := c.Request.Context()
|
||||
|
||||
s := h.store()
|
||||
req, err := s.Get(ctx, requestID)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrRequestNotFound) {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "request not found"})
|
||||
return
|
||||
}
|
||||
log.Printf("Get request error request=%s: %v", requestID, err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"})
|
||||
return
|
||||
}
|
||||
msgs, err := s.Messages(ctx, requestID)
|
||||
if err != nil {
|
||||
log.Printf("Get request messages error request=%s: %v", requestID, err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"request": req, "messages": msgs})
|
||||
}
|
||||
|
||||
// Respond handles POST /requests/:requestId/respond — a terminal action
|
||||
// (done/rejected/approved), validated against the request's kind. responder
|
||||
// identity comes from the body; the canvas/admin path defaults to 'user'.
|
||||
//
|
||||
// @Summary Respond to a request (done / rejected / approved)
|
||||
// @Tags requests
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param requestId path string true "Request ID"
|
||||
// @Param body body RespondRequestBody true "Response"
|
||||
// @Success 200 {object} RequestMutationResponse
|
||||
// @Failure 400 {object} ErrorResponse
|
||||
// @Failure 404 {object} ErrorResponse
|
||||
// @Failure 500 {object} ErrorResponse
|
||||
// @Router /requests/{requestId}/respond [post]
|
||||
// @Security BearerAuth
|
||||
func (h *RequestsHandler) Respond(c *gin.Context) {
|
||||
requestID := c.Param("requestId")
|
||||
ctx := c.Request.Context()
|
||||
|
||||
var body RespondRequestBody
|
||||
if err := c.ShouldBindJSON(&body); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
|
||||
return
|
||||
}
|
||||
|
||||
if _, err := h.store().Respond(ctx, requestID, body.Action, body.ResponderType, body.ResponderID); err != nil {
|
||||
if errors.Is(err, ErrRequestNotFound) {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "request not found or already resolved"})
|
||||
return
|
||||
}
|
||||
if errors.Is(err, ErrInvalidRequestAction) || errors.Is(err, ErrInvalidRequestParty) {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
log.Printf("Respond request error request=%s: %v", requestID, err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update"})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"status": body.Action, "request_id": requestID})
|
||||
}
|
||||
|
||||
// AddMessage handles POST /requests/:requestId/messages — append to the
|
||||
// More-Info thread. When the author is the recipient, the request flips to
|
||||
// info_requested.
|
||||
//
|
||||
// @Summary Add a message to a request's More-Info thread
|
||||
// @Tags requests
|
||||
// @Accept json
|
||||
// @Produce json
|
||||
// @Param requestId path string true "Request ID"
|
||||
// @Param body body AddRequestMessageBody true "Message"
|
||||
// @Success 201 {object} RequestMutationResponse
|
||||
// @Failure 400 {object} ErrorResponse
|
||||
// @Failure 404 {object} ErrorResponse
|
||||
// @Failure 500 {object} ErrorResponse
|
||||
// @Router /requests/{requestId}/messages [post]
|
||||
// @Security BearerAuth
|
||||
func (h *RequestsHandler) AddMessage(c *gin.Context) {
|
||||
requestID := c.Param("requestId")
|
||||
ctx := c.Request.Context()
|
||||
|
||||
var body AddRequestMessageBody
|
||||
if err := c.ShouldBindJSON(&body); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
|
||||
return
|
||||
}
|
||||
|
||||
messageID, err := h.store().AddMessage(ctx, requestID, body.AuthorType, body.AuthorID, body.Body)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrRequestNotFound) {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "request not found"})
|
||||
return
|
||||
}
|
||||
if errors.Is(err, ErrInvalidRequestParty) {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
log.Printf("AddMessage request error request=%s: %v", requestID, err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to add message"})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusCreated, gin.H{"status": "created", "request_id": requestID, "message_id": messageID})
|
||||
}
|
||||
|
||||
// Cancel handles POST /requests/:requestId/cancel — the requester withdraws.
|
||||
//
|
||||
// @Summary Cancel (withdraw) a request
|
||||
// @Tags requests
|
||||
// @Produce json
|
||||
// @Param requestId path string true "Request ID"
|
||||
// @Success 200 {object} RequestMutationResponse
|
||||
// @Failure 404 {object} ErrorResponse
|
||||
// @Failure 500 {object} ErrorResponse
|
||||
// @Router /requests/{requestId}/cancel [post]
|
||||
// @Security BearerAuth
|
||||
func (h *RequestsHandler) Cancel(c *gin.Context) {
|
||||
requestID := c.Param("requestId")
|
||||
ctx := c.Request.Context()
|
||||
|
||||
if err := h.store().Cancel(ctx, requestID); err != nil {
|
||||
if errors.Is(err, ErrRequestNotFound) {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "request not found or already resolved"})
|
||||
return
|
||||
}
|
||||
log.Printf("Cancel request error request=%s: %v", requestID, err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to cancel"})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"status": "cancelled", "request_id": requestID})
|
||||
}
|
||||
|
||||
// ListPending handles GET /requests/pending?kind= — the cross-org pending view
|
||||
// for the canvas Tasks/Approvals tabs. Cross-workspace, so AdminAuth-gated like
|
||||
// /user-tasks/pending and /approvals/pending. ?kind=task|approval lets each tab
|
||||
// query its own slice.
|
||||
//
|
||||
// @Summary List pending requests across the org (canvas tabs)
|
||||
// @Tags requests
|
||||
// @Produce json
|
||||
// @Param kind query string false "Filter by kind" Enums(task, approval)
|
||||
// @Param org_id query string false "Filter by org"
|
||||
// @Success 200 {array} RequestRow
|
||||
// @Failure 400 {object} ErrorResponse
|
||||
// @Failure 500 {object} ErrorResponse
|
||||
// @Router /requests/pending [get]
|
||||
// @Security BearerAuth
|
||||
func (h *RequestsHandler) ListPending(c *gin.Context) {
|
||||
ctx := c.Request.Context()
|
||||
|
||||
kind := c.Query("kind")
|
||||
if kind != "" && kind != "task" && kind != "approval" {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "kind must be 'task' or 'approval'"})
|
||||
return
|
||||
}
|
||||
|
||||
rows, err := h.store().ListPendingForOrg(ctx, c.Query("org_id"), kind)
|
||||
if err != nil {
|
||||
log.Printf("ListPending requests error: %v", err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, rows)
|
||||
}
|
||||
@@ -0,0 +1,563 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// requestColumnNames mirrors the requestColumns SELECT projection order so the
|
||||
// sqlmock rows line up with scanRequest.
|
||||
var requestColumnNames = []string{
|
||||
"id", "kind", "requester_type", "requester_id", "org_id",
|
||||
"recipient_type", "recipient_id", "title", "detail", "status",
|
||||
"responder_type", "responder_id", "priority", "created_at", "updated_at", "responded_at",
|
||||
}
|
||||
|
||||
// oneRequestRow builds a single-row result set for a Get/List SELECT.
|
||||
func oneRequestRow(id, kind, requesterID, recipientType, recipientID, status string) *sqlmock.Rows {
|
||||
return sqlmock.NewRows(requestColumnNames).AddRow(
|
||||
id, kind, "agent", requesterID, nil,
|
||||
recipientType, recipientID, "Some title", nil, status,
|
||||
nil, nil, nil, "2026-06-10T00:00:00Z", "2026-06-10T00:00:00Z", nil,
|
||||
)
|
||||
}
|
||||
|
||||
// ---------- Create ----------
|
||||
|
||||
func TestRequests_Create_Task(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewRequestsHandler(newTestBroadcaster())
|
||||
|
||||
// Create handler first resolves the org root (org_scope CTE).
|
||||
mock.ExpectQuery("WITH RECURSIVE org_chain").
|
||||
WithArgs("ws-1").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow("org-root-1"))
|
||||
// INSERT request → id
|
||||
mock.ExpectQuery("INSERT INTO requests").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("req-1"))
|
||||
// REQUEST_CREATED broadcast (recipient is a user here → anchors on requester agent)
|
||||
mock.ExpectExec("INSERT INTO structure_events").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
|
||||
body := `{"kind":"task","recipient_type":"user","recipient_id":"","title":"Review the launch draft","detail":"posts/launch.md"}`
|
||||
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Create(c)
|
||||
|
||||
if w.Code != http.StatusCreated {
|
||||
t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if resp["request_id"] != "req-1" {
|
||||
t.Errorf("expected request_id req-1, got %v", resp["request_id"])
|
||||
}
|
||||
if resp["status"] != "pending" {
|
||||
t.Errorf("expected status pending, got %v", resp["status"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequests_Create_Approval_AgentRecipient(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewRequestsHandler(newTestBroadcaster())
|
||||
|
||||
mock.ExpectQuery("WITH RECURSIVE org_chain").
|
||||
WithArgs("ws-1").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow("org-root-1"))
|
||||
mock.ExpectQuery("INSERT INTO requests").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("req-2"))
|
||||
// Recipient is an agent → REQUEST_CREATED anchors on the recipient workspace.
|
||||
mock.ExpectExec("INSERT INTO structure_events").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
|
||||
body := `{"kind":"approval","recipient_type":"agent","recipient_id":"ws-2","title":"Approve deploy"}`
|
||||
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Create(c)
|
||||
|
||||
if w.Code != http.StatusCreated {
|
||||
t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequests_Create_MissingTitle(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewRequestsHandler(newTestBroadcaster())
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
|
||||
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"kind":"task","recipient_type":"user"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Create(c)
|
||||
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("expected 400 for missing title, got %d", w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequests_Create_InvalidKind(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewRequestsHandler(newTestBroadcaster())
|
||||
|
||||
// org root resolves, then Create rejects the kind before any INSERT.
|
||||
mock.ExpectQuery("WITH RECURSIVE org_chain").
|
||||
WithArgs("ws-1").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow("org-root-1"))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
|
||||
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"kind":"banana","recipient_type":"user","title":"x"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Create(c)
|
||||
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("expected 400 for invalid kind, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- Inbox vs Outgoing listing ----------
|
||||
|
||||
func TestRequests_ListInbox(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewRequestsHandler(newTestBroadcaster())
|
||||
|
||||
mock.ExpectQuery("FROM requests").
|
||||
WithArgs("agent", "ws-2").
|
||||
WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "pending"))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-2"}}
|
||||
c.Request = httptest.NewRequest("GET", "/", nil)
|
||||
|
||||
handler.ListInbox(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp []map[string]interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if len(resp) != 1 || resp[0]["id"] != "req-1" {
|
||||
t.Errorf("expected one inbox request req-1, got %v", resp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequests_ListInbox_WithStatusFilter(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewRequestsHandler(newTestBroadcaster())
|
||||
|
||||
mock.ExpectQuery("FROM requests").
|
||||
WithArgs("agent", "ws-2", "pending").
|
||||
WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "pending"))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-2"}}
|
||||
c.Request = httptest.NewRequest("GET", "/?status=pending", nil)
|
||||
|
||||
handler.ListInbox(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequests_ListOutgoing(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewRequestsHandler(newTestBroadcaster())
|
||||
|
||||
mock.ExpectQuery("FROM requests").
|
||||
WithArgs("agent", "ws-1").
|
||||
WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "user", "", "done"))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
|
||||
c.Request = httptest.NewRequest("GET", "/", nil)
|
||||
|
||||
handler.ListOutgoing(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp []map[string]interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if len(resp) != 1 || resp[0]["status"] != "done" {
|
||||
t.Errorf("expected one outgoing request status done, got %v", resp)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- Get (with thread) ----------
|
||||
|
||||
func TestRequests_Get_WithThread(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewRequestsHandler(newTestBroadcaster())
|
||||
|
||||
mock.ExpectQuery("FROM requests WHERE id").
|
||||
WithArgs("req-1").
|
||||
WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "user", "", "info_requested"))
|
||||
mock.ExpectQuery("FROM request_messages WHERE request_id").
|
||||
WithArgs("req-1").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "request_id", "author_type", "author_id", "body", "created_at"}).
|
||||
AddRow("msg-1", "req-1", "user", "u-1", "what file?", "2026-06-10T01:00:00Z"))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "requestId", Value: "req-1"}}
|
||||
c.Request = httptest.NewRequest("GET", "/", nil)
|
||||
|
||||
handler.Get(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if resp["request"] == nil || resp["messages"] == nil {
|
||||
t.Errorf("expected request + messages keys, got %v", resp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequests_Get_NotFound(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewRequestsHandler(newTestBroadcaster())
|
||||
|
||||
mock.ExpectQuery("FROM requests WHERE id").
|
||||
WithArgs("nope").
|
||||
WillReturnRows(sqlmock.NewRows(requestColumnNames))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "requestId", Value: "nope"}}
|
||||
c.Request = httptest.NewRequest("GET", "/", nil)
|
||||
|
||||
handler.Get(c)
|
||||
|
||||
if w.Code != http.StatusNotFound {
|
||||
t.Errorf("expected 404, got %d", w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- Respond (valid + invalid action-for-kind) ----------
|
||||
|
||||
func TestRequests_Respond_ApprovalApproved(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewRequestsHandler(newTestBroadcaster())
|
||||
|
||||
// Respond does Get first to validate action↔kind.
|
||||
mock.ExpectQuery("FROM requests WHERE id").
|
||||
WithArgs("req-1").
|
||||
WillReturnRows(oneRequestRow("req-1", "approval", "ws-1", "user", "", "pending"))
|
||||
mock.ExpectExec("UPDATE requests").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
// REQUEST_RESPONDED broadcast (anchored on requester agent ws-1).
|
||||
mock.ExpectExec("INSERT INTO structure_events").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "requestId", Value: "req-1"}}
|
||||
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"action":"approved","responder_id":"u-1"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Respond(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if resp["status"] != "approved" {
|
||||
t.Errorf("expected status approved, got %v", resp["status"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequests_Respond_TaskDone(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewRequestsHandler(newTestBroadcaster())
|
||||
|
||||
mock.ExpectQuery("FROM requests WHERE id").
|
||||
WithArgs("req-1").
|
||||
WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "user", "", "pending"))
|
||||
mock.ExpectExec("UPDATE requests").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectExec("INSERT INTO structure_events").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "requestId", Value: "req-1"}}
|
||||
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"action":"done"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Respond(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequests_Respond_InvalidActionForKind(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewRequestsHandler(newTestBroadcaster())
|
||||
|
||||
// kind=approval cannot be "done" — Get succeeds, actionToStatus rejects.
|
||||
mock.ExpectQuery("FROM requests WHERE id").
|
||||
WithArgs("req-1").
|
||||
WillReturnRows(oneRequestRow("req-1", "approval", "ws-1", "user", "", "pending"))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "requestId", Value: "req-1"}}
|
||||
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"action":"done"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Respond(c)
|
||||
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("expected 400 for done-on-approval, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequests_Respond_NotFound(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewRequestsHandler(newTestBroadcaster())
|
||||
|
||||
mock.ExpectQuery("FROM requests WHERE id").
|
||||
WithArgs("nope").
|
||||
WillReturnRows(sqlmock.NewRows(requestColumnNames))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "requestId", Value: "nope"}}
|
||||
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"action":"done"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Respond(c)
|
||||
|
||||
if w.Code != http.StatusNotFound {
|
||||
t.Errorf("expected 404, got %d", w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- AddMessage → info_requested when recipient asks ----------
|
||||
|
||||
func TestRequests_AddMessage_RecipientFlipsInfoRequested(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewRequestsHandler(newTestBroadcaster())
|
||||
|
||||
// Author is the recipient agent ws-2 → flips to info_requested.
|
||||
mock.ExpectQuery("FROM requests WHERE id").
|
||||
WithArgs("req-1").
|
||||
WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "pending"))
|
||||
mock.ExpectQuery("INSERT INTO request_messages").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("msg-1"))
|
||||
// recipient-authored → status flip UPDATE
|
||||
mock.ExpectExec("UPDATE requests SET status = 'info_requested'").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
// REQUEST_MESSAGE broadcast (anchored on requester ws-1)
|
||||
mock.ExpectExec("INSERT INTO structure_events").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "requestId", Value: "req-1"}}
|
||||
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"body":"which file?","author_type":"agent","author_id":"ws-2"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.AddMessage(c)
|
||||
|
||||
if w.Code != http.StatusCreated {
|
||||
t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if resp["message_id"] != "msg-1" {
|
||||
t.Errorf("expected message_id msg-1, got %v", resp["message_id"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequests_AddMessage_RequesterDoesNotFlip(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewRequestsHandler(newTestBroadcaster())
|
||||
|
||||
// Author is the requester ws-1 (not recipient) → NO info_requested flip.
|
||||
mock.ExpectQuery("FROM requests WHERE id").
|
||||
WithArgs("req-1").
|
||||
WillReturnRows(oneRequestRow("req-1", "task", "ws-1", "agent", "ws-2", "info_requested"))
|
||||
mock.ExpectQuery("INSERT INTO request_messages").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("msg-2"))
|
||||
// No status-flip UPDATE expected here.
|
||||
mock.ExpectExec("INSERT INTO structure_events").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "requestId", Value: "req-1"}}
|
||||
c.Request = httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"body":"posts/launch.md","author_type":"agent","author_id":"ws-1"}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.AddMessage(c)
|
||||
|
||||
if w.Code != http.StatusCreated {
|
||||
t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- Cancel ----------
|
||||
|
||||
func TestRequests_Cancel_Success(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewRequestsHandler(newTestBroadcaster())
|
||||
|
||||
mock.ExpectExec("UPDATE requests SET status = 'cancelled'").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "requestId", Value: "req-1"}}
|
||||
c.Request = httptest.NewRequest("POST", "/", nil)
|
||||
|
||||
handler.Cancel(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequests_Cancel_NotFound(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewRequestsHandler(newTestBroadcaster())
|
||||
|
||||
mock.ExpectExec("UPDATE requests SET status = 'cancelled'").
|
||||
WillReturnResult(sqlmock.NewResult(0, 0))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "requestId", Value: "gone"}}
|
||||
c.Request = httptest.NewRequest("POST", "/", nil)
|
||||
|
||||
handler.Cancel(c)
|
||||
|
||||
if w.Code != http.StatusNotFound {
|
||||
t.Errorf("expected 404, got %d", w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- ListPending (org view) + kind filter ----------
|
||||
|
||||
// pendingColumnNames mirrors ListPendingForOrg's projection (requestColumns + w.name).
|
||||
var pendingColumnNames = append(append([]string{}, requestColumnNames...), "workspace_name")
|
||||
|
||||
func onePendingRow(id, kind string) *sqlmock.Rows {
|
||||
return sqlmock.NewRows(pendingColumnNames).AddRow(
|
||||
id, kind, "agent", "ws-1", "org-root-1",
|
||||
"user", "", "Some title", nil, "pending",
|
||||
nil, nil, nil, "2026-06-10T00:00:00Z", "2026-06-10T00:00:00Z", nil,
|
||||
"Marketing Agent",
|
||||
)
|
||||
}
|
||||
|
||||
func TestRequests_ListPending_NoFilter(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewRequestsHandler(newTestBroadcaster())
|
||||
|
||||
mock.ExpectQuery("FROM requests r").
|
||||
WillReturnRows(onePendingRow("req-1", "task").AddRow(
|
||||
"req-2", "approval", "agent", "ws-2", "org-root-1",
|
||||
"user", "", "Approve deploy", nil, "pending",
|
||||
nil, nil, nil, "2026-06-10T00:00:00Z", "2026-06-10T00:00:00Z", nil,
|
||||
"Ops Agent",
|
||||
))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("GET", "/", nil)
|
||||
|
||||
handler.ListPending(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp []map[string]interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if len(resp) != 2 {
|
||||
t.Errorf("expected 2 pending, got %d (%v)", len(resp), resp)
|
||||
}
|
||||
if resp[0]["workspace_name"] != "Marketing Agent" {
|
||||
t.Errorf("expected decorated workspace_name, got %v", resp[0]["workspace_name"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequests_ListPending_KindFilter(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewRequestsHandler(newTestBroadcaster())
|
||||
|
||||
// ?kind=approval must add an arg.
|
||||
mock.ExpectQuery("FROM requests r").
|
||||
WithArgs("approval").
|
||||
WillReturnRows(onePendingRow("req-2", "approval"))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("GET", "/?kind=approval", nil)
|
||||
|
||||
handler.ListPending(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequests_ListPending_InvalidKind(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewRequestsHandler(newTestBroadcaster())
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("GET", "/?kind=banana", nil)
|
||||
|
||||
handler.ListPending(c)
|
||||
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("expected 400 for invalid kind, got %d", w.Code)
|
||||
}
|
||||
}
|
||||
@@ -348,6 +348,36 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
// AdminAuth-gated exactly like /approvals/pending.
|
||||
r.GET("/user-tasks/pending", middleware.AdminAuth(db.DB), uth.ListAll)
|
||||
|
||||
// Requests — the unified Tasks + Approvals inbox (RFC P1). Generalizes
|
||||
// approvals + user-tasks into one model keyed by kind. Auth is split the
|
||||
// same way: per-workspace create/list under wsAuth (an agent acts with
|
||||
// its workspace token); the cross-org pending view + the
|
||||
// /requests/:requestId/* action paths are AdminAuth-gated for the canvas
|
||||
// user. Because an AGENT can also respond to a request addressed to it
|
||||
// (using its own workspace token), the action verbs are ALSO registered
|
||||
// under the wsAuth /workspaces/:id/requests/:requestId/* prefix — same
|
||||
// dual-surface pattern the brief calls for (agent = workspace token,
|
||||
// canvas user = admin token), no new auth mechanism.
|
||||
rqh := handlers.NewRequestsHandler(broadcaster)
|
||||
wsAuth.POST("/requests", rqh.Create)
|
||||
wsAuth.GET("/requests", rqh.ListOutgoing)
|
||||
wsAuth.GET("/requests/inbox", rqh.ListInbox)
|
||||
// Agent-side action verbs (workspace-token auth).
|
||||
wsAuth.GET("/requests/:requestId", rqh.Get)
|
||||
wsAuth.POST("/requests/:requestId/respond", rqh.Respond)
|
||||
wsAuth.POST("/requests/:requestId/messages", rqh.AddMessage)
|
||||
wsAuth.POST("/requests/:requestId/cancel", rqh.Cancel)
|
||||
// Cross-org pending view for the canvas Tasks/Approvals tabs — AdminAuth
|
||||
// like /user-tasks/pending. ?kind=task|approval drives each tab.
|
||||
r.GET("/requests/pending", middleware.AdminAuth(db.DB), rqh.ListPending)
|
||||
// Canvas-user action verbs (admin auth). Same handlers; the responder
|
||||
// defaults to 'user' on this path.
|
||||
reqAdmin := r.Group("", middleware.AdminAuth(db.DB))
|
||||
reqAdmin.GET("/requests/:requestId", rqh.Get)
|
||||
reqAdmin.POST("/requests/:requestId/respond", rqh.Respond)
|
||||
reqAdmin.POST("/requests/:requestId/messages", rqh.AddMessage)
|
||||
reqAdmin.POST("/requests/:requestId/cancel", rqh.Cancel)
|
||||
|
||||
// (TeamHandler is gone — #2864.) The visual canvas Collapse
|
||||
// button calls PATCH /workspaces/:id { collapsed: true/false }
|
||||
// (presentational toggle on canvas_layouts), NOT the destructive
|
||||
|
||||
@@ -0,0 +1,4 @@
|
||||
-- Drop child first (FK), then parent. Idempotent (IF EXISTS) so a re-run of the
|
||||
-- down migration is a no-op rather than a crash.
|
||||
DROP TABLE IF EXISTS request_messages;
|
||||
DROP TABLE IF EXISTS requests;
|
||||
@@ -0,0 +1,122 @@
|
||||
-- requests: the unified Tasks + Approvals primitive (RFC P1). Generalizes
|
||||
-- user_tasks (007-style worklist "asks") and approval_requests (the destructive
|
||||
-- gate) into ONE inbox keyed by kind ∈ {task, approval}, with a requester and a
|
||||
-- recipient that may each be a user OR an agent. See
|
||||
-- docs/design/rfc-unified-requests-inbox.md.
|
||||
--
|
||||
-- recipient_id / requester_id are plain TEXT with NO foreign key on purpose: an
|
||||
-- agent's id is a workspaces(id) UUID, but a user's id is not a workspaces row,
|
||||
-- so a cross-type FK is impossible. ListPendingForOrg LEFT JOINs workspaces to
|
||||
-- decorate a name when the party is an agent (a user party simply has no match).
|
||||
--
|
||||
-- The migration runner tracks applied filenames in schema_migrations, but a
|
||||
-- partial-failure re-run would re-apply this file, so EVERYTHING here is
|
||||
-- idempotent: CREATE TABLE/INDEX IF NOT EXISTS (no bare ALTER ADD CONSTRAINT,
|
||||
-- which can't be IF NOT EXISTS and would crash-loop), and the historical
|
||||
-- backfills use ON CONFLICT (id) DO NOTHING so re-runs are no-ops.
|
||||
CREATE TABLE IF NOT EXISTS requests (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
kind TEXT NOT NULL CHECK (kind IN ('task', 'approval')),
|
||||
requester_type TEXT NOT NULL CHECK (requester_type IN ('user', 'agent')),
|
||||
requester_id TEXT NOT NULL,
|
||||
org_id UUID,
|
||||
recipient_type TEXT NOT NULL CHECK (recipient_type IN ('user', 'agent')),
|
||||
recipient_id TEXT NOT NULL,
|
||||
title TEXT NOT NULL,
|
||||
detail TEXT,
|
||||
status TEXT NOT NULL DEFAULT 'pending'
|
||||
CHECK (status IN ('pending', 'info_requested', 'done', 'rejected', 'approved', 'cancelled')),
|
||||
responder_type TEXT,
|
||||
responder_id TEXT,
|
||||
priority SMALLINT,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
responded_at TIMESTAMPTZ
|
||||
);
|
||||
|
||||
-- Inbox read: WHERE recipient_type=$1 AND recipient_id=$2 [AND status=$3]
|
||||
-- ORDER BY created_at DESC. This is the recipient's "incoming" list (agent poll
|
||||
-- or user Tasks/Approvals tab).
|
||||
CREATE INDEX IF NOT EXISTS idx_requests_inbox
|
||||
ON requests (recipient_type, recipient_id, status, created_at DESC);
|
||||
|
||||
-- Org pending view ("all agents' incoming" tab): WHERE org_id=$1 AND status...
|
||||
CREATE INDEX IF NOT EXISTS idx_requests_org_pending
|
||||
ON requests (org_id, status, created_at DESC);
|
||||
|
||||
-- Outgoing / async pickup: WHERE requester_type=$1 AND requester_id=$2 — the
|
||||
-- requester reads back the requests it raised (check_requests).
|
||||
CREATE INDEX IF NOT EXISTS idx_requests_outgoing
|
||||
ON requests (requester_type, requester_id, created_at DESC);
|
||||
|
||||
-- request_messages: the "More Info / chat about this" thread on a request.
|
||||
CREATE TABLE IF NOT EXISTS request_messages (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
request_id UUID NOT NULL REFERENCES requests(id) ON DELETE CASCADE,
|
||||
author_type TEXT NOT NULL CHECK (author_type IN ('user', 'agent')),
|
||||
author_id TEXT NOT NULL,
|
||||
body TEXT NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
-- Thread read: WHERE request_id=$1 ORDER BY created_at.
|
||||
CREATE INDEX IF NOT EXISTS idx_request_messages_thread
|
||||
ON request_messages (request_id, created_at);
|
||||
|
||||
-- Idempotent backfill — copy historical user_tasks into the unified inbox so
|
||||
-- the Tasks tab shows pre-cutover items. user_tasks are always agent→user asks,
|
||||
-- so requester_type='agent' (requester_id = the raising workspace), recipient is
|
||||
-- the human user; recipient_id '' is the "the org's user" sentinel (user ids are
|
||||
-- not modelled yet in this table's source — P1 keeps it empty, the CHECK permits
|
||||
-- empty TEXT). 'dismissed' maps to the unified 'rejected'.
|
||||
INSERT INTO requests (
|
||||
id, kind, requester_type, requester_id, recipient_type, recipient_id,
|
||||
title, detail, status, responder_id, created_at, responded_at
|
||||
)
|
||||
SELECT
|
||||
id,
|
||||
'task',
|
||||
'agent',
|
||||
workspace_id::text,
|
||||
'user',
|
||||
'',
|
||||
title,
|
||||
detail,
|
||||
CASE status WHEN 'dismissed' THEN 'rejected' ELSE status END,
|
||||
resolved_by,
|
||||
created_at,
|
||||
resolved_at
|
||||
FROM user_tasks
|
||||
ON CONFLICT (id) DO NOTHING;
|
||||
|
||||
-- Idempotent backfill — copy historical approval_requests in as kind='approval'.
|
||||
-- approval_requests columns (007_approvals.sql): id, workspace_id, task_id,
|
||||
-- action, reason, context, status IN (pending|approved|denied|escalated),
|
||||
-- decided_by, decided_at, created_at. Map status: 'denied'→'rejected',
|
||||
-- 'escalated'→'pending' (still awaiting a decision, just bubbled up). title is
|
||||
-- the action; detail is the reason. requester = the raising workspace (agent),
|
||||
-- recipient = the user who decides.
|
||||
INSERT INTO requests (
|
||||
id, kind, requester_type, requester_id, recipient_type, recipient_id,
|
||||
title, detail, status, responder_id, created_at, responded_at
|
||||
)
|
||||
SELECT
|
||||
id,
|
||||
'approval',
|
||||
'agent',
|
||||
workspace_id::text,
|
||||
'user',
|
||||
'',
|
||||
action,
|
||||
reason,
|
||||
CASE status
|
||||
WHEN 'denied' THEN 'rejected'
|
||||
WHEN 'escalated' THEN 'pending'
|
||||
ELSE status
|
||||
END,
|
||||
decided_by,
|
||||
created_at,
|
||||
decided_at
|
||||
FROM approval_requests
|
||||
WHERE workspace_id IS NOT NULL
|
||||
ON CONFLICT (id) DO NOTHING;
|
||||
Reference in New Issue
Block a user