feat(a2a): attachments support in delegate_task and delegate_task_async (#2222) #2228

Merged
claude-ceo-assistant merged 1 commits from fix/2222-a2a-delegate-task-attachments into main 2026-06-04 12:28:35 +00:00
5 changed files with 213 additions and 3 deletions
@@ -152,7 +152,7 @@ func extractAttachmentsFromMessageParts(body map[string]interface{}) []map[strin
if kind == "" {
kind, _ = part["type"].(string)
}
if kind != "file" && kind != "image" && kind != "audio" {
if kind != "file" && kind != "image" && kind != "audio" && kind != "video" {
continue
}
// The file sub-object holds uri/mime_type/name. The a2a-sdk v1
@@ -118,6 +118,23 @@ func TestExtractAttachmentsFromRequestBody_ImageAndAudio(t *testing.T) {
}
}
func TestExtractAttachmentsFromRequestBody_VideoPart(t *testing.T) {
// Video parts are accepted in message-parts envelope (issue #2222).
body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[
{"kind":"video","file":{"uri":"workspace:clip.mp4","mime_type":"video/mp4","name":"clip.mp4"}}
]}}}`)
atts := extractAttachmentsFromRequestBody(body)
if len(atts) != 1 {
t.Fatalf("want 1 attachment, got %d", len(atts))
}
if atts[0]["kind"] != "video" {
t.Errorf("kind: want video, got %v", atts[0]["kind"])
}
if atts[0]["uri"] != "workspace:clip.mp4" {
t.Errorf("uri mismatch: %v", atts[0]["uri"])
}
}
func TestExtractAttachmentsFromRequestBody_LegacyV0TypeDiscriminator(t *testing.T) {
// Legacy v0 shape: type=file (not kind), inlined fields (no nested .file)
body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[
+52
View File
@@ -126,6 +126,32 @@ var mcpAllTools = []mcpTool{
"type": "string",
"description": "The task description to send to the target workspace",
},
"attachments": map[string]interface{}{
"type": "array",
"description": "Optional files to send with the task. Each item must include uri and name; mimeType and size are optional.",
"items": map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"uri": map[string]interface{}{
"type": "string",
"description": "Workspace attachment URI, usually workspace:/absolute/path",
},
"name": map[string]interface{}{
"type": "string",
"description": "Display filename",
},
"mimeType": map[string]interface{}{
"type": "string",
"description": "Optional MIME type",
},
"size": map[string]interface{}{
"type": "number",
"description": "Optional file size in bytes",
},
},
"required": []string{"uri", "name"},
},
},
},
"required": []string{"workspace_id", "task"},
},
@@ -144,6 +170,32 @@ var mcpAllTools = []mcpTool{
"type": "string",
"description": "The task description to send to the target workspace",
},
"attachments": map[string]interface{}{
"type": "array",
"description": "Optional files to send with the task. Each item must include uri and name; mimeType and size are optional.",
"items": map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"uri": map[string]interface{}{
"type": "string",
"description": "Workspace attachment URI, usually workspace:/absolute/path",
},
"name": map[string]interface{}{
"type": "string",
"description": "Display filename",
},
"mimeType": map[string]interface{}{
"type": "string",
"description": "Optional MIME type",
},
"size": map[string]interface{}{
"type": "number",
"description": "Optional file size in bytes",
},
},
"required": []string{"uri", "name"},
},
},
},
"required": []string{"workspace_id", "task"},
},
@@ -285,6 +285,121 @@ func TestMCPHandler_DelegateTaskAsync_RoutesThroughPlatformA2AProxy(t *testing.T
// goroutine returns early and never calls proxyA2ARequest with a nil/empty
// body. Before the fix the goroutine logged the error and fell through,
// dispatching a malformed A2A request.
func TestMCPHandler_DelegateTask_WithAttachments(t *testing.T) {
h, mock := newMCPHandler(t)
callerID := "11111111-1111-1111-1111-111111111111"
targetID := "22222222-2222-2222-2222-222222222222"
parentID := "33333333-3333-3333-3333-333333333333"
expectCanCommunicateSiblings(mock, callerID, targetID, parentID)
mock.ExpectExec(`(?s)INSERT INTO activity_logs.*'delegation'.*'delegate'`).
WithArgs(callerID, callerID, targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "pending").
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(`UPDATE activity_logs`).
WithArgs("dispatched", "", callerID, sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
h.a2aProxy = func(ctx context.Context, workspaceID string, body []byte, proxyCallerID string, logActivity bool) (int, []byte, error) {
if workspaceID != targetID || proxyCallerID != callerID {
t.Fatalf("unexpected proxy route target=%q caller=%q", workspaceID, proxyCallerID)
}
bodyStr := string(body)
if !strings.Contains(bodyStr, `"text":"review this video"`) {
t.Fatalf("A2A body missing task text: %s", bodyStr)
}
if !strings.Contains(bodyStr, `"kind":"video"`) {
t.Fatalf("A2A body missing video attachment kind: %s", bodyStr)
}
if !strings.Contains(bodyStr, `"uri":"workspace:/tmp/clip.mp4"`) {
t.Fatalf("A2A body missing attachment uri: %s", bodyStr)
}
if !strings.Contains(bodyStr, `"mime_type":"video/mp4"`) {
t.Fatalf("A2A body missing attachment mime_type: %s", bodyStr)
}
return 200, []byte(`{"result":{"message":{"parts":[{"text":"done"}]}}}`), nil
}
out, err := h.toolDelegateTask(context.Background(), callerID, map[string]interface{}{
"workspace_id": targetID,
"task": "review this video",
"attachments": []interface{}{
map[string]interface{}{
"uri": "workspace:/tmp/clip.mp4",
"name": "clip.mp4",
"mimeType": "video/mp4",
"size": 12345,
},
},
}, mcpCallTimeout)
if err != nil {
t.Fatalf("delegate_task returned error: %v", err)
}
if out != "done" {
t.Fatalf("delegate_task response = %q, want done", out)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
func TestMCPHandler_DelegateTaskAsync_WithAttachments(t *testing.T) {
h, mock := newMCPHandler(t)
callerID := "11111111-1111-1111-1111-111111111111"
targetID := "22222222-2222-2222-2222-222222222222"
parentID := "33333333-3333-3333-3333-333333333333"
expectCanCommunicateSiblings(mock, callerID, targetID, parentID)
mock.ExpectExec(`(?s)INSERT INTO activity_logs.*'delegation'.*'delegate'`).
WithArgs(callerID, callerID, targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "pending").
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(`UPDATE activity_logs`).
WithArgs("dispatched", "", callerID, sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
called := make(chan []byte, 1)
h.a2aProxy = func(ctx context.Context, workspaceID string, body []byte, proxyCallerID string, logActivity bool) (int, []byte, error) {
if workspaceID != targetID || proxyCallerID != callerID {
t.Fatalf("unexpected proxy route target=%q caller=%q", workspaceID, proxyCallerID)
}
called <- body
return 200, []byte(`{"result":{"message":{"parts":[{"text":"accepted"}]}}}`), nil
}
out, err := h.toolDelegateTaskAsync(context.Background(), callerID, map[string]interface{}{
"workspace_id": targetID,
"task": "async work with image",
"attachments": []interface{}{
map[string]interface{}{
"uri": "workspace:/tmp/screenshot.png",
"name": "screenshot.png",
"mimeType": "image/png",
},
},
})
if err != nil {
t.Fatalf("delegate_task_async returned error: %v", err)
}
if !strings.Contains(out, `"status":"dispatched"`) {
t.Fatalf("delegate_task_async response = %s", out)
}
waitGlobalAsyncForTest()
select {
case body := <-called:
bodyStr := string(body)
if !strings.Contains(bodyStr, `"kind":"image"`) {
t.Fatalf("A2A body missing image attachment kind: %s", bodyStr)
}
if !strings.Contains(bodyStr, `"uri":"workspace:/tmp/screenshot.png"`) {
t.Fatalf("A2A body missing attachment uri: %s", bodyStr)
}
default:
t.Fatal("async delegate did not call platform A2A proxy")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
func TestMCPHandler_DelegateTaskAsync_MarshalFailureDoesNotCallProxy(t *testing.T) {
h, mock := newMCPHandler(t)
callerID := "11111111-1111-1111-1111-111111111111"
@@ -187,6 +187,28 @@ func (h *MCPHandler) toolGetWorkspaceInfo(ctx context.Context, workspaceID strin
return string(b), nil
}
// buildA2AMessageParts constructs the A2A message parts array from a task string
// and optional attachments. The text part always comes first; attachment parts
// follow in the order provided, with kind derived from MIME type.
func buildA2AMessageParts(task string, attachments []AgentMessageAttachment) []map[string]interface{} {
parts := []map[string]interface{}{
{"type": "text", "text": task},
}
for _, att := range attachments {
kind := kindFromMimeType(att.MimeType)
filePart := map[string]interface{}{
"kind": kind,
"file": map[string]interface{}{
"uri": att.URI,
"mime_type": att.MimeType,
"name": att.Name,
},
}
parts = append(parts, filePart)
}
return parts
}
func (h *MCPHandler) toolDelegateTask(ctx context.Context, callerID string, args map[string]interface{}, timeout time.Duration) (string, error) {
targetID, _ := args["workspace_id"].(string)
task, _ := args["task"].(string)
@@ -208,6 +230,8 @@ func (h *MCPHandler) toolDelegateTask(ctx context.Context, callerID string, args
// Non-fatal: still make the A2A call even if activity log write fails.
}
attachments, _ := parseAgentMessageAttachments(args["attachments"])
a2aBody, err := json.Marshal(map[string]interface{}{
"jsonrpc": "2.0",
"id": uuid.New().String(),
@@ -215,7 +239,7 @@ func (h *MCPHandler) toolDelegateTask(ctx context.Context, callerID string, args
"params": map[string]interface{}{
"message": map[string]interface{}{
"role": "user",
"parts": []map[string]interface{}{{"type": "text", "text": task}},
"parts": buildA2AMessageParts(task, attachments),
"messageId": uuid.New().String(),
},
},
@@ -275,6 +299,8 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
bgCtx, cancel := context.WithTimeout(context.Background(), mcpAsyncCallTimeout)
defer cancel()
attachments, _ := parseAgentMessageAttachments(args["attachments"])
a2aBody, marshalErr := marshalA2ABody(map[string]interface{}{
"jsonrpc": "2.0",
"id": delegationID,
@@ -282,7 +308,7 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
"params": map[string]interface{}{
"message": map[string]interface{}{
"role": "user",
"parts": []map[string]interface{}{{"type": "text", "text": task}},
"parts": buildA2AMessageParts(task, attachments),
"messageId": uuid.New().String(),
},
},