Merge branch 'main' into fix/195-auto-promote-staging-gitea-rest
Some checks failed
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 16s
CodeQL / Analyze (${{ matrix.language }}) (python) (pull_request) Successful in 5s
pr-guards / disable-auto-merge-on-push (pull_request) Failing after 11s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 12s
CodeQL / Analyze (${{ matrix.language }}) (go) (pull_request) Successful in 5s
Check merge_group trigger on required workflows / Required workflows have merge_group trigger (pull_request) Successful in 16s
CodeQL / Analyze (${{ matrix.language }}) (javascript-typescript) (pull_request) Successful in 6s
branch-protection drift check / Branch protection drift (pull_request) Successful in 26s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 21s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 24s
CI / Platform (Go) (pull_request) Successful in 11s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 16s
CI / Detect changes (pull_request) Successful in 20s
E2E API Smoke Test / detect-changes (pull_request) Successful in 26s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 18s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 9s
CI / Python Lint & Test (pull_request) Successful in 10s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 16s
CI / Canvas (Next.js) (pull_request) Successful in 14s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 16s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 14s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 13s

claude-ceo-assistant 2026-05-07 23:22:24 +00:00
commit 2505b36a2c
8 changed files with 1278 additions and 106 deletions

View File

@@ -1,9 +1,10 @@
'use client';
-import { useEffect, useMemo, useCallback } from "react";
import { useEffect, useMemo, useCallback, useRef } from "react";
import { type Edge, MarkerType } from "@xyflow/react";
import { api } from "@/lib/api";
import { useCanvasStore } from "@/store/canvas";
import { useSocketEvent } from "@/hooks/useSocketEvent";
import type { ActivityEntry } from "@/types/activity";
// ── Constants ─────────────────────────────────────────────────────────────────
@@ -11,9 +12,6 @@ import type { ActivityEntry } from "@/types/activity";
/** 60-minute look-back window for delegation activity */
export const A2A_WINDOW_MS = 60 * 60 * 1000;
-/** Polling interval — refresh edges every 60 seconds */
-export const A2A_POLL_MS = 60 * 1_000;
/** Threshold for "hot" edges: < 5 minutes → animated + violet stroke */
export const A2A_HOT_MS = 5 * 60 * 1_000;
@@ -131,6 +129,20 @@ export function buildA2AEdges(
* `a2aEdges`. Canvas.tsx merges these with topology edges and passes the
* combined list to ReactFlow.
*
* Update shape (issue #61 Stage 2, replaces the 60s polling loop):
* - On mount (when showA2AEdges): one HTTP fan-out per visible workspace
* (delegation rows, 60-min window). Bootstraps the local row buffer.
* - Steady state: subscribes to ACTIVITY_LOGGED via useSocketEvent.
* Each delegation event from a visible workspace is appended to the
* buffer; edges are re-derived via the existing buildA2AEdges helper.
* - showA2AEdges toggle off: clears edges + buffer.
* - Visible-ID-set change: re-bootstraps so a freshly-shown workspace
* backfills its 60-min history (existing visibleIdsKey selector
* behaviour preserved; that's the 2026-05-04 render-loop fix).
*
* No interval poll. The singleton ReconnectingSocket already owns
* reconnect / backoff / health-check; useSocketEvent inherits those.
*
* Mount this inside CanvasInner (no ReactFlow hook dependency).
*/
export function A2ATopologyOverlay() {
@@ -157,7 +169,9 @@ export function A2ATopologyOverlay() {
// the symptom of this re-render storm.
//
// The fix is purely the dependency-stability change here; the fetch
-// logic is unchanged.
// logic is unchanged. Post-#61 the polling-driven fetch is gone, but
// the visibleIdsKey gate is still required so a peer-discovery write
// doesn't trigger a wasteful re-bootstrap.
const visibleIdsKey = useCanvasStore((s) =>
s.nodes
.filter((n) => !n.hidden)
@@ -171,16 +185,42 @@ export function A2ATopologyOverlay() {
[visibleIdsKey]
);
-// Fetch delegation activity for all visible workspaces and rebuild overlay edges.
-const fetchAndUpdate = useCallback(async () => {
// Local rolling buffer of delegation rows. Pruned by A2A_WINDOW_MS on
// each rebuild so a long-lived session doesn't accumulate unbounded
// history. The buffer's high-water mark is approximately:
// visibleIds.length × bootstrap-fetch-limit (500) + WS arrivals
// Real-world ceiling: ~3000 entries at the 60-min boundary, all of
// which buildA2AEdges aggregates into at most N² edges.
const bufferRef = useRef<ActivityEntry[]>([]);
// visibleIdsRef gives the WS handler the latest visible-ID set without
// re-subscribing on every render. The bus listener is registered
// exactly once per mount; subscriber-side filtering reads from this ref.
const visibleIdsRef = useRef(visibleIds);
visibleIdsRef.current = visibleIds;
// Re-derive overlay edges from the current buffer + push to store.
// Prunes by A2A_WINDOW_MS first so memory stays bounded across long
// sessions and the aggregation cost stays O(window-size).
const recomputeAndPush = useCallback(() => {
const cutoff = Date.now() - A2A_WINDOW_MS;
bufferRef.current = bufferRef.current.filter(
(r) => new Date(r.created_at).getTime() > cutoff
);
setA2AEdges(buildA2AEdges(bufferRef.current));
}, [setA2AEdges]);
// Bootstrap fan-out — one HTTP per visible workspace. Replaces the
// 60s polling loop entirely. Race-aware: any WS arrivals that landed
// in the buffer DURING the fetch (between the await and resume) are
// preserved by id-dedup-with-fetched-first ordering.
const bootstrap = useCallback(async () => {
if (visibleIds.length === 0) {
bufferRef.current = [];
setA2AEdges([]);
return;
}
try {
-// Fan-out — one request per visible workspace.
-// Per-request failures are swallowed so one broken workspace doesn't blank the overlay.
-const allRows = (
const fetchedRows = (
await Promise.all(
visibleIds.map((id) =>
api
@@ -192,24 +232,76 @@ export function A2ATopologyOverlay() {
)
).flat();
-setA2AEdges(buildA2AEdges(allRows));
// Merge: fetched rows first, then any in-flight WS arrivals that
// accumulated during the await. Dedup by id so rows that appear
// in both paths are not double-counted in the aggregation.
const merged = [...fetchedRows, ...bufferRef.current];
const seen = new Set<string>();
bufferRef.current = merged.filter((r) => {
if (seen.has(r.id)) return false;
seen.add(r.id);
return true;
});
recomputeAndPush();
} catch {
// Overlay failure is non-critical — canvas remains functional
}
-}, [visibleIds, setA2AEdges]);
}, [visibleIds, setA2AEdges, recomputeAndPush]);
useEffect(() => {
if (!showA2AEdges) {
-// Clear edges immediately when toggled off
// Clear edges + buffer immediately when toggled off
bufferRef.current = [];
setA2AEdges([]);
return;
}
void bootstrap();
}, [showA2AEdges, bootstrap, setA2AEdges]);
-// Initial fetch, then poll every 60 s
-void fetchAndUpdate();
-const timer = setInterval(() => void fetchAndUpdate(), A2A_POLL_MS);
-return () => clearInterval(timer);
-}, [showA2AEdges, fetchAndUpdate, setA2AEdges]);
// Live-update path. Filters server-side ACTIVITY_LOGGED events down
// to delegation initiations from visible workspaces and appends each
// into the rolling buffer, re-deriving edges via buildA2AEdges.
//
// Only `method === "delegate"` rows count — the same filter
// buildA2AEdges applies — so delegate_result rows arriving over the
// wire don't double-count.
useSocketEvent((msg) => {
if (!showA2AEdges) return;
if (msg.event !== "ACTIVITY_LOGGED") return;
const p = (msg.payload || {}) as Record<string, unknown>;
if (p.activity_type !== "delegation") return;
if (p.method !== "delegate") return;
const wsId = msg.workspace_id;
if (!visibleIdsRef.current.includes(wsId)) return;
// Synthesise an ActivityEntry from the WS payload so buildA2AEdges
// (which the bootstrap path also feeds) handles it identically.
const entry: ActivityEntry = {
id:
(p.id as string) ||
`ws-push-${msg.timestamp || Date.now()}-${wsId}`,
workspace_id: wsId,
activity_type: "delegation",
source_id: (p.source_id as string | null) ?? null,
target_id: (p.target_id as string | null) ?? null,
method: "delegate",
summary: (p.summary as string | null) ?? null,
request_body: null,
response_body: null,
duration_ms: (p.duration_ms as number | null) ?? null,
status: (p.status as string) || "ok",
error_detail: null,
created_at:
(p.created_at as string) ||
msg.timestamp ||
new Date().toISOString(),
};
bufferRef.current = [...bufferRef.current, entry];
recomputeAndPush();
});
// Pure side-effect — renders nothing
return null;

View File

@@ -3,6 +3,7 @@
import { useState, useEffect, useCallback, useRef } from "react";
import { useCanvasStore } from "@/store/canvas";
import { api } from "@/lib/api";
import { useSocketEvent } from "@/hooks/useSocketEvent";
import { COMM_TYPE_LABELS } from "@/lib/design-tokens";
interface Communication {
@@ -18,32 +19,71 @@ interface Communication {
durationMs: number | null;
}
/** Workspace-server `ACTIVITY_LOGGED` payload shape. Pulled out so the
* WS handler below has a typed view of the same fields the HTTP
* bootstrap consumes; drift between the two paths is a class of bug
* AgentCommsPanel hit historically. */
interface ActivityLoggedPayload {
id?: string;
activity_type?: string;
source_id?: string | null;
target_id?: string | null;
workspace_id?: string;
summary?: string | null;
status?: string;
duration_ms?: number | null;
created_at?: string;
}
/** Fan-out cap for the bootstrap HTTP fetch on mount / on visibility
* re-open. Kept at 3 (carried over from the 2026-05-04 fix) so a
* freshly-mounted overlay on a 15-workspace tenant only spends 3
* round-trips bootstrapping. Live updates after that arrive via the
* WS subscription below no polling, no fan-out to maintain. */
const BOOTSTRAP_FAN_OUT_CAP = 3;
/** Cap on the rendered list. Bootstrap + every WS push prepends, the
* list is sliced to this size after each update. Mirrors the prior
* polling-loop behaviour. */
const COMMS_RENDER_CAP = 20;
/**
* Overlay showing recent A2A communications between workspaces.
-* Renders as a floating log panel that auto-updates.
*
* Update shape (issue #61 Stage 1, replaces the 30s polling loop):
* - On mount (when visible): one HTTP bootstrap per online workspace,
* capped at BOOTSTRAP_FAN_OUT_CAP. Yields the initial recent-comms
* window without waiting for live events.
* - Steady state: subscribes to ACTIVITY_LOGGED via useSocketEvent.
* Each event with a matching activity_type from a visible online
* workspace gets synthesised into a Communication and prepended.
* - Visibility re-open: re-bootstraps so the user sees the freshest
* window even if WS was idle while collapsed.
*
* No interval poll. The singleton ReconnectingSocket in `store/socket.ts`
* already owns reconnect/backoff/health-check, and `useSocketEvent`
* inherits those guarantees. If WS is genuinely unhealthy, the overlay
* shows the bootstrap snapshot until the next visibility re-open or
* the next WS reconnect (which fires its own rehydrate burst).
*/
export function CommunicationOverlay() {
const [comms, setComms] = useState<Communication[]>([]);
const [visible, setVisible] = useState(true);
const selectedNodeId = useCanvasStore((s) => s.selectedNodeId);
const nodes = useCanvasStore((s) => s.nodes);
// nodesRef gives the WS handler current node-name resolution without
// re-subscribing on every node-list change. The bus listener is
// registered exactly once per mount; subscriber-side filtering reads
// the latest value via this ref.
const nodesRef = useRef(nodes);
nodesRef.current = nodes;
-const fetchComms = useCallback(async () => {
const bootstrapComms = useCallback(async () => {
try {
-// Fan-out cap: each polled workspace = 1 round-trip. The platform
-// rate limits at 600 req/min/IP; combined with heartbeats + other
-// canvas polling, every workspace polled here costs ~6 req/min
-// (1 every 30s × 1 per workspace). Capping at 3 keeps this
-// overlay's footprint at 18 req/min worst case — well under
-// budget even with 8+ workspaces visible. Caught 2026-05-04 when
-// a user with 8+ workspaces (Design Director + 6 sub-agents +
-// 3 standalones) saw sustained 429s in canvas console.
const onlineNodes = nodesRef.current.filter((n) => n.data.status === "online");
const allComms: Communication[] = [];
-for (const node of onlineNodes.slice(0, 3)) {
for (const node of onlineNodes.slice(0, BOOTSTRAP_FAN_OUT_CAP)) {
try {
const activities = await api.get<Array<{
id: string;
@@ -59,8 +99,8 @@ export function CommunicationOverlay() {
for (const a of activities) {
if (a.activity_type === "a2a_send" || a.activity_type === "a2a_receive") {
-const sourceNode = nodes.find((n) => n.id === (a.source_id || a.workspace_id));
-const targetNode = nodes.find((n) => n.id === (a.target_id || ""));
const sourceNode = nodesRef.current.find((n) => n.id === (a.source_id || a.workspace_id));
const targetNode = nodesRef.current.find((n) => n.id === (a.target_id || ""));
allComms.push({
id: a.id,
sourceId: a.source_id || a.workspace_id,
@@ -76,11 +116,12 @@ export function CommunicationOverlay() {
}
}
} catch {
-// Skip workspaces that fail
// Per-workspace failures must not blank the panel — the same
// robustness the polling version had.
}
}
-// Sort by timestamp, newest first, dedupe
// Newest-first with id-dedup, capped at COMMS_RENDER_CAP.
const seen = new Set<string>();
const sorted = allComms
.sort((a, b) => b.timestamp.localeCompare(a.timestamp))
@@ -89,29 +130,78 @@ export function CommunicationOverlay() {
seen.add(c.id);
return true;
})
-.slice(0, 20);
.slice(0, COMMS_RENDER_CAP);
setComms(sorted);
} catch {
-// Silently handle API errors
// Bootstrap failure is non-blocking — the WS subscription below
// will populate the panel as live events arrive.
}
}, []);
// Bootstrap once on mount + every time the user re-opens after a
// collapse. Closed-panel state intentionally drops live updates so
// the panel doesn't churn invisible state — the next open reloads.
useEffect(() => {
-// Gate polling on visibility — when the user collapses the overlay
-// the data isn't being read, so the per-workspace fan-out becomes
-// pure rate-limit overhead. Pre-fix this overlay polled regardless
-// of whether the panel was shown, costing ~36 req/min from a
-// hidden surface.
if (!visible) return;
-fetchComms();
-// 30s cadence (was 10s). At 3-workspace fan-out that's 6 req/min
-// worst case from this overlay. Combined with heartbeats (~30/min)
-// and other canvas polling, leaves ample headroom under the 600/
-// min/IP server-side rate limit even at 8+ workspace tenants.
-const interval = setInterval(fetchComms, 30000);
-return () => clearInterval(interval);
-}, [fetchComms, visible]);
bootstrapComms();
}, [bootstrapComms, visible]);
// Live-update path. Filters server-side ACTIVITY_LOGGED events down
// to the comm-overlay-relevant subset and prepends each into the
// rendered list with the same dedup the bootstrap path uses.
//
// Scope guard: ignore events for workspaces not in the visible online
// set, so a user collapsing one workspace doesn't see its comms
// continue to scroll in. Same shape the bootstrap path applies.
useSocketEvent((msg) => {
if (!visible) return;
if (msg.event !== "ACTIVITY_LOGGED") return;
const p = (msg.payload || {}) as ActivityLoggedPayload;
const type = p.activity_type;
if (type !== "a2a_send" && type !== "a2a_receive" && type !== "task_update") return;
const wsId = msg.workspace_id;
const onlineSet = new Set(
nodesRef.current.filter((n) => n.data.status === "online").map((n) => n.id),
);
if (!onlineSet.has(wsId)) return;
const sourceId = p.source_id || wsId;
const targetId = p.target_id || "";
const sourceNode = nodesRef.current.find((n) => n.id === sourceId);
const targetNode = nodesRef.current.find((n) => n.id === targetId);
const incoming: Communication = {
id: p.id || `${msg.timestamp || Date.now()}:${sourceId}:${targetId}`,
sourceId,
targetId,
sourceName: sourceNode?.data.name || "Unknown",
targetName: targetNode?.data.name || "Unknown",
type: type as Communication["type"],
summary: p.summary || "",
status: p.status || "ok",
timestamp: p.created_at || msg.timestamp || new Date().toISOString(),
durationMs: p.duration_ms ?? null,
};
setComms((prev) => {
// Prepend, dedup by id, re-cap. Functional setState is necessary
// because two ACTIVITY_LOGGED events arriving in the same React
// batch would otherwise read a stale `comms` from the closure.
const seen = new Set<string>();
const merged = [incoming, ...prev]
.sort((a, b) => b.timestamp.localeCompare(a.timestamp))
.filter((c) => {
if (seen.has(c.id)) return false;
seen.add(c.id);
return true;
})
.slice(0, COMMS_RENDER_CAP);
return merged;
});
});
if (!visible || comms.length === 0) {
return (

View File

@@ -41,6 +41,10 @@ vi.mock("@/store/canvas", () => ({
// ── Imports (after mocks) ─────────────────────────────────────────────────────
import { api } from "@/lib/api";
import {
emitSocketEvent,
_resetSocketEventListenersForTests,
} from "@/store/socket-events";
import {
buildA2AEdges,
formatA2ARelativeTime,
@@ -342,6 +346,151 @@ describe("A2ATopologyOverlay component", () => {
expect(mockGet.mock.calls.length).toBe(callsAfterMount);
});
// ── #61 Stage 2: ACTIVITY_LOGGED subscription tests ────────────────────────
//
// Pin the post-#61 behaviour: WS push for delegation contributes to
// the overlay's edge buffer with NO additional HTTP fetch. Same shape
// as Stage 1 (CommunicationOverlay).
describe("#61 stage 2 — ACTIVITY_LOGGED subscription", () => {
beforeEach(() => {
_resetSocketEventListenersForTests();
});
afterEach(() => {
_resetSocketEventListenersForTests();
});
function emitDelegation(overrides: {
workspaceId?: string;
sourceId?: string;
targetId?: string;
method?: string;
activityType?: string;
} = {}) {
// Use Date.now() (real time, fake-timer-frozen) rather than the
// hardcoded NOW constant — buildA2AEdges prunes by Date.now() -
// A2A_WINDOW_MS, so a row dated against the wrong epoch silently
// falls outside the window and the test fails for a confusing
// reason ("edges array empty" vs "filter dropped my row").
const realNow = Date.now();
emitSocketEvent({
event: "ACTIVITY_LOGGED",
workspace_id: overrides.workspaceId ?? "ws-a",
timestamp: new Date(realNow).toISOString(),
payload: {
id: `act-${Math.random().toString(36).slice(2)}`,
activity_type: overrides.activityType ?? "delegation",
method: overrides.method ?? "delegate",
source_id: overrides.sourceId ?? "ws-a",
target_id: overrides.targetId ?? "ws-b",
status: "ok",
created_at: new Date(realNow - 30_000).toISOString(),
},
});
}
it("does NOT poll on a 60s interval after bootstrap (post-#61)", async () => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
mockGet.mockResolvedValue([] as any);
render(<A2ATopologyOverlay />);
await act(async () => { await Promise.resolve(); });
const callsAfterBootstrap = mockGet.mock.calls.length;
expect(callsAfterBootstrap).toBe(2); // ws-a + ws-b
// Pre-#61: a 60s clock tick would fire a fresh fan-out (2 more
// calls). Post-#61: no interval, no extra calls.
await act(async () => {
vi.advanceTimersByTime(120_000);
});
expect(mockGet.mock.calls.length).toBe(callsAfterBootstrap);
});
it("WS push for a delegation event from a visible workspace updates edges with NO HTTP call", async () => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
mockGet.mockResolvedValue([] as any);
render(<A2ATopologyOverlay />);
await act(async () => { await Promise.resolve(); await Promise.resolve(); });
mockGet.mockClear();
mockStoreState.setA2AEdges.mockClear();
await act(async () => {
emitDelegation({ sourceId: "ws-a", targetId: "ws-b" });
});
// Edges-set called with at least one a2a edge for the new push.
const calls = mockStoreState.setA2AEdges.mock.calls;
expect(calls.length).toBeGreaterThanOrEqual(1);
const lastCall = calls[calls.length - 1][0] as Array<{ id: string }>;
expect(lastCall.some((e) => e.id === "a2a-ws-a-ws-b")).toBe(true);
// Critical: no HTTP fetch fired during the WS path.
expect(mockGet).not.toHaveBeenCalled();
});
it("WS push for a non-delegation activity_type is ignored", async () => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
mockGet.mockResolvedValue([] as any);
render(<A2ATopologyOverlay />);
await act(async () => { await Promise.resolve(); });
mockStoreState.setA2AEdges.mockClear();
await act(async () => {
emitDelegation({ activityType: "a2a_send" });
});
// setA2AEdges must not be called by the WS handler — the only
// setA2AEdges calls in this test came from the initial bootstrap.
expect(mockStoreState.setA2AEdges).not.toHaveBeenCalled();
});
it("WS push for a delegate_result row is ignored (mirrors buildA2AEdges filter)", async () => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
mockGet.mockResolvedValue([] as any);
render(<A2ATopologyOverlay />);
await act(async () => { await Promise.resolve(); });
mockStoreState.setA2AEdges.mockClear();
await act(async () => {
emitDelegation({ method: "delegate_result" });
});
// delegate_result rows do not contribute to the edge count — they
// are completion signals, not initiations.
expect(mockStoreState.setA2AEdges).not.toHaveBeenCalled();
});
it("WS push from a hidden workspace is ignored", async () => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
mockGet.mockResolvedValue([] as any);
render(<A2ATopologyOverlay />);
await act(async () => { await Promise.resolve(); });
mockStoreState.setA2AEdges.mockClear();
await act(async () => {
emitDelegation({ workspaceId: "ws-hidden" });
});
expect(mockStoreState.setA2AEdges).not.toHaveBeenCalled();
});
it("WS push while showA2AEdges is false is ignored", async () => {
mockStoreState.showA2AEdges = false;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
mockGet.mockResolvedValue([] as any);
render(<A2ATopologyOverlay />);
// The mount path with showA2AEdges=false calls setA2AEdges([])
// once — clear that to isolate the WS path.
mockStoreState.setA2AEdges.mockClear();
await act(async () => {
emitDelegation();
});
expect(mockStoreState.setA2AEdges).not.toHaveBeenCalled();
expect(mockGet).not.toHaveBeenCalled();
});
});
it("re-fetches when the visible ID set actually changes", async () => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
mockGet.mockResolvedValue([] as any);

View File

@@ -1,18 +1,28 @@
// @vitest-environment jsdom
/**
-* CommunicationOverlay tests pin the rate-limit fix shipped 2026-05-04.
* CommunicationOverlay tests pin both the 2026-05-04 fan-out cap fix
* AND the 2026-05-07 polling-to-ACTIVITY_LOGGED-subscriber refactor
* (issue #61 stage 1).
*
-* The overlay polls /workspaces/:id/activity?limit=5 for each online
-* workspace. Pre-fix it (a) polled regardless of visibility and (b)
-* fanned out to 6 workspaces every 10s. With 8+ workspaces a user
-* triggered sustained 429s (server-side rate limit is 600 req/min/IP).
* The overlay used to poll /workspaces/:id/activity?limit=5 on a 30s
* interval per online workspace (capped at 3). Post-#61: it bootstraps
* once on mount via the same HTTP path (cap of 3 retained), then
* subscribes to ACTIVITY_LOGGED via the global socket bus for live
* updates. No interval poll.
*
* These tests pin:
-* 1. Fan-out cap of 3 even with 6 online nodes, only 3 fetches
-* 2. Visibility gate when collapsed, no polling
* 1. Bootstrap fan-out cap of 3: even with 6 online nodes, only 3
*    HTTP fetches on mount.
* 2. Visibility gate: when collapsed, no HTTP fetches; re-open
*    re-bootstraps.
* 3. NO interval polling: advancing the clock past 30s does not fire
*    additional HTTP calls.
* 4. WS push extends the rendered list without firing any HTTP call.
* 5. WS push for an offline workspace is ignored.
* 6. WS push for a non-comm activity_type is ignored.
*
-* If a future refactor pushes either dial back up, CI fails before
-* the regression hits a paying tenant.
* If a future refactor regresses any of these, CI fails before the
* regression hits a paying tenant.
*/
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import { render, cleanup, act, fireEvent } from "@testing-library/react";
@@ -23,7 +33,7 @@ vi.mock("@/lib/api", () => ({
api: { get: vi.fn() },
}));
-// Six online nodes — enough to verify the cap of 3.
// Six online nodes — enough to verify the bootstrap cap of 3.
const mockStoreState = {
selectedNodeId: null as string | null,
nodes: [
@@ -56,6 +66,10 @@ vi.mock("@/lib/design-tokens", () => ({
// ── Imports (after mocks) ─────────────────────────────────────────────────────
import { api } from "@/lib/api";
import {
emitSocketEvent,
_resetSocketEventListenersForTests,
} from "@/store/socket-events";
import { CommunicationOverlay } from "../CommunicationOverlay";
const mockGet = vi.mocked(api.get);
@@ -66,30 +80,34 @@ beforeEach(() => {
vi.useFakeTimers();
mockGet.mockReset();
mockGet.mockResolvedValue([]);
// Drop any subscribers the previous test left on the singleton bus —
// each render adds one via useSocketEvent.
_resetSocketEventListenersForTests();
});
afterEach(() => {
cleanup();
vi.useRealTimers();
_resetSocketEventListenersForTests();
});
// ── Tests ─────────────────────────────────────────────────────────────────────
-describe("CommunicationOverlay — fan-out cap", () => {
describe("CommunicationOverlay — bootstrap fan-out cap", () => {
-it("polls at most 3 of 6 online workspaces (rate-limit floor)", async () => {
it("bootstraps at most 3 of 6 online workspaces (rate-limit floor preserved post-#61)", async () => {
await act(async () => {
render(<CommunicationOverlay />);
});
-// Mount fires the first poll synchronously (no interval tick yet).
-// Pre-fix: 6 calls. Post-fix: 3.
// Mount fires the bootstrap synchronously — pre-#61 this was the
// first poll cycle; post-#61 it's the only HTTP fetch (live updates
// arrive via WS push). 6 nodes → 3 fetches.
expect(mockGet).toHaveBeenCalledTimes(3);
// Verify the calls are for the FIRST 3 online nodes (slice order).
expect(mockGet).toHaveBeenCalledWith("/workspaces/ws-1/activity?limit=5");
expect(mockGet).toHaveBeenCalledWith("/workspaces/ws-2/activity?limit=5");
expect(mockGet).toHaveBeenCalledWith("/workspaces/ws-3/activity?limit=5");
});
-it("never polls offline workspaces", async () => {
it("never bootstraps offline workspaces", async () => {
await act(async () => {
render(<CommunicationOverlay />);
});
@@ -99,40 +117,39 @@ describe("CommunicationOverlay — fan-out cap", () => {
});
});
-describe("CommunicationOverlay — cadence", () => {
-it("uses 30s interval cadence (was 10s pre-fix)", async () => {
describe("CommunicationOverlay — no interval polling (post-#61)", () => {
// The pre-#61 implementation re-fetched every 30s per workspace.
// Post-#61 the only HTTP path is the bootstrap on mount + on
// visibility-toggle. This test pins the absence of any interval
// poll: a 60s clock advance must not produce a second round of
// fetches.
it("does NOT poll on a 30s interval after bootstrap", async () => {
await act(async () => {
render(<CommunicationOverlay />);
});
-expect(mockGet).toHaveBeenCalledTimes(3); // initial mount poll
expect(mockGet).toHaveBeenCalledTimes(3); // initial bootstrap
mockGet.mockClear();
-// Advance 10s — pre-fix this would fire another poll. Post-fix: silent.
// Advance 60s — well past any plausible cadence the prior version
// could have used.
await act(async () => {
-vi.advanceTimersByTime(10_000);
vi.advanceTimersByTime(60_000);
});
-expect(mockGet).toHaveBeenCalledTimes(3);
expect(mockGet).not.toHaveBeenCalled();
-// Advance to 30s — interval fires.
-await act(async () => {
-vi.advanceTimersByTime(20_000);
-});
-expect(mockGet).toHaveBeenCalledTimes(6); // +3 from second tick
});
});
describe("CommunicationOverlay — visibility gate", () => {
-// The visibility gate is the dial that drops collapsed-panel polling
-// to ZERO. The cadence test above can't catch its removal — if a
-// refactor dropped `if (!visible) return`, the cadence test would
-// still pass because the effect would still fire every 30s.
// The visibility gate now does two things post-#61:
// - while closed, the WS handler short-circuits (no setComms churn)
// - re-opening triggers a fresh bootstrap so the list reflects
//   anything that happened while the panel was collapsed
//
// Direct probe: render with comms-returning mock so the panel
// actually renders (close button only exists in the expanded panel,
// not the collapsed button-state). Click close, advance the clock,
// assert no further fetches.
-it("stops polling after the user collapses the panel", async () => {
it("stops fetching while collapsed and re-bootstraps on re-open", async () => {
-// Mock returns one a2a_send so comms.length > 0 → panel renders →
-// close button accessible.
mockGet.mockResolvedValue([
{
id: "act-1",
@@ -150,29 +167,202 @@ describe("CommunicationOverlay — visibility gate", () => {
const { getByLabelText } = await act(async () => {
return render(<CommunicationOverlay />);
});
-// Drain pending microtasks (resolves the await in fetchComms) so
-// setComms lands and the panel renders. Don't advance time — that
-// would fire the next interval tick and pollute the assertion.
// Drain pending microtasks (resolves the await in bootstrap) so
// setComms lands and the panel renders. Don't advance time — it's
// not load-bearing for the gate test, but matches the pattern used
// pre-#61 for stability.
await act(async () => {
await Promise.resolve();
await Promise.resolve();
await Promise.resolve();
});
-// Initial mount polled 3 workspaces.
-expect(mockGet).toHaveBeenCalledTimes(3);
expect(mockGet).toHaveBeenCalledTimes(3); // initial bootstrap
mockGet.mockClear();
-// Click the close button. Synchronous getByLabelText avoids
-// findBy's internal setTimeout (deadlocks under useFakeTimers).
// Click close. While closed, no fetches and no WS-driven updates.
const closeBtn = getByLabelText("Close communications panel");
await act(async () => {
fireEvent.click(closeBtn);
});
await act(async () => {
vi.advanceTimersByTime(60_000);
});
expect(mockGet).not.toHaveBeenCalled();
// Re-open via the collapsed button. Must trigger a fresh bootstrap.
const openBtn = getByLabelText("Show communications panel");
await act(async () => {
fireEvent.click(openBtn);
});
await act(async () => {
await Promise.resolve();
await Promise.resolve();
});
expect(mockGet).toHaveBeenCalledTimes(3); // re-bootstrap on re-open
});
});
describe("CommunicationOverlay — WS subscription (#61 stage 1 core)", () => {
// The load-bearing post-#61 behaviour. Every test in this block must
// verify (a) the WS push DID update the rendered comms list, and
// (b) NO additional HTTP call was fired — the whole point of the
// refactor is to remove the polling-driven HTTP traffic.
function emitActivityLogged(overrides: Partial<{
workspaceId: string;
payload: Record<string, unknown>;
}> = {}) {
emitSocketEvent({
event: "ACTIVITY_LOGGED",
workspace_id: overrides.workspaceId ?? "ws-1",
timestamp: new Date().toISOString(),
payload: {
id: `act-${Math.random().toString(36).slice(2)}`,
activity_type: "a2a_send",
source_id: "ws-1",
target_id: "ws-2",
summary: "live push",
status: "ok",
duration_ms: 42,
created_at: new Date().toISOString(),
...overrides.payload,
},
});
}
it("WS push for a comm activity_type extends the rendered list with NO additional HTTP call", async () => {
const { container } = await act(async () => {
return render(<CommunicationOverlay />);
});
expect(mockGet).toHaveBeenCalledTimes(3); // bootstrap
mockGet.mockClear();
await act(async () => {
emitActivityLogged({ payload: { summary: "hello" } });
});
await act(async () => {
await Promise.resolve();
});
// Two pins:
// 1. comms list reflects the live push (look for the summary text)
// 2. zero HTTP fetches fired during the WS path
expect(container.textContent).toContain("hello");
expect(mockGet).not.toHaveBeenCalled();
});
it("WS push for an offline workspace is ignored", async () => {
const { container } = await act(async () => {
return render(<CommunicationOverlay />);
});
mockGet.mockClear();
await act(async () => {
emitActivityLogged({
workspaceId: "ws-offline",
payload: { source_id: "ws-offline", summary: "should-not-render" },
});
});
await act(async () => {
await Promise.resolve();
});
expect(container.textContent).not.toContain("should-not-render");
expect(mockGet).not.toHaveBeenCalled();
});
it("WS push for a non-comm activity_type is ignored (e.g. delegation)", async () => {
const { container } = await act(async () => {
return render(<CommunicationOverlay />);
});
mockGet.mockClear();
await act(async () => {
emitActivityLogged({
payload: {
activity_type: "delegation",
summary: "should-not-render-delegation",
},
});
});
await act(async () => {
await Promise.resolve();
});
expect(container.textContent).not.toContain("should-not-render-delegation");
expect(mockGet).not.toHaveBeenCalled();
});
it("WS push while the panel is collapsed is ignored (no churn on hidden state)", async () => {
// Bootstrap with one comm so the panel renders → close button
// accessible. Then collapse, emit a WS push, re-open: the rendered
// list must come from the re-bootstrap, NOT from the WS-push that
// arrived during the closed state. Also: nothing visible while
// closed (the collapsed button shows only the count, not summaries).
mockGet.mockResolvedValue([
{
id: "act-bootstrap",
workspace_id: "ws-1",
activity_type: "a2a_send",
source_id: "ws-1",
target_id: "ws-2",
summary: "bootstrap-summary",
status: "ok",
duration_ms: 1,
created_at: new Date().toISOString(),
},
]);
const { getByLabelText, container } = await act(async () => {
return render(<CommunicationOverlay />);
});
await act(async () => {
await Promise.resolve();
await Promise.resolve();
});
// Collapse.
const closeBtn = getByLabelText("Close communications panel");
await act(async () => {
fireEvent.click(closeBtn);
});
-// Advance well past the 30s cadence — gate should suppress the tick.
// Bootstrap mock returns nothing on the re-open path so we can
// distinguish "WS push leaked through the gate" from "re-bootstrap
// refilled the list."
mockGet.mockReset();
mockGet.mockResolvedValue([]);
await act(async () => {
-vi.advanceTimersByTime(60_000);
emitActivityLogged({
payload: { summary: "leaked-while-closed" },
});
});
await act(async () => {
await Promise.resolve();
});
// Closed state: rendered DOM must not show any push-derived text.
expect(container.textContent).not.toContain("leaked-while-closed");
});
it("non-ACTIVITY_LOGGED events are ignored (e.g. WORKSPACE_OFFLINE)", async () => {
const { container } = await act(async () => {
return render(<CommunicationOverlay />);
});
mockGet.mockClear();
await act(async () => {
emitSocketEvent({
event: "WORKSPACE_OFFLINE",
workspace_id: "ws-1",
timestamp: new Date().toISOString(),
payload: { summary: "should-not-render-event" },
});
});
await act(async () => {
await Promise.resolve();
});
expect(container.textContent).not.toContain("should-not-render-event");
expect(mockGet).not.toHaveBeenCalled();
});
});

View File

@@ -0,0 +1,147 @@
# Rate-limit observability runbook
> Companion to issue #64 ("RATE_LIMIT default re-tune analysis"). After
> #60 deployed the per-tenant `keyFor` keying, the right RATE_LIMIT
> default became data-dependent. This runbook documents the metrics +
> queries an operator should run to confirm whether the current 600
> req/min/key default is correct, too tight, or too loose.
## What's already exposed
The workspace-server's existing Prometheus middleware
(`workspace-server/internal/metrics/metrics.go`) tracks every request
on every path:
```
molecule_http_requests_total{method, path, status} counter
molecule_http_request_duration_seconds_total{method,path,status} counter
```
Path is the matched route pattern (`/workspaces/:id/activity` etc), so
high-cardinality workspace UUIDs do not explode the label space.
The rate limiter middleware (#60, `workspace-server/internal/middleware/ratelimit.go`)
also stamps every response with `X-RateLimit-Limit`, `X-RateLimit-Remaining`,
and `X-RateLimit-Reset`. Operators with browser-side or proxy-side
header capture can read per-request bucket state directly.
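For a one-off spot check without any dashboard, the same headers can be read with any HTTP client. A minimal illustrative Go snippet — the host, workspace id, and token below are placeholders, not values from this repo:

```go
package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Placeholder host/id/token — substitute a real tenant and credential.
	req, err := http.NewRequest("GET",
		"https://tenant.example/workspaces/00000000-0000-0000-0000-000000000000/activity?limit=5", nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Authorization", "Bearer <token>")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The rate-limit middleware stamps these on every response.
	for _, h := range []string{"X-RateLimit-Limit", "X-RateLimit-Remaining", "X-RateLimit-Reset"} {
		fmt.Printf("%s: %s\n", h, resp.Header.Get(h))
	}
}
```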
No new instrumentation is needed for #64's acceptance criteria. The
metric surface is sufficient — this runbook just collects the queries.
## Queries to run after #60 deploys
### 1. Is the bucket actually firing 429s?
```promql
sum(rate(molecule_http_requests_total{status="429"}[5m]))
```
If this is zero on a given tenant, the bucket isn't being hit. If it's
sustained > 1/min, dig in.
### 2. Which routes attract 429s?
```promql
topk(
10,
sum by (path) (
rate(molecule_http_requests_total{status="429"}[5m])
)
)
```
Expected shape post-#60:
- `/workspaces/:id/activity` should be near zero — the canvas no longer
polls it on a 30s/60s/5s cadence (PRs #69 / #71 / #76).
- Probe / health / heartbeat paths should be ~0 (those routes have a
separate IP-fallback bucket).
If `/workspaces/:id/activity` 429s persist post-PRs-69/71/76 deploy, the
canvas isn't running the WS-subscriber path — investigate WS health
on that tenant.
### 3. Per-bucket-key inference (no direct exposure today)
The bucket map itself is in-memory only; we deliberately do **not**
expose `org:<uuid>` ↔ remaining-tokens because that map can include
SHA-256 hashes of bearer tokens. A tenant that wants per-key visibility
should rely on response headers (`X-RateLimit-Remaining` on every
response from a given session is the bucket's view of that session).
If you genuinely need server-side per-bucket counts for triage,
file a follow-up — the proper shape is a `/internal/ratelimit-stats`
endpoint that emits **counts per key prefix only** (e.g. `org:`, `tok:`,
`ip:`), never the key payloads. Don't roll that ad-hoc; it's a security
review surface.
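For orientation only, a rough sketch of the shape such a follow-up endpoint could take — hypothetical, not implemented in this PR, and still subject to the security review called out above; it reports counts per key prefix and never the keys themselves:

```go
// Hypothetical handler sketch — assumes it would live in the middleware
// package next to RateLimiter. Emits only per-namespace bucket counts.
func (rl *RateLimiter) statsHandler(c *gin.Context) {
	counts := map[string]int{"org": 0, "tok": 0, "ip": 0, "other": 0}

	rl.mu.Lock()
	for k := range rl.buckets {
		switch {
		case strings.HasPrefix(k, "org:"):
			counts["org"]++
		case strings.HasPrefix(k, "tok:"):
			counts["tok"]++
		case strings.HasPrefix(k, "ip:"):
			counts["ip"]++
		default:
			counts["other"]++
		}
	}
	rl.mu.Unlock()

	c.JSON(http.StatusOK, counts)
}
```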
## Decision tree for the re-tune
After 14 days of production traffic on a tenant, look at the queries
above and walk this tree:
```
Q1: Is the 429 rate sustained > 0.1/sec on any tenant?
├─ NO → The 600 default has comfortable headroom. Either keep it,
│ or lower it carefully (300) ONLY if you have a documented
│ reason (e.g. a misbehaving client we want to throttle harder).
│ Default to "no change" — see #64 for the math.
└─ YES → Q2.
Q2: Is the 429 rate concentrated on ONE tenant or spread across many?
├─ ONE tenant → Operator override: set RATE_LIMIT=1200 or 1800 on that
│ tenant's box. Document in the tenant's ops note. The
│ default does not need to change.
└─ MANY tenants → Q3.
Q3: Are the 429s on a route that polls (e.g. /activity / /peers)?
├─ YES → Confirm PRs #69, #71, #76 have actually deployed to those
│ tenants. If they have and 429s persist, the canvas may have
│ a regression — do not raise RATE_LIMIT. File a canvas issue.
└─ NO → 429s on mutating routes mean genuine load. Raise the default
to 1200 in `workspace-server/internal/router/router.go:54`.
Same PR should attach: the metric chart, the time window,
and a paragraph explaining what changed in our traffic shape.
```
## Alert rule template (drop-in for Prometheus)
```yaml
# Sustained 429s — this is the SLO trip-wire. If it fires, walk the
# decision tree above. NB: the issue #64 acceptance criterion is "two
# weeks of metrics"; this alert is the inverse — it tells you something
# changed before the two weeks are up.
groups:
- name: workspace-server-ratelimit
rules:
- alert: WorkspaceServerRateLimit429Sustained
expr: |
sum by (instance) (
rate(molecule_http_requests_total{status="429"}[10m])
) > 0.1
for: 30m
labels:
severity: warning
owner: workspace-server
annotations:
summary: "{{ $labels.instance }} sustained 429s — see ratelimit-observability runbook"
runbook: "https://git.moleculesai.app/molecule-ai/molecule-core/blob/main/docs/engineering/ratelimit-observability.md"
```
Threshold rationale: 0.1 req/s = 6/min sustained over 10min. Below
that, a 429 is almost certainly a transient burst that the canvas's
retry-once handler at `canvas/src/lib/api.ts:55` already absorbs. The
30m `for:` keeps the alert from chattering on a brief blip.
## Companion probe script
For one-off triage when an operator can reproduce the problem in their
own browser, `scripts/edge-429-probe.sh` (#62) reproduces a canvas-
sized burst against a tenant subdomain and dumps each 429's response
shape so the operator can distinguish workspace-server bucket overflow
from CF/Vercel edge rate-limiting without dashboard access.
```sh
./scripts/edge-429-probe.sh hongming.moleculesai.app --burst 80 --out /tmp/edge.txt
```
The script's report header explains how to read the output.

scripts/edge-429-probe.sh (new executable file, 155 lines)
View File

@@ -0,0 +1,155 @@
#!/usr/bin/env bash
# edge-429-probe.sh — capture 429 origin (workspace-server vs CF/Vercel edge)
# during a simulated canvas-burst against a tenant subdomain.
#
# Issue molecule-core#62. The post-#60 verification step asks an
# operator with CF/Vercel dashboard access to confirm whether the
# layout-chunk 429s observed in DevTools were:
# (a) workspace-server bucket overflow (closes once #60 deploys), or
# (b) actual edge-layer rate-limiting (CF or Vercel).
#
# This script doesn't need dashboard access. It reproduces the burst
# pattern locally and dumps every 429's response shape so the operator
# can distinguish (a) from (b) by inspection: workspace-server emits a
# JSON body, CF emits HTML, Vercel emits a different HTML. Headers tell
# the same story (cf-ray vs x-vercel-*).
#
# Usage:
# ./scripts/edge-429-probe.sh <tenant-host> [--burst N] [--waves N] [--pause SECS] [--out file]
#
# Example:
# ./scripts/edge-429-probe.sh hongming.moleculesai.app --burst 80 --out /tmp/edge.txt
#
# The script is read-only against the target — it only issues GETs to
# public-by-design endpoints. No mutating requests, no credential use.
set -euo pipefail
# ── Help / usage handling first, before positional capture ────────────────────
case "${1:-}" in
-h|--help|"")
sed -n '/^# edge-429-probe.sh/,/^$/p' "$0" | sed 's/^# \{0,1\}//'
exit 0
;;
esac
HOST="$1"; shift
BURST=80
WAVES=3
WAVE_PAUSE=2
OUT=""
while [ "${1:-}" != "" ]; do
case "$1" in
--burst) BURST="$2"; shift 2 ;;
--waves) WAVES="$2"; shift 2 ;;
--pause) WAVE_PAUSE="$2"; shift 2 ;;
--out) OUT="$2"; shift 2 ;;
-h|--help)
sed -n '/^# edge-429-probe.sh/,/^$/p' "$0" | sed 's/^# \{0,1\}//'
exit 0
;;
*) echo "unknown arg: $1" >&2; exit 2 ;;
esac
done
# ── Endpoint discovery ────────────────────────────────────────────────────────
echo "→ Discovering a layout-chunk URL from canvas root..." >&2
ROOT_BODY=$(curl -fsSL --max-time 10 "https://${HOST}/" 2>/dev/null || true)
LAYOUT_PATH=$(echo "$ROOT_BODY" \
| grep -oE '/_next/static/chunks/layout-[A-Za-z0-9_-]+\.js' \
| head -1 || true)
if [ -z "$LAYOUT_PATH" ]; then
LAYOUT_PATH="/_next/static/chunks/layout-probe-not-found.js"
echo " (no layout chunk discovered — using sentinel path; 404 on this is expected)" >&2
else
echo " layout chunk: $LAYOUT_PATH" >&2
fi
# Probe URL: a generic activity endpoint. The rate-limiter middleware
# runs BEFORE workspace-id validation, so unauth/invalid-id requests
# still hit the bucket.
ACTIVITY_PATH="/workspaces/00000000-0000-0000-0000-000000000000/activity?probe=edge-429"
# ── Fire one burst of parallel curls, one status record per request ──────────
# Each request is a backgrounded curl inside run_burst (plain shell jobs plus
# `wait`, so none of the `export -f`-across-xargs portability pitfalls apply),
# appending a single-line parseable record to a temp results file. Failed
# curls emit a curl_err record so request volume is preserved.
TMP_RESULTS="$(mktemp -t edge-429-probe.XXXXXX)"
trap 'rm -f "$TMP_RESULTS"' EXIT
run_burst() {
# $1 = path; $2 = label; $3 = wave_id
local path="$1" label="$2" wave="$3"
local i
for i in $(seq 1 "$BURST"); do
{
out=$(curl -sS --max-time 10 -o /dev/null \
-w 'status=%{http_code} size=%{size_download} time=%{time_total} server=%{header.server} cf_ray=%{header.cf-ray} x_vercel=%{header.x-vercel-id} retry_after=%{header.retry-after} content_type=%{header.content-type} x_ratelimit_limit=%{header.x-ratelimit-limit} x_ratelimit_remaining=%{header.x-ratelimit-remaining} x_ratelimit_reset=%{header.x-ratelimit-reset}\n' \
"https://${HOST}${path}" 2>/dev/null) || out="status=curl_err"
printf 'label=%s-%s-%s %s\n' "$label" "$wave" "$i" "$out" >> "$TMP_RESULTS"
} &
done
wait
}
emit() {
if [ -n "$OUT" ]; then
printf '%s\n' "$*" >> "$OUT"
else
printf '%s\n' "$*"
fi
}
if [ -n "$OUT" ]; then : > "$OUT"; fi
emit "# edge-429-probe report"
emit "# host=$HOST burst=$BURST waves=$WAVES pause=${WAVE_PAUSE}s"
emit "# layout_path=$LAYOUT_PATH"
emit "# activity_path=$ACTIVITY_PATH"
emit "# generated=$(date -u +%Y-%m-%dT%H:%M:%SZ)"
emit ""
for wave in $(seq 1 "$WAVES"); do
emit "## wave $wave"
: > "$TMP_RESULTS"
run_burst "$LAYOUT_PATH" "layout" "$wave"
run_burst "$ACTIVITY_PATH" "activity" "$wave"
while read -r line; do
emit " $line"
done < "$TMP_RESULTS"
if [ "$wave" -lt "$WAVES" ]; then
sleep "$WAVE_PAUSE"
fi
done
emit ""
emit "## summary — how to read the report"
emit "# status=429 + content_type starts with application/json + x_ratelimit_limit set"
emit "# => workspace-server bucket overflow. Closes when #60 deploys."
emit "# status=429 + cf_ray set + content_type=text/html"
emit "# => Cloudflare WAF / rate-limit. Audit dashboard rules per #62."
emit "# status=429 + x_vercel set + content_type=text/html"
emit "# => Vercel edge / Bot Fight Mode. Audit Vercel project per #62."
emit "# status=429 with no server/cf_ray/x_vercel"
emit "# => corporate proxy or VPN. Not actionable in this repo."
if [ -n "$OUT" ]; then
echo "→ Report written to $OUT" >&2
# Match only data lines (begin with two-space indent + "label="),
# not the summary's reference text which also mentions "status=429".
# grep -c outputs "0" + exits 1 when zero matches; `|| true` masks
# the exit status so set -e doesn't trip without losing the count.
total=$(grep -c '^ label=' "$OUT" 2>/dev/null || true)
total429=$(grep -c '^ label=.*status=429' "$OUT" 2>/dev/null || true)
total=${total:-0}
total429=${total429:-0}
echo "→ Totals: ${total429} of ${total} requests returned 429" >&2
if [ "${total429}" -gt 0 ]; then
echo "→ Per-label 429 counts:" >&2
grep '^ label=.*status=429' "$OUT" \
| sed -E 's/^ label=([^-]+).*/ \1/' \
| sort | uniq -c >&2
fi
fi

View File

@@ -5,17 +5,19 @@ import (
"context"
"net/http"
"strconv"
"strings"
"sync"
"time"
"github.com/gin-gonic/gin"
)
-// RateLimiter implements a simple token bucket rate limiter per IP.
// RateLimiter implements a token bucket rate limiter keyed by tenant
// identity (org id, then bearer token, then client IP — see keyFor).
type RateLimiter struct {
mu sync.Mutex
buckets map[string]*bucket
rate int // tokens per interval
interval time.Duration
}
@@ -42,9 +44,9 @@ func NewRateLimiter(rate int, interval time.Duration, ctx context.Context) *Rate
case <-ticker.C:
rl.mu.Lock()
cutoff := time.Now().Add(-10 * time.Minute)
-for ip, b := range rl.buckets {
for k, b := range rl.buckets {
if b.lastReset.Before(cutoff) {
-delete(rl.buckets, ip)
delete(rl.buckets, k)
}
}
rl.mu.Unlock()
@@ -54,29 +56,73 @@ func NewRateLimiter(rate int, interval time.Duration, ctx context.Context) *Rate
return rl
}
-// Middleware returns a Gin middleware that rate limits by client IP.
// keyFor returns the bucket identifier for this request. Priority:
//
// 1. X-Molecule-Org-Id header — when present (CP-routed SaaS traffic),
// isolates tenants from each other regardless of the upstream proxy IP
// they all share.
// 2. SHA-256 of Authorization Bearer token — when present (per-workspace
// bearer, ADMIN_TOKEN, org-scoped API token). On a per-tenant Caddy
// box where the org-id header isn't attached, this still distinguishes
// distinct user sessions on the same egress IP.
// 3. ClientIP() — anonymous probes, /health scrapes, registry boot
// signals (when SetTrustedProxies(nil) is in effect, this is the
// direct TCP RemoteAddr — fine for the probe surface, not fine as a
// primary key behind a proxy, hence the priority order above).
//
// Mixing these namespaces is fine because they never collide: org ids
// are UUIDs ("org:..."), token hashes are 64-char hex ("tok:..."), IPs
// contain dots/colons ("ip:...").
//
// Security note on X-Molecule-Org-Id spoofing: the rate limiter runs
// BEFORE TenantGuard, so the org-id value here is unvalidated. A caller
// reaching workspace-server directly could spoof the header to drain
// another org's bucket. In production this surface is closed by the
// CP/Caddy front: tenant SGs reject :8080 from the public internet, and
// CP rewrites the header to the verified org. If a future deployment
// exposes :8080 directly, validate the org-id (e.g. against
// MOLECULE_ORG_ID) before keying on it, or move this middleware after
// TenantGuard. The token-hash and IP fallbacks are unspoofable.
//
// Issue #59 — replaces the previous IP-only keying that silently
// collapsed all canvas traffic into one bucket once #179 disabled
// proxy-header trust. See the issue for the deployment-shape analysis.
func (rl *RateLimiter) keyFor(c *gin.Context) string {
if orgID := strings.TrimSpace(c.GetHeader("X-Molecule-Org-Id")); orgID != "" {
return "org:" + orgID
}
if tok := bearerFromHeader(c.GetHeader("Authorization")); tok != "" {
return "tok:" + tokenKey(tok)
}
return "ip:" + c.ClientIP()
}
// Middleware returns a Gin middleware that rate limits per caller. The
// caller-key derivation lives in keyFor — see that function's doc for
// the priority list and rationale.
func (rl *RateLimiter) Middleware() gin.HandlerFunc {
return func(c *gin.Context) {
// Tier-1b dev-mode hatch — same gate as AdminAuth / WorkspaceAuth /
// discovery. On a local single-user Docker setup the 600-req/min
// bucket fills fast: a 15-workspace canvas + activity polling +
-// approvals polling + A2A overlay + initial hydration all share
-// one IP bucket, so a minute of active use can trip 429 and blank
-// the page. Gated by MOLECULE_ENV=development + empty ADMIN_TOKEN
-// so SaaS production keeps the bucket.
// approvals polling + A2A overlay + initial hydration all land in
// one bucket (whichever keyFor returns — typically the dev user's
// IP or shared admin token), so a minute of active use can trip
// 429 and blank the page. Gated by MOLECULE_ENV=development +
// empty ADMIN_TOKEN so SaaS production keeps the bucket.
if isDevModeFailOpen() {
c.Header("X-RateLimit-Limit", "unlimited")
c.Next()
return
}
-ip := c.ClientIP()
key := rl.keyFor(c)
rl.mu.Lock()
-b, exists := rl.buckets[ip]
b, exists := rl.buckets[key]
if !exists {
b = &bucket{tokens: rl.rate, lastReset: time.Now()}
-rl.buckets[ip] = b
rl.buckets[key] = b
}
// Reset tokens if interval has passed

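The two helpers keyFor leans on — bearerFromHeader and tokenKey — are not part of this hunk. Judging by the keying the tests below pin (SHA-256 hex of the raw bearer token), their shape is presumably close to this sketch (assumed, not the verbatim source):

```go
// Assumed shapes — illustrative only; the real definitions live elsewhere
// in this package. Requires "crypto/sha256", "fmt", and "strings".

// bearerFromHeader extracts the raw token from an "Authorization: Bearer <tok>"
// header, returning "" when the header is absent or uses another scheme.
func bearerFromHeader(h string) string {
	const prefix = "Bearer "
	if len(h) > len(prefix) && strings.EqualFold(h[:len(prefix)], prefix) {
		return strings.TrimSpace(h[len(prefix):])
	}
	return ""
}

// tokenKey hashes the raw token so the bucket map never stores credentials.
func tokenKey(tok string) string {
	return fmt.Sprintf("%x", sha256.Sum256([]byte(tok)))
}
```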
View File

@@ -0,0 +1,303 @@
package middleware
import (
"context"
"crypto/sha256"
"fmt"
"go/ast"
"go/parser"
"go/token"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/gin-gonic/gin"
)
// newTestLimiterForKeyFor — same shape as newTestLimiter in ratelimit_test.go
// but exposes the *gin.Engine and lets the caller inject headers per-request.
func newTestLimiterForKeyFor(t *testing.T, rate int) *gin.Engine {
t.Helper()
gin.SetMode(gin.TestMode)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
rl := NewRateLimiter(rate, 5*time.Second, ctx)
r := gin.New()
if err := r.SetTrustedProxies(nil); err != nil {
t.Fatalf("SetTrustedProxies: %v", err)
}
r.Use(rl.Middleware())
r.GET("/x", func(c *gin.Context) { c.String(http.StatusOK, "ok") })
return r
}
// TestKeyFor_OrgIdHeaderTrumpsBearerAndIP — when X-Molecule-Org-Id is set
// the bucket is keyed on it regardless of bearer token or IP. This is the
// load-bearing case for the production SaaS plane: every tenant routed
// through the same upstream proxy IP gets its own bucket because the
// CP attaches the org-id header.
func TestKeyFor_OrgIdHeaderTrumpsBearerAndIP(t *testing.T) {
gin.SetMode(gin.TestMode)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
rl := NewRateLimiter(2, 5*time.Second, ctx)
c, _ := gin.CreateTestContext(httptest.NewRecorder())
c.Request = httptest.NewRequest(http.MethodGet, "/x", nil)
c.Request.RemoteAddr = "10.0.0.1:1234"
c.Request.Header.Set("X-Molecule-Org-Id", "org-aaa")
c.Request.Header.Set("Authorization", "Bearer ignored-token-value")
got := rl.keyFor(c)
if got != "org:org-aaa" {
t.Errorf("keyFor with org-id header: got %q, want %q", got, "org:org-aaa")
}
}
// TestKeyFor_BearerTokenWhenNoOrgId — the per-tenant Caddy box path:
// no org-id header (canvas same-origin), but Authorization Bearer is
// always set by WorkspaceAuth-protected routes. Bucket keyed on the
// SHA-256 hex of the token so distinct sessions on the same egress IP
// get distinct buckets — and so the in-memory map can never become a
// token dump if the process is inspected.
func TestKeyFor_BearerTokenWhenNoOrgId(t *testing.T) {
gin.SetMode(gin.TestMode)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
rl := NewRateLimiter(2, 5*time.Second, ctx)
c, _ := gin.CreateTestContext(httptest.NewRecorder())
c.Request = httptest.NewRequest(http.MethodGet, "/x", nil)
c.Request.RemoteAddr = "10.0.0.1:1234"
c.Request.Header.Set("Authorization", "Bearer secret-token-abc")
got := rl.keyFor(c)
expectedHash := fmt.Sprintf("%x", sha256.Sum256([]byte("secret-token-abc")))
if got != "tok:"+expectedHash {
t.Errorf("keyFor with bearer-only: got %q, want %q", got, "tok:"+expectedHash)
}
// Critical security pin: raw token must never appear in the key.
if strings.Contains(got, "secret-token-abc") {
t.Errorf("keyFor leaked raw bearer token in bucket key: %q", got)
}
}
// TestKeyFor_IPFallbackWhenNoOrgIdNoBearer — anonymous probes (no auth,
// no tenant header) fall through to ClientIP keying. This is the only
// path that depended on the pre-#179 trust-XFF behaviour and is fine
// to keep IP-keyed because the surface is just /health, /buildinfo,
// and the registry-boot endpoints.
func TestKeyFor_IPFallbackWhenNoOrgIdNoBearer(t *testing.T) {
gin.SetMode(gin.TestMode)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
rl := NewRateLimiter(2, 5*time.Second, ctx)
c, _ := gin.CreateTestContext(httptest.NewRecorder())
c.Request = httptest.NewRequest(http.MethodGet, "/x", nil)
c.Request.RemoteAddr = "203.0.113.1:1234"
got := rl.keyFor(c)
// gin.ClientIP() strips the port — we just need to confirm the prefix
// and that the IP appears.
if !strings.HasPrefix(got, "ip:") {
t.Errorf("keyFor without auth/org headers: got %q, want prefix %q", got, "ip:")
}
if !strings.Contains(got, "203.0.113.1") {
t.Errorf("keyFor IP fallback: got %q, want to contain %q", got, "203.0.113.1")
}
}
// TestRateLimit_TwoOrgsSameIP_IndependentBuckets — the load-bearing
// regression test for issue #59. Two tenants behind the same upstream
// proxy must NOT share a bucket; the production SaaS-plane outage was
// every tenant collapsing to the proxy IP and saturating one bucket.
//
// Mutation invariant: removing the org-id branch from keyFor — say,
// returning "ip:" + c.ClientIP() unconditionally — collapses both
// tenants back into one bucket and this test fails on the 3rd
// request because it would 429 instead of 200.
func TestRateLimit_TwoOrgsSameIP_IndependentBuckets(t *testing.T) {
r := newTestLimiterForKeyFor(t, 2)
exhaust := func(orgID string) {
t.Helper()
for i := 0; i < 2; i++ {
req := httptest.NewRequest(http.MethodGet, "/x", nil)
req.RemoteAddr = "10.0.0.1:1234" // SAME upstream proxy IP
req.Header.Set("X-Molecule-Org-Id", orgID)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Fatalf("setup orgID=%s req %d: want 200, got %d", orgID, i+1, w.Code)
}
}
}
exhaust("org-aaa")
// org-aaa is now at 0 tokens. org-bbb's bucket must be FRESH.
req := httptest.NewRequest(http.MethodGet, "/x", nil)
req.RemoteAddr = "10.0.0.1:1234"
req.Header.Set("X-Molecule-Org-Id", "org-bbb")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Fatalf("org-bbb on same IP must have its own bucket: got %d, want 200 (issue #59 regression)", w.Code)
}
// Confirm org-aaa is still throttled — proves we're not just opening
// the gate to everyone.
req = httptest.NewRequest(http.MethodGet, "/x", nil)
req.RemoteAddr = "10.0.0.1:1234"
req.Header.Set("X-Molecule-Org-Id", "org-aaa")
w = httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusTooManyRequests {
t.Errorf("org-aaa exhausted bucket: want 429, got %d", w.Code)
}
}
// TestRateLimit_TwoTokensSameIP_IndependentBuckets — analog of the
// org-id case for the per-tenant Caddy box: two distinct user
// sessions on the same egress IP, distinguished only by their bearer
// tokens, must get independent buckets. This was the path Hongming
// hit on hongming.moleculesai.app — a single user with multiple
// browser tabs against one workspace-server box.
func TestRateLimit_TwoTokensSameIP_IndependentBuckets(t *testing.T) {
r := newTestLimiterForKeyFor(t, 2)
exhaust := func(token string) {
t.Helper()
for i := 0; i < 2; i++ {
req := httptest.NewRequest(http.MethodGet, "/x", nil)
req.RemoteAddr = "127.0.0.1:1234" // local Caddy proxy — same for both
req.Header.Set("Authorization", "Bearer "+token)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Fatalf("setup token=%s req %d: want 200, got %d", token, i+1, w.Code)
}
}
}
exhaust("user-a-token")
req := httptest.NewRequest(http.MethodGet, "/x", nil)
req.RemoteAddr = "127.0.0.1:1234"
req.Header.Set("Authorization", "Bearer user-b-token")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Fatalf("user-b token on same proxy IP must have its own bucket: got %d, want 200", w.Code)
}
}
// TestRateLimit_SameOrgDifferentTokens_SharedBucket — counter-pin:
// ensure org-id keying really does collapse all tokens within one
// org into one bucket. This is the desired behaviour: a tenant that
// mints multiple tokens shouldn't be able to circumvent its quota
// by rotating tokens between requests. (The same-IP-different-org
// test above proves we don't collapse ACROSS orgs; this one proves
// we DO collapse WITHIN one org.)
func TestRateLimit_SameOrgDifferentTokens_SharedBucket(t *testing.T) {
r := newTestLimiterForKeyFor(t, 2)
for _, tok := range []string{"token-1", "token-2"} {
req := httptest.NewRequest(http.MethodGet, "/x", nil)
req.RemoteAddr = "10.0.0.1:1234"
req.Header.Set("X-Molecule-Org-Id", "org-shared")
req.Header.Set("Authorization", "Bearer "+tok)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Fatalf("setup tok=%s: want 200, got %d", tok, w.Code)
}
}
// Bucket should be exhausted now — third request, even with a fresh
// token, must 429 because the org-id is keying it.
req := httptest.NewRequest(http.MethodGet, "/x", nil)
req.RemoteAddr = "10.0.0.1:1234"
req.Header.Set("X-Molecule-Org-Id", "org-shared")
req.Header.Set("Authorization", "Bearer token-3")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusTooManyRequests {
t.Errorf("rotating tokens within one org should NOT bypass the quota: got %d, want 429", w.Code)
}
}
// TestRateLimit_Middleware_RoutesThroughKeyFor is the AST gate (mirror
// of #36/#10/#12's gates). Pins the SSOT routing invariant:
// (*RateLimiter).Middleware MUST call rl.keyFor and MUST NOT carry a
// direct c.ClientIP() call (= the parallel-impl drift this PR fixes).
//
// Mutation invariant: a future PR that re-introduces direct IP keying
// in Middleware (`ip := c.ClientIP()`) makes this test fail. That's
// the signal to either (a) extend keyFor's contract to cover the new
// case OR (b) update this gate with an explicit reason. Either way the
// drift gets a reviewer's attention before shipping.
func TestRateLimit_Middleware_RoutesThroughKeyFor(t *testing.T) {
fset := token.NewFileSet()
file, err := parser.ParseFile(fset, "ratelimit.go", nil, parser.ParseComments)
if err != nil {
t.Fatalf("parse ratelimit.go: %v", err)
}
var fn *ast.FuncDecl
ast.Inspect(file, func(n ast.Node) bool {
f, ok := n.(*ast.FuncDecl)
if !ok {
return true
}
// Match `func (rl *RateLimiter) Middleware() ...`
if f.Name.Name != "Middleware" {
return true
}
if f.Recv == nil || len(f.Recv.List) != 1 {
return true
}
star, ok := f.Recv.List[0].Type.(*ast.StarExpr)
if !ok {
return true
}
if id, ok := star.X.(*ast.Ident); !ok || id.Name != "RateLimiter" {
return true
}
fn = f
return false
})
if fn == nil {
t.Fatal("(*RateLimiter).Middleware not found — was it renamed? update this gate or the SSOT routing assumption")
}
var (
callsKeyFor bool
callsClientIP bool
)
ast.Inspect(fn.Body, func(n ast.Node) bool {
call, ok := n.(*ast.CallExpr)
if !ok {
return true
}
sel, ok := call.Fun.(*ast.SelectorExpr)
if !ok {
return true
}
switch sel.Sel.Name {
case "keyFor":
callsKeyFor = true
case "ClientIP":
callsClientIP = true
}
return true
})
if !callsKeyFor {
t.Error("(*RateLimiter).Middleware must call rl.keyFor for SSOT bucket-key derivation — see issue #59. Found no keyFor call.")
}
if callsClientIP {
t.Error("(*RateLimiter).Middleware carries a direct c.ClientIP() call. This is the parallel-impl drift issue #59 fixed. " +
"Either route through rl.keyFor OR — if a new use case truly needs direct IP — extend keyFor's contract first and update this gate to allow the specific delta.")
}
}
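// Suggested local invocation for just these pins (not part of this change;
// run from the middleware package directory):
//
//	go test -run 'TestKeyFor|TestRateLimit' .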