From 621d60276c55e3e8936f27b25e3dae372fcfe4e1 Mon Sep 17 00:00:00 2001 From: "Molecule AI Dev Engineer A (Kimi)" Date: Sun, 7 Jun 2026 01:31:23 +0000 Subject: [PATCH 1/2] fix(mcp): sender pushback for delegate_task_async delivery failures (#2244) The async delivery goroutine previously only logged A2A proxy errors and left the delegation status stuck at 'dispatched'. Callers had to poll check_task_status blindly to discover failures. Changes: - Update delegation status to 'failed' with a structured reason (target_offline | http_status | marshal_error) when the async goroutine encounters a non-2xx or transport error. - Update delegation status to 'delivered' on success so callers can distinguish completed dispatches from in-flight ones. - Return 'queued' instead of 'dispatched' from delegate_task_async so the API accurately reflects the async lifecycle. Fixes #2244 --- workspace-server/internal/handlers/mcp_tools.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/workspace-server/internal/handlers/mcp_tools.go b/workspace-server/internal/handlers/mcp_tools.go index 4664de25e..ea72c73a6 100644 --- a/workspace-server/internal/handlers/mcp_tools.go +++ b/workspace-server/internal/handlers/mcp_tools.go @@ -286,12 +286,12 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string, delegationID := uuid.New().String() // Issue #158: write delegation row so canvas Agent Comms tab shows the task text. - // Insert with 'dispatched' status since the goroutine won't update it. + // Insert with 'queued' status; goroutine updates to delivered or failed. if err := insertMCPDelegationRow(ctx, h.database, callerID, targetID, delegationID, task); err != nil { log.Printf("MCP delegate_task_async: failed to record delegation row: %v", err) // Non-fatal: still fire the A2A call. } else { - updateMCPDelegationStatus(ctx, h.database, callerID, delegationID, "dispatched", "") + updateMCPDelegationStatus(ctx, h.database, callerID, delegationID, "queued", "") } // Fire and forget in a detached goroutine. Use a background context so @@ -321,21 +321,28 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string, log.Printf("toolDelegateTask %s: json.Marshal a2aBody failed: %v", delegationID, marshalErr) // Bail out: proceeding would call proxyA2ARequest with a // nil/empty body, dispatching a malformed A2A request. + updateMCPDelegationStatus(bgCtx, h.database, callerID, delegationID, "failed", fmt.Sprintf("marshal_error: %v", marshalErr)) return } status, _, err := h.proxyA2ARequest(bgCtx, targetID, a2aBody, callerID, true) if err != nil || status < 200 || status >= 300 { + var errorDetail string if err != nil { log.Printf("MCPHandler.delegate_task_async: A2A proxy to %s: %v", targetID, err) + errorDetail = fmt.Sprintf("target_offline: %v", err) } else { log.Printf("MCPHandler.delegate_task_async: A2A proxy to %s returned status %d", targetID, status) + errorDetail = fmt.Sprintf("http_status: %d", status) } + updateMCPDelegationStatus(bgCtx, h.database, callerID, delegationID, "failed", errorDetail) return } + + updateMCPDelegationStatus(bgCtx, h.database, callerID, delegationID, "delivered", "") }) - return fmt.Sprintf(`{"task_id":%q,"status":"dispatched","target_id":%q}`, delegationID, targetID), nil + return fmt.Sprintf(`{"task_id":%q,"status":"queued","target_id":%q}`, delegationID, targetID), nil } func (h *MCPHandler) toolCheckTaskStatus(ctx context.Context, callerID string, args map[string]interface{}) (string, error) { -- 2.52.0 From c5a6df0d857a25946990dbec2b3b1230e6b03b30 Mon Sep 17 00:00:00 2001 From: "Molecule AI Dev Engineer A (Kimi)" Date: Sun, 7 Jun 2026 01:50:27 +0000 Subject: [PATCH 2/2] test(mcp): align async delegation tests with queued/delivered/failed lifecycle (#2384 CR1)\n\nThe delegate_task_async implementation now writes:\n- queued (sync, on initial row insert)\n- delivered (async, on 2xx proxy response)\n- failed (async, on marshal error or non-2xx/transport error)\n\nUpdate the three affected async tests to expect the new contract:\n- TestMCPHandler_DelegateTaskAsync_RoutesThroughPlatformA2AProxy\n- TestMCPHandler_DelegateTaskAsync_WithAttachments\n- TestMCPHandler_DelegateTaskAsync_MarshalFailureDoesNotCallProxy\n\nFixes race between async goroutine and mock expectations by explicitly\nexpecting the async UPDATE calls before waitGlobalAsyncForTest. --- .../internal/handlers/mcp_test.go | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/workspace-server/internal/handlers/mcp_test.go b/workspace-server/internal/handlers/mcp_test.go index 9813334a1..4aaf8a4e6 100644 --- a/workspace-server/internal/handlers/mcp_test.go +++ b/workspace-server/internal/handlers/mcp_test.go @@ -291,7 +291,10 @@ func TestMCPHandler_DelegateTaskAsync_RoutesThroughPlatformA2AProxy(t *testing.T WithArgs(callerID, callerID, targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "pending"). WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectExec(`UPDATE activity_logs`). - WithArgs("dispatched", "", callerID, sqlmock.AnyArg()). + WithArgs("queued", "", callerID, sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(`UPDATE activity_logs`). + WithArgs("delivered", "", callerID, sqlmock.AnyArg()). WillReturnResult(sqlmock.NewResult(0, 1)) called := make(chan struct{}, 1) @@ -311,7 +314,7 @@ func TestMCPHandler_DelegateTaskAsync_RoutesThroughPlatformA2AProxy(t *testing.T if err != nil { t.Fatalf("delegate_task_async returned error: %v", err) } - if !strings.Contains(out, `"status":"dispatched"`) { + if !strings.Contains(out, `"status":"queued"`) { t.Fatalf("delegate_task_async response = %s", out) } waitGlobalAsyncForTest() @@ -397,7 +400,10 @@ func TestMCPHandler_DelegateTaskAsync_WithAttachments(t *testing.T) { WithArgs(callerID, callerID, targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "pending"). WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectExec(`UPDATE activity_logs`). - WithArgs("dispatched", "", callerID, sqlmock.AnyArg()). + WithArgs("queued", "", callerID, sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(`UPDATE activity_logs`). + WithArgs("delivered", "", callerID, sqlmock.AnyArg()). WillReturnResult(sqlmock.NewResult(0, 1)) called := make(chan []byte, 1) @@ -423,7 +429,7 @@ func TestMCPHandler_DelegateTaskAsync_WithAttachments(t *testing.T) { if err != nil { t.Fatalf("delegate_task_async returned error: %v", err) } - if !strings.Contains(out, `"status":"dispatched"`) { + if !strings.Contains(out, `"status":"queued"`) { t.Fatalf("delegate_task_async response = %s", out) } waitGlobalAsyncForTest() @@ -455,7 +461,10 @@ func TestMCPHandler_DelegateTaskAsync_MarshalFailureDoesNotCallProxy(t *testing. WithArgs(callerID, callerID, targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "pending"). WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectExec(`UPDATE activity_logs`). - WithArgs("dispatched", "", callerID, sqlmock.AnyArg()). + WithArgs("queued", "", callerID, sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec(`UPDATE activity_logs`). + WithArgs("failed", sqlmock.AnyArg(), callerID, sqlmock.AnyArg()). WillReturnResult(sqlmock.NewResult(0, 1)) // Force the (otherwise near-impossible) marshal failure for the A2A body. @@ -478,7 +487,7 @@ func TestMCPHandler_DelegateTaskAsync_MarshalFailureDoesNotCallProxy(t *testing. if err != nil { t.Fatalf("delegate_task_async returned error: %v", err) } - if !strings.Contains(out, `"status":"dispatched"`) { + if !strings.Contains(out, `"status":"queued"`) { t.Fatalf("delegate_task_async response = %s", out) } -- 2.52.0