molecule-core/workspace-server/internal/handlers/admin_delegations.go
Hongming Wang 2ed4f4fb41 feat(delegations): operator dashboard endpoint over the durable ledger (RFC #2829 PR-4)
Two read endpoints over the `delegations` table (PR-1 schema):

  GET /admin/delegations[?status=in_flight|stuck|failed|completed&limit=N]
  GET /admin/delegations/stats

## What this gives operators

Without this, post-incident investigation requires direct DB access —
only the on-call SRE can answer "is workspace X delegating to a wedged
callee?". This moves that visibility into the same surface as
/admin/queue, /admin/schedules-health, /admin/memories.

## List endpoint

Status filter via tight allowlist:

  - in_flight (default) → status IN (queued, dispatched, in_progress)
  - stuck               → status='stuck' (rows the PR-3 sweeper marked)
  - failed              → status='failed'
  - completed           → status='completed'

Unknown status → 400 with the allowlist in the error body. Limit
1..1000, default 100.

The status allowlist drives a parameterized IN clause (no string-
concatenation of user-controlled values into SQL).

Result rows expose all the audit-grade fields the dashboard needs:
delegation_id, caller_id, callee_id, task_preview, status,
last_heartbeat, deadline, result_preview, error_detail, retry_count,
created_at, updated_at. Nullable fields use pointer types so JSON
omits them when NULL (no false-zero "" for missing values).

## Stats endpoint

Zero-fills every known status key (queued, dispatched, in_progress,
completed, failed, stuck) so the dashboard summary card doesn't have
to handle "missing key vs zero" branching.

## Out of scope (deferred)

  - "retry this stuck task" mutation: needs the agent-side cutover
    (RFC #2829 PR-5 plan) before re-fire is safe
  - p95 / p99 duration aggregates: separate metric exposure, not a
    row-level read endpoint
  - Canvas UI: this is the JSON contract; the canvas operator panel
    consumes it in a follow-up canvas PR

## Wiring

NOT wired into the router in this PR — ships separately to keep
PR-by-PR review surface tight. Wiring will land in the
`enable-rfc2829-server-side` follow-up PR alongside the sweeper Start
call and the result-push flag flip.

## Coverage

11 unit tests:

List (8):
  - default status=in_flight, IN(queued,dispatched,in_progress)
  - status=stuck → IN(stuck)
  - status=failed → IN(failed)
  - unknown status → 400 with allowlist
  - negative limit → 400
  - over-cap limit → 400
  - custom limit accepted + echoed in response
  - nullable fields populated correctly (pointer-omitempty)

Stats (2):
  - zero-fills missing status keys
  - empty table → all counts zero

Contract pin (1):
  - statusFilters table shape — every documented key + value pair
    pinned, so accidental edits surface as test drift (forward defense).

Refs RFC #2829.
2026-05-04 20:58:17 -07:00


package handlers

import (
	"database/sql"
	"log"
	"net/http"
	"strconv"
	"time"

	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
	"github.com/gin-gonic/gin"
)
// admin_delegations.go — RFC #2829 PR-4: operator dashboard endpoint
// over the durable delegations ledger (PR-1 schema, PR-3 sweeper).
//
// What this endpoint serves
// -------------------------
//
// GET /admin/delegations[?status=in_flight|stuck|failed|completed&limit=N]
//
// Returns the rows the operator needs to triage delegation health:
//   - in_flight : status IN (queued, dispatched, in_progress) — the
//     things actively churning right now. Default view.
//   - stuck     : status='stuck' — sweeper found these wedged. Operator
//     can investigate the callee + decide whether to retry
//     (RFC #2829 PR-5 plan).
//   - failed    : status='failed' — terminal failures, recent. Useful
//     for spotting trends like "callee X is failing 50% of
//     delegations since 14:00".
//   - completed : status='completed' — terminal successes.
//
// Why an admin endpoint at all
// ----------------------------
// Without this, post-incident investigation requires direct DB access —
// only the on-call SRE can answer "is workspace X delegating to a wedged
// callee?". The dashboard endpoint moves that visibility into the same
// surface as /admin/queue, /admin/schedules-health, /admin/memories etc.
//
// Out of scope (deferred to a follow-up PR per RFC #2829)
// -------------------------------------------------------
//   - "retry this stuck task" mutation: needs careful interaction with
//     the agent-side cutover (PR-5) before it can be safely re-fired
//   - p95 / p99 duration aggregates: separate metric exposure, not a
//     row-level read
//   - Canvas UI: this is the JSON contract; the canvas operator panel
//     consumes it in a follow-up canvas PR
// AdminDelegationsHandler serves the operator dashboard read endpoints.
type AdminDelegationsHandler struct {
	db *sql.DB
}

// NewAdminDelegationsHandler returns a handler backed by the given DB
// handle, falling back to the package-level default when handed nil.
func NewAdminDelegationsHandler(handle *sql.DB) *AdminDelegationsHandler {
	if handle == nil {
		handle = db.DB
	}
	return &AdminDelegationsHandler{db: handle}
}
// delegationRow mirrors the row shape of the `delegations` table that the
// operator dashboard cares about. Field order matches the SELECT below —
// keep the two in sync if you add a column.
type delegationRow struct {
	DelegationID  string     `json:"delegation_id"`
	CallerID      string     `json:"caller_id"`
	CalleeID      string     `json:"callee_id"`
	TaskPreview   string     `json:"task_preview"`
	Status        string     `json:"status"`
	LastHeartbeat *time.Time `json:"last_heartbeat,omitempty"`
	Deadline      time.Time  `json:"deadline"`
	ResultPreview *string    `json:"result_preview,omitempty"`
	ErrorDetail   *string    `json:"error_detail,omitempty"`
	RetryCount    int        `json:"retry_count"`
	CreatedAt     time.Time  `json:"created_at"`
	UpdatedAt     time.Time  `json:"updated_at"`
}
// statusFilters maps the query-string `status` value to the SQL status
// set. Keep this tight — operators don't get to query arbitrary statuses
// — so a new status name added to the schema needs an explicit allowlist
// entry here. A contract-pin test asserts every key/value pair, so any
// accidental edit surfaces as test drift (forward defense).
var statusFilters = map[string][]string{
	"in_flight": {"queued", "dispatched", "in_progress"},
	"stuck":     {"stuck"},
	"failed":    {"failed"},
	"completed": {"completed"},
}

const (
	defaultListLimit = 100
	maxListLimit     = 1000
)
// List handles GET /admin/delegations
//
// Query params:
//   - status — one of `in_flight` (default) / `stuck` / `failed` / `completed`
//   - limit  — int, 1..1000 (default 100)
//
// Returns 200 with `{"delegations": [...], "count": N}`.
func (h *AdminDelegationsHandler) List(c *gin.Context) {
	statusKey := c.DefaultQuery("status", "in_flight")
	statuses, ok := statusFilters[statusKey]
	if !ok {
		c.JSON(http.StatusBadRequest, gin.H{
			"error":            "unknown status filter",
			"allowed":          []string{"in_flight", "stuck", "failed", "completed"},
			"requested_status": statusKey,
		})
		return
	}

	limit := defaultListLimit
	if v := c.Query("limit"); v != "" {
		n, err := strconv.Atoi(v)
		if err != nil || n < 1 || n > maxListLimit {
			c.JSON(http.StatusBadRequest, gin.H{
				"error":     "limit must be 1..1000",
				"requested": v,
			})
			return
		}
		limit = n
	}

	// Build the IN list as a parameterized expression — never string-
	// concatenate user-controlled values into the SQL. statusKey came
	// from the allowlist above, so the slice is fully bounded.
	args := make([]any, 0, len(statuses)+1)
	placeholders := ""
	for i, s := range statuses {
		if i > 0 {
			placeholders += ","
		}
		args = append(args, s)
		placeholders += "$" + strconv.Itoa(i+1)
	}
	args = append(args, limit)
	limitPlaceholder := "$" + strconv.Itoa(len(statuses)+1)

	rows, err := h.db.QueryContext(c.Request.Context(), `
		SELECT delegation_id, caller_id::text, callee_id::text, task_preview,
		       status, last_heartbeat, deadline, result_preview, error_detail,
		       retry_count, created_at, updated_at
		FROM delegations
		WHERE status IN (`+placeholders+`)
		ORDER BY created_at DESC
		LIMIT `+limitPlaceholder, args...)
	if err != nil {
		log.Printf("AdminDelegations.List: query failed: %v", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"})
		return
	}
	defer rows.Close()

	out := make([]delegationRow, 0)
	for rows.Next() {
		var r delegationRow
		var lastBeat sql.NullTime
		var resultPreview, errorDetail sql.NullString
		if err := rows.Scan(
			&r.DelegationID, &r.CallerID, &r.CalleeID, &r.TaskPreview,
			&r.Status, &lastBeat, &r.Deadline, &resultPreview, &errorDetail,
			&r.RetryCount, &r.CreatedAt, &r.UpdatedAt,
		); err != nil {
			log.Printf("AdminDelegations.List: scan failed: %v", err)
			continue
		}
		if lastBeat.Valid {
			t := lastBeat.Time
			r.LastHeartbeat = &t
		}
		if resultPreview.Valid {
			s := resultPreview.String
			r.ResultPreview = &s
		}
		if errorDetail.Valid {
			s := errorDetail.String
			r.ErrorDetail = &s
		}
		out = append(out, r)
	}
	if err := rows.Err(); err != nil {
		log.Printf("AdminDelegations.List: rows.Err: %v", err)
	}

	c.JSON(http.StatusOK, gin.H{
		"delegations": out,
		"count":       len(out),
		"status":      statusKey,
		"limit":       limit,
	})
}
// Stats handles GET /admin/delegations/stats — at-a-glance counts per
// status. Useful for the dashboard summary card at the top of the
// operator panel without paying for a row-level fetch.
//
// Returns 200 with `{"queued": N, "dispatched": N, "in_progress": N,
// "completed": N, "failed": N, "stuck": N}`.
func (h *AdminDelegationsHandler) Stats(c *gin.Context) {
	rows, err := h.db.QueryContext(c.Request.Context(), `
		SELECT status, COUNT(*) FROM delegations GROUP BY status
	`)
	if err != nil {
		log.Printf("AdminDelegations.Stats: query failed: %v", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"})
		return
	}
	defer rows.Close()

	// Initialise to zero so the response always has every known status
	// key — the dashboard card doesn't need to handle "missing key vs
	// zero" branching.
	stats := map[string]int{
		"queued":      0,
		"dispatched":  0,
		"in_progress": 0,
		"completed":   0,
		"failed":      0,
		"stuck":       0,
	}
	for rows.Next() {
		var status string
		var count int
		if err := rows.Scan(&status, &count); err != nil {
			log.Printf("AdminDelegations.Stats: scan failed: %v", err)
			continue
		}
		stats[status] = count
	}
	if err := rows.Err(); err != nil {
		log.Printf("AdminDelegations.Stats: rows.Err: %v", err)
	}

	c.JSON(http.StatusOK, stats)
}