fix(both): fan user's own message to all conversation sessions (#1440) #1514

Open
fullstack-engineer wants to merge 3 commits from fix/user-message-fanout-1440 into staging
10 changed files with 599 additions and 7 deletions
+1 -1
View File
@@ -271,7 +271,7 @@ export function MobileChat({
const msgs = consume(agentId);
for (const m of msgs) {
appendMessageDeduped(
createMessage("agent", m.content, m.attachments),
createMessage(m.role ?? "agent", m.content, m.attachments),
);
}
}, [historyLoading, agentId, appendMessageDeduped]);
@@ -27,7 +27,7 @@ export function useChatSocket(
const msgs = consume(workspaceId);
for (const m of msgs) {
callbacksRef.current.onAgentMessage?.(
createMessage("agent", m.content, m.attachments),
createMessage(m.role ?? "agent", m.content, m.attachments),
);
}
if (msgs.length > 0) {
+130
View File
@@ -0,0 +1,130 @@
/**
* ws-events.ts — canonical WebSocket event taxonomy shared between
* the Go platform and the TypeScript canvas.
*
* Source of truth: `workspace-server/internal/events/types.go` (Go side).
* This file is the canvas mirror — every constant in that file MUST appear
* here. The go vet / build pipeline does NOT enforce this today; the
* discipline is manual: when adding a new EventType in Go, mirror it here.
*
* Consumer usage:
* import { WS_EVENTS } from "@/lib/ws-events";
* switch (msg.event) {
* case WS_EVENTS.AGENT_MESSAGE: ...
* case WS_EVENTS.USER_MESSAGE: ...
* }
*
* Wire format: the string literal is sent over the WebSocket as
* `WSMessage.Event`. Do NOT change these values.
*/
// Chat / agent messaging.
export const WS_AGENT_MESSAGE = "AGENT_MESSAGE";
export const WS_A2A_RESPONSE = "A2A_RESPONSE";
export const WS_USER_MESSAGE = "USER_MESSAGE";
export const WS_ACTIVITY_LOGGED = "ACTIVITY_LOGGED";
export const WS_CHANNEL_MESSAGE = "CHANNEL_MESSAGE";
// Workspace lifecycle.
export const WS_WORKSPACE_PROVISIONING = "WORKSPACE_PROVISIONING";
export const WS_WORKSPACE_PROVISION_FAILED = "WORKSPACE_PROVISION_FAILED";
export const WS_WORKSPACE_ONLINE = "WORKSPACE_ONLINE";
export const WS_WORKSPACE_OFFLINE = "WORKSPACE_OFFLINE";
export const WS_WORKSPACE_DEGRADED = "WORKSPACE_DEGRADED";
export const WS_WORKSPACE_HIBERNATED = "WORKSPACE_HIBERNATED";
export const WS_WORKSPACE_PAUSED = "WORKSPACE_PAUSED";
export const WS_WORKSPACE_REMOVED = "WORKSPACE_REMOVED";
export const WS_WORKSPACE_AWAITING_AGENT = "WORKSPACE_AWAITING_AGENT";
export const WS_WORKSPACE_HEARTBEAT = "WORKSPACE_HEARTBEAT";
// Agent assignment + identity.
export const WS_AGENT_ASSIGNED = "AGENT_ASSIGNED";
export const WS_AGENT_REPLACED = "AGENT_REPLACED";
export const WS_AGENT_REMOVED = "AGENT_REMOVED";
export const WS_AGENT_MOVED = "AGENT_MOVED";
export const WS_AGENT_CARD_UPDATED = "AGENT_CARD_UPDATED";
// Delegation lifecycle.
export const WS_DELEGATION_SENT = "DELEGATION_SENT";
export const WS_DELEGATION_STATUS = "DELEGATION_STATUS";
export const WS_DELEGATION_COMPLETE = "DELEGATION_COMPLETE";
export const WS_DELEGATION_FAILED = "DELEGATION_FAILED";
// Task progression + scheduler.
export const WS_TASK_UPDATED = "TASK_UPDATED";
export const WS_CRON_EXECUTED = "CRON_EXECUTED";
export const WS_CRON_SKIPPED = "CRON_SKIPPED";
// Approvals.
export const WS_APPROVAL_REQUESTED = "APPROVAL_REQUESTED";
export const WS_APPROVAL_ESCALATED = "APPROVAL_ESCALATED";
// Auth / credentials.
export const WS_EXTERNAL_CREDENTIALS_ROTATED = "EXTERNAL_CREDENTIALS_ROTATED";
/** Union type of all known event names. Used to type WSMessage.Event. */
export type WSEventName =
| typeof WS_AGENT_MESSAGE
| typeof WS_A2A_RESPONSE
| typeof WS_USER_MESSAGE
| typeof WS_ACTIVITY_LOGGED
| typeof WS_CHANNEL_MESSAGE
| typeof WS_WORKSPACE_PROVISIONING
| typeof WS_WORKSPACE_PROVISION_FAILED
| typeof WS_WORKSPACE_ONLINE
| typeof WS_WORKSPACE_OFFLINE
| typeof WS_WORKSPACE_DEGRADED
| typeof WS_WORKSPACE_HIBERNATED
| typeof WS_WORKSPACE_PAUSED
| typeof WS_WORKSPACE_REMOVED
| typeof WS_WORKSPACE_AWAITING_AGENT
| typeof WS_WORKSPACE_HEARTBEAT
| typeof WS_AGENT_ASSIGNED
| typeof WS_AGENT_REPLACED
| typeof WS_AGENT_REMOVED
| typeof WS_AGENT_MOVED
| typeof WS_AGENT_CARD_UPDATED
| typeof WS_DELEGATION_SENT
| typeof WS_DELEGATION_STATUS
| typeof WS_DELEGATION_COMPLETE
| typeof WS_DELEGATION_FAILED
| typeof WS_TASK_UPDATED
| typeof WS_CRON_EXECUTED
| typeof WS_CRON_SKIPPED
| typeof WS_APPROVAL_REQUESTED
| typeof WS_APPROVAL_ESCALATED
| typeof WS_EXTERNAL_CREDENTIALS_ROTATED;
/** All event name constants, for exhaustive-switch linting. */
export const WS_EVENTS = {
AGENT_MESSAGE: WS_AGENT_MESSAGE,
A2A_RESPONSE: WS_A2A_RESPONSE,
USER_MESSAGE: WS_USER_MESSAGE,
ACTIVITY_LOGGED: WS_ACTIVITY_LOGGED,
CHANNEL_MESSAGE: WS_CHANNEL_MESSAGE,
WORKSPACE_PROVISIONING: WS_WORKSPACE_PROVISIONING,
WORKSPACE_PROVISION_FAILED: WS_WORKSPACE_PROVISION_FAILED,
WORKSPACE_ONLINE: WS_WORKSPACE_ONLINE,
WORKSPACE_OFFLINE: WS_WORKSPACE_OFFLINE,
WORKSPACE_DEGRADED: WS_WORKSPACE_DEGRADED,
WORKSPACE_HIBERNATED: WS_WORKSPACE_HIBERNATED,
WORKSPACE_PAUSED: WS_WORKSPACE_PAUSED,
WORKSPACE_REMOVED: WS_WORKSPACE_REMOVED,
WORKSPACE_AWAITING_AGENT: WS_WORKSPACE_AWAITING_AGENT,
WORKSPACE_HEARTBEAT: WS_WORKSPACE_HEARTBEAT,
AGENT_ASSIGNED: WS_AGENT_ASSIGNED,
AGENT_REPLACED: WS_AGENT_REPLACED,
AGENT_REMOVED: WS_AGENT_REMOVED,
AGENT_MOVED: WS_AGENT_MOVED,
AGENT_CARD_UPDATED: WS_AGENT_CARD_UPDATED,
DELEGATION_SENT: WS_DELEGATION_SENT,
DELEGATION_STATUS: WS_DELEGATION_STATUS,
DELEGATION_COMPLETE: WS_DELEGATION_COMPLETE,
DELEGATION_FAILED: WS_DELEGATION_FAILED,
TASK_UPDATED: WS_TASK_UPDATED,
CRON_EXECUTED: WS_CRON_EXECUTED,
CRON_SKIPPED: WS_CRON_SKIPPED,
APPROVAL_REQUESTED: WS_APPROVAL_REQUESTED,
APPROVAL_ESCALATED: WS_APPROVAL_ESCALATED,
EXTERNAL_CREDENTIALS_ROTATED: WS_EXTERNAL_CREDENTIALS_ROTATED,
} as const;
@@ -808,6 +808,210 @@ describe("handleCanvasEvent A2A_RESPONSE", () => {
});
});
// ---------------------------------------------------------------------------
// USER_MESSAGE (cross-session fan-out of user's own outbound message)
// ---------------------------------------------------------------------------
// #1440: user's own message is optimistically inserted in the originating
// session by useChatSend; other sessions need this event to render it.
describe("handleCanvasEvent USER_MESSAGE", () => {
it("appends a user message to agentMessages for the workspace", () => {
const node = makeNode("ws-1");
const { get, set } = makeStore([node], [], null, {});
handleCanvasEvent(
makeMsg({
event: "USER_MESSAGE",
workspace_id: "ws-1",
payload: {
messageId: "msg-abc",
content: "Hello, agent!",
},
}),
get,
set
);
expect(set).toHaveBeenCalledOnce();
const { agentMessages } = set.mock.calls[0][0] as {
agentMessages: Record<string, Array<{ id: string; content: string; timestamp: string }>>;
};
expect(agentMessages["ws-1"]).toHaveLength(1);
expect(agentMessages["ws-1"][0].id).toBe("msg-abc");
expect(agentMessages["ws-1"][0].content).toBe("Hello, agent!");
expect(agentMessages["ws-1"][0].role).toBe("user");
expect(typeof agentMessages["ws-1"][0].timestamp).toBe("string");
});
it("appends to existing messages rather than replacing them", () => {
const node = makeNode("ws-1");
const existing = [{ id: "old", content: "prior msg", timestamp: "2024-01-01T00:00:00Z" }];
const { get, set } = makeStore([node], [], null, { "ws-1": existing });
handleCanvasEvent(
makeMsg({
event: "USER_MESSAGE",
workspace_id: "ws-1",
payload: {
messageId: "msg-xyz",
content: "second user message",
},
}),
get,
set
);
const { agentMessages } = set.mock.calls[0][0] as {
agentMessages: Record<string, Array<{ id: string; content: string }>>;
};
expect(agentMessages["ws-1"]).toHaveLength(2);
expect(agentMessages["ws-1"][0].content).toBe("prior msg");
expect(agentMessages["ws-1"][1].content).toBe("second user message");
expect(agentMessages["ws-1"][1].id).toBe("msg-xyz");
});
it("is a no-op when both content and files are absent", () => {
const node = makeNode("ws-1");
const { get, set } = makeStore([node]);
handleCanvasEvent(
makeMsg({
event: "USER_MESSAGE",
workspace_id: "ws-1",
payload: {},
}),
get,
set
);
expect(set).not.toHaveBeenCalled();
});
it("is a no-op when content is empty string", () => {
const node = makeNode("ws-1");
const { get, set } = makeStore([node]);
handleCanvasEvent(
makeMsg({
event: "USER_MESSAGE",
workspace_id: "ws-1",
payload: { content: "" },
}),
get,
set
);
expect(set).not.toHaveBeenCalled();
});
it("passes through valid file attachments", () => {
const node = makeNode("ws-1");
const { get, set } = makeStore([node], [], null, {});
const att = {
uri: "workspace:/uploads/doc.pdf",
name: "doc.pdf",
mimeType: "application/pdf",
size: 98765,
};
handleCanvasEvent(
makeMsg({
event: "USER_MESSAGE",
workspace_id: "ws-1",
payload: {
messageId: "msg-with-file",
content: "see attached",
files: [att],
},
}),
get,
set
);
const { agentMessages } = set.mock.calls[0][0] as {
agentMessages: Record<string, Array<{ id: string; attachments?: Array<{ uri: string; name: string; mimeType?: string; size?: number }> }>>;
};
expect(agentMessages["ws-1"]).toHaveLength(1);
expect(agentMessages["ws-1"][0].id).toBe("msg-with-file");
expect(agentMessages["ws-1"][0].role).toBe("user");
expect(agentMessages["ws-1"][0].attachments).toEqual([att]);
});
it("drops file entries with missing or empty uri/name", () => {
const node = makeNode("ws-1");
const { get, set } = makeStore([node], [], null, {});
// Two bad entries followed by one good one
const bad = [{ uri: "" }, { name: "" }, { uri: "x", name: "y" }];
handleCanvasEvent(
makeMsg({
event: "USER_MESSAGE",
workspace_id: "ws-1",
payload: {
messageId: "msg-bad-files",
content: "check files",
files: bad as Array<{ uri?: unknown; name?: unknown }>,
},
}),
get,
set
);
const { agentMessages } = set.mock.calls[0][0] as {
agentMessages: Record<string, Array<{ attachments?: Array<{ uri: string }> }>>;
};
// Only the valid entry should survive the filter.
expect(agentMessages["ws-1"][0].attachments).toHaveLength(1);
expect(agentMessages["ws-1"][0].attachments![0].uri).toBe("x");
});
it("uses crypto.randomUUID() when messageId is absent", () => {
const node = makeNode("ws-1");
const { get, set } = makeStore([node], [], null, {});
handleCanvasEvent(
makeMsg({
event: "USER_MESSAGE",
workspace_id: "ws-1",
payload: { content: "no id field" },
}),
get,
set
);
const { agentMessages } = set.mock.calls[0][0] as {
agentMessages: Record<string, Array<{ id: string }>>;
};
expect(agentMessages["ws-1"][0].id).toBeDefined();
expect(typeof agentMessages["ws-1"][0].id).toBe("string");
});
it("renders a files-only message (no text) when content is absent but files present", () => {
const node = makeNode("ws-1");
const { get, set } = makeStore([node], [], null, {});
handleCanvasEvent(
makeMsg({
event: "USER_MESSAGE",
workspace_id: "ws-1",
payload: {
messageId: "file-only",
files: [{ uri: "workspace:/x.pdf", name: "x.pdf" }],
},
}),
get,
set
);
const { agentMessages } = set.mock.calls[0][0] as {
agentMessages: Record<string, Array<{ content: string }>>;
};
expect(agentMessages["ws-1"]).toHaveLength(1);
expect(agentMessages["ws-1"][0].content).toBe("");
});
});
// ---------------------------------------------------------------------------
// Unknown event
// ---------------------------------------------------------------------------
+61 -1
View File
@@ -71,7 +71,7 @@ export function handleCanvasEvent(
nodes: Node<WorkspaceNodeData>[];
edges: Edge[];
selectedNodeId: string | null;
agentMessages: Record<string, Array<{ id: string; content: string; timestamp: string; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>>;
agentMessages: Record<string, Array<{ id: string; content: string; timestamp: string; role?: "user" | "agent"; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>>;
},
set: (partial: Record<string, unknown>) => void,
): void {
@@ -515,6 +515,66 @@ export function handleCanvasEvent(
break;
}
// #1440 USER_MESSAGE: the canvas optimistically inserts the user's
// own message into the originating session's store before the request
// fires (useChatSend → onUserMessage). Other sessions never saw it —
// the agent reply was broadcast but not the user's own text. This event
// fans the user's message to every OTHER session so they render the
// bubble without requiring a manual refresh. The originating session
// collapses its optimistic copy via the dedup mechanism in ChatTab
// (role+content+timestamp window) — no double bubble.
//
// Payload shape mirrors EventUserMessage in Go
// a2a_proxy_helpers.go: userMessagePayload:
// messageId: string
// content: string
// files: ParsedFilePart[] (name, uri, mimeType, size)
case "USER_MESSAGE": {
const payload = msg.payload as {
messageId?: string;
content?: string;
files?: Array<{ name?: unknown; uri?: unknown; mimeType?: unknown; size?: unknown }>;
};
const content = typeof payload?.content === "string" ? payload.content : "";
const files: Array<{ name: string; uri: string; mimeType?: string; size?: number }> = [];
if (Array.isArray(payload?.files)) {
for (const f of payload.files) {
if (typeof f?.uri === "string" && typeof f?.name === "string") {
files.push({
name: f.name,
uri: f.uri,
mimeType: typeof f.mimeType === "string" ? f.mimeType : undefined,
size: typeof f.size === "number" ? f.size : undefined,
});
}
}
}
// Render only when there's something visible.
if (content || files.length > 0) {
// Insert into agentMessages for rendering as a user-bubble.
// ChatTab uses msg.role === "user" for right-side alignment and
// user-toned styling, so we must set role:"user" explicitly.
const { agentMessages } = get();
const existing = agentMessages[msg.workspace_id] || [];
set({
agentMessages: {
...agentMessages,
[msg.workspace_id]: [
...existing,
{
id: payload?.messageId ?? crypto.randomUUID(),
content,
role: "user",
timestamp: new Date().toISOString(),
...(files.length > 0 ? { attachments: files } : {}),
},
],
},
});
}
break;
}
default:
break;
}
+2 -2
View File
@@ -224,8 +224,8 @@ interface CanvasState {
batchPause: () => Promise<void>;
batchDelete: () => Promise<void>;
/** Agent-pushed messages keyed by workspace ID. ChatTab consumes and clears these. */
agentMessages: Record<string, Array<{ id: string; content: string; timestamp: string; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>>;
consumeAgentMessages: (workspaceId: string) => Array<{ id: string; content: string; timestamp: string; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>;
agentMessages: Record<string, Array<{ id: string; content: string; timestamp: string; role?: "user" | "agent"; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>>;
consumeAgentMessages: (workspaceId: string) => Array<{ id: string; content: string; timestamp: string; role?: "user" | "agent"; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>;
/** WebSocket connection status — drives the live indicator in the Toolbar. */
wsStatus: "connected" | "connecting" | "disconnected";
setWsStatus: (status: "connected" | "connecting" | "disconnected") => void;
+4 -2
View File
@@ -41,8 +41,9 @@ type EventType string
// scan-friendly as it grows.
const (
// Chat / agent messaging — surfaces in canvas chat panels.
EventAgentMessage EventType = "AGENT_MESSAGE"
EventA2AResponse EventType = "A2A_RESPONSE"
EventAgentMessage EventType = "AGENT_MESSAGE"
EventA2AResponse EventType = "A2A_RESPONSE"
EventUserMessage EventType = "USER_MESSAGE"
EventActivityLogged EventType = "ACTIVITY_LOGGED"
EventChannelMessage EventType = "CHANNEL_MESSAGE"
@@ -104,6 +105,7 @@ var AllEventTypes = []EventType{
EventApprovalEscalated,
EventApprovalRequested,
EventChannelMessage,
EventUserMessage,
EventCronExecuted,
EventCronSkipped,
EventDelegationComplete,
@@ -41,6 +41,7 @@ func TestAllEventTypes_IsSnapshot(t *testing.T) {
"DELEGATION_STATUS",
"EXTERNAL_CREDENTIALS_ROTATED",
"TASK_UPDATED",
"USER_MESSAGE",
"WORKSPACE_AWAITING_AGENT",
"WORKSPACE_DEGRADED",
"WORKSPACE_HEARTBEAT",
@@ -344,6 +344,106 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle
"duration_ms": durationMs,
})
}
// #1440: fan user's own outbound message to all sessions of this workspace.
// When a canvas user sends a message, the originating session renders it
// optimistically (useChatSend inserts it into the local store immediately).
// Other sessions never saw it — only the agent reply was broadcast. The fix:
// parse the JSON-RPC body and broadcast a USER_MESSAGE event so every
// other session renders the user's bubble without a manual refresh.
// Skipped for non-2xx responses (no message was delivered) and non-canvas
// callers (workspace-to-workspace calls have their own fan-out via the
// channel system).
if callerID == "" && statusCode < 400 {
if payload := extractUserMessagePayload(body, a2aMethod); payload != nil {
h.broadcaster.BroadcastOnly(workspaceID, string(events.EventUserMessage), payload)
}
}
}
// userMessagePayload carries the fields the canvas needs to render a user
// bubble from a cross-session fan-out event.
type userMessagePayload struct {
MessageID string `json:"messageId,omitempty"`
Content string `json:"content,omitempty"`
Files []userMessageFile `json:"files,omitempty"`
}
// userMessageFile mirrors the shape canvas/src/components/tabs/chat/message-parser.ts
// ParsedFilePart so the canvas can render the same download chip.
type userMessageFile struct {
Name string `json:"name"`
URI string `json:"uri"`
MimeType string `json:"mimeType,omitempty"`
Size int64 `json:"size,omitempty"`
}
// extractUserMessagePayload parses a JSON-RPC message/send body and extracts the
// user-visible fields (text content + file attachments) for fan-out broadcasting.
// Returns nil when the body is not a canvas message/send request — either the
// method is not message/send, or the message does not have role=user (e.g. a
// heartbeat ping sent by the canvas for keepalive, which carries role:agent).
// Safe to call on any body — malformed JSON returns nil with no side effects.
func extractUserMessagePayload(body []byte, method string) map[string]interface{} {
if method != "message/send" {
return nil
}
var rpc struct {
ID string `json:"id,omitempty"`
Params struct {
Message struct {
Role string `json:"role,omitempty"`
MessageID string `json:"messageId,omitempty"`
Parts []struct {
Kind string `json:"kind,omitempty"`
Text string `json:"text,omitempty"`
File *struct {
Name string `json:"name,omitempty"`
URI string `json:"uri,omitempty"`
MimeType string `json:"mimeType,omitempty"`
Size int64 `json:"size,omitempty"`
} `json:"file,omitempty"`
} `json:"parts,omitempty"`
} `json:"message,omitempty"`
} `json:"params,omitempty"`
}
if err := json.Unmarshal(body, &rpc); err != nil {
return nil
}
// Only broadcast user-role messages. The canvas also sends internal
// heartbeat pings via message/send (role:agent) that should not appear
// as user bubbles.
if rpc.Params.Message.Role != "user" {
return nil
}
var content string
var files []userMessageFile
for _, part := range rpc.Params.Message.Parts {
if part.Kind == "text" && part.Text != "" {
if content != "" {
content += "\n"
}
content += part.Text
} else if part.Kind == "file" && part.File != nil {
files = append(files, userMessageFile{
Name: part.File.Name,
URI: part.File.URI,
MimeType: part.File.MimeType,
Size: part.File.Size,
})
}
}
// Skip pure-noise events (e.g. empty send with no text and no files).
// The canvas optimistically inserts the bubble before the request; an
// empty broadcast would create an extra blank bubble in other sessions.
if content == "" && len(files) == 0 {
return nil
}
return map[string]interface{}{
"messageId": rpc.Params.Message.MessageID,
"content": content,
"files": files,
}
}
func nilIfEmpty(s string) *string {
@@ -2024,6 +2024,101 @@ func TestLogA2ASuccess_ErrorStatus(t *testing.T) {
time.Sleep(80 * time.Millisecond)
}
// ──────────────────────────────────────────────────────────────────────────────
// extractUserMessagePayload fan-out of user's own outbound message (#1440)
// ──────────────────────────────────────────────────────────────────────────────
func TestExtractUserMessagePayload_BasicText(t *testing.T) {
body := []byte(`{"id":"mid-1","method":"message/send","params":{"message":{"role":"user","messageId":"msg-1","parts":[{"kind":"text","text":"hello world"}]}}}`)
got := extractUserMessagePayload(body, "message/send")
if got == nil {
t.Fatal("expected non-nil payload")
}
if got["messageId"] != "msg-1" {
t.Errorf("messageId: got %v", got["messageId"])
}
if got["content"] != "hello world" {
t.Errorf("content: got %v", got["content"])
}
}
func TestExtractUserMessagePayload_MultipleTextParts(t *testing.T) {
body := []byte(`{"params":{"message":{"role":"user","parts":[{"kind":"text","text":"line one"},{"kind":"text","text":"line two"}]}}}`)
got := extractUserMessagePayload(body, "message/send")
if got == nil {
t.Fatal("expected non-nil payload")
}
if got["content"] != "line one\nline two" {
t.Errorf("content: got %v", got["content"])
}
}
func TestExtractUserMessagePayload_WithFile(t *testing.T) {
body := []byte(`{"params":{"message":{"role":"user","parts":[{"kind":"text","text":"check this"},{"kind":"file","file":{"name":"report.pdf","uri":"workspace:/uploads/report.pdf","mimeType":"application/pdf","size":12345}}]}}}`)
got := extractUserMessagePayload(body, "message/send")
if got == nil {
t.Fatal("expected non-nil payload")
}
if got["content"] != "check this" {
t.Errorf("content: got %v", got["content"])
}
files, ok := got["files"].([]userMessageFile)
if !ok {
t.Fatalf("files type: got %T", got["files"])
}
if len(files) != 1 {
t.Errorf("files count: got %d", len(files))
}
if files[0].Name != "report.pdf" {
t.Errorf("file name: got %s", files[0].Name)
}
if files[0].URI != "workspace:/uploads/report.pdf" {
t.Errorf("file uri: got %s", files[0].URI)
}
}
func TestExtractUserMessagePayload_EmptyParts(t *testing.T) {
body := []byte(`{"params":{"message":{"role":"user","parts":[]}}}`)
got := extractUserMessagePayload(body, "message/send")
if got != nil {
t.Errorf("expected nil for empty parts, got %v", got)
}
}
func TestExtractUserMessagePayload_NonMessageSendMethod(t *testing.T) {
body := []byte(`{"params":{"message":{"role":"user","parts":[{"kind":"text","text":"hello"}]}}}`)
got := extractUserMessagePayload(body, "initialize")
if got != nil {
t.Errorf("expected nil for non-message/send, got %v", got)
}
}
func TestExtractUserMessagePayload_NonUserRole(t *testing.T) {
// Canvas also sends heartbeat pings via message/send with role:agent;
// these must NOT appear as user bubbles.
body := []byte(`{"params":{"message":{"role":"agent","parts":[{"kind":"text","text":"ping"}]}}}`)
got := extractUserMessagePayload(body, "message/send")
if got != nil {
t.Errorf("expected nil for role=agent, got %v", got)
}
}
func TestExtractUserMessagePayload_MalformedJSON(t *testing.T) {
body := []byte(`not json at all`)
got := extractUserMessagePayload(body, "message/send")
if got != nil {
t.Errorf("expected nil for malformed JSON, got %v", got)
}
}
func TestExtractUserMessagePayload_MissingPartsField(t *testing.T) {
body := []byte(`{"params":{"message":{"role":"user"}}}`)
got := extractUserMessagePayload(body, "message/send")
if got != nil {
t.Errorf("expected nil for missing parts, got %v", got)
}
}
// ──────────────────────────────────────────────────────────────────────────────
// A2A auto-wake: hibernated workspace (#711)
// ──────────────────────────────────────────────────────────────────────────────