fix(both): fan user's own message to all conversation sessions (#1440) #1514
@@ -271,7 +271,7 @@ export function MobileChat({
|
||||
const msgs = consume(agentId);
|
||||
for (const m of msgs) {
|
||||
appendMessageDeduped(
|
||||
createMessage("agent", m.content, m.attachments),
|
||||
createMessage(m.role ?? "agent", m.content, m.attachments),
|
||||
);
|
||||
}
|
||||
}, [historyLoading, agentId, appendMessageDeduped]);
|
||||
|
||||
@@ -27,7 +27,7 @@ export function useChatSocket(
|
||||
const msgs = consume(workspaceId);
|
||||
for (const m of msgs) {
|
||||
callbacksRef.current.onAgentMessage?.(
|
||||
createMessage("agent", m.content, m.attachments),
|
||||
createMessage(m.role ?? "agent", m.content, m.attachments),
|
||||
);
|
||||
}
|
||||
if (msgs.length > 0) {
|
||||
|
||||
@@ -0,0 +1,130 @@
|
||||
/**
|
||||
* ws-events.ts — canonical WebSocket event taxonomy shared between
|
||||
* the Go platform and the TypeScript canvas.
|
||||
*
|
||||
* Source of truth: `workspace-server/internal/events/types.go` (Go side).
|
||||
* This file is the canvas mirror — every constant in that file MUST appear
|
||||
* here. The go vet / build pipeline does NOT enforce this today; the
|
||||
* discipline is manual: when adding a new EventType in Go, mirror it here.
|
||||
*
|
||||
* Consumer usage:
|
||||
* import { WS_EVENTS } from "@/lib/ws-events";
|
||||
* switch (msg.event) {
|
||||
* case WS_EVENTS.AGENT_MESSAGE: ...
|
||||
* case WS_EVENTS.USER_MESSAGE: ...
|
||||
* }
|
||||
*
|
||||
* Wire format: the string literal is sent over the WebSocket as
|
||||
* `WSMessage.Event`. Do NOT change these values.
|
||||
*/
|
||||
|
||||
// Chat / agent messaging.
|
||||
export const WS_AGENT_MESSAGE = "AGENT_MESSAGE";
|
||||
export const WS_A2A_RESPONSE = "A2A_RESPONSE";
|
||||
export const WS_USER_MESSAGE = "USER_MESSAGE";
|
||||
export const WS_ACTIVITY_LOGGED = "ACTIVITY_LOGGED";
|
||||
export const WS_CHANNEL_MESSAGE = "CHANNEL_MESSAGE";
|
||||
|
||||
// Workspace lifecycle.
|
||||
export const WS_WORKSPACE_PROVISIONING = "WORKSPACE_PROVISIONING";
|
||||
export const WS_WORKSPACE_PROVISION_FAILED = "WORKSPACE_PROVISION_FAILED";
|
||||
export const WS_WORKSPACE_ONLINE = "WORKSPACE_ONLINE";
|
||||
export const WS_WORKSPACE_OFFLINE = "WORKSPACE_OFFLINE";
|
||||
export const WS_WORKSPACE_DEGRADED = "WORKSPACE_DEGRADED";
|
||||
export const WS_WORKSPACE_HIBERNATED = "WORKSPACE_HIBERNATED";
|
||||
export const WS_WORKSPACE_PAUSED = "WORKSPACE_PAUSED";
|
||||
export const WS_WORKSPACE_REMOVED = "WORKSPACE_REMOVED";
|
||||
export const WS_WORKSPACE_AWAITING_AGENT = "WORKSPACE_AWAITING_AGENT";
|
||||
export const WS_WORKSPACE_HEARTBEAT = "WORKSPACE_HEARTBEAT";
|
||||
|
||||
// Agent assignment + identity.
|
||||
export const WS_AGENT_ASSIGNED = "AGENT_ASSIGNED";
|
||||
export const WS_AGENT_REPLACED = "AGENT_REPLACED";
|
||||
export const WS_AGENT_REMOVED = "AGENT_REMOVED";
|
||||
export const WS_AGENT_MOVED = "AGENT_MOVED";
|
||||
export const WS_AGENT_CARD_UPDATED = "AGENT_CARD_UPDATED";
|
||||
|
||||
// Delegation lifecycle.
|
||||
export const WS_DELEGATION_SENT = "DELEGATION_SENT";
|
||||
export const WS_DELEGATION_STATUS = "DELEGATION_STATUS";
|
||||
export const WS_DELEGATION_COMPLETE = "DELEGATION_COMPLETE";
|
||||
export const WS_DELEGATION_FAILED = "DELEGATION_FAILED";
|
||||
|
||||
// Task progression + scheduler.
|
||||
export const WS_TASK_UPDATED = "TASK_UPDATED";
|
||||
export const WS_CRON_EXECUTED = "CRON_EXECUTED";
|
||||
export const WS_CRON_SKIPPED = "CRON_SKIPPED";
|
||||
|
||||
// Approvals.
|
||||
export const WS_APPROVAL_REQUESTED = "APPROVAL_REQUESTED";
|
||||
export const WS_APPROVAL_ESCALATED = "APPROVAL_ESCALATED";
|
||||
|
||||
// Auth / credentials.
|
||||
export const WS_EXTERNAL_CREDENTIALS_ROTATED = "EXTERNAL_CREDENTIALS_ROTATED";
|
||||
|
||||
/** Union type of all known event names. Used to type WSMessage.Event. */
|
||||
export type WSEventName =
|
||||
| typeof WS_AGENT_MESSAGE
|
||||
| typeof WS_A2A_RESPONSE
|
||||
| typeof WS_USER_MESSAGE
|
||||
| typeof WS_ACTIVITY_LOGGED
|
||||
| typeof WS_CHANNEL_MESSAGE
|
||||
| typeof WS_WORKSPACE_PROVISIONING
|
||||
| typeof WS_WORKSPACE_PROVISION_FAILED
|
||||
| typeof WS_WORKSPACE_ONLINE
|
||||
| typeof WS_WORKSPACE_OFFLINE
|
||||
| typeof WS_WORKSPACE_DEGRADED
|
||||
| typeof WS_WORKSPACE_HIBERNATED
|
||||
| typeof WS_WORKSPACE_PAUSED
|
||||
| typeof WS_WORKSPACE_REMOVED
|
||||
| typeof WS_WORKSPACE_AWAITING_AGENT
|
||||
| typeof WS_WORKSPACE_HEARTBEAT
|
||||
| typeof WS_AGENT_ASSIGNED
|
||||
| typeof WS_AGENT_REPLACED
|
||||
| typeof WS_AGENT_REMOVED
|
||||
| typeof WS_AGENT_MOVED
|
||||
| typeof WS_AGENT_CARD_UPDATED
|
||||
| typeof WS_DELEGATION_SENT
|
||||
| typeof WS_DELEGATION_STATUS
|
||||
| typeof WS_DELEGATION_COMPLETE
|
||||
| typeof WS_DELEGATION_FAILED
|
||||
| typeof WS_TASK_UPDATED
|
||||
| typeof WS_CRON_EXECUTED
|
||||
| typeof WS_CRON_SKIPPED
|
||||
| typeof WS_APPROVAL_REQUESTED
|
||||
| typeof WS_APPROVAL_ESCALATED
|
||||
| typeof WS_EXTERNAL_CREDENTIALS_ROTATED;
|
||||
|
||||
/** All event name constants, for exhaustive-switch linting. */
|
||||
export const WS_EVENTS = {
|
||||
AGENT_MESSAGE: WS_AGENT_MESSAGE,
|
||||
A2A_RESPONSE: WS_A2A_RESPONSE,
|
||||
USER_MESSAGE: WS_USER_MESSAGE,
|
||||
ACTIVITY_LOGGED: WS_ACTIVITY_LOGGED,
|
||||
CHANNEL_MESSAGE: WS_CHANNEL_MESSAGE,
|
||||
WORKSPACE_PROVISIONING: WS_WORKSPACE_PROVISIONING,
|
||||
WORKSPACE_PROVISION_FAILED: WS_WORKSPACE_PROVISION_FAILED,
|
||||
WORKSPACE_ONLINE: WS_WORKSPACE_ONLINE,
|
||||
WORKSPACE_OFFLINE: WS_WORKSPACE_OFFLINE,
|
||||
WORKSPACE_DEGRADED: WS_WORKSPACE_DEGRADED,
|
||||
WORKSPACE_HIBERNATED: WS_WORKSPACE_HIBERNATED,
|
||||
WORKSPACE_PAUSED: WS_WORKSPACE_PAUSED,
|
||||
WORKSPACE_REMOVED: WS_WORKSPACE_REMOVED,
|
||||
WORKSPACE_AWAITING_AGENT: WS_WORKSPACE_AWAITING_AGENT,
|
||||
WORKSPACE_HEARTBEAT: WS_WORKSPACE_HEARTBEAT,
|
||||
AGENT_ASSIGNED: WS_AGENT_ASSIGNED,
|
||||
AGENT_REPLACED: WS_AGENT_REPLACED,
|
||||
AGENT_REMOVED: WS_AGENT_REMOVED,
|
||||
AGENT_MOVED: WS_AGENT_MOVED,
|
||||
AGENT_CARD_UPDATED: WS_AGENT_CARD_UPDATED,
|
||||
DELEGATION_SENT: WS_DELEGATION_SENT,
|
||||
DELEGATION_STATUS: WS_DELEGATION_STATUS,
|
||||
DELEGATION_COMPLETE: WS_DELEGATION_COMPLETE,
|
||||
DELEGATION_FAILED: WS_DELEGATION_FAILED,
|
||||
TASK_UPDATED: WS_TASK_UPDATED,
|
||||
CRON_EXECUTED: WS_CRON_EXECUTED,
|
||||
CRON_SKIPPED: WS_CRON_SKIPPED,
|
||||
APPROVAL_REQUESTED: WS_APPROVAL_REQUESTED,
|
||||
APPROVAL_ESCALATED: WS_APPROVAL_ESCALATED,
|
||||
EXTERNAL_CREDENTIALS_ROTATED: WS_EXTERNAL_CREDENTIALS_ROTATED,
|
||||
} as const;
|
||||
@@ -808,6 +808,210 @@ describe("handleCanvasEvent – A2A_RESPONSE", () => {
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// USER_MESSAGE (cross-session fan-out of user's own outbound message)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// #1440: user's own message is optimistically inserted in the originating
|
||||
// session by useChatSend; other sessions need this event to render it.
|
||||
|
||||
describe("handleCanvasEvent – USER_MESSAGE", () => {
|
||||
it("appends a user message to agentMessages for the workspace", () => {
|
||||
const node = makeNode("ws-1");
|
||||
const { get, set } = makeStore([node], [], null, {});
|
||||
|
||||
handleCanvasEvent(
|
||||
makeMsg({
|
||||
event: "USER_MESSAGE",
|
||||
workspace_id: "ws-1",
|
||||
payload: {
|
||||
messageId: "msg-abc",
|
||||
content: "Hello, agent!",
|
||||
},
|
||||
}),
|
||||
get,
|
||||
set
|
||||
);
|
||||
|
||||
expect(set).toHaveBeenCalledOnce();
|
||||
const { agentMessages } = set.mock.calls[0][0] as {
|
||||
agentMessages: Record<string, Array<{ id: string; content: string; timestamp: string }>>;
|
||||
};
|
||||
expect(agentMessages["ws-1"]).toHaveLength(1);
|
||||
expect(agentMessages["ws-1"][0].id).toBe("msg-abc");
|
||||
expect(agentMessages["ws-1"][0].content).toBe("Hello, agent!");
|
||||
expect(agentMessages["ws-1"][0].role).toBe("user");
|
||||
expect(typeof agentMessages["ws-1"][0].timestamp).toBe("string");
|
||||
});
|
||||
|
||||
it("appends to existing messages rather than replacing them", () => {
|
||||
const node = makeNode("ws-1");
|
||||
const existing = [{ id: "old", content: "prior msg", timestamp: "2024-01-01T00:00:00Z" }];
|
||||
const { get, set } = makeStore([node], [], null, { "ws-1": existing });
|
||||
|
||||
handleCanvasEvent(
|
||||
makeMsg({
|
||||
event: "USER_MESSAGE",
|
||||
workspace_id: "ws-1",
|
||||
payload: {
|
||||
messageId: "msg-xyz",
|
||||
content: "second user message",
|
||||
},
|
||||
}),
|
||||
get,
|
||||
set
|
||||
);
|
||||
|
||||
const { agentMessages } = set.mock.calls[0][0] as {
|
||||
agentMessages: Record<string, Array<{ id: string; content: string }>>;
|
||||
};
|
||||
expect(agentMessages["ws-1"]).toHaveLength(2);
|
||||
expect(agentMessages["ws-1"][0].content).toBe("prior msg");
|
||||
expect(agentMessages["ws-1"][1].content).toBe("second user message");
|
||||
expect(agentMessages["ws-1"][1].id).toBe("msg-xyz");
|
||||
});
|
||||
|
||||
it("is a no-op when both content and files are absent", () => {
|
||||
const node = makeNode("ws-1");
|
||||
const { get, set } = makeStore([node]);
|
||||
|
||||
handleCanvasEvent(
|
||||
makeMsg({
|
||||
event: "USER_MESSAGE",
|
||||
workspace_id: "ws-1",
|
||||
payload: {},
|
||||
}),
|
||||
get,
|
||||
set
|
||||
);
|
||||
|
||||
expect(set).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("is a no-op when content is empty string", () => {
|
||||
const node = makeNode("ws-1");
|
||||
const { get, set } = makeStore([node]);
|
||||
|
||||
handleCanvasEvent(
|
||||
makeMsg({
|
||||
event: "USER_MESSAGE",
|
||||
workspace_id: "ws-1",
|
||||
payload: { content: "" },
|
||||
}),
|
||||
get,
|
||||
set
|
||||
);
|
||||
|
||||
expect(set).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("passes through valid file attachments", () => {
|
||||
const node = makeNode("ws-1");
|
||||
const { get, set } = makeStore([node], [], null, {});
|
||||
const att = {
|
||||
uri: "workspace:/uploads/doc.pdf",
|
||||
name: "doc.pdf",
|
||||
mimeType: "application/pdf",
|
||||
size: 98765,
|
||||
};
|
||||
|
||||
handleCanvasEvent(
|
||||
makeMsg({
|
||||
event: "USER_MESSAGE",
|
||||
workspace_id: "ws-1",
|
||||
payload: {
|
||||
messageId: "msg-with-file",
|
||||
content: "see attached",
|
||||
files: [att],
|
||||
},
|
||||
}),
|
||||
get,
|
||||
set
|
||||
);
|
||||
|
||||
const { agentMessages } = set.mock.calls[0][0] as {
|
||||
agentMessages: Record<string, Array<{ id: string; attachments?: Array<{ uri: string; name: string; mimeType?: string; size?: number }> }>>;
|
||||
};
|
||||
expect(agentMessages["ws-1"]).toHaveLength(1);
|
||||
expect(agentMessages["ws-1"][0].id).toBe("msg-with-file");
|
||||
expect(agentMessages["ws-1"][0].role).toBe("user");
|
||||
expect(agentMessages["ws-1"][0].attachments).toEqual([att]);
|
||||
});
|
||||
|
||||
it("drops file entries with missing or empty uri/name", () => {
|
||||
const node = makeNode("ws-1");
|
||||
const { get, set } = makeStore([node], [], null, {});
|
||||
// Two bad entries followed by one good one
|
||||
const bad = [{ uri: "" }, { name: "" }, { uri: "x", name: "y" }];
|
||||
|
||||
handleCanvasEvent(
|
||||
makeMsg({
|
||||
event: "USER_MESSAGE",
|
||||
workspace_id: "ws-1",
|
||||
payload: {
|
||||
messageId: "msg-bad-files",
|
||||
content: "check files",
|
||||
files: bad as Array<{ uri?: unknown; name?: unknown }>,
|
||||
},
|
||||
}),
|
||||
get,
|
||||
set
|
||||
);
|
||||
|
||||
const { agentMessages } = set.mock.calls[0][0] as {
|
||||
agentMessages: Record<string, Array<{ attachments?: Array<{ uri: string }> }>>;
|
||||
};
|
||||
// Only the valid entry should survive the filter.
|
||||
expect(agentMessages["ws-1"][0].attachments).toHaveLength(1);
|
||||
expect(agentMessages["ws-1"][0].attachments![0].uri).toBe("x");
|
||||
});
|
||||
|
||||
it("uses crypto.randomUUID() when messageId is absent", () => {
|
||||
const node = makeNode("ws-1");
|
||||
const { get, set } = makeStore([node], [], null, {});
|
||||
|
||||
handleCanvasEvent(
|
||||
makeMsg({
|
||||
event: "USER_MESSAGE",
|
||||
workspace_id: "ws-1",
|
||||
payload: { content: "no id field" },
|
||||
}),
|
||||
get,
|
||||
set
|
||||
);
|
||||
|
||||
const { agentMessages } = set.mock.calls[0][0] as {
|
||||
agentMessages: Record<string, Array<{ id: string }>>;
|
||||
};
|
||||
expect(agentMessages["ws-1"][0].id).toBeDefined();
|
||||
expect(typeof agentMessages["ws-1"][0].id).toBe("string");
|
||||
});
|
||||
|
||||
it("renders a files-only message (no text) when content is absent but files present", () => {
|
||||
const node = makeNode("ws-1");
|
||||
const { get, set } = makeStore([node], [], null, {});
|
||||
|
||||
handleCanvasEvent(
|
||||
makeMsg({
|
||||
event: "USER_MESSAGE",
|
||||
workspace_id: "ws-1",
|
||||
payload: {
|
||||
messageId: "file-only",
|
||||
files: [{ uri: "workspace:/x.pdf", name: "x.pdf" }],
|
||||
},
|
||||
}),
|
||||
get,
|
||||
set
|
||||
);
|
||||
|
||||
const { agentMessages } = set.mock.calls[0][0] as {
|
||||
agentMessages: Record<string, Array<{ content: string }>>;
|
||||
};
|
||||
expect(agentMessages["ws-1"]).toHaveLength(1);
|
||||
expect(agentMessages["ws-1"][0].content).toBe("");
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Unknown event
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
@@ -71,7 +71,7 @@ export function handleCanvasEvent(
|
||||
nodes: Node<WorkspaceNodeData>[];
|
||||
edges: Edge[];
|
||||
selectedNodeId: string | null;
|
||||
agentMessages: Record<string, Array<{ id: string; content: string; timestamp: string; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>>;
|
||||
agentMessages: Record<string, Array<{ id: string; content: string; timestamp: string; role?: "user" | "agent"; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>>;
|
||||
},
|
||||
set: (partial: Record<string, unknown>) => void,
|
||||
): void {
|
||||
@@ -515,6 +515,66 @@ export function handleCanvasEvent(
|
||||
break;
|
||||
}
|
||||
|
||||
// #1440 USER_MESSAGE: the canvas optimistically inserts the user's
|
||||
// own message into the originating session's store before the request
|
||||
// fires (useChatSend → onUserMessage). Other sessions never saw it —
|
||||
// the agent reply was broadcast but not the user's own text. This event
|
||||
// fans the user's message to every OTHER session so they render the
|
||||
// bubble without requiring a manual refresh. The originating session
|
||||
// collapses its optimistic copy via the dedup mechanism in ChatTab
|
||||
// (role+content+timestamp window) — no double bubble.
|
||||
//
|
||||
// Payload shape mirrors EventUserMessage in Go
|
||||
// a2a_proxy_helpers.go: userMessagePayload:
|
||||
// messageId: string
|
||||
// content: string
|
||||
// files: ParsedFilePart[] (name, uri, mimeType, size)
|
||||
case "USER_MESSAGE": {
|
||||
const payload = msg.payload as {
|
||||
messageId?: string;
|
||||
content?: string;
|
||||
files?: Array<{ name?: unknown; uri?: unknown; mimeType?: unknown; size?: unknown }>;
|
||||
};
|
||||
const content = typeof payload?.content === "string" ? payload.content : "";
|
||||
const files: Array<{ name: string; uri: string; mimeType?: string; size?: number }> = [];
|
||||
if (Array.isArray(payload?.files)) {
|
||||
for (const f of payload.files) {
|
||||
if (typeof f?.uri === "string" && typeof f?.name === "string") {
|
||||
files.push({
|
||||
name: f.name,
|
||||
uri: f.uri,
|
||||
mimeType: typeof f.mimeType === "string" ? f.mimeType : undefined,
|
||||
size: typeof f.size === "number" ? f.size : undefined,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
// Render only when there's something visible.
|
||||
if (content || files.length > 0) {
|
||||
// Insert into agentMessages for rendering as a user-bubble.
|
||||
// ChatTab uses msg.role === "user" for right-side alignment and
|
||||
// user-toned styling, so we must set role:"user" explicitly.
|
||||
const { agentMessages } = get();
|
||||
const existing = agentMessages[msg.workspace_id] || [];
|
||||
set({
|
||||
agentMessages: {
|
||||
...agentMessages,
|
||||
[msg.workspace_id]: [
|
||||
...existing,
|
||||
{
|
||||
id: payload?.messageId ?? crypto.randomUUID(),
|
||||
content,
|
||||
role: "user",
|
||||
timestamp: new Date().toISOString(),
|
||||
...(files.length > 0 ? { attachments: files } : {}),
|
||||
},
|
||||
],
|
||||
},
|
||||
});
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -224,8 +224,8 @@ interface CanvasState {
|
||||
batchPause: () => Promise<void>;
|
||||
batchDelete: () => Promise<void>;
|
||||
/** Agent-pushed messages keyed by workspace ID. ChatTab consumes and clears these. */
|
||||
agentMessages: Record<string, Array<{ id: string; content: string; timestamp: string; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>>;
|
||||
consumeAgentMessages: (workspaceId: string) => Array<{ id: string; content: string; timestamp: string; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>;
|
||||
agentMessages: Record<string, Array<{ id: string; content: string; timestamp: string; role?: "user" | "agent"; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>>;
|
||||
consumeAgentMessages: (workspaceId: string) => Array<{ id: string; content: string; timestamp: string; role?: "user" | "agent"; attachments?: Array<{ name: string; uri: string; mimeType?: string; size?: number }> }>;
|
||||
/** WebSocket connection status — drives the live indicator in the Toolbar. */
|
||||
wsStatus: "connected" | "connecting" | "disconnected";
|
||||
setWsStatus: (status: "connected" | "connecting" | "disconnected") => void;
|
||||
|
||||
@@ -41,8 +41,9 @@ type EventType string
|
||||
// scan-friendly as it grows.
|
||||
const (
|
||||
// Chat / agent messaging — surfaces in canvas chat panels.
|
||||
EventAgentMessage EventType = "AGENT_MESSAGE"
|
||||
EventA2AResponse EventType = "A2A_RESPONSE"
|
||||
EventAgentMessage EventType = "AGENT_MESSAGE"
|
||||
EventA2AResponse EventType = "A2A_RESPONSE"
|
||||
EventUserMessage EventType = "USER_MESSAGE"
|
||||
EventActivityLogged EventType = "ACTIVITY_LOGGED"
|
||||
EventChannelMessage EventType = "CHANNEL_MESSAGE"
|
||||
|
||||
@@ -104,6 +105,7 @@ var AllEventTypes = []EventType{
|
||||
EventApprovalEscalated,
|
||||
EventApprovalRequested,
|
||||
EventChannelMessage,
|
||||
EventUserMessage,
|
||||
EventCronExecuted,
|
||||
EventCronSkipped,
|
||||
EventDelegationComplete,
|
||||
|
||||
@@ -41,6 +41,7 @@ func TestAllEventTypes_IsSnapshot(t *testing.T) {
|
||||
"DELEGATION_STATUS",
|
||||
"EXTERNAL_CREDENTIALS_ROTATED",
|
||||
"TASK_UPDATED",
|
||||
"USER_MESSAGE",
|
||||
"WORKSPACE_AWAITING_AGENT",
|
||||
"WORKSPACE_DEGRADED",
|
||||
"WORKSPACE_HEARTBEAT",
|
||||
|
||||
@@ -344,6 +344,106 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle
|
||||
"duration_ms": durationMs,
|
||||
})
|
||||
}
|
||||
|
||||
// #1440: fan user's own outbound message to all sessions of this workspace.
|
||||
// When a canvas user sends a message, the originating session renders it
|
||||
// optimistically (useChatSend inserts it into the local store immediately).
|
||||
// Other sessions never saw it — only the agent reply was broadcast. The fix:
|
||||
// parse the JSON-RPC body and broadcast a USER_MESSAGE event so every
|
||||
// other session renders the user's bubble without a manual refresh.
|
||||
// Skipped for non-2xx responses (no message was delivered) and non-canvas
|
||||
// callers (workspace-to-workspace calls have their own fan-out via the
|
||||
// channel system).
|
||||
if callerID == "" && statusCode < 400 {
|
||||
if payload := extractUserMessagePayload(body, a2aMethod); payload != nil {
|
||||
h.broadcaster.BroadcastOnly(workspaceID, string(events.EventUserMessage), payload)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// userMessagePayload carries the fields the canvas needs to render a user
|
||||
// bubble from a cross-session fan-out event.
|
||||
type userMessagePayload struct {
|
||||
MessageID string `json:"messageId,omitempty"`
|
||||
Content string `json:"content,omitempty"`
|
||||
Files []userMessageFile `json:"files,omitempty"`
|
||||
}
|
||||
|
||||
// userMessageFile mirrors the shape canvas/src/components/tabs/chat/message-parser.ts
|
||||
// ParsedFilePart so the canvas can render the same download chip.
|
||||
type userMessageFile struct {
|
||||
Name string `json:"name"`
|
||||
URI string `json:"uri"`
|
||||
MimeType string `json:"mimeType,omitempty"`
|
||||
Size int64 `json:"size,omitempty"`
|
||||
}
|
||||
|
||||
// extractUserMessagePayload parses a JSON-RPC message/send body and extracts the
|
||||
// user-visible fields (text content + file attachments) for fan-out broadcasting.
|
||||
// Returns nil when the body is not a canvas message/send request — either the
|
||||
// method is not message/send, or the message does not have role=user (e.g. a
|
||||
// heartbeat ping sent by the canvas for keepalive, which carries role:agent).
|
||||
// Safe to call on any body — malformed JSON returns nil with no side effects.
|
||||
func extractUserMessagePayload(body []byte, method string) map[string]interface{} {
|
||||
if method != "message/send" {
|
||||
return nil
|
||||
}
|
||||
var rpc struct {
|
||||
ID string `json:"id,omitempty"`
|
||||
Params struct {
|
||||
Message struct {
|
||||
Role string `json:"role,omitempty"`
|
||||
MessageID string `json:"messageId,omitempty"`
|
||||
Parts []struct {
|
||||
Kind string `json:"kind,omitempty"`
|
||||
Text string `json:"text,omitempty"`
|
||||
File *struct {
|
||||
Name string `json:"name,omitempty"`
|
||||
URI string `json:"uri,omitempty"`
|
||||
MimeType string `json:"mimeType,omitempty"`
|
||||
Size int64 `json:"size,omitempty"`
|
||||
} `json:"file,omitempty"`
|
||||
} `json:"parts,omitempty"`
|
||||
} `json:"message,omitempty"`
|
||||
} `json:"params,omitempty"`
|
||||
}
|
||||
if err := json.Unmarshal(body, &rpc); err != nil {
|
||||
return nil
|
||||
}
|
||||
// Only broadcast user-role messages. The canvas also sends internal
|
||||
// heartbeat pings via message/send (role:agent) that should not appear
|
||||
// as user bubbles.
|
||||
if rpc.Params.Message.Role != "user" {
|
||||
return nil
|
||||
}
|
||||
var content string
|
||||
var files []userMessageFile
|
||||
for _, part := range rpc.Params.Message.Parts {
|
||||
if part.Kind == "text" && part.Text != "" {
|
||||
if content != "" {
|
||||
content += "\n"
|
||||
}
|
||||
content += part.Text
|
||||
} else if part.Kind == "file" && part.File != nil {
|
||||
files = append(files, userMessageFile{
|
||||
Name: part.File.Name,
|
||||
URI: part.File.URI,
|
||||
MimeType: part.File.MimeType,
|
||||
Size: part.File.Size,
|
||||
})
|
||||
}
|
||||
}
|
||||
// Skip pure-noise events (e.g. empty send with no text and no files).
|
||||
// The canvas optimistically inserts the bubble before the request; an
|
||||
// empty broadcast would create an extra blank bubble in other sessions.
|
||||
if content == "" && len(files) == 0 {
|
||||
return nil
|
||||
}
|
||||
return map[string]interface{}{
|
||||
"messageId": rpc.Params.Message.MessageID,
|
||||
"content": content,
|
||||
"files": files,
|
||||
}
|
||||
}
|
||||
|
||||
func nilIfEmpty(s string) *string {
|
||||
|
||||
@@ -2024,6 +2024,101 @@ func TestLogA2ASuccess_ErrorStatus(t *testing.T) {
|
||||
time.Sleep(80 * time.Millisecond)
|
||||
}
|
||||
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
// extractUserMessagePayload – fan-out of user's own outbound message (#1440)
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
func TestExtractUserMessagePayload_BasicText(t *testing.T) {
|
||||
body := []byte(`{"id":"mid-1","method":"message/send","params":{"message":{"role":"user","messageId":"msg-1","parts":[{"kind":"text","text":"hello world"}]}}}`)
|
||||
got := extractUserMessagePayload(body, "message/send")
|
||||
if got == nil {
|
||||
t.Fatal("expected non-nil payload")
|
||||
}
|
||||
if got["messageId"] != "msg-1" {
|
||||
t.Errorf("messageId: got %v", got["messageId"])
|
||||
}
|
||||
if got["content"] != "hello world" {
|
||||
t.Errorf("content: got %v", got["content"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractUserMessagePayload_MultipleTextParts(t *testing.T) {
|
||||
body := []byte(`{"params":{"message":{"role":"user","parts":[{"kind":"text","text":"line one"},{"kind":"text","text":"line two"}]}}}`)
|
||||
got := extractUserMessagePayload(body, "message/send")
|
||||
if got == nil {
|
||||
t.Fatal("expected non-nil payload")
|
||||
}
|
||||
if got["content"] != "line one\nline two" {
|
||||
t.Errorf("content: got %v", got["content"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractUserMessagePayload_WithFile(t *testing.T) {
|
||||
body := []byte(`{"params":{"message":{"role":"user","parts":[{"kind":"text","text":"check this"},{"kind":"file","file":{"name":"report.pdf","uri":"workspace:/uploads/report.pdf","mimeType":"application/pdf","size":12345}}]}}}`)
|
||||
got := extractUserMessagePayload(body, "message/send")
|
||||
if got == nil {
|
||||
t.Fatal("expected non-nil payload")
|
||||
}
|
||||
if got["content"] != "check this" {
|
||||
t.Errorf("content: got %v", got["content"])
|
||||
}
|
||||
files, ok := got["files"].([]userMessageFile)
|
||||
if !ok {
|
||||
t.Fatalf("files type: got %T", got["files"])
|
||||
}
|
||||
if len(files) != 1 {
|
||||
t.Errorf("files count: got %d", len(files))
|
||||
}
|
||||
if files[0].Name != "report.pdf" {
|
||||
t.Errorf("file name: got %s", files[0].Name)
|
||||
}
|
||||
if files[0].URI != "workspace:/uploads/report.pdf" {
|
||||
t.Errorf("file uri: got %s", files[0].URI)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractUserMessagePayload_EmptyParts(t *testing.T) {
|
||||
body := []byte(`{"params":{"message":{"role":"user","parts":[]}}}`)
|
||||
got := extractUserMessagePayload(body, "message/send")
|
||||
if got != nil {
|
||||
t.Errorf("expected nil for empty parts, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractUserMessagePayload_NonMessageSendMethod(t *testing.T) {
|
||||
body := []byte(`{"params":{"message":{"role":"user","parts":[{"kind":"text","text":"hello"}]}}}`)
|
||||
got := extractUserMessagePayload(body, "initialize")
|
||||
if got != nil {
|
||||
t.Errorf("expected nil for non-message/send, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractUserMessagePayload_NonUserRole(t *testing.T) {
|
||||
// Canvas also sends heartbeat pings via message/send with role:agent;
|
||||
// these must NOT appear as user bubbles.
|
||||
body := []byte(`{"params":{"message":{"role":"agent","parts":[{"kind":"text","text":"ping"}]}}}`)
|
||||
got := extractUserMessagePayload(body, "message/send")
|
||||
if got != nil {
|
||||
t.Errorf("expected nil for role=agent, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractUserMessagePayload_MalformedJSON(t *testing.T) {
|
||||
body := []byte(`not json at all`)
|
||||
got := extractUserMessagePayload(body, "message/send")
|
||||
if got != nil {
|
||||
t.Errorf("expected nil for malformed JSON, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractUserMessagePayload_MissingPartsField(t *testing.T) {
|
||||
body := []byte(`{"params":{"message":{"role":"user"}}}`)
|
||||
got := extractUserMessagePayload(body, "message/send")
|
||||
if got != nil {
|
||||
t.Errorf("expected nil for missing parts, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
// A2A auto-wake: hibernated workspace (#711)
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
Reference in New Issue
Block a user