fix(mcp): sender pushback for delegate_task_async delivery failures (#2244) #2384
@@ -291,7 +291,10 @@ func TestMCPHandler_DelegateTaskAsync_RoutesThroughPlatformA2AProxy(t *testing.T
|
||||
WithArgs(callerID, callerID, targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "pending").
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
mock.ExpectExec(`UPDATE activity_logs`).
|
||||
WithArgs("dispatched", "", callerID, sqlmock.AnyArg()).
|
||||
WithArgs("queued", "", callerID, sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectExec(`UPDATE activity_logs`).
|
||||
WithArgs("delivered", "", callerID, sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
called := make(chan struct{}, 1)
|
||||
@@ -311,7 +314,7 @@ func TestMCPHandler_DelegateTaskAsync_RoutesThroughPlatformA2AProxy(t *testing.T
|
||||
if err != nil {
|
||||
t.Fatalf("delegate_task_async returned error: %v", err)
|
||||
}
|
||||
if !strings.Contains(out, `"status":"dispatched"`) {
|
||||
if !strings.Contains(out, `"status":"queued"`) {
|
||||
t.Fatalf("delegate_task_async response = %s", out)
|
||||
}
|
||||
waitGlobalAsyncForTest()
|
||||
@@ -397,7 +400,10 @@ func TestMCPHandler_DelegateTaskAsync_WithAttachments(t *testing.T) {
|
||||
WithArgs(callerID, callerID, targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "pending").
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
mock.ExpectExec(`UPDATE activity_logs`).
|
||||
WithArgs("dispatched", "", callerID, sqlmock.AnyArg()).
|
||||
WithArgs("queued", "", callerID, sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectExec(`UPDATE activity_logs`).
|
||||
WithArgs("delivered", "", callerID, sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
called := make(chan []byte, 1)
|
||||
@@ -423,7 +429,7 @@ func TestMCPHandler_DelegateTaskAsync_WithAttachments(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("delegate_task_async returned error: %v", err)
|
||||
}
|
||||
if !strings.Contains(out, `"status":"dispatched"`) {
|
||||
if !strings.Contains(out, `"status":"queued"`) {
|
||||
t.Fatalf("delegate_task_async response = %s", out)
|
||||
}
|
||||
waitGlobalAsyncForTest()
|
||||
@@ -455,7 +461,10 @@ func TestMCPHandler_DelegateTaskAsync_MarshalFailureDoesNotCallProxy(t *testing.
|
||||
WithArgs(callerID, callerID, targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "pending").
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
mock.ExpectExec(`UPDATE activity_logs`).
|
||||
WithArgs("dispatched", "", callerID, sqlmock.AnyArg()).
|
||||
WithArgs("queued", "", callerID, sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectExec(`UPDATE activity_logs`).
|
||||
WithArgs("failed", sqlmock.AnyArg(), callerID, sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// Force the (otherwise near-impossible) marshal failure for the A2A body.
|
||||
@@ -478,7 +487,7 @@ func TestMCPHandler_DelegateTaskAsync_MarshalFailureDoesNotCallProxy(t *testing.
|
||||
if err != nil {
|
||||
t.Fatalf("delegate_task_async returned error: %v", err)
|
||||
}
|
||||
if !strings.Contains(out, `"status":"dispatched"`) {
|
||||
if !strings.Contains(out, `"status":"queued"`) {
|
||||
t.Fatalf("delegate_task_async response = %s", out)
|
||||
}
|
||||
|
||||
|
||||
@@ -286,12 +286,12 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
|
||||
delegationID := uuid.New().String()
|
||||
|
||||
// Issue #158: write delegation row so canvas Agent Comms tab shows the task text.
|
||||
// Insert with 'dispatched' status since the goroutine won't update it.
|
||||
// Insert with 'queued' status; goroutine updates to delivered or failed.
|
||||
if err := insertMCPDelegationRow(ctx, h.database, callerID, targetID, delegationID, task); err != nil {
|
||||
log.Printf("MCP delegate_task_async: failed to record delegation row: %v", err)
|
||||
// Non-fatal: still fire the A2A call.
|
||||
} else {
|
||||
updateMCPDelegationStatus(ctx, h.database, callerID, delegationID, "dispatched", "")
|
||||
updateMCPDelegationStatus(ctx, h.database, callerID, delegationID, "queued", "")
|
||||
}
|
||||
|
||||
// Fire and forget in a detached goroutine. Use a background context so
|
||||
@@ -321,21 +321,28 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
|
||||
log.Printf("toolDelegateTask %s: json.Marshal a2aBody failed: %v", delegationID, marshalErr)
|
||||
// Bail out: proceeding would call proxyA2ARequest with a
|
||||
// nil/empty body, dispatching a malformed A2A request.
|
||||
updateMCPDelegationStatus(bgCtx, h.database, callerID, delegationID, "failed", fmt.Sprintf("marshal_error: %v", marshalErr))
|
||||
return
|
||||
}
|
||||
|
||||
status, _, err := h.proxyA2ARequest(bgCtx, targetID, a2aBody, callerID, true)
|
||||
if err != nil || status < 200 || status >= 300 {
|
||||
var errorDetail string
|
||||
if err != nil {
|
||||
log.Printf("MCPHandler.delegate_task_async: A2A proxy to %s: %v", targetID, err)
|
||||
errorDetail = fmt.Sprintf("target_offline: %v", err)
|
||||
} else {
|
||||
log.Printf("MCPHandler.delegate_task_async: A2A proxy to %s returned status %d", targetID, status)
|
||||
errorDetail = fmt.Sprintf("http_status: %d", status)
|
||||
}
|
||||
updateMCPDelegationStatus(bgCtx, h.database, callerID, delegationID, "failed", errorDetail)
|
||||
return
|
||||
}
|
||||
|
||||
updateMCPDelegationStatus(bgCtx, h.database, callerID, delegationID, "delivered", "")
|
||||
})
|
||||
|
||||
return fmt.Sprintf(`{"task_id":%q,"status":"dispatched","target_id":%q}`, delegationID, targetID), nil
|
||||
return fmt.Sprintf(`{"task_id":%q,"status":"queued","target_id":%q}`, delegationID, targetID), nil
|
||||
}
|
||||
|
||||
func (h *MCPHandler) toolCheckTaskStatus(ctx context.Context, callerID string, args map[string]interface{}) (string, error) {
|
||||
|
||||
Reference in New Issue
Block a user