fix(canvas+workspace-server): fan user's own message to all conversation sessions (#228) #1440

Closed
hongming wants to merge 1 commits from fix/canvas-user-message-cross-session-fanout into staging
11 changed files with 404 additions and 2 deletions
@@ -236,6 +236,13 @@ export function MobileChat({
useChatSocket(agentId, {
onAgentMessage: appendMessageDeduped,
// User message fanned in from another session of this conversation
// (#228). appendMessageDeduped collapses the originating session's
// optimistic copy (role+content+3s window) so no double bubble; a
// second device renders it fresh. Same helper this component already
// uses for the send-path optimistic insert and agent replies, so
// the realtime path stays unified across surfaces.
onUserMessage: appendMessageDeduped,
onSendComplete: releaseSendGuards,
});
+10
View File
@@ -143,6 +143,16 @@ function MyChatPanel({ workspaceId, data }: Props) {
releaseSendGuards();
}
},
// User message fanned in from another session of this conversation
// (#228). MUST go through appendMessageDeduped: this session may be
// the originating one (it already did the optimistic insert at
// useChatSend onUserMessage), in which case the dedupe (role+content
// +3s window) collapses the echo. On a different device it renders
// fresh. Deliberately does NOT touch the send guards — receiving a
// user message from elsewhere is not this session's send resolving.
onUserMessage: (msg) => {
history.setMessages((prev) => appendMessageDeduped(prev, msg));
},
onActivityLog: (entry) => {
if (!sending) return;
setActivityLog((prev) => appendActivityLine(prev, entry));
@@ -97,4 +97,22 @@ describe("appendMessageDeduped", () => {
const next = appendMessageDeduped([first], dup, 100);
expect(next).toHaveLength(2);
});
// #228 regression: the originating session inserts the user message
// optimistically (useChatSend onUserMessage), THEN receives the
// server's USER_MESSAGE fan-in echo of its own message. Routing the
// echo through appendMessageDeduped (ChatTab/MobileChat onUserMessage)
// must collapse it so the sender does not see a duplicate bubble,
// while a SECOND device — which never did the optimistic insert —
// renders it fresh (covered by the "appends a brand-new message"
// case above). This is the load-bearing dedupe for the cross-session
// user-message fix.
it("collapses a USER_MESSAGE echo against the originating optimistic insert", () => {
const optimistic = createMessage("user", "what is the deploy status?");
vi.advanceTimersByTime(300); // server round-trip + fan-out, inside 3s
const fannedInEcho = createMessage("user", "what is the deploy status?");
const next = appendMessageDeduped([optimistic], fannedInEcho);
expect(next).toHaveLength(1);
expect(next[0]).toBe(optimistic);
});
});
@@ -7,6 +7,11 @@ import { createMessage, type ChatMessage } from "../types";
export interface UseChatSocketCallbacks {
onAgentMessage?: (msg: ChatMessage) => void;
/** A user message fanned in from ANOTHER session of this conversation.
* The caller MUST route this through appendMessageDeduped so the
* originating session (which already inserted it optimistically)
* does not render a duplicate bubble. (#228) */
onUserMessage?: (msg: ChatMessage) => void;
onActivityLog?: (entry: string) => void;
onSendComplete?: () => void;
onSendError?: (error: string) => void;
@@ -35,6 +40,31 @@ export function useChatSocket(
}
}, [pendingAgentMsgs, workspaceId]);
// User messages fanned in from OTHER sessions of this conversation
// (#228). Same store-then-consume shape as agent messages above. The
// callback MUST dedupe (ChatTab/MobileChat use appendMessageDeduped):
// the originating session also receives this echo and already
// rendered the message via its optimistic insert. Does NOT call
// onSendComplete — that's the agent-reply terminal signal; a user
// message arriving from another device is not this session's send
// completing.
// Optional-chained: a partial store (some component-test mocks, or a
// store-shape skew during a deploy) may not carry userMessages /
// consumeUserMessages. Degrade to no-op rather than throw — the
// agentMessages path above is unconditional so chat still works.
const pendingUserMsgs = useCanvasStore((s) => s.userMessages?.[workspaceId]);
useEffect(() => {
if (!pendingUserMsgs || pendingUserMsgs.length === 0) return;
const consume = useCanvasStore.getState().consumeUserMessages;
if (!consume) return;
const msgs = consume(workspaceId);
for (const m of msgs) {
callbacksRef.current.onUserMessage?.(
createMessage("user", m.content, m.attachments),
);
}
}, [pendingUserMsgs, workspaceId]);
const resolveWorkspaceName = useCallback((id: string) => {
const nodes = useCanvasStore.getState().nodes;
const node = nodes.find((n) => n.id === id);
@@ -53,9 +53,10 @@ function makeStore(
edges: Edge[] = [],
selectedNodeId: string | null = null,
agentMessages: Record<string, Array<{ id: string; content: string; timestamp: string }>> = {},
liveAnnouncement = ""
liveAnnouncement = "",
userMessages: Record<string, Array<{ id: string; messageId?: string; content: string; timestamp: string }>> = {}
) {
const state = { nodes, edges, selectedNodeId, agentMessages, liveAnnouncement };
const state = { nodes, edges, selectedNodeId, agentMessages, liveAnnouncement, userMessages };
const get = () => state;
const set = vi.fn((partial: Record<string, unknown>) => {
Object.assign(state, partial);
@@ -1013,3 +1014,99 @@ describe("handleCanvasEvent liveAnnouncement", () => {
expect(state.liveAnnouncement ?? "").toBe("");
});
});
// ---------------------------------------------------------------------------
// USER_MESSAGE (#228) — the user's OWN message, fanned in from the
// server so EVERY session of the conversation renders it live, not just
// the originating session. Mirrors AGENT_MESSAGE store-then-consume.
// ---------------------------------------------------------------------------
describe("handleCanvasEvent USER_MESSAGE", () => {
it("appends a user message to userMessages for the workspace", () => {
const node = makeNode("ws-1");
const { get, set } = makeStore([node], [], null, {}, "", {});
handleCanvasEvent(
makeMsg({
event: "USER_MESSAGE",
workspace_id: "ws-1",
payload: { message: "what is the status?", message_id: "req-9" },
}),
get,
set
);
expect(set).toHaveBeenCalledOnce();
const { userMessages } = set.mock.calls[0][0] as {
userMessages: Record<string, Array<{ id: string; messageId?: string; content: string; timestamp: string }>>;
};
expect(userMessages["ws-1"]).toHaveLength(1);
expect(userMessages["ws-1"][0].content).toBe("what is the status?");
expect(userMessages["ws-1"][0].messageId).toBe("req-9");
expect(typeof userMessages["ws-1"][0].id).toBe("string");
});
it("appends to existing user messages rather than replacing", () => {
const node = makeNode("ws-1");
const existing = [{ id: "old", content: "earlier", timestamp: "2024-01-01T00:00:00Z" }];
const { get, set } = makeStore([node], [], null, {}, "", { "ws-1": existing });
handleCanvasEvent(
makeMsg({
event: "USER_MESSAGE",
workspace_id: "ws-1",
payload: { message: "later" },
}),
get,
set
);
const { userMessages } = set.mock.calls[0][0] as {
userMessages: Record<string, Array<{ id: string; content: string; timestamp: string }>>;
};
expect(userMessages["ws-1"]).toHaveLength(2);
expect(userMessages["ws-1"][0].content).toBe("earlier");
expect(userMessages["ws-1"][1].content).toBe("later");
});
it("is a no-op when both content and attachments are empty", () => {
const node = makeNode("ws-1");
const { get, set } = makeStore([node], [], null, {}, "", {});
handleCanvasEvent(
makeMsg({ event: "USER_MESSAGE", workspace_id: "ws-1", payload: {} }),
get,
set
);
expect(set).not.toHaveBeenCalled();
});
it("carries through valid attachments and drops blank ones", () => {
const node = makeNode("ws-1");
const { get, set } = makeStore([node], [], null, {}, "", {});
handleCanvasEvent(
makeMsg({
event: "USER_MESSAGE",
workspace_id: "ws-1",
payload: {
message: "",
attachments: [
{ name: "doc.pdf", uri: "workspace:/doc.pdf", mimeType: "application/pdf", size: 10 },
{ name: "", uri: "workspace:/blank" },
],
},
}),
get,
set
);
const { userMessages } = set.mock.calls[0][0] as {
userMessages: Record<string, Array<{ id: string; content: string; attachments?: Array<{ name: string; uri: string }> }>>;
};
expect(userMessages["ws-1"]).toHaveLength(1);
expect(userMessages["ws-1"][0].attachments).toHaveLength(1);
expect(userMessages["ws-1"][0].attachments?.[0].name).toBe("doc.pdf");
});
});
+57
View File
@@ -72,6 +72,7 @@ export function handleCanvasEvent(
edges: Edge[];
selectedNodeId: string | null;
agentMessages: Record<string, Array<{ id: string; content: string; timestamp: string; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>>;
userMessages: Record<string, Array<{ id: string; messageId?: string; content: string; timestamp: string; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>>;
},
set: (partial: Record<string, unknown>) => void,
): void {
@@ -408,6 +409,62 @@ export function handleCanvasEvent(
break;
}
case "USER_MESSAGE": {
// The user's OWN outbound message, fanned in from the server so
// every session of this conversation renders it live — not just
// the originating session (which inserts it optimistically). The
// originating session ALSO receives this echo; it dedupes against
// its optimistic copy in the chat panel via appendMessageDeduped
// (content + role + 3s window), so no double bubble. Non-
// originating sessions render it fresh. message_id is the server-
// relayed request messageId — carried so a future id-stable
// dedupe can replace the time-window heuristic without a wire
// change. Mirrors the AGENT_MESSAGE store-then-consume shape so
// the realtime path stays unified across user/agent and across
// desktop/mobile surfaces. (#228)
const userContent = (msg.payload.message as string) ?? "";
const rawUserAtts = msg.payload.attachments;
const userAttachments = Array.isArray(rawUserAtts)
? (rawUserAtts as Array<{ uri?: unknown; name?: unknown; mimeType?: unknown; size?: unknown }>)
.filter((a) =>
typeof a?.uri === "string" && a.uri.length > 0 &&
typeof a?.name === "string" && a.name.length > 0,
)
.map((a) => ({
uri: a.uri as string,
name: a.name as string,
mimeType: typeof a.mimeType === "string" ? a.mimeType : undefined,
size: typeof a.size === "number" ? a.size : undefined,
}))
: undefined;
if (userContent || (userAttachments && userAttachments.length > 0)) {
const messageId =
typeof msg.payload.message_id === "string" && msg.payload.message_id.length > 0
? (msg.payload.message_id as string)
: undefined;
const { userMessages } = get();
const existing = userMessages[msg.workspace_id] || [];
set({
userMessages: {
...userMessages,
[msg.workspace_id]: [
...existing,
{
id: crypto.randomUUID(),
...(messageId ? { messageId } : {}),
content: userContent,
timestamp: new Date().toISOString(),
...(userAttachments && userAttachments.length > 0
? { attachments: userAttachments }
: {}),
},
],
},
});
}
break;
}
case "AGENT_MESSAGE": {
const content = (msg.payload.message as string) ?? "";
// Attachments come straight through from the platform's Notify
+16
View File
@@ -226,6 +226,12 @@ interface CanvasState {
/** Agent-pushed messages keyed by workspace ID. ChatTab consumes and clears these. */
agentMessages: Record<string, Array<{ id: string; content: string; timestamp: string; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>>;
consumeAgentMessages: (workspaceId: string) => Array<{ id: string; content: string; timestamp: string; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>;
/** User messages fanned in from OTHER sessions of the same conversation,
* keyed by workspace ID. ChatTab/MobileChat consume, dedupe (by the
* server-supplied messageId / content+role+time window) against the
* originating session's optimistic insert, and clear these. (#228) */
userMessages: Record<string, Array<{ id: string; messageId?: string; content: string; timestamp: string; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>>;
consumeUserMessages: (workspaceId: string) => Array<{ id: string; messageId?: string; content: string; timestamp: string; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>;
/** WebSocket connection status — drives the live indicator in the Toolbar. */
wsStatus: "connected" | "connecting" | "disconnected";
setWsStatus: (status: "connected" | "connecting" | "disconnected") => void;
@@ -373,6 +379,16 @@ export const useCanvasStore = create<CanvasState>((set, get) => ({
}
return msgs;
},
userMessages: {},
consumeUserMessages: (workspaceId) => {
const msgs = get().userMessages[workspaceId] || [];
if (msgs.length > 0) {
const { userMessages } = get();
const { [workspaceId]: _, ...rest } = userMessages;
set({ userMessages: rest });
}
return msgs;
},
setViewport: (v) => set({ viewport: v }),
saveViewport: async (x, y, zoom) => {
set({ viewport: { x, y, zoom } });
+10
View File
@@ -45,6 +45,15 @@ const (
EventA2AResponse EventType = "A2A_RESPONSE"
EventActivityLogged EventType = "ACTIVITY_LOGGED"
EventChannelMessage EventType = "CHANNEL_MESSAGE"
// EventUserMessage fans the user's OWN outbound canvas message out
// to every other session subscribed to the same conversation. The
// originating session renders it via an optimistic local insert;
// other sessions/devices previously got no live event for the
// user's message (only the agent's A2A_RESPONSE), so they showed
// the reply with no question until a manual refresh re-pulled
// activity_logs. Carries the request messageId so the originating
// session dedupes its optimistic copy. (#228)
EventUserMessage EventType = "USER_MESSAGE"
// Workspace lifecycle.
EventWorkspaceProvisioning EventType = "WORKSPACE_PROVISIONING"
@@ -112,6 +121,7 @@ var AllEventTypes = []EventType{
EventDelegationStatus,
EventExternalCredentialsRotated,
EventTaskUpdated,
EventUserMessage,
EventWorkspaceAwaitingAgent,
EventWorkspaceDegraded,
EventWorkspaceHeartbeat,
@@ -41,6 +41,7 @@ func TestAllEventTypes_IsSnapshot(t *testing.T) {
"DELEGATION_STATUS",
"EXTERNAL_CREDENTIALS_ROTATED",
"TASK_UPDATED",
"USER_MESSAGE",
"WORKSPACE_AWAITING_AGENT",
"WORKSPACE_DEGRADED",
"WORKSPACE_HEARTBEAT",
@@ -338,6 +338,25 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle
})
if callerID == "" && statusCode < 400 {
// Fan the USER's own outbound message out to every OTHER session
// subscribed to this conversation. Before this, canvas only ever
// broadcast the agent reply (EventA2AResponse below); a second
// device viewing the same workspace saw the reply appear with no
// question until a manual refresh re-pulled activity_logs. The
// originating session already rendered the message via an
// optimistic local insert and dedupes this echo by messageId +
// content (canvas appendMessageDeduped). Only canvas message/send
// carries a user bubble — skip for other methods (initialize,
// tasks/*) so we don't emit empty USER_MESSAGE noise. (#228)
if a2aMethod == "message/send" {
if userText, userMessageID, userAttachments := extractCanvasUserMessage(body); userText != "" || len(userAttachments) > 0 {
h.broadcaster.BroadcastOnly(workspaceID, string(events.EventUserMessage), map[string]interface{}{
"message": userText,
"message_id": userMessageID,
"attachments": userAttachments,
})
}
}
h.broadcaster.BroadcastOnly(workspaceID, string(events.EventA2AResponse), map[string]interface{}{
"response_body": json.RawMessage(respBody),
"method": a2aMethod,
@@ -346,6 +365,64 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle
}
}
// extractCanvasUserMessage pulls the user-visible text, the request
// messageId (stable cross-session dedupe key), and any file parts out
// of a canvas A2A message/send envelope:
//
// {"params":{"message":{"messageId":"<uuid>","parts":[
// {"kind":"text","text":"..."},
// {"kind":"file","file":{"name","mimeType","uri","size"}}]}}}
//
// Mirrors messagestore.extractRequestText / extractFilesFromUserMessage
// (same envelope) but kept local to the handlers package rather than
// exporting those — this path needs only the shallow shape and the
// messageId, and a cross-package export would widen the messagestore
// API surface for one call site. Returns ("", "", nil) on any parse
// failure so a malformed body silently degrades to "no user-echo"
// (the agent reply still broadcasts; manual refresh still works).
func extractCanvasUserMessage(body []byte) (text string, messageID string, attachments []map[string]any) {
if len(body) == 0 {
return "", "", nil
}
var env struct {
Params struct {
Message struct {
MessageID string `json:"messageId"`
Parts []map[string]any `json:"parts"`
} `json:"message"`
} `json:"params"`
}
if err := json.Unmarshal(body, &env); err != nil {
return "", "", nil
}
messageID = env.Params.Message.MessageID
for _, p := range env.Params.Message.Parts {
if kind, _ := p["kind"].(string); kind == "file" {
if f, ok := p["file"].(map[string]any); ok {
name, _ := f["name"].(string)
uri, _ := f["uri"].(string)
if name != "" && uri != "" {
att := map[string]any{"name": name, "uri": uri}
if mt, ok := f["mimeType"].(string); ok && mt != "" {
att["mimeType"] = mt
}
if sz, ok := f["size"]; ok {
att["size"] = sz
}
attachments = append(attachments, att)
}
}
continue
}
if text == "" {
if t, ok := p["text"].(string); ok && t != "" {
text = t
}
}
}
return text, messageID, attachments
}
func nilIfEmpty(s string) *string {
if s == "" {
return nil
@@ -241,3 +241,82 @@ func TestParseUsageFromA2AResponse_MissingTokensInUsageObject(t *testing.T) {
t.Errorf("missing tokens: got (%d, %d), want (0, 0)", in, out)
}
}
// ─────────────────────────────────────────────────────────────────────────────
// extractCanvasUserMessage tests (#228) — the load-bearing parse for the
// user-message cross-session fan-out. logA2ASuccess emits USER_MESSAGE
// only when this returns a non-empty text or attachments, so a parse
// regression here would silently re-break the bug.
// ─────────────────────────────────────────────────────────────────────────────
func TestExtractCanvasUserMessage_TextAndMessageID(t *testing.T) {
body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"role":"user","messageId":"abc-123","parts":[{"kind":"text","text":"hello there"}]}}}`)
text, mid, atts := extractCanvasUserMessage(body)
if text != "hello there" {
t.Errorf("text: got %q, want %q", text, "hello there")
}
if mid != "abc-123" {
t.Errorf("messageID: got %q, want %q", mid, "abc-123")
}
if len(atts) != 0 {
t.Errorf("attachments: got %d, want 0", len(atts))
}
}
func TestExtractCanvasUserMessage_FilePartOnly(t *testing.T) {
body := []byte(`{"params":{"message":{"messageId":"m1","parts":[{"kind":"file","file":{"name":"report.pdf","mimeType":"application/pdf","uri":"workspace:/tmp/report.pdf","size":2048}}]}}}`)
text, mid, atts := extractCanvasUserMessage(body)
if text != "" {
t.Errorf("text: got %q, want empty", text)
}
if mid != "m1" {
t.Errorf("messageID: got %q, want %q", mid, "m1")
}
if len(atts) != 1 {
t.Fatalf("attachments: got %d, want 1", len(atts))
}
if atts[0]["name"] != "report.pdf" || atts[0]["uri"] != "workspace:/tmp/report.pdf" {
t.Errorf("attachment fields wrong: %+v", atts[0])
}
if atts[0]["mimeType"] != "application/pdf" {
t.Errorf("mimeType: got %v, want application/pdf", atts[0]["mimeType"])
}
}
func TestExtractCanvasUserMessage_TextPlusFile(t *testing.T) {
body := []byte(`{"params":{"message":{"messageId":"x","parts":[{"kind":"text","text":"see attached"},{"kind":"file","file":{"name":"a.txt","uri":"workspace:/a.txt"}}]}}}`)
text, _, atts := extractCanvasUserMessage(body)
if text != "see attached" {
t.Errorf("text: got %q, want %q", text, "see attached")
}
if len(atts) != 1 {
t.Fatalf("attachments: got %d, want 1", len(atts))
}
}
func TestExtractCanvasUserMessage_MalformedReturnsEmpty(t *testing.T) {
for _, body := range [][]byte{
nil,
[]byte(``),
[]byte(`not json`),
[]byte(`{"params":{}}`),
[]byte(`{"params":{"message":{"parts":[]}}}`),
} {
text, mid, atts := extractCanvasUserMessage(body)
if text != "" || mid != "" || len(atts) != 0 {
t.Errorf("body %q: got (%q,%q,%d), want empty — malformed must degrade to no-echo", string(body), text, mid, len(atts))
}
}
}
func TestExtractCanvasUserMessage_FileMissingNameOrURIDropped(t *testing.T) {
// gin binding does not enforce required on slice-element struct
// fields without `dive`; a malformed file part could carry uri:""
// or name:"". Defence-in-depth: drop those so the broadcast
// doesn't carry a blank chip (mirrors the canvas-side filter).
body := []byte(`{"params":{"message":{"messageId":"m","parts":[{"kind":"file","file":{"name":"","uri":"workspace:/x"}},{"kind":"file","file":{"name":"ok.txt","uri":""}}]}}}`)
_, _, atts := extractCanvasUserMessage(body)
if len(atts) != 0 {
t.Errorf("attachments: got %d, want 0 (blank name/uri must be dropped)", len(atts))
}
}