diff --git a/canvas/src/components/__tests__/ActivityTab.test.tsx b/canvas/src/components/__tests__/ActivityTab.test.tsx index c5af736b..ee23c3c3 100644 --- a/canvas/src/components/__tests__/ActivityTab.test.tsx +++ b/canvas/src/components/__tests__/ActivityTab.test.tsx @@ -36,6 +36,10 @@ vi.mock("@/hooks/useWorkspaceName", () => ({ useWorkspaceName: () => () => "Test WS", })); +import { + emitSocketEvent, + _resetSocketEventListenersForTests, +} from "@/store/socket-events"; import { ActivityTab } from "../tabs/ActivityTab"; // ── Fixtures ────────────────────────────────────────────────────────────────── @@ -358,6 +362,191 @@ describe("ActivityTab — refresh button", () => { }); }); +// ── Suite 6.5: ACTIVITY_LOGGED subscription (#61 stage 3) ───────────────────── +// +// Pin the post-#61 behaviour: WS push extends the rendered list with NO +// additional HTTP fetch. The 5s polling loop is gone; live updates +// arrive over the WebSocket bus. + +describe("ActivityTab — #61 stage 3: ACTIVITY_LOGGED subscription", () => { + beforeEach(() => { + vi.clearAllMocks(); + mockGet.mockResolvedValue([]); + _resetSocketEventListenersForTests(); + }); + afterEach(() => { + cleanup(); + _resetSocketEventListenersForTests(); + }); + + function emitActivity(overrides: { + workspaceId?: string; + activityType?: string; + summary?: string; + id?: string; + } = {}) { + const realNow = Date.now(); + emitSocketEvent({ + event: "ACTIVITY_LOGGED", + workspace_id: overrides.workspaceId ?? "ws-1", + timestamp: new Date(realNow).toISOString(), + payload: { + id: overrides.id ?? `act-${Math.random().toString(36).slice(2)}`, + activity_type: overrides.activityType ?? "agent_log", + source_id: null, + target_id: null, + method: null, + summary: overrides.summary ?? "live-pushed", + status: "ok", + created_at: new Date(realNow - 5_000).toISOString(), + }, + }); + } + + it("WS push for matching workspace prepends to the list with NO HTTP call", async () => { + render(); + await waitFor(() => { + expect(screen.getByText(/0 activities|no activity/i)).toBeTruthy(); + }); + mockGet.mockClear(); + + await act(async () => { + emitActivity({ summary: "live-row-from-bus" }); + }); + + await waitFor(() => { + expect(screen.getByText(/live-row-from-bus/)).toBeTruthy(); + }); + expect(mockGet).not.toHaveBeenCalled(); + }); + + it("WS push for a different workspace is ignored", async () => { + render(); + await waitFor(() => screen.getByText(/no activity/i)); + + await act(async () => { + emitActivity({ + workspaceId: "ws-other", + summary: "should-not-render-other-ws", + }); + }); + + expect(screen.queryByText(/should-not-render-other-ws/)).toBeNull(); + }); + + it("WS push respects the active filter — non-matching activity_type is ignored", async () => { + render(); + await waitFor(() => screen.getByText(/no activity/i)); + + // Apply "Tasks" filter. + clickButton(/tasks/i); + await waitFor(() => { + expect( + screen.getByRole("button", { name: /tasks/i }).getAttribute("aria-pressed"), + ).toBe("true"); + }); + + // Push an a2a_send (does NOT match task_update filter). + await act(async () => { + emitActivity({ + activityType: "a2a_send", + summary: "should-not-render-filter-mismatch", + }); + }); + + expect( + screen.queryByText(/should-not-render-filter-mismatch/), + ).toBeNull(); + }); + + it("WS push respects the active filter — matching activity_type is rendered", async () => { + render(); + await waitFor(() => screen.getByText(/no activity/i)); + + clickButton(/tasks/i); + await waitFor(() => { + expect( + screen.getByRole("button", { name: /tasks/i }).getAttribute("aria-pressed"), + ).toBe("true"); + }); + + await act(async () => { + emitActivity({ + activityType: "task_update", + summary: "task-filter-match", + }); + }); + + await waitFor(() => { + expect(screen.getByText(/task-filter-match/)).toBeTruthy(); + }); + }); + + it("WS push while autoRefresh is paused is ignored", async () => { + render(); + await waitFor(() => screen.getByText(/no activity/i)); + + // Toggle Live → Paused. + clickButton(/live/i); + await waitFor(() => { + expect(screen.getByText(/Paused/)).toBeTruthy(); + }); + + await act(async () => { + emitActivity({ summary: "should-not-render-paused" }); + }); + + expect(screen.queryByText(/should-not-render-paused/)).toBeNull(); + }); + + it("WS push for a row already in the list is deduped (no double-render)", async () => { + // Bootstrap with one row — same id as the WS push to trigger dedup. + mockGet.mockResolvedValueOnce([ + makeEntry({ id: "shared-id", summary: "bootstrap-summary" }), + ]); + render(); + await waitFor(() => { + expect(screen.getByText(/bootstrap-summary/)).toBeTruthy(); + }); + mockGet.mockClear(); + + // Push a row with the SAME id but a different summary — must not + // render the new summary; original row stays. + await act(async () => { + emitActivity({ + id: "shared-id", + summary: "should-not-replace-existing", + }); + }); + + expect(screen.queryByText(/should-not-replace-existing/)).toBeNull(); + // Also verify count didn't grow. + expect(screen.getByText(/1 activities/)).toBeTruthy(); + }); + + it("does NOT poll on a 5s interval after mount (post-#61)", async () => { + vi.useFakeTimers(); + try { + render(); + // Drain the mount-time bootstrap promise. + await act(async () => { + await Promise.resolve(); + await Promise.resolve(); + }); + const callsAfterBootstrap = mockGet.mock.calls.length; + expect(callsAfterBootstrap).toBeGreaterThanOrEqual(1); + + // Pre-#61: a 30s clock advance fires 6 more polls. Post-#61: 0. + await act(async () => { + vi.advanceTimersByTime(30_000); + }); + expect(mockGet.mock.calls.length).toBe(callsAfterBootstrap); + } finally { + vi.useRealTimers(); + } + }); +}); + // ── Suite 7: Activity count ─────────────────────────────────────────────────── describe("ActivityTab — activity count", () => { diff --git a/canvas/src/components/tabs/ActivityTab.tsx b/canvas/src/components/tabs/ActivityTab.tsx index 8f90e51d..34671dd2 100644 --- a/canvas/src/components/tabs/ActivityTab.tsx +++ b/canvas/src/components/tabs/ActivityTab.tsx @@ -1,8 +1,9 @@ "use client"; -import { useState, useEffect, useCallback } from "react"; +import { useState, useEffect, useCallback, useRef } from "react"; import { api } from "@/lib/api"; import { ConversationTraceModal } from "@/components/ConversationTraceModal"; +import { useSocketEvent } from "@/hooks/useSocketEvent"; import { type ActivityEntry } from "@/types/activity"; import { useWorkspaceName } from "@/hooks/useWorkspaceName"; import { inferA2AErrorHint } from "./chat/a2aErrorHint"; @@ -48,6 +49,15 @@ export function ActivityTab({ workspaceId }: Props) { const [traceOpen, setTraceOpen] = useState(false); const resolveName = useWorkspaceName(); + // Refs let the WS handler read the latest filter / autoRefresh + // selection without re-subscribing on every state change. The bus + // listener is registered exactly once per mount via useSocketEvent's + // ref-internal pattern; subscriber-side filtering reads from these. + const filterRef = useRef(filter); + filterRef.current = filter; + const autoRefreshRef = useRef(autoRefresh); + autoRefreshRef.current = autoRefresh; + const loadActivities = useCallback(async () => { try { const typeParam = filter !== "all" ? `?type=${filter}` : ""; @@ -66,11 +76,58 @@ export function ActivityTab({ workspaceId }: Props) { loadActivities(); }, [loadActivities]); - useEffect(() => { - if (!autoRefresh) return; - const interval = setInterval(loadActivities, 5000); - return () => clearInterval(interval); - }, [loadActivities, autoRefresh]); + // Live-update path (issue #61 stage 3, replaces the 5s setInterval). + // ACTIVITY_LOGGED events from this workspace prepend to the rendered + // list — dedup by id so a server-side update + a poll reply don't + // double-render the same row. + // + // Honours the user's autoRefresh toggle: when paused, live updates + // are dropped until the user re-enables Live (or hits Refresh, which + // re-bootstraps via loadActivities). + // + // Filter awareness: matches the server-side `?type=` + // semantics so the panel doesn't show rows the user excluded. + useSocketEvent((msg) => { + if (!autoRefreshRef.current) return; + if (msg.event !== "ACTIVITY_LOGGED") return; + if (msg.workspace_id !== workspaceId) return; + + const p = (msg.payload || {}) as Record; + const activityType = (p.activity_type as string) || ""; + + const f = filterRef.current; + if (f !== "all" && activityType !== f) return; + + const entry: ActivityEntry = { + id: + (p.id as string) || + `ws-push-${msg.timestamp || Date.now()}-${msg.workspace_id}`, + workspace_id: msg.workspace_id, + activity_type: activityType, + source_id: (p.source_id as string | null) ?? null, + target_id: (p.target_id as string | null) ?? null, + method: (p.method as string | null) ?? null, + summary: (p.summary as string | null) ?? null, + request_body: (p.request_body as Record | null) ?? null, + response_body: + (p.response_body as Record | null) ?? null, + duration_ms: (p.duration_ms as number | null) ?? null, + status: (p.status as string) || "ok", + error_detail: (p.error_detail as string | null) ?? null, + created_at: + (p.created_at as string) || + msg.timestamp || + new Date().toISOString(), + }; + + setActivities((prev) => { + // Dedup by id — a row that arrived via the bootstrap fetch and + // also fires ACTIVITY_LOGGED from a delayed server-side hook + // must render exactly once. + if (prev.some((e) => e.id === entry.id)) return prev; + return [entry, ...prev]; + }); + }); return (