diff --git a/workspace-server/internal/handlers/admin_delegations.go b/workspace-server/internal/handlers/admin_delegations.go new file mode 100644 index 00000000..b2165397 --- /dev/null +++ b/workspace-server/internal/handlers/admin_delegations.go @@ -0,0 +1,236 @@ +package handlers + +import ( + "database/sql" + "log" + "net/http" + "strconv" + "time" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/gin-gonic/gin" +) + +// admin_delegations.go — RFC #2829 PR-4: operator dashboard endpoint +// over the durable delegations ledger (PR-1 schema, PR-3 sweeper). +// +// What this endpoint serves +// ------------------------- +// +// GET /admin/delegations[?status=in_flight|stuck|failed&limit=N] +// +// Returns the rows the operator needs to triage delegation health: +// - in_flight : status IN (queued, dispatched, in_progress) — the +// things actively churning right now. Default view. +// - stuck : status='stuck' — sweeper found these wedged. Operator +// can investigate the callee + decide whether to retry +// (RFC #2829 PR-5 plan). +// - failed : status='failed' — terminal failures, recent. Useful +// for spotting trends like "callee X is failing 50% of +// delegations since 14:00". +// +// Why an admin endpoint at all +// ---------------------------- +// Without this, post-incident investigation requires direct DB access — +// only the on-call SRE can answer "is workspace X delegating to a wedged +// callee?". The dashboard endpoint moves that visibility into the same +// surface as /admin/queue, /admin/schedules-health, /admin/memories etc. +// +// Out of scope (deferred to a follow-up PR per RFC #2829) +// ------------------------------------------------------- +// - "retry this stuck task" mutation: needs careful interaction with +// the agent-side cutover (PR-5) before it can be safely re-fired +// - p95 / p99 duration aggregates: separate metric exposure, not a +// row-level read +// - Canvas UI: this is the JSON contract; the canvas operator panel +// consumes it in a follow-up canvas PR + +// AdminDelegationsHandler serves the operator dashboard read endpoint. +type AdminDelegationsHandler struct { + db *sql.DB +} + +func NewAdminDelegationsHandler(handle *sql.DB) *AdminDelegationsHandler { + if handle == nil { + handle = db.DB + } + return &AdminDelegationsHandler{db: handle} +} + +// delegationRow mirrors the row shape of the `delegations` table that the +// operator dashboard cares about. Order matches the SELECT below — keep +// the two in sync if you add a column. +type delegationRow struct { + DelegationID string `json:"delegation_id"` + CallerID string `json:"caller_id"` + CalleeID string `json:"callee_id"` + TaskPreview string `json:"task_preview"` + Status string `json:"status"` + LastHeartbeat *time.Time `json:"last_heartbeat,omitempty"` + Deadline time.Time `json:"deadline"` + ResultPreview *string `json:"result_preview,omitempty"` + ErrorDetail *string `json:"error_detail,omitempty"` + RetryCount int `json:"retry_count"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +// statusFilters maps the query-string `status` value to the SQL set. +// Keep tight — operators don't get to query arbitrary status — so a +// new status name added to the schema needs an explicit allowlist +// entry here. Caught when a future status name doesn't pin to a UI +// expectation (forward-defense). +var statusFilters = map[string][]string{ + "in_flight": {"queued", "dispatched", "in_progress"}, + "stuck": {"stuck"}, + "failed": {"failed"}, + "completed": {"completed"}, +} + +const defaultListLimit = 100 +const maxListLimit = 1000 + +// List handles GET /admin/delegations +// +// Query params: +// - status — one of `in_flight` (default) / `stuck` / `failed` / `completed` +// - limit — int, 1..1000 (default 100) +// +// Returns 200 with `{"delegations": [...], "count": N}`. +func (h *AdminDelegationsHandler) List(c *gin.Context) { + statusKey := c.DefaultQuery("status", "in_flight") + statuses, ok := statusFilters[statusKey] + if !ok { + c.JSON(http.StatusBadRequest, gin.H{ + "error": "unknown status filter", + "allowed": []string{"in_flight", "stuck", "failed", "completed"}, + "requested_status": statusKey, + }) + return + } + + limit := defaultListLimit + if v := c.Query("limit"); v != "" { + n, err := strconv.Atoi(v) + if err != nil || n < 1 || n > maxListLimit { + c.JSON(http.StatusBadRequest, gin.H{ + "error": "limit must be 1..1000", + "requested": v, + }) + return + } + limit = n + } + + // Build the IN list as a parameterized expression — never string- + // concatenate user-controlled values into the SQL. statusKey came + // from the allowlist above so the slice is fully bounded. + args := make([]any, 0, len(statuses)+1) + placeholders := "" + for i, s := range statuses { + if i > 0 { + placeholders += "," + } + args = append(args, s) + placeholders += "$" + strconv.Itoa(i+1) + } + args = append(args, limit) + limitPlaceholder := "$" + strconv.Itoa(len(statuses)+1) + + rows, err := h.db.QueryContext(c.Request.Context(), ` + SELECT delegation_id, caller_id::text, callee_id::text, task_preview, + status, last_heartbeat, deadline, result_preview, error_detail, + retry_count, created_at, updated_at + FROM delegations + WHERE status IN (`+placeholders+`) + ORDER BY created_at DESC + LIMIT `+limitPlaceholder, args...) + if err != nil { + log.Printf("AdminDelegations.List: query failed: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"}) + return + } + defer rows.Close() + + out := make([]delegationRow, 0) + for rows.Next() { + var r delegationRow + var lastBeat sql.NullTime + var resultPreview, errorDetail sql.NullString + if err := rows.Scan( + &r.DelegationID, &r.CallerID, &r.CalleeID, &r.TaskPreview, + &r.Status, &lastBeat, &r.Deadline, &resultPreview, &errorDetail, + &r.RetryCount, &r.CreatedAt, &r.UpdatedAt, + ); err != nil { + log.Printf("AdminDelegations.List: scan failed: %v", err) + continue + } + if lastBeat.Valid { + t := lastBeat.Time + r.LastHeartbeat = &t + } + if resultPreview.Valid { + s := resultPreview.String + r.ResultPreview = &s + } + if errorDetail.Valid { + s := errorDetail.String + r.ErrorDetail = &s + } + out = append(out, r) + } + if err := rows.Err(); err != nil { + log.Printf("AdminDelegations.List: rows.Err: %v", err) + } + + c.JSON(http.StatusOK, gin.H{ + "delegations": out, + "count": len(out), + "status": statusKey, + "limit": limit, + }) +} + +// Stats handles GET /admin/delegations/stats — at-a-glance counts per +// status. Useful for the dashboard summary card at the top of the +// operator panel without paying for a row-level fetch. +// +// Returns 200 with `{"queued": N, "dispatched": N, "in_progress": N, +// "completed": N, "failed": N, "stuck": N}`. +func (h *AdminDelegationsHandler) Stats(c *gin.Context) { + rows, err := h.db.QueryContext(c.Request.Context(), ` + SELECT status, COUNT(*) FROM delegations GROUP BY status + `) + if err != nil { + log.Printf("AdminDelegations.Stats: query failed: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"}) + return + } + defer rows.Close() + + // Initialise to zero so the response always has every known status + // key — the dashboard card doesn't need to handle "missing key vs + // zero" branching. + stats := map[string]int{ + "queued": 0, + "dispatched": 0, + "in_progress": 0, + "completed": 0, + "failed": 0, + "stuck": 0, + } + for rows.Next() { + var status string + var count int + if err := rows.Scan(&status, &count); err != nil { + log.Printf("AdminDelegations.Stats: scan failed: %v", err) + continue + } + stats[status] = count + } + if err := rows.Err(); err != nil { + log.Printf("AdminDelegations.Stats: rows.Err: %v", err) + } + + c.JSON(http.StatusOK, stats) +} diff --git a/workspace-server/internal/handlers/admin_delegations_test.go b/workspace-server/internal/handlers/admin_delegations_test.go new file mode 100644 index 00000000..8fb2cb3d --- /dev/null +++ b/workspace-server/internal/handlers/admin_delegations_test.go @@ -0,0 +1,332 @@ +package handlers + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/gin-gonic/gin" +) + +// admin_delegations_test.go — RFC #2829 PR-4 dashboard endpoint coverage. +// +// - List: status filter + limit defaults + bad-input rejection +// - Stats: per-status counts + zero-fill for missing statuses + +// ---------- List ---------- + +func TestAdminDelegations_List_DefaultStatusInFlight(t *testing.T) { + mock := setupTestDB(t) + h := NewAdminDelegationsHandler(nil) + + now := time.Now() + mock.ExpectQuery(`SELECT delegation_id, caller_id::text, callee_id::text, task_preview,\s+status, last_heartbeat, deadline, result_preview, error_detail,\s+retry_count, created_at, updated_at\s+FROM delegations\s+WHERE status IN \(\$1,\$2,\$3\)\s+ORDER BY created_at DESC\s+LIMIT \$4`). + WithArgs("queued", "dispatched", "in_progress", 100). + WillReturnRows(sqlmock.NewRows([]string{ + "delegation_id", "caller_id", "callee_id", "task_preview", + "status", "last_heartbeat", "deadline", "result_preview", "error_detail", + "retry_count", "created_at", "updated_at", + }).AddRow( + "deleg-1", "caller-uuid", "callee-uuid", "task body", + "in_progress", now, now.Add(2*time.Hour), nil, nil, + 0, now.Add(-5*time.Minute), now.Add(-1*time.Minute), + )) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/admin/delegations", nil) + h.List(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + var body map[string]any + if err := json.Unmarshal(w.Body.Bytes(), &body); err != nil { + t.Fatalf("body parse: %v", err) + } + if got := body["count"]; got != float64(1) { + t.Errorf("count: expected 1, got %v", got) + } + if got := body["status"]; got != "in_flight" { + t.Errorf("status: expected in_flight, got %v", got) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +func TestAdminDelegations_List_StatusStuck(t *testing.T) { + mock := setupTestDB(t) + h := NewAdminDelegationsHandler(nil) + + mock.ExpectQuery(`SELECT delegation_id`). + WithArgs("stuck", 100). + WillReturnRows(sqlmock.NewRows([]string{ + "delegation_id", "caller_id", "callee_id", "task_preview", + "status", "last_heartbeat", "deadline", "result_preview", "error_detail", + "retry_count", "created_at", "updated_at", + })) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/admin/delegations?status=stuck", nil) + h.List(c) + + if w.Code != http.StatusOK { + t.Errorf("expected 200, got %d", w.Code) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +func TestAdminDelegations_List_StatusFailed(t *testing.T) { + mock := setupTestDB(t) + h := NewAdminDelegationsHandler(nil) + + mock.ExpectQuery(`SELECT delegation_id`). + WithArgs("failed", 100). + WillReturnRows(sqlmock.NewRows([]string{ + "delegation_id", "caller_id", "callee_id", "task_preview", + "status", "last_heartbeat", "deadline", "result_preview", "error_detail", + "retry_count", "created_at", "updated_at", + })) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/admin/delegations?status=failed", nil) + h.List(c) + + if w.Code != http.StatusOK { + t.Errorf("expected 200, got %d", w.Code) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +func TestAdminDelegations_List_RejectsUnknownStatus(t *testing.T) { + setupTestDB(t) + h := NewAdminDelegationsHandler(nil) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/admin/delegations?status=garbage", nil) + h.List(c) + + if w.Code != http.StatusBadRequest { + t.Errorf("expected 400, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestAdminDelegations_List_RejectsNegativeLimit(t *testing.T) { + setupTestDB(t) + h := NewAdminDelegationsHandler(nil) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/admin/delegations?limit=-5", nil) + h.List(c) + + if w.Code != http.StatusBadRequest { + t.Errorf("expected 400, got %d", w.Code) + } +} + +func TestAdminDelegations_List_RejectsLimitOverCap(t *testing.T) { + setupTestDB(t) + h := NewAdminDelegationsHandler(nil) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/admin/delegations?limit=99999", nil) + h.List(c) + + if w.Code != http.StatusBadRequest { + t.Errorf("expected 400, got %d", w.Code) + } +} + +func TestAdminDelegations_List_AcceptsCustomLimit(t *testing.T) { + mock := setupTestDB(t) + h := NewAdminDelegationsHandler(nil) + + mock.ExpectQuery(`SELECT delegation_id`). + WithArgs("queued", "dispatched", "in_progress", 25). + WillReturnRows(sqlmock.NewRows([]string{ + "delegation_id", "caller_id", "callee_id", "task_preview", + "status", "last_heartbeat", "deadline", "result_preview", "error_detail", + "retry_count", "created_at", "updated_at", + })) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/admin/delegations?limit=25", nil) + h.List(c) + + if w.Code != http.StatusOK { + t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + var body map[string]any + _ = json.Unmarshal(w.Body.Bytes(), &body) + if body["limit"] != float64(25) { + t.Errorf("expected limit=25 echo, got %v", body["limit"]) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +func TestAdminDelegations_List_PopulatesNullableFields(t *testing.T) { + mock := setupTestDB(t) + h := NewAdminDelegationsHandler(nil) + + now := time.Now() + resultStr := "all done" + mock.ExpectQuery(`SELECT delegation_id`). + WithArgs("completed", 100). + WillReturnRows(sqlmock.NewRows([]string{ + "delegation_id", "caller_id", "callee_id", "task_preview", + "status", "last_heartbeat", "deadline", "result_preview", "error_detail", + "retry_count", "created_at", "updated_at", + }).AddRow( + "deleg-2", "c", "ca", "t", + "completed", now, now.Add(2*time.Hour), resultStr, nil, + 0, now, now, + )) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/admin/delegations?status=completed", nil) + h.List(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", w.Code) + } + var body struct { + Delegations []struct { + ResultPreview *string `json:"result_preview"` + ErrorDetail *string `json:"error_detail"` + LastHeartbeat *string `json:"last_heartbeat"` + } `json:"delegations"` + } + if err := json.Unmarshal(w.Body.Bytes(), &body); err != nil { + t.Fatalf("parse: %v", err) + } + if len(body.Delegations) != 1 { + t.Fatalf("expected 1 row, got %d", len(body.Delegations)) + } + row := body.Delegations[0] + if row.ResultPreview == nil || *row.ResultPreview != "all done" { + t.Errorf("result_preview not populated correctly: %+v", row.ResultPreview) + } + if row.ErrorDetail != nil { + t.Errorf("error_detail should be nil for completed-no-error: %+v", row.ErrorDetail) + } + if row.LastHeartbeat == nil { + t.Errorf("last_heartbeat should be present (non-NULL); got nil") + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +// ---------- Stats ---------- + +func TestAdminDelegations_Stats_ZeroFillsMissingStatuses(t *testing.T) { + // Stats response must always include every status key. If no rows + // exist for status='stuck', the response still shows "stuck": 0. + mock := setupTestDB(t) + h := NewAdminDelegationsHandler(nil) + + mock.ExpectQuery(`SELECT status, COUNT\(\*\) FROM delegations GROUP BY status`). + WillReturnRows(sqlmock.NewRows([]string{"status", "count"}). + AddRow("in_progress", 7). + AddRow("completed", 130)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/admin/delegations/stats", nil) + h.Stats(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", w.Code) + } + var stats map[string]int + if err := json.Unmarshal(w.Body.Bytes(), &stats); err != nil { + t.Fatalf("parse: %v", err) + } + + expectedKeys := []string{"queued", "dispatched", "in_progress", "completed", "failed", "stuck"} + for _, k := range expectedKeys { + if _, ok := stats[k]; !ok { + t.Errorf("stats missing key %q (zero-fill contract broken)", k) + } + } + if stats["in_progress"] != 7 { + t.Errorf("in_progress count: expected 7, got %d", stats["in_progress"]) + } + if stats["completed"] != 130 { + t.Errorf("completed count: expected 130, got %d", stats["completed"]) + } + if stats["stuck"] != 0 { + t.Errorf("stuck must be zero-filled: got %d", stats["stuck"]) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet: %v", err) + } +} + +func TestAdminDelegations_Stats_EmptyTable(t *testing.T) { + mock := setupTestDB(t) + h := NewAdminDelegationsHandler(nil) + + mock.ExpectQuery(`SELECT status, COUNT\(\*\) FROM delegations GROUP BY status`). + WillReturnRows(sqlmock.NewRows([]string{"status", "count"})) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/admin/delegations/stats", nil) + h.Stats(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", w.Code) + } + var stats map[string]int + _ = json.Unmarshal(w.Body.Bytes(), &stats) + for k, v := range stats { + if v != 0 { + t.Errorf("empty table → all counts zero; %s=%d", k, v) + } + } +} + +// statusFilters is a contract surface — every key here is documented in +// the endpoint comment + accepted by the validator. Pin it. +func TestStatusFiltersTableShape(t *testing.T) { + expected := map[string][]string{ + "in_flight": {"queued", "dispatched", "in_progress"}, + "stuck": {"stuck"}, + "failed": {"failed"}, + "completed": {"completed"}, + } + for k, want := range expected { + got, ok := statusFilters[k] + if !ok { + t.Errorf("statusFilters missing key %q", k) + continue + } + if len(got) != len(want) { + t.Errorf("statusFilters[%q]: want %v, got %v", k, want, got) + continue + } + for i := range want { + if got[i] != want[i] { + t.Errorf("statusFilters[%q][%d]: want %q, got %q", k, i, want[i], got[i]) + } + } + } +}