From 7993693cf134c22ea4165d7aa9c80581061dea2d Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Mon, 4 May 2026 22:00:59 -0700 Subject: [PATCH] feat(delegations): wire RFC #2829 sweeper + admin routes into platform server MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Activates the server-side foundation that PRs #2832, #2836, #2837 shipped without wiring (each PR landed dead code on purpose so the review surface stayed tight). ## What this PR wires up 1. router.go — registers the RFC #2829 PR-4 admin endpoints behind AdminAuth: GET /admin/delegations[?status=...&limit=N] GET /admin/delegations/stats 2. cmd/server/main.go — starts the RFC #2829 PR-3 stuck-task sweeper as a supervised goroutine alongside the existing scheduler + hibernation-monitor + image-auto-refresh: go supervised.RunWithRecover(ctx, "delegation-sweeper", delegSweeper.Start) ## What this PR does NOT do - PR-2's DELEGATION_RESULT_INBOX_PUSH flag stays default off — flip happens via env config in a follow-up after staging burn-in. - PR-5's DELEGATION_SYNC_VIA_INBOX flag stays default off — same reason. The two flags are independent; either can be flipped in isolation. - Canvas operator panel UI: this PR exposes the JSON contract; the canvas panel consumes it in a separate canvas PR. ## Coverage 2 new router gate tests in admin_delegations_route_test.go: - List endpoint requires AdminAuth (unauthenticated → 401) - Stats endpoint requires AdminAuth (unauthenticated → 401) Pattern mirrors admin_test_token_route_test.go (the IDOR-fix gate for PR #112). Catches a future router refactor that silently drops AdminAuth — operator dashboard data exposes caller_id, callee_id, and task_preview, none of which should reach unauthenticated callers. Sweeper boots as a no-op until at least one delegation row exists, so this PR is safe to land before PR-5's agent-side cutover sees production traffic. Refs RFC #2829. 
--- workspace-server/cmd/server/main.go | 9 +++ .../router/admin_delegations_route_test.go | 78 +++++++++++++++++++ workspace-server/internal/router/router.go | 9 +++ 3 files changed, 96 insertions(+) create mode 100644 workspace-server/internal/router/admin_delegations_route_test.go diff --git a/workspace-server/cmd/server/main.go b/workspace-server/cmd/server/main.go index e11f5a96..3961a842 100644 --- a/workspace-server/cmd/server/main.go +++ b/workspace-server/cmd/server/main.go @@ -297,6 +297,15 @@ func main() { registry.StartHibernationMonitor(c, wh.HibernateWorkspace) }) + // RFC #2829 PR-3: stuck-task sweeper for the durable delegations + // ledger. Marks deadline-exceeded rows as failed and heartbeat-stale + // in-flight rows as stuck. Both transitions go through the ledger's + // terminal forward-only protection so concurrent UpdateStatus calls + // are not clobbered. Defaults: 5min interval, 10min stale threshold; + // override via DELEGATION_SWEEPER_INTERVAL_S / DELEGATION_STUCK_THRESHOLD_S. + delegSweeper := handlers.NewDelegationSweeper(nil, nil) + go supervised.RunWithRecover(ctx, "delegation-sweeper", delegSweeper.Start) + // Channel Manager — social channel integrations (Telegram, Slack, etc.) 
channelMgr := channels.NewManager(wh, broadcaster) go supervised.RunWithRecover(ctx, "channel-manager", channelMgr.Start) diff --git a/workspace-server/internal/router/admin_delegations_route_test.go b/workspace-server/internal/router/admin_delegations_route_test.go new file mode 100644 index 00000000..062b6967 --- /dev/null +++ b/workspace-server/internal/router/admin_delegations_route_test.go @@ -0,0 +1,78 @@ +package router + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/handlers" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/middleware" + "github.com/gin-gonic/gin" +) + +// admin_delegations_route_test.go — pin the RFC #2829 PR-4 wiring. +// +// Both the List and Stats endpoints must: +// 1. Be registered at the documented path +// 2. Be gated by AdminAuth (caller without a valid admin token → 401) +// +// If AdminAuth were dropped from these endpoints the dashboard would still +// work for operators, but unauthenticated callers could pull the in-flight +// delegation list (caller_id, callee_id, task previews). NOTE: the engine +// below registers the routes itself, not via Setup — keep it in sync with router.go. + +func buildAdminDelegationsEngine(t *testing.T) *gin.Engine { + t.Helper() + gin.SetMode(gin.TestMode) + r := gin.New() + adH := handlers.NewAdminDelegationsHandler(db.DB) + r.GET("/admin/delegations", middleware.AdminAuth(db.DB), adH.List) + r.GET("/admin/delegations/stats", middleware.AdminAuth(db.DB), adH.Stats) + return r +} + +// Both tests use the existing AdminAuth pattern: set ADMIN_TOKEN to disable +// the dev-mode fail-open branch, and have HasAnyLiveTokenGlobal return ≥1 +// so AdminAuth enforces auth (rather than fail-open on fresh install). +// Without these two switches AdminAuth would return 200 + invoke the +// handler — defeating the gate test. 
+ +func TestAdminDelegationsRoute_List_RequiresAdminAuth(t *testing.T) { + t.Setenv("ADMIN_TOKEN", "test-admin-secret-not-presented-by-caller") + mock := setupRouterTestDB(t) + mock.ExpectQuery("SELECT COUNT.*FROM workspace_auth_tokens"). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1)) + + r := buildAdminDelegationsEngine(t) + w := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/admin/delegations", nil) + r.ServeHTTP(w, req) + + if w.Code != http.StatusUnauthorized { + t.Errorf("expected 401 for unauthenticated request, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock unmet: %v", err) + } +} + +func TestAdminDelegationsRoute_Stats_RequiresAdminAuth(t *testing.T) { + t.Setenv("ADMIN_TOKEN", "test-admin-secret-not-presented-by-caller") + mock := setupRouterTestDB(t) + mock.ExpectQuery("SELECT COUNT.*FROM workspace_auth_tokens"). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1)) + + r := buildAdminDelegationsEngine(t) + w := httptest.NewRecorder() + req := httptest.NewRequest("GET", "/admin/delegations/stats", nil) + r.ServeHTTP(w, req) + + if w.Code != http.StatusUnauthorized { + t.Errorf("expected 401 for unauthenticated request, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock unmet: %v", err) + } +} diff --git a/workspace-server/internal/router/router.go b/workspace-server/internal/router/router.go index 1afff092..59403cae 100644 --- a/workspace-server/internal/router/router.go +++ b/workspace-server/internal/router/router.go @@ -433,6 +433,15 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi r.POST("/admin/a2a-queue/drop-stale", middleware.AdminAuth(db.DB), qH.DropStale) } + // Admin — RFC #2829 PR-4 dashboard endpoints over the durable + // `delegations` ledger (PR-1 schema). 
Operators triage in-flight, + // stuck, or failed delegations without direct DB access. + { + adH := handlers.NewAdminDelegationsHandler(db.DB) + r.GET("/admin/delegations", middleware.AdminAuth(db.DB), adH.List) + r.GET("/admin/delegations/stats", middleware.AdminAuth(db.DB), adH.Stats) + } + // Admin — workspace template image refresh. Pulls latest images from GHCR // and recreates running ws-* containers so they adopt the new image. // Final step of the runtime CD chain — see docs/workspace-runtime-package.md.