diff --git a/canvas/src/components/mobile/MobileChat.tsx b/canvas/src/components/mobile/MobileChat.tsx index b5940a0ea..569f540eb 100644 --- a/canvas/src/components/mobile/MobileChat.tsx +++ b/canvas/src/components/mobile/MobileChat.tsx @@ -271,7 +271,7 @@ export function MobileChat({ const msgs = consume(agentId); for (const m of msgs) { appendMessageDeduped( - createMessage("agent", m.content, m.attachments), + createMessage(m.role ?? "agent", m.content, m.attachments), ); } }, [historyLoading, agentId, appendMessageDeduped]); diff --git a/canvas/src/components/tabs/chat/hooks/useChatSocket.ts b/canvas/src/components/tabs/chat/hooks/useChatSocket.ts index 15815e9a8..3dc532436 100644 --- a/canvas/src/components/tabs/chat/hooks/useChatSocket.ts +++ b/canvas/src/components/tabs/chat/hooks/useChatSocket.ts @@ -27,7 +27,7 @@ export function useChatSocket( const msgs = consume(workspaceId); for (const m of msgs) { callbacksRef.current.onAgentMessage?.( - createMessage("agent", m.content, m.attachments), + createMessage(m.role ?? "agent", m.content, m.attachments), ); } if (msgs.length > 0) { diff --git a/canvas/src/lib/ws-events.ts b/canvas/src/lib/ws-events.ts new file mode 100644 index 000000000..792290820 --- /dev/null +++ b/canvas/src/lib/ws-events.ts @@ -0,0 +1,130 @@ +/** + * ws-events.ts — canonical WebSocket event taxonomy shared between + * the Go platform and the TypeScript canvas. + * + * Source of truth: `workspace-server/internal/events/types.go` (Go side). + * This file is the canvas mirror — every constant in that file MUST appear + * here. The go vet / build pipeline does NOT enforce this today; the + * discipline is manual: when adding a new EventType in Go, mirror it here. + * + * Consumer usage: + * import { WS_EVENTS } from "@/lib/ws-events"; + * switch (msg.event) { + * case WS_EVENTS.AGENT_MESSAGE: ... + * case WS_EVENTS.USER_MESSAGE: ... + * } + * + * Wire format: the string literal is sent over the WebSocket as + * `WSMessage.Event`. Do NOT change these values. + */ + +// Chat / agent messaging. +export const WS_AGENT_MESSAGE = "AGENT_MESSAGE"; +export const WS_A2A_RESPONSE = "A2A_RESPONSE"; +export const WS_USER_MESSAGE = "USER_MESSAGE"; +export const WS_ACTIVITY_LOGGED = "ACTIVITY_LOGGED"; +export const WS_CHANNEL_MESSAGE = "CHANNEL_MESSAGE"; + +// Workspace lifecycle. +export const WS_WORKSPACE_PROVISIONING = "WORKSPACE_PROVISIONING"; +export const WS_WORKSPACE_PROVISION_FAILED = "WORKSPACE_PROVISION_FAILED"; +export const WS_WORKSPACE_ONLINE = "WORKSPACE_ONLINE"; +export const WS_WORKSPACE_OFFLINE = "WORKSPACE_OFFLINE"; +export const WS_WORKSPACE_DEGRADED = "WORKSPACE_DEGRADED"; +export const WS_WORKSPACE_HIBERNATED = "WORKSPACE_HIBERNATED"; +export const WS_WORKSPACE_PAUSED = "WORKSPACE_PAUSED"; +export const WS_WORKSPACE_REMOVED = "WORKSPACE_REMOVED"; +export const WS_WORKSPACE_AWAITING_AGENT = "WORKSPACE_AWAITING_AGENT"; +export const WS_WORKSPACE_HEARTBEAT = "WORKSPACE_HEARTBEAT"; + +// Agent assignment + identity. +export const WS_AGENT_ASSIGNED = "AGENT_ASSIGNED"; +export const WS_AGENT_REPLACED = "AGENT_REPLACED"; +export const WS_AGENT_REMOVED = "AGENT_REMOVED"; +export const WS_AGENT_MOVED = "AGENT_MOVED"; +export const WS_AGENT_CARD_UPDATED = "AGENT_CARD_UPDATED"; + +// Delegation lifecycle. +export const WS_DELEGATION_SENT = "DELEGATION_SENT"; +export const WS_DELEGATION_STATUS = "DELEGATION_STATUS"; +export const WS_DELEGATION_COMPLETE = "DELEGATION_COMPLETE"; +export const WS_DELEGATION_FAILED = "DELEGATION_FAILED"; + +// Task progression + scheduler. +export const WS_TASK_UPDATED = "TASK_UPDATED"; +export const WS_CRON_EXECUTED = "CRON_EXECUTED"; +export const WS_CRON_SKIPPED = "CRON_SKIPPED"; + +// Approvals. +export const WS_APPROVAL_REQUESTED = "APPROVAL_REQUESTED"; +export const WS_APPROVAL_ESCALATED = "APPROVAL_ESCALATED"; + +// Auth / credentials. +export const WS_EXTERNAL_CREDENTIALS_ROTATED = "EXTERNAL_CREDENTIALS_ROTATED"; + +/** Union type of all known event names. Used to type WSMessage.Event. */ +export type WSEventName = + | typeof WS_AGENT_MESSAGE + | typeof WS_A2A_RESPONSE + | typeof WS_USER_MESSAGE + | typeof WS_ACTIVITY_LOGGED + | typeof WS_CHANNEL_MESSAGE + | typeof WS_WORKSPACE_PROVISIONING + | typeof WS_WORKSPACE_PROVISION_FAILED + | typeof WS_WORKSPACE_ONLINE + | typeof WS_WORKSPACE_OFFLINE + | typeof WS_WORKSPACE_DEGRADED + | typeof WS_WORKSPACE_HIBERNATED + | typeof WS_WORKSPACE_PAUSED + | typeof WS_WORKSPACE_REMOVED + | typeof WS_WORKSPACE_AWAITING_AGENT + | typeof WS_WORKSPACE_HEARTBEAT + | typeof WS_AGENT_ASSIGNED + | typeof WS_AGENT_REPLACED + | typeof WS_AGENT_REMOVED + | typeof WS_AGENT_MOVED + | typeof WS_AGENT_CARD_UPDATED + | typeof WS_DELEGATION_SENT + | typeof WS_DELEGATION_STATUS + | typeof WS_DELEGATION_COMPLETE + | typeof WS_DELEGATION_FAILED + | typeof WS_TASK_UPDATED + | typeof WS_CRON_EXECUTED + | typeof WS_CRON_SKIPPED + | typeof WS_APPROVAL_REQUESTED + | typeof WS_APPROVAL_ESCALATED + | typeof WS_EXTERNAL_CREDENTIALS_ROTATED; + +/** All event name constants, for exhaustive-switch linting. */ +export const WS_EVENTS = { + AGENT_MESSAGE: WS_AGENT_MESSAGE, + A2A_RESPONSE: WS_A2A_RESPONSE, + USER_MESSAGE: WS_USER_MESSAGE, + ACTIVITY_LOGGED: WS_ACTIVITY_LOGGED, + CHANNEL_MESSAGE: WS_CHANNEL_MESSAGE, + WORKSPACE_PROVISIONING: WS_WORKSPACE_PROVISIONING, + WORKSPACE_PROVISION_FAILED: WS_WORKSPACE_PROVISION_FAILED, + WORKSPACE_ONLINE: WS_WORKSPACE_ONLINE, + WORKSPACE_OFFLINE: WS_WORKSPACE_OFFLINE, + WORKSPACE_DEGRADED: WS_WORKSPACE_DEGRADED, + WORKSPACE_HIBERNATED: WS_WORKSPACE_HIBERNATED, + WORKSPACE_PAUSED: WS_WORKSPACE_PAUSED, + WORKSPACE_REMOVED: WS_WORKSPACE_REMOVED, + WORKSPACE_AWAITING_AGENT: WS_WORKSPACE_AWAITING_AGENT, + WORKSPACE_HEARTBEAT: WS_WORKSPACE_HEARTBEAT, + AGENT_ASSIGNED: WS_AGENT_ASSIGNED, + AGENT_REPLACED: WS_AGENT_REPLACED, + AGENT_REMOVED: WS_AGENT_REMOVED, + AGENT_MOVED: WS_AGENT_MOVED, + AGENT_CARD_UPDATED: WS_AGENT_CARD_UPDATED, + DELEGATION_SENT: WS_DELEGATION_SENT, + DELEGATION_STATUS: WS_DELEGATION_STATUS, + DELEGATION_COMPLETE: WS_DELEGATION_COMPLETE, + DELEGATION_FAILED: WS_DELEGATION_FAILED, + TASK_UPDATED: WS_TASK_UPDATED, + CRON_EXECUTED: WS_CRON_EXECUTED, + CRON_SKIPPED: WS_CRON_SKIPPED, + APPROVAL_REQUESTED: WS_APPROVAL_REQUESTED, + APPROVAL_ESCALATED: WS_APPROVAL_ESCALATED, + EXTERNAL_CREDENTIALS_ROTATED: WS_EXTERNAL_CREDENTIALS_ROTATED, +} as const; diff --git a/canvas/src/store/__tests__/canvas-events.test.ts b/canvas/src/store/__tests__/canvas-events.test.ts index f6e0924d4..7a5571632 100644 --- a/canvas/src/store/__tests__/canvas-events.test.ts +++ b/canvas/src/store/__tests__/canvas-events.test.ts @@ -808,6 +808,210 @@ describe("handleCanvasEvent – A2A_RESPONSE", () => { }); }); +// --------------------------------------------------------------------------- +// USER_MESSAGE (cross-session fan-out of user's own outbound message) +// --------------------------------------------------------------------------- + +// #1440: user's own message is optimistically inserted in the originating +// session by useChatSend; other sessions need this event to render it. + +describe("handleCanvasEvent – USER_MESSAGE", () => { + it("appends a user message to agentMessages for the workspace", () => { + const node = makeNode("ws-1"); + const { get, set } = makeStore([node], [], null, {}); + + handleCanvasEvent( + makeMsg({ + event: "USER_MESSAGE", + workspace_id: "ws-1", + payload: { + messageId: "msg-abc", + content: "Hello, agent!", + }, + }), + get, + set + ); + + expect(set).toHaveBeenCalledOnce(); + const { agentMessages } = set.mock.calls[0][0] as { + agentMessages: Record>; + }; + expect(agentMessages["ws-1"]).toHaveLength(1); + expect(agentMessages["ws-1"][0].id).toBe("msg-abc"); + expect(agentMessages["ws-1"][0].content).toBe("Hello, agent!"); + expect(agentMessages["ws-1"][0].role).toBe("user"); + expect(typeof agentMessages["ws-1"][0].timestamp).toBe("string"); + }); + + it("appends to existing messages rather than replacing them", () => { + const node = makeNode("ws-1"); + const existing = [{ id: "old", content: "prior msg", timestamp: "2024-01-01T00:00:00Z" }]; + const { get, set } = makeStore([node], [], null, { "ws-1": existing }); + + handleCanvasEvent( + makeMsg({ + event: "USER_MESSAGE", + workspace_id: "ws-1", + payload: { + messageId: "msg-xyz", + content: "second user message", + }, + }), + get, + set + ); + + const { agentMessages } = set.mock.calls[0][0] as { + agentMessages: Record>; + }; + expect(agentMessages["ws-1"]).toHaveLength(2); + expect(agentMessages["ws-1"][0].content).toBe("prior msg"); + expect(agentMessages["ws-1"][1].content).toBe("second user message"); + expect(agentMessages["ws-1"][1].id).toBe("msg-xyz"); + }); + + it("is a no-op when both content and files are absent", () => { + const node = makeNode("ws-1"); + const { get, set } = makeStore([node]); + + handleCanvasEvent( + makeMsg({ + event: "USER_MESSAGE", + workspace_id: "ws-1", + payload: {}, + }), + get, + set + ); + + expect(set).not.toHaveBeenCalled(); + }); + + it("is a no-op when content is empty string", () => { + const node = makeNode("ws-1"); + const { get, set } = makeStore([node]); + + handleCanvasEvent( + makeMsg({ + event: "USER_MESSAGE", + workspace_id: "ws-1", + payload: { content: "" }, + }), + get, + set + ); + + expect(set).not.toHaveBeenCalled(); + }); + + it("passes through valid file attachments", () => { + const node = makeNode("ws-1"); + const { get, set } = makeStore([node], [], null, {}); + const att = { + uri: "workspace:/uploads/doc.pdf", + name: "doc.pdf", + mimeType: "application/pdf", + size: 98765, + }; + + handleCanvasEvent( + makeMsg({ + event: "USER_MESSAGE", + workspace_id: "ws-1", + payload: { + messageId: "msg-with-file", + content: "see attached", + files: [att], + }, + }), + get, + set + ); + + const { agentMessages } = set.mock.calls[0][0] as { + agentMessages: Record }>>; + }; + expect(agentMessages["ws-1"]).toHaveLength(1); + expect(agentMessages["ws-1"][0].id).toBe("msg-with-file"); + expect(agentMessages["ws-1"][0].role).toBe("user"); + expect(agentMessages["ws-1"][0].attachments).toEqual([att]); + }); + + it("drops file entries with missing or empty uri/name", () => { + const node = makeNode("ws-1"); + const { get, set } = makeStore([node], [], null, {}); + // Two bad entries followed by one good one + const bad = [{ uri: "" }, { name: "" }, { uri: "x", name: "y" }]; + + handleCanvasEvent( + makeMsg({ + event: "USER_MESSAGE", + workspace_id: "ws-1", + payload: { + messageId: "msg-bad-files", + content: "check files", + files: bad as Array<{ uri?: unknown; name?: unknown }>, + }, + }), + get, + set + ); + + const { agentMessages } = set.mock.calls[0][0] as { + agentMessages: Record }>>; + }; + // Only the valid entry should survive the filter. + expect(agentMessages["ws-1"][0].attachments).toHaveLength(1); + expect(agentMessages["ws-1"][0].attachments![0].uri).toBe("x"); + }); + + it("uses crypto.randomUUID() when messageId is absent", () => { + const node = makeNode("ws-1"); + const { get, set } = makeStore([node], [], null, {}); + + handleCanvasEvent( + makeMsg({ + event: "USER_MESSAGE", + workspace_id: "ws-1", + payload: { content: "no id field" }, + }), + get, + set + ); + + const { agentMessages } = set.mock.calls[0][0] as { + agentMessages: Record>; + }; + expect(agentMessages["ws-1"][0].id).toBeDefined(); + expect(typeof agentMessages["ws-1"][0].id).toBe("string"); + }); + + it("renders a files-only message (no text) when content is absent but files present", () => { + const node = makeNode("ws-1"); + const { get, set } = makeStore([node], [], null, {}); + + handleCanvasEvent( + makeMsg({ + event: "USER_MESSAGE", + workspace_id: "ws-1", + payload: { + messageId: "file-only", + files: [{ uri: "workspace:/x.pdf", name: "x.pdf" }], + }, + }), + get, + set + ); + + const { agentMessages } = set.mock.calls[0][0] as { + agentMessages: Record>; + }; + expect(agentMessages["ws-1"]).toHaveLength(1); + expect(agentMessages["ws-1"][0].content).toBe(""); + }); +}); + // --------------------------------------------------------------------------- // Unknown event // --------------------------------------------------------------------------- diff --git a/canvas/src/store/canvas-events.ts b/canvas/src/store/canvas-events.ts index 97b204e29..ada09a906 100644 --- a/canvas/src/store/canvas-events.ts +++ b/canvas/src/store/canvas-events.ts @@ -71,7 +71,7 @@ export function handleCanvasEvent( nodes: Node[]; edges: Edge[]; selectedNodeId: string | null; - agentMessages: Record }>>; + agentMessages: Record }>>; }, set: (partial: Record) => void, ): void { @@ -515,6 +515,66 @@ export function handleCanvasEvent( break; } + // #1440 USER_MESSAGE: the canvas optimistically inserts the user's + // own message into the originating session's store before the request + // fires (useChatSend → onUserMessage). Other sessions never saw it — + // the agent reply was broadcast but not the user's own text. This event + // fans the user's message to every OTHER session so they render the + // bubble without requiring a manual refresh. The originating session + // collapses its optimistic copy via the dedup mechanism in ChatTab + // (role+content+timestamp window) — no double bubble. + // + // Payload shape mirrors EventUserMessage in Go + // a2a_proxy_helpers.go: userMessagePayload: + // messageId: string + // content: string + // files: ParsedFilePart[] (name, uri, mimeType, size) + case "USER_MESSAGE": { + const payload = msg.payload as { + messageId?: string; + content?: string; + files?: Array<{ name?: unknown; uri?: unknown; mimeType?: unknown; size?: unknown }>; + }; + const content = typeof payload?.content === "string" ? payload.content : ""; + const files: Array<{ name: string; uri: string; mimeType?: string; size?: number }> = []; + if (Array.isArray(payload?.files)) { + for (const f of payload.files) { + if (typeof f?.uri === "string" && typeof f?.name === "string") { + files.push({ + name: f.name, + uri: f.uri, + mimeType: typeof f.mimeType === "string" ? f.mimeType : undefined, + size: typeof f.size === "number" ? f.size : undefined, + }); + } + } + } + // Render only when there's something visible. + if (content || files.length > 0) { + // Insert into agentMessages for rendering as a user-bubble. + // ChatTab uses msg.role === "user" for right-side alignment and + // user-toned styling, so we must set role:"user" explicitly. + const { agentMessages } = get(); + const existing = agentMessages[msg.workspace_id] || []; + set({ + agentMessages: { + ...agentMessages, + [msg.workspace_id]: [ + ...existing, + { + id: payload?.messageId ?? crypto.randomUUID(), + content, + role: "user", + timestamp: new Date().toISOString(), + ...(files.length > 0 ? { attachments: files } : {}), + }, + ], + }, + }); + } + break; + } + default: break; } diff --git a/canvas/src/store/canvas.ts b/canvas/src/store/canvas.ts index 1baa0e660..e92f2db29 100644 --- a/canvas/src/store/canvas.ts +++ b/canvas/src/store/canvas.ts @@ -224,8 +224,8 @@ interface CanvasState { batchPause: () => Promise; batchDelete: () => Promise; /** Agent-pushed messages keyed by workspace ID. ChatTab consumes and clears these. */ - agentMessages: Record }>>; - consumeAgentMessages: (workspaceId: string) => Array<{ id: string; content: string; timestamp: string; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>; + agentMessages: Record }>>; + consumeAgentMessages: (workspaceId: string) => Array<{ id: string; content: string; timestamp: string; role?: "user" | "agent"; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>; /** WebSocket connection status — drives the live indicator in the Toolbar. */ wsStatus: "connected" | "connecting" | "disconnected"; setWsStatus: (status: "connected" | "connecting" | "disconnected") => void; diff --git a/workspace-server/internal/events/types.go b/workspace-server/internal/events/types.go index a081d46e8..4837260a1 100644 --- a/workspace-server/internal/events/types.go +++ b/workspace-server/internal/events/types.go @@ -41,8 +41,9 @@ type EventType string // scan-friendly as it grows. const ( // Chat / agent messaging — surfaces in canvas chat panels. - EventAgentMessage EventType = "AGENT_MESSAGE" - EventA2AResponse EventType = "A2A_RESPONSE" + EventAgentMessage EventType = "AGENT_MESSAGE" + EventA2AResponse EventType = "A2A_RESPONSE" + EventUserMessage EventType = "USER_MESSAGE" EventActivityLogged EventType = "ACTIVITY_LOGGED" EventChannelMessage EventType = "CHANNEL_MESSAGE" @@ -104,6 +105,7 @@ var AllEventTypes = []EventType{ EventApprovalEscalated, EventApprovalRequested, EventChannelMessage, + EventUserMessage, EventCronExecuted, EventCronSkipped, EventDelegationComplete, diff --git a/workspace-server/internal/events/types_test.go b/workspace-server/internal/events/types_test.go index bef0ed8bf..361f898ad 100644 --- a/workspace-server/internal/events/types_test.go +++ b/workspace-server/internal/events/types_test.go @@ -41,6 +41,7 @@ func TestAllEventTypes_IsSnapshot(t *testing.T) { "DELEGATION_STATUS", "EXTERNAL_CREDENTIALS_ROTATED", "TASK_UPDATED", + "USER_MESSAGE", "WORKSPACE_AWAITING_AGENT", "WORKSPACE_DEGRADED", "WORKSPACE_HEARTBEAT", diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index 08fd21020..5376024c1 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -344,6 +344,106 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle "duration_ms": durationMs, }) } + + // #1440: fan user's own outbound message to all sessions of this workspace. + // When a canvas user sends a message, the originating session renders it + // optimistically (useChatSend inserts it into the local store immediately). + // Other sessions never saw it — only the agent reply was broadcast. The fix: + // parse the JSON-RPC body and broadcast a USER_MESSAGE event so every + // other session renders the user's bubble without a manual refresh. + // Skipped for non-2xx responses (no message was delivered) and non-canvas + // callers (workspace-to-workspace calls have their own fan-out via the + // channel system). + if callerID == "" && statusCode < 400 { + if payload := extractUserMessagePayload(body, a2aMethod); payload != nil { + h.broadcaster.BroadcastOnly(workspaceID, string(events.EventUserMessage), payload) + } + } +} + +// userMessagePayload carries the fields the canvas needs to render a user +// bubble from a cross-session fan-out event. +type userMessagePayload struct { + MessageID string `json:"messageId,omitempty"` + Content string `json:"content,omitempty"` + Files []userMessageFile `json:"files,omitempty"` +} + +// userMessageFile mirrors the shape canvas/src/components/tabs/chat/message-parser.ts +// ParsedFilePart so the canvas can render the same download chip. +type userMessageFile struct { + Name string `json:"name"` + URI string `json:"uri"` + MimeType string `json:"mimeType,omitempty"` + Size int64 `json:"size,omitempty"` +} + +// extractUserMessagePayload parses a JSON-RPC message/send body and extracts the +// user-visible fields (text content + file attachments) for fan-out broadcasting. +// Returns nil when the body is not a canvas message/send request — either the +// method is not message/send, or the message does not have role=user (e.g. a +// heartbeat ping sent by the canvas for keepalive, which carries role:agent). +// Safe to call on any body — malformed JSON returns nil with no side effects. +func extractUserMessagePayload(body []byte, method string) map[string]interface{} { + if method != "message/send" { + return nil + } + var rpc struct { + ID string `json:"id,omitempty"` + Params struct { + Message struct { + Role string `json:"role,omitempty"` + MessageID string `json:"messageId,omitempty"` + Parts []struct { + Kind string `json:"kind,omitempty"` + Text string `json:"text,omitempty"` + File *struct { + Name string `json:"name,omitempty"` + URI string `json:"uri,omitempty"` + MimeType string `json:"mimeType,omitempty"` + Size int64 `json:"size,omitempty"` + } `json:"file,omitempty"` + } `json:"parts,omitempty"` + } `json:"message,omitempty"` + } `json:"params,omitempty"` + } + if err := json.Unmarshal(body, &rpc); err != nil { + return nil + } + // Only broadcast user-role messages. The canvas also sends internal + // heartbeat pings via message/send (role:agent) that should not appear + // as user bubbles. + if rpc.Params.Message.Role != "user" { + return nil + } + var content string + var files []userMessageFile + for _, part := range rpc.Params.Message.Parts { + if part.Kind == "text" && part.Text != "" { + if content != "" { + content += "\n" + } + content += part.Text + } else if part.Kind == "file" && part.File != nil { + files = append(files, userMessageFile{ + Name: part.File.Name, + URI: part.File.URI, + MimeType: part.File.MimeType, + Size: part.File.Size, + }) + } + } + // Skip pure-noise events (e.g. empty send with no text and no files). + // The canvas optimistically inserts the bubble before the request; an + // empty broadcast would create an extra blank bubble in other sessions. + if content == "" && len(files) == 0 { + return nil + } + return map[string]interface{}{ + "messageId": rpc.Params.Message.MessageID, + "content": content, + "files": files, + } } func nilIfEmpty(s string) *string { diff --git a/workspace-server/internal/handlers/a2a_proxy_test.go b/workspace-server/internal/handlers/a2a_proxy_test.go index d2173d4c3..f1fb7b575 100644 --- a/workspace-server/internal/handlers/a2a_proxy_test.go +++ b/workspace-server/internal/handlers/a2a_proxy_test.go @@ -2024,6 +2024,101 @@ func TestLogA2ASuccess_ErrorStatus(t *testing.T) { time.Sleep(80 * time.Millisecond) } +// ────────────────────────────────────────────────────────────────────────────── +// extractUserMessagePayload – fan-out of user's own outbound message (#1440) +// ────────────────────────────────────────────────────────────────────────────── + +func TestExtractUserMessagePayload_BasicText(t *testing.T) { + body := []byte(`{"id":"mid-1","method":"message/send","params":{"message":{"role":"user","messageId":"msg-1","parts":[{"kind":"text","text":"hello world"}]}}}`) + got := extractUserMessagePayload(body, "message/send") + if got == nil { + t.Fatal("expected non-nil payload") + } + if got["messageId"] != "msg-1" { + t.Errorf("messageId: got %v", got["messageId"]) + } + if got["content"] != "hello world" { + t.Errorf("content: got %v", got["content"]) + } +} + +func TestExtractUserMessagePayload_MultipleTextParts(t *testing.T) { + body := []byte(`{"params":{"message":{"role":"user","parts":[{"kind":"text","text":"line one"},{"kind":"text","text":"line two"}]}}}`) + got := extractUserMessagePayload(body, "message/send") + if got == nil { + t.Fatal("expected non-nil payload") + } + if got["content"] != "line one\nline two" { + t.Errorf("content: got %v", got["content"]) + } +} + +func TestExtractUserMessagePayload_WithFile(t *testing.T) { + body := []byte(`{"params":{"message":{"role":"user","parts":[{"kind":"text","text":"check this"},{"kind":"file","file":{"name":"report.pdf","uri":"workspace:/uploads/report.pdf","mimeType":"application/pdf","size":12345}}]}}}`) + got := extractUserMessagePayload(body, "message/send") + if got == nil { + t.Fatal("expected non-nil payload") + } + if got["content"] != "check this" { + t.Errorf("content: got %v", got["content"]) + } + files, ok := got["files"].([]userMessageFile) + if !ok { + t.Fatalf("files type: got %T", got["files"]) + } + if len(files) != 1 { + t.Errorf("files count: got %d", len(files)) + } + if files[0].Name != "report.pdf" { + t.Errorf("file name: got %s", files[0].Name) + } + if files[0].URI != "workspace:/uploads/report.pdf" { + t.Errorf("file uri: got %s", files[0].URI) + } +} + +func TestExtractUserMessagePayload_EmptyParts(t *testing.T) { + body := []byte(`{"params":{"message":{"role":"user","parts":[]}}}`) + got := extractUserMessagePayload(body, "message/send") + if got != nil { + t.Errorf("expected nil for empty parts, got %v", got) + } +} + +func TestExtractUserMessagePayload_NonMessageSendMethod(t *testing.T) { + body := []byte(`{"params":{"message":{"role":"user","parts":[{"kind":"text","text":"hello"}]}}}`) + got := extractUserMessagePayload(body, "initialize") + if got != nil { + t.Errorf("expected nil for non-message/send, got %v", got) + } +} + +func TestExtractUserMessagePayload_NonUserRole(t *testing.T) { + // Canvas also sends heartbeat pings via message/send with role:agent; + // these must NOT appear as user bubbles. + body := []byte(`{"params":{"message":{"role":"agent","parts":[{"kind":"text","text":"ping"}]}}}`) + got := extractUserMessagePayload(body, "message/send") + if got != nil { + t.Errorf("expected nil for role=agent, got %v", got) + } +} + +func TestExtractUserMessagePayload_MalformedJSON(t *testing.T) { + body := []byte(`not json at all`) + got := extractUserMessagePayload(body, "message/send") + if got != nil { + t.Errorf("expected nil for malformed JSON, got %v", got) + } +} + +func TestExtractUserMessagePayload_MissingPartsField(t *testing.T) { + body := []byte(`{"params":{"message":{"role":"user"}}}`) + got := extractUserMessagePayload(body, "message/send") + if got != nil { + t.Errorf("expected nil for missing parts, got %v", got) + } +} + // ────────────────────────────────────────────────────────────────────────────── // A2A auto-wake: hibernated workspace (#711) // ──────────────────────────────────────────────────────────────────────────────