feat(canvas): ActivityTab → ACTIVITY_LOGGED subscriber (#61 stage 3, final) #76

Merged
claude-ceo-assistant merged 5 commits from feat/canvas-activity-tab-ws-subscribe into main 2026-05-07 23:27:34 +00:00
2 changed files with 252 additions and 6 deletions

View File

@ -36,6 +36,10 @@ vi.mock("@/hooks/useWorkspaceName", () => ({
useWorkspaceName: () => () => "Test WS",
}));
import {
emitSocketEvent,
_resetSocketEventListenersForTests,
} from "@/store/socket-events";
import { ActivityTab } from "../tabs/ActivityTab";
// ── Fixtures ──────────────────────────────────────────────────────────────────
@ -358,6 +362,191 @@ describe("ActivityTab — refresh button", () => {
});
});
// ── Suite 6.5: ACTIVITY_LOGGED subscription (#61 stage 3) ─────────────────────
//
// Pin the post-#61 behaviour: WS push extends the rendered list with NO
// additional HTTP fetch. The 5s polling loop is gone; live updates
// arrive over the WebSocket bus.
describe("ActivityTab — #61 stage 3: ACTIVITY_LOGGED subscription", () => {
beforeEach(() => {
vi.clearAllMocks();
mockGet.mockResolvedValue([]);
_resetSocketEventListenersForTests();
});
afterEach(() => {
cleanup();
_resetSocketEventListenersForTests();
});
function emitActivity(overrides: {
workspaceId?: string;
activityType?: string;
summary?: string;
id?: string;
} = {}) {
const realNow = Date.now();
emitSocketEvent({
event: "ACTIVITY_LOGGED",
workspace_id: overrides.workspaceId ?? "ws-1",
timestamp: new Date(realNow).toISOString(),
payload: {
id: overrides.id ?? `act-${Math.random().toString(36).slice(2)}`,
activity_type: overrides.activityType ?? "agent_log",
source_id: null,
target_id: null,
method: null,
summary: overrides.summary ?? "live-pushed",
status: "ok",
created_at: new Date(realNow - 5_000).toISOString(),
},
});
}
it("WS push for matching workspace prepends to the list with NO HTTP call", async () => {
render(<ActivityTab workspaceId="ws-1" />);
await waitFor(() => {
expect(screen.getByText(/0 activities|no activity/i)).toBeTruthy();
});
mockGet.mockClear();
await act(async () => {
emitActivity({ summary: "live-row-from-bus" });
});
await waitFor(() => {
expect(screen.getByText(/live-row-from-bus/)).toBeTruthy();
});
expect(mockGet).not.toHaveBeenCalled();
});
it("WS push for a different workspace is ignored", async () => {
render(<ActivityTab workspaceId="ws-1" />);
await waitFor(() => screen.getByText(/no activity/i));
await act(async () => {
emitActivity({
workspaceId: "ws-other",
summary: "should-not-render-other-ws",
});
});
expect(screen.queryByText(/should-not-render-other-ws/)).toBeNull();
});
it("WS push respects the active filter — non-matching activity_type is ignored", async () => {
render(<ActivityTab workspaceId="ws-1" />);
await waitFor(() => screen.getByText(/no activity/i));
// Apply "Tasks" filter.
clickButton(/tasks/i);
await waitFor(() => {
expect(
screen.getByRole("button", { name: /tasks/i }).getAttribute("aria-pressed"),
).toBe("true");
});
// Push an a2a_send (does NOT match task_update filter).
await act(async () => {
emitActivity({
activityType: "a2a_send",
summary: "should-not-render-filter-mismatch",
});
});
expect(
screen.queryByText(/should-not-render-filter-mismatch/),
).toBeNull();
});
it("WS push respects the active filter — matching activity_type is rendered", async () => {
render(<ActivityTab workspaceId="ws-1" />);
await waitFor(() => screen.getByText(/no activity/i));
clickButton(/tasks/i);
await waitFor(() => {
expect(
screen.getByRole("button", { name: /tasks/i }).getAttribute("aria-pressed"),
).toBe("true");
});
await act(async () => {
emitActivity({
activityType: "task_update",
summary: "task-filter-match",
});
});
await waitFor(() => {
expect(screen.getByText(/task-filter-match/)).toBeTruthy();
});
});
it("WS push while autoRefresh is paused is ignored", async () => {
render(<ActivityTab workspaceId="ws-1" />);
await waitFor(() => screen.getByText(/no activity/i));
// Toggle Live → Paused.
clickButton(/live/i);
await waitFor(() => {
expect(screen.getByText(/Paused/)).toBeTruthy();
});
await act(async () => {
emitActivity({ summary: "should-not-render-paused" });
});
expect(screen.queryByText(/should-not-render-paused/)).toBeNull();
});
it("WS push for a row already in the list is deduped (no double-render)", async () => {
// Bootstrap with one row — same id as the WS push to trigger dedup.
mockGet.mockResolvedValueOnce([
makeEntry({ id: "shared-id", summary: "bootstrap-summary" }),
]);
render(<ActivityTab workspaceId="ws-1" />);
await waitFor(() => {
expect(screen.getByText(/bootstrap-summary/)).toBeTruthy();
});
mockGet.mockClear();
// Push a row with the SAME id but a different summary — must not
// render the new summary; original row stays.
await act(async () => {
emitActivity({
id: "shared-id",
summary: "should-not-replace-existing",
});
});
expect(screen.queryByText(/should-not-replace-existing/)).toBeNull();
// Also verify count didn't grow.
expect(screen.getByText(/1 activities/)).toBeTruthy();
});
it("does NOT poll on a 5s interval after mount (post-#61)", async () => {
vi.useFakeTimers();
try {
render(<ActivityTab workspaceId="ws-1" />);
// Drain the mount-time bootstrap promise.
await act(async () => {
await Promise.resolve();
await Promise.resolve();
});
const callsAfterBootstrap = mockGet.mock.calls.length;
expect(callsAfterBootstrap).toBeGreaterThanOrEqual(1);
// Pre-#61: a 30s clock advance fires 6 more polls. Post-#61: 0.
await act(async () => {
vi.advanceTimersByTime(30_000);
});
expect(mockGet.mock.calls.length).toBe(callsAfterBootstrap);
} finally {
vi.useRealTimers();
}
});
});
// ── Suite 7: Activity count ───────────────────────────────────────────────────
describe("ActivityTab — activity count", () => {

View File

@ -1,8 +1,9 @@
"use client";
import { useState, useEffect, useCallback } from "react";
import { useState, useEffect, useCallback, useRef } from "react";
import { api } from "@/lib/api";
import { ConversationTraceModal } from "@/components/ConversationTraceModal";
import { useSocketEvent } from "@/hooks/useSocketEvent";
import { type ActivityEntry } from "@/types/activity";
import { useWorkspaceName } from "@/hooks/useWorkspaceName";
import { inferA2AErrorHint } from "./chat/a2aErrorHint";
@ -48,6 +49,15 @@ export function ActivityTab({ workspaceId }: Props) {
const [traceOpen, setTraceOpen] = useState(false);
const resolveName = useWorkspaceName();
// Refs let the WS handler read the latest filter / autoRefresh
// selection without re-subscribing on every state change. The bus
// listener is registered exactly once per mount via useSocketEvent's
// ref-internal pattern; subscriber-side filtering reads from these.
const filterRef = useRef(filter);
filterRef.current = filter;
const autoRefreshRef = useRef(autoRefresh);
autoRefreshRef.current = autoRefresh;
const loadActivities = useCallback(async () => {
try {
const typeParam = filter !== "all" ? `?type=${filter}` : "";
@ -66,11 +76,58 @@ export function ActivityTab({ workspaceId }: Props) {
loadActivities();
}, [loadActivities]);
useEffect(() => {
if (!autoRefresh) return;
const interval = setInterval(loadActivities, 5000);
return () => clearInterval(interval);
}, [loadActivities, autoRefresh]);
// Live-update path (issue #61 stage 3, replaces the 5s setInterval).
// ACTIVITY_LOGGED events from this workspace prepend to the rendered
// list — dedup by id so a server-side update + a poll reply don't
// double-render the same row.
//
// Honours the user's autoRefresh toggle: when paused, live updates
// are dropped until the user re-enables Live (or hits Refresh, which
// re-bootstraps via loadActivities).
//
// Filter awareness: matches the server-side `?type=<filter>`
// semantics so the panel doesn't show rows the user excluded.
useSocketEvent((msg) => {
if (!autoRefreshRef.current) return;
if (msg.event !== "ACTIVITY_LOGGED") return;
if (msg.workspace_id !== workspaceId) return;
const p = (msg.payload || {}) as Record<string, unknown>;
const activityType = (p.activity_type as string) || "";
const f = filterRef.current;
if (f !== "all" && activityType !== f) return;
const entry: ActivityEntry = {
id:
(p.id as string) ||
`ws-push-${msg.timestamp || Date.now()}-${msg.workspace_id}`,
workspace_id: msg.workspace_id,
activity_type: activityType,
source_id: (p.source_id as string | null) ?? null,
target_id: (p.target_id as string | null) ?? null,
method: (p.method as string | null) ?? null,
summary: (p.summary as string | null) ?? null,
request_body: (p.request_body as Record<string, unknown> | null) ?? null,
response_body:
(p.response_body as Record<string, unknown> | null) ?? null,
duration_ms: (p.duration_ms as number | null) ?? null,
status: (p.status as string) || "ok",
error_detail: (p.error_detail as string | null) ?? null,
created_at:
(p.created_at as string) ||
msg.timestamp ||
new Date().toISOString(),
};
setActivities((prev) => {
// Dedup by id — a row that arrived via the bootstrap fetch and
// also fires ACTIVITY_LOGGED from a delayed server-side hook
// must render exactly once.
if (prev.some((e) => e.id === entry.id)) return prev;
return [entry, ...prev];
});
});
return (
<div className="flex flex-col h-full">