Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 0b771d5770 |
@@ -0,0 +1,39 @@
|
||||
// Package approvals holds the single source of truth for which destructive
|
||||
// org operations require a human approval before they execute.
|
||||
//
|
||||
// (RFC docs/design/rfc-platform-agent.md — Phase 4)
|
||||
//
|
||||
// The org-level platform agent is driven by end-user chat and holds an org-admin
|
||||
// token, so destructive/irreversible operations it can trigger are gated: the
|
||||
// handler creates a pending approval and returns it instead of executing, and a
|
||||
// human decides via the existing approvals subsystem. Keeping the gated-action
|
||||
// list in ONE map makes the blast-radius boundary auditable in a single place —
|
||||
// a handler not listed here behaves exactly as before.
|
||||
package approvals
|
||||
|
||||
// Action is the canonical identifier of a gated destructive operation. The same
|
||||
// string is stored in approval_requests.action so the gate can match a pending/
|
||||
// approved request to the operation being retried.
|
||||
type Action string
|
||||
|
||||
const (
|
||||
ActionDeleteWorkspace Action = "delete_workspace"
|
||||
ActionDeprovision Action = "deprovision_workspace"
|
||||
ActionSecretWrite Action = "secret_write"
|
||||
ActionOrgTokenMint Action = "org_token_mint"
|
||||
)
|
||||
|
||||
// gated is the set of actions that require a human approval. Add an entry here
|
||||
// (and gate the corresponding handler with requireApproval) to expand the
|
||||
// boundary; remove one to drop a gate. This is the only place the policy lives.
|
||||
var gated = map[Action]bool{
|
||||
ActionDeleteWorkspace: true,
|
||||
ActionDeprovision: true,
|
||||
ActionSecretWrite: true,
|
||||
ActionOrgTokenMint: true,
|
||||
}
|
||||
|
||||
// IsGated reports whether action requires a human approval before executing.
|
||||
func IsGated(action Action) bool {
|
||||
return gated[action]
|
||||
}
|
||||
@@ -0,0 +1,153 @@
|
||||
package handlers
|
||||
|
||||
// approval_gate.go — server-side gate for destructive org operations.
|
||||
// (RFC docs/design/rfc-platform-agent.md — Phase 4)
|
||||
//
|
||||
// requireApproval is the choke point a destructive handler calls before
|
||||
// executing. It is the trust boundary: the platform-management MCP is a CLIENT
|
||||
// of these handlers, so enforcing here (not in the MCP) means anything holding
|
||||
// an org-admin token still goes through the gate. The flow:
|
||||
//
|
||||
// - if a matching APPROVED + unconsumed approval exists, consume it (single-
|
||||
// use) and let the operation proceed;
|
||||
// - otherwise create (or reuse) a PENDING approval, broadcast it to the canvas
|
||||
// (and escalate to the parent if any), and the handler returns HTTP 202 so a
|
||||
// human can decide. The agent retries after approval and the gate passes.
|
||||
//
|
||||
// Matching is by (workspace_id, action, request_hash) where request_hash is a
|
||||
// stable digest of the operation + its context, so a retried op reuses its own
|
||||
// request instead of flooding the table, and an approval for "delete ws A"
|
||||
// cannot be replayed to "delete ws B".
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"database/sql"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/approvals"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/events"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// approvalRequestHash is a stable digest of the gated operation. Go's
|
||||
// json.Marshal sorts map keys, so the same context always hashes the same.
|
||||
func approvalRequestHash(workspaceID, action string, contextMap map[string]interface{}) string {
|
||||
cj, err := json.Marshal(contextMap)
|
||||
if err != nil || cj == nil {
|
||||
cj = []byte("{}")
|
||||
}
|
||||
sum := sha256.Sum256([]byte(workspaceID + "\x00" + action + "\x00" + string(cj)))
|
||||
return hex.EncodeToString(sum[:])
|
||||
}
|
||||
|
||||
// requireApproval returns (approved=true, consumedID) when a matching approval
|
||||
// exists and was just consumed; otherwise it creates/reuses a pending approval
|
||||
// and returns (false, pendingID). A non-nil error is a server error.
|
||||
func requireApproval(ctx context.Context, b *events.Broadcaster, workspaceID string, action approvals.Action, reason string, contextMap map[string]interface{}) (bool, string, error) {
|
||||
hash := approvalRequestHash(workspaceID, string(action), contextMap)
|
||||
|
||||
// 1. Atomically consume an approved + unconsumed request, if one exists.
|
||||
// The conditional UPDATE ... RETURNING makes consumption race-safe: two
|
||||
// concurrent destructive calls cannot both consume the same approval.
|
||||
var consumedID string
|
||||
err := db.DB.QueryRowContext(ctx, `
|
||||
UPDATE approval_requests SET consumed_at = now()
|
||||
WHERE id = (
|
||||
SELECT id FROM approval_requests
|
||||
WHERE workspace_id = $1 AND action = $2 AND request_hash = $3
|
||||
AND status = 'approved' AND consumed_at IS NULL
|
||||
ORDER BY decided_at DESC NULLS LAST
|
||||
LIMIT 1
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
RETURNING id
|
||||
`, workspaceID, string(action), hash).Scan(&consumedID)
|
||||
if err == nil {
|
||||
return true, consumedID, nil
|
||||
}
|
||||
if !errors.Is(err, sql.ErrNoRows) {
|
||||
return false, "", fmt.Errorf("consume approval: %w", err)
|
||||
}
|
||||
|
||||
// 2. No usable approval — create a pending one, or reuse an existing pending
|
||||
// request for the same operation so retries don't flood the table.
|
||||
cj, mErr := json.Marshal(contextMap)
|
||||
if mErr != nil || cj == nil {
|
||||
cj = []byte("{}")
|
||||
}
|
||||
var approvalID string
|
||||
err = db.DB.QueryRowContext(ctx, `
|
||||
WITH existing AS (
|
||||
SELECT id FROM approval_requests
|
||||
WHERE workspace_id = $1 AND action = $2 AND request_hash = $3 AND status = 'pending'
|
||||
LIMIT 1
|
||||
), ins AS (
|
||||
INSERT INTO approval_requests (workspace_id, action, reason, context, request_hash)
|
||||
SELECT $1, $2, $4, $5::jsonb, $3
|
||||
WHERE NOT EXISTS (SELECT 1 FROM existing)
|
||||
RETURNING id
|
||||
)
|
||||
SELECT id FROM ins UNION ALL SELECT id FROM existing LIMIT 1
|
||||
`, workspaceID, string(action), hash, reason, string(cj)).Scan(&approvalID)
|
||||
if err != nil {
|
||||
return false, "", fmt.Errorf("create approval: %w", err)
|
||||
}
|
||||
|
||||
// Broadcast to the canvas (the user-facing signal). For a platform agent the
|
||||
// parent_id is NULL, so the requested-event on its own workspace IS the user
|
||||
// prompt; ordinary workspaces also escalate to their parent.
|
||||
if bErr := b.RecordAndBroadcast(ctx, string(events.EventApprovalRequested), workspaceID, map[string]interface{}{
|
||||
"approval_id": approvalID,
|
||||
"action": string(action),
|
||||
"reason": reason,
|
||||
}); bErr != nil {
|
||||
log.Printf("approval_gate: broadcast requested failed (ws=%s): %v", workspaceID, bErr)
|
||||
}
|
||||
var parentID *string
|
||||
if pErr := db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID); pErr != nil {
|
||||
log.Printf("approval_gate: parent lookup failed (ws=%s): %v", workspaceID, pErr)
|
||||
}
|
||||
if parentID != nil {
|
||||
if bErr := b.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{
|
||||
"approval_id": approvalID,
|
||||
"from_workspace_id": workspaceID,
|
||||
"action": string(action),
|
||||
"reason": reason,
|
||||
}); bErr != nil {
|
||||
log.Printf("approval_gate: broadcast escalated failed (ws=%s): %v", workspaceID, bErr)
|
||||
}
|
||||
}
|
||||
return false, approvalID, nil
|
||||
}
|
||||
|
||||
// gateDestructive runs requireApproval for a gated action and, when approval is
|
||||
// still pending, writes the 202 response and returns false (caller must stop).
|
||||
// Returns true when the caller may proceed (action consumed an approval).
|
||||
func gateDestructive(c *gin.Context, b *events.Broadcaster, workspaceID string, action approvals.Action, reason string, contextMap map[string]interface{}) bool {
|
||||
if !approvals.IsGated(action) {
|
||||
return true
|
||||
}
|
||||
approved, approvalID, err := requireApproval(c.Request.Context(), b, workspaceID, action, reason, contextMap)
|
||||
if err != nil {
|
||||
log.Printf("gateDestructive: %v (ws=%s action=%s)", err, workspaceID, action)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "approval gate failed"})
|
||||
return false
|
||||
}
|
||||
if !approved {
|
||||
c.JSON(http.StatusAccepted, gin.H{
|
||||
"status": "pending_approval",
|
||||
"approval_id": approvalID,
|
||||
"action": string(action),
|
||||
"reason": reason,
|
||||
})
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
@@ -0,0 +1,137 @@
|
||||
//go:build integration
|
||||
// +build integration
|
||||
|
||||
// approval_gate_integration_test.go — REAL Postgres gate for requireApproval.
|
||||
//
|
||||
// Run with:
|
||||
//
|
||||
// INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \
|
||||
// go test -tags=integration ./internal/handlers/ -run Integration_RequireApproval -v
|
||||
//
|
||||
// Why this is NOT a sqlmock test
|
||||
// ------------------------------
|
||||
// The whole gate is about row state across calls: a pending request is created
|
||||
// once and reused (dedup), an approval is consumed exactly once (single-use via
|
||||
// the conditional UPDATE ... RETURNING), and a different operation context hashes
|
||||
// to a different request. sqlmock returns whatever the stub says; only a real
|
||||
// Postgres proves the consume-once semantics and the partial-index lookup.
|
||||
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"testing"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/approvals"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
||||
"github.com/google/uuid"
|
||||
_ "github.com/lib/pq"
|
||||
)
|
||||
|
||||
func TestIntegration_RequireApproval_GateCycle(t *testing.T) {
|
||||
url := requireIntegrationDBURL(t)
|
||||
conn, err := sql.Open("postgres", url)
|
||||
if err != nil {
|
||||
t.Fatalf("open: %v", err)
|
||||
}
|
||||
if err := conn.Ping(); err != nil {
|
||||
t.Fatalf("ping: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { conn.Close() })
|
||||
|
||||
// requireApproval + the broadcaster's structure_events write use the db.DB
|
||||
// global; point it at the integration DB and restore afterwards.
|
||||
prev := db.DB
|
||||
db.DB = conn
|
||||
t.Cleanup(func() { db.DB = prev })
|
||||
setupTestRedis(t) // broadcaster publishes to db.RDB; miniredis backs it
|
||||
|
||||
ctx := context.Background()
|
||||
b := newTestBroadcaster()
|
||||
|
||||
wsID := uuid.New().String()
|
||||
t.Cleanup(func() {
|
||||
_, _ = conn.ExecContext(ctx, `DELETE FROM approval_requests WHERE workspace_id = $1`, wsID)
|
||||
_, _ = conn.ExecContext(ctx, `DELETE FROM workspaces WHERE id = $1`, wsID)
|
||||
})
|
||||
// A root workspace (parent_id NULL) — like the platform agent, it has no
|
||||
// parent, so the gate's escalation target is the user/canvas. (This branch
|
||||
// is off main and has no kind column; the gate is kind-agnostic.)
|
||||
if _, err := conn.ExecContext(ctx, `
|
||||
INSERT INTO workspaces (id, name, tier, status, runtime, parent_id)
|
||||
VALUES ($1, 'Org Concierge', 0, 'online', 'claude-code', NULL)`, wsID); err != nil {
|
||||
t.Fatalf("seed root workspace: %v", err)
|
||||
}
|
||||
|
||||
action := approvals.ActionDeleteWorkspace
|
||||
ctxA := map[string]interface{}{"target": "ws-A"}
|
||||
|
||||
// 1. First call → no approval yet → pending created.
|
||||
ok, id1, err := requireApproval(ctx, b, wsID, action, "delete ws-A", ctxA)
|
||||
if err != nil {
|
||||
t.Fatalf("call 1: %v", err)
|
||||
}
|
||||
if ok {
|
||||
t.Fatal("call 1: approved=true, want false (no approval exists yet)")
|
||||
}
|
||||
|
||||
// 2. Same operation again → must REUSE the same pending row (dedup), not flood.
|
||||
ok, id2, err := requireApproval(ctx, b, wsID, action, "delete ws-A", ctxA)
|
||||
if err != nil {
|
||||
t.Fatalf("call 2: %v", err)
|
||||
}
|
||||
if ok || id2 != id1 {
|
||||
t.Fatalf("call 2: ok=%v id2=%s, want false and id2==id1(%s) (dedup)", ok, id2, id1)
|
||||
}
|
||||
var nPending int
|
||||
if err := conn.QueryRowContext(ctx,
|
||||
`SELECT count(*) FROM approval_requests WHERE workspace_id=$1 AND status='pending'`, wsID).Scan(&nPending); err != nil {
|
||||
t.Fatalf("count pending: %v", err)
|
||||
}
|
||||
if nPending != 1 {
|
||||
t.Fatalf("pending rows = %d, want 1 (dedup must not flood)", nPending)
|
||||
}
|
||||
|
||||
// 3. A human approves it (simulating the Decide handler).
|
||||
if _, err := conn.ExecContext(ctx,
|
||||
`UPDATE approval_requests SET status='approved', decided_by='human', decided_at=now() WHERE id=$1`, id1); err != nil {
|
||||
t.Fatalf("approve: %v", err)
|
||||
}
|
||||
|
||||
// 4. Now the gate consumes the approval and lets the op proceed.
|
||||
ok, consumedID, err := requireApproval(ctx, b, wsID, action, "delete ws-A", ctxA)
|
||||
if err != nil {
|
||||
t.Fatalf("call 4: %v", err)
|
||||
}
|
||||
if !ok || consumedID != id1 {
|
||||
t.Fatalf("call 4: ok=%v consumedID=%s, want true and id1(%s)", ok, consumedID, id1)
|
||||
}
|
||||
|
||||
// 5. Single-use: the SAME approval cannot be replayed — the next call is
|
||||
// pending again (a fresh request), not approved.
|
||||
ok, id5, err := requireApproval(ctx, b, wsID, action, "delete ws-A", ctxA)
|
||||
if err != nil {
|
||||
t.Fatalf("call 5: %v", err)
|
||||
}
|
||||
if ok {
|
||||
t.Fatal("call 5: approved=true — a consumed approval was replayed")
|
||||
}
|
||||
if id5 == id1 {
|
||||
t.Fatal("call 5: reused the consumed request id; want a new pending request")
|
||||
}
|
||||
|
||||
// 6. Context isolation: an approval for ws-A must not authorize ws-B.
|
||||
// Approve the ws-A request, then a ws-B op must still be pending.
|
||||
if _, err := conn.ExecContext(ctx,
|
||||
`UPDATE approval_requests SET status='approved', decided_at=now() WHERE id=$1`, id5); err != nil {
|
||||
t.Fatalf("approve id5: %v", err)
|
||||
}
|
||||
ok, _, err = requireApproval(ctx, b, wsID, action, "delete ws-B", map[string]interface{}{"target": "ws-B"})
|
||||
if err != nil {
|
||||
t.Fatalf("call 6: %v", err)
|
||||
}
|
||||
if ok {
|
||||
t.Fatal("call 6: ws-B proceeded on a ws-A approval — context isolation broken")
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,46 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/approvals"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// TestGateDestructive_NonGatedPassesThrough verifies a non-gated action skips
|
||||
// the gate entirely (no DB access, no 202) so handlers whose action isn't in the
|
||||
// policy map behave exactly as before.
|
||||
func TestGateDestructive_NonGatedPassesThrough(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("POST", "/x", nil)
|
||||
|
||||
proceed := gateDestructive(c, newTestBroadcaster(), "ws-1",
|
||||
approvals.Action("not_a_gated_action"), "noop", nil)
|
||||
|
||||
if !proceed {
|
||||
t.Fatalf("non-gated action must proceed, got proceed=false (status %d)", w.Code)
|
||||
}
|
||||
if w.Code != http.StatusOK { // CreateTestContext default; nothing written
|
||||
t.Errorf("non-gated action wrote a response (status %d), want none", w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
// TestApprovalRequestHash_StableAndContextSensitive pins the two properties the
|
||||
// gate relies on: the same operation hashes identically across calls, and a
|
||||
// different context yields a different hash (so an approval can't be replayed
|
||||
// onto a different target).
|
||||
func TestApprovalRequestHash_StableAndContextSensitive(t *testing.T) {
|
||||
a := approvalRequestHash("ws", "delete_workspace", map[string]interface{}{"target": "A", "n": 1})
|
||||
aAgain := approvalRequestHash("ws", "delete_workspace", map[string]interface{}{"n": 1, "target": "A"})
|
||||
b := approvalRequestHash("ws", "delete_workspace", map[string]interface{}{"target": "B", "n": 1})
|
||||
if a != aAgain {
|
||||
t.Errorf("hash not stable across equal contexts: %s vs %s", a, aAgain)
|
||||
}
|
||||
if a == b {
|
||||
t.Errorf("hash not context-sensitive: target A and B collided (%s)", a)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
-- Reverse the approval-gate single-use/dedup columns.
|
||||
DROP INDEX IF EXISTS approval_requests_gate_idx;
|
||||
ALTER TABLE approval_requests
|
||||
DROP COLUMN IF EXISTS request_hash,
|
||||
DROP COLUMN IF EXISTS consumed_at;
|
||||
@@ -0,0 +1,18 @@
|
||||
-- Single-use + dedup support for the destructive-op approval gate.
|
||||
-- (RFC docs/design/rfc-platform-agent.md — Phase 4)
|
||||
--
|
||||
-- consumed_at: an approval is single-use. Once a destructive op consumes an
|
||||
-- approved request, consumed_at is stamped so the same approval can't be
|
||||
-- replayed for a second destructive call.
|
||||
-- request_hash: a stable hash of (workspace_id, action, context) so a repeated
|
||||
-- destructive attempt matches its own pending/approved request instead of
|
||||
-- flooding the table with duplicates.
|
||||
ALTER TABLE approval_requests
|
||||
ADD COLUMN IF NOT EXISTS consumed_at TIMESTAMPTZ,
|
||||
ADD COLUMN IF NOT EXISTS request_hash TEXT;
|
||||
|
||||
-- Hot path: the gate looks up an approved + unconsumed row matching
|
||||
-- (workspace_id, action, request_hash). Partial index keeps that O(log live).
|
||||
CREATE INDEX IF NOT EXISTS approval_requests_gate_idx
|
||||
ON approval_requests (workspace_id, action, request_hash)
|
||||
WHERE status = 'approved' AND consumed_at IS NULL;
|
||||
Reference in New Issue
Block a user