diff --git a/canvas/src/components/A2ATopologyOverlay.tsx b/canvas/src/components/A2ATopologyOverlay.tsx index 58f2d976..53920d4a 100644 --- a/canvas/src/components/A2ATopologyOverlay.tsx +++ b/canvas/src/components/A2ATopologyOverlay.tsx @@ -1,9 +1,10 @@ 'use client'; -import { useEffect, useMemo, useCallback } from "react"; +import { useEffect, useMemo, useCallback, useRef } from "react"; import { type Edge, MarkerType } from "@xyflow/react"; import { api } from "@/lib/api"; import { useCanvasStore } from "@/store/canvas"; +import { useSocketEvent } from "@/hooks/useSocketEvent"; import type { ActivityEntry } from "@/types/activity"; // ── Constants ───────────────────────────────────────────────────────────────── @@ -11,9 +12,6 @@ import type { ActivityEntry } from "@/types/activity"; /** 60-minute look-back window for delegation activity */ export const A2A_WINDOW_MS = 60 * 60 * 1000; -/** Polling interval — refresh edges every 60 seconds */ -export const A2A_POLL_MS = 60 * 1_000; - /** Threshold for "hot" edges: < 5 minutes → animated + violet stroke */ export const A2A_HOT_MS = 5 * 60 * 1_000; @@ -131,6 +129,20 @@ export function buildA2AEdges( * `a2aEdges`. Canvas.tsx merges these with topology edges and passes the * combined list to ReactFlow. * + * Update shape (issue #61 Stage 2, replaces the 60s polling loop): + * - On mount (when showA2AEdges): one HTTP fan-out per visible workspace + * (delegation rows, 60-min window). Bootstraps the local row buffer. + * - Steady state: subscribes to ACTIVITY_LOGGED via useSocketEvent. + * Each delegation event from a visible workspace is appended to the + * buffer; edges are re-derived via the existing buildA2AEdges helper. + * - showA2AEdges toggle off: clears edges + buffer. + * - Visible-ID-set change: re-bootstraps so a freshly-shown workspace + * backfills its 60-min history (existing visibleIdsKey selector + * behaviour preserved — that's the 2026-05-04 render-loop fix). + * + * No interval poll. 
The singleton ReconnectingSocket already owns + * reconnect / backoff / health-check; useSocketEvent inherits those. + * * Mount this inside CanvasInner (no ReactFlow hook dependency). */ export function A2ATopologyOverlay() { @@ -157,7 +169,9 @@ export function A2ATopologyOverlay() { // the symptom of this re-render storm. // // The fix is purely the dependency-stability change here; the fetch - // logic is unchanged. + // logic is unchanged. Post-#61 the polling-driven fetch is gone, but + // the visibleIdsKey gate is still required so a peer-discovery write + // doesn't trigger a wasteful re-bootstrap. const visibleIdsKey = useCanvasStore((s) => s.nodes .filter((n) => !n.hidden) @@ -171,16 +185,42 @@ export function A2ATopologyOverlay() { [visibleIdsKey] ); - // Fetch delegation activity for all visible workspaces and rebuild overlay edges. - const fetchAndUpdate = useCallback(async () => { + // Local rolling buffer of delegation rows. Pruned by A2A_WINDOW_MS on + // each rebuild so a long-lived session doesn't accumulate unbounded + // history. The buffer's high-water mark is approximately: + // visibleIds.length × bootstrap-fetch-limit (500) + WS arrivals + // Real-world ceiling: ~3000 entries at the 60-min boundary, all of + // which buildA2AEdges aggregates into at most N² edges. + const bufferRef = useRef([]); + // visibleIdsRef gives the WS handler the latest visible-ID set without + // re-subscribing on every render. The bus listener is registered + // exactly once per mount; subscriber-side filtering reads from this ref. + const visibleIdsRef = useRef(visibleIds); + visibleIdsRef.current = visibleIds; + + // Re-derive overlay edges from the current buffer + push to store. + // Prunes by A2A_WINDOW_MS first so memory stays bounded across long + // sessions and the aggregation cost stays O(window-size). 
+ const recomputeAndPush = useCallback(() => { + const cutoff = Date.now() - A2A_WINDOW_MS; + bufferRef.current = bufferRef.current.filter( + (r) => new Date(r.created_at).getTime() > cutoff + ); + setA2AEdges(buildA2AEdges(bufferRef.current)); + }, [setA2AEdges]); + + // Bootstrap fan-out — one HTTP per visible workspace. Replaces the + // 60s polling loop entirely. Race-aware: any WS arrivals that landed + // in the buffer DURING the fetch (between the await and resume) are + // preserved by id-dedup-with-fetched-first ordering. + const bootstrap = useCallback(async () => { if (visibleIds.length === 0) { + bufferRef.current = []; setA2AEdges([]); return; } try { - // Fan-out — one request per visible workspace. - // Per-request failures are swallowed so one broken workspace doesn't blank the overlay. - const allRows = ( + const fetchedRows = ( await Promise.all( visibleIds.map((id) => api @@ -192,24 +232,76 @@ export function A2ATopologyOverlay() { ) ).flat(); - setA2AEdges(buildA2AEdges(allRows)); + // Merge: fetched rows first, then any in-flight WS arrivals that + // accumulated during the await. Dedup by id so rows that appear + // in both paths are not double-counted in the aggregation. 
+ const merged = [...fetchedRows, ...bufferRef.current]; + const seen = new Set(); + bufferRef.current = merged.filter((r) => { + if (seen.has(r.id)) return false; + seen.add(r.id); + return true; + }); + recomputeAndPush(); } catch { // Overlay failure is non-critical — canvas remains functional } - }, [visibleIds, setA2AEdges]); + }, [visibleIds, setA2AEdges, recomputeAndPush]); useEffect(() => { if (!showA2AEdges) { - // Clear edges immediately when toggled off + // Clear edges + buffer immediately when toggled off + bufferRef.current = []; setA2AEdges([]); return; } + void bootstrap(); + }, [showA2AEdges, bootstrap, setA2AEdges]); - // Initial fetch, then poll every 60 s - void fetchAndUpdate(); - const timer = setInterval(() => void fetchAndUpdate(), A2A_POLL_MS); - return () => clearInterval(timer); - }, [showA2AEdges, fetchAndUpdate, setA2AEdges]); + // Live-update path. Filters server-side ACTIVITY_LOGGED events down + // to delegation initiations from visible workspaces and appends each + // into the rolling buffer, re-deriving edges via buildA2AEdges. + // + // Only `method === "delegate"` rows count — the same filter + // buildA2AEdges applies — so delegate_result rows arriving over the + // wire don't double-count. + useSocketEvent((msg) => { + if (!showA2AEdges) return; + if (msg.event !== "ACTIVITY_LOGGED") return; + + const p = (msg.payload || {}) as Record; + if (p.activity_type !== "delegation") return; + if (p.method !== "delegate") return; + + const wsId = msg.workspace_id; + if (!visibleIdsRef.current.includes(wsId)) return; + + // Synthesise an ActivityEntry from the WS payload so buildA2AEdges + // (which the bootstrap path also feeds) handles it identically. + const entry: ActivityEntry = { + id: + (p.id as string) || + `ws-push-${msg.timestamp || Date.now()}-${wsId}`, + workspace_id: wsId, + activity_type: "delegation", + source_id: (p.source_id as string | null) ?? null, + target_id: (p.target_id as string | null) ?? 
null, + method: "delegate", + summary: (p.summary as string | null) ?? null, + request_body: null, + response_body: null, + duration_ms: (p.duration_ms as number | null) ?? null, + status: (p.status as string) || "ok", + error_detail: null, + created_at: + (p.created_at as string) || + msg.timestamp || + new Date().toISOString(), + }; + + bufferRef.current = [...bufferRef.current, entry]; + recomputeAndPush(); + }); // Pure side-effect — renders nothing return null; diff --git a/canvas/src/components/CommunicationOverlay.tsx b/canvas/src/components/CommunicationOverlay.tsx index 10d105db..dfe0625e 100644 --- a/canvas/src/components/CommunicationOverlay.tsx +++ b/canvas/src/components/CommunicationOverlay.tsx @@ -3,6 +3,7 @@ import { useState, useEffect, useCallback, useRef } from "react"; import { useCanvasStore } from "@/store/canvas"; import { api } from "@/lib/api"; +import { useSocketEvent } from "@/hooks/useSocketEvent"; import { COMM_TYPE_LABELS } from "@/lib/design-tokens"; interface Communication { @@ -18,32 +19,71 @@ interface Communication { durationMs: number | null; } +/** Workspace-server `ACTIVITY_LOGGED` payload shape. Pulled out so the + * WS handler below has a typed view of the same fields the HTTP + * bootstrap consumes — drift between the two paths is a class of bug + * AgentCommsPanel hit historically. */ +interface ActivityLoggedPayload { + id?: string; + activity_type?: string; + source_id?: string | null; + target_id?: string | null; + workspace_id?: string; + summary?: string | null; + status?: string; + duration_ms?: number | null; + created_at?: string; +} + +/** Fan-out cap for the bootstrap HTTP fetch on mount / on visibility + * re-open. Kept at 3 (carried over from the 2026-05-04 fix) so a + * freshly-mounted overlay on a 15-workspace tenant only spends 3 + * round-trips bootstrapping. Live updates after that arrive via the + * WS subscription below — no polling, no fan-out to maintain. 
*/ +const BOOTSTRAP_FAN_OUT_CAP = 3; + +/** Cap on the rendered list. Bootstrap + every WS push prepends, the + * list is sliced to this size after each update. Mirrors the prior + * polling-loop behaviour. */ +const COMMS_RENDER_CAP = 20; + /** * Overlay showing recent A2A communications between workspaces. - * Renders as a floating log panel that auto-updates. + * + * Update shape (issue #61 Stage 1, replaces the 30s polling loop): + * - On mount (when visible): one HTTP bootstrap per online workspace, + * capped at BOOTSTRAP_FAN_OUT_CAP. Yields the initial recent-comms + * window without waiting for live events. + * - Steady state: subscribes to ACTIVITY_LOGGED via useSocketEvent. + * Each event with a matching activity_type from a visible online + * workspace gets synthesised into a Communication and prepended. + * - Visibility re-open: re-bootstraps so the user sees the freshest + * window even if WS was idle while collapsed. + * + * No interval poll. The singleton ReconnectingSocket in `store/socket.ts` + * already owns reconnect/backoff/health-check, and `useSocketEvent` + * inherits those guarantees. If WS is genuinely unhealthy, the overlay + * shows the bootstrap snapshot until the next visibility re-open or + * the next WS reconnect (which fires its own rehydrate burst). */ export function CommunicationOverlay() { const [comms, setComms] = useState([]); const [visible, setVisible] = useState(true); const selectedNodeId = useCanvasStore((s) => s.selectedNodeId); const nodes = useCanvasStore((s) => s.nodes); + // nodesRef gives the WS handler current node-name resolution without + // re-subscribing on every node-list change. The bus listener is + // registered exactly once per mount; subscriber-side filtering reads + // the latest value via this ref. 
const nodesRef = useRef(nodes); nodesRef.current = nodes; - const fetchComms = useCallback(async () => { + const bootstrapComms = useCallback(async () => { try { - // Fan-out cap: each polled workspace = 1 round-trip. The platform - // rate limits at 600 req/min/IP; combined with heartbeats + other - // canvas polling, every workspace polled here costs ~6 req/min - // (1 every 30s × 1 per workspace). Capping at 3 keeps this - // overlay's footprint at 18 req/min worst case — well under - // budget even with 8+ workspaces visible. Caught 2026-05-04 when - // a user with 8+ workspaces (Design Director + 6 sub-agents + - // 3 standalones) saw sustained 429s in canvas console. const onlineNodes = nodesRef.current.filter((n) => n.data.status === "online"); const allComms: Communication[] = []; - for (const node of onlineNodes.slice(0, 3)) { + for (const node of onlineNodes.slice(0, BOOTSTRAP_FAN_OUT_CAP)) { try { const activities = await api.get n.id === (a.source_id || a.workspace_id)); - const targetNode = nodes.find((n) => n.id === (a.target_id || "")); + const sourceNode = nodesRef.current.find((n) => n.id === (a.source_id || a.workspace_id)); + const targetNode = nodesRef.current.find((n) => n.id === (a.target_id || "")); allComms.push({ id: a.id, sourceId: a.source_id || a.workspace_id, @@ -76,11 +116,12 @@ export function CommunicationOverlay() { } } } catch { - // Skip workspaces that fail + // Per-workspace failures must not blank the panel — the same + // robustness the polling version had. } } - // Sort by timestamp, newest first, dedupe + // Newest-first with id-dedup, capped at COMMS_RENDER_CAP. 
const seen = new Set(); const sorted = allComms .sort((a, b) => b.timestamp.localeCompare(a.timestamp)) @@ -89,29 +130,78 @@ export function CommunicationOverlay() { seen.add(c.id); return true; }) - .slice(0, 20); + .slice(0, COMMS_RENDER_CAP); setComms(sorted); } catch { - // Silently handle API errors + // Bootstrap failure is non-blocking — the WS subscription below + // will populate the panel as live events arrive. } }, []); + // Bootstrap once on mount + every time the user re-opens after a + // collapse. Closed-panel state intentionally drops live updates so + // the panel doesn't churn invisible state — the next open reloads. useEffect(() => { - // Gate polling on visibility — when the user collapses the overlay - // the data isn't being read, so the per-workspace fan-out becomes - // pure rate-limit overhead. Pre-fix this overlay polled regardless - // of whether the panel was shown, costing ~36 req/min from a - // hidden surface. if (!visible) return; - fetchComms(); - // 30s cadence (was 10s). At 3-workspace fan-out that's 6 req/min - // worst case from this overlay. Combined with heartbeats (~30/min) - // and other canvas polling, leaves ample headroom under the 600/ - // min/IP server-side rate limit even at 8+ workspace tenants. - const interval = setInterval(fetchComms, 30000); - return () => clearInterval(interval); - }, [fetchComms, visible]); + bootstrapComms(); + }, [bootstrapComms, visible]); + + // Live-update path. Filters server-side ACTIVITY_LOGGED events down + // to the comm-overlay-relevant subset and prepends each into the + // rendered list with the same dedup the bootstrap path uses. + // + // Scope guard: ignore events for workspaces not in the visible online + // set, so a user collapsing one workspace doesn't see its comms + // continue to scroll in. Same shape the bootstrap path applies. 
+ useSocketEvent((msg) => { + if (!visible) return; + if (msg.event !== "ACTIVITY_LOGGED") return; + + const p = (msg.payload || {}) as ActivityLoggedPayload; + const type = p.activity_type; + if (type !== "a2a_send" && type !== "a2a_receive" && type !== "task_update") return; + + const wsId = msg.workspace_id; + const onlineSet = new Set( + nodesRef.current.filter((n) => n.data.status === "online").map((n) => n.id), + ); + if (!onlineSet.has(wsId)) return; + + const sourceId = p.source_id || wsId; + const targetId = p.target_id || ""; + const sourceNode = nodesRef.current.find((n) => n.id === sourceId); + const targetNode = nodesRef.current.find((n) => n.id === targetId); + + const incoming: Communication = { + id: p.id || `${msg.timestamp || Date.now()}:${sourceId}:${targetId}`, + sourceId, + targetId, + sourceName: sourceNode?.data.name || "Unknown", + targetName: targetNode?.data.name || "Unknown", + type: type as Communication["type"], + summary: p.summary || "", + status: p.status || "ok", + timestamp: p.created_at || msg.timestamp || new Date().toISOString(), + durationMs: p.duration_ms ?? null, + }; + + setComms((prev) => { + // Prepend, dedup by id, re-cap. Functional setState is necessary + // because two ACTIVITY_LOGGED events arriving in the same React + // batch would otherwise read a stale `comms` from the closure. 
+ const seen = new Set(); + const merged = [incoming, ...prev] + .sort((a, b) => b.timestamp.localeCompare(a.timestamp)) + .filter((c) => { + if (seen.has(c.id)) return false; + seen.add(c.id); + return true; + }) + .slice(0, COMMS_RENDER_CAP); + return merged; + }); + }); if (!visible || comms.length === 0) { return ( diff --git a/canvas/src/components/__tests__/A2ATopologyOverlay.test.tsx b/canvas/src/components/__tests__/A2ATopologyOverlay.test.tsx index 6cdd19a7..ab470e18 100644 --- a/canvas/src/components/__tests__/A2ATopologyOverlay.test.tsx +++ b/canvas/src/components/__tests__/A2ATopologyOverlay.test.tsx @@ -41,6 +41,10 @@ vi.mock("@/store/canvas", () => ({ // ── Imports (after mocks) ───────────────────────────────────────────────────── import { api } from "@/lib/api"; +import { + emitSocketEvent, + _resetSocketEventListenersForTests, +} from "@/store/socket-events"; import { buildA2AEdges, formatA2ARelativeTime, @@ -342,6 +346,151 @@ describe("A2ATopologyOverlay component", () => { expect(mockGet.mock.calls.length).toBe(callsAfterMount); }); + // ── #61 Stage 2: ACTIVITY_LOGGED subscription tests ──────────────────────── + // + // Pin the post-#61 behaviour: WS push for delegation contributes to + // the overlay's edge buffer with NO additional HTTP fetch. Same shape + // as Stage 1 (CommunicationOverlay). 
+ + describe("#61 stage 2 — ACTIVITY_LOGGED subscription", () => { + beforeEach(() => { + _resetSocketEventListenersForTests(); + }); + afterEach(() => { + _resetSocketEventListenersForTests(); + }); + + function emitDelegation(overrides: { + workspaceId?: string; + sourceId?: string; + targetId?: string; + method?: string; + activityType?: string; + } = {}) { + // Use Date.now() (real time, fake-timer-frozen) rather than the + // hardcoded NOW constant — buildA2AEdges prunes by Date.now() - + // A2A_WINDOW_MS, so a row dated against the wrong epoch silently + // falls outside the window and the test fails for a confusing + // reason ("edges array empty" vs "filter dropped my row"). + const realNow = Date.now(); + emitSocketEvent({ + event: "ACTIVITY_LOGGED", + workspace_id: overrides.workspaceId ?? "ws-a", + timestamp: new Date(realNow).toISOString(), + payload: { + id: `act-${Math.random().toString(36).slice(2)}`, + activity_type: overrides.activityType ?? "delegation", + method: overrides.method ?? "delegate", + source_id: overrides.sourceId ?? "ws-a", + target_id: overrides.targetId ?? "ws-b", + status: "ok", + created_at: new Date(realNow - 30_000).toISOString(), + }, + }); + } + + it("does NOT poll on a 60s interval after bootstrap (post-#61)", async () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + mockGet.mockResolvedValue([] as any); + render(); + await act(async () => { await Promise.resolve(); }); + const callsAfterBootstrap = mockGet.mock.calls.length; + expect(callsAfterBootstrap).toBe(2); // ws-a + ws-b + + // Pre-#61: a 60s clock tick would fire a fresh fan-out (2 more + // calls). Post-#61: no interval, no extra calls. 
+ await act(async () => { + vi.advanceTimersByTime(120_000); + }); + expect(mockGet.mock.calls.length).toBe(callsAfterBootstrap); + }); + + it("WS push for a delegation event from a visible workspace updates edges with NO HTTP call", async () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + mockGet.mockResolvedValue([] as any); + render(); + await act(async () => { await Promise.resolve(); await Promise.resolve(); }); + mockGet.mockClear(); + mockStoreState.setA2AEdges.mockClear(); + + await act(async () => { + emitDelegation({ sourceId: "ws-a", targetId: "ws-b" }); + }); + + // Edges-set called with at least one a2a edge for the new push. + const calls = mockStoreState.setA2AEdges.mock.calls; + expect(calls.length).toBeGreaterThanOrEqual(1); + const lastCall = calls[calls.length - 1][0] as Array<{ id: string }>; + expect(lastCall.some((e) => e.id === "a2a-ws-a-ws-b")).toBe(true); + + // Critical: no HTTP fetch fired during the WS path. + expect(mockGet).not.toHaveBeenCalled(); + }); + + it("WS push for a non-delegation activity_type is ignored", async () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + mockGet.mockResolvedValue([] as any); + render(); + await act(async () => { await Promise.resolve(); }); + mockStoreState.setA2AEdges.mockClear(); + + await act(async () => { + emitDelegation({ activityType: "a2a_send" }); + }); + + // setA2AEdges must not be called by the WS handler — the only + // setA2AEdges calls in this test came from the initial bootstrap. 
+ expect(mockStoreState.setA2AEdges).not.toHaveBeenCalled(); + }); + + it("WS push for a delegate_result row is ignored (mirrors buildA2AEdges filter)", async () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + mockGet.mockResolvedValue([] as any); + render(); + await act(async () => { await Promise.resolve(); }); + mockStoreState.setA2AEdges.mockClear(); + + await act(async () => { + emitDelegation({ method: "delegate_result" }); + }); + + // delegate_result rows do not contribute to the edge count — they + // are completion signals, not initiations. + expect(mockStoreState.setA2AEdges).not.toHaveBeenCalled(); + }); + + it("WS push from a hidden workspace is ignored", async () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + mockGet.mockResolvedValue([] as any); + render(); + await act(async () => { await Promise.resolve(); }); + mockStoreState.setA2AEdges.mockClear(); + + await act(async () => { + emitDelegation({ workspaceId: "ws-hidden" }); + }); + + expect(mockStoreState.setA2AEdges).not.toHaveBeenCalled(); + }); + + it("WS push while showA2AEdges is false is ignored", async () => { + mockStoreState.showA2AEdges = false; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + mockGet.mockResolvedValue([] as any); + render(); + // The mount path with showA2AEdges=false calls setA2AEdges([]) + // once — clear that to isolate the WS path. 
+ mockStoreState.setA2AEdges.mockClear(); + + await act(async () => { + emitDelegation(); + }); + + expect(mockStoreState.setA2AEdges).not.toHaveBeenCalled(); + expect(mockGet).not.toHaveBeenCalled(); + }); + }); + it("re-fetches when the visible ID set actually changes", async () => { // eslint-disable-next-line @typescript-eslint/no-explicit-any mockGet.mockResolvedValue([] as any); diff --git a/canvas/src/components/__tests__/CommunicationOverlay.test.tsx b/canvas/src/components/__tests__/CommunicationOverlay.test.tsx index 3bed0076..c15249a2 100644 --- a/canvas/src/components/__tests__/CommunicationOverlay.test.tsx +++ b/canvas/src/components/__tests__/CommunicationOverlay.test.tsx @@ -1,18 +1,28 @@ // @vitest-environment jsdom /** - * CommunicationOverlay tests — pin the rate-limit fix shipped 2026-05-04. + * CommunicationOverlay tests — pin both the 2026-05-04 fan-out cap fix + * AND the 2026-05-07 polling → ACTIVITY_LOGGED-subscriber refactor + * (issue #61 stage 1). * - * The overlay polls /workspaces/:id/activity?limit=5 for each online - * workspace. Pre-fix it (a) polled regardless of visibility and (b) - * fanned out to 6 workspaces every 10s. With 8+ workspaces a user - * triggered sustained 429s (server-side rate limit is 600 req/min/IP). + * The overlay used to poll /workspaces/:id/activity?limit=5 on a 30s + * interval per online workspace (capped at 3). Post-#61: it bootstraps + * once on mount via the same HTTP path (cap of 3 retained), then + * subscribes to ACTIVITY_LOGGED via the global socket bus for live + * updates. No interval poll. * * These tests pin: - * 1. Fan-out cap of 3 — even with 6 online nodes, only 3 fetches - * 2. Visibility gate — when collapsed, no polling + * 1. Bootstrap fan-out cap of 3 — even with 6 online nodes, only 3 + * HTTP fetches on mount. + * 2. Visibility gate — when collapsed, no HTTP fetches; re-open + * re-bootstraps. + * 3. 
NO interval polling — advancing the clock past 30s does not fire + * additional HTTP calls. + * 4. WS push extends the rendered list without firing any HTTP call. + * 5. WS push for an offline workspace is ignored. + * 6. WS push for a non-comm activity_type is ignored. * - * If a future refactor pushes either dial back up, CI fails before - * the regression hits a paying tenant. + * If a future refactor regresses any of these, CI fails before the + * regression hits a paying tenant. */ import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; import { render, cleanup, act, fireEvent } from "@testing-library/react"; @@ -23,7 +33,7 @@ vi.mock("@/lib/api", () => ({ api: { get: vi.fn() }, })); -// Six online nodes — enough to verify the cap of 3. +// Six online nodes — enough to verify the bootstrap cap of 3. const mockStoreState = { selectedNodeId: null as string | null, nodes: [ @@ -56,6 +66,10 @@ vi.mock("@/lib/design-tokens", () => ({ // ── Imports (after mocks) ───────────────────────────────────────────────────── import { api } from "@/lib/api"; +import { + emitSocketEvent, + _resetSocketEventListenersForTests, +} from "@/store/socket-events"; import { CommunicationOverlay } from "../CommunicationOverlay"; const mockGet = vi.mocked(api.get); @@ -66,30 +80,34 @@ beforeEach(() => { vi.useFakeTimers(); mockGet.mockReset(); mockGet.mockResolvedValue([]); + // Drop any subscribers the previous test left on the singleton bus — + // each render adds one via useSocketEvent. 
+ _resetSocketEventListenersForTests(); }); afterEach(() => { cleanup(); vi.useRealTimers(); + _resetSocketEventListenersForTests(); }); // ── Tests ───────────────────────────────────────────────────────────────────── -describe("CommunicationOverlay — fan-out cap", () => { - it("polls at most 3 of 6 online workspaces (rate-limit floor)", async () => { +describe("CommunicationOverlay — bootstrap fan-out cap", () => { + it("bootstraps at most 3 of 6 online workspaces (rate-limit floor preserved post-#61)", async () => { await act(async () => { render(); }); - // Mount fires the first poll synchronously (no interval tick yet). - // Pre-fix: 6 calls. Post-fix: 3. + // Mount fires the bootstrap synchronously — pre-#61 this was the + // first poll cycle; post-#61 it's the only HTTP fetch (live updates + // arrive via WS push). 6 nodes → 3 fetches. expect(mockGet).toHaveBeenCalledTimes(3); - // Verify the calls are for the FIRST 3 online nodes (slice order). expect(mockGet).toHaveBeenCalledWith("/workspaces/ws-1/activity?limit=5"); expect(mockGet).toHaveBeenCalledWith("/workspaces/ws-2/activity?limit=5"); expect(mockGet).toHaveBeenCalledWith("/workspaces/ws-3/activity?limit=5"); }); - it("never polls offline workspaces", async () => { + it("never bootstraps offline workspaces", async () => { await act(async () => { render(); }); @@ -99,40 +117,39 @@ describe("CommunicationOverlay — fan-out cap", () => { }); }); -describe("CommunicationOverlay — cadence", () => { - it("uses 30s interval cadence (was 10s pre-fix)", async () => { +describe("CommunicationOverlay — no interval polling (post-#61)", () => { + // The pre-#61 implementation re-fetched every 30s per workspace. + // Post-#61 the only HTTP path is the bootstrap on mount + on + // visibility-toggle. This test pins the absence of any interval + // poll: a 60s clock advance must not produce a second round of + // fetches. 
+ it("does NOT poll on a 30s interval after bootstrap", async () => { await act(async () => { render(); }); - expect(mockGet).toHaveBeenCalledTimes(3); // initial mount poll + expect(mockGet).toHaveBeenCalledTimes(3); // initial bootstrap + mockGet.mockClear(); - // Advance 10s — pre-fix this would fire another poll. Post-fix: silent. + // Advance 60s — well past any plausible cadence the prior version + // could have used. await act(async () => { - vi.advanceTimersByTime(10_000); + vi.advanceTimersByTime(60_000); }); - expect(mockGet).toHaveBeenCalledTimes(3); - - // Advance to 30s — interval fires. - await act(async () => { - vi.advanceTimersByTime(20_000); - }); - expect(mockGet).toHaveBeenCalledTimes(6); // +3 from second tick + expect(mockGet).not.toHaveBeenCalled(); }); }); describe("CommunicationOverlay — visibility gate", () => { - // The visibility gate is the dial that drops collapsed-panel polling - // to ZERO. The cadence test above can't catch its removal — if a - // refactor dropped `if (!visible) return`, the cadence test would - // still pass because the effect would still fire every 30s. + // The visibility gate now does two things post-#61: + // - while closed, the WS handler short-circuits (no setComms churn) + // - re-opening triggers a fresh bootstrap so the list reflects + // anything that happened while the panel was collapsed // // Direct probe: render with comms-returning mock so the panel // actually renders (close button only exists in the expanded panel, // not the collapsed button-state). Click close, advance the clock, // assert no further fetches. - it("stops polling after the user collapses the panel", async () => { - // Mock returns one a2a_send so comms.length > 0 → panel renders → - // close button accessible. 
+ it("stops fetching while collapsed and re-bootstraps on re-open", async () => { mockGet.mockResolvedValue([ { id: "act-1", @@ -150,29 +167,202 @@ describe("CommunicationOverlay — visibility gate", () => { const { getByLabelText } = await act(async () => { return render(); }); - // Drain pending microtasks (resolves the await in fetchComms) so - // setComms lands and the panel renders. Don't advance time — that - // would fire the next interval tick and pollute the assertion. + // Drain pending microtasks (resolves the await in bootstrap) so + // setComms lands and the panel renders. Don't advance time — it's + // not load-bearing for the gate test, but matches the pattern used + // pre-#61 for stability. await act(async () => { await Promise.resolve(); await Promise.resolve(); await Promise.resolve(); }); - // Initial mount polled 3 workspaces. - expect(mockGet).toHaveBeenCalledTimes(3); + expect(mockGet).toHaveBeenCalledTimes(3); // initial bootstrap mockGet.mockClear(); - // Click the close button. Synchronous getByLabelText avoids - // findBy's internal setTimeout (deadlocks under useFakeTimers). + // Click close. While closed, no fetches and no WS-driven updates. + const closeBtn = getByLabelText("Close communications panel"); + await act(async () => { + fireEvent.click(closeBtn); + }); + await act(async () => { + vi.advanceTimersByTime(60_000); + }); + expect(mockGet).not.toHaveBeenCalled(); + + // Re-open via the collapsed button. Must trigger a fresh bootstrap. + const openBtn = getByLabelText("Show communications panel"); + await act(async () => { + fireEvent.click(openBtn); + }); + await act(async () => { + await Promise.resolve(); + await Promise.resolve(); + }); + expect(mockGet).toHaveBeenCalledTimes(3); // re-bootstrap on re-open + }); +}); + +describe("CommunicationOverlay — WS subscription (#61 stage 1 core)", () => { + // The load-bearing post-#61 behaviour. 
Every test in this block must + // verify (a) the WS push DID update the rendered comms list, and + // (b) NO additional HTTP call was fired — the whole point of the + // refactor is to remove the polling-driven HTTP traffic. + function emitActivityLogged(overrides: Partial<{ + workspaceId: string; + payload: Record; + }> = {}) { + emitSocketEvent({ + event: "ACTIVITY_LOGGED", + workspace_id: overrides.workspaceId ?? "ws-1", + timestamp: new Date().toISOString(), + payload: { + id: `act-${Math.random().toString(36).slice(2)}`, + activity_type: "a2a_send", + source_id: "ws-1", + target_id: "ws-2", + summary: "live push", + status: "ok", + duration_ms: 42, + created_at: new Date().toISOString(), + ...overrides.payload, + }, + }); + } + + it("WS push for a comm activity_type extends the rendered list with NO additional HTTP call", async () => { + const { container } = await act(async () => { + return render(); + }); + expect(mockGet).toHaveBeenCalledTimes(3); // bootstrap + mockGet.mockClear(); + + await act(async () => { + emitActivityLogged({ payload: { summary: "hello" } }); + }); + await act(async () => { + await Promise.resolve(); + }); + + // Two pins: + // 1. comms list reflects the live push (look for the summary text) + // 2. zero HTTP fetches fired during the WS path + expect(container.textContent).toContain("hello"); + expect(mockGet).not.toHaveBeenCalled(); + }); + + it("WS push for an offline workspace is ignored", async () => { + const { container } = await act(async () => { + return render(); + }); + mockGet.mockClear(); + + await act(async () => { + emitActivityLogged({ + workspaceId: "ws-offline", + payload: { source_id: "ws-offline", summary: "should-not-render" }, + }); + }); + await act(async () => { + await Promise.resolve(); + }); + + expect(container.textContent).not.toContain("should-not-render"); + expect(mockGet).not.toHaveBeenCalled(); + }); + + it("WS push for a non-comm activity_type is ignored (e.g. 
delegation)", async () => { + const { container } = await act(async () => { + return render(); + }); + mockGet.mockClear(); + + await act(async () => { + emitActivityLogged({ + payload: { + activity_type: "delegation", + summary: "should-not-render-delegation", + }, + }); + }); + await act(async () => { + await Promise.resolve(); + }); + + expect(container.textContent).not.toContain("should-not-render-delegation"); + expect(mockGet).not.toHaveBeenCalled(); + }); + + it("WS push while the panel is collapsed is ignored (no churn on hidden state)", async () => { + // Bootstrap with one comm so the panel renders → close button + // accessible. Then collapse, emit a WS push, re-open: the rendered + // list must come from the re-bootstrap, NOT from the WS-push that + // arrived during the closed state. Also: nothing visible while + // closed (the collapsed button shows only the count, not summaries). + mockGet.mockResolvedValue([ + { + id: "act-bootstrap", + workspace_id: "ws-1", + activity_type: "a2a_send", + source_id: "ws-1", + target_id: "ws-2", + summary: "bootstrap-summary", + status: "ok", + duration_ms: 1, + created_at: new Date().toISOString(), + }, + ]); + const { getByLabelText, container } = await act(async () => { + return render(); + }); + await act(async () => { + await Promise.resolve(); + await Promise.resolve(); + }); + + // Collapse. const closeBtn = getByLabelText("Close communications panel"); await act(async () => { fireEvent.click(closeBtn); }); - // Advance well past the 30s cadence — gate should suppress the tick. + // Bootstrap mock returns nothing on the re-open path so we can + // distinguish "WS push leaked through the gate" from "re-bootstrap + // refilled the list." 
+ mockGet.mockReset(); + mockGet.mockResolvedValue([]); + await act(async () => { - vi.advanceTimersByTime(60_000); + emitActivityLogged({ + payload: { summary: "leaked-while-closed" }, + }); }); + await act(async () => { + await Promise.resolve(); + }); + + // Closed state: rendered DOM must not show any push-derived text. + expect(container.textContent).not.toContain("leaked-while-closed"); + }); + + it("non-ACTIVITY_LOGGED events are ignored (e.g. WORKSPACE_OFFLINE)", async () => { + const { container } = await act(async () => { + return render(); + }); + mockGet.mockClear(); + + await act(async () => { + emitSocketEvent({ + event: "WORKSPACE_OFFLINE", + workspace_id: "ws-1", + timestamp: new Date().toISOString(), + payload: { summary: "should-not-render-event" }, + }); + }); + await act(async () => { + await Promise.resolve(); + }); + + expect(container.textContent).not.toContain("should-not-render-event"); expect(mockGet).not.toHaveBeenCalled(); }); }); diff --git a/docs/engineering/ratelimit-observability.md b/docs/engineering/ratelimit-observability.md new file mode 100644 index 00000000..9e886137 --- /dev/null +++ b/docs/engineering/ratelimit-observability.md @@ -0,0 +1,147 @@ +# Rate-limit observability runbook + +> Companion to issue #64 ("RATE_LIMIT default re-tune analysis"). After +> #60 deployed the per-tenant `keyFor` keying, the right RATE_LIMIT +> default became data-dependent. This runbook documents the metrics + +> queries an operator should run to confirm whether the current 600 +> req/min/key default is correct, too tight, or too loose. 
+ +## What's already exposed + +The workspace-server's existing Prometheus middleware +(`workspace-server/internal/metrics/metrics.go`) tracks every request +on every path: + +``` +molecule_http_requests_total{method, path, status} counter +molecule_http_request_duration_seconds_total{method,path,status} counter +``` + +Path is the matched route pattern (`/workspaces/:id/activity` etc), so +high-cardinality workspace UUIDs do not explode the label space. + +The rate limiter middleware (#60, `workspace-server/internal/middleware/ratelimit.go`) +also stamps every response with `X-RateLimit-Limit`, `X-RateLimit-Remaining`, +and `X-RateLimit-Reset`. Operators with browser-side or proxy-side +header capture can read per-request bucket state directly. + +No new instrumentation is needed for #64's acceptance criteria. The +metric surface is sufficient — this runbook just collects the queries. + +## Queries to run after #60 deploys + +### 1. Is the bucket actually firing 429s? + +```promql +sum(rate(molecule_http_requests_total{status="429"}[5m])) +``` + +If this is zero on a given tenant, the bucket isn't being hit. If it's +sustained > 1/min, dig in. + +### 2. Which routes attract 429s? + +```promql +topk( + 10, + sum by (path) ( + rate(molecule_http_requests_total{status="429"}[5m]) + ) +) +``` + +Expected shape post-#60: +- `/workspaces/:id/activity` should be near zero — the canvas no longer + polls it on a 30s/60s/5s cadence (PRs #69 / #71 / #76). +- Probe / health / heartbeat paths should be ~0 (those routes have a + separate IP-fallback bucket). + +If `/workspaces/:id/activity` 429s persist post-PRs-69/71/76 deploy, the +canvas isn't running the WS-subscriber path — investigate WS health +on that tenant. + +### 3. Per-bucket-key inference (no direct exposure today) + +The bucket map itself is in-memory only; we deliberately do **not** +expose `org:` ↔ remaining-tokens because that map can include +SHA-256 hashes of bearer tokens. 
A tenant that wants per-key visibility +should rely on response headers (`X-RateLimit-Remaining` on every +response from a given session is the bucket's view of that session). + +If you genuinely need server-side per-bucket counts for triage, +file a follow-up — the proper shape is a `/internal/ratelimit-stats` +endpoint that emits **counts per key prefix only** (e.g. `org:`, `tok:`, +`ip:`), never the key payloads. Don't roll that ad-hoc; it's a security +review surface. + +## Decision tree for the re-tune + +After 14 days of production traffic on a tenant, look at the queries +above and walk this tree: + +``` +Q1: Is the 429 rate sustained > 0.1/sec on any tenant? + ├─ NO → The 600 default has comfortable headroom. Either keep it, + │ or lower it carefully (300) ONLY if you have a documented + │ reason (e.g. a misbehaving client we want to throttle harder). + │ Default to "no change" — see #64 for the math. + └─ YES → Q2. + +Q2: Is the 429 rate concentrated on ONE tenant or spread across many? + ├─ ONE tenant → Operator override: set RATE_LIMIT=1200 or 1800 on that + │ tenant's box. Document in the tenant's ops note. The + │ default does not need to change. + └─ MANY tenants → Q3. + +Q3: Are the 429s on a route that polls (e.g. /activity / /peers)? + ├─ YES → Confirm PRs #69, #71, #76 have actually deployed to those + │ tenants. If they have and 429s persist, the canvas may have + │ a regression — do not raise RATE_LIMIT. File a canvas issue. + └─ NO → 429s on mutating routes mean genuine load. Raise the default + to 1200 in `workspace-server/internal/router/router.go:54`. + Same PR should attach: the metric chart, the time window, + and a paragraph explaining what changed in our traffic shape. +``` + +## Alert rule template (drop-in for Prometheus) + +```yaml +# Sustained 429s — file is the SLO trip-wire. If this fires, walk the +# decision tree above. 
NB: the issue#64 acceptance criterion is "two +# weeks of metrics"; this alert is the inverse — it tells you something +# changed before the two weeks are up. +groups: + - name: workspace-server-ratelimit + rules: + - alert: WorkspaceServerRateLimit429Sustained + expr: | + sum by (instance) ( + rate(molecule_http_requests_total{status="429"}[10m]) + ) > 0.1 + for: 30m + labels: + severity: warning + owner: workspace-server + annotations: + summary: "{{ $labels.instance }} sustained 429s — see ratelimit-observability runbook" + runbook: "https://git.moleculesai.app/molecule-ai/molecule-core/blob/main/docs/engineering/ratelimit-observability.md" +``` + +Threshold rationale: 0.1 req/s = 6/min sustained over 10min. Below +that, a 429 is almost certainly a transient burst that the canvas's +retry-once handler at `canvas/src/lib/api.ts:55` already absorbs. The +30m `for:` keeps the alert from chattering on a brief blip. + +## Companion probe script + +For one-off triage when an operator can reproduce the problem in their +own browser, `scripts/edge-429-probe.sh` (#62) reproduces a canvas- +sized burst against a tenant subdomain and dumps each 429's response +shape so the operator can distinguish workspace-server bucket overflow +from CF/Vercel edge rate-limiting without dashboard access. + +```sh +./scripts/edge-429-probe.sh hongming.moleculesai.app --burst 80 --out /tmp/edge.txt +``` + +The script's report header explains how to read the output. diff --git a/scripts/edge-429-probe.sh b/scripts/edge-429-probe.sh new file mode 100755 index 00000000..a7db80c2 --- /dev/null +++ b/scripts/edge-429-probe.sh @@ -0,0 +1,155 @@ +#!/usr/bin/env bash +# edge-429-probe.sh — capture 429 origin (workspace-server vs CF/Vercel edge) +# during a simulated canvas-burst against a tenant subdomain. +# +# Issue molecule-core#62. 
The post-#60 verification step asks an +# operator with CF/Vercel dashboard access to confirm whether the +# layout-chunk 429s observed in DevTools were: +# (a) workspace-server bucket overflow (closes once #60 deploys), or +# (b) actual edge-layer rate-limiting (CF or Vercel). +# +# This script doesn't need dashboard access. It reproduces the burst +# pattern locally and dumps every 429's response shape so the operator +# can distinguish (a) from (b) by inspection: workspace-server emits a +# JSON body, CF emits HTML, Vercel emits a different HTML. Headers tell +# the same story (cf-ray vs x-vercel-*). +# +# Usage: +# ./scripts/edge-429-probe.sh <host> [--burst N] [--waves N] [--pause SECS] [--out file] +# +# Example: +# ./scripts/edge-429-probe.sh hongming.moleculesai.app --burst 80 --out /tmp/edge.txt +# +# The script is read-only against the target — it only issues GETs to +# public-by-design endpoints. No mutating requests, no credential use. + +set -euo pipefail + +# ── Help / usage handling first, before positional capture ──────────────────── +case "${1:-}" in + -h|--help|"") + sed -n '/^# edge-429-probe.sh/,/^$/p' "$0" | sed 's/^# \{0,1\}//' + exit 0 + ;; +esac + +HOST="$1"; shift +BURST=80 +WAVES=3 +WAVE_PAUSE=2 +OUT="" + +while [ "${1:-}" != "" ]; do + case "$1" in + --burst) BURST="$2"; shift 2 ;; + --waves) WAVES="$2"; shift 2 ;; + --pause) WAVE_PAUSE="$2"; shift 2 ;; + --out) OUT="$2"; shift 2 ;; + -h|--help) + sed -n '/^# edge-429-probe.sh/,/^$/p' "$0" | sed 's/^# \{0,1\}//' + exit 0 + ;; + *) echo "unknown arg: $1" >&2; exit 2 ;; + esac +done + +# ── Endpoint discovery ──────────────────────────────────────────────────────── +echo "→ Discovering a layout-chunk URL from canvas root..."
>&2 +ROOT_BODY=$(curl -fsSL --max-time 10 "https://${HOST}/" 2>/dev/null || true) +LAYOUT_PATH=$(echo "$ROOT_BODY" \ + | grep -oE '/_next/static/chunks/layout-[A-Za-z0-9_-]+\.js' \ + | head -1 || true) +if [ -z "$LAYOUT_PATH" ]; then + LAYOUT_PATH="/_next/static/chunks/layout-probe-not-found.js" + echo " (no layout chunk discovered — using sentinel path; 404 on this is expected)" >&2 +else + echo " layout chunk: $LAYOUT_PATH" >&2 +fi + +# Probe URL: a generic activity endpoint. The rate-limiter middleware +# runs BEFORE workspace-id validation, so unauth/invalid-id requests +# still hit the bucket. +ACTIVITY_PATH="/workspaces/00000000-0000-0000-0000-000000000000/activity?probe=edge-429" + +# ── Fire one curl, append a single-line key=value status record to $TMP_RESULTS ── +# Each request runs as a background job (`&` + `wait`) inside run_burst, so +# no xargs / `export -f` plumbing is needed. Each output line is a parseable +# record; failed curls emit a curl_err record so request volume is +# preserved.
+TMP_RESULTS="$(mktemp -t edge-429-probe.XXXXXX)" +trap 'rm -f "$TMP_RESULTS"' EXIT + +run_burst() { + # $1 = path; $2 = label; $3 = wave_id + local path="$1" label="$2" wave="$3" + local i + for i in $(seq 1 "$BURST"); do + { + out=$(curl -sS --max-time 10 -o /dev/null \ + -w 'status=%{http_code} size=%{size_download} time=%{time_total} server=%{header.server} cf_ray=%{header.cf-ray} x_vercel=%{header.x-vercel-id} retry_after=%{header.retry-after} content_type=%{header.content-type} x_ratelimit_limit=%{header.x-ratelimit-limit} x_ratelimit_remaining=%{header.x-ratelimit-remaining} x_ratelimit_reset=%{header.x-ratelimit-reset}\n' \ + "https://${HOST}${path}" 2>/dev/null) || out="status=curl_err" + printf 'label=%s-%s-%s %s\n' "$label" "$wave" "$i" "$out" >> "$TMP_RESULTS" + } & + done + wait +} + +emit() { + if [ -n "$OUT" ]; then + printf '%s\n' "$*" >> "$OUT" + else + printf '%s\n' "$*" + fi +} + +if [ -n "$OUT" ]; then : > "$OUT"; fi + +emit "# edge-429-probe report" +emit "# host=$HOST burst=$BURST waves=$WAVES pause=${WAVE_PAUSE}s" +emit "# layout_path=$LAYOUT_PATH" +emit "# activity_path=$ACTIVITY_PATH" +emit "# generated=$(date -u +%Y-%m-%dT%H:%M:%SZ)" +emit "" + +for wave in $(seq 1 "$WAVES"); do + emit "## wave $wave" + : > "$TMP_RESULTS" + run_burst "$LAYOUT_PATH" "layout" "$wave" + run_burst "$ACTIVITY_PATH" "activity" "$wave" + while read -r line; do + emit " $line" + done < "$TMP_RESULTS" + if [ "$wave" -lt "$WAVES" ]; then + sleep "$WAVE_PAUSE" + fi +done + +emit "" +emit "## summary — how to read the report" +emit "# status=429 + content_type starts with application/json + x_ratelimit_limit set" +emit "# => workspace-server bucket overflow. Closes when #60 deploys." +emit "# status=429 + cf_ray set + content_type=text/html" +emit "# => Cloudflare WAF / rate-limit. Audit dashboard rules per #62." +emit "# status=429 + x_vercel set + content_type=text/html" +emit "# => Vercel edge / Bot Fight Mode. Audit Vercel project per #62." 
+emit "# status=429 with no server/cf_ray/x_vercel" +emit "# => corporate proxy or VPN. Not actionable in this repo." + +if [ -n "$OUT" ]; then + echo "→ Report written to $OUT" >&2 + # Match only data lines (begin with two-space indent + "label="), + # not the summary's reference text which also mentions "status=429". + # grep -c outputs "0" + exits 1 when zero matches; `|| true` masks + # the exit status so set -e doesn't trip without losing the count. + total=$(grep -c '^ label=' "$OUT" 2>/dev/null || true) + total429=$(grep -c '^ label=.*status=429' "$OUT" 2>/dev/null || true) + total=${total:-0} + total429=${total429:-0} + echo "→ Totals: ${total429} of ${total} requests returned 429" >&2 + if [ "${total429}" -gt 0 ]; then + echo "→ Per-label 429 counts:" >&2 + grep '^ label=.*status=429' "$OUT" \ + | sed -E 's/^ label=([^-]+).*/ \1/' \ + | sort | uniq -c >&2 + fi +fi diff --git a/workspace-server/internal/middleware/ratelimit.go b/workspace-server/internal/middleware/ratelimit.go index 1b2f50dd..e01324d3 100644 --- a/workspace-server/internal/middleware/ratelimit.go +++ b/workspace-server/internal/middleware/ratelimit.go @@ -5,17 +5,19 @@ import ( "context" "net/http" "strconv" + "strings" "sync" "time" "github.com/gin-gonic/gin" ) -// RateLimiter implements a simple token bucket rate limiter per IP. +// RateLimiter implements a token bucket rate limiter keyed by tenant +// identity (org id, then bearer token, then client IP — see keyFor). 
type RateLimiter struct { - mu sync.Mutex - buckets map[string]*bucket - rate int // tokens per interval + mu sync.Mutex + buckets map[string]*bucket + rate int // tokens per interval interval time.Duration } @@ -42,9 +44,9 @@ func NewRateLimiter(rate int, interval time.Duration, ctx context.Context) *Rate case <-ticker.C: rl.mu.Lock() cutoff := time.Now().Add(-10 * time.Minute) - for ip, b := range rl.buckets { + for k, b := range rl.buckets { if b.lastReset.Before(cutoff) { - delete(rl.buckets, ip) + delete(rl.buckets, k) } } rl.mu.Unlock() @@ -54,29 +56,73 @@ func NewRateLimiter(rate int, interval time.Duration, ctx context.Context) *Rate return rl } -// Middleware returns a Gin middleware that rate limits by client IP. +// keyFor returns the bucket identifier for this request. Priority: +// +// 1. X-Molecule-Org-Id header — when present (CP-routed SaaS traffic), +// isolates tenants from each other regardless of the upstream proxy IP +// they all share. +// 2. SHA-256 of Authorization Bearer token — when present (per-workspace +// bearer, ADMIN_TOKEN, org-scoped API token). On a per-tenant Caddy +// box where the org-id header isn't attached, this still distinguishes +// distinct user sessions on the same egress IP. +// 3. ClientIP() — anonymous probes, /health scrapes, registry boot +// signals (when SetTrustedProxies(nil) is in effect, this is the +// direct TCP RemoteAddr — fine for the probe surface, not fine as a +// primary key behind a proxy, hence the priority order above). +// +// Mixing these namespaces is fine because they never collide: org ids +// are UUIDs ("org:..."), token hashes are 64-char hex ("tok:..."), IPs +// contain dots/colons ("ip:..."). +// +// Security note on X-Molecule-Org-Id spoofing: the rate limiter runs +// BEFORE TenantGuard, so the org-id value here is unvalidated. A caller +// reaching workspace-server directly could spoof the header to drain +// another org's bucket. 
In production this surface is closed by the +// CP/Caddy front: tenant SGs reject :8080 from the public internet, and +// CP rewrites the header to the verified org. If a future deployment +// exposes :8080 directly, validate the org-id (e.g. against +// MOLECULE_ORG_ID) before keying on it, or move this middleware after +// TenantGuard. The token-hash and IP fallbacks are unspoofable. +// +// Issue #59 — replaces the previous IP-only keying that silently +// collapsed all canvas traffic into one bucket once #179 disabled +// proxy-header trust. See the issue for the deployment-shape analysis. +func (rl *RateLimiter) keyFor(c *gin.Context) string { + if orgID := strings.TrimSpace(c.GetHeader("X-Molecule-Org-Id")); orgID != "" { + return "org:" + orgID + } + if tok := bearerFromHeader(c.GetHeader("Authorization")); tok != "" { + return "tok:" + tokenKey(tok) + } + return "ip:" + c.ClientIP() +} + +// Middleware returns a Gin middleware that rate limits per caller. The +// caller-key derivation lives in keyFor — see that function's doc for +// the priority list and rationale. func (rl *RateLimiter) Middleware() gin.HandlerFunc { return func(c *gin.Context) { // Tier-1b dev-mode hatch — same gate as AdminAuth / WorkspaceAuth / // discovery. On a local single-user Docker setup the 600-req/min // bucket fills fast: a 15-workspace canvas + activity polling + - // approvals polling + A2A overlay + initial hydration all share - // one IP bucket, so a minute of active use can trip 429 and blank - // the page. Gated by MOLECULE_ENV=development + empty ADMIN_TOKEN - // so SaaS production keeps the bucket. + // approvals polling + A2A overlay + initial hydration all land in + // one bucket (whichever keyFor returns — typically the dev user's + // IP or shared admin token), so a minute of active use can trip + // 429 and blank the page. Gated by MOLECULE_ENV=development + + // empty ADMIN_TOKEN so SaaS production keeps the bucket. 
if isDevModeFailOpen() { c.Header("X-RateLimit-Limit", "unlimited") c.Next() return } - ip := c.ClientIP() + key := rl.keyFor(c) rl.mu.Lock() - b, exists := rl.buckets[ip] + b, exists := rl.buckets[key] if !exists { b = &bucket{tokens: rl.rate, lastReset: time.Now()} - rl.buckets[ip] = b + rl.buckets[key] = b } // Reset tokens if interval has passed diff --git a/workspace-server/internal/middleware/ratelimit_keyfor_test.go b/workspace-server/internal/middleware/ratelimit_keyfor_test.go new file mode 100644 index 00000000..ac1a227d --- /dev/null +++ b/workspace-server/internal/middleware/ratelimit_keyfor_test.go @@ -0,0 +1,303 @@ +package middleware + +import ( + "context" + "crypto/sha256" + "fmt" + "go/ast" + "go/parser" + "go/token" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/gin-gonic/gin" +) + +// newTestLimiterForKeyFor — same shape as newTestLimiter in ratelimit_test.go +// but exposes the *gin.Engine and lets the caller inject headers per-request. +func newTestLimiterForKeyFor(t *testing.T, rate int) *gin.Engine { + t.Helper() + gin.SetMode(gin.TestMode) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + rl := NewRateLimiter(rate, 5*time.Second, ctx) + r := gin.New() + if err := r.SetTrustedProxies(nil); err != nil { + t.Fatalf("SetTrustedProxies: %v", err) + } + r.Use(rl.Middleware()) + r.GET("/x", func(c *gin.Context) { c.String(http.StatusOK, "ok") }) + return r +} + +// TestKeyFor_OrgIdHeaderTrumpsBearerAndIP — when X-Molecule-Org-Id is set +// the bucket is keyed on it regardless of bearer token or IP. This is the +// load-bearing case for the production SaaS plane: every tenant routed +// through the same upstream proxy IP gets its own bucket because the +// CP attaches the org-id header. 
+func TestKeyFor_OrgIdHeaderTrumpsBearerAndIP(t *testing.T) { + gin.SetMode(gin.TestMode) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + rl := NewRateLimiter(2, 5*time.Second, ctx) + + c, _ := gin.CreateTestContext(httptest.NewRecorder()) + c.Request = httptest.NewRequest(http.MethodGet, "/x", nil) + c.Request.RemoteAddr = "10.0.0.1:1234" + c.Request.Header.Set("X-Molecule-Org-Id", "org-aaa") + c.Request.Header.Set("Authorization", "Bearer ignored-token-value") + + got := rl.keyFor(c) + if got != "org:org-aaa" { + t.Errorf("keyFor with org-id header: got %q, want %q", got, "org:org-aaa") + } +} + +// TestKeyFor_BearerTokenWhenNoOrgId — the per-tenant Caddy box path: +// no org-id header (canvas same-origin), but Authorization Bearer is +// always set by WorkspaceAuth-protected routes. Bucket keyed on the +// SHA-256 hex of the token so distinct sessions on the same egress IP +// get distinct buckets — and so the in-memory map can never become a +// token dump if the process is inspected. +func TestKeyFor_BearerTokenWhenNoOrgId(t *testing.T) { + gin.SetMode(gin.TestMode) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + rl := NewRateLimiter(2, 5*time.Second, ctx) + + c, _ := gin.CreateTestContext(httptest.NewRecorder()) + c.Request = httptest.NewRequest(http.MethodGet, "/x", nil) + c.Request.RemoteAddr = "10.0.0.1:1234" + c.Request.Header.Set("Authorization", "Bearer secret-token-abc") + + got := rl.keyFor(c) + expectedHash := fmt.Sprintf("%x", sha256.Sum256([]byte("secret-token-abc"))) + if got != "tok:"+expectedHash { + t.Errorf("keyFor with bearer-only: got %q, want %q", got, "tok:"+expectedHash) + } + // Critical security pin: raw token must never appear in the key. 
+ if strings.Contains(got, "secret-token-abc") { + t.Errorf("keyFor leaked raw bearer token in bucket key: %q", got) + } +} + +// TestKeyFor_IPFallbackWhenNoOrgIdNoBearer — anonymous probes (no auth, +// no tenant header) fall through to ClientIP keying. This is the only +// path that depended on the pre-#179 trust-XFF behaviour and is fine +// to keep IP-keyed because the surface is just /health, /buildinfo, +// and the registry-boot endpoints. +func TestKeyFor_IPFallbackWhenNoOrgIdNoBearer(t *testing.T) { + gin.SetMode(gin.TestMode) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + rl := NewRateLimiter(2, 5*time.Second, ctx) + + c, _ := gin.CreateTestContext(httptest.NewRecorder()) + c.Request = httptest.NewRequest(http.MethodGet, "/x", nil) + c.Request.RemoteAddr = "203.0.113.1:1234" + + got := rl.keyFor(c) + // gin.ClientIP() strips the port — we just need to confirm the prefix + // and that the IP appears. + if !strings.HasPrefix(got, "ip:") { + t.Errorf("keyFor without auth/org headers: got %q, want prefix %q", got, "ip:") + } + if !strings.Contains(got, "203.0.113.1") { + t.Errorf("keyFor IP fallback: got %q, want to contain %q", got, "203.0.113.1") + } +} + +// TestRateLimit_TwoOrgsSameIP_IndependentBuckets — the load-bearing +// regression test for issue #59. Two tenants behind the same upstream +// proxy must NOT share a bucket; the production SaaS-plane outage was +// every tenant collapsing to the proxy IP and saturating one bucket. +// +// Mutation invariant: removing the org-id branch from keyFor — say, +// returning "ip:" + c.ClientIP() unconditionally — collapses both +// tenants back into one bucket and this test fails on the 3rd +// request because it would 429 instead of 200. 
+func TestRateLimit_TwoOrgsSameIP_IndependentBuckets(t *testing.T) { + r := newTestLimiterForKeyFor(t, 2) + + exhaust := func(orgID string) { + t.Helper() + for i := 0; i < 2; i++ { + req := httptest.NewRequest(http.MethodGet, "/x", nil) + req.RemoteAddr = "10.0.0.1:1234" // SAME upstream proxy IP + req.Header.Set("X-Molecule-Org-Id", orgID) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusOK { + t.Fatalf("setup orgID=%s req %d: want 200, got %d", orgID, i+1, w.Code) + } + } + } + + exhaust("org-aaa") + // org-aaa is now at 0 tokens. org-bbb's bucket must be FRESH. + req := httptest.NewRequest(http.MethodGet, "/x", nil) + req.RemoteAddr = "10.0.0.1:1234" + req.Header.Set("X-Molecule-Org-Id", "org-bbb") + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusOK { + t.Fatalf("org-bbb on same IP must have its own bucket: got %d, want 200 (issue #59 regression)", w.Code) + } + + // Confirm org-aaa is still throttled — proves we're not just opening + // the gate to everyone. + req = httptest.NewRequest(http.MethodGet, "/x", nil) + req.RemoteAddr = "10.0.0.1:1234" + req.Header.Set("X-Molecule-Org-Id", "org-aaa") + w = httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusTooManyRequests { + t.Errorf("org-aaa exhausted bucket: want 429, got %d", w.Code) + } +} + +// TestRateLimit_TwoTokensSameIP_IndependentBuckets — analog of the +// org-id case for the per-tenant Caddy box: two distinct user +// sessions on the same egress IP, distinguished only by their bearer +// tokens, must get independent buckets. This was the path Hongming +// hit on hongming.moleculesai.app — a single user with multiple +// browser tabs against one workspace-server box. 
+func TestRateLimit_TwoTokensSameIP_IndependentBuckets(t *testing.T) { + r := newTestLimiterForKeyFor(t, 2) + + exhaust := func(token string) { + t.Helper() + for i := 0; i < 2; i++ { + req := httptest.NewRequest(http.MethodGet, "/x", nil) + req.RemoteAddr = "127.0.0.1:1234" // local Caddy proxy — same for both + req.Header.Set("Authorization", "Bearer "+token) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusOK { + t.Fatalf("setup token=%s req %d: want 200, got %d", token, i+1, w.Code) + } + } + } + + exhaust("user-a-token") + req := httptest.NewRequest(http.MethodGet, "/x", nil) + req.RemoteAddr = "127.0.0.1:1234" + req.Header.Set("Authorization", "Bearer user-b-token") + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusOK { + t.Fatalf("user-b token on same proxy IP must have its own bucket: got %d, want 200", w.Code) + } +} + +// TestRateLimit_SameOrgDifferentTokens_SharedBucket — counter-pin: +// ensure org-id keying really does collapse all tokens within one +// org into one bucket. This is the desired behaviour: a tenant that +// mints multiple tokens shouldn't be able to circumvent its quota +// by rotating tokens between requests. (The same-IP-different-org +// test above proves we don't collapse ACROSS orgs; this one proves +// we DO collapse WITHIN one org.) +func TestRateLimit_SameOrgDifferentTokens_SharedBucket(t *testing.T) { + r := newTestLimiterForKeyFor(t, 2) + + for _, tok := range []string{"token-1", "token-2"} { + req := httptest.NewRequest(http.MethodGet, "/x", nil) + req.RemoteAddr = "10.0.0.1:1234" + req.Header.Set("X-Molecule-Org-Id", "org-shared") + req.Header.Set("Authorization", "Bearer "+tok) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusOK { + t.Fatalf("setup tok=%s: want 200, got %d", tok, w.Code) + } + } + // Bucket should be exhausted now — third request, even with a fresh + // token, must 429 because the org-id is keying it. 
+ req := httptest.NewRequest(http.MethodGet, "/x", nil) + req.RemoteAddr = "10.0.0.1:1234" + req.Header.Set("X-Molecule-Org-Id", "org-shared") + req.Header.Set("Authorization", "Bearer token-3") + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusTooManyRequests { + t.Errorf("rotating tokens within one org should NOT bypass the quota: got %d, want 429", w.Code) + } +} + +// TestRateLimit_Middleware_RoutesThroughKeyFor is the AST gate (mirror +// of #36/#10/#12's gates). Pins the SSOT routing invariant: +// (*RateLimiter).Middleware MUST call rl.keyFor and MUST NOT carry a +// direct c.ClientIP() call (= the parallel-impl drift this PR fixes). +// +// Mutation invariant: a future PR that re-introduces direct IP keying +// in Middleware (`ip := c.ClientIP()`) makes this test fail. That's +// the signal to either (a) extend keyFor's contract to cover the new +// case OR (b) update this gate with an explicit reason. Either way the +// drift gets a reviewer's attention before shipping. +func TestRateLimit_Middleware_RoutesThroughKeyFor(t *testing.T) { + fset := token.NewFileSet() + file, err := parser.ParseFile(fset, "ratelimit.go", nil, parser.ParseComments) + if err != nil { + t.Fatalf("parse ratelimit.go: %v", err) + } + + var fn *ast.FuncDecl + ast.Inspect(file, func(n ast.Node) bool { + f, ok := n.(*ast.FuncDecl) + if !ok { + return true + } + // Match `func (rl *RateLimiter) Middleware() ...` + if f.Name.Name != "Middleware" { + return true + } + if f.Recv == nil || len(f.Recv.List) != 1 { + return true + } + star, ok := f.Recv.List[0].Type.(*ast.StarExpr) + if !ok { + return true + } + if id, ok := star.X.(*ast.Ident); !ok || id.Name != "RateLimiter" { + return true + } + fn = f + return false + }) + if fn == nil { + t.Fatal("(*RateLimiter).Middleware not found — was it renamed? 
update this gate or the SSOT routing assumption") + } + + var ( + callsKeyFor bool + callsClientIP bool + ) + ast.Inspect(fn.Body, func(n ast.Node) bool { + call, ok := n.(*ast.CallExpr) + if !ok { + return true + } + sel, ok := call.Fun.(*ast.SelectorExpr) + if !ok { + return true + } + switch sel.Sel.Name { + case "keyFor": + callsKeyFor = true + case "ClientIP": + callsClientIP = true + } + return true + }) + + if !callsKeyFor { + t.Error("(*RateLimiter).Middleware must call rl.keyFor for SSOT bucket-key derivation — see issue #59. Found no keyFor call.") + } + if callsClientIP { + t.Error("(*RateLimiter).Middleware carries a direct c.ClientIP() call. This is the parallel-impl drift issue #59 fixed. " + + "Either route through rl.keyFor OR — if a new use case truly needs direct IP — extend keyFor's contract first and update this gate to allow the specific delta.") + } +}