feat(delegations): wire RFC #2829 sweeper + admin routes into platform server

Activates the server-side foundation that PRs #2832, #2836, #2837
shipped without wiring (each PR landed dead code on purpose so the
review surface stayed tight).

## What this PR wires up

  1. router.go — registers the RFC #2829 PR-4 admin endpoints behind
     AdminAuth:
       GET /admin/delegations[?status=...&limit=N]
       GET /admin/delegations/stats

  2. cmd/server/main.go — starts the RFC #2829 PR-3 stuck-task
     sweeper as a supervised goroutine alongside the existing
     scheduler + hibernation-monitor + image-auto-refresh:
       go supervised.RunWithRecover(ctx, "delegation-sweeper",
                                    delegSweeper.Start)

## What this PR does NOT do

  - PR-2's DELEGATION_RESULT_INBOX_PUSH flag stays default off — flip
    happens via env config in a follow-up after staging burn-in.
  - PR-5's DELEGATION_SYNC_VIA_INBOX flag stays default off — same
    reason. The two flags are independent; either can be flipped in
    isolation.
  - Canvas operator panel UI: this PR exposes the JSON contract; the
    canvas panel consumes it in a separate canvas PR.

## Coverage

2 new router gate tests in admin_delegations_route_test.go:

  - List endpoint requires AdminAuth (unauthenticated → 401)
  - Stats endpoint requires AdminAuth (unauthenticated → 401)

Pattern mirrors admin_test_token_route_test.go (the IDOR-fix gate
for PR #112). Catches a future router refactor that silently drops
AdminAuth — operator dashboard data exposes caller_id, callee_id, and
task_preview, none of which should reach unauthenticated callers.

Sweeper boots as a no-op until at least one delegation row exists,
so this PR is safe to land before PR-5's agent-side cutover sees
production traffic.

Refs RFC #2829.
This commit is contained in:
Hongming Wang 2026-05-04 22:00:59 -07:00
parent 789d705866
commit 7993693cf1
3 changed files with 96 additions and 0 deletions

View File

@ -297,6 +297,15 @@ func main() {
registry.StartHibernationMonitor(c, wh.HibernateWorkspace)
})
// RFC #2829 PR-3: stuck-task sweeper for the durable delegations
// ledger. Marks deadline-exceeded rows as failed and heartbeat-stale
// in-flight rows as stuck. Both transitions go through the ledger's
// terminal forward-only protection so concurrent UpdateStatus calls
// are not clobbered. Defaults: 5min interval, 10min stale threshold;
// override via DELEGATION_SWEEPER_INTERVAL_S / DELEGATION_STUCK_THRESHOLD_S.
delegSweeper := handlers.NewDelegationSweeper(nil, nil)
go supervised.RunWithRecover(ctx, "delegation-sweeper", delegSweeper.Start)
// Channel Manager — social channel integrations (Telegram, Slack, etc.)
channelMgr := channels.NewManager(wh, broadcaster)
go supervised.RunWithRecover(ctx, "channel-manager", channelMgr.Start)

View File

@ -0,0 +1,78 @@
package router
import (
"net/http"
"net/http/httptest"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/handlers"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/middleware"
"github.com/gin-gonic/gin"
)
// admin_delegations_route_test.go — pin the RFC #2829 PR-4 wiring.
//
// Both the List and Stats endpoints must:
// 1. Be registered at the documented path
// 2. Be gated by AdminAuth (caller without a valid admin token → 401)
//
// Without this gate test, a future router refactor could silently drop
// AdminAuth on these endpoints — the operator dashboard would still work
// for the operator, but unauthenticated callers could pull the in-flight
// delegation list including caller_id, callee_id, and task previews.
func buildAdminDelegationsEngine(t *testing.T) *gin.Engine {
t.Helper()
gin.SetMode(gin.TestMode)
r := gin.New()
adH := handlers.NewAdminDelegationsHandler(db.DB)
r.GET("/admin/delegations", middleware.AdminAuth(db.DB), adH.List)
r.GET("/admin/delegations/stats", middleware.AdminAuth(db.DB), adH.Stats)
return r
}
// Both tests use the existing AdminAuth pattern: set ADMIN_TOKEN to disable
// the dev-mode fail-open branch, and have HasAnyLiveTokenGlobal return ≥1
// so AdminAuth enforces auth (rather than fail-open on fresh install).
// Without these two switches AdminAuth would return 200 + invoke the
// handler — defeating the gate test.
func TestAdminDelegationsRoute_List_RequiresAdminAuth(t *testing.T) {
t.Setenv("ADMIN_TOKEN", "test-admin-secret-not-presented-by-caller")
mock := setupRouterTestDB(t)
mock.ExpectQuery("SELECT COUNT.*FROM workspace_auth_tokens").
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1))
r := buildAdminDelegationsEngine(t)
w := httptest.NewRecorder()
req := httptest.NewRequest("GET", "/admin/delegations", nil)
r.ServeHTTP(w, req)
if w.Code != http.StatusUnauthorized {
t.Errorf("expected 401 for unauthenticated request, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock unmet: %v", err)
}
}
func TestAdminDelegationsRoute_Stats_RequiresAdminAuth(t *testing.T) {
t.Setenv("ADMIN_TOKEN", "test-admin-secret-not-presented-by-caller")
mock := setupRouterTestDB(t)
mock.ExpectQuery("SELECT COUNT.*FROM workspace_auth_tokens").
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1))
r := buildAdminDelegationsEngine(t)
w := httptest.NewRecorder()
req := httptest.NewRequest("GET", "/admin/delegations/stats", nil)
r.ServeHTTP(w, req)
if w.Code != http.StatusUnauthorized {
t.Errorf("expected 401 for unauthenticated request, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock unmet: %v", err)
}
}

View File

@ -433,6 +433,15 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
r.POST("/admin/a2a-queue/drop-stale", middleware.AdminAuth(db.DB), qH.DropStale)
}
// Admin — RFC #2829 PR-4 dashboard endpoints over the durable
// `delegations` ledger (PR-1 schema). Operators triage in-flight,
// stuck, or failed delegations without direct DB access.
{
adH := handlers.NewAdminDelegationsHandler(db.DB)
r.GET("/admin/delegations", middleware.AdminAuth(db.DB), adH.List)
r.GET("/admin/delegations/stats", middleware.AdminAuth(db.DB), adH.Stats)
}
// Admin — workspace template image refresh. Pulls latest images from GHCR
// and recreates running ws-* containers so they adopt the new image.
// Final step of the runtime CD chain — see docs/workspace-runtime-package.md.