diff --git a/canvas/src/components/__tests__/ActivityTab.test.tsx b/canvas/src/components/__tests__/ActivityTab.test.tsx
index c5af736b..ee23c3c3 100644
--- a/canvas/src/components/__tests__/ActivityTab.test.tsx
+++ b/canvas/src/components/__tests__/ActivityTab.test.tsx
@@ -36,6 +36,10 @@ vi.mock("@/hooks/useWorkspaceName", () => ({
useWorkspaceName: () => () => "Test WS",
}));
+import {
+ emitSocketEvent,
+ _resetSocketEventListenersForTests,
+} from "@/store/socket-events";
import { ActivityTab } from "../tabs/ActivityTab";
// ── Fixtures ──────────────────────────────────────────────────────────────────
@@ -358,6 +362,191 @@ describe("ActivityTab — refresh button", () => {
});
});
+// ── Suite 6.5: ACTIVITY_LOGGED subscription (#61 stage 3) ─────────────────────
+//
+// Pin the post-#61 behaviour: WS push extends the rendered list with NO
+// additional HTTP fetch. The 5s polling loop is gone; live updates
+// arrive over the WebSocket bus.
+
+describe("ActivityTab — #61 stage 3: ACTIVITY_LOGGED subscription", () => {
+ beforeEach(() => {
+ vi.clearAllMocks();
+ mockGet.mockResolvedValue([]);
+ _resetSocketEventListenersForTests();
+ });
+ afterEach(() => {
+ cleanup();
+ _resetSocketEventListenersForTests();
+ });
+
+ function emitActivity(overrides: {
+ workspaceId?: string;
+ activityType?: string;
+ summary?: string;
+ id?: string;
+ } = {}) {
+ const realNow = Date.now();
+ emitSocketEvent({
+ event: "ACTIVITY_LOGGED",
+ workspace_id: overrides.workspaceId ?? "ws-1",
+ timestamp: new Date(realNow).toISOString(),
+ payload: {
+ id: overrides.id ?? `act-${Math.random().toString(36).slice(2)}`,
+ activity_type: overrides.activityType ?? "agent_log",
+ source_id: null,
+ target_id: null,
+ method: null,
+ summary: overrides.summary ?? "live-pushed",
+ status: "ok",
+ created_at: new Date(realNow - 5_000).toISOString(),
+ },
+ });
+ }
+
+ it("WS push for matching workspace prepends to the list with NO HTTP call", async () => {
+ render();
+ await waitFor(() => {
+ expect(screen.getByText(/0 activities|no activity/i)).toBeTruthy();
+ });
+ mockGet.mockClear();
+
+ await act(async () => {
+ emitActivity({ summary: "live-row-from-bus" });
+ });
+
+ await waitFor(() => {
+ expect(screen.getByText(/live-row-from-bus/)).toBeTruthy();
+ });
+ expect(mockGet).not.toHaveBeenCalled();
+ });
+
+ it("WS push for a different workspace is ignored", async () => {
+ render();
+ await waitFor(() => screen.getByText(/no activity/i));
+
+ await act(async () => {
+ emitActivity({
+ workspaceId: "ws-other",
+ summary: "should-not-render-other-ws",
+ });
+ });
+
+ expect(screen.queryByText(/should-not-render-other-ws/)).toBeNull();
+ });
+
+ it("WS push respects the active filter — non-matching activity_type is ignored", async () => {
+ render();
+ await waitFor(() => screen.getByText(/no activity/i));
+
+ // Apply "Tasks" filter.
+ clickButton(/tasks/i);
+ await waitFor(() => {
+ expect(
+ screen.getByRole("button", { name: /tasks/i }).getAttribute("aria-pressed"),
+ ).toBe("true");
+ });
+
+ // Push an a2a_send (does NOT match task_update filter).
+ await act(async () => {
+ emitActivity({
+ activityType: "a2a_send",
+ summary: "should-not-render-filter-mismatch",
+ });
+ });
+
+ expect(
+ screen.queryByText(/should-not-render-filter-mismatch/),
+ ).toBeNull();
+ });
+
+ it("WS push respects the active filter — matching activity_type is rendered", async () => {
+ render();
+ await waitFor(() => screen.getByText(/no activity/i));
+
+ clickButton(/tasks/i);
+ await waitFor(() => {
+ expect(
+ screen.getByRole("button", { name: /tasks/i }).getAttribute("aria-pressed"),
+ ).toBe("true");
+ });
+
+ await act(async () => {
+ emitActivity({
+ activityType: "task_update",
+ summary: "task-filter-match",
+ });
+ });
+
+ await waitFor(() => {
+ expect(screen.getByText(/task-filter-match/)).toBeTruthy();
+ });
+ });
+
+ it("WS push while autoRefresh is paused is ignored", async () => {
+ render();
+ await waitFor(() => screen.getByText(/no activity/i));
+
+ // Toggle Live → Paused.
+ clickButton(/live/i);
+ await waitFor(() => {
+ expect(screen.getByText(/Paused/)).toBeTruthy();
+ });
+
+ await act(async () => {
+ emitActivity({ summary: "should-not-render-paused" });
+ });
+
+ expect(screen.queryByText(/should-not-render-paused/)).toBeNull();
+ });
+
+ it("WS push for a row already in the list is deduped (no double-render)", async () => {
+ // Bootstrap with one row — same id as the WS push to trigger dedup.
+ mockGet.mockResolvedValueOnce([
+ makeEntry({ id: "shared-id", summary: "bootstrap-summary" }),
+ ]);
+ render();
+ await waitFor(() => {
+ expect(screen.getByText(/bootstrap-summary/)).toBeTruthy();
+ });
+ mockGet.mockClear();
+
+ // Push a row with the SAME id but a different summary — must not
+ // render the new summary; original row stays.
+ await act(async () => {
+ emitActivity({
+ id: "shared-id",
+ summary: "should-not-replace-existing",
+ });
+ });
+
+ expect(screen.queryByText(/should-not-replace-existing/)).toBeNull();
+ // Also verify count didn't grow.
+ expect(screen.getByText(/1 activities/)).toBeTruthy();
+ });
+
+ it("does NOT poll on a 5s interval after mount (post-#61)", async () => {
+ vi.useFakeTimers();
+ try {
+ render();
+ // Drain the mount-time bootstrap promise.
+ await act(async () => {
+ await Promise.resolve();
+ await Promise.resolve();
+ });
+ const callsAfterBootstrap = mockGet.mock.calls.length;
+ expect(callsAfterBootstrap).toBeGreaterThanOrEqual(1);
+
+ // Pre-#61: a 30s clock advance fires 6 more polls. Post-#61: 0.
+ await act(async () => {
+ vi.advanceTimersByTime(30_000);
+ });
+ expect(mockGet.mock.calls.length).toBe(callsAfterBootstrap);
+ } finally {
+ vi.useRealTimers();
+ }
+ });
+});
+
// ── Suite 7: Activity count ───────────────────────────────────────────────────
describe("ActivityTab — activity count", () => {
diff --git a/canvas/src/components/tabs/ActivityTab.tsx b/canvas/src/components/tabs/ActivityTab.tsx
index 8f90e51d..34671dd2 100644
--- a/canvas/src/components/tabs/ActivityTab.tsx
+++ b/canvas/src/components/tabs/ActivityTab.tsx
@@ -1,8 +1,9 @@
"use client";
-import { useState, useEffect, useCallback } from "react";
+import { useState, useEffect, useCallback, useRef } from "react";
import { api } from "@/lib/api";
import { ConversationTraceModal } from "@/components/ConversationTraceModal";
+import { useSocketEvent } from "@/hooks/useSocketEvent";
import { type ActivityEntry } from "@/types/activity";
import { useWorkspaceName } from "@/hooks/useWorkspaceName";
import { inferA2AErrorHint } from "./chat/a2aErrorHint";
@@ -48,6 +49,15 @@ export function ActivityTab({ workspaceId }: Props) {
const [traceOpen, setTraceOpen] = useState(false);
const resolveName = useWorkspaceName();
+ // Refs let the WS handler read the latest filter / autoRefresh
+ // selection without re-subscribing on every state change. The bus
+ // listener is registered exactly once per mount via useSocketEvent's
+ // ref-internal pattern; subscriber-side filtering reads from these.
+ const filterRef = useRef(filter);
+ filterRef.current = filter;
+ const autoRefreshRef = useRef(autoRefresh);
+ autoRefreshRef.current = autoRefresh;
+
const loadActivities = useCallback(async () => {
try {
const typeParam = filter !== "all" ? `?type=${filter}` : "";
@@ -66,11 +76,58 @@ export function ActivityTab({ workspaceId }: Props) {
loadActivities();
}, [loadActivities]);
- useEffect(() => {
- if (!autoRefresh) return;
- const interval = setInterval(loadActivities, 5000);
- return () => clearInterval(interval);
- }, [loadActivities, autoRefresh]);
+ // Live-update path (issue #61 stage 3, replaces the 5s setInterval).
+ // ACTIVITY_LOGGED events from this workspace prepend to the rendered
+ // list — dedup by id so a server-side update + a poll reply don't
+ // double-render the same row.
+ //
+ // Honours the user's autoRefresh toggle: when paused, live updates
+ // are dropped until the user re-enables Live (or hits Refresh, which
+ // re-bootstraps via loadActivities).
+ //
+ // Filter awareness: matches the server-side `?type=`
+ // semantics so the panel doesn't show rows the user excluded.
+ useSocketEvent((msg) => {
+ if (!autoRefreshRef.current) return;
+ if (msg.event !== "ACTIVITY_LOGGED") return;
+ if (msg.workspace_id !== workspaceId) return;
+
+ const p = (msg.payload || {}) as Record;
+ const activityType = (p.activity_type as string) || "";
+
+ const f = filterRef.current;
+ if (f !== "all" && activityType !== f) return;
+
+ const entry: ActivityEntry = {
+ id:
+ (p.id as string) ||
+ `ws-push-${msg.timestamp || Date.now()}-${msg.workspace_id}`,
+ workspace_id: msg.workspace_id,
+ activity_type: activityType,
+ source_id: (p.source_id as string | null) ?? null,
+ target_id: (p.target_id as string | null) ?? null,
+ method: (p.method as string | null) ?? null,
+ summary: (p.summary as string | null) ?? null,
+ request_body: (p.request_body as Record | null) ?? null,
+ response_body:
+ (p.response_body as Record | null) ?? null,
+ duration_ms: (p.duration_ms as number | null) ?? null,
+ status: (p.status as string) || "ok",
+ error_detail: (p.error_detail as string | null) ?? null,
+ created_at:
+ (p.created_at as string) ||
+ msg.timestamp ||
+ new Date().toISOString(),
+ };
+
+ setActivities((prev) => {
+ // Dedup by id — a row that arrived via the bootstrap fetch and
+ // also fires ACTIVITY_LOGGED from a delayed server-side hook
+ // must render exactly once.
+ if (prev.some((e) => e.id === entry.id)) return prev;
+ return [entry, ...prev];
+ });
+ });
return (