From d028fe19ff6a5d14879ef4efda175bf4b4e0352d Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Sun, 26 Apr 2026 19:35:58 -0700 Subject: [PATCH] =?UTF-8?q?feat(notify):=20agent=20=E2=86=92=20user=20file?= =?UTF-8?q?=20attachments=20via=20send=5Fmessage=5Fto=5Fuser?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the gap where the Director would say "ZIP is ready at /tmp/foo.zip" in plain text instead of attaching a download chip — the runtime literally had no API for outbound file attachments. The canvas + platform's chat-uploads infrastructure already supported the inbound (user → agent) direction (commit 94d9331c); this PR wires the outbound side. End-to-end shape: agent: send_message_to_user("Done!", attachments=["/tmp/build.zip"]) ↓ runtime POST /workspaces//chat/uploads (multipart) ↓ platform /workspace/.molecule/chat-uploads/-build.zip → returns {uri: workspace:/...build.zip, name, mimeType, size} ↓ runtime POST /workspaces//notify {message: "Done!", attachments: [{uri, name, mimeType, size}]} ↓ platform Broadcasts AGENT_MESSAGE with attachments + persists to activity_logs with response_body = {result: "Done!", parts: [{kind:file, file:{...}}]} ↓ canvas WS push: canvas-events.ts adds attachments to agentMessages queue Reload: ChatTab.loadMessagesFromDB → extractFilesFromTask sees parts[] Either path → ChatTab renders download chip via existing path Files changed: workspace-server/internal/handlers/activity.go - NotifyAttachment struct {URI, Name, MimeType, Size} - Notify body accepts attachments[], broadcasts in payload, persists as response_body.parts[].kind="file" canvas/src/store/canvas-events.ts - AGENT_MESSAGE handler reads payload.attachments, type-validates each entry, attaches to agentMessages queue - Skips empty events (was: skipped only when content empty) workspace/a2a_tools.py - tool_send_message_to_user(message, attachments=[paths]) - New _upload_chat_files helper: opens each path, multipart POSTs to /chat/uploads, returns the platform's metadata - Fail-fast on missing file / upload error — never sends a notify with a half-rendered attachment chip workspace/a2a_mcp_server.py - inputSchema declares attachments param so claude-code SDK surfaces it to the model - Defensive filter on the dispatch path (drops non-string entries if the model sends a malformed payload) Tests: - 4 new Python: success path, missing file, upload 5xx, no-attach backwards compat - 1 new Go: Notify-with-attachments persists parts[] in response_body so chat reload reconstructs the chip Why /tmp paths work even though they're outside the canvas's allowed roots: the runtime tool reads the bytes locally and re-uploads through /chat/uploads, which lands the file under /workspace (an allowed root). The agent can specify any readable path. Does NOT include: agent → agent file transfer. Different design problem (cross-workspace download auth: peer would need a credential to call sender's /chat/download). Tracked as a follow-up under task #114. Co-Authored-By: Claude Opus 4.7 (1M context) --- canvas/src/store/canvas-events.ts | 27 ++++++- .../internal/handlers/activity.go | 54 ++++++++++++- .../internal/handlers/activity_test.go | 75 +++++++++++++++++ workspace/a2a_mcp_server.py | 21 ++++- workspace/a2a_tools.py | 79 +++++++++++++++++- workspace/tests/test_a2a_tools_impl.py | 80 +++++++++++++++++++ 6 files changed, 323 insertions(+), 13 deletions(-) diff --git a/canvas/src/store/canvas-events.ts b/canvas/src/store/canvas-events.ts index 77220a79..58516d9e 100644 --- a/canvas/src/store/canvas-events.ts +++ b/canvas/src/store/canvas-events.ts @@ -395,7 +395,25 @@ export function handleCanvasEvent( case "AGENT_MESSAGE": { const content = (msg.payload.message as string) ?? ""; - if (content) { + // Attachments come straight through from the platform's Notify + // handler when the agent's tool_send_message_to_user passes file + // refs. Shape mirrors NotifyAttachment in activity.go and matches + // ChatTab's createMessage(role, content, attachments) signature + // exactly, so no adapter needed downstream. + const rawAttachments = msg.payload.attachments; + const attachments = Array.isArray(rawAttachments) + ? (rawAttachments as Array<{ uri?: unknown; name?: unknown; mimeType?: unknown; size?: unknown }>) + .filter((a) => typeof a?.uri === "string" && typeof a?.name === "string") + .map((a) => ({ + uri: a.uri as string, + name: a.name as string, + mimeType: typeof a.mimeType === "string" ? a.mimeType : undefined, + size: typeof a.size === "number" ? a.size : undefined, + })) + : undefined; + // Skip when both content and attachments are empty — pure-noise + // event we don't want to render as a blank bubble. + if (content || (attachments && attachments.length > 0)) { const { agentMessages } = get(); const existing = agentMessages[msg.workspace_id] || []; set({ @@ -403,7 +421,12 @@ export function handleCanvasEvent( ...agentMessages, [msg.workspace_id]: [ ...existing, - { id: crypto.randomUUID(), content, timestamp: new Date().toISOString() }, + { + id: crypto.randomUUID(), + content, + timestamp: new Date().toISOString(), + ...(attachments && attachments.length > 0 ? { attachments } : {}), + }, ], }, }); diff --git a/workspace-server/internal/handlers/activity.go b/workspace-server/internal/handlers/activity.go index ba6d9f0f..971d559a 100644 --- a/workspace-server/internal/handlers/activity.go +++ b/workspace-server/internal/handlers/activity.go @@ -257,13 +257,32 @@ func scanSessionSearchRows(rows interface { return items, nil } +// NotifyAttachment is one file the agent wants to attach to its push. +// URIs come from /workspaces/:id/chat/uploads (canonical "workspace:" +// scheme) — the runtime's tool_send_message_to_user uploads any +// caller-specified file path through that endpoint first to get a +// shape the canvas can resolve via the existing Download path. +type NotifyAttachment struct { + URI string `json:"uri" binding:"required"` + Name string `json:"name" binding:"required"` + MimeType string `json:"mimeType,omitempty"` + Size int64 `json:"size,omitempty"` +} + // Notify handles POST /workspaces/:id/notify — agents push messages to the canvas chat. // This enables agents to send interim updates ("I'll check on it") and follow-up results // without waiting for the user to poll. Messages are broadcast via WebSocket only. +// +// Attachments: optional list of file references. Each renders as a +// download chip in the canvas via the existing extractFilesFromTask +// path. The runtime tool uploads file bytes to /chat/uploads first +// and passes the returned URIs here, so this handler only stores +// metadata — never raw bytes. func (h *ActivityHandler) Notify(c *gin.Context) { workspaceID := c.Param("id") var body struct { - Message string `json:"message" binding:"required"` + Message string `json:"message" binding:"required"` + Attachments []NotifyAttachment `json:"attachments,omitempty"` } if err := c.ShouldBindJSON(&body); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": "message is required"}) @@ -280,11 +299,15 @@ func (h *ActivityHandler) Notify(c *gin.Context) { return } - h.broadcaster.BroadcastOnly(workspaceID, "AGENT_MESSAGE", map[string]interface{}{ + broadcastPayload := map[string]interface{}{ "message": body.Message, "workspace_id": workspaceID, "name": wsName, - }) + } + if len(body.Attachments) > 0 { + broadcastPayload["attachments"] = body.Attachments + } + h.broadcaster.BroadcastOnly(workspaceID, "AGENT_MESSAGE", broadcastPayload) // Persist to activity_logs so the chat history loader restores this // message after a page reload. Pre-fix, send_message_to_user pushes @@ -305,7 +328,30 @@ func (h *ActivityHandler) Notify(c *gin.Context) { // sees the message; persistence failure just means the message // won't survive reload (pre-fix behavior). Don't fail the whole // notify on a DB hiccup. - respJSON, _ := json.Marshal(map[string]interface{}{"result": body.Message}) + // response_body shape — chosen to feed BOTH: + // - extractResponseText: looks at body.result (string) and returns it + // - extractFilesFromTask: looks at body.parts[] for kind=file + // so a chat reload after a notify-with-attachments restores both + // the text bubble AND the download chips. + respPayload := map[string]interface{}{"result": body.Message} + if len(body.Attachments) > 0 { + fileParts := make([]map[string]interface{}, 0, len(body.Attachments)) + for _, a := range body.Attachments { + fileMeta := map[string]interface{}{"uri": a.URI, "name": a.Name} + if a.MimeType != "" { + fileMeta["mimeType"] = a.MimeType + } + if a.Size > 0 { + fileMeta["size"] = a.Size + } + fileParts = append(fileParts, map[string]interface{}{ + "kind": "file", + "file": fileMeta, + }) + } + respPayload["parts"] = fileParts + } + respJSON, _ := json.Marshal(respPayload) preview := body.Message if len(preview) > 80 { preview = preview[:80] + "…" diff --git a/workspace-server/internal/handlers/activity_test.go b/workspace-server/internal/handlers/activity_test.go index 9cba5873..e3d8cda8 100644 --- a/workspace-server/internal/handlers/activity_test.go +++ b/workspace-server/internal/handlers/activity_test.go @@ -2,6 +2,7 @@ package handlers import ( "bytes" + "database/sql/driver" "encoding/json" "fmt" "net/http" @@ -265,6 +266,80 @@ func TestNotify_PersistsToActivityLogsForReloadRecovery(t *testing.T) { } } +func TestNotify_WithAttachments_PersistsFilePartsForReload(t *testing.T) { + // Pins the response_body shape: must include {result: msg, parts: [{kind:"file", file: {...}}]} + // so the chat history loader's extractFilesFromTask reconstructs the + // download chips after a page reload. Without `parts`, the bubble + // shows up but the attachment chip is silently dropped on every + // refresh. + mockDB, mock, _ := sqlmock.New() + defer mockDB.Close() + db.DB = mockDB + + mock.ExpectQuery(`SELECT name FROM workspaces`). + WithArgs("ws-attach"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("DD")) + + // Capture the JSONB arg via a custom matcher so we can assert on + // the persisted shape (must include parts[].kind=file so reload + // reconstructs download chips). + var capturedRespJSON string + respMatcher := sqlmockArgMatcher(func(v driver.Value) bool { + s, ok := v.(string) + if !ok { + return false + } + capturedRespJSON = s + return true + }) + mock.ExpectExec(`INSERT INTO activity_logs`). + WithArgs("ws-attach", sqlmock.AnyArg(), respMatcher). + WillReturnResult(sqlmock.NewResult(1, 1)) + + broadcaster := newTestBroadcaster() + handler := NewActivityHandler(broadcaster) + + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-attach"}} + body := `{ + "message": "Here's the build:", + "attachments": [ + {"uri": "workspace:/workspace/.molecule/chat-uploads/abc-build.zip", + "name": "build.zip", "mimeType": "application/zip", "size": 12345} + ] + }` + c.Request = httptest.NewRequest("POST", "/workspaces/ws-attach/notify", strings.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + handler.Notify(c) + + if w.Code != http.StatusOK { + t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("DB expectations not met: %v", err) + } + // Verify the persisted response_body has both the text (so chat + // reload renders the bubble) AND a parts[].kind=file (so reload + // renders the download chip). + if !strings.Contains(capturedRespJSON, `"result":"Here's the build:"`) { + t.Errorf("response_body missing result text: %s", capturedRespJSON) + } + if !strings.Contains(capturedRespJSON, `"kind":"file"`) || + !strings.Contains(capturedRespJSON, `"name":"build.zip"`) || + !strings.Contains(capturedRespJSON, `workspace:/workspace/.molecule/chat-uploads/abc-build.zip`) { + t.Errorf("response_body missing file part — chat reload won't render the chip: %s", capturedRespJSON) + } +} + +// sqlmockArgMatcher adapts a closure into the sqlmock.Argument interface +// so tests can capture/inspect the actual driver value sent into a +// prepared statement. Returns true to match. +type sqlmockArgMatcher func(driver.Value) bool + +func (m sqlmockArgMatcher) Match(v driver.Value) bool { return m(v) } + func TestNotify_DBFailure_StillBroadcastsAnd200(t *testing.T) { // Persistence is best-effort — a DB hiccup must NOT block the // WebSocket push (which the user is already seeing in their open diff --git a/workspace/a2a_mcp_server.py b/workspace/a2a_mcp_server.py index 29ca2542..ce1a7569 100644 --- a/workspace/a2a_mcp_server.py +++ b/workspace/a2a_mcp_server.py @@ -113,13 +113,18 @@ TOOLS = [ }, { "name": "send_message_to_user", - "description": "Send a message directly to the user's canvas chat — pushed instantly via WebSocket. Use this to: (1) acknowledge a task immediately ('Got it, I'll start working on this'), (2) send interim progress updates while doing long work, (3) deliver follow-up results after delegation completes. The message appears in the user's chat as if you're proactively reaching out.", + "description": "Send a message directly to the user's canvas chat — pushed instantly via WebSocket. Use this to: (1) acknowledge a task immediately ('Got it, I'll start working on this'), (2) send interim progress updates while doing long work, (3) deliver follow-up results after delegation completes, (4) attach files (zip, pdf, csv, image) for the user to download. The message appears in the user's chat as if you're proactively reaching out.", "inputSchema": { "type": "object", "properties": { "message": { "type": "string", - "description": "The message to send to the user", + "description": "The message to send to the user. Required even when sending attachments — set to a short caption like 'Here's the build:' or 'Done — see attached.'", + }, + "attachments": { + "type": "array", + "description": "Optional list of absolute file paths inside this container to attach. Each renders as a clickable download chip in the user's chat. Use this whenever you'd otherwise paste a path in the message text — paths render as plain text the user can't click. Examples: ['/tmp/build-output.zip'] or ['/workspace/report.pdf', '/workspace/data.csv']. Files are uploaded through the platform's chat-uploads endpoint (25 MB per file cap).", + "items": {"type": "string"}, }, }, "required": ["message"], @@ -185,7 +190,17 @@ async def handle_tool_call(name: str, arguments: dict) -> str: arguments.get("task_id", ""), ) elif name == "send_message_to_user": - return await tool_send_message_to_user(arguments.get("message", "")) + raw_attachments = arguments.get("attachments") + attachments: list[str] | None = None + if isinstance(raw_attachments, list): + # Defensive: filter to strings only — claude-code SDK occasionally + # emits dicts here when the model misreads the schema. Drop the + # bad entries rather than 500 the whole call. + attachments = [p for p in raw_attachments if isinstance(p, str) and p] + return await tool_send_message_to_user( + arguments.get("message", ""), + attachments=attachments, + ) elif name == "list_peers": return await tool_list_peers() elif name == "get_workspace_info": diff --git a/workspace/a2a_tools.py b/workspace/a2a_tools.py index 3e3671bb..313085ed 100644 --- a/workspace/a2a_tools.py +++ b/workspace/a2a_tools.py @@ -285,18 +285,89 @@ async def tool_check_task_status(workspace_id: str, task_id: str) -> str: return f"Error checking delegations: {e}" -async def tool_send_message_to_user(message: str) -> str: - """Send a message directly to the user's canvas chat via WebSocket.""" +async def _upload_chat_files(client: httpx.AsyncClient, paths: list[str]) -> tuple[list[dict], str | None]: + """Upload local file paths through /workspaces//chat/uploads. + + The platform stages each upload under /workspace/.molecule/chat-uploads + (an "allowed root" the canvas knows how to render via the Download + endpoint) and returns metadata the broadcast payload references. + + Why we route through upload instead of just passing the agent's path: + the canvas's allowed-root list is /configs, /workspace, /home, /plugins + — files at /tmp or /root would be unreachable. Uploading copies the + bytes into an allowed root regardless of where the agent wrote them. + + Returns (attachments, error). On any failure the caller should NOT + fire the notify — partial-attach would surface a half-rendered chip. + """ + if not paths: + return [], None + files_payload: list[tuple[str, tuple[str, bytes, str]]] = [] + for p in paths: + if not isinstance(p, str) or not p: + return [], f"Error: invalid attachment path {p!r}" + if not os.path.isfile(p): + return [], f"Error: attachment not found: {p}" + try: + with open(p, "rb") as fh: + data = fh.read() + except OSError as e: + return [], f"Error reading {p}: {e}" + files_payload.append(("files", (os.path.basename(p), data, "application/octet-stream"))) + try: + resp = await client.post( + f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/chat/uploads", + files=files_payload, + headers=_auth_headers_for_heartbeat(), + ) + except Exception as e: + return [], f"Error uploading attachments: {e}" + if resp.status_code != 200: + return [], f"Error: chat/uploads returned {resp.status_code}: {resp.text[:200]}" + try: + body = resp.json() + except Exception as e: + return [], f"Error parsing upload response: {e}" + uploaded = body.get("files") or [] + if not isinstance(uploaded, list) or len(uploaded) != len(paths): + return [], f"Error: upload returned {len(uploaded) if isinstance(uploaded, list) else 'invalid'} entries for {len(paths)} files" + return uploaded, None + + +async def tool_send_message_to_user(message: str, attachments: list[str] | None = None) -> str: + """Send a message directly to the user's canvas chat via WebSocket. + + Args: + message: The text to display in the user's chat. Required even + when sending attachments — set to a short caption like + "Here's the build output:" or "Done — see attached." + attachments: Optional list of absolute file paths inside this + container. Each is uploaded to the platform and rendered + in the canvas as a clickable download chip. Use this + instead of pasting paths in the message text — paths + render as plain text and the user can't click them. + Examples: + attachments=["/tmp/build-output.zip"] + attachments=["/workspace/report.pdf", "/workspace/data.csv"] + """ if not message: return "Error: message is required" try: - async with httpx.AsyncClient(timeout=5.0) as client: + async with httpx.AsyncClient(timeout=60.0) as client: + uploaded, upload_err = await _upload_chat_files(client, attachments or []) + if upload_err: + return upload_err + payload: dict = {"message": message} + if uploaded: + payload["attachments"] = uploaded resp = await client.post( f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/notify", - json={"message": message}, + json=payload, headers=_auth_headers_for_heartbeat(), ) if resp.status_code == 200: + if uploaded: + return f"Message sent to user with {len(uploaded)} attachment(s)" return "Message sent to user" return f"Error: platform returned {resp.status_code}" except Exception as e: diff --git a/workspace/tests/test_a2a_tools_impl.py b/workspace/tests/test_a2a_tools_impl.py index 90cb9099..e8cb045b 100644 --- a/workspace/tests/test_a2a_tools_impl.py +++ b/workspace/tests/test_a2a_tools_impl.py @@ -370,6 +370,86 @@ class TestToolSendMessageToUser: assert "Error sending message" in result assert "platform unreachable" in result + # --- attachments --- + + async def test_attachments_uploads_then_notifies_with_uris(self, tmp_path): + import a2a_tools + # Create a real file the tool will read off disk. + f = tmp_path / "build.zip" + f.write_bytes(b"zip-bytes-here") + + # Mock client: first POST = chat/uploads (returns file metadata), + # second POST = notify. + upload_resp = _resp(200, { + "files": [{ + "uri": "workspace:/workspace/.molecule/chat-uploads/abc-build.zip", + "name": "build.zip", + "mimeType": "application/zip", + "size": len(b"zip-bytes-here"), + }], + }) + notify_resp = _resp(200, {}) + mc = _make_http_mock(post_resp=notify_resp) + mc.post = AsyncMock(side_effect=[upload_resp, notify_resp]) + + with patch("a2a_tools.httpx.AsyncClient", return_value=mc): + result = await a2a_tools.tool_send_message_to_user( + "Done — see attached.", + attachments=[str(f)], + ) + + assert "1 attachment" in result + # Verify the notify call carried attachment metadata, not bytes. + notify_call = mc.post.await_args_list[1] + notify_body = notify_call.kwargs.get("json") or {} + assert notify_body.get("message") == "Done — see attached." + assert len(notify_body.get("attachments", [])) == 1 + att = notify_body["attachments"][0] + assert att["uri"].startswith("workspace:/workspace/") + assert att["name"] == "build.zip" + + async def test_attachment_path_missing_returns_error_no_notify(self): + # If a path doesn't exist on disk, fail fast — never POST notify + # with a half-rendered attachment chip. + import a2a_tools + mc = _make_http_mock() + with patch("a2a_tools.httpx.AsyncClient", return_value=mc): + result = await a2a_tools.tool_send_message_to_user( + "Hi", attachments=["/no/such/file.zip"], + ) + assert "not found" in result.lower() + # No post calls at all when the path validation fails. + assert mc.post.await_count == 0 + + async def test_attachments_upload_failure_returns_error_no_notify(self, tmp_path): + # Upload endpoint 5xxs — caller returns an error and never fires + # notify. Otherwise the user sees a chat bubble with a broken chip. + import a2a_tools + f = tmp_path / "x.bin" + f.write_bytes(b"x") + upload_resp = _resp(500, {"error": "boom"}) + mc = _make_http_mock() + mc.post = AsyncMock(return_value=upload_resp) + + with patch("a2a_tools.httpx.AsyncClient", return_value=mc): + result = await a2a_tools.tool_send_message_to_user( + "Hi", attachments=[str(f)], + ) + assert "Error" in result + assert "500" in result + # Exactly one POST — the upload — and no notify follow-up. + assert mc.post.await_count == 1 + + async def test_no_attachments_param_omits_attachments_field(self): + # Backwards-compat: callers passing only `message` should not see + # an `attachments` field added to the notify body. + import a2a_tools + mc = _make_http_mock(post_resp=_resp(200, {})) + with patch("a2a_tools.httpx.AsyncClient", return_value=mc): + await a2a_tools.tool_send_message_to_user("plain text") + body = mc.post.await_args.kwargs.get("json") or {} + assert body == {"message": "plain text"} + # --------------------------------------------------------------------------- # tool_list_peers