From 830de70e84dfb240c16820e65b1adfe161678edc Mon Sep 17 00:00:00 2001 From: security-auditor Date: Thu, 7 May 2026 15:11:02 -0700 Subject: [PATCH] =?UTF-8?q?feat(canvas):=20CommunicationOverlay=20subscrib?= =?UTF-8?q?es=20to=20ACTIVITY=5FLOGGED=20=E2=80=94=20drop=2030s=20polling?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stage 1 of #61. Replaces the 30s setInterval poll with: 1. One bootstrap fan-out on mount (cap of 3 retained from the 2026-05-04 fix), gives the initial recent-comms window without waiting for live events. 2. useSocketEvent subscription to ACTIVITY_LOGGED — every event with a comm-overlay-relevant activity_type from a visible online workspace prepends to the rendered list. 3. Re-bootstrap on visibility-toggle re-open so the snapshot is fresh after a long collapsed period. No interval poll. Inherits the singleton ReconnectingSocket's reconnect / backoff / health-check guarantees via useSocketEvent. Steady-state HTTP traffic from this overlay drops from ~6 req/min (3 ws × 2 cycles/min) to 0 outside of mount/visibility-toggle bootstraps. Live updates arrive within ~10ms of the server insert instead of after up to 30s. Test changes: - Bootstrap fan-out cap of 3 — kept (was the cadence test's role pre-#61) - 30s cadence test — replaced with "no interval polling" test that pins the absence of any cadence-driven HTTP after bootstrap - Visibility gate test — extended to verify both: no fetches while closed, AND re-bootstrap on re-open - WS subscription tests (new): - WS push extends rendered list with NO HTTP call - WS push for offline workspace ignored - WS push for non-comm activity_type ignored - WS push while collapsed ignored - non-ACTIVITY_LOGGED events ignored Mutation-tested: - drop visibility gate → visibility test fails - drop activity_type filter → "non-comm activity_type" test fails - drop workspace online-set filter → "offline workspace" test fails Full canvas suite: 1393 passing, 0 failing. 
tsc clean. No API or schema change. ACTIVITY_LOGGED event shape pinned by existing socket-events tests. Hostile self-review (three weakest spots): 1. Sustained WS outage shows stale comms until visibility-toggle re-bootstrap. Acceptable: the singleton socket already auto-reconnects and the comm overlay isn't a critical-path surface. 2. Bootstrap on visibility-toggle costs another 3 HTTP calls each re-open. Acceptable: visibility-toggle is a deliberate user action, not a tight loop. 3. The WS handler reads the latest `nodes` via nodesRef rather than re-subscribing on node changes. By design — the bus listener stays bound for the component lifetime to avoid the "tear-down storm" pattern A2ATopologyOverlay's comment warns about (ref-based current-state lookup, stable subscription). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/components/CommunicationOverlay.tsx | 150 ++++++++-- .../__tests__/CommunicationOverlay.test.tsx | 278 +++++++++++++++--- 2 files changed, 354 insertions(+), 74 deletions(-) diff --git a/canvas/src/components/CommunicationOverlay.tsx b/canvas/src/components/CommunicationOverlay.tsx index 10d105db..dfe0625e 100644 --- a/canvas/src/components/CommunicationOverlay.tsx +++ b/canvas/src/components/CommunicationOverlay.tsx @@ -3,6 +3,7 @@ import { useState, useEffect, useCallback, useRef } from "react"; import { useCanvasStore } from "@/store/canvas"; import { api } from "@/lib/api"; +import { useSocketEvent } from "@/hooks/useSocketEvent"; import { COMM_TYPE_LABELS } from "@/lib/design-tokens"; interface Communication { @@ -18,32 +19,71 @@ interface Communication { durationMs: number | null; } +/** Workspace-server `ACTIVITY_LOGGED` payload shape. Pulled out so the + * WS handler below has a typed view of the same fields the HTTP + * bootstrap consumes — drift between the two paths is a class of bug + * AgentCommsPanel hit historically. 
*/ +interface ActivityLoggedPayload { + id?: string; + activity_type?: string; + source_id?: string | null; + target_id?: string | null; + workspace_id?: string; + summary?: string | null; + status?: string; + duration_ms?: number | null; + created_at?: string; +} + +/** Fan-out cap for the bootstrap HTTP fetch on mount / on visibility + * re-open. Kept at 3 (carried over from the 2026-05-04 fix) so a + * freshly-mounted overlay on a 15-workspace tenant only spends 3 + * round-trips bootstrapping. Live updates after that arrive via the + * WS subscription below — no polling, no fan-out to maintain. */ +const BOOTSTRAP_FAN_OUT_CAP = 3; + +/** Cap on the rendered list. Bootstrap + every WS push prepends, the + * list is sliced to this size after each update. Mirrors the prior + * polling-loop behaviour. */ +const COMMS_RENDER_CAP = 20; + /** * Overlay showing recent A2A communications between workspaces. - * Renders as a floating log panel that auto-updates. + * + * Update shape (issue #61 Stage 1, replaces the 30s polling loop): + * - On mount (when visible): one HTTP bootstrap per online workspace, + * capped at BOOTSTRAP_FAN_OUT_CAP. Yields the initial recent-comms + * window without waiting for live events. + * - Steady state: subscribes to ACTIVITY_LOGGED via useSocketEvent. + * Each event with a matching activity_type from a visible online + * workspace gets synthesised into a Communication and prepended. + * - Visibility re-open: re-bootstraps so the user sees the freshest + * window even if WS was idle while collapsed. + * + * No interval poll. The singleton ReconnectingSocket in `store/socket.ts` + * already owns reconnect/backoff/health-check, and `useSocketEvent` + * inherits those guarantees. If WS is genuinely unhealthy, the overlay + * shows the bootstrap snapshot until the next visibility re-open or + * the next WS reconnect (which fires its own rehydrate burst). 
*/ export function CommunicationOverlay() { const [comms, setComms] = useState([]); const [visible, setVisible] = useState(true); const selectedNodeId = useCanvasStore((s) => s.selectedNodeId); const nodes = useCanvasStore((s) => s.nodes); + // nodesRef gives the WS handler current node-name resolution without + // re-subscribing on every node-list change. The bus listener is + // registered exactly once per mount; subscriber-side filtering reads + // the latest value via this ref. const nodesRef = useRef(nodes); nodesRef.current = nodes; - const fetchComms = useCallback(async () => { + const bootstrapComms = useCallback(async () => { try { - // Fan-out cap: each polled workspace = 1 round-trip. The platform - // rate limits at 600 req/min/IP; combined with heartbeats + other - // canvas polling, every workspace polled here costs ~6 req/min - // (1 every 30s × 1 per workspace). Capping at 3 keeps this - // overlay's footprint at 18 req/min worst case — well under - // budget even with 8+ workspaces visible. Caught 2026-05-04 when - // a user with 8+ workspaces (Design Director + 6 sub-agents + - // 3 standalones) saw sustained 429s in canvas console. 
const onlineNodes = nodesRef.current.filter((n) => n.data.status === "online"); const allComms: Communication[] = []; - for (const node of onlineNodes.slice(0, 3)) { + for (const node of onlineNodes.slice(0, BOOTSTRAP_FAN_OUT_CAP)) { try { const activities = await api.get n.id === (a.source_id || a.workspace_id)); - const targetNode = nodes.find((n) => n.id === (a.target_id || "")); + const sourceNode = nodesRef.current.find((n) => n.id === (a.source_id || a.workspace_id)); + const targetNode = nodesRef.current.find((n) => n.id === (a.target_id || "")); allComms.push({ id: a.id, sourceId: a.source_id || a.workspace_id, @@ -76,11 +116,12 @@ export function CommunicationOverlay() { } } } catch { - // Skip workspaces that fail + // Per-workspace failures must not blank the panel — the same + // robustness the polling version had. } } - // Sort by timestamp, newest first, dedupe + // Newest-first with id-dedup, capped at COMMS_RENDER_CAP. const seen = new Set(); const sorted = allComms .sort((a, b) => b.timestamp.localeCompare(a.timestamp)) @@ -89,29 +130,78 @@ export function CommunicationOverlay() { seen.add(c.id); return true; }) - .slice(0, 20); + .slice(0, COMMS_RENDER_CAP); setComms(sorted); } catch { - // Silently handle API errors + // Bootstrap failure is non-blocking — the WS subscription below + // will populate the panel as live events arrive. } }, []); + // Bootstrap once on mount + every time the user re-opens after a + // collapse. Closed-panel state intentionally drops live updates so + // the panel doesn't churn invisible state — the next open reloads. useEffect(() => { - // Gate polling on visibility — when the user collapses the overlay - // the data isn't being read, so the per-workspace fan-out becomes - // pure rate-limit overhead. Pre-fix this overlay polled regardless - // of whether the panel was shown, costing ~36 req/min from a - // hidden surface. if (!visible) return; - fetchComms(); - // 30s cadence (was 10s). 
At 3-workspace fan-out that's 6 req/min - // worst case from this overlay. Combined with heartbeats (~30/min) - // and other canvas polling, leaves ample headroom under the 600/ - // min/IP server-side rate limit even at 8+ workspace tenants. - const interval = setInterval(fetchComms, 30000); - return () => clearInterval(interval); - }, [fetchComms, visible]); + bootstrapComms(); + }, [bootstrapComms, visible]); + + // Live-update path. Filters server-side ACTIVITY_LOGGED events down + // to the comm-overlay-relevant subset and prepends each into the + // rendered list with the same dedup the bootstrap path uses. + // + // Scope guard: ignore events for workspaces not in the visible online + // set, so a user collapsing one workspace doesn't see its comms + // continue to scroll in. Same shape the bootstrap path applies. + useSocketEvent((msg) => { + if (!visible) return; + if (msg.event !== "ACTIVITY_LOGGED") return; + + const p = (msg.payload || {}) as ActivityLoggedPayload; + const type = p.activity_type; + if (type !== "a2a_send" && type !== "a2a_receive" && type !== "task_update") return; + + const wsId = msg.workspace_id; + const onlineSet = new Set( + nodesRef.current.filter((n) => n.data.status === "online").map((n) => n.id), + ); + if (!onlineSet.has(wsId)) return; + + const sourceId = p.source_id || wsId; + const targetId = p.target_id || ""; + const sourceNode = nodesRef.current.find((n) => n.id === sourceId); + const targetNode = nodesRef.current.find((n) => n.id === targetId); + + const incoming: Communication = { + id: p.id || `${msg.timestamp || Date.now()}:${sourceId}:${targetId}`, + sourceId, + targetId, + sourceName: sourceNode?.data.name || "Unknown", + targetName: targetNode?.data.name || "Unknown", + type: type as Communication["type"], + summary: p.summary || "", + status: p.status || "ok", + timestamp: p.created_at || msg.timestamp || new Date().toISOString(), + durationMs: p.duration_ms ?? 
null, + }; + + setComms((prev) => { + // Prepend, dedup by id, re-cap. Functional setState is necessary + // because two ACTIVITY_LOGGED events arriving in the same React + // batch would otherwise read a stale `comms` from the closure. + const seen = new Set(); + const merged = [incoming, ...prev] + .sort((a, b) => b.timestamp.localeCompare(a.timestamp)) + .filter((c) => { + if (seen.has(c.id)) return false; + seen.add(c.id); + return true; + }) + .slice(0, COMMS_RENDER_CAP); + return merged; + }); + }); if (!visible || comms.length === 0) { return ( diff --git a/canvas/src/components/__tests__/CommunicationOverlay.test.tsx b/canvas/src/components/__tests__/CommunicationOverlay.test.tsx index 3bed0076..c15249a2 100644 --- a/canvas/src/components/__tests__/CommunicationOverlay.test.tsx +++ b/canvas/src/components/__tests__/CommunicationOverlay.test.tsx @@ -1,18 +1,28 @@ // @vitest-environment jsdom /** - * CommunicationOverlay tests — pin the rate-limit fix shipped 2026-05-04. + * CommunicationOverlay tests — pin both the 2026-05-04 fan-out cap fix + * AND the 2026-05-07 polling → ACTIVITY_LOGGED-subscriber refactor + * (issue #61 stage 1). * - * The overlay polls /workspaces/:id/activity?limit=5 for each online - * workspace. Pre-fix it (a) polled regardless of visibility and (b) - * fanned out to 6 workspaces every 10s. With 8+ workspaces a user - * triggered sustained 429s (server-side rate limit is 600 req/min/IP). + * The overlay used to poll /workspaces/:id/activity?limit=5 on a 30s + * interval per online workspace (capped at 3). Post-#61: it bootstraps + * once on mount via the same HTTP path (cap of 3 retained), then + * subscribes to ACTIVITY_LOGGED via the global socket bus for live + * updates. No interval poll. * * These tests pin: - * 1. Fan-out cap of 3 — even with 6 online nodes, only 3 fetches - * 2. Visibility gate — when collapsed, no polling + * 1. Bootstrap fan-out cap of 3 — even with 6 online nodes, only 3 + * HTTP fetches on mount. + * 2. 
Visibility gate — when collapsed, no HTTP fetches; re-open + * re-bootstraps. + * 3. NO interval polling — advancing the clock past 30s does not fire + * additional HTTP calls. + * 4. WS push extends the rendered list without firing any HTTP call. + * 5. WS push for an offline workspace is ignored. + * 6. WS push for a non-comm activity_type is ignored. * - * If a future refactor pushes either dial back up, CI fails before - * the regression hits a paying tenant. + * If a future refactor regresses any of these, CI fails before the + * regression hits a paying tenant. */ import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; import { render, cleanup, act, fireEvent } from "@testing-library/react"; @@ -23,7 +33,7 @@ vi.mock("@/lib/api", () => ({ api: { get: vi.fn() }, })); -// Six online nodes — enough to verify the cap of 3. +// Six online nodes — enough to verify the bootstrap cap of 3. const mockStoreState = { selectedNodeId: null as string | null, nodes: [ @@ -56,6 +66,10 @@ vi.mock("@/lib/design-tokens", () => ({ // ── Imports (after mocks) ───────────────────────────────────────────────────── import { api } from "@/lib/api"; +import { + emitSocketEvent, + _resetSocketEventListenersForTests, +} from "@/store/socket-events"; import { CommunicationOverlay } from "../CommunicationOverlay"; const mockGet = vi.mocked(api.get); @@ -66,30 +80,34 @@ beforeEach(() => { vi.useFakeTimers(); mockGet.mockReset(); mockGet.mockResolvedValue([]); + // Drop any subscribers the previous test left on the singleton bus — + // each render adds one via useSocketEvent. 
+ _resetSocketEventListenersForTests(); }); afterEach(() => { cleanup(); vi.useRealTimers(); + _resetSocketEventListenersForTests(); }); // ── Tests ───────────────────────────────────────────────────────────────────── -describe("CommunicationOverlay — fan-out cap", () => { - it("polls at most 3 of 6 online workspaces (rate-limit floor)", async () => { +describe("CommunicationOverlay — bootstrap fan-out cap", () => { + it("bootstraps at most 3 of 6 online workspaces (rate-limit floor preserved post-#61)", async () => { await act(async () => { render(); }); - // Mount fires the first poll synchronously (no interval tick yet). - // Pre-fix: 6 calls. Post-fix: 3. + // Mount fires the bootstrap synchronously — pre-#61 this was the + // first poll cycle; post-#61 it's the only HTTP fetch (live updates + // arrive via WS push). 6 nodes → 3 fetches. expect(mockGet).toHaveBeenCalledTimes(3); - // Verify the calls are for the FIRST 3 online nodes (slice order). expect(mockGet).toHaveBeenCalledWith("/workspaces/ws-1/activity?limit=5"); expect(mockGet).toHaveBeenCalledWith("/workspaces/ws-2/activity?limit=5"); expect(mockGet).toHaveBeenCalledWith("/workspaces/ws-3/activity?limit=5"); }); - it("never polls offline workspaces", async () => { + it("never bootstraps offline workspaces", async () => { await act(async () => { render(); }); @@ -99,40 +117,39 @@ describe("CommunicationOverlay — fan-out cap", () => { }); }); -describe("CommunicationOverlay — cadence", () => { - it("uses 30s interval cadence (was 10s pre-fix)", async () => { +describe("CommunicationOverlay — no interval polling (post-#61)", () => { + // The pre-#61 implementation re-fetched every 30s per workspace. + // Post-#61 the only HTTP path is the bootstrap on mount + on + // visibility-toggle. This test pins the absence of any interval + // poll: a 60s clock advance must not produce a second round of + // fetches. 
+ it("does NOT poll on a 30s interval after bootstrap", async () => { await act(async () => { render(); }); - expect(mockGet).toHaveBeenCalledTimes(3); // initial mount poll + expect(mockGet).toHaveBeenCalledTimes(3); // initial bootstrap + mockGet.mockClear(); - // Advance 10s — pre-fix this would fire another poll. Post-fix: silent. + // Advance 60s — well past any plausible cadence the prior version + // could have used. await act(async () => { - vi.advanceTimersByTime(10_000); + vi.advanceTimersByTime(60_000); }); - expect(mockGet).toHaveBeenCalledTimes(3); - - // Advance to 30s — interval fires. - await act(async () => { - vi.advanceTimersByTime(20_000); - }); - expect(mockGet).toHaveBeenCalledTimes(6); // +3 from second tick + expect(mockGet).not.toHaveBeenCalled(); }); }); describe("CommunicationOverlay — visibility gate", () => { - // The visibility gate is the dial that drops collapsed-panel polling - // to ZERO. The cadence test above can't catch its removal — if a - // refactor dropped `if (!visible) return`, the cadence test would - // still pass because the effect would still fire every 30s. + // The visibility gate now does two things post-#61: + // - while closed, the WS handler short-circuits (no setComms churn) + // - re-opening triggers a fresh bootstrap so the list reflects + // anything that happened while the panel was collapsed // // Direct probe: render with comms-returning mock so the panel // actually renders (close button only exists in the expanded panel, // not the collapsed button-state). Click close, advance the clock, // assert no further fetches. - it("stops polling after the user collapses the panel", async () => { - // Mock returns one a2a_send so comms.length > 0 → panel renders → - // close button accessible. 
+ it("stops fetching while collapsed and re-bootstraps on re-open", async () => { mockGet.mockResolvedValue([ { id: "act-1", @@ -150,29 +167,202 @@ describe("CommunicationOverlay — visibility gate", () => { const { getByLabelText } = await act(async () => { return render(); }); - // Drain pending microtasks (resolves the await in fetchComms) so - // setComms lands and the panel renders. Don't advance time — that - // would fire the next interval tick and pollute the assertion. + // Drain pending microtasks (resolves the await in bootstrap) so + // setComms lands and the panel renders. Don't advance time — it's + // not load-bearing for the gate test, but matches the pattern used + // pre-#61 for stability. await act(async () => { await Promise.resolve(); await Promise.resolve(); await Promise.resolve(); }); - // Initial mount polled 3 workspaces. - expect(mockGet).toHaveBeenCalledTimes(3); + expect(mockGet).toHaveBeenCalledTimes(3); // initial bootstrap mockGet.mockClear(); - // Click the close button. Synchronous getByLabelText avoids - // findBy's internal setTimeout (deadlocks under useFakeTimers). + // Click close. While closed, no fetches and no WS-driven updates. + const closeBtn = getByLabelText("Close communications panel"); + await act(async () => { + fireEvent.click(closeBtn); + }); + await act(async () => { + vi.advanceTimersByTime(60_000); + }); + expect(mockGet).not.toHaveBeenCalled(); + + // Re-open via the collapsed button. Must trigger a fresh bootstrap. + const openBtn = getByLabelText("Show communications panel"); + await act(async () => { + fireEvent.click(openBtn); + }); + await act(async () => { + await Promise.resolve(); + await Promise.resolve(); + }); + expect(mockGet).toHaveBeenCalledTimes(3); // re-bootstrap on re-open + }); +}); + +describe("CommunicationOverlay — WS subscription (#61 stage 1 core)", () => { + // The load-bearing post-#61 behaviour. 
Every test in this block must + // verify (a) the WS push DID update the rendered comms list, and + // (b) NO additional HTTP call was fired — the whole point of the + // refactor is to remove the polling-driven HTTP traffic. + function emitActivityLogged(overrides: Partial<{ + workspaceId: string; + payload: Record; + }> = {}) { + emitSocketEvent({ + event: "ACTIVITY_LOGGED", + workspace_id: overrides.workspaceId ?? "ws-1", + timestamp: new Date().toISOString(), + payload: { + id: `act-${Math.random().toString(36).slice(2)}`, + activity_type: "a2a_send", + source_id: "ws-1", + target_id: "ws-2", + summary: "live push", + status: "ok", + duration_ms: 42, + created_at: new Date().toISOString(), + ...overrides.payload, + }, + }); + } + + it("WS push for a comm activity_type extends the rendered list with NO additional HTTP call", async () => { + const { container } = await act(async () => { + return render(); + }); + expect(mockGet).toHaveBeenCalledTimes(3); // bootstrap + mockGet.mockClear(); + + await act(async () => { + emitActivityLogged({ payload: { summary: "hello" } }); + }); + await act(async () => { + await Promise.resolve(); + }); + + // Two pins: + // 1. comms list reflects the live push (look for the summary text) + // 2. zero HTTP fetches fired during the WS path + expect(container.textContent).toContain("hello"); + expect(mockGet).not.toHaveBeenCalled(); + }); + + it("WS push for an offline workspace is ignored", async () => { + const { container } = await act(async () => { + return render(); + }); + mockGet.mockClear(); + + await act(async () => { + emitActivityLogged({ + workspaceId: "ws-offline", + payload: { source_id: "ws-offline", summary: "should-not-render" }, + }); + }); + await act(async () => { + await Promise.resolve(); + }); + + expect(container.textContent).not.toContain("should-not-render"); + expect(mockGet).not.toHaveBeenCalled(); + }); + + it("WS push for a non-comm activity_type is ignored (e.g. 
delegation)", async () => { + const { container } = await act(async () => { + return render(); + }); + mockGet.mockClear(); + + await act(async () => { + emitActivityLogged({ + payload: { + activity_type: "delegation", + summary: "should-not-render-delegation", + }, + }); + }); + await act(async () => { + await Promise.resolve(); + }); + + expect(container.textContent).not.toContain("should-not-render-delegation"); + expect(mockGet).not.toHaveBeenCalled(); + }); + + it("WS push while the panel is collapsed is ignored (no churn on hidden state)", async () => { + // Bootstrap with one comm so the panel renders → close button + // accessible. Then collapse, emit a WS push, re-open: the rendered + // list must come from the re-bootstrap, NOT from the WS-push that + // arrived during the closed state. Also: nothing visible while + // closed (the collapsed button shows only the count, not summaries). + mockGet.mockResolvedValue([ + { + id: "act-bootstrap", + workspace_id: "ws-1", + activity_type: "a2a_send", + source_id: "ws-1", + target_id: "ws-2", + summary: "bootstrap-summary", + status: "ok", + duration_ms: 1, + created_at: new Date().toISOString(), + }, + ]); + const { getByLabelText, container } = await act(async () => { + return render(); + }); + await act(async () => { + await Promise.resolve(); + await Promise.resolve(); + }); + + // Collapse. const closeBtn = getByLabelText("Close communications panel"); await act(async () => { fireEvent.click(closeBtn); }); - // Advance well past the 30s cadence — gate should suppress the tick. + // Bootstrap mock returns nothing on the re-open path so we can + // distinguish "WS push leaked through the gate" from "re-bootstrap + // refilled the list." 
+ mockGet.mockReset(); + mockGet.mockResolvedValue([]); + await act(async () => { - vi.advanceTimersByTime(60_000); + emitActivityLogged({ + payload: { summary: "leaked-while-closed" }, + }); }); + await act(async () => { + await Promise.resolve(); + }); + + // Closed state: rendered DOM must not show any push-derived text. + expect(container.textContent).not.toContain("leaked-while-closed"); + }); + + it("non-ACTIVITY_LOGGED events are ignored (e.g. WORKSPACE_OFFLINE)", async () => { + const { container } = await act(async () => { + return render(); + }); + mockGet.mockClear(); + + await act(async () => { + emitSocketEvent({ + event: "WORKSPACE_OFFLINE", + workspace_id: "ws-1", + timestamp: new Date().toISOString(), + payload: { summary: "should-not-render-event" }, + }); + }); + await act(async () => { + await Promise.resolve(); + }); + + expect(container.textContent).not.toContain("should-not-render-event"); expect(mockGet).not.toHaveBeenCalled(); }); });