fix(handlers): return after marshal failure in toolDelegateTaskAsync #1949
@@ -280,6 +280,62 @@ func TestMCPHandler_DelegateTaskAsync_RoutesThroughPlatformA2AProxy(t *testing.T
|
||||
}
|
||||
}
|
||||
|
||||
// TestMCPHandler_DelegateTaskAsync_MarshalFailureDoesNotCallProxy proves the
|
||||
// extracted #1933 fix: when the A2A body fails to marshal, the detached
|
||||
// goroutine returns early and never calls proxyA2ARequest with a nil/empty
|
||||
// body. Before the fix the goroutine logged the error and fell through,
|
||||
// dispatching a malformed A2A request.
|
||||
func TestMCPHandler_DelegateTaskAsync_MarshalFailureDoesNotCallProxy(t *testing.T) {
|
||||
h, mock := newMCPHandler(t)
|
||||
callerID := "11111111-1111-1111-1111-111111111111"
|
||||
targetID := "22222222-2222-2222-2222-222222222222"
|
||||
parentID := "33333333-3333-3333-3333-333333333333"
|
||||
|
||||
expectCanCommunicateSiblings(mock, callerID, targetID, parentID)
|
||||
mock.ExpectExec(`(?s)INSERT INTO activity_logs.*'delegation'.*'delegate'`).
|
||||
WithArgs(callerID, callerID, targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "pending").
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
mock.ExpectExec(`UPDATE activity_logs`).
|
||||
WithArgs("dispatched", "", callerID, sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// Force the (otherwise near-impossible) marshal failure for the A2A body.
|
||||
origMarshal := marshalA2ABody
|
||||
marshalA2ABody = func(any) ([]byte, error) {
|
||||
return nil, errors.New("forced marshal failure")
|
||||
}
|
||||
t.Cleanup(func() { marshalA2ABody = origMarshal })
|
||||
|
||||
proxyCalled := make(chan struct{}, 1)
|
||||
h.a2aProxy = func(ctx context.Context, workspaceID string, body []byte, proxyCallerID string, logActivity bool) (int, []byte, error) {
|
||||
proxyCalled <- struct{}{}
|
||||
return 200, []byte(`{}`), nil
|
||||
}
|
||||
|
||||
out, err := h.toolDelegateTaskAsync(context.Background(), callerID, map[string]interface{}{
|
||||
"workspace_id": targetID,
|
||||
"task": "async work",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("delegate_task_async returned error: %v", err)
|
||||
}
|
||||
if !strings.Contains(out, `"status":"dispatched"`) {
|
||||
t.Fatalf("delegate_task_async response = %s", out)
|
||||
}
|
||||
|
||||
// Wait for the detached goroutine to finish, then assert the proxy was
|
||||
// never reached because of the early return on marshal failure.
|
||||
waitGlobalAsyncForTest()
|
||||
select {
|
||||
case <-proxyCalled:
|
||||
t.Fatal("proxyA2ARequest was called after marshal failure; expected early return")
|
||||
default:
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// notifications/initialized
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
@@ -20,6 +20,11 @@ import (
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// marshalA2ABody marshals the JSON-RPC body for an async A2A dispatch.
|
||||
// Indirected through a package var so tests can force the (otherwise
|
||||
// near-impossible) marshal-failure path and assert the early return.
|
||||
var marshalA2ABody = json.Marshal
|
||||
|
||||
// insertMCPDelegationRow writes a delegation activity row so the canvas
|
||||
// Agent Comms tab can show the task text for MCP-initiated delegations.
|
||||
// Mirrors insertDelegationRow (delegation.go) for the MCP tool path.
|
||||
@@ -269,7 +274,7 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
|
||||
bgCtx, cancel := context.WithTimeout(context.Background(), mcpAsyncCallTimeout)
|
||||
defer cancel()
|
||||
|
||||
a2aBody, marshalErr := json.Marshal(map[string]interface{}{
|
||||
a2aBody, marshalErr := marshalA2ABody(map[string]interface{}{
|
||||
"jsonrpc": "2.0",
|
||||
"id": delegationID,
|
||||
"method": "message/send",
|
||||
@@ -283,6 +288,9 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
|
||||
})
|
||||
if marshalErr != nil {
|
||||
log.Printf("toolDelegateTask %s: json.Marshal a2aBody failed: %v", delegationID, marshalErr)
|
||||
// Bail out: proceeding would call proxyA2ARequest with a
|
||||
// nil/empty body, dispatching a malformed A2A request.
|
||||
return
|
||||
}
|
||||
|
||||
status, _, err := h.proxyA2ARequest(bgCtx, targetID, a2aBody, callerID, true)
|
||||
|
||||
Reference in New Issue
Block a user