Merge pull request #2130 from Molecule-AI/feat/agent-to-user-attachments

feat(notify): agent → user file attachments via send_message_to_user
This commit is contained in:
Hongming Wang 2026-04-27 03:13:20 +00:00 committed by GitHub
commit 954d7d9182
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 415 additions and 13 deletions

View File

@ -395,7 +395,34 @@ export function handleCanvasEvent(
case "AGENT_MESSAGE": {
const content = (msg.payload.message as string) ?? "";
if (content) {
// Attachments come straight through from the platform's Notify
// handler when the agent's tool_send_message_to_user passes file
// refs. Shape mirrors NotifyAttachment in activity.go and matches
// ChatTab's createMessage(role, content, attachments) signature
// exactly, so no adapter needed downstream.
const rawAttachments = msg.payload.attachments;
const attachments = Array.isArray(rawAttachments)
? (rawAttachments as Array<{ uri?: unknown; name?: unknown; mimeType?: unknown; size?: unknown }>)
// Reject empty strings as well as non-strings — server-side
// gin validation does NOT enforce binding:"required" on
// slice-element struct fields without `dive` (which the
// notify handler does not use), so a malformed broadcast
// could carry uri:"" or name:"". Defence-in-depth: drop
// those here so the chat doesn't render a blank/broken chip.
.filter((a) =>
typeof a?.uri === "string" && a.uri.length > 0 &&
typeof a?.name === "string" && a.name.length > 0,
)
.map((a) => ({
uri: a.uri as string,
name: a.name as string,
mimeType: typeof a.mimeType === "string" ? a.mimeType : undefined,
size: typeof a.size === "number" ? a.size : undefined,
}))
: undefined;
// Skip when both content and attachments are empty — pure-noise
// event we don't want to render as a blank bubble.
if (content || (attachments && attachments.length > 0)) {
const { agentMessages } = get();
const existing = agentMessages[msg.workspace_id] || [];
set({
@ -403,7 +430,12 @@ export function handleCanvasEvent(
...agentMessages,
[msg.workspace_id]: [
...existing,
{ id: crypto.randomUUID(), content, timestamp: new Date().toISOString() },
{
id: crypto.randomUUID(),
content,
timestamp: new Date().toISOString(),
...(attachments && attachments.length > 0 ? { attachments } : {}),
},
],
},
});

View File

@ -257,19 +257,54 @@ func scanSessionSearchRows(rows interface {
return items, nil
}
// NotifyAttachment is one file the agent wants to attach to its push.
// URIs come from /workspaces/:id/chat/uploads (canonical "workspace:"
// scheme) — the runtime's tool_send_message_to_user uploads any
// caller-specified file path through that endpoint first to get a
// shape the canvas can resolve via the existing Download path.
type NotifyAttachment struct {
URI string `json:"uri" binding:"required"`
Name string `json:"name" binding:"required"`
MimeType string `json:"mimeType,omitempty"`
Size int64 `json:"size,omitempty"`
}
// Notify handles POST /workspaces/:id/notify — agents push messages to the canvas chat.
// This enables agents to send interim updates ("I'll check on it") and follow-up results
// without waiting for the user to poll. Messages are broadcast via WebSocket only.
//
// Attachments: optional list of file references. Each renders as a
// download chip in the canvas via the existing extractFilesFromTask
// path. The runtime tool uploads file bytes to /chat/uploads first
// and passes the returned URIs here, so this handler only stores
// metadata — never raw bytes.
func (h *ActivityHandler) Notify(c *gin.Context) {
workspaceID := c.Param("id")
var body struct {
Message string `json:"message" binding:"required"`
Message string `json:"message" binding:"required"`
Attachments []NotifyAttachment `json:"attachments,omitempty"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "message is required"})
return
}
// Per-element attachment validation: gin's go-playground/validator
// does NOT iterate slice elements without `dive`, so the inner
// `binding:"required"` tags on NotifyAttachment.URI/Name don't
// actually run. Without this loop, attachments: [{"uri":"","name":""}]
// would slip through, broadcast empty-URI chips that render
// blank/broken in the canvas, and persist them in activity_logs
// for every page reload to re-render. Validate explicitly.
for i, a := range body.Attachments {
if a.URI == "" || a.Name == "" {
c.JSON(http.StatusBadRequest, gin.H{
"error": fmt.Sprintf("attachment[%d]: uri and name are required", i),
})
return
}
}
// Verify workspace exists
var wsName string
err := db.DB.QueryRowContext(c.Request.Context(),
@ -280,11 +315,15 @@ func (h *ActivityHandler) Notify(c *gin.Context) {
return
}
h.broadcaster.BroadcastOnly(workspaceID, "AGENT_MESSAGE", map[string]interface{}{
broadcastPayload := map[string]interface{}{
"message": body.Message,
"workspace_id": workspaceID,
"name": wsName,
})
}
if len(body.Attachments) > 0 {
broadcastPayload["attachments"] = body.Attachments
}
h.broadcaster.BroadcastOnly(workspaceID, "AGENT_MESSAGE", broadcastPayload)
// Persist to activity_logs so the chat history loader restores this
// message after a page reload. Pre-fix, send_message_to_user pushes
@ -305,7 +344,30 @@ func (h *ActivityHandler) Notify(c *gin.Context) {
// sees the message; persistence failure just means the message
// won't survive reload (pre-fix behavior). Don't fail the whole
// notify on a DB hiccup.
respJSON, _ := json.Marshal(map[string]interface{}{"result": body.Message})
// response_body shape — chosen to feed BOTH:
// - extractResponseText: looks at body.result (string) and returns it
// - extractFilesFromTask: looks at body.parts[] for kind=file
// so a chat reload after a notify-with-attachments restores both
// the text bubble AND the download chips.
respPayload := map[string]interface{}{"result": body.Message}
if len(body.Attachments) > 0 {
fileParts := make([]map[string]interface{}, 0, len(body.Attachments))
for _, a := range body.Attachments {
fileMeta := map[string]interface{}{"uri": a.URI, "name": a.Name}
if a.MimeType != "" {
fileMeta["mimeType"] = a.MimeType
}
if a.Size > 0 {
fileMeta["size"] = a.Size
}
fileParts = append(fileParts, map[string]interface{}{
"kind": "file",
"file": fileMeta,
})
}
respPayload["parts"] = fileParts
}
respJSON, _ := json.Marshal(respPayload)
preview := body.Message
if len(preview) > 80 {
preview = preview[:80] + "…"

View File

@ -2,6 +2,7 @@ package handlers
import (
"bytes"
"database/sql/driver"
"encoding/json"
"fmt"
"net/http"
@ -265,6 +266,128 @@ func TestNotify_PersistsToActivityLogsForReloadRecovery(t *testing.T) {
}
}
func TestNotify_WithAttachments_PersistsFilePartsForReload(t *testing.T) {
// Pins the response_body shape: must include {result: msg, parts: [{kind:"file", file: {...}}]}
// so the chat history loader's extractFilesFromTask reconstructs the
// download chips after a page reload. Without `parts`, the bubble
// shows up but the attachment chip is silently dropped on every
// refresh.
mockDB, mock, _ := sqlmock.New()
defer mockDB.Close()
db.DB = mockDB
mock.ExpectQuery(`SELECT name FROM workspaces`).
WithArgs("ws-attach").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("DD"))
// Capture the JSONB arg so we can assert on the persisted shape
// AFTER the call (must include parts[].kind=file so reload
// reconstructs download chips). Use AnyArg() for the binding
// gate — the substring asserts below are what actually validate
// the shape; a custom matcher that always returned true would
// be misleading about which step does the gating.
var capturedRespJSON string
mock.ExpectExec(`INSERT INTO activity_logs`).
WithArgs("ws-attach", sqlmock.AnyArg(), sqlmockCaptureArg(&capturedRespJSON)).
WillReturnResult(sqlmock.NewResult(1, 1))
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-attach"}}
body := `{
"message": "Here's the build:",
"attachments": [
{"uri": "workspace:/workspace/.molecule/chat-uploads/abc-build.zip",
"name": "build.zip", "mimeType": "application/zip", "size": 12345}
]
}`
c.Request = httptest.NewRequest("POST", "/workspaces/ws-attach/notify", strings.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Notify(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("DB expectations not met: %v", err)
}
// Verify the persisted response_body has both the text (so chat
// reload renders the bubble) AND a parts[].kind=file (so reload
// renders the download chip).
if !strings.Contains(capturedRespJSON, `"result":"Here's the build:"`) {
t.Errorf("response_body missing result text: %s", capturedRespJSON)
}
if !strings.Contains(capturedRespJSON, `"kind":"file"`) ||
!strings.Contains(capturedRespJSON, `"name":"build.zip"`) ||
!strings.Contains(capturedRespJSON, `workspace:/workspace/.molecule/chat-uploads/abc-build.zip`) {
t.Errorf("response_body missing file part — chat reload won't render the chip: %s", capturedRespJSON)
}
}
func TestNotify_RejectsAttachmentWithEmptyURIOrName(t *testing.T) {
// Critical regression guard. gin's go-playground/validator does NOT
// iterate slice elements without `dive`, so `binding:"required"` on
// NotifyAttachment.URI/Name would silently fail to enforce on
// `attachments: [{"uri":"","name":""}]`. Without this explicit
// per-element check, the platform broadcasts empty-URI chips that
// render blank in the canvas AND get persisted in activity_logs
// for every page reload to re-render. Pre-fix: passed validation.
cases := []struct {
name string
body string
}{
{"empty uri", `{"message":"hi","attachments":[{"uri":"","name":"file.zip"}]}`},
{"empty name", `{"message":"hi","attachments":[{"uri":"workspace:/x","name":""}]}`},
{"both empty", `{"message":"hi","attachments":[{"uri":"","name":""}]}`},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
mockDB, _, _ := sqlmock.New()
defer mockDB.Close()
db.DB = mockDB
// No DB expectations — handler must reject with 400 BEFORE
// reaching SELECT/INSERT. sqlmock will fail "expectations not met"
// only if the handler unexpectedly queries.
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-x"}}
c.Request = httptest.NewRequest("POST", "/workspaces/ws-x/notify", strings.NewReader(tc.body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Notify(c)
if w.Code != http.StatusBadRequest {
t.Errorf("expected 400 for %s, got %d: %s", tc.name, w.Code, w.Body.String())
}
})
}
}
// sqlmockCaptureArg returns an sqlmock.Argument that always matches AND
// writes the string-coerced driver value into `dst`. Lets a test
// inspect the actual JSON bytes written to a JSONB column without
// pretending to enforce shape — that's what the downstream substring
// asserts in the test body do.
func sqlmockCaptureArg(dst *string) sqlmock.Argument {
return sqlmockArgFn(func(v driver.Value) bool {
if s, ok := v.(string); ok {
*dst = s
}
return true
})
}
type sqlmockArgFn func(driver.Value) bool
func (f sqlmockArgFn) Match(v driver.Value) bool { return f(v) }
func TestNotify_DBFailure_StillBroadcastsAnd200(t *testing.T) {
// Persistence is best-effort — a DB hiccup must NOT block the
// WebSocket push (which the user is already seeing in their open

View File

@ -113,13 +113,18 @@ TOOLS = [
},
{
"name": "send_message_to_user",
"description": "Send a message directly to the user's canvas chat — pushed instantly via WebSocket. Use this to: (1) acknowledge a task immediately ('Got it, I'll start working on this'), (2) send interim progress updates while doing long work, (3) deliver follow-up results after delegation completes. The message appears in the user's chat as if you're proactively reaching out.",
"description": "Send a message directly to the user's canvas chat — pushed instantly via WebSocket. Use this to: (1) acknowledge a task immediately ('Got it, I'll start working on this'), (2) send interim progress updates while doing long work, (3) deliver follow-up results after delegation completes, (4) attach files (zip, pdf, csv, image) for the user to download. The message appears in the user's chat as if you're proactively reaching out.",
"inputSchema": {
"type": "object",
"properties": {
"message": {
"type": "string",
"description": "The message to send to the user",
"description": "The message to send to the user. Required even when sending attachments — set to a short caption like 'Here's the build:' or 'Done — see attached.'",
},
"attachments": {
"type": "array",
"description": "Optional list of absolute file paths inside this container to attach. Each renders as a clickable download chip in the user's chat. Use this whenever you'd otherwise paste a path in the message text — paths render as plain text the user can't click. Examples: ['/tmp/build-output.zip'] or ['/workspace/report.pdf', '/workspace/data.csv']. Files are uploaded through the platform's chat-uploads endpoint (25 MB per file cap).",
"items": {"type": "string"},
},
},
"required": ["message"],
@ -185,7 +190,17 @@ async def handle_tool_call(name: str, arguments: dict) -> str:
arguments.get("task_id", ""),
)
elif name == "send_message_to_user":
return await tool_send_message_to_user(arguments.get("message", ""))
raw_attachments = arguments.get("attachments")
attachments: list[str] | None = None
if isinstance(raw_attachments, list):
# Defensive: filter to strings only — claude-code SDK occasionally
# emits dicts here when the model misreads the schema. Drop the
# bad entries rather than 500 the whole call.
attachments = [p for p in raw_attachments if isinstance(p, str) and p]
return await tool_send_message_to_user(
arguments.get("message", ""),
attachments=attachments,
)
elif name == "list_peers":
return await tool_list_peers()
elif name == "get_workspace_info":

View File

@ -5,6 +5,7 @@ Imports shared client functions and constants from a2a_client.
import hashlib
import json
import mimetypes
import os
import uuid
@ -285,18 +286,100 @@ async def tool_check_task_status(workspace_id: str, task_id: str) -> str:
return f"Error checking delegations: {e}"
async def tool_send_message_to_user(message: str) -> str:
"""Send a message directly to the user's canvas chat via WebSocket."""
async def _upload_chat_files(client: httpx.AsyncClient, paths: list[str]) -> tuple[list[dict], str | None]:
"""Upload local file paths through /workspaces/<self>/chat/uploads.
The platform stages each upload under /workspace/.molecule/chat-uploads
(an "allowed root" the canvas knows how to render via the Download
endpoint) and returns metadata the broadcast payload references.
Why we route through upload instead of just passing the agent's path:
the canvas's allowed-root list is /configs, /workspace, /home, /plugins
files at /tmp or /root would be unreachable. Uploading copies the
bytes into an allowed root regardless of where the agent wrote them.
Returns (attachments, error). On any failure the caller should NOT
fire the notify partial-attach would surface a half-rendered chip.
"""
if not paths:
return [], None
files_payload: list[tuple[str, tuple[str, bytes, str]]] = []
for p in paths:
if not isinstance(p, str) or not p:
return [], f"Error: invalid attachment path {p!r}"
if not os.path.isfile(p):
return [], f"Error: attachment not found: {p}"
try:
with open(p, "rb") as fh:
data = fh.read()
except OSError as e:
return [], f"Error reading {p}: {e}"
# Sniff mime from filename so the canvas can pick the right
# icon / preview / inline-image renderer. Pre-fix this was
# hardcoded application/octet-stream and chat_files.go's
# Upload trusts whatever Content-Type the multipart part
# carries — `mt := fh.Header.Get("Content-Type")` only falls
# back to extension-sniffing when the header is empty. So a
# hardcoded octet-stream meant every attachment lost its
# real type forever, breaking the canvas chip's icon logic.
mime_type, _ = mimetypes.guess_type(p)
if not mime_type:
mime_type = "application/octet-stream"
files_payload.append(("files", (os.path.basename(p), data, mime_type)))
try:
resp = await client.post(
f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/chat/uploads",
files=files_payload,
headers=_auth_headers_for_heartbeat(),
)
except Exception as e:
return [], f"Error uploading attachments: {e}"
if resp.status_code != 200:
return [], f"Error: chat/uploads returned {resp.status_code}: {resp.text[:200]}"
try:
body = resp.json()
except Exception as e:
return [], f"Error parsing upload response: {e}"
uploaded = body.get("files") or []
if not isinstance(uploaded, list) or len(uploaded) != len(paths):
return [], f"Error: upload returned {len(uploaded) if isinstance(uploaded, list) else 'invalid'} entries for {len(paths)} files"
return uploaded, None
async def tool_send_message_to_user(message: str, attachments: list[str] | None = None) -> str:
"""Send a message directly to the user's canvas chat via WebSocket.
Args:
message: The text to display in the user's chat. Required even
when sending attachments set to a short caption like
"Here's the build output:" or "Done — see attached."
attachments: Optional list of absolute file paths inside this
container. Each is uploaded to the platform and rendered
in the canvas as a clickable download chip. Use this
instead of pasting paths in the message text paths
render as plain text and the user can't click them.
Examples:
attachments=["/tmp/build-output.zip"]
attachments=["/workspace/report.pdf", "/workspace/data.csv"]
"""
if not message:
return "Error: message is required"
try:
async with httpx.AsyncClient(timeout=5.0) as client:
async with httpx.AsyncClient(timeout=60.0) as client:
uploaded, upload_err = await _upload_chat_files(client, attachments or [])
if upload_err:
return upload_err
payload: dict = {"message": message}
if uploaded:
payload["attachments"] = uploaded
resp = await client.post(
f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/notify",
json={"message": message},
json=payload,
headers=_auth_headers_for_heartbeat(),
)
if resp.status_code == 200:
if uploaded:
return f"Message sent to user with {len(uploaded)} attachment(s)"
return "Message sent to user"
return f"Error: platform returned {resp.status_code}"
except Exception as e:

View File

@ -370,6 +370,93 @@ class TestToolSendMessageToUser:
assert "Error sending message" in result
assert "platform unreachable" in result
# --- attachments ---
async def test_attachments_uploads_then_notifies_with_uris(self, tmp_path):
import a2a_tools
# Create a real file the tool will read off disk.
f = tmp_path / "build.zip"
f.write_bytes(b"zip-bytes-here")
# Mock client: first POST = chat/uploads (returns file metadata),
# second POST = notify.
upload_resp = _resp(200, {
"files": [{
"uri": "workspace:/workspace/.molecule/chat-uploads/abc-build.zip",
"name": "build.zip",
"mimeType": "application/zip",
"size": len(b"zip-bytes-here"),
}],
})
notify_resp = _resp(200, {})
mc = _make_http_mock(post_resp=notify_resp)
mc.post = AsyncMock(side_effect=[upload_resp, notify_resp])
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
result = await a2a_tools.tool_send_message_to_user(
"Done — see attached.",
attachments=[str(f)],
)
assert "1 attachment" in result
# Verify the notify call carried attachment metadata, not bytes.
# Locate the call by URL suffix, not by index — a future refactor
# in _upload_chat_files that adds a pre-flight call would silently
# shift the array index and the assert would target the wrong call.
notify_calls = [
c for c in mc.post.await_args_list
if c.args and isinstance(c.args[0], str) and c.args[0].endswith("/notify")
]
assert len(notify_calls) == 1, f"expected 1 notify POST, got {len(notify_calls)}"
notify_body = notify_calls[0].kwargs.get("json") or {}
assert notify_body.get("message") == "Done — see attached."
assert len(notify_body.get("attachments", [])) == 1
att = notify_body["attachments"][0]
assert att["uri"].startswith("workspace:/workspace/")
assert att["name"] == "build.zip"
async def test_attachment_path_missing_returns_error_no_notify(self):
# If a path doesn't exist on disk, fail fast — never POST notify
# with a half-rendered attachment chip.
import a2a_tools
mc = _make_http_mock()
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
result = await a2a_tools.tool_send_message_to_user(
"Hi", attachments=["/no/such/file.zip"],
)
assert "not found" in result.lower()
# No post calls at all when the path validation fails.
assert mc.post.await_count == 0
async def test_attachments_upload_failure_returns_error_no_notify(self, tmp_path):
# Upload endpoint 5xxs — caller returns an error and never fires
# notify. Otherwise the user sees a chat bubble with a broken chip.
import a2a_tools
f = tmp_path / "x.bin"
f.write_bytes(b"x")
upload_resp = _resp(500, {"error": "boom"})
mc = _make_http_mock()
mc.post = AsyncMock(return_value=upload_resp)
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
result = await a2a_tools.tool_send_message_to_user(
"Hi", attachments=[str(f)],
)
assert "Error" in result
assert "500" in result
# Exactly one POST — the upload — and no notify follow-up.
assert mc.post.await_count == 1
async def test_no_attachments_param_omits_attachments_field(self):
# Backwards-compat: callers passing only `message` should not see
# an `attachments` field added to the notify body.
import a2a_tools
mc = _make_http_mock(post_resp=_resp(200, {}))
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
await a2a_tools.tool_send_message_to_user("plain text")
body = mc.post.await_args.kwargs.get("json") or {}
assert body == {"message": "plain text"}
# ---------------------------------------------------------------------------
# tool_list_peers