From 99607e0f58e7f5b3d4166c2254c9a9790faae6a9 Mon Sep 17 00:00:00 2001
From: "Molecule AI Dev Engineer A (Kimi)"
Date: Thu, 4 Jun 2026 08:16:02 +0000
Subject: [PATCH] feat(a2a): attachments support in delegate_task and delegate_task_async (#2222)

Peer agents can now pass files (images, video, audio, documents) alongside
task text when delegating to another workspace. The attachments schema
mirrors send_message_to_user: each item needs uri + name; mimeType and
size are optional.

Changes:
- MCP tool schemas for delegate_task / delegate_task_async gain optional
  attachments array (same shape as send_message_to_user).
- toolDelegateTask + toolDelegateTaskAsync parse attachments and emit them
  as a2a-sdk v1 message parts with kind derived from MIME type.
- buildA2AMessageParts helper constructs the parts array: text part first,
  then file/image/audio/video parts in order.
- extractAttachmentsFromMessageParts now accepts video kind (was
  file/image/audio only), so video attachments round-trip correctly
  through the A2A envelope.
- Tests cover sync + async delegation with video and image attachments,
  and video part extraction from message bodies.

Closes #2222.
Co-Authored-By: Claude Opus 4.7 --- .../internal/handlers/activity.go | 2 +- .../handlers/activity_peer_info_test.go | 17 +++ workspace-server/internal/handlers/mcp.go | 52 ++++++++ .../internal/handlers/mcp_test.go | 115 ++++++++++++++++++ .../internal/handlers/mcp_tools.go | 30 ++++- 5 files changed, 213 insertions(+), 3 deletions(-) diff --git a/workspace-server/internal/handlers/activity.go b/workspace-server/internal/handlers/activity.go index 0b20f932b..e07f442f9 100644 --- a/workspace-server/internal/handlers/activity.go +++ b/workspace-server/internal/handlers/activity.go @@ -152,7 +152,7 @@ func extractAttachmentsFromMessageParts(body map[string]interface{}) []map[strin if kind == "" { kind, _ = part["type"].(string) } - if kind != "file" && kind != "image" && kind != "audio" { + if kind != "file" && kind != "image" && kind != "audio" && kind != "video" { continue } // The file sub-object holds uri/mime_type/name. The a2a-sdk v1 diff --git a/workspace-server/internal/handlers/activity_peer_info_test.go b/workspace-server/internal/handlers/activity_peer_info_test.go index 8c5de4a6e..245ceced4 100644 --- a/workspace-server/internal/handlers/activity_peer_info_test.go +++ b/workspace-server/internal/handlers/activity_peer_info_test.go @@ -118,6 +118,23 @@ func TestExtractAttachmentsFromRequestBody_ImageAndAudio(t *testing.T) { } } +func TestExtractAttachmentsFromRequestBody_VideoPart(t *testing.T) { + // Video parts are accepted in message-parts envelope (issue #2222). 
+ body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[ + {"kind":"video","file":{"uri":"workspace:clip.mp4","mime_type":"video/mp4","name":"clip.mp4"}} + ]}}}`) + atts := extractAttachmentsFromRequestBody(body) + if len(atts) != 1 { + t.Fatalf("want 1 attachment, got %d", len(atts)) + } + if atts[0]["kind"] != "video" { + t.Errorf("kind: want video, got %v", atts[0]["kind"]) + } + if atts[0]["uri"] != "workspace:clip.mp4" { + t.Errorf("uri mismatch: %v", atts[0]["uri"]) + } +} + func TestExtractAttachmentsFromRequestBody_LegacyV0TypeDiscriminator(t *testing.T) { // Legacy v0 shape: type=file (not kind), inlined fields (no nested .file) body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[ diff --git a/workspace-server/internal/handlers/mcp.go b/workspace-server/internal/handlers/mcp.go index 960eb5f38..3dbf3ab4c 100644 --- a/workspace-server/internal/handlers/mcp.go +++ b/workspace-server/internal/handlers/mcp.go @@ -126,6 +126,32 @@ var mcpAllTools = []mcpTool{ "type": "string", "description": "The task description to send to the target workspace", }, + "attachments": map[string]interface{}{ + "type": "array", + "description": "Optional files to send with the task. 
Each item must include uri and name; mimeType and size are optional.", + "items": map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "uri": map[string]interface{}{ + "type": "string", + "description": "Workspace attachment URI, usually workspace:/absolute/path", + }, + "name": map[string]interface{}{ + "type": "string", + "description": "Display filename", + }, + "mimeType": map[string]interface{}{ + "type": "string", + "description": "Optional MIME type", + }, + "size": map[string]interface{}{ + "type": "number", + "description": "Optional file size in bytes", + }, + }, + "required": []string{"uri", "name"}, + }, + }, }, "required": []string{"workspace_id", "task"}, }, @@ -144,6 +170,32 @@ var mcpAllTools = []mcpTool{ "type": "string", "description": "The task description to send to the target workspace", }, + "attachments": map[string]interface{}{ + "type": "array", + "description": "Optional files to send with the task. Each item must include uri and name; mimeType and size are optional.", + "items": map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "uri": map[string]interface{}{ + "type": "string", + "description": "Workspace attachment URI, usually workspace:/absolute/path", + }, + "name": map[string]interface{}{ + "type": "string", + "description": "Display filename", + }, + "mimeType": map[string]interface{}{ + "type": "string", + "description": "Optional MIME type", + }, + "size": map[string]interface{}{ + "type": "number", + "description": "Optional file size in bytes", + }, + }, + "required": []string{"uri", "name"}, + }, + }, }, "required": []string{"workspace_id", "task"}, }, diff --git a/workspace-server/internal/handlers/mcp_test.go b/workspace-server/internal/handlers/mcp_test.go index 8fc7492bd..dad564e81 100644 --- a/workspace-server/internal/handlers/mcp_test.go +++ b/workspace-server/internal/handlers/mcp_test.go @@ -285,6 +285,121 @@ func 
TestMCPHandler_DelegateTaskAsync_RoutesThroughPlatformA2AProxy(t *testing.T // goroutine returns early and never calls proxyA2ARequest with a nil/empty // body. Before the fix the goroutine logged the error and fell through, // dispatching a malformed A2A request. + +func TestMCPHandler_DelegateTask_WithAttachments(t *testing.T) { + h, mock := newMCPHandler(t) + callerID := "11111111-1111-1111-1111-111111111111" + targetID := "22222222-2222-2222-2222-222222222222" + parentID := "33333333-3333-3333-3333-333333333333" + + expectCanCommunicateSiblings(mock, callerID, targetID, parentID) + mock.ExpectExec(`(?s)INSERT INTO activity_logs.*'delegation'.*'delegate'`). + WithArgs(callerID, callerID, targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "pending"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec(`UPDATE activity_logs`). + WithArgs("dispatched", "", callerID, sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 1)) + + h.a2aProxy = func(ctx context.Context, workspaceID string, body []byte, proxyCallerID string, logActivity bool) (int, []byte, error) { + if workspaceID != targetID || proxyCallerID != callerID { + t.Fatalf("unexpected proxy route target=%q caller=%q", workspaceID, proxyCallerID) + } + bodyStr := string(body) + if !strings.Contains(bodyStr, `"text":"review this video"`) { + t.Fatalf("A2A body missing task text: %s", bodyStr) + } + if !strings.Contains(bodyStr, `"kind":"video"`) { + t.Fatalf("A2A body missing video attachment kind: %s", bodyStr) + } + if !strings.Contains(bodyStr, `"uri":"workspace:/tmp/clip.mp4"`) { + t.Fatalf("A2A body missing attachment uri: %s", bodyStr) + } + if !strings.Contains(bodyStr, `"mime_type":"video/mp4"`) { + t.Fatalf("A2A body missing attachment mime_type: %s", bodyStr) + } + return 200, []byte(`{"result":{"message":{"parts":[{"text":"done"}]}}}`), nil + } + + out, err := h.toolDelegateTask(context.Background(), callerID, map[string]interface{}{ + "workspace_id": targetID, + "task": "review 
this video", + "attachments": []interface{}{ + map[string]interface{}{ + "uri": "workspace:/tmp/clip.mp4", + "name": "clip.mp4", + "mimeType": "video/mp4", + "size": 12345, + }, + }, + }, mcpCallTimeout) + if err != nil { + t.Fatalf("delegate_task returned error: %v", err) + } + if out != "done" { + t.Fatalf("delegate_task response = %q, want done", out) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} + +func TestMCPHandler_DelegateTaskAsync_WithAttachments(t *testing.T) { + h, mock := newMCPHandler(t) + callerID := "11111111-1111-1111-1111-111111111111" + targetID := "22222222-2222-2222-2222-222222222222" + parentID := "33333333-3333-3333-3333-333333333333" + + expectCanCommunicateSiblings(mock, callerID, targetID, parentID) + mock.ExpectExec(`(?s)INSERT INTO activity_logs.*'delegation'.*'delegate'`). + WithArgs(callerID, callerID, targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "pending"). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec(`UPDATE activity_logs`). + WithArgs("dispatched", "", callerID, sqlmock.AnyArg()). 
+ WillReturnResult(sqlmock.NewResult(0, 1)) + + called := make(chan []byte, 1) + h.a2aProxy = func(ctx context.Context, workspaceID string, body []byte, proxyCallerID string, logActivity bool) (int, []byte, error) { + if workspaceID != targetID || proxyCallerID != callerID { + t.Fatalf("unexpected proxy route target=%q caller=%q", workspaceID, proxyCallerID) + } + called <- body + return 200, []byte(`{"result":{"message":{"parts":[{"text":"accepted"}]}}}`), nil + } + + out, err := h.toolDelegateTaskAsync(context.Background(), callerID, map[string]interface{}{ + "workspace_id": targetID, + "task": "async work with image", + "attachments": []interface{}{ + map[string]interface{}{ + "uri": "workspace:/tmp/screenshot.png", + "name": "screenshot.png", + "mimeType": "image/png", + }, + }, + }) + if err != nil { + t.Fatalf("delegate_task_async returned error: %v", err) + } + if !strings.Contains(out, `"status":"dispatched"`) { + t.Fatalf("delegate_task_async response = %s", out) + } + waitGlobalAsyncForTest() + select { + case body := <-called: + bodyStr := string(body) + if !strings.Contains(bodyStr, `"kind":"image"`) { + t.Fatalf("A2A body missing image attachment kind: %s", bodyStr) + } + if !strings.Contains(bodyStr, `"uri":"workspace:/tmp/screenshot.png"`) { + t.Fatalf("A2A body missing attachment uri: %s", bodyStr) + } + default: + t.Fatal("async delegate did not call platform A2A proxy") + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} func TestMCPHandler_DelegateTaskAsync_MarshalFailureDoesNotCallProxy(t *testing.T) { h, mock := newMCPHandler(t) callerID := "11111111-1111-1111-1111-111111111111" diff --git a/workspace-server/internal/handlers/mcp_tools.go b/workspace-server/internal/handlers/mcp_tools.go index 247071007..3e0f1930a 100644 --- a/workspace-server/internal/handlers/mcp_tools.go +++ b/workspace-server/internal/handlers/mcp_tools.go @@ -187,6 +187,28 @@ func (h *MCPHandler) 
toolGetWorkspaceInfo(ctx context.Context, workspaceID strin return string(b), nil } +// buildA2AMessageParts constructs the A2A message parts array from a task string +// and optional attachments. The text part always comes first; attachment parts +// follow in the order provided, with kind derived from MIME type. +func buildA2AMessageParts(task string, attachments []AgentMessageAttachment) []map[string]interface{} { + parts := []map[string]interface{}{ + {"type": "text", "text": task}, + } + for _, att := range attachments { + kind := kindFromMimeType(att.MimeType) + filePart := map[string]interface{}{ + "kind": kind, + "file": map[string]interface{}{ + "uri": att.URI, + "mime_type": att.MimeType, + "name": att.Name, + }, + } + parts = append(parts, filePart) + } + return parts +} + func (h *MCPHandler) toolDelegateTask(ctx context.Context, callerID string, args map[string]interface{}, timeout time.Duration) (string, error) { targetID, _ := args["workspace_id"].(string) task, _ := args["task"].(string) @@ -208,6 +230,8 @@ func (h *MCPHandler) toolDelegateTask(ctx context.Context, callerID string, args // Non-fatal: still make the A2A call even if activity log write fails. 
} + attachments, _ := parseAgentMessageAttachments(args["attachments"]) + a2aBody, err := json.Marshal(map[string]interface{}{ "jsonrpc": "2.0", "id": uuid.New().String(), @@ -215,7 +239,7 @@ func (h *MCPHandler) toolDelegateTask(ctx context.Context, callerID string, args "params": map[string]interface{}{ "message": map[string]interface{}{ "role": "user", - "parts": []map[string]interface{}{{"type": "text", "text": task}}, + "parts": buildA2AMessageParts(task, attachments), "messageId": uuid.New().String(), }, }, @@ -275,6 +299,8 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string, bgCtx, cancel := context.WithTimeout(context.Background(), mcpAsyncCallTimeout) defer cancel() + attachments, _ := parseAgentMessageAttachments(args["attachments"]) + a2aBody, marshalErr := marshalA2ABody(map[string]interface{}{ "jsonrpc": "2.0", "id": delegationID, @@ -282,7 +308,7 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string, "params": map[string]interface{}{ "message": map[string]interface{}{ "role": "user", - "parts": []map[string]interface{}{{"type": "text", "text": task}}, + "parts": buildA2AMessageParts(task, attachments), "messageId": uuid.New().String(), }, }, -- 2.52.0