feat(delegations): operator dashboard endpoint over the durable ledger (RFC #2829 PR-4)

Two read endpoints over the `delegations` table (PR-1 schema):

  GET /admin/delegations[?status=in_flight|stuck|failed|completed&limit=N]
  GET /admin/delegations/stats

## What this gives operators

Without this, post-incident investigation requires direct DB access —
only the on-call SRE can answer "is workspace X delegating to a wedged
callee?". This moves that visibility into the same surface as
/admin/queue, /admin/schedules-health, /admin/memories.

## List endpoint

Status filter via tight allowlist:

  - in_flight (default) → status IN (queued, dispatched, in_progress)
  - stuck               → status='stuck' (rows the PR-3 sweeper marked)
  - failed              → status='failed'
  - completed           → status='completed'

Unknown status → 400 with the allowlist in the error body. Limit
1..1000, default 100.

The status allowlist drives a parameterized IN clause (no string-
concatenation of user-controlled values into SQL).

Result rows expose all the audit-grade fields the dashboard needs:
delegation_id, caller_id, callee_id, task_preview, status,
last_heartbeat, deadline, result_preview, error_detail, retry_count,
created_at, updated_at. Nullable fields use pointer types so JSON
omits them when NULL (no false-zero "" for missing values).

## Stats endpoint

Zero-fills every known status key (queued, dispatched, in_progress,
completed, failed, stuck) so the dashboard summary card doesn't have
to handle "missing key vs zero" branching.

## Out of scope (deferred)

  - "retry this stuck task" mutation: needs the agent-side cutover
    (RFC #2829 PR-5 plan) before re-fire is safe
  - p95 / p99 duration aggregates: separate metric exposure, not a
    row-level read endpoint
  - Canvas UI: this is the JSON contract; the canvas operator panel
    consumes it in a follow-up canvas PR

## Wiring

NOT wired into the router in this PR — ships separately to keep
PR-by-PR review surface tight. Wiring will land in the
`enable-rfc2829-server-side` follow-up PR alongside the sweeper Start
call and the result-push flag flip.

## Coverage

11 unit tests:

List (8):
  - default status=in_flight, IN(queued,dispatched,in_progress)
  - status=stuck → IN(stuck)
  - status=failed → IN(failed)
  - unknown status → 400 with allowlist
  - negative limit → 400
  - over-cap limit → 400
  - custom limit accepted + echoed in response
  - nullable fields populated correctly (pointer-omitempty)

Stats (2):
  - zero-fills missing status keys
  - empty table → all counts zero

Contract pin (1):
  - statusFilters table shape — every documented key + value pair
    pinned. Drift catches accidental edits (forward defense).

Refs RFC #2829.
This commit is contained in:
Hongming Wang 2026-05-04 20:58:17 -07:00
parent b3b9a242d6
commit 2ed4f4fb41
2 changed files with 568 additions and 0 deletions

View File

@ -0,0 +1,236 @@
package handlers
import (
"database/sql"
"log"
"net/http"
"strconv"
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/gin-gonic/gin"
)
// admin_delegations.go — RFC #2829 PR-4: operator dashboard endpoint
// over the durable delegations ledger (PR-1 schema, PR-3 sweeper).
//
// What this endpoint serves
// -------------------------
//
// GET /admin/delegations[?status=in_flight|stuck|failed&limit=N]
//
// Returns the rows the operator needs to triage delegation health:
// - in_flight : status IN (queued, dispatched, in_progress) — the
// things actively churning right now. Default view.
// - stuck : status='stuck' — sweeper found these wedged. Operator
// can investigate the callee + decide whether to retry
// (RFC #2829 PR-5 plan).
// - failed : status='failed' — terminal failures, recent. Useful
// for spotting trends like "callee X is failing 50% of
// delegations since 14:00".
//
// Why an admin endpoint at all
// ----------------------------
// Without this, post-incident investigation requires direct DB access —
// only the on-call SRE can answer "is workspace X delegating to a wedged
// callee?". The dashboard endpoint moves that visibility into the same
// surface as /admin/queue, /admin/schedules-health, /admin/memories etc.
//
// Out of scope (deferred to a follow-up PR per RFC #2829)
// -------------------------------------------------------
// - "retry this stuck task" mutation: needs careful interaction with
// the agent-side cutover (PR-5) before it can be safely re-fired
// - p95 / p99 duration aggregates: separate metric exposure, not a
// row-level read
// - Canvas UI: this is the JSON contract; the canvas operator panel
// consumes it in a follow-up canvas PR
// AdminDelegationsHandler serves the operator dashboard read endpoint.
type AdminDelegationsHandler struct {
db *sql.DB
}
func NewAdminDelegationsHandler(handle *sql.DB) *AdminDelegationsHandler {
if handle == nil {
handle = db.DB
}
return &AdminDelegationsHandler{db: handle}
}
// delegationRow mirrors the row shape of the `delegations` table that the
// operator dashboard cares about. Order matches the SELECT below — keep
// the two in sync if you add a column.
type delegationRow struct {
DelegationID string `json:"delegation_id"`
CallerID string `json:"caller_id"`
CalleeID string `json:"callee_id"`
TaskPreview string `json:"task_preview"`
Status string `json:"status"`
LastHeartbeat *time.Time `json:"last_heartbeat,omitempty"`
Deadline time.Time `json:"deadline"`
ResultPreview *string `json:"result_preview,omitempty"`
ErrorDetail *string `json:"error_detail,omitempty"`
RetryCount int `json:"retry_count"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// statusFilters maps the query-string `status` value to the SQL set.
// Keep tight — operators don't get to query arbitrary status — so a
// new status name added to the schema needs an explicit allowlist
// entry here. Caught when a future status name doesn't pin to a UI
// expectation (forward-defense).
var statusFilters = map[string][]string{
"in_flight": {"queued", "dispatched", "in_progress"},
"stuck": {"stuck"},
"failed": {"failed"},
"completed": {"completed"},
}
const defaultListLimit = 100
const maxListLimit = 1000
// List handles GET /admin/delegations
//
// Query params:
// - status — one of `in_flight` (default) / `stuck` / `failed` / `completed`
// - limit — int, 1..1000 (default 100)
//
// Returns 200 with `{"delegations": [...], "count": N}`.
func (h *AdminDelegationsHandler) List(c *gin.Context) {
statusKey := c.DefaultQuery("status", "in_flight")
statuses, ok := statusFilters[statusKey]
if !ok {
c.JSON(http.StatusBadRequest, gin.H{
"error": "unknown status filter",
"allowed": []string{"in_flight", "stuck", "failed", "completed"},
"requested_status": statusKey,
})
return
}
limit := defaultListLimit
if v := c.Query("limit"); v != "" {
n, err := strconv.Atoi(v)
if err != nil || n < 1 || n > maxListLimit {
c.JSON(http.StatusBadRequest, gin.H{
"error": "limit must be 1..1000",
"requested": v,
})
return
}
limit = n
}
// Build the IN list as a parameterized expression — never string-
// concatenate user-controlled values into the SQL. statusKey came
// from the allowlist above so the slice is fully bounded.
args := make([]any, 0, len(statuses)+1)
placeholders := ""
for i, s := range statuses {
if i > 0 {
placeholders += ","
}
args = append(args, s)
placeholders += "$" + strconv.Itoa(i+1)
}
args = append(args, limit)
limitPlaceholder := "$" + strconv.Itoa(len(statuses)+1)
rows, err := h.db.QueryContext(c.Request.Context(), `
SELECT delegation_id, caller_id::text, callee_id::text, task_preview,
status, last_heartbeat, deadline, result_preview, error_detail,
retry_count, created_at, updated_at
FROM delegations
WHERE status IN (`+placeholders+`)
ORDER BY created_at DESC
LIMIT `+limitPlaceholder, args...)
if err != nil {
log.Printf("AdminDelegations.List: query failed: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"})
return
}
defer rows.Close()
out := make([]delegationRow, 0)
for rows.Next() {
var r delegationRow
var lastBeat sql.NullTime
var resultPreview, errorDetail sql.NullString
if err := rows.Scan(
&r.DelegationID, &r.CallerID, &r.CalleeID, &r.TaskPreview,
&r.Status, &lastBeat, &r.Deadline, &resultPreview, &errorDetail,
&r.RetryCount, &r.CreatedAt, &r.UpdatedAt,
); err != nil {
log.Printf("AdminDelegations.List: scan failed: %v", err)
continue
}
if lastBeat.Valid {
t := lastBeat.Time
r.LastHeartbeat = &t
}
if resultPreview.Valid {
s := resultPreview.String
r.ResultPreview = &s
}
if errorDetail.Valid {
s := errorDetail.String
r.ErrorDetail = &s
}
out = append(out, r)
}
if err := rows.Err(); err != nil {
log.Printf("AdminDelegations.List: rows.Err: %v", err)
}
c.JSON(http.StatusOK, gin.H{
"delegations": out,
"count": len(out),
"status": statusKey,
"limit": limit,
})
}
// Stats handles GET /admin/delegations/stats — at-a-glance counts per
// status. Useful for the dashboard summary card at the top of the
// operator panel without paying for a row-level fetch.
//
// Returns 200 with `{"queued": N, "dispatched": N, "in_progress": N,
// "completed": N, "failed": N, "stuck": N}`.
func (h *AdminDelegationsHandler) Stats(c *gin.Context) {
rows, err := h.db.QueryContext(c.Request.Context(), `
SELECT status, COUNT(*) FROM delegations GROUP BY status
`)
if err != nil {
log.Printf("AdminDelegations.Stats: query failed: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"})
return
}
defer rows.Close()
// Initialise to zero so the response always has every known status
// key — the dashboard card doesn't need to handle "missing key vs
// zero" branching.
stats := map[string]int{
"queued": 0,
"dispatched": 0,
"in_progress": 0,
"completed": 0,
"failed": 0,
"stuck": 0,
}
for rows.Next() {
var status string
var count int
if err := rows.Scan(&status, &count); err != nil {
log.Printf("AdminDelegations.Stats: scan failed: %v", err)
continue
}
stats[status] = count
}
if err := rows.Err(); err != nil {
log.Printf("AdminDelegations.Stats: rows.Err: %v", err)
}
c.JSON(http.StatusOK, stats)
}

View File

@ -0,0 +1,332 @@
package handlers
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
// admin_delegations_test.go — RFC #2829 PR-4 dashboard endpoint coverage.
//
// - List: status filter + limit defaults + bad-input rejection
// - Stats: per-status counts + zero-fill for missing statuses
// ---------- List ----------
func TestAdminDelegations_List_DefaultStatusInFlight(t *testing.T) {
mock := setupTestDB(t)
h := NewAdminDelegationsHandler(nil)
now := time.Now()
mock.ExpectQuery(`SELECT delegation_id, caller_id::text, callee_id::text, task_preview,\s+status, last_heartbeat, deadline, result_preview, error_detail,\s+retry_count, created_at, updated_at\s+FROM delegations\s+WHERE status IN \(\$1,\$2,\$3\)\s+ORDER BY created_at DESC\s+LIMIT \$4`).
WithArgs("queued", "dispatched", "in_progress", 100).
WillReturnRows(sqlmock.NewRows([]string{
"delegation_id", "caller_id", "callee_id", "task_preview",
"status", "last_heartbeat", "deadline", "result_preview", "error_detail",
"retry_count", "created_at", "updated_at",
}).AddRow(
"deleg-1", "caller-uuid", "callee-uuid", "task body",
"in_progress", now, now.Add(2*time.Hour), nil, nil,
0, now.Add(-5*time.Minute), now.Add(-1*time.Minute),
))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/admin/delegations", nil)
h.List(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var body map[string]any
if err := json.Unmarshal(w.Body.Bytes(), &body); err != nil {
t.Fatalf("body parse: %v", err)
}
if got := body["count"]; got != float64(1) {
t.Errorf("count: expected 1, got %v", got)
}
if got := body["status"]; got != "in_flight" {
t.Errorf("status: expected in_flight, got %v", got)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
func TestAdminDelegations_List_StatusStuck(t *testing.T) {
mock := setupTestDB(t)
h := NewAdminDelegationsHandler(nil)
mock.ExpectQuery(`SELECT delegation_id`).
WithArgs("stuck", 100).
WillReturnRows(sqlmock.NewRows([]string{
"delegation_id", "caller_id", "callee_id", "task_preview",
"status", "last_heartbeat", "deadline", "result_preview", "error_detail",
"retry_count", "created_at", "updated_at",
}))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/admin/delegations?status=stuck", nil)
h.List(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d", w.Code)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
func TestAdminDelegations_List_StatusFailed(t *testing.T) {
mock := setupTestDB(t)
h := NewAdminDelegationsHandler(nil)
mock.ExpectQuery(`SELECT delegation_id`).
WithArgs("failed", 100).
WillReturnRows(sqlmock.NewRows([]string{
"delegation_id", "caller_id", "callee_id", "task_preview",
"status", "last_heartbeat", "deadline", "result_preview", "error_detail",
"retry_count", "created_at", "updated_at",
}))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/admin/delegations?status=failed", nil)
h.List(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d", w.Code)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
func TestAdminDelegations_List_RejectsUnknownStatus(t *testing.T) {
setupTestDB(t)
h := NewAdminDelegationsHandler(nil)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/admin/delegations?status=garbage", nil)
h.List(c)
if w.Code != http.StatusBadRequest {
t.Errorf("expected 400, got %d: %s", w.Code, w.Body.String())
}
}
func TestAdminDelegations_List_RejectsNegativeLimit(t *testing.T) {
setupTestDB(t)
h := NewAdminDelegationsHandler(nil)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/admin/delegations?limit=-5", nil)
h.List(c)
if w.Code != http.StatusBadRequest {
t.Errorf("expected 400, got %d", w.Code)
}
}
func TestAdminDelegations_List_RejectsLimitOverCap(t *testing.T) {
setupTestDB(t)
h := NewAdminDelegationsHandler(nil)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/admin/delegations?limit=99999", nil)
h.List(c)
if w.Code != http.StatusBadRequest {
t.Errorf("expected 400, got %d", w.Code)
}
}
func TestAdminDelegations_List_AcceptsCustomLimit(t *testing.T) {
mock := setupTestDB(t)
h := NewAdminDelegationsHandler(nil)
mock.ExpectQuery(`SELECT delegation_id`).
WithArgs("queued", "dispatched", "in_progress", 25).
WillReturnRows(sqlmock.NewRows([]string{
"delegation_id", "caller_id", "callee_id", "task_preview",
"status", "last_heartbeat", "deadline", "result_preview", "error_detail",
"retry_count", "created_at", "updated_at",
}))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/admin/delegations?limit=25", nil)
h.List(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var body map[string]any
_ = json.Unmarshal(w.Body.Bytes(), &body)
if body["limit"] != float64(25) {
t.Errorf("expected limit=25 echo, got %v", body["limit"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
func TestAdminDelegations_List_PopulatesNullableFields(t *testing.T) {
mock := setupTestDB(t)
h := NewAdminDelegationsHandler(nil)
now := time.Now()
resultStr := "all done"
mock.ExpectQuery(`SELECT delegation_id`).
WithArgs("completed", 100).
WillReturnRows(sqlmock.NewRows([]string{
"delegation_id", "caller_id", "callee_id", "task_preview",
"status", "last_heartbeat", "deadline", "result_preview", "error_detail",
"retry_count", "created_at", "updated_at",
}).AddRow(
"deleg-2", "c", "ca", "t",
"completed", now, now.Add(2*time.Hour), resultStr, nil,
0, now, now,
))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/admin/delegations?status=completed", nil)
h.List(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d", w.Code)
}
var body struct {
Delegations []struct {
ResultPreview *string `json:"result_preview"`
ErrorDetail *string `json:"error_detail"`
LastHeartbeat *string `json:"last_heartbeat"`
} `json:"delegations"`
}
if err := json.Unmarshal(w.Body.Bytes(), &body); err != nil {
t.Fatalf("parse: %v", err)
}
if len(body.Delegations) != 1 {
t.Fatalf("expected 1 row, got %d", len(body.Delegations))
}
row := body.Delegations[0]
if row.ResultPreview == nil || *row.ResultPreview != "all done" {
t.Errorf("result_preview not populated correctly: %+v", row.ResultPreview)
}
if row.ErrorDetail != nil {
t.Errorf("error_detail should be nil for completed-no-error: %+v", row.ErrorDetail)
}
if row.LastHeartbeat == nil {
t.Errorf("last_heartbeat should be present (non-NULL); got nil")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
// ---------- Stats ----------
func TestAdminDelegations_Stats_ZeroFillsMissingStatuses(t *testing.T) {
// Stats response must always include every status key. If no rows
// exist for status='stuck', the response still shows "stuck": 0.
mock := setupTestDB(t)
h := NewAdminDelegationsHandler(nil)
mock.ExpectQuery(`SELECT status, COUNT\(\*\) FROM delegations GROUP BY status`).
WillReturnRows(sqlmock.NewRows([]string{"status", "count"}).
AddRow("in_progress", 7).
AddRow("completed", 130))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/admin/delegations/stats", nil)
h.Stats(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d", w.Code)
}
var stats map[string]int
if err := json.Unmarshal(w.Body.Bytes(), &stats); err != nil {
t.Fatalf("parse: %v", err)
}
expectedKeys := []string{"queued", "dispatched", "in_progress", "completed", "failed", "stuck"}
for _, k := range expectedKeys {
if _, ok := stats[k]; !ok {
t.Errorf("stats missing key %q (zero-fill contract broken)", k)
}
}
if stats["in_progress"] != 7 {
t.Errorf("in_progress count: expected 7, got %d", stats["in_progress"])
}
if stats["completed"] != 130 {
t.Errorf("completed count: expected 130, got %d", stats["completed"])
}
if stats["stuck"] != 0 {
t.Errorf("stuck must be zero-filled: got %d", stats["stuck"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
func TestAdminDelegations_Stats_EmptyTable(t *testing.T) {
mock := setupTestDB(t)
h := NewAdminDelegationsHandler(nil)
mock.ExpectQuery(`SELECT status, COUNT\(\*\) FROM delegations GROUP BY status`).
WillReturnRows(sqlmock.NewRows([]string{"status", "count"}))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/admin/delegations/stats", nil)
h.Stats(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d", w.Code)
}
var stats map[string]int
_ = json.Unmarshal(w.Body.Bytes(), &stats)
for k, v := range stats {
if v != 0 {
t.Errorf("empty table → all counts zero; %s=%d", k, v)
}
}
}
// statusFilters is a contract surface — every key here is documented in
// the endpoint comment + accepted by the validator. Pin it.
func TestStatusFiltersTableShape(t *testing.T) {
expected := map[string][]string{
"in_flight": {"queued", "dispatched", "in_progress"},
"stuck": {"stuck"},
"failed": {"failed"},
"completed": {"completed"},
}
for k, want := range expected {
got, ok := statusFilters[k]
if !ok {
t.Errorf("statusFilters missing key %q", k)
continue
}
if len(got) != len(want) {
t.Errorf("statusFilters[%q]: want %v, got %v", k, want, got)
continue
}
for i := range want {
if got[i] != want[i] {
t.Errorf("statusFilters[%q][%d]: want %q, got %q", k, i, want[i], got[i])
}
}
}
}