fix(handlers): return after marshal failure in toolDelegateTaskAsync #1949

Merged
hongming merged 1 commits from fix/delegate-async-return-after-marshal-fail into main 2026-05-27 14:30:13 +00:00
2 changed files with 65 additions and 1 deletions
@@ -280,6 +280,62 @@ func TestMCPHandler_DelegateTaskAsync_RoutesThroughPlatformA2AProxy(t *testing.T
}
}
// TestMCPHandler_DelegateTaskAsync_MarshalFailureDoesNotCallProxy proves the
// extracted #1933 fix: when the A2A body fails to marshal, the detached
// goroutine returns early and never calls proxyA2ARequest with a nil/empty
// body. Before the fix the goroutine logged the error and fell through,
// dispatching a malformed A2A request.
func TestMCPHandler_DelegateTaskAsync_MarshalFailureDoesNotCallProxy(t *testing.T) {
h, mock := newMCPHandler(t)
callerID := "11111111-1111-1111-1111-111111111111"
targetID := "22222222-2222-2222-2222-222222222222"
parentID := "33333333-3333-3333-3333-333333333333"
expectCanCommunicateSiblings(mock, callerID, targetID, parentID)
mock.ExpectExec(`(?s)INSERT INTO activity_logs.*'delegation'.*'delegate'`).
WithArgs(callerID, callerID, targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "pending").
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(`UPDATE activity_logs`).
WithArgs("dispatched", "", callerID, sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
// Force the (otherwise near-impossible) marshal failure for the A2A body.
origMarshal := marshalA2ABody
marshalA2ABody = func(any) ([]byte, error) {
return nil, errors.New("forced marshal failure")
}
t.Cleanup(func() { marshalA2ABody = origMarshal })
proxyCalled := make(chan struct{}, 1)
h.a2aProxy = func(ctx context.Context, workspaceID string, body []byte, proxyCallerID string, logActivity bool) (int, []byte, error) {
proxyCalled <- struct{}{}
return 200, []byte(`{}`), nil
}
out, err := h.toolDelegateTaskAsync(context.Background(), callerID, map[string]interface{}{
"workspace_id": targetID,
"task": "async work",
})
if err != nil {
t.Fatalf("delegate_task_async returned error: %v", err)
}
if !strings.Contains(out, `"status":"dispatched"`) {
t.Fatalf("delegate_task_async response = %s", out)
}
// Wait for the detached goroutine to finish, then assert the proxy was
// never reached because of the early return on marshal failure.
waitGlobalAsyncForTest()
select {
case <-proxyCalled:
t.Fatal("proxyA2ARequest was called after marshal failure; expected early return")
default:
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// ─────────────────────────────────────────────────────────────────────────────
// notifications/initialized
// ─────────────────────────────────────────────────────────────────────────────
@@ -20,6 +20,11 @@ import (
"github.com/google/uuid"
)
// marshalA2ABody marshals the JSON-RPC body for an async A2A dispatch.
// Indirected through a package var so tests can force the (otherwise
// near-impossible) marshal-failure path and assert the early return.
var marshalA2ABody = json.Marshal
// insertMCPDelegationRow writes a delegation activity row so the canvas
// Agent Comms tab can show the task text for MCP-initiated delegations.
// Mirrors insertDelegationRow (delegation.go) for the MCP tool path.
@@ -269,7 +274,7 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
bgCtx, cancel := context.WithTimeout(context.Background(), mcpAsyncCallTimeout)
defer cancel()
a2aBody, marshalErr := json.Marshal(map[string]interface{}{
a2aBody, marshalErr := marshalA2ABody(map[string]interface{}{
"jsonrpc": "2.0",
"id": delegationID,
"method": "message/send",
@@ -283,6 +288,9 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
})
if marshalErr != nil {
log.Printf("toolDelegateTask %s: json.Marshal a2aBody failed: %v", delegationID, marshalErr)
// Bail out: proceeding would call proxyA2ARequest with a
// nil/empty body, dispatching a malformed A2A request.
return
}
status, _, err := h.proxyA2ARequest(bgCtx, targetID, a2aBody, callerID, true)