Merge pull request #2131 from Molecule-AI/feat/agent-comms-grouped-by-peer

feat(canvas): Agent Comms grouped by peer with sub-tabs
This commit is contained in:
Hongming Wang 2026-04-27 03:43:45 +00:00 committed by GitHub
commit fb080227a3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 380 additions and 36 deletions

View File

@ -1,6 +1,6 @@
"use client";
import { useState, useEffect, useRef } from "react";
import { useState, useEffect, useMemo, useRef } from "react";
import ReactMarkdown from "react-markdown";
import remarkGfm from "remark-gfm";
import { api } from "@/lib/api";
@ -60,28 +60,63 @@ function resolveName(id: string): string {
export function toCommMessage(entry: ActivityEntry, workspaceId: string): CommMessage | null {
// delegation activity rows are written by the platform's /delegate
// handler. They're always outbound from this workspace's POV (the
// platform proxies the A2A on our behalf). Two methods:
// handler. Two methods:
// - "delegate" — the initial outbound; status pending/dispatched
// - "delegate_result" — the eventual reply; status completed/queued/failed
// We surface them in Agent Comms because they ARE agent-to-agent
// calls; without this branch they'd be dropped by the activity_type
// filter and the user would see "No agent-to-agent communications yet"
// even when the director made delegations.
//
// Flow direction: even though both rows have source_id=us (the
// platform writes them on our row), the CONVERSATIONAL direction
// differs. 'delegate' is us asking the peer; 'delegate_result' is
// the peer's reply coming back. Render them as alternating bubbles
// (out + in) so the user sees a chat-like back-and-forth instead
// of a one-sided wall of "→ To X" rows.
//
// Text content: the platform's `summary` is boilerplate
// ("Delegating to <UUID>" / "Delegation queued — target at
// capacity") — useful for an audit log, useless in a chat UI.
// Prefer the real payload:
// - outbound: request_body.task (the task text the agent sent)
// - inbound: response_body.response_preview (the peer's reply text)
// Falls back to a name-resolved summary when the payload is empty.
if (entry.activity_type === "delegation") {
const peerId = entry.target_id || "";
if (!peerId) return null;
const isResult = entry.method === "delegate_result";
const peerName = resolveName(peerId);
let text: string;
if (isResult) {
const rb = entry.response_body as Record<string, unknown> | null;
const replyText =
(typeof rb?.response_preview === "string" && rb.response_preview) ||
(typeof rb?.text === "string" && rb.text) ||
"";
if (replyText) {
text = replyText;
} else if (entry.status === "queued") {
// No actual reply yet — peer's a2a-proxy queued the call;
// show what the user needs to know without the boilerplate.
text = `Queued — ${peerName} is busy on a prior task, reply will arrive when they're free`;
} else if (entry.status === "failed") {
text = entry.summary || `Delegation to ${peerName} failed`;
} else {
text = entry.summary || "(no reply)";
}
} else {
const reqTask = (entry.request_body as Record<string, unknown> | null)?.task;
text = (typeof reqTask === "string" && reqTask) || `Delegating to ${peerName}`;
}
return {
id: entry.id,
flow: "out",
peerName: resolveName(peerId),
flow: isResult ? "in" : "out",
peerName,
peerId,
// Prefer summary (set by the platform with a human-readable
// string like "Delegating to X" or "Delegation queued — target
// at capacity"). Fall back to request body for older rows that
// pre-date the summary column being populated.
text: entry.summary || extractRequestText(entry.request_body) || "(delegation)",
responseText: entry.response_body ? extractResponseText(entry.response_body) : null,
text,
// Result text is now the primary `text` (above), so don't
// duplicate it as responseText — that would render a divider
// line under the reply with the same content below.
responseText: null,
status: entry.status || "ok",
timestamp: entry.created_at,
};
@ -265,20 +300,39 @@ export function AgentCommsPanel({ workspaceId }: { workspaceId: string }) {
// own `status` field (queued / dispatched). Other events have
// implicit status: SENT → pending, COMPLETE → completed,
// FAILED → failed.
//
// Populate request_body / response_body from the payload so
// toCommMessage's delegation branch can read the actual
// task / reply text via the same code path the GET-on-mount
// uses. Without this, live-pushed bubbles would fall back
// to the boilerplate summary ("Delegating to <id>") instead
// of the real text.
let status: string;
let summary: string;
let requestBody: Record<string, unknown> | null = null;
let responseBody: Record<string, unknown> | null = null;
if (msg.event === "DELEGATION_STATUS") {
status = (p.status as string) || "queued";
summary = `Delegation ${status}`;
} else if (msg.event === "DELEGATION_COMPLETE") {
status = "completed";
summary = `Delegation completed (${(p.response_preview as string)?.slice(0, 60) || ""})`;
const preview = (p.response_preview as string) || "";
summary = `Delegation completed (${preview.slice(0, 60)})`;
responseBody = { response_preview: preview };
} else if (msg.event === "DELEGATION_FAILED") {
status = "failed";
summary = `Delegation failed: ${(p.error as string) || "unknown"}`;
} else {
status = "pending";
// DELEGATION_SENT carries `task_preview` (truncated to 100
// chars at broadcast time in delegation.go). Surface as
// request_body.task so the inbound bubble shows what was
// actually delegated, not the UUID stub summary.
const taskPreview = (p.task_preview as string) || "";
summary = `Delegating to ${(p.target_id as string)?.slice(0, 8) || "peer"}`;
if (taskPreview) {
requestBody = { task: taskPreview };
}
}
entry = {
id: (p.delegation_id as string) || crypto.randomUUID(),
@ -287,8 +341,8 @@ export function AgentCommsPanel({ workspaceId }: { workspaceId: string }) {
target_id: targetId,
method: msg.event === "DELEGATION_SENT" ? "delegate" : "delegate_result",
summary,
request_body: null,
response_body: null,
request_body: requestBody,
response_body: responseBody,
status,
created_at: msg.timestamp || new Date().toISOString(),
};
@ -328,20 +382,190 @@ export function AgentCommsPanel({ workspaceId }: { workspaceId: string }) {
);
}
return <GroupedCommsView messages={messages} bottomRef={bottomRef} />;
}
// ALL_PEERS is the sentinel selectedPeerId value for "show every peer
// in one chronological feed" — the panel's pre-grouping default.
// Picked to be a value no real peerId can collide with (workspace IDs
// are UUIDs).
export const ALL_PEERS = "__all__";
/** PeerSummary is one entry in the sub-tab bar the per-peer
* message count + most-recent timestamp used for ordering. Exported
* so the sort/count behaviour can be unit-tested without React. */
export interface PeerSummary {
peerId: string;
peerName: string;
count: number;
lastTs: string;
}
/** buildPeerSummary collapses the flat message list into per-peer
* rows, sorted by most-recent activity descending. Order matches
* Slack/Linear's DM list active conversations rise to the top.
* Pure function so the sort + count behaviour is testable without
* rendering the panel. */
export function buildPeerSummary(messages: CommMessage[]): PeerSummary[] {
const acc = new Map<string, PeerSummary>();
for (const m of messages) {
const existing = acc.get(m.peerId);
if (existing) {
existing.count += 1;
if (m.timestamp > existing.lastTs) existing.lastTs = m.timestamp;
} else {
acc.set(m.peerId, {
peerId: m.peerId,
peerName: m.peerName,
count: 1,
lastTs: m.timestamp,
});
}
}
return Array.from(acc.values()).sort((a, b) => (a.lastTs < b.lastTs ? 1 : -1));
}
/** GroupedCommsView renders the messages list with a peer-keyed
* sub-tab bar at the top so the user can drill into one DDX thread
* at a time instead of reading a single chronological mix.
*
* Tab list derivation: walk the messages once, count per-peer, sort
* by most-recent timestamp DESC so the active conversations rise to
* the top. "All" stays pinned as the leftmost tab. */
function GroupedCommsView({
messages,
bottomRef,
}: {
messages: CommMessage[];
bottomRef: React.RefObject<HTMLDivElement | null>;
}) {
const [selectedPeerId, setSelectedPeerId] = useState<string>(ALL_PEERS);
// Build per-peer summary: count + most-recent timestamp + display
// name. One pass over messages — O(n). Logic lives in a pure
// helper so it's unit-testable without rendering the panel.
const peerSummary = useMemo(() => buildPeerSummary(messages), [messages]);
// Auto-prune: if the user had selected a peer and that peer no
// longer has messages (rare — only happens if dedupe removes the
// last bubble for them), fall back to "All" rather than rendering
// an empty thread.
useEffect(() => {
if (selectedPeerId === ALL_PEERS) return;
if (!peerSummary.some((p) => p.peerId === selectedPeerId)) {
setSelectedPeerId(ALL_PEERS);
}
}, [peerSummary, selectedPeerId]);
const visible = useMemo(() => {
if (selectedPeerId === ALL_PEERS) return messages;
return messages.filter((m) => m.peerId === selectedPeerId);
}, [messages, selectedPeerId]);
return (
<div className="flex-1 overflow-y-auto p-3 space-y-2">
{messages.map((msg) =>
msg.status === "error" ? (
<ErrorMessage key={msg.id} msg={msg} />
) : (
<NormalMessage key={msg.id} msg={msg} />
),
)}
<div ref={bottomRef} />
<div className="flex flex-col h-full min-h-0">
<PeerTabs
peers={peerSummary}
totalCount={messages.length}
selectedPeerId={selectedPeerId}
onSelect={setSelectedPeerId}
/>
<div className="flex-1 overflow-y-auto p-3 space-y-2">
{visible.map((msg) =>
msg.status === "error" ? (
<ErrorMessage key={msg.id} msg={msg} />
) : (
<NormalMessage key={msg.id} msg={msg} />
),
)}
<div ref={bottomRef} />
</div>
</div>
);
}
/** PeerTabs renders the horizontally-scrolling sub-tab bar.
* Keyboard: ArrowLeft / ArrowRight cycle peers (matches the existing
* My Chat / Agent Comms tab pattern in ChatTab). */
function PeerTabs({
peers,
totalCount,
selectedPeerId,
onSelect,
}: {
peers: Array<{ peerId: string; peerName: string; count: number; lastTs: string }>;
totalCount: number;
selectedPeerId: string;
onSelect: (peerId: string) => void;
}) {
// "All" + each peer, in tab-bar order. Built once per render and
// used both for click handling and for ArrowLeft/ArrowRight cycling.
const ids = [ALL_PEERS, ...peers.map((p) => p.peerId)];
return (
<div
role="tablist"
aria-label="Peer threads"
className="flex border-b border-zinc-800/40 bg-zinc-900/30 px-2 shrink-0 overflow-x-auto"
onKeyDown={(e) => {
const idx = ids.indexOf(selectedPeerId);
if (idx < 0) return;
if (e.key === "ArrowRight") {
e.preventDefault();
onSelect(ids[(idx + 1) % ids.length]);
} else if (e.key === "ArrowLeft") {
e.preventDefault();
onSelect(ids[(idx - 1 + ids.length) % ids.length]);
}
}}
>
<PeerTabButton
active={selectedPeerId === ALL_PEERS}
onClick={() => onSelect(ALL_PEERS)}
label="All"
count={totalCount}
/>
{peers.map((p) => (
<PeerTabButton
key={p.peerId}
active={selectedPeerId === p.peerId}
onClick={() => onSelect(p.peerId)}
label={p.peerName}
count={p.count}
/>
))}
</div>
);
}
function PeerTabButton({
active,
onClick,
label,
count,
}: {
active: boolean;
onClick: () => void;
label: string;
count: number;
}) {
return (
<button
role="tab"
aria-selected={active}
tabIndex={active ? 0 : -1}
onClick={onClick}
className={`shrink-0 px-3 py-1.5 text-[10px] font-medium transition-colors whitespace-nowrap ${
active
? "border-b-2 border-cyan-500 text-cyan-200"
: "border-b-2 border-transparent text-zinc-500 hover:text-zinc-300"
}`}
>
{label} <span className="text-[9px] text-zinc-500">({count})</span>
</button>
);
}
function NormalMessage({ msg }: { msg: CommMessage }) {
return (
<div className={`flex ${msg.flow === "out" ? "justify-end" : "justify-start"}`}>

View File

@ -15,7 +15,7 @@ vi.mock("@/store/canvas", () => ({
},
}));
import { toCommMessage, type ActivityEntry } from "../AgentCommsPanel";
import { toCommMessage, buildPeerSummary, type ActivityEntry } from "../AgentCommsPanel";
const SELF = "ws-self";
const PEER = "ws-peer";
@ -118,7 +118,10 @@ describe("toCommMessage — flow derivation", () => {
// Pre-fix the panel filtered these out and showed "no agent comms"
// even when 6+ delegations existed in the DB.
it("delegation 'delegate' row maps as outbound to target", () => {
it("delegation 'delegate' row prefers request_body.task over the boilerplate summary", () => {
// The platform's `summary` field is "Delegating to <UUID>" — useless
// in chat. The real task text lives in request_body.task. Show that
// so the user sees WHAT was delegated, not just where.
const m = toCommMessage(
makeEntry({
activity_type: "delegation",
@ -126,6 +129,7 @@ describe("toCommMessage — flow derivation", () => {
source_id: SELF,
target_id: PEER,
summary: "Delegating to ws-peer",
request_body: { task: "Build me 10 landing pages" },
status: "pending",
}),
SELF,
@ -134,15 +138,52 @@ describe("toCommMessage — flow derivation", () => {
expect(m!.flow).toBe("out");
expect(m!.peerId).toBe(PEER);
expect(m!.peerName).toBe("Peer Agent");
expect(m!.text).toBe("Delegating to ws-peer");
expect(m!.text).toBe("Build me 10 landing pages");
expect(m!.status).toBe("pending");
});
it("delegation 'delegate_result' queued row preserves status='queued'", () => {
// The "queued" status is the load-bearing signal the LLM uses to
// decide whether to wait or fall back. If toCommMessage drops or
// rewrites it, the UI loses the ability to show the "peer busy,
// will reply" affordance.
it("delegation 'delegate' row falls back to a name-resolved label when request_body is missing", () => {
// Older rows or some queued paths don't have request_body.task.
// Don't render the raw UUID — resolve to the peer name so the
// bubble at least reads "Delegating to Peer Agent".
const m = toCommMessage(
makeEntry({
activity_type: "delegation",
method: "delegate",
source_id: SELF,
target_id: PEER,
summary: "Delegating to ws-peer",
request_body: null,
status: "pending",
}),
SELF,
);
expect(m!.text).toBe("Delegating to Peer Agent");
});
it("delegation 'delegate_result' row is INBOUND so the chat shows alternating bubbles", () => {
// Even though source_id=us (we wrote the row), the conversational
// direction is peer → us. Render as flow="in" so the user sees
// a chat-style back-and-forth instead of a one-sided "→ To X" wall.
const m = toCommMessage(
makeEntry({
activity_type: "delegation",
method: "delegate_result",
source_id: SELF,
target_id: PEER,
summary: "Delegation completed (...)",
response_body: { response_preview: "Done — ZIP at /tmp/x.zip" },
status: "completed",
}),
SELF,
);
expect(m!.flow).toBe("in");
expect(m!.text).toBe("Done — ZIP at /tmp/x.zip");
});
it("delegation 'delegate_result' queued row shows a human-readable wait message", () => {
// "Delegation queued — target at capacity" is platform jargon.
// Render with the resolved peer name so the user knows WHO is busy.
const m = toCommMessage(
makeEntry({
activity_type: "delegation",
@ -150,12 +191,15 @@ describe("toCommMessage — flow derivation", () => {
source_id: SELF,
target_id: PEER,
summary: "Delegation queued — target at capacity",
response_body: { queued: true },
status: "queued",
}),
SELF,
);
expect(m!.flow).toBe("in");
expect(m!.status).toBe("queued");
expect(m!.text).toContain("queued");
expect(m!.text).toContain("Peer Agent");
expect(m!.text.toLowerCase()).toContain("busy");
});
it("delegation row with no target_id returns null", () => {
@ -172,3 +216,79 @@ describe("toCommMessage — flow derivation", () => {
expect(m).toBeNull();
});
});
// --- buildPeerSummary — peer-tab ordering + counts -------------------
//
// The grouped view sorts peer tabs by most-recent activity descending
// (Slack-style DM list) so active conversations rise to the top.
// These tests pin that ordering plus the count aggregation. Pure
// helper — no React render required.
describe("buildPeerSummary", () => {
function msg(peerId: string, peerName: string, timestamp: string): never {
// Cast through unknown — we only need the fields buildPeerSummary
// reads (peerId, peerName, timestamp). Other CommMessage fields
// are irrelevant to the sort/count logic.
return {
id: `id-${peerId}-${timestamp}`,
flow: "out",
peerId,
peerName,
text: "",
responseText: null,
status: "ok",
timestamp,
} as never;
}
it("collapses messages into one row per peer with correct count", () => {
const summary = buildPeerSummary([
msg("ws-a", "Alpha", "2026-04-25T10:00:00Z"),
msg("ws-a", "Alpha", "2026-04-25T10:01:00Z"),
msg("ws-b", "Bravo", "2026-04-25T10:02:00Z"),
]);
expect(summary).toHaveLength(2);
const byId = new Map(summary.map((s) => [s.peerId, s]));
expect(byId.get("ws-a")?.count).toBe(2);
expect(byId.get("ws-b")?.count).toBe(1);
});
it("orders peers by most-recent activity DESC", () => {
// ws-old's last activity was at 10:00, ws-new's was at 10:30 —
// ws-new should sort first because it's more recently active.
const summary = buildPeerSummary([
msg("ws-old", "Old", "2026-04-25T09:00:00Z"),
msg("ws-old", "Old", "2026-04-25T10:00:00Z"),
msg("ws-new", "New", "2026-04-25T10:30:00Z"),
]);
expect(summary[0].peerId).toBe("ws-new");
expect(summary[1].peerId).toBe("ws-old");
});
it("tracks lastTs as the maximum timestamp across that peer's messages", () => {
// Out-of-order messages — buildPeerSummary should still pick the
// newest. Pre-fix a naive "last-seen-wins" would have set lastTs
// to the second message's timestamp (older).
const summary = buildPeerSummary([
msg("ws-a", "Alpha", "2026-04-25T11:00:00Z"),
msg("ws-a", "Alpha", "2026-04-25T09:00:00Z"),
msg("ws-a", "Alpha", "2026-04-25T10:00:00Z"),
]);
expect(summary[0].lastTs).toBe("2026-04-25T11:00:00Z");
});
it("empty input returns empty array", () => {
expect(buildPeerSummary([])).toEqual([]);
});
it("preserves the peer's display name from the first occurrence", () => {
// If two messages for the same peerId carry different peerName
// (shouldn't happen in practice, but defensive), the first wins
// — matches what the user sees in the tile and avoids name flicker.
const summary = buildPeerSummary([
msg("ws-a", "Alpha", "2026-04-25T10:00:00Z"),
msg("ws-a", "Renamed", "2026-04-25T10:01:00Z"),
]);
expect(summary[0].peerName).toBe("Alpha");
});
});