feat(a2a): attachments support in delegate_task and delegate_task_async (#2222) #2228
@@ -152,7 +152,7 @@ func extractAttachmentsFromMessageParts(body map[string]interface{}) []map[strin
|
||||
if kind == "" {
|
||||
kind, _ = part["type"].(string)
|
||||
}
|
||||
if kind != "file" && kind != "image" && kind != "audio" {
|
||||
if kind != "file" && kind != "image" && kind != "audio" && kind != "video" {
|
||||
continue
|
||||
}
|
||||
// The file sub-object holds uri/mime_type/name. The a2a-sdk v1
|
||||
|
||||
@@ -118,6 +118,23 @@ func TestExtractAttachmentsFromRequestBody_ImageAndAudio(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractAttachmentsFromRequestBody_VideoPart(t *testing.T) {
|
||||
// Video parts are accepted in message-parts envelope (issue #2222).
|
||||
body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[
|
||||
{"kind":"video","file":{"uri":"workspace:clip.mp4","mime_type":"video/mp4","name":"clip.mp4"}}
|
||||
]}}}`)
|
||||
atts := extractAttachmentsFromRequestBody(body)
|
||||
if len(atts) != 1 {
|
||||
t.Fatalf("want 1 attachment, got %d", len(atts))
|
||||
}
|
||||
if atts[0]["kind"] != "video" {
|
||||
t.Errorf("kind: want video, got %v", atts[0]["kind"])
|
||||
}
|
||||
if atts[0]["uri"] != "workspace:clip.mp4" {
|
||||
t.Errorf("uri mismatch: %v", atts[0]["uri"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractAttachmentsFromRequestBody_LegacyV0TypeDiscriminator(t *testing.T) {
|
||||
// Legacy v0 shape: type=file (not kind), inlined fields (no nested .file)
|
||||
body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[
|
||||
|
||||
@@ -126,6 +126,32 @@ var mcpAllTools = []mcpTool{
|
||||
"type": "string",
|
||||
"description": "The task description to send to the target workspace",
|
||||
},
|
||||
"attachments": map[string]interface{}{
|
||||
"type": "array",
|
||||
"description": "Optional files to send with the task. Each item must include uri and name; mimeType and size are optional.",
|
||||
"items": map[string]interface{}{
|
||||
"type": "object",
|
||||
"properties": map[string]interface{}{
|
||||
"uri": map[string]interface{}{
|
||||
"type": "string",
|
||||
"description": "Workspace attachment URI, usually workspace:/absolute/path",
|
||||
},
|
||||
"name": map[string]interface{}{
|
||||
"type": "string",
|
||||
"description": "Display filename",
|
||||
},
|
||||
"mimeType": map[string]interface{}{
|
||||
"type": "string",
|
||||
"description": "Optional MIME type",
|
||||
},
|
||||
"size": map[string]interface{}{
|
||||
"type": "number",
|
||||
"description": "Optional file size in bytes",
|
||||
},
|
||||
},
|
||||
"required": []string{"uri", "name"},
|
||||
},
|
||||
},
|
||||
},
|
||||
"required": []string{"workspace_id", "task"},
|
||||
},
|
||||
@@ -144,6 +170,32 @@ var mcpAllTools = []mcpTool{
|
||||
"type": "string",
|
||||
"description": "The task description to send to the target workspace",
|
||||
},
|
||||
"attachments": map[string]interface{}{
|
||||
"type": "array",
|
||||
"description": "Optional files to send with the task. Each item must include uri and name; mimeType and size are optional.",
|
||||
"items": map[string]interface{}{
|
||||
"type": "object",
|
||||
"properties": map[string]interface{}{
|
||||
"uri": map[string]interface{}{
|
||||
"type": "string",
|
||||
"description": "Workspace attachment URI, usually workspace:/absolute/path",
|
||||
},
|
||||
"name": map[string]interface{}{
|
||||
"type": "string",
|
||||
"description": "Display filename",
|
||||
},
|
||||
"mimeType": map[string]interface{}{
|
||||
"type": "string",
|
||||
"description": "Optional MIME type",
|
||||
},
|
||||
"size": map[string]interface{}{
|
||||
"type": "number",
|
||||
"description": "Optional file size in bytes",
|
||||
},
|
||||
},
|
||||
"required": []string{"uri", "name"},
|
||||
},
|
||||
},
|
||||
},
|
||||
"required": []string{"workspace_id", "task"},
|
||||
},
|
||||
|
||||
@@ -285,6 +285,121 @@ func TestMCPHandler_DelegateTaskAsync_RoutesThroughPlatformA2AProxy(t *testing.T
|
||||
// goroutine returns early and never calls proxyA2ARequest with a nil/empty
|
||||
// body. Before the fix the goroutine logged the error and fell through,
|
||||
// dispatching a malformed A2A request.
|
||||
|
||||
func TestMCPHandler_DelegateTask_WithAttachments(t *testing.T) {
|
||||
h, mock := newMCPHandler(t)
|
||||
callerID := "11111111-1111-1111-1111-111111111111"
|
||||
targetID := "22222222-2222-2222-2222-222222222222"
|
||||
parentID := "33333333-3333-3333-3333-333333333333"
|
||||
|
||||
expectCanCommunicateSiblings(mock, callerID, targetID, parentID)
|
||||
mock.ExpectExec(`(?s)INSERT INTO activity_logs.*'delegation'.*'delegate'`).
|
||||
WithArgs(callerID, callerID, targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "pending").
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
mock.ExpectExec(`UPDATE activity_logs`).
|
||||
WithArgs("dispatched", "", callerID, sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
h.a2aProxy = func(ctx context.Context, workspaceID string, body []byte, proxyCallerID string, logActivity bool) (int, []byte, error) {
|
||||
if workspaceID != targetID || proxyCallerID != callerID {
|
||||
t.Fatalf("unexpected proxy route target=%q caller=%q", workspaceID, proxyCallerID)
|
||||
}
|
||||
bodyStr := string(body)
|
||||
if !strings.Contains(bodyStr, `"text":"review this video"`) {
|
||||
t.Fatalf("A2A body missing task text: %s", bodyStr)
|
||||
}
|
||||
if !strings.Contains(bodyStr, `"kind":"video"`) {
|
||||
t.Fatalf("A2A body missing video attachment kind: %s", bodyStr)
|
||||
}
|
||||
if !strings.Contains(bodyStr, `"uri":"workspace:/tmp/clip.mp4"`) {
|
||||
t.Fatalf("A2A body missing attachment uri: %s", bodyStr)
|
||||
}
|
||||
if !strings.Contains(bodyStr, `"mime_type":"video/mp4"`) {
|
||||
t.Fatalf("A2A body missing attachment mime_type: %s", bodyStr)
|
||||
}
|
||||
return 200, []byte(`{"result":{"message":{"parts":[{"text":"done"}]}}}`), nil
|
||||
}
|
||||
|
||||
out, err := h.toolDelegateTask(context.Background(), callerID, map[string]interface{}{
|
||||
"workspace_id": targetID,
|
||||
"task": "review this video",
|
||||
"attachments": []interface{}{
|
||||
map[string]interface{}{
|
||||
"uri": "workspace:/tmp/clip.mp4",
|
||||
"name": "clip.mp4",
|
||||
"mimeType": "video/mp4",
|
||||
"size": 12345,
|
||||
},
|
||||
},
|
||||
}, mcpCallTimeout)
|
||||
if err != nil {
|
||||
t.Fatalf("delegate_task returned error: %v", err)
|
||||
}
|
||||
if out != "done" {
|
||||
t.Fatalf("delegate_task response = %q, want done", out)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMCPHandler_DelegateTaskAsync_WithAttachments(t *testing.T) {
|
||||
h, mock := newMCPHandler(t)
|
||||
callerID := "11111111-1111-1111-1111-111111111111"
|
||||
targetID := "22222222-2222-2222-2222-222222222222"
|
||||
parentID := "33333333-3333-3333-3333-333333333333"
|
||||
|
||||
expectCanCommunicateSiblings(mock, callerID, targetID, parentID)
|
||||
mock.ExpectExec(`(?s)INSERT INTO activity_logs.*'delegation'.*'delegate'`).
|
||||
WithArgs(callerID, callerID, targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "pending").
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
mock.ExpectExec(`UPDATE activity_logs`).
|
||||
WithArgs("dispatched", "", callerID, sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
called := make(chan []byte, 1)
|
||||
h.a2aProxy = func(ctx context.Context, workspaceID string, body []byte, proxyCallerID string, logActivity bool) (int, []byte, error) {
|
||||
if workspaceID != targetID || proxyCallerID != callerID {
|
||||
t.Fatalf("unexpected proxy route target=%q caller=%q", workspaceID, proxyCallerID)
|
||||
}
|
||||
called <- body
|
||||
return 200, []byte(`{"result":{"message":{"parts":[{"text":"accepted"}]}}}`), nil
|
||||
}
|
||||
|
||||
out, err := h.toolDelegateTaskAsync(context.Background(), callerID, map[string]interface{}{
|
||||
"workspace_id": targetID,
|
||||
"task": "async work with image",
|
||||
"attachments": []interface{}{
|
||||
map[string]interface{}{
|
||||
"uri": "workspace:/tmp/screenshot.png",
|
||||
"name": "screenshot.png",
|
||||
"mimeType": "image/png",
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("delegate_task_async returned error: %v", err)
|
||||
}
|
||||
if !strings.Contains(out, `"status":"dispatched"`) {
|
||||
t.Fatalf("delegate_task_async response = %s", out)
|
||||
}
|
||||
waitGlobalAsyncForTest()
|
||||
select {
|
||||
case body := <-called:
|
||||
bodyStr := string(body)
|
||||
if !strings.Contains(bodyStr, `"kind":"image"`) {
|
||||
t.Fatalf("A2A body missing image attachment kind: %s", bodyStr)
|
||||
}
|
||||
if !strings.Contains(bodyStr, `"uri":"workspace:/tmp/screenshot.png"`) {
|
||||
t.Fatalf("A2A body missing attachment uri: %s", bodyStr)
|
||||
}
|
||||
default:
|
||||
t.Fatal("async delegate did not call platform A2A proxy")
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
func TestMCPHandler_DelegateTaskAsync_MarshalFailureDoesNotCallProxy(t *testing.T) {
|
||||
h, mock := newMCPHandler(t)
|
||||
callerID := "11111111-1111-1111-1111-111111111111"
|
||||
|
||||
@@ -187,6 +187,28 @@ func (h *MCPHandler) toolGetWorkspaceInfo(ctx context.Context, workspaceID strin
|
||||
return string(b), nil
|
||||
}
|
||||
|
||||
// buildA2AMessageParts constructs the A2A message parts array from a task string
|
||||
// and optional attachments. The text part always comes first; attachment parts
|
||||
// follow in the order provided, with kind derived from MIME type.
|
||||
func buildA2AMessageParts(task string, attachments []AgentMessageAttachment) []map[string]interface{} {
|
||||
parts := []map[string]interface{}{
|
||||
{"type": "text", "text": task},
|
||||
}
|
||||
for _, att := range attachments {
|
||||
kind := kindFromMimeType(att.MimeType)
|
||||
filePart := map[string]interface{}{
|
||||
"kind": kind,
|
||||
"file": map[string]interface{}{
|
||||
"uri": att.URI,
|
||||
"mime_type": att.MimeType,
|
||||
"name": att.Name,
|
||||
},
|
||||
}
|
||||
parts = append(parts, filePart)
|
||||
}
|
||||
return parts
|
||||
}
|
||||
|
||||
func (h *MCPHandler) toolDelegateTask(ctx context.Context, callerID string, args map[string]interface{}, timeout time.Duration) (string, error) {
|
||||
targetID, _ := args["workspace_id"].(string)
|
||||
task, _ := args["task"].(string)
|
||||
@@ -208,6 +230,8 @@ func (h *MCPHandler) toolDelegateTask(ctx context.Context, callerID string, args
|
||||
// Non-fatal: still make the A2A call even if activity log write fails.
|
||||
}
|
||||
|
||||
attachments, _ := parseAgentMessageAttachments(args["attachments"])
|
||||
|
||||
a2aBody, err := json.Marshal(map[string]interface{}{
|
||||
"jsonrpc": "2.0",
|
||||
"id": uuid.New().String(),
|
||||
@@ -215,7 +239,7 @@ func (h *MCPHandler) toolDelegateTask(ctx context.Context, callerID string, args
|
||||
"params": map[string]interface{}{
|
||||
"message": map[string]interface{}{
|
||||
"role": "user",
|
||||
"parts": []map[string]interface{}{{"type": "text", "text": task}},
|
||||
"parts": buildA2AMessageParts(task, attachments),
|
||||
"messageId": uuid.New().String(),
|
||||
},
|
||||
},
|
||||
@@ -275,6 +299,8 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
|
||||
bgCtx, cancel := context.WithTimeout(context.Background(), mcpAsyncCallTimeout)
|
||||
defer cancel()
|
||||
|
||||
attachments, _ := parseAgentMessageAttachments(args["attachments"])
|
||||
|
||||
a2aBody, marshalErr := marshalA2ABody(map[string]interface{}{
|
||||
"jsonrpc": "2.0",
|
||||
"id": delegationID,
|
||||
@@ -282,7 +308,7 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
|
||||
"params": map[string]interface{}{
|
||||
"message": map[string]interface{}{
|
||||
"role": "user",
|
||||
"parts": []map[string]interface{}{{"type": "text", "text": task}},
|
||||
"parts": buildA2AMessageParts(task, attachments),
|
||||
"messageId": uuid.New().String(),
|
||||
},
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user