Compare commits

...

1 Commits

Author SHA1 Message Date
core-devops 0b771d5770 feat(workspace-server): approval-gate infrastructure for destructive ops (Phase 4)
ci-arm64-advisory / fast-checks (pull_request) Waiting to run
CI / Python Lint & Test (pull_request) Successful in 3s
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 9s
CI / Detect changes (pull_request) Successful in 10s
E2E API Smoke Test / detect-changes (pull_request) Successful in 7s
E2E Chat / detect-changes (pull_request) Successful in 6s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 6s
Lint forbidden tenant-env keys / Scan for repo-host token write into tenant workspace surface (pull_request) Successful in 4s
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (pull_request) Successful in 4s
Harness Replays / detect-changes (pull_request) Successful in 7s
Check migration collisions / Migration version collision check (pull_request) Successful in 18s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 9s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 5s
qa-review / approved (pull_request_target) Failing after 4s
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (pull_request) Successful in 13s
gate-check-v3 / gate-check (pull_request_target) Successful in 6s
CI / Canvas (Next.js) (pull_request) Successful in 2s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 1s
E2E Chat / E2E Chat (pull_request) Successful in 2s
security-review / approved (pull_request_target) Failing after 9s
CI / Canvas Deploy Status (pull_request) Has been skipped
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 2s
Harness Replays / Harness Replays (pull_request) Successful in 3s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 55s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 58s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 2m26s
CI / Platform (Go) (pull_request) Successful in 4m9s
CI / all-required (pull_request) Successful in 5s
sop-checklist / review-refire (pull_request_target) Has been skipped
sop-checklist / all-items-acked (pull_request) acked: 0/7 — missing: comprehensive-testing, local-postgres-e2e, staging-smoke, +4 — body-unfilled: comprehensive-testing, local-postgres-e2
sop-checklist / na-declarations (pull_request) N/A: (none)
sop-checklist / all-items-acked (pull_request_target) Successful in 6s
sop-tier-check / tier-check (pull_request_target) Failing after 7s
E2E Staging External Runtime / E2E Staging External Runtime (pull_request) Has been cancelled
qa-review / approved (pull_request_review) Has been skipped
security-review / approved (pull_request_review) Has been skipped
sop-tier-check / tier-check (pull_request_review) Failing after 11s
audit-force-merge / audit (pull_request_target) Successful in 21s
Server-side gate so destructive org operations the user-driven platform agent can
trigger require a human approval (RFC docs/design/rfc-platform-agent.md). The
platform MCP is a CLIENT of these handlers, so enforcement lives here (the trust
boundary), not in the MCP.

- migration 20260606020000_approvals_consumed: approval_requests.consumed_at
  (single-use) + request_hash (dedup) + a partial index for the gate lookup.
- internal/approvals/policy.go: the one auditable map of gated actions
  (delete_workspace / deprovision / secret_write / org_token_mint) + IsGated.
- requireApproval(): consumes a matching approved+unconsumed request (race-safe
  via conditional UPDATE ... RETURNING / FOR UPDATE SKIP LOCKED) and proceeds,
  else creates/reuses a pending request (dedup by request_hash), broadcasts it to
  the canvas and escalates to the parent if any. gateDestructive() wraps it and
  writes HTTP 202 pending for gin handlers.

Matching is (workspace_id, action, request_hash) where request_hash is a stable
digest of the op + context, so an approval for 'delete ws A' can't be replayed to
'delete ws B', and retries reuse one pending row instead of flooding.

Tests: policy + hash-stability/context-sensitivity unit tests; gateDestructive
non-gated passthrough; and a real-Postgres integration test proving the full
cycle — pending -> dedup -> approve -> consume -> single-use (no replay) ->
context isolation (sqlmock cannot prove consume-once row state).

Infrastructure only — NOT yet wired into live handlers. Wiring requires a
platform-agent caller marker so the gate fires only for concierge-initiated calls
(not operator/CP flows); that lands with Phase 3's runtime/MCP marker so existing
delete/secret flows are unchanged until then.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 10:43:25 -07:00
6 changed files with 398 additions and 0 deletions
@@ -0,0 +1,39 @@
// Package approvals holds the single source of truth for which destructive
// org operations require a human approval before they execute.
//
// (RFC docs/design/rfc-platform-agent.md — Phase 4)
//
// The org-level platform agent is driven by end-user chat and holds an org-admin
// token, so destructive/irreversible operations it can trigger are gated: the
// handler creates a pending approval and returns it instead of executing, and a
// human decides via the existing approvals subsystem. Keeping the gated-action
// list in ONE map makes the blast-radius boundary auditable in a single place —
// a handler not listed here behaves exactly as before.
package approvals
// Action is the canonical identifier of a gated destructive operation. The same
// string is stored in approval_requests.action so the gate can match a pending/
// approved request to the operation being retried.
type Action string
const (
ActionDeleteWorkspace Action = "delete_workspace"
ActionDeprovision Action = "deprovision_workspace"
ActionSecretWrite Action = "secret_write"
ActionOrgTokenMint Action = "org_token_mint"
)
// gated is the set of actions that require a human approval. Add an entry here
// (and gate the corresponding handler with requireApproval) to expand the
// boundary; remove one to drop a gate. This is the only place the policy lives.
var gated = map[Action]bool{
ActionDeleteWorkspace: true,
ActionDeprovision: true,
ActionSecretWrite: true,
ActionOrgTokenMint: true,
}
// IsGated reports whether action requires a human approval before executing.
func IsGated(action Action) bool {
return gated[action]
}
@@ -0,0 +1,153 @@
package handlers
// approval_gate.go — server-side gate for destructive org operations.
// (RFC docs/design/rfc-platform-agent.md — Phase 4)
//
// requireApproval is the choke point a destructive handler calls before
// executing. It is the trust boundary: the platform-management MCP is a CLIENT
// of these handlers, so enforcing here (not in the MCP) means anything holding
// an org-admin token still goes through the gate. The flow:
//
// - if a matching APPROVED + unconsumed approval exists, consume it (single-
// use) and let the operation proceed;
// - otherwise create (or reuse) a PENDING approval, broadcast it to the canvas
// (and escalate to the parent if any), and the handler returns HTTP 202 so a
// human can decide. The agent retries after approval and the gate passes.
//
// Matching is by (workspace_id, action, request_hash) where request_hash is a
// stable digest of the operation + its context, so a retried op reuses its own
// request instead of flooding the table, and an approval for "delete ws A"
// cannot be replayed to "delete ws B".
import (
"context"
"crypto/sha256"
"database/sql"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/approvals"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/events"
"github.com/gin-gonic/gin"
)
// approvalRequestHash is a stable digest of the gated operation. Go's
// json.Marshal sorts map keys, so the same context always hashes the same.
func approvalRequestHash(workspaceID, action string, contextMap map[string]interface{}) string {
cj, err := json.Marshal(contextMap)
if err != nil || cj == nil {
cj = []byte("{}")
}
sum := sha256.Sum256([]byte(workspaceID + "\x00" + action + "\x00" + string(cj)))
return hex.EncodeToString(sum[:])
}
// requireApproval returns (approved=true, consumedID) when a matching approval
// exists and was just consumed; otherwise it creates/reuses a pending approval
// and returns (false, pendingID). A non-nil error is a server error.
func requireApproval(ctx context.Context, b *events.Broadcaster, workspaceID string, action approvals.Action, reason string, contextMap map[string]interface{}) (bool, string, error) {
hash := approvalRequestHash(workspaceID, string(action), contextMap)
// 1. Atomically consume an approved + unconsumed request, if one exists.
// The conditional UPDATE ... RETURNING makes consumption race-safe: two
// concurrent destructive calls cannot both consume the same approval.
var consumedID string
err := db.DB.QueryRowContext(ctx, `
UPDATE approval_requests SET consumed_at = now()
WHERE id = (
SELECT id FROM approval_requests
WHERE workspace_id = $1 AND action = $2 AND request_hash = $3
AND status = 'approved' AND consumed_at IS NULL
ORDER BY decided_at DESC NULLS LAST
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING id
`, workspaceID, string(action), hash).Scan(&consumedID)
if err == nil {
return true, consumedID, nil
}
if !errors.Is(err, sql.ErrNoRows) {
return false, "", fmt.Errorf("consume approval: %w", err)
}
// 2. No usable approval — create a pending one, or reuse an existing pending
// request for the same operation so retries don't flood the table.
cj, mErr := json.Marshal(contextMap)
if mErr != nil || cj == nil {
cj = []byte("{}")
}
var approvalID string
err = db.DB.QueryRowContext(ctx, `
WITH existing AS (
SELECT id FROM approval_requests
WHERE workspace_id = $1 AND action = $2 AND request_hash = $3 AND status = 'pending'
LIMIT 1
), ins AS (
INSERT INTO approval_requests (workspace_id, action, reason, context, request_hash)
SELECT $1, $2, $4, $5::jsonb, $3
WHERE NOT EXISTS (SELECT 1 FROM existing)
RETURNING id
)
SELECT id FROM ins UNION ALL SELECT id FROM existing LIMIT 1
`, workspaceID, string(action), hash, reason, string(cj)).Scan(&approvalID)
if err != nil {
return false, "", fmt.Errorf("create approval: %w", err)
}
// Broadcast to the canvas (the user-facing signal). For a platform agent the
// parent_id is NULL, so the requested-event on its own workspace IS the user
// prompt; ordinary workspaces also escalate to their parent.
if bErr := b.RecordAndBroadcast(ctx, string(events.EventApprovalRequested), workspaceID, map[string]interface{}{
"approval_id": approvalID,
"action": string(action),
"reason": reason,
}); bErr != nil {
log.Printf("approval_gate: broadcast requested failed (ws=%s): %v", workspaceID, bErr)
}
var parentID *string
if pErr := db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID); pErr != nil {
log.Printf("approval_gate: parent lookup failed (ws=%s): %v", workspaceID, pErr)
}
if parentID != nil {
if bErr := b.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{
"approval_id": approvalID,
"from_workspace_id": workspaceID,
"action": string(action),
"reason": reason,
}); bErr != nil {
log.Printf("approval_gate: broadcast escalated failed (ws=%s): %v", workspaceID, bErr)
}
}
return false, approvalID, nil
}
// gateDestructive runs requireApproval for a gated action and, when approval is
// still pending, writes the 202 response and returns false (caller must stop).
// Returns true when the caller may proceed (action consumed an approval).
func gateDestructive(c *gin.Context, b *events.Broadcaster, workspaceID string, action approvals.Action, reason string, contextMap map[string]interface{}) bool {
if !approvals.IsGated(action) {
return true
}
approved, approvalID, err := requireApproval(c.Request.Context(), b, workspaceID, action, reason, contextMap)
if err != nil {
log.Printf("gateDestructive: %v (ws=%s action=%s)", err, workspaceID, action)
c.JSON(http.StatusInternalServerError, gin.H{"error": "approval gate failed"})
return false
}
if !approved {
c.JSON(http.StatusAccepted, gin.H{
"status": "pending_approval",
"approval_id": approvalID,
"action": string(action),
"reason": reason,
})
return false
}
return true
}
@@ -0,0 +1,137 @@
//go:build integration
// +build integration
// approval_gate_integration_test.go — REAL Postgres gate for requireApproval.
//
// Run with:
//
// INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \
// go test -tags=integration ./internal/handlers/ -run Integration_RequireApproval -v
//
// Why this is NOT a sqlmock test
// ------------------------------
// The whole gate is about row state across calls: a pending request is created
// once and reused (dedup), an approval is consumed exactly once (single-use via
// the conditional UPDATE ... RETURNING), and a different operation context hashes
// to a different request. sqlmock returns whatever the stub says; only a real
// Postgres proves the consume-once semantics and the partial-index lookup.
package handlers
import (
"context"
"database/sql"
"testing"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/approvals"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
"github.com/google/uuid"
_ "github.com/lib/pq"
)
func TestIntegration_RequireApproval_GateCycle(t *testing.T) {
url := requireIntegrationDBURL(t)
conn, err := sql.Open("postgres", url)
if err != nil {
t.Fatalf("open: %v", err)
}
if err := conn.Ping(); err != nil {
t.Fatalf("ping: %v", err)
}
t.Cleanup(func() { conn.Close() })
// requireApproval + the broadcaster's structure_events write use the db.DB
// global; point it at the integration DB and restore afterwards.
prev := db.DB
db.DB = conn
t.Cleanup(func() { db.DB = prev })
setupTestRedis(t) // broadcaster publishes to db.RDB; miniredis backs it
ctx := context.Background()
b := newTestBroadcaster()
wsID := uuid.New().String()
t.Cleanup(func() {
_, _ = conn.ExecContext(ctx, `DELETE FROM approval_requests WHERE workspace_id = $1`, wsID)
_, _ = conn.ExecContext(ctx, `DELETE FROM workspaces WHERE id = $1`, wsID)
})
// A root workspace (parent_id NULL) — like the platform agent, it has no
// parent, so the gate's escalation target is the user/canvas. (This branch
// is off main and has no kind column; the gate is kind-agnostic.)
if _, err := conn.ExecContext(ctx, `
INSERT INTO workspaces (id, name, tier, status, runtime, parent_id)
VALUES ($1, 'Org Concierge', 0, 'online', 'claude-code', NULL)`, wsID); err != nil {
t.Fatalf("seed root workspace: %v", err)
}
action := approvals.ActionDeleteWorkspace
ctxA := map[string]interface{}{"target": "ws-A"}
// 1. First call → no approval yet → pending created.
ok, id1, err := requireApproval(ctx, b, wsID, action, "delete ws-A", ctxA)
if err != nil {
t.Fatalf("call 1: %v", err)
}
if ok {
t.Fatal("call 1: approved=true, want false (no approval exists yet)")
}
// 2. Same operation again → must REUSE the same pending row (dedup), not flood.
ok, id2, err := requireApproval(ctx, b, wsID, action, "delete ws-A", ctxA)
if err != nil {
t.Fatalf("call 2: %v", err)
}
if ok || id2 != id1 {
t.Fatalf("call 2: ok=%v id2=%s, want false and id2==id1(%s) (dedup)", ok, id2, id1)
}
var nPending int
if err := conn.QueryRowContext(ctx,
`SELECT count(*) FROM approval_requests WHERE workspace_id=$1 AND status='pending'`, wsID).Scan(&nPending); err != nil {
t.Fatalf("count pending: %v", err)
}
if nPending != 1 {
t.Fatalf("pending rows = %d, want 1 (dedup must not flood)", nPending)
}
// 3. A human approves it (simulating the Decide handler).
if _, err := conn.ExecContext(ctx,
`UPDATE approval_requests SET status='approved', decided_by='human', decided_at=now() WHERE id=$1`, id1); err != nil {
t.Fatalf("approve: %v", err)
}
// 4. Now the gate consumes the approval and lets the op proceed.
ok, consumedID, err := requireApproval(ctx, b, wsID, action, "delete ws-A", ctxA)
if err != nil {
t.Fatalf("call 4: %v", err)
}
if !ok || consumedID != id1 {
t.Fatalf("call 4: ok=%v consumedID=%s, want true and id1(%s)", ok, consumedID, id1)
}
// 5. Single-use: the SAME approval cannot be replayed — the next call is
// pending again (a fresh request), not approved.
ok, id5, err := requireApproval(ctx, b, wsID, action, "delete ws-A", ctxA)
if err != nil {
t.Fatalf("call 5: %v", err)
}
if ok {
t.Fatal("call 5: approved=true — a consumed approval was replayed")
}
if id5 == id1 {
t.Fatal("call 5: reused the consumed request id; want a new pending request")
}
// 6. Context isolation: an approval for ws-A must not authorize ws-B.
// Approve the ws-A request, then a ws-B op must still be pending.
if _, err := conn.ExecContext(ctx,
`UPDATE approval_requests SET status='approved', decided_at=now() WHERE id=$1`, id5); err != nil {
t.Fatalf("approve id5: %v", err)
}
ok, _, err = requireApproval(ctx, b, wsID, action, "delete ws-B", map[string]interface{}{"target": "ws-B"})
if err != nil {
t.Fatalf("call 6: %v", err)
}
if ok {
t.Fatal("call 6: ws-B proceeded on a ws-A approval — context isolation broken")
}
}
@@ -0,0 +1,46 @@
package handlers
import (
"net/http"
"net/http/httptest"
"testing"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/approvals"
"github.com/gin-gonic/gin"
)
// TestGateDestructive_NonGatedPassesThrough verifies a non-gated action skips
// the gate entirely (no DB access, no 202) so handlers whose action isn't in the
// policy map behave exactly as before.
func TestGateDestructive_NonGatedPassesThrough(t *testing.T) {
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/x", nil)
proceed := gateDestructive(c, newTestBroadcaster(), "ws-1",
approvals.Action("not_a_gated_action"), "noop", nil)
if !proceed {
t.Fatalf("non-gated action must proceed, got proceed=false (status %d)", w.Code)
}
if w.Code != http.StatusOK { // CreateTestContext default; nothing written
t.Errorf("non-gated action wrote a response (status %d), want none", w.Code)
}
}
// TestApprovalRequestHash_StableAndContextSensitive pins the two properties the
// gate relies on: the same operation hashes identically across calls, and a
// different context yields a different hash (so an approval can't be replayed
// onto a different target).
func TestApprovalRequestHash_StableAndContextSensitive(t *testing.T) {
a := approvalRequestHash("ws", "delete_workspace", map[string]interface{}{"target": "A", "n": 1})
aAgain := approvalRequestHash("ws", "delete_workspace", map[string]interface{}{"n": 1, "target": "A"})
b := approvalRequestHash("ws", "delete_workspace", map[string]interface{}{"target": "B", "n": 1})
if a != aAgain {
t.Errorf("hash not stable across equal contexts: %s vs %s", a, aAgain)
}
if a == b {
t.Errorf("hash not context-sensitive: target A and B collided (%s)", a)
}
}
@@ -0,0 +1,5 @@
-- Reverse the approval-gate single-use/dedup columns.
DROP INDEX IF EXISTS approval_requests_gate_idx;
ALTER TABLE approval_requests
DROP COLUMN IF EXISTS request_hash,
DROP COLUMN IF EXISTS consumed_at;
@@ -0,0 +1,18 @@
-- Single-use + dedup support for the destructive-op approval gate.
-- (RFC docs/design/rfc-platform-agent.md — Phase 4)
--
-- consumed_at: an approval is single-use. Once a destructive op consumes an
-- approved request, consumed_at is stamped so the same approval can't be
-- replayed for a second destructive call.
-- request_hash: a stable hash of (workspace_id, action, context) so a repeated
-- destructive attempt matches its own pending/approved request instead of
-- flooding the table with duplicates.
ALTER TABLE approval_requests
ADD COLUMN IF NOT EXISTS consumed_at TIMESTAMPTZ,
ADD COLUMN IF NOT EXISTS request_hash TEXT;
-- Hot path: the gate looks up an approved + unconsumed row matching
-- (workspace_id, action, request_hash). Partial index keeps that O(log live).
CREATE INDEX IF NOT EXISTS approval_requests_gate_idx
ON approval_requests (workspace_id, action, request_hash)
WHERE status = 'approved' AND consumed_at IS NULL;