diff --git a/canvas/src/components/tabs/chat/AgentCommsPanel.tsx b/canvas/src/components/tabs/chat/AgentCommsPanel.tsx index 909616bf..03dc97a3 100644 --- a/canvas/src/components/tabs/chat/AgentCommsPanel.tsx +++ b/canvas/src/components/tabs/chat/AgentCommsPanel.tsx @@ -59,6 +59,34 @@ function resolveName(id: string): string { } export function toCommMessage(entry: ActivityEntry, workspaceId: string): CommMessage | null { + // delegation activity rows are written by the platform's /delegate + // handler. They're always outbound from this workspace's POV (the + // platform proxies the A2A on our behalf). Two methods: + // - "delegate" — the initial outbound; status pending/dispatched + // - "delegate_result" — the eventual reply; status completed/queued/failed + // We surface them in Agent Comms because they ARE agent-to-agent + // calls; without this branch they'd be dropped by the activity_type + // filter and the user would see "No agent-to-agent communications yet" + // even when the director made delegations. + if (entry.activity_type === "delegation") { + const peerId = entry.target_id || ""; + if (!peerId) return null; + return { + id: entry.id, + flow: "out", + peerName: resolveName(peerId), + peerId, + // Prefer summary (set by the platform with a human-readable + // string like "Delegating to X" or "Delegation queued — target + // at capacity"). Fall back to request body for older rows that + // pre-date the summary column being populated. + text: entry.summary || extractRequestText(entry.request_body) || "(delegation)", + responseText: entry.response_body ? extractResponseText(entry.response_body) : null, + status: entry.status || "ok", + timestamp: entry.created_at, + }; + } + // a2a_receive activity rows come in two shapes: // // 1. 
Real incoming call (a peer called us): source_id = the peer, @@ -132,7 +160,11 @@ export function AgentCommsPanel({ workspaceId }: { workspaceId: string }) { api.get(`/workspaces/${workspaceId}/activity?source=agent&limit=50`) .then((entries) => { const filtered = (entries ?? []) - .filter((e) => e.activity_type === "a2a_send" || e.activity_type === "a2a_receive") + .filter((e) => + e.activity_type === "a2a_send" || + e.activity_type === "a2a_receive" || + e.activity_type === "delegation", + ) .reverse(); const msgs: CommMessage[] = []; for (const e of filtered) { @@ -186,7 +218,7 @@ export function AgentCommsPanel({ workspaceId }: { workspaceId: string }) { const type = p.activity_type as string; const sourceId = p.source_id as string | null; if (!sourceId) return; // canvas-initiated, not agent comms - if (type !== "a2a_send" && type !== "a2a_receive") return; + if (type !== "a2a_send" && type !== "a2a_receive" && type !== "delegation") return; const entry: ActivityEntry = { id: p.id as string || crypto.randomUUID(), diff --git a/canvas/src/components/tabs/chat/__tests__/AgentCommsPanel.test.ts b/canvas/src/components/tabs/chat/__tests__/AgentCommsPanel.test.ts index fc2eb70f..2e421ed4 100644 --- a/canvas/src/components/tabs/chat/__tests__/AgentCommsPanel.test.ts +++ b/canvas/src/components/tabs/chat/__tests__/AgentCommsPanel.test.ts @@ -110,4 +110,65 @@ describe("toCommMessage — flow derivation", () => { ); expect(m!.status).toBe("error"); }); + + // --- delegation rows --- + // The platform's /delegate handler writes activity_type='delegation' + // for both the initial outbound (method='delegate') and the eventual + // reply (method='delegate_result', status=queued|completed|failed). + // Pre-fix the panel filtered these out and showed "no agent comms" + // even when 6+ delegations existed in the DB. 
+ + it("delegation 'delegate' row maps as outbound to target", () => { + const m = toCommMessage( + makeEntry({ + activity_type: "delegation", + method: "delegate", + source_id: SELF, + target_id: PEER, + summary: "Delegating to ws-peer", + status: "pending", + }), + SELF, + ); + expect(m).toBeTruthy(); + expect(m!.flow).toBe("out"); + expect(m!.peerId).toBe(PEER); + expect(m!.peerName).toBe("Peer Agent"); + expect(m!.text).toBe("Delegating to ws-peer"); + expect(m!.status).toBe("pending"); + }); + + it("delegation 'delegate_result' queued row preserves status='queued'", () => { + // The "queued" status is the load-bearing signal the LLM uses to + // decide whether to wait or fall back. If toCommMessage drops or + // rewrites it, the UI loses the ability to show the "peer busy, + // will reply" affordance. + const m = toCommMessage( + makeEntry({ + activity_type: "delegation", + method: "delegate_result", + source_id: SELF, + target_id: PEER, + summary: "Delegation queued — target at capacity", + status: "queued", + }), + SELF, + ); + expect(m!.status).toBe("queued"); + expect(m!.text).toContain("queued"); + }); + + it("delegation row with no target_id returns null", () => { + // Defensive: a delegation row missing target_id can't be rendered + // (we wouldn't know which peer to attribute it to). Drop instead + // of rendering a ghost. 
+ const m = toCommMessage( + makeEntry({ + activity_type: "delegation", + target_id: null, + }), + SELF, + ); + expect(m).toBeNull(); + }); }); diff --git a/workspace/builtin_tools/delegation.py b/workspace/builtin_tools/delegation.py index 40b76bf0..9e9f8d09 100644 --- a/workspace/builtin_tools/delegation.py +++ b/workspace/builtin_tools/delegation.py @@ -39,6 +39,13 @@ DELEGATION_TIMEOUT = float(os.environ.get("DELEGATION_TIMEOUT", "300.0")) class DelegationStatus(str, Enum): PENDING = "pending" IN_PROGRESS = "in_progress" + # QUEUED: peer's a2a-proxy returned HTTP 202 + {queued: true}, meaning + # the peer is mid-task and the request was placed in a drain queue. + # The reply will arrive via the platform's stitch path when the + # peer finishes its current work. The LLM should WAIT, not retry, + # and definitely not fall back to doing the work itself — see the + # check_delegation_status docstring for the prompt-side guidance. + QUEUED = "queued" COMPLETED = "completed" FAILED = "failed" @@ -212,6 +219,39 @@ async def _execute_delegation(task_id: str, workspace_id: str, task: str): }, ) + # HTTP 202 + {queued: true} = peer's a2a-proxy + # accepted the request but the peer's runtime is + # mid-task. Platform-side drain will deliver the + # reply asynchronously. Mark QUEUED locally so + # check_delegation_status can surface that state + # to the LLM with explicit "wait, don't bypass" + # guidance. Do NOT mark FAILED — the request is + # alive in the platform's queue, not lost. + # + # Without this branch, the loop falls through, the + # `if "error" in result` line below references an + # unbound `result`, and the eventual FAILED status + # leads the LLM to conclude the peer is permanently + # unavailable — at which point it does the delegated + # work itself, defeating the whole orchestration. 
+ if a2a_resp.status_code == 202: + try: + queued_body = a2a_resp.json() + except Exception: + queued_body = {} + if queued_body.get("queued") is True: + delegation.status = DelegationStatus.QUEUED + log_event( + event_type="delegation", action="delegate", + resource=workspace_id, outcome="queued", + trace_id=task_id, attempt=attempt + 1, + ) + await _notify_completion(task_id, workspace_id, "queued") + await _update_delegation_on_platform( + task_id, "queued", "", "", + ) + return + if a2a_resp.status_code == 200: try: result = a2a_resp.json() @@ -324,6 +364,20 @@ async def check_delegation_status( ) -> dict: """Check the status of a delegated task, or list all active delegations. + Status semantics — IMPORTANT: + + - "pending" / "in_progress" → peer is actively working. Wait and check again. + - "queued" → peer's a2a-proxy accepted the call but the peer is + processing a prior task. The reply WILL arrive — the platform's + drain re-dispatches when the peer is free. Do NOT retry the + delegation. Do NOT do the work yourself. Acknowledge to the user + that the peer is busy and will reply, then continue with other + delegations or check back later. + - "completed" → result is in the `result` field. + - "failed" → real failure (network, peer crashed, etc.). The + `error` field has the cause. Only fall back to doing the work + yourself if status is "failed", never if status is "queued". + Args: task_id: The task_id returned by delegate_to_workspace. If empty, lists all delegations. diff --git a/workspace/tests/test_delegation.py b/workspace/tests/test_delegation.py index 3b9721cb..b957cdee 100644 --- a/workspace/tests/test_delegation.py +++ b/workspace/tests/test_delegation.py @@ -283,6 +283,76 @@ class TestA2ASuccess: assert "artifact text" in status["result"] +class TestA2AQueued: + """HTTP 202 + {queued: true} comes back when the peer's a2a-proxy + accepted the request but the peer is mid-task. 
Pre-fix the runtime + treated this as 'no 200 → fall through to FAILED', which led the + LLM to conclude the peer was permanently unavailable and bypass + delegation entirely. Post-fix the status is QUEUED and the LLM + sees explicit guidance to wait.""" + + @pytest.mark.asyncio + async def test_queued_marks_status_queued_not_failed(self, delegation_mocks): + mod, *_ = delegation_mocks + _, mock_cls = _make_mock_client( + a2a_status=202, + a2a_payload={"queued": True, "summary": "Delegation queued — target at capacity"}, + ) + + with patch("httpx.AsyncClient", mock_cls): + status = await _invoke_and_wait(mod) + + assert status["status"] == "queued", f"expected queued, got {status}" + # No 'error' field on queued (it's not a failure) + assert "error" not in status or not status.get("error") + + @pytest.mark.asyncio + async def test_queued_does_not_retry(self, delegation_mocks): + # The retry loop is for transient transport errors. A 202+queued + # is NOT a failure to retry against — the platform's drain will + # deliver the eventual reply. Retrying would just re-queue the + # same task and double-count it. + mod, *_ = delegation_mocks + client, mock_cls = _make_mock_client( + a2a_status=202, + a2a_payload={"queued": True}, + ) + + with patch("httpx.AsyncClient", mock_cls): + await _invoke_and_wait(mod) + + # The mock is shared across all AsyncClient calls (record, A2A, + # notify, update), so total post count includes platform-sync + # bookkeeping POSTs too. Only count the A2A POST itself — + # identified by its first positional arg == "http://peer:8000". 
+ a2a_calls = [ + c for c in client.post.await_args_list + if c.args and c.args[0] == "http://peer:8000" + ] + assert len(a2a_calls) == 1, ( + f"queued should not retry the A2A POST; got {len(a2a_calls)} A2A calls" + ) + + @pytest.mark.asyncio + async def test_202_without_queued_flag_falls_through(self, delegation_mocks): + # A bare 202 with no {queued: true} marker is NOT the platform's + # queue signal — could be a misbehaving proxy or a future protocol + # revision. Don't treat it as queued. Falls through to the existing + # retry-then-FAILED path. + mod, *_ = delegation_mocks + _, mock_cls = _make_mock_client( + a2a_status=202, + a2a_payload={"some_other_field": "value"}, + ) + + with patch("httpx.AsyncClient", mock_cls): + status = await _invoke_and_wait(mod) + + assert status["status"] == "failed", ( + f"bare 202 should not be treated as queued, expected failed, got {status}" + ) + + class TestA2AErrors: @pytest.mark.asyncio