Merge pull request #751 from Molecule-AI/feat/issue-744-a2a-topology-overlay

feat(canvas): A2A topology overlay with animated delegation edges
This commit is contained in:
Hongming Wang 2026-04-17 09:15:10 -07:00 committed by GitHub
commit b7072d87f1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 546 additions and 1 deletions

View File

@ -0,0 +1,188 @@
'use client';
import { useEffect, useMemo, useCallback } from "react";
import { type Edge, MarkerType } from "@xyflow/react";
import { api } from "@/lib/api";
import { useCanvasStore } from "@/store/canvas";
import type { ActivityEntry } from "@/types/activity";
// ── Constants ─────────────────────────────────────────────────────────────────
/** 60-minute look-back window for delegation activity */
export const A2A_WINDOW_MS = 60 * 60 * 1000;
/** Polling interval — refresh edges every 60 seconds */
export const A2A_POLL_MS = 60 * 1_000;
/** Threshold for "hot" edges: < 5 minutes → animated + violet stroke */
export const A2A_HOT_MS = 5 * 60 * 1_000;
// ── Helpers ───────────────────────────────────────────────────────────────────
/** Format millisecond timestamp as human-readable relative time ("2m ago"). */
export function formatA2ARelativeTime(ts: number, now = Date.now()): string {
const diff = now - ts;
if (diff < 60_000) return "just now";
if (diff < 3_600_000) return `${Math.floor(diff / 60_000)}m ago`;
return `${Math.floor(diff / 3_600_000)}h ago`;
}
// ── Pure aggregation function (exported for unit tests) ───────────────────────
/**
* Converts raw delegation activity rows into React Flow overlay edges.
*
* Rules applied:
* - Only `method === "delegate"` rows (initiation, not result) to avoid double-counting.
* - Rows older than A2A_WINDOW_MS are discarded.
* - Rows with null source_id or target_id are skipped.
* - Multiple rows on the same sourcetarget pair are aggregated (count + latest timestamp).
* - Edge is animated + violet-500 when lastAt < A2A_HOT_MS ago; otherwise blue-500.
* - All styles have `pointerEvents: "none"` so canvas nodes remain draggable.
*/
export function buildA2AEdges(
rows: ActivityEntry[],
now = Date.now()
): Edge[] {
const cutoff = now - A2A_WINDOW_MS;
// 1. Filter: only delegate initiations within the window with valid endpoints
const initiations = rows.filter(
(r) =>
r.method === "delegate" &&
r.source_id != null &&
r.target_id != null &&
new Date(r.created_at).getTime() > cutoff
);
if (initiations.length === 0) return [];
// 2. Aggregate by "source→target" pair
type Agg = { source: string; target: string; count: number; lastAt: number };
const map = new Map<string, Agg>();
for (const row of initiations) {
const source = row.source_id as string;
const target = row.target_id as string;
const key = `${source}${target}`;
const ts = new Date(row.created_at).getTime();
const prev = map.get(key) ?? { source, target, count: 0, lastAt: 0 };
map.set(key, {
...prev,
count: prev.count + 1,
lastAt: Math.max(prev.lastAt, ts),
});
}
// 3. Build React Flow Edge objects
return Array.from(map.values()).map(({ source, target, count, lastAt }) => {
const isHot = now - lastAt < A2A_HOT_MS;
const stroke = isHot ? "#8b5cf6" : "#3b82f6"; // violet-500 : blue-500
const callWord = count === 1 ? "call" : "calls";
const label = `${count} ${callWord} · ${formatA2ARelativeTime(lastAt, now)}`;
return {
id: `a2a-${source}-${target}`,
source,
target,
animated: isHot,
markerEnd: {
type: MarkerType.ArrowClosed,
color: stroke,
width: 12,
height: 12,
},
style: {
stroke,
strokeWidth: 2,
// Non-blocking: label overlay never intercepts pointer events
pointerEvents: "none" as React.CSSProperties["pointerEvents"],
},
label,
labelStyle: {
fill: "#a1a1aa", // zinc-400
fontSize: 10,
pointerEvents: "none" as React.CSSProperties["pointerEvents"],
},
labelBgStyle: {
fill: "#18181b", // zinc-900
fillOpacity: 0.9,
pointerEvents: "none" as React.CSSProperties["pointerEvents"],
},
labelBgPadding: [4, 6] as [number, number],
labelBgBorderRadius: 4,
};
});
}
// ── Component ─────────────────────────────────────────────────────────────────
/**
* A2ATopologyOverlay null-rendering side-effect component.
*
* Fetches delegation activity from all visible workspace nodes (fan-out),
* aggregates into directed edges, and writes them to the canvas store as
* `a2aEdges`. Canvas.tsx merges these with topology edges and passes the
* combined list to ReactFlow.
*
* Mount this inside CanvasInner (no ReactFlow hook dependency).
*/
export function A2ATopologyOverlay() {
const showA2AEdges = useCanvasStore((s) => s.showA2AEdges);
// Stable Zustand action reference — safe to call inside effects
const setA2AEdges = useCanvasStore((s) => s.setA2AEdges);
// Read the nodes array as a primitive ref; derive visible IDs outside the selector
const nodes = useCanvasStore((s) => s.nodes);
// IDs of visible (non-nested, non-hidden) workspace nodes.
// Recomputed only when the nodes array reference changes.
const visibleIds = useMemo(
() => nodes.filter((n) => !n.hidden).map((n) => n.id),
[nodes]
);
// Fetch delegation activity for all visible workspaces and rebuild overlay edges.
const fetchAndUpdate = useCallback(async () => {
if (visibleIds.length === 0) {
setA2AEdges([]);
return;
}
try {
// Fan-out — one request per visible workspace.
// Per-request failures are swallowed so one broken workspace doesn't blank the overlay.
const allRows = (
await Promise.all(
visibleIds.map((id) =>
api
.get<ActivityEntry[]>(
`/workspaces/${id}/activity?type=delegation&limit=500&source=agent`
)
.catch(() => [] as ActivityEntry[])
)
)
).flat();
setA2AEdges(buildA2AEdges(allRows));
} catch {
// Overlay failure is non-critical — canvas remains functional
}
}, [visibleIds, setA2AEdges]);
useEffect(() => {
if (!showA2AEdges) {
// Clear edges immediately when toggled off
setA2AEdges([]);
return;
}
// Initial fetch, then poll every 60 s
void fetchAndUpdate();
const timer = setInterval(() => void fetchAndUpdate(), A2A_POLL_MS);
return () => clearInterval(timer);
}, [showA2AEdges, fetchAndUpdate, setA2AEdges]);
// Pure side-effect — renders nothing
return null;
}

View File

@ -16,6 +16,7 @@ import {
import "@xyflow/react/dist/style.css";
import { useCanvasStore, type WorkspaceNodeData } from "@/store/canvas";
import { A2ATopologyOverlay } from "./A2ATopologyOverlay";
import { WorkspaceNode } from "./WorkspaceNode";
import { SidePanel } from "./SidePanel";
import { CreateWorkspaceButton } from "./CreateWorkspaceDialog";
@ -56,6 +57,13 @@ export function Canvas() {
function CanvasInner() {
const nodes = useCanvasStore((s) => s.nodes);
const edges = useCanvasStore((s) => s.edges);
const a2aEdges = useCanvasStore((s) => s.a2aEdges);
const showA2AEdges = useCanvasStore((s) => s.showA2AEdges);
// Merge topology edges with A2A overlay edges via useMemo (no new object in selector)
const allEdges = useMemo(
() => (showA2AEdges ? [...edges, ...a2aEdges] : edges),
[edges, a2aEdges, showA2AEdges]
);
const onNodesChange = useCanvasStore((s) => s.onNodesChange);
const savePosition = useCanvasStore((s) => s.savePosition);
const selectNode = useCanvasStore((s) => s.selectNode);
@ -257,7 +265,7 @@ function CanvasInner() {
<ReactFlow
colorMode="dark"
nodes={nodes}
edges={edges}
edges={allEdges}
onNodesChange={onNodesChange}
onNodeDragStart={onNodeDragStart}
onNodeDrag={onNodeDrag}
@ -316,6 +324,7 @@ function CanvasInner() {
</div>
{nodes.length === 0 && <EmptyState />}
<A2ATopologyOverlay />
<OnboardingWizard />
<Toolbar />
<ApprovalBanner />

View File

@ -12,6 +12,8 @@ import { statusDotClass } from "@/lib/design-tokens";
export function Toolbar() {
const nodes = useCanvasStore((s) => s.nodes);
const wsStatus = useCanvasStore((s) => s.wsStatus);
const showA2AEdges = useCanvasStore((s) => s.showA2AEdges);
const setShowA2AEdges = useCanvasStore((s) => s.setShowA2AEdges);
const [stopping, setStopping] = useState(false);
const [restartingAll, setRestartingAll] = useState(false);
@ -180,6 +182,40 @@ export function Toolbar() {
</button>
)}
{/* A2A topology overlay toggle */}
<button
onClick={() => setShowA2AEdges(!showA2AEdges)}
aria-pressed={showA2AEdges}
aria-label={showA2AEdges ? "Hide A2A edges" : "Show A2A edges"}
title={showA2AEdges ? "Hide A2A delegation edges" : "Show A2A delegation edges (last 60 min)"}
className={`flex items-center gap-1.5 px-2.5 py-1 border rounded-lg transition-colors ${
showA2AEdges
? "bg-blue-950/50 hover:bg-blue-900/50 border-blue-800/40 text-blue-300"
: "bg-zinc-800/50 hover:bg-zinc-700/50 border-zinc-700/40 text-zinc-500 hover:text-zinc-300"
}`}
>
{/* Mesh / network icon */}
<svg
width="12"
height="12"
viewBox="0 0 16 16"
fill="none"
className="shrink-0"
aria-hidden="true"
>
<circle cx="3" cy="3" r="2" stroke="currentColor" strokeWidth="1.4" />
<circle cx="13" cy="3" r="2" stroke="currentColor" strokeWidth="1.4" />
<circle cx="8" cy="13" r="2" stroke="currentColor" strokeWidth="1.4" />
<path
d="M5 3h6M3.7 5l3.3 6M12.3 5l-3.3 6"
stroke="currentColor"
strokeWidth="1.3"
strokeLinecap="round"
/>
</svg>
<span className="text-[10px] font-medium">A2A</span>
</button>
{/* Search shortcut */}
<button
onClick={() => useCanvasStore.getState().setSearchOpen(true)}

View File

@ -0,0 +1,280 @@
// @vitest-environment jsdom
/**
* A2ATopologyOverlay tests issue #744
*
* Split into two suites:
* 1. buildA2AEdges pure aggregation function (no mocks needed)
* 2. A2ATopologyOverlay component side-effect behavior (API + store mocks)
*/
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import { render, cleanup, waitFor, act } from "@testing-library/react";
// ── Mocks (hoisted before imports) ────────────────────────────────────────────
vi.mock("@/lib/api", () => ({
api: { get: vi.fn() },
}));
// MarkerType is a plain enum — mock @xyflow/react with it intact
vi.mock("@xyflow/react", () => ({
MarkerType: { ArrowClosed: "arrowclosed" },
}));
// Minimal canvas store mock — selectors drive real state via the selector fn
const mockStoreState = {
showA2AEdges: true,
nodes: [
{ id: "ws-a", hidden: false, data: {} },
{ id: "ws-b", hidden: false, data: {} },
{ id: "ws-hidden", hidden: true, data: {} }, // nested — should be excluded
],
setA2AEdges: vi.fn(),
};
vi.mock("@/store/canvas", () => ({
useCanvasStore: vi.fn(
(selector: (s: typeof mockStoreState) => unknown) =>
selector(mockStoreState)
),
}));
// ── Imports (after mocks) ─────────────────────────────────────────────────────
import { api } from "@/lib/api";
import {
buildA2AEdges,
formatA2ARelativeTime,
A2ATopologyOverlay,
A2A_WINDOW_MS,
A2A_HOT_MS,
} from "../A2ATopologyOverlay";
import type { ActivityEntry } from "@/types/activity";
const mockGet = vi.mocked(api.get);
// ── Helpers ───────────────────────────────────────────────────────────────────
const NOW = 1_745_000_000_000; // fixed "now" for deterministic tests
function makeRow(overrides: Partial<ActivityEntry> = {}): ActivityEntry {
return {
id: "row-1",
workspace_id: "ws-a",
activity_type: "delegation",
source_id: "ws-a",
target_id: "ws-b",
method: "delegate",
summary: null,
request_body: null,
response_body: null,
duration_ms: null,
status: "completed",
error_detail: null,
created_at: new Date(NOW - 60_000).toISOString(), // 1 minute ago
...overrides,
};
}
// ── Suite 1: buildA2AEdges (pure function) ────────────────────────────────────
describe("buildA2AEdges — filtering", () => {
it("returns [] for empty input", () => {
expect(buildA2AEdges([], NOW)).toEqual([]);
});
it("discards rows older than the 60-minute window", () => {
const old = makeRow({
created_at: new Date(NOW - A2A_WINDOW_MS - 1).toISOString(),
});
expect(buildA2AEdges([old], NOW)).toEqual([]);
});
it("keeps rows exactly at the window boundary (cutoff exclusive)", () => {
const boundary = makeRow({
created_at: new Date(NOW - A2A_WINDOW_MS + 1000).toISOString(),
});
expect(buildA2AEdges([boundary], NOW)).toHaveLength(1);
});
it("discards delegate_result rows (avoids double-counting)", () => {
const result = makeRow({ method: "delegate_result" });
expect(buildA2AEdges([result], NOW)).toEqual([]);
});
it("discards rows with null source_id", () => {
const row = makeRow({ source_id: null });
expect(buildA2AEdges([row], NOW)).toEqual([]);
});
it("discards rows with null target_id", () => {
const row = makeRow({ target_id: null });
expect(buildA2AEdges([row], NOW)).toEqual([]);
});
});
describe("buildA2AEdges — aggregation", () => {
it("aggregates multiple delegate rows on the same pair into one edge", () => {
const rows = [
makeRow({ id: "r1", created_at: new Date(NOW - 10_000).toISOString() }),
makeRow({ id: "r2", created_at: new Date(NOW - 20_000).toISOString() }),
makeRow({ id: "r3", created_at: new Date(NOW - 30_000).toISOString() }),
];
const edges = buildA2AEdges(rows, NOW);
expect(edges).toHaveLength(1);
expect(edges[0].label).toMatch(/^3 calls/);
});
it("produces separate edges for different source→target pairs", () => {
const rows = [
makeRow({ source_id: "ws-a", target_id: "ws-b" }),
makeRow({ source_id: "ws-b", target_id: "ws-a" }),
];
const edges = buildA2AEdges(rows, NOW);
expect(edges).toHaveLength(2);
const ids = edges.map((e) => e.id).sort();
expect(ids).toContain("a2a-ws-a-ws-b");
expect(ids).toContain("a2a-ws-b-ws-a");
});
it("uses the latest created_at timestamp as lastAt for label recency", () => {
const recent = NOW - 2 * 60_000; // 2 min ago
const older = NOW - 30 * 60_000; // 30 min ago
const rows = [
makeRow({ id: "r1", created_at: new Date(older).toISOString() }),
makeRow({ id: "r2", created_at: new Date(recent).toISOString() }),
];
const [edge] = buildA2AEdges(rows, NOW);
// Label should show 2m ago (the most recent), not 30m ago
expect(edge.label).toContain("2m ago");
expect(edge.label).not.toContain("30m ago");
});
});
describe("buildA2AEdges — edge properties", () => {
it("assigns correct id format: a2a-{source}-{target}", () => {
const [edge] = buildA2AEdges([makeRow()], NOW);
expect(edge.id).toBe("a2a-ws-a-ws-b");
});
it("marks edge as animated with violet stroke when lastAt < 5 min ago", () => {
const row = makeRow({ created_at: new Date(NOW - A2A_HOT_MS + 10_000).toISOString() });
const [edge] = buildA2AEdges([row], NOW);
expect(edge.animated).toBe(true);
expect((edge.style as { stroke: string }).stroke).toBe("#8b5cf6");
});
it("marks edge as non-animated with blue stroke when lastAt >= 5 min ago", () => {
const row = makeRow({ created_at: new Date(NOW - A2A_HOT_MS - 10_000).toISOString() });
const [edge] = buildA2AEdges([row], NOW);
expect(edge.animated).toBe(false);
expect((edge.style as { stroke: string }).stroke).toBe("#3b82f6");
});
it("sets pointerEvents: 'none' on style so nodes stay draggable", () => {
const [edge] = buildA2AEdges([makeRow()], NOW);
expect((edge.style as React.CSSProperties).pointerEvents).toBe("none");
});
it("sets pointerEvents: 'none' on labelStyle", () => {
const [edge] = buildA2AEdges([makeRow()], NOW);
expect((edge.labelStyle as React.CSSProperties).pointerEvents).toBe("none");
});
it("label uses singular 'call' for count === 1", () => {
const [edge] = buildA2AEdges([makeRow()], NOW);
expect(edge.label).toMatch(/^1 call ·/);
});
it("label uses plural 'calls' for count > 1", () => {
const rows = [makeRow({ id: "r1" }), makeRow({ id: "r2" })];
const [edge] = buildA2AEdges(rows, NOW);
expect(edge.label).toMatch(/^2 calls ·/);
});
});
// ── Suite 2: formatA2ARelativeTime ───────────────────────────────────────────
describe("formatA2ARelativeTime", () => {
it("returns 'just now' when diff < 60s", () => {
expect(formatA2ARelativeTime(NOW - 30_000, NOW)).toBe("just now");
});
it("returns 'Xm ago' for minute-scale diffs", () => {
expect(formatA2ARelativeTime(NOW - 3 * 60_000, NOW)).toBe("3m ago");
});
it("returns 'Xh ago' for hour-scale diffs", () => {
expect(formatA2ARelativeTime(NOW - 2 * 3_600_000, NOW)).toBe("2h ago");
});
});
// ── Suite 3: A2ATopologyOverlay component ─────────────────────────────────────
describe("A2ATopologyOverlay component", () => {
beforeEach(() => {
vi.clearAllMocks();
vi.useFakeTimers();
// Reset store state to defaults
mockStoreState.showA2AEdges = true;
mockStoreState.nodes = [
{ id: "ws-a", hidden: false, data: {} },
{ id: "ws-b", hidden: false, data: {} },
{ id: "ws-hidden", hidden: true, data: {} },
];
mockStoreState.setA2AEdges = vi.fn();
});
afterEach(() => {
vi.useRealTimers();
cleanup();
});
it("renders null (no DOM output)", () => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
mockGet.mockResolvedValue([] as any);
const { container } = render(<A2ATopologyOverlay />);
expect(container.firstChild).toBeNull();
});
it("fetches activity only for visible (non-hidden) nodes", async () => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
mockGet.mockResolvedValue([] as any);
render(<A2ATopologyOverlay />);
await act(async () => { await Promise.resolve(); });
const paths = mockGet.mock.calls.map(([p]) => p as string);
// ws-a and ws-b should be fetched; ws-hidden should NOT
expect(paths.some((p) => p.includes("ws-a"))).toBe(true);
expect(paths.some((p) => p.includes("ws-b"))).toBe(true);
expect(paths.some((p) => p.includes("ws-hidden"))).toBe(false);
});
it("calls setA2AEdges([]) immediately when showA2AEdges is false", () => {
mockStoreState.showA2AEdges = false;
render(<A2ATopologyOverlay />);
expect(mockStoreState.setA2AEdges).toHaveBeenCalledWith([]);
expect(mockGet).not.toHaveBeenCalled();
});
it("passes built edges to setA2AEdges after fetch", async () => {
const row = makeRow({ created_at: new Date(Date.now() - 60_000).toISOString() });
// eslint-disable-next-line @typescript-eslint/no-explicit-any
mockGet.mockResolvedValue([row] as any);
render(<A2ATopologyOverlay />);
await act(async () => { await Promise.resolve(); await Promise.resolve(); });
const calls = mockStoreState.setA2AEdges.mock.calls;
const lastCall = calls[calls.length - 1][0] as unknown[];
// Should have produced at least one edge
expect(lastCall.length).toBeGreaterThanOrEqual(1);
});
it("swallows per-workspace API errors (fail-safe)", async () => {
mockGet.mockRejectedValue(new Error("Network error"));
render(<A2ATopologyOverlay />);
// Should not throw
await act(async () => { await Promise.resolve(); await Promise.resolve(); });
// setA2AEdges should still be called with an empty array
expect(mockStoreState.setA2AEdges).toHaveBeenCalled();
});
});

View File

@ -62,6 +62,12 @@ const mockStoreState = {
nestNode: vi.fn(),
isDescendant: vi.fn(() => false),
setSearchOpen: vi.fn(),
wsStatus: "connected" as const,
setWsStatus: vi.fn(),
a2aEdges: [],
setA2AEdges: vi.fn(),
showA2AEdges: false,
setShowA2AEdges: vi.fn(),
};
vi.mock("@/store/canvas", () => ({

View File

@ -72,6 +72,12 @@ const mockStoreState = {
nestNode: vi.fn(),
isDescendant: vi.fn(() => false),
setSearchOpen: vi.fn(),
wsStatus: "connected" as const,
setWsStatus: vi.fn(),
a2aEdges: [],
setA2AEdges: vi.fn(),
showA2AEdges: false,
setShowA2AEdges: vi.fn(),
};
vi.mock("@/store/canvas", () => ({

View File

@ -80,6 +80,13 @@ interface CanvasState {
/** Hydration error message — set when initial canvas load fails. Null when no error. */
hydrationError: string | null;
setHydrationError: (error: string | null) => void;
// ── A2A topology overlay (issue #744) ─────────────────────────────────────
/** Directed delegation edges shown as an overlay on the canvas (separate from topology edges). */
a2aEdges: Edge[];
setA2AEdges: (edges: Edge[]) => void;
/** Whether the A2A topology overlay is visible. Persisted to localStorage. Default: true. */
showA2AEdges: boolean;
setShowA2AEdges: (show: boolean) => void;
}
export const useCanvasStore = create<CanvasState>((set, get) => ({
@ -93,6 +100,19 @@ export const useCanvasStore = create<CanvasState>((set, get) => ({
setWsStatus: (status) => set({ wsStatus: status }),
hydrationError: null,
setHydrationError: (error) => set({ hydrationError: error }),
// A2A overlay — default on, persisted to localStorage
a2aEdges: [],
setA2AEdges: (edges) => set({ a2aEdges: edges }),
showA2AEdges:
typeof window !== "undefined"
? localStorage.getItem("molecule:show-a2a-edges") !== "false"
: true,
setShowA2AEdges: (show) => {
set({ showA2AEdges: show });
if (typeof window !== "undefined") {
localStorage.setItem("molecule:show-a2a-edges", String(show));
}
},
viewport: { x: 0, y: 0, zoom: 1 },