fix(mcp): sender pushback for delegate_task_async delivery failures (#2244) #2384

Merged
devops-engineer merged 2 commits from fix/delegate-task-async-sender-pushback-2244 into main 2026-06-07 20:57:59 +00:00
2 changed files with 25 additions and 9 deletions
+15 -6
View File
@@ -291,7 +291,10 @@ func TestMCPHandler_DelegateTaskAsync_RoutesThroughPlatformA2AProxy(t *testing.T
WithArgs(callerID, callerID, targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "pending").
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(`UPDATE activity_logs`).
WithArgs("dispatched", "", callerID, sqlmock.AnyArg()).
WithArgs("queued", "", callerID, sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(`UPDATE activity_logs`).
WithArgs("delivered", "", callerID, sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
called := make(chan struct{}, 1)
@@ -311,7 +314,7 @@ func TestMCPHandler_DelegateTaskAsync_RoutesThroughPlatformA2AProxy(t *testing.T
if err != nil {
t.Fatalf("delegate_task_async returned error: %v", err)
}
if !strings.Contains(out, `"status":"dispatched"`) {
if !strings.Contains(out, `"status":"queued"`) {
t.Fatalf("delegate_task_async response = %s", out)
}
waitGlobalAsyncForTest()
@@ -397,7 +400,10 @@ func TestMCPHandler_DelegateTaskAsync_WithAttachments(t *testing.T) {
WithArgs(callerID, callerID, targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "pending").
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(`UPDATE activity_logs`).
WithArgs("dispatched", "", callerID, sqlmock.AnyArg()).
WithArgs("queued", "", callerID, sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(`UPDATE activity_logs`).
WithArgs("delivered", "", callerID, sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
called := make(chan []byte, 1)
@@ -423,7 +429,7 @@ func TestMCPHandler_DelegateTaskAsync_WithAttachments(t *testing.T) {
if err != nil {
t.Fatalf("delegate_task_async returned error: %v", err)
}
if !strings.Contains(out, `"status":"dispatched"`) {
if !strings.Contains(out, `"status":"queued"`) {
t.Fatalf("delegate_task_async response = %s", out)
}
waitGlobalAsyncForTest()
@@ -455,7 +461,10 @@ func TestMCPHandler_DelegateTaskAsync_MarshalFailureDoesNotCallProxy(t *testing.
WithArgs(callerID, callerID, targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "pending").
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(`UPDATE activity_logs`).
WithArgs("dispatched", "", callerID, sqlmock.AnyArg()).
WithArgs("queued", "", callerID, sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(`UPDATE activity_logs`).
WithArgs("failed", sqlmock.AnyArg(), callerID, sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
// Force the (otherwise near-impossible) marshal failure for the A2A body.
@@ -478,7 +487,7 @@ func TestMCPHandler_DelegateTaskAsync_MarshalFailureDoesNotCallProxy(t *testing.
if err != nil {
t.Fatalf("delegate_task_async returned error: %v", err)
}
if !strings.Contains(out, `"status":"dispatched"`) {
if !strings.Contains(out, `"status":"queued"`) {
t.Fatalf("delegate_task_async response = %s", out)
}
@@ -286,12 +286,12 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
delegationID := uuid.New().String()
// Issue #158: write delegation row so canvas Agent Comms tab shows the task text.
// Insert with 'dispatched' status since the goroutine won't update it.
// Insert with 'queued' status; goroutine updates to delivered or failed.
if err := insertMCPDelegationRow(ctx, h.database, callerID, targetID, delegationID, task); err != nil {
log.Printf("MCP delegate_task_async: failed to record delegation row: %v", err)
// Non-fatal: still fire the A2A call.
} else {
updateMCPDelegationStatus(ctx, h.database, callerID, delegationID, "dispatched", "")
updateMCPDelegationStatus(ctx, h.database, callerID, delegationID, "queued", "")
}
// Fire and forget in a detached goroutine. Use a background context so
@@ -321,21 +321,28 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
log.Printf("toolDelegateTask %s: json.Marshal a2aBody failed: %v", delegationID, marshalErr)
// Bail out: proceeding would call proxyA2ARequest with a
// nil/empty body, dispatching a malformed A2A request.
updateMCPDelegationStatus(bgCtx, h.database, callerID, delegationID, "failed", fmt.Sprintf("marshal_error: %v", marshalErr))
return
}
status, _, err := h.proxyA2ARequest(bgCtx, targetID, a2aBody, callerID, true)
if err != nil || status < 200 || status >= 300 {
var errorDetail string
if err != nil {
log.Printf("MCPHandler.delegate_task_async: A2A proxy to %s: %v", targetID, err)
errorDetail = fmt.Sprintf("target_offline: %v", err)
} else {
log.Printf("MCPHandler.delegate_task_async: A2A proxy to %s returned status %d", targetID, status)
errorDetail = fmt.Sprintf("http_status: %d", status)
}
updateMCPDelegationStatus(bgCtx, h.database, callerID, delegationID, "failed", errorDetail)
return
}
updateMCPDelegationStatus(bgCtx, h.database, callerID, delegationID, "delivered", "")
})
return fmt.Sprintf(`{"task_id":%q,"status":"dispatched","target_id":%q}`, delegationID, targetID), nil
return fmt.Sprintf(`{"task_id":%q,"status":"queued","target_id":%q}`, delegationID, targetID), nil
}
func (h *MCPHandler) toolCheckTaskStatus(ctx context.Context, callerID string, args map[string]interface{}) (string, error) {