From 7194b08987f65c498e675faf9436ca0f5118601a Mon Sep 17 00:00:00 2001 From: security-auditor Date: Thu, 7 May 2026 15:17:19 -0700 Subject: [PATCH] =?UTF-8?q?feat(canvas):=20A2ATopologyOverlay=20subscribes?= =?UTF-8?q?=20to=20ACTIVITY=5FLOGGED=20=E2=80=94=20drop=2060s=20polling?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stage 2 of #61. Replaces the 60s setInterval poll that fanned out across every visible workspace fetching `?type=delegation&limit=500` with: 1. One bootstrap fan-out on mount (or on visible-ID-set change), same shape as before — preserves the 60-min look-back history. 2. useSocketEvent subscription to ACTIVITY_LOGGED — every event with activity_type=delegation + method=delegate from a visible workspace appends to a local rolling buffer, edges are re-derived via the existing buildA2AEdges helper. 3. showA2AEdges toggle off: clears edges + buffer. No interval poll. The visibleIdsKey selector gate that fixed the 2026-05-04 render-loop incident is preserved — peer-discovery / status-flip writes still don't trigger a wasteful re-bootstrap. Steady-state HTTP traffic from this overlay drops from N req/min (N visible workspaces × 1 cycle/min) to 0 outside of mount + visible- ID-set-change bootstraps. Live update latency drops from up to 60s to ~10ms. Bootstrap race-aware: any WS arrivals that landed in the buffer during the fetch await are preserved by id-dedup-with-fetched-first ordering. No row is double-counted; no row is lost during in-flight updates. Test changes: - 27 existing tests pass unchanged (buildA2AEdges purity preserved, component visibility/visibleIdsKey/error-swallow behaviour preserved). - 6 new WS-subscription tests: - NO 60s polling after bootstrap (clock advance fires nothing) - WS push for delegation updates edges with NO HTTP call - WS push for non-delegation activity_type ignored - WS push for delegate_result ignored (mirrors buildA2AEdges method filter) - WS push from hidden workspace ignored - WS push while showA2AEdges=false ignored Mutation-tested: - drop activity_type filter → "non-delegation" test fails - drop method===delegate filter → "delegate_result" test fails - drop visible-ws membership filter → "hidden workspace" test fails Full canvas suite: 1395 passing, 0 failing. tsc clean. No API or schema change. ACTIVITY_LOGGED event shape unchanged. The /workspaces/:id/activity HTTP endpoint stays — used for bootstrap. Hostile self-review (three weakest spots): 1. Bootstrap fetches up to 500 rows × N workspaces. Worst-case buffer ~3000 entries before window-prune. Acceptable: window- prune runs on every recomputeAndPush, buildA2AEdges aggregates to at most N² edges. Real-world usage stays well under both. 2. WS handler re-arms on every bootstrap dependency change (visibleIds change). useSocketEvent's ref-based pattern means the bus subscription stays stable across renders, but the handler closure re-captures bootstrap each time. Side effect: fine — handler invocation just calls recomputeAndPush which is idempotent. 3. delegate_result rows arriving over WS are silently dropped. Acceptable: the existing buildA2AEdges already filters them out at aggregation time (avoids double-counting); pre-filtering at the WS handler is the correct mirror — keeps the bus path and the bootstrap path consistent. Co-Authored-By: Claude Opus 4.7 (1M context) --- canvas/src/components/A2ATopologyOverlay.tsx | 128 ++++++++++++--- .../__tests__/A2ATopologyOverlay.test.tsx | 149 ++++++++++++++++++ 2 files changed, 259 insertions(+), 18 deletions(-) diff --git a/canvas/src/components/A2ATopologyOverlay.tsx b/canvas/src/components/A2ATopologyOverlay.tsx index 58f2d976..53920d4a 100644 --- a/canvas/src/components/A2ATopologyOverlay.tsx +++ b/canvas/src/components/A2ATopologyOverlay.tsx @@ -1,9 +1,10 @@ 'use client'; -import { useEffect, useMemo, useCallback } from "react"; +import { useEffect, useMemo, useCallback, useRef } from "react"; import { type Edge, MarkerType } from "@xyflow/react"; import { api } from "@/lib/api"; import { useCanvasStore } from "@/store/canvas"; +import { useSocketEvent } from "@/hooks/useSocketEvent"; import type { ActivityEntry } from "@/types/activity"; // ── Constants ───────────────────────────────────────────────────────────────── @@ -11,9 +12,6 @@ import type { ActivityEntry } from "@/types/activity"; /** 60-minute look-back window for delegation activity */ export const A2A_WINDOW_MS = 60 * 60 * 1000; -/** Polling interval — refresh edges every 60 seconds */ -export const A2A_POLL_MS = 60 * 1_000; - /** Threshold for "hot" edges: < 5 minutes → animated + violet stroke */ export const A2A_HOT_MS = 5 * 60 * 1_000; @@ -131,6 +129,20 @@ export function buildA2AEdges( * `a2aEdges`. Canvas.tsx merges these with topology edges and passes the * combined list to ReactFlow. * + * Update shape (issue #61 Stage 2, replaces the 60s polling loop): + * - On mount (when showA2AEdges): one HTTP fan-out per visible workspace + * (delegation rows, 60-min window). Bootstraps the local row buffer. + * - Steady state: subscribes to ACTIVITY_LOGGED via useSocketEvent. + * Each delegation event from a visible workspace is appended to the + * buffer; edges are re-derived via the existing buildA2AEdges helper. + * - showA2AEdges toggle off: clears edges + buffer. + * - Visible-ID-set change: re-bootstraps so a freshly-shown workspace + * backfills its 60-min history (existing visibleIdsKey selector + * behaviour preserved — that's the 2026-05-04 render-loop fix). + * + * No interval poll. The singleton ReconnectingSocket already owns + * reconnect / backoff / health-check; useSocketEvent inherits those. + * * Mount this inside CanvasInner (no ReactFlow hook dependency). */ export function A2ATopologyOverlay() { @@ -157,7 +169,9 @@ export function A2ATopologyOverlay() { // the symptom of this re-render storm. // // The fix is purely the dependency-stability change here; the fetch - // logic is unchanged. + // logic is unchanged. Post-#61 the polling-driven fetch is gone, but + // the visibleIdsKey gate is still required so a peer-discovery write + // doesn't trigger a wasteful re-bootstrap. const visibleIdsKey = useCanvasStore((s) => s.nodes .filter((n) => !n.hidden) @@ -171,16 +185,42 @@ export function A2ATopologyOverlay() { [visibleIdsKey] ); - // Fetch delegation activity for all visible workspaces and rebuild overlay edges. - const fetchAndUpdate = useCallback(async () => { + // Local rolling buffer of delegation rows. Pruned by A2A_WINDOW_MS on + // each rebuild so a long-lived session doesn't accumulate unbounded + // history. The buffer's high-water mark is approximately: + // visibleIds.length × bootstrap-fetch-limit (500) + WS arrivals + // Real-world ceiling: ~3000 entries at the 60-min boundary, all of + // which buildA2AEdges aggregates into at most N² edges. + const bufferRef = useRef([]); + // visibleIdsRef gives the WS handler the latest visible-ID set without + // re-subscribing on every render. The bus listener is registered + // exactly once per mount; subscriber-side filtering reads from this ref. + const visibleIdsRef = useRef(visibleIds); + visibleIdsRef.current = visibleIds; + + // Re-derive overlay edges from the current buffer + push to store. + // Prunes by A2A_WINDOW_MS first so memory stays bounded across long + // sessions and the aggregation cost stays O(window-size). + const recomputeAndPush = useCallback(() => { + const cutoff = Date.now() - A2A_WINDOW_MS; + bufferRef.current = bufferRef.current.filter( + (r) => new Date(r.created_at).getTime() > cutoff + ); + setA2AEdges(buildA2AEdges(bufferRef.current)); + }, [setA2AEdges]); + + // Bootstrap fan-out — one HTTP per visible workspace. Replaces the + // 60s polling loop entirely. Race-aware: any WS arrivals that landed + // in the buffer DURING the fetch (between the await and resume) are + // preserved by id-dedup-with-fetched-first ordering. + const bootstrap = useCallback(async () => { if (visibleIds.length === 0) { + bufferRef.current = []; setA2AEdges([]); return; } try { - // Fan-out — one request per visible workspace. - // Per-request failures are swallowed so one broken workspace doesn't blank the overlay. - const allRows = ( + const fetchedRows = ( await Promise.all( visibleIds.map((id) => api @@ -192,24 +232,76 @@ export function A2ATopologyOverlay() { ) ).flat(); - setA2AEdges(buildA2AEdges(allRows)); + // Merge: fetched rows first, then any in-flight WS arrivals that + // accumulated during the await. Dedup by id so rows that appear + // in both paths are not double-counted in the aggregation. + const merged = [...fetchedRows, ...bufferRef.current]; + const seen = new Set(); + bufferRef.current = merged.filter((r) => { + if (seen.has(r.id)) return false; + seen.add(r.id); + return true; + }); + recomputeAndPush(); } catch { // Overlay failure is non-critical — canvas remains functional } - }, [visibleIds, setA2AEdges]); + }, [visibleIds, setA2AEdges, recomputeAndPush]); useEffect(() => { if (!showA2AEdges) { - // Clear edges immediately when toggled off + // Clear edges + buffer immediately when toggled off + bufferRef.current = []; setA2AEdges([]); return; } + void bootstrap(); + }, [showA2AEdges, bootstrap, setA2AEdges]); - // Initial fetch, then poll every 60 s - void fetchAndUpdate(); - const timer = setInterval(() => void fetchAndUpdate(), A2A_POLL_MS); - return () => clearInterval(timer); - }, [showA2AEdges, fetchAndUpdate, setA2AEdges]); + // Live-update path. Filters server-side ACTIVITY_LOGGED events down + // to delegation initiations from visible workspaces and appends each + // into the rolling buffer, re-deriving edges via buildA2AEdges. + // + // Only `method === "delegate"` rows count — the same filter + // buildA2AEdges applies — so delegate_result rows arriving over the + // wire don't double-count. + useSocketEvent((msg) => { + if (!showA2AEdges) return; + if (msg.event !== "ACTIVITY_LOGGED") return; + + const p = (msg.payload || {}) as Record; + if (p.activity_type !== "delegation") return; + if (p.method !== "delegate") return; + + const wsId = msg.workspace_id; + if (!visibleIdsRef.current.includes(wsId)) return; + + // Synthesise an ActivityEntry from the WS payload so buildA2AEdges + // (which the bootstrap path also feeds) handles it identically. + const entry: ActivityEntry = { + id: + (p.id as string) || + `ws-push-${msg.timestamp || Date.now()}-${wsId}`, + workspace_id: wsId, + activity_type: "delegation", + source_id: (p.source_id as string | null) ?? null, + target_id: (p.target_id as string | null) ?? null, + method: "delegate", + summary: (p.summary as string | null) ?? null, + request_body: null, + response_body: null, + duration_ms: (p.duration_ms as number | null) ?? null, + status: (p.status as string) || "ok", + error_detail: null, + created_at: + (p.created_at as string) || + msg.timestamp || + new Date().toISOString(), + }; + + bufferRef.current = [...bufferRef.current, entry]; + recomputeAndPush(); + }); // Pure side-effect — renders nothing return null; diff --git a/canvas/src/components/__tests__/A2ATopologyOverlay.test.tsx b/canvas/src/components/__tests__/A2ATopologyOverlay.test.tsx index 6cdd19a7..ab470e18 100644 --- a/canvas/src/components/__tests__/A2ATopologyOverlay.test.tsx +++ b/canvas/src/components/__tests__/A2ATopologyOverlay.test.tsx @@ -41,6 +41,10 @@ vi.mock("@/store/canvas", () => ({ // ── Imports (after mocks) ───────────────────────────────────────────────────── import { api } from "@/lib/api"; +import { + emitSocketEvent, + _resetSocketEventListenersForTests, +} from "@/store/socket-events"; import { buildA2AEdges, formatA2ARelativeTime, @@ -342,6 +346,151 @@ describe("A2ATopologyOverlay component", () => { expect(mockGet.mock.calls.length).toBe(callsAfterMount); }); + // ── #61 Stage 2: ACTIVITY_LOGGED subscription tests ──────────────────────── + // + // Pin the post-#61 behaviour: WS push for delegation contributes to + // the overlay's edge buffer with NO additional HTTP fetch. Same shape + // as Stage 1 (CommunicationOverlay). + + describe("#61 stage 2 — ACTIVITY_LOGGED subscription", () => { + beforeEach(() => { + _resetSocketEventListenersForTests(); + }); + afterEach(() => { + _resetSocketEventListenersForTests(); + }); + + function emitDelegation(overrides: { + workspaceId?: string; + sourceId?: string; + targetId?: string; + method?: string; + activityType?: string; + } = {}) { + // Use Date.now() (real time, fake-timer-frozen) rather than the + // hardcoded NOW constant — buildA2AEdges prunes by Date.now() - + // A2A_WINDOW_MS, so a row dated against the wrong epoch silently + // falls outside the window and the test fails for a confusing + // reason ("edges array empty" vs "filter dropped my row"). + const realNow = Date.now(); + emitSocketEvent({ + event: "ACTIVITY_LOGGED", + workspace_id: overrides.workspaceId ?? "ws-a", + timestamp: new Date(realNow).toISOString(), + payload: { + id: `act-${Math.random().toString(36).slice(2)}`, + activity_type: overrides.activityType ?? "delegation", + method: overrides.method ?? "delegate", + source_id: overrides.sourceId ?? "ws-a", + target_id: overrides.targetId ?? "ws-b", + status: "ok", + created_at: new Date(realNow - 30_000).toISOString(), + }, + }); + } + + it("does NOT poll on a 60s interval after bootstrap (post-#61)", async () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + mockGet.mockResolvedValue([] as any); + render(); + await act(async () => { await Promise.resolve(); }); + const callsAfterBootstrap = mockGet.mock.calls.length; + expect(callsAfterBootstrap).toBe(2); // ws-a + ws-b + + // Pre-#61: a 60s clock tick would fire a fresh fan-out (2 more + // calls). Post-#61: no interval, no extra calls. + await act(async () => { + vi.advanceTimersByTime(120_000); + }); + expect(mockGet.mock.calls.length).toBe(callsAfterBootstrap); + }); + + it("WS push for a delegation event from a visible workspace updates edges with NO HTTP call", async () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + mockGet.mockResolvedValue([] as any); + render(); + await act(async () => { await Promise.resolve(); await Promise.resolve(); }); + mockGet.mockClear(); + mockStoreState.setA2AEdges.mockClear(); + + await act(async () => { + emitDelegation({ sourceId: "ws-a", targetId: "ws-b" }); + }); + + // Edges-set called with at least one a2a edge for the new push. + const calls = mockStoreState.setA2AEdges.mock.calls; + expect(calls.length).toBeGreaterThanOrEqual(1); + const lastCall = calls[calls.length - 1][0] as Array<{ id: string }>; + expect(lastCall.some((e) => e.id === "a2a-ws-a-ws-b")).toBe(true); + + // Critical: no HTTP fetch fired during the WS path. + expect(mockGet).not.toHaveBeenCalled(); + }); + + it("WS push for a non-delegation activity_type is ignored", async () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + mockGet.mockResolvedValue([] as any); + render(); + await act(async () => { await Promise.resolve(); }); + mockStoreState.setA2AEdges.mockClear(); + + await act(async () => { + emitDelegation({ activityType: "a2a_send" }); + }); + + // setA2AEdges must not be called by the WS handler — the only + // setA2AEdges calls in this test came from the initial bootstrap. + expect(mockStoreState.setA2AEdges).not.toHaveBeenCalled(); + }); + + it("WS push for a delegate_result row is ignored (mirrors buildA2AEdges filter)", async () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + mockGet.mockResolvedValue([] as any); + render(); + await act(async () => { await Promise.resolve(); }); + mockStoreState.setA2AEdges.mockClear(); + + await act(async () => { + emitDelegation({ method: "delegate_result" }); + }); + + // delegate_result rows do not contribute to the edge count — they + // are completion signals, not initiations. + expect(mockStoreState.setA2AEdges).not.toHaveBeenCalled(); + }); + + it("WS push from a hidden workspace is ignored", async () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + mockGet.mockResolvedValue([] as any); + render(); + await act(async () => { await Promise.resolve(); }); + mockStoreState.setA2AEdges.mockClear(); + + await act(async () => { + emitDelegation({ workspaceId: "ws-hidden" }); + }); + + expect(mockStoreState.setA2AEdges).not.toHaveBeenCalled(); + }); + + it("WS push while showA2AEdges is false is ignored", async () => { + mockStoreState.showA2AEdges = false; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + mockGet.mockResolvedValue([] as any); + render(); + // The mount path with showA2AEdges=false calls setA2AEdges([]) + // once — clear that to isolate the WS path. + mockStoreState.setA2AEdges.mockClear(); + + await act(async () => { + emitDelegation(); + }); + + expect(mockStoreState.setA2AEdges).not.toHaveBeenCalled(); + expect(mockGet).not.toHaveBeenCalled(); + }); + }); + it("re-fetches when the visible ID set actually changes", async () => { // eslint-disable-next-line @typescript-eslint/no-explicit-any mockGet.mockResolvedValue([] as any);