From 0b771d5770ef2414f5cee85b79f3bf7d598ec76d Mon Sep 17 00:00:00 2001 From: core-devops Date: Sat, 6 Jun 2026 10:43:25 -0700 Subject: [PATCH] feat(workspace-server): approval-gate infrastructure for destructive ops (Phase 4) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Server-side gate so destructive org operations the user-driven platform agent can trigger require a human approval (RFC docs/design/rfc-platform-agent.md). The platform MCP is a CLIENT of these handlers, so enforcement lives here (the trust boundary), not in the MCP. - migration 20260606020000_approvals_consumed: approval_requests.consumed_at (single-use) + request_hash (dedup) + a partial index for the gate lookup. - internal/approvals/policy.go: the one auditable map of gated actions (delete_workspace / deprovision / secret_write / org_token_mint) + IsGated. - requireApproval(): consumes a matching approved+unconsumed request (race-safe via conditional UPDATE ... RETURNING / FOR UPDATE SKIP LOCKED) and proceeds, else creates/reuses a pending request (dedup by request_hash), broadcasts it to the canvas and escalates to the parent if any. gateDestructive() wraps it and writes HTTP 202 pending for gin handlers. Matching is (workspace_id, action, request_hash) where request_hash is a stable digest of the op + context, so an approval for 'delete ws A' can't be replayed to 'delete ws B', and retries reuse one pending row instead of flooding. Tests: policy + hash-stability/context-sensitivity unit tests; gateDestructive non-gated passthrough; and a real-Postgres integration test proving the full cycle — pending -> dedup -> approve -> consume -> single-use (no replay) -> context isolation (sqlmock cannot prove consume-once row state). Infrastructure only — NOT yet wired into live handlers. Wiring requires a platform-agent caller marker so the gate fires only for concierge-initiated calls (not operator/CP flows); that lands with Phase 3's runtime/MCP marker so existing delete/secret flows are unchanged until then. Co-Authored-By: Claude Opus 4.8 (1M context) --- workspace-server/internal/approvals/policy.go | 39 +++++ .../internal/handlers/approval_gate.go | 153 ++++++++++++++++++ .../approval_gate_integration_test.go | 137 ++++++++++++++++ .../internal/handlers/approval_gate_test.go | 46 ++++++ ...20260606020000_approvals_consumed.down.sql | 5 + .../20260606020000_approvals_consumed.up.sql | 18 +++ 6 files changed, 398 insertions(+) create mode 100644 workspace-server/internal/approvals/policy.go create mode 100644 workspace-server/internal/handlers/approval_gate.go create mode 100644 workspace-server/internal/handlers/approval_gate_integration_test.go create mode 100644 workspace-server/internal/handlers/approval_gate_test.go create mode 100644 workspace-server/migrations/20260606020000_approvals_consumed.down.sql create mode 100644 workspace-server/migrations/20260606020000_approvals_consumed.up.sql diff --git a/workspace-server/internal/approvals/policy.go b/workspace-server/internal/approvals/policy.go new file mode 100644 index 000000000..4fac83266 --- /dev/null +++ b/workspace-server/internal/approvals/policy.go @@ -0,0 +1,39 @@ +// Package approvals holds the single source of truth for which destructive +// org operations require a human approval before they execute. +// +// (RFC docs/design/rfc-platform-agent.md — Phase 4) +// +// The org-level platform agent is driven by end-user chat and holds an org-admin +// token, so destructive/irreversible operations it can trigger are gated: the +// handler creates a pending approval and returns it instead of executing, and a +// human decides via the existing approvals subsystem. Keeping the gated-action +// list in ONE map makes the blast-radius boundary auditable in a single place — +// a handler not listed here behaves exactly as before. +package approvals + +// Action is the canonical identifier of a gated destructive operation. The same +// string is stored in approval_requests.action so the gate can match a pending/ +// approved request to the operation being retried. +type Action string + +const ( + ActionDeleteWorkspace Action = "delete_workspace" + ActionDeprovision Action = "deprovision_workspace" + ActionSecretWrite Action = "secret_write" + ActionOrgTokenMint Action = "org_token_mint" +) + +// gated is the set of actions that require a human approval. Add an entry here +// (and gate the corresponding handler with requireApproval) to expand the +// boundary; remove one to drop a gate. This is the only place the policy lives. +var gated = map[Action]bool{ + ActionDeleteWorkspace: true, + ActionDeprovision: true, + ActionSecretWrite: true, + ActionOrgTokenMint: true, +} + +// IsGated reports whether action requires a human approval before executing. +func IsGated(action Action) bool { + return gated[action] +} diff --git a/workspace-server/internal/handlers/approval_gate.go b/workspace-server/internal/handlers/approval_gate.go new file mode 100644 index 000000000..1ed487207 --- /dev/null +++ b/workspace-server/internal/handlers/approval_gate.go @@ -0,0 +1,153 @@ +package handlers + +// approval_gate.go — server-side gate for destructive org operations. +// (RFC docs/design/rfc-platform-agent.md — Phase 4) +// +// requireApproval is the choke point a destructive handler calls before +// executing. It is the trust boundary: the platform-management MCP is a CLIENT +// of these handlers, so enforcing here (not in the MCP) means anything holding +// an org-admin token still goes through the gate. The flow: +// +// - if a matching APPROVED + unconsumed approval exists, consume it (single- +// use) and let the operation proceed; +// - otherwise create (or reuse) a PENDING approval, broadcast it to the canvas +// (and escalate to the parent if any), and the handler returns HTTP 202 so a +// human can decide. The agent retries after approval and the gate passes. +// +// Matching is by (workspace_id, action, request_hash) where request_hash is a +// stable digest of the operation + its context, so a retried op reuses its own +// request instead of flooding the table, and an approval for "delete ws A" +// cannot be replayed to "delete ws B". + +import ( + "context" + "crypto/sha256" + "database/sql" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "log" + "net/http" + + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/approvals" + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db" + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/events" + "github.com/gin-gonic/gin" +) + +// approvalRequestHash is a stable digest of the gated operation. Go's +// json.Marshal sorts map keys, so the same context always hashes the same. +func approvalRequestHash(workspaceID, action string, contextMap map[string]interface{}) string { + cj, err := json.Marshal(contextMap) + if err != nil || cj == nil { + cj = []byte("{}") + } + sum := sha256.Sum256([]byte(workspaceID + "\x00" + action + "\x00" + string(cj))) + return hex.EncodeToString(sum[:]) +} + +// requireApproval returns (approved=true, consumedID) when a matching approval +// exists and was just consumed; otherwise it creates/reuses a pending approval +// and returns (false, pendingID). A non-nil error is a server error. +func requireApproval(ctx context.Context, b *events.Broadcaster, workspaceID string, action approvals.Action, reason string, contextMap map[string]interface{}) (bool, string, error) { + hash := approvalRequestHash(workspaceID, string(action), contextMap) + + // 1. Atomically consume an approved + unconsumed request, if one exists. + // The conditional UPDATE ... RETURNING makes consumption race-safe: two + // concurrent destructive calls cannot both consume the same approval. + var consumedID string + err := db.DB.QueryRowContext(ctx, ` + UPDATE approval_requests SET consumed_at = now() + WHERE id = ( + SELECT id FROM approval_requests + WHERE workspace_id = $1 AND action = $2 AND request_hash = $3 + AND status = 'approved' AND consumed_at IS NULL + ORDER BY decided_at DESC NULLS LAST + LIMIT 1 + FOR UPDATE SKIP LOCKED + ) + RETURNING id + `, workspaceID, string(action), hash).Scan(&consumedID) + if err == nil { + return true, consumedID, nil + } + if !errors.Is(err, sql.ErrNoRows) { + return false, "", fmt.Errorf("consume approval: %w", err) + } + + // 2. No usable approval — create a pending one, or reuse an existing pending + // request for the same operation so retries don't flood the table. + cj, mErr := json.Marshal(contextMap) + if mErr != nil || cj == nil { + cj = []byte("{}") + } + var approvalID string + err = db.DB.QueryRowContext(ctx, ` + WITH existing AS ( + SELECT id FROM approval_requests + WHERE workspace_id = $1 AND action = $2 AND request_hash = $3 AND status = 'pending' + LIMIT 1 + ), ins AS ( + INSERT INTO approval_requests (workspace_id, action, reason, context, request_hash) + SELECT $1, $2, $4, $5::jsonb, $3 + WHERE NOT EXISTS (SELECT 1 FROM existing) + RETURNING id + ) + SELECT id FROM ins UNION ALL SELECT id FROM existing LIMIT 1 + `, workspaceID, string(action), hash, reason, string(cj)).Scan(&approvalID) + if err != nil { + return false, "", fmt.Errorf("create approval: %w", err) + } + + // Broadcast to the canvas (the user-facing signal). For a platform agent the + // parent_id is NULL, so the requested-event on its own workspace IS the user + // prompt; ordinary workspaces also escalate to their parent. + if bErr := b.RecordAndBroadcast(ctx, string(events.EventApprovalRequested), workspaceID, map[string]interface{}{ + "approval_id": approvalID, + "action": string(action), + "reason": reason, + }); bErr != nil { + log.Printf("approval_gate: broadcast requested failed (ws=%s): %v", workspaceID, bErr) + } + var parentID *string + if pErr := db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID); pErr != nil { + log.Printf("approval_gate: parent lookup failed (ws=%s): %v", workspaceID, pErr) + } + if parentID != nil { + if bErr := b.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{ + "approval_id": approvalID, + "from_workspace_id": workspaceID, + "action": string(action), + "reason": reason, + }); bErr != nil { + log.Printf("approval_gate: broadcast escalated failed (ws=%s): %v", workspaceID, bErr) + } + } + return false, approvalID, nil +} + +// gateDestructive runs requireApproval for a gated action and, when approval is +// still pending, writes the 202 response and returns false (caller must stop). +// Returns true when the caller may proceed (action consumed an approval). +func gateDestructive(c *gin.Context, b *events.Broadcaster, workspaceID string, action approvals.Action, reason string, contextMap map[string]interface{}) bool { + if !approvals.IsGated(action) { + return true + } + approved, approvalID, err := requireApproval(c.Request.Context(), b, workspaceID, action, reason, contextMap) + if err != nil { + log.Printf("gateDestructive: %v (ws=%s action=%s)", err, workspaceID, action) + c.JSON(http.StatusInternalServerError, gin.H{"error": "approval gate failed"}) + return false + } + if !approved { + c.JSON(http.StatusAccepted, gin.H{ + "status": "pending_approval", + "approval_id": approvalID, + "action": string(action), + "reason": reason, + }) + return false + } + return true +} diff --git a/workspace-server/internal/handlers/approval_gate_integration_test.go b/workspace-server/internal/handlers/approval_gate_integration_test.go new file mode 100644 index 000000000..03094e80f --- /dev/null +++ b/workspace-server/internal/handlers/approval_gate_integration_test.go @@ -0,0 +1,137 @@ +//go:build integration +// +build integration + +// approval_gate_integration_test.go — REAL Postgres gate for requireApproval. +// +// Run with: +// +// INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \ +// go test -tags=integration ./internal/handlers/ -run Integration_RequireApproval -v +// +// Why this is NOT a sqlmock test +// ------------------------------ +// The whole gate is about row state across calls: a pending request is created +// once and reused (dedup), an approval is consumed exactly once (single-use via +// the conditional UPDATE ... RETURNING), and a different operation context hashes +// to a different request. sqlmock returns whatever the stub says; only a real +// Postgres proves the consume-once semantics and the partial-index lookup. + +package handlers + +import ( + "context" + "database/sql" + "testing" + + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/approvals" + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db" + "github.com/google/uuid" + _ "github.com/lib/pq" +) + +func TestIntegration_RequireApproval_GateCycle(t *testing.T) { + url := requireIntegrationDBURL(t) + conn, err := sql.Open("postgres", url) + if err != nil { + t.Fatalf("open: %v", err) + } + if err := conn.Ping(); err != nil { + t.Fatalf("ping: %v", err) + } + t.Cleanup(func() { conn.Close() }) + + // requireApproval + the broadcaster's structure_events write use the db.DB + // global; point it at the integration DB and restore afterwards. + prev := db.DB + db.DB = conn + t.Cleanup(func() { db.DB = prev }) + setupTestRedis(t) // broadcaster publishes to db.RDB; miniredis backs it + + ctx := context.Background() + b := newTestBroadcaster() + + wsID := uuid.New().String() + t.Cleanup(func() { + _, _ = conn.ExecContext(ctx, `DELETE FROM approval_requests WHERE workspace_id = $1`, wsID) + _, _ = conn.ExecContext(ctx, `DELETE FROM workspaces WHERE id = $1`, wsID) + }) + // A root workspace (parent_id NULL) — like the platform agent, it has no + // parent, so the gate's escalation target is the user/canvas. (This branch + // is off main and has no kind column; the gate is kind-agnostic.) + if _, err := conn.ExecContext(ctx, ` + INSERT INTO workspaces (id, name, tier, status, runtime, parent_id) + VALUES ($1, 'Org Concierge', 0, 'online', 'claude-code', NULL)`, wsID); err != nil { + t.Fatalf("seed root workspace: %v", err) + } + + action := approvals.ActionDeleteWorkspace + ctxA := map[string]interface{}{"target": "ws-A"} + + // 1. First call → no approval yet → pending created. + ok, id1, err := requireApproval(ctx, b, wsID, action, "delete ws-A", ctxA) + if err != nil { + t.Fatalf("call 1: %v", err) + } + if ok { + t.Fatal("call 1: approved=true, want false (no approval exists yet)") + } + + // 2. Same operation again → must REUSE the same pending row (dedup), not flood. + ok, id2, err := requireApproval(ctx, b, wsID, action, "delete ws-A", ctxA) + if err != nil { + t.Fatalf("call 2: %v", err) + } + if ok || id2 != id1 { + t.Fatalf("call 2: ok=%v id2=%s, want false and id2==id1(%s) (dedup)", ok, id2, id1) + } + var nPending int + if err := conn.QueryRowContext(ctx, + `SELECT count(*) FROM approval_requests WHERE workspace_id=$1 AND status='pending'`, wsID).Scan(&nPending); err != nil { + t.Fatalf("count pending: %v", err) + } + if nPending != 1 { + t.Fatalf("pending rows = %d, want 1 (dedup must not flood)", nPending) + } + + // 3. A human approves it (simulating the Decide handler). + if _, err := conn.ExecContext(ctx, + `UPDATE approval_requests SET status='approved', decided_by='human', decided_at=now() WHERE id=$1`, id1); err != nil { + t.Fatalf("approve: %v", err) + } + + // 4. Now the gate consumes the approval and lets the op proceed. + ok, consumedID, err := requireApproval(ctx, b, wsID, action, "delete ws-A", ctxA) + if err != nil { + t.Fatalf("call 4: %v", err) + } + if !ok || consumedID != id1 { + t.Fatalf("call 4: ok=%v consumedID=%s, want true and id1(%s)", ok, consumedID, id1) + } + + // 5. Single-use: the SAME approval cannot be replayed — the next call is + // pending again (a fresh request), not approved. + ok, id5, err := requireApproval(ctx, b, wsID, action, "delete ws-A", ctxA) + if err != nil { + t.Fatalf("call 5: %v", err) + } + if ok { + t.Fatal("call 5: approved=true — a consumed approval was replayed") + } + if id5 == id1 { + t.Fatal("call 5: reused the consumed request id; want a new pending request") + } + + // 6. Context isolation: an approval for ws-A must not authorize ws-B. + // Approve the ws-A request, then a ws-B op must still be pending. + if _, err := conn.ExecContext(ctx, + `UPDATE approval_requests SET status='approved', decided_at=now() WHERE id=$1`, id5); err != nil { + t.Fatalf("approve id5: %v", err) + } + ok, _, err = requireApproval(ctx, b, wsID, action, "delete ws-B", map[string]interface{}{"target": "ws-B"}) + if err != nil { + t.Fatalf("call 6: %v", err) + } + if ok { + t.Fatal("call 6: ws-B proceeded on a ws-A approval — context isolation broken") + } +} diff --git a/workspace-server/internal/handlers/approval_gate_test.go b/workspace-server/internal/handlers/approval_gate_test.go new file mode 100644 index 000000000..a636c6fbb --- /dev/null +++ b/workspace-server/internal/handlers/approval_gate_test.go @@ -0,0 +1,46 @@ +package handlers + +import ( + "net/http" + "net/http/httptest" + "testing" + + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/approvals" + "github.com/gin-gonic/gin" +) + +// TestGateDestructive_NonGatedPassesThrough verifies a non-gated action skips +// the gate entirely (no DB access, no 202) so handlers whose action isn't in the +// policy map behave exactly as before. +func TestGateDestructive_NonGatedPassesThrough(t *testing.T) { + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/x", nil) + + proceed := gateDestructive(c, newTestBroadcaster(), "ws-1", + approvals.Action("not_a_gated_action"), "noop", nil) + + if !proceed { + t.Fatalf("non-gated action must proceed, got proceed=false (status %d)", w.Code) + } + if w.Code != http.StatusOK { // CreateTestContext default; nothing written + t.Errorf("non-gated action wrote a response (status %d), want none", w.Code) + } +} + +// TestApprovalRequestHash_StableAndContextSensitive pins the two properties the +// gate relies on: the same operation hashes identically across calls, and a +// different context yields a different hash (so an approval can't be replayed +// onto a different target). +func TestApprovalRequestHash_StableAndContextSensitive(t *testing.T) { + a := approvalRequestHash("ws", "delete_workspace", map[string]interface{}{"target": "A", "n": 1}) + aAgain := approvalRequestHash("ws", "delete_workspace", map[string]interface{}{"n": 1, "target": "A"}) + b := approvalRequestHash("ws", "delete_workspace", map[string]interface{}{"target": "B", "n": 1}) + if a != aAgain { + t.Errorf("hash not stable across equal contexts: %s vs %s", a, aAgain) + } + if a == b { + t.Errorf("hash not context-sensitive: target A and B collided (%s)", a) + } +} diff --git a/workspace-server/migrations/20260606020000_approvals_consumed.down.sql b/workspace-server/migrations/20260606020000_approvals_consumed.down.sql new file mode 100644 index 000000000..5d76d99f7 --- /dev/null +++ b/workspace-server/migrations/20260606020000_approvals_consumed.down.sql @@ -0,0 +1,5 @@ +-- Reverse the approval-gate single-use/dedup columns. +DROP INDEX IF EXISTS approval_requests_gate_idx; +ALTER TABLE approval_requests + DROP COLUMN IF EXISTS request_hash, + DROP COLUMN IF EXISTS consumed_at; diff --git a/workspace-server/migrations/20260606020000_approvals_consumed.up.sql b/workspace-server/migrations/20260606020000_approvals_consumed.up.sql new file mode 100644 index 000000000..2b2dbae6d --- /dev/null +++ b/workspace-server/migrations/20260606020000_approvals_consumed.up.sql @@ -0,0 +1,18 @@ +-- Single-use + dedup support for the destructive-op approval gate. +-- (RFC docs/design/rfc-platform-agent.md — Phase 4) +-- +-- consumed_at: an approval is single-use. Once a destructive op consumes an +-- approved request, consumed_at is stamped so the same approval can't be +-- replayed for a second destructive call. +-- request_hash: a stable hash of (workspace_id, action, context) so a repeated +-- destructive attempt matches its own pending/approved request instead of +-- flooding the table with duplicates. +ALTER TABLE approval_requests + ADD COLUMN IF NOT EXISTS consumed_at TIMESTAMPTZ, + ADD COLUMN IF NOT EXISTS request_hash TEXT; + +-- Hot path: the gate looks up an approved + unconsumed row matching +-- (workspace_id, action, request_hash). Partial index keeps that O(log live). +CREATE INDEX IF NOT EXISTS approval_requests_gate_idx + ON approval_requests (workspace_id, action, request_hash) + WHERE status = 'approved' AND consumed_at IS NULL; -- 2.52.0