diff --git a/canvas/src/components/tabs/chat/AgentCommsPanel.tsx b/canvas/src/components/tabs/chat/AgentCommsPanel.tsx
index 909616bf..fd19c3a5 100644
--- a/canvas/src/components/tabs/chat/AgentCommsPanel.tsx
+++ b/canvas/src/components/tabs/chat/AgentCommsPanel.tsx
@@ -59,6 +59,34 @@ function resolveName(id: string): string {
 }
 
 export function toCommMessage(entry: ActivityEntry, workspaceId: string): CommMessage | null {
+  // delegation activity rows are written by the platform's /delegate
+  // handler. They're always outbound from this workspace's POV (the
+  // platform proxies the A2A on our behalf). Two methods:
+  //   - "delegate"        — the initial outbound; status pending/dispatched
+  //   - "delegate_result" — the eventual reply; status completed/queued/failed
+  // We surface them in Agent Comms because they ARE agent-to-agent
+  // calls; without this branch they'd be dropped by the activity_type
+  // filter and the user would see "No agent-to-agent communications yet"
+  // even when the director made delegations.
+  if (entry.activity_type === "delegation") {
+    const peerId = entry.target_id || "";
+    if (!peerId) return null;
+    return {
+      id: entry.id,
+      flow: "out",
+      peerName: resolveName(peerId),
+      peerId,
+      // Prefer summary (set by the platform with a human-readable
+      // string like "Delegating to X" or "Delegation queued — target
+      // at capacity"). Fall back to the request body for older rows
+      // that pre-date the summary column being populated.
+      text: entry.summary || extractRequestText(entry.request_body) || "(delegation)",
+      responseText: entry.response_body ? extractResponseText(entry.response_body) : null,
+      status: entry.status || "ok",
+      timestamp: entry.created_at,
+    };
+  }
+
   // a2a_receive activity rows come in two shapes:
   //
   // 1. Real incoming call (a peer called us): source_id = the peer,
@@ -132,7 +160,11 @@ export function AgentCommsPanel({ workspaceId }: { workspaceId: string }) {
     api.get(`/workspaces/${workspaceId}/activity?source=agent&limit=50`)
       .then((entries) => {
         const filtered = (entries ?? [])
-          .filter((e) => e.activity_type === "a2a_send" || e.activity_type === "a2a_receive")
+          .filter((e) =>
+            e.activity_type === "a2a_send" ||
+            e.activity_type === "a2a_receive" ||
+            e.activity_type === "delegation",
+          )
           .reverse();
         const msgs: CommMessage[] = [];
         for (const e of filtered) {
@@ -181,14 +213,34 @@ export function AgentCommsPanel({ workspaceId }: { workspaceId: string }) {
     ws.onmessage = (event) => {
       try {
         const msg = JSON.parse(event.data);
-        if (msg.event === "ACTIVITY_LOGGED" && msg.workspace_id === workspaceId) {
+        if (msg.workspace_id !== workspaceId) return;
+
+        // Two live-update paths:
+        //  1. ACTIVITY_LOGGED — fired by the LogActivity helper for
+        //     a2a_send / a2a_receive (and delegation rows IF the
+        //     delegation handler is ever refactored to use it). Today
+        //     the platform's delegation handlers do a direct INSERT
+        //     INTO activity_logs WITHOUT firing ACTIVITY_LOGGED, so
+        //     the delegation branch here is defensive — it'd light up
+        //     automatically the day the delegation handlers are
+        //     refactored to call LogActivity.
+        //  2. DELEGATION_SENT / DELEGATION_STATUS / DELEGATION_COMPLETE
+        //     / DELEGATION_FAILED — fired by the platform's delegation
+        //     handlers directly. These are the ONLY live signals the
+        //     panel currently has for delegation rows; the GET on
+        //     mount (which reads from activity_logs) shows them, but
+        //     without this branch, nothing surfaced live until the
+        //     next remount.
+        //     Synthesise an ActivityEntry from the
+        //     payload so toCommMessage's existing delegation branch
+        //     handles them identically to the GET path.
+        let entry: ActivityEntry | null = null;
+        if (msg.event === "ACTIVITY_LOGGED") {
           const p = msg.payload || {};
           const type = p.activity_type as string;
           const sourceId = p.source_id as string | null;
           if (!sourceId) return; // canvas-initiated, not agent comms
-          if (type !== "a2a_send" && type !== "a2a_receive") return;
-
-          const entry: ActivityEntry = {
+          if (type !== "a2a_send" && type !== "a2a_receive" && type !== "delegation") return;
+          entry = {
             id: p.id as string || crypto.randomUUID(),
             activity_type: type,
             source_id: sourceId,
@@ -200,13 +252,56 @@ export function AgentCommsPanel({ workspaceId }: { workspaceId: string }) {
             status: p.status as string || "ok",
             created_at: msg.timestamp || new Date().toISOString(),
           };
-          const m = toCommMessage(entry, workspaceId);
-          if (m) {
-            const key = `${m.timestamp}:${m.flow}:${m.peerId}`;
-            if (seenKeys.current.has(key)) return;
-            seenKeys.current.add(key);
-            setMessages((prev) => [...prev, m]);
+        } else if (
+          msg.event === "DELEGATION_SENT" ||
+          msg.event === "DELEGATION_STATUS" ||
+          msg.event === "DELEGATION_COMPLETE" ||
+          msg.event === "DELEGATION_FAILED"
+        ) {
+          const p = msg.payload || {};
+          const targetId = (p.target_id as string) || "";
+          if (!targetId) return;
+          // Map event → status. DELEGATION_STATUS payload includes its
+          // own `status` field (queued / dispatched). Other events have
+          // implicit status: SENT → pending, COMPLETE → completed,
+          // FAILED → failed.
+          let status: string;
+          let summary: string;
+          if (msg.event === "DELEGATION_STATUS") {
+            status = (p.status as string) || "queued";
+            summary = `Delegation ${status}`;
+          } else if (msg.event === "DELEGATION_COMPLETE") {
+            status = "completed";
+            summary = `Delegation completed (${(p.response_preview as string)?.slice(0, 60) || ""})`;
+          } else if (msg.event === "DELEGATION_FAILED") {
+            status = "failed";
+            summary = `Delegation failed: ${(p.error as string) || "unknown"}`;
+          } else {
+            status = "pending";
+            summary = `Delegating to ${(p.target_id as string)?.slice(0, 8) || "peer"}`;
           }
+          entry = {
+            id: (p.delegation_id as string) || crypto.randomUUID(),
+            activity_type: "delegation",
+            source_id: workspaceId,
+            target_id: targetId,
+            method: msg.event === "DELEGATION_SENT" ? "delegate" : "delegate_result",
+            summary,
+            request_body: null,
+            response_body: null,
+            status,
+            created_at: msg.timestamp || new Date().toISOString(),
+          };
+        } else {
+          return;
+        }
+
+        const m = toCommMessage(entry, workspaceId);
+        if (m) {
+          const key = `${m.timestamp}:${m.flow}:${m.peerId}`;
+          if (seenKeys.current.has(key)) return;
+          seenKeys.current.add(key);
+          setMessages((prev) => [...prev, m]);
         }
       } catch { /* ignore */ }
     };
diff --git a/canvas/src/components/tabs/chat/__tests__/AgentCommsPanel.test.ts b/canvas/src/components/tabs/chat/__tests__/AgentCommsPanel.test.ts
index fc2eb70f..2e421ed4 100644
--- a/canvas/src/components/tabs/chat/__tests__/AgentCommsPanel.test.ts
+++ b/canvas/src/components/tabs/chat/__tests__/AgentCommsPanel.test.ts
@@ -110,4 +110,65 @@ describe("toCommMessage — flow derivation", () => {
     );
     expect(m!.status).toBe("error");
   });
+
+  // --- delegation rows ---
+  // The platform's /delegate handler writes activity_type='delegation'
+  // for both the initial outbound (method='delegate') and the eventual
+  // reply (method='delegate_result', status=queued|completed|failed).
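+  //
+  // For reference, an illustrative 'delegate' row as makeEntry shapes
+  // it in the tests below (field names per ActivityEntry; the summary
+  // wording is an example, not something the platform pins):
+  //   { activity_type: "delegation", method: "delegate",
+  //     source_id: SELF, target_id: PEER, status: "pending",
+  //     summary: "Delegating to ws-peer",
+  //     request_body: null, response_body: null }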
+  // Pre-fix the panel filtered these out and showed "no agent comms"
+  // even when 6+ delegations existed in the DB.
+
+  it("delegation 'delegate' row maps as outbound to target", () => {
+    const m = toCommMessage(
+      makeEntry({
+        activity_type: "delegation",
+        method: "delegate",
+        source_id: SELF,
+        target_id: PEER,
+        summary: "Delegating to ws-peer",
+        status: "pending",
+      }),
+      SELF,
+    );
+    expect(m).toBeTruthy();
+    expect(m!.flow).toBe("out");
+    expect(m!.peerId).toBe(PEER);
+    expect(m!.peerName).toBe("Peer Agent");
+    expect(m!.text).toBe("Delegating to ws-peer");
+    expect(m!.status).toBe("pending");
+  });
+
+  it("delegation 'delegate_result' queued row preserves status='queued'", () => {
+    // The "queued" status is the load-bearing signal the LLM uses to
+    // decide whether to wait or fall back. If toCommMessage drops or
+    // rewrites it, the UI loses the ability to show the "peer busy,
+    // will reply" affordance.
+    const m = toCommMessage(
+      makeEntry({
+        activity_type: "delegation",
+        method: "delegate_result",
+        source_id: SELF,
+        target_id: PEER,
+        summary: "Delegation queued — target at capacity",
+        status: "queued",
+      }),
+      SELF,
+    );
+    expect(m!.status).toBe("queued");
+    expect(m!.text).toContain("queued");
+  });
+
+  it("delegation row with no target_id returns null", () => {
+    // Defensive: a delegation row missing target_id can't be rendered
+    // (we wouldn't know which peer to attribute it to). Drop instead
+    // of rendering a ghost.
+    const m = toCommMessage(
+      makeEntry({
+        activity_type: "delegation",
+        target_id: null,
+      }),
+      SELF,
+    );
+    expect(m).toBeNull();
+  });
 });
diff --git a/workspace/builtin_tools/delegation.py b/workspace/builtin_tools/delegation.py
index 40b76bf0..25d0ae55 100644
--- a/workspace/builtin_tools/delegation.py
+++ b/workspace/builtin_tools/delegation.py
@@ -39,6 +39,13 @@ DELEGATION_TIMEOUT = float(os.environ.get("DELEGATION_TIMEOUT", "300.0"))
 class DelegationStatus(str, Enum):
     PENDING = "pending"
     IN_PROGRESS = "in_progress"
+    # QUEUED: peer's a2a-proxy returned HTTP 202 + {queued: true}, meaning
+    # the peer is mid-task and the request was placed in a drain queue.
+    # The reply will arrive via the platform's stitch path when the
+    # peer finishes its current work. The LLM should WAIT, not retry,
+    # and definitely not fall back to doing the work itself — see the
+    # check_delegation_status docstring for the prompt-side guidance.
+    QUEUED = "queued"
     COMPLETED = "completed"
     FAILED = "failed"
 
@@ -119,6 +126,69 @@ async def _record_delegation_on_platform(task_id: str, target_workspace_id: str,
         logger.debug("Delegation record failed (best-effort): %s", e)
 
+
+async def _refresh_queued_from_platform(task_id: str) -> bool:
+    """Lazy-refresh a QUEUED delegation's local state from the platform.
+
+    Called by check_delegation_status when local status is QUEUED. The
+    platform's drain stitch (a2a_queue.go) updates the delegate_result
+    activity_logs row when a queued delegation eventually completes,
+    but it has no callback to this runtime — without this lazy refresh,
+    the LLM polling check_delegation_status would see "queued" forever
+    even after the platform has the result.
+
+    Returns True if the local delegation was updated to a terminal state
+    (completed/failed), False otherwise. Best-effort — network/parse
+    errors leave the local state untouched and let the next call retry.
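+
+    Illustrative call site (a sketch; in this module the only real
+    caller is check_delegation_status below):
+
+        if delegation.status == DelegationStatus.QUEUED:
+            await _refresh_queued_from_platform(task_id)
+            # delegation.status may now be COMPLETED or FAILED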
+ """ + delegation = _delegations.get(task_id) + if not delegation: + return False + try: + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.get( + f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/delegations", + headers={}, + ) + if resp.status_code != 200: + return False + entries = resp.json() + if not isinstance(entries, list): + return False + except Exception as e: + logger.debug("refresh queued delegation %s: %s", task_id, e) + return False + # Find the latest delegate_result row matching our task_id. + # Platform list is newest-first; the first match is the freshest. + for entry in entries: + if entry.get("delegation_id") != task_id: + continue + if entry.get("type") != "delegation": + continue + # Only delegate_result rows carry the eventual outcome; the + # initial 'delegate' row stays at status='pending' even after + # the result lands. Filtering on summary text is brittle, but + # the rows from the LIST endpoint don't include `method`. The + # `delegate_result` rows are the ones with `error` (failure) + # or `response_preview` (success) populated — pick those. + status = entry.get("status", "") + if status == "completed": + delegation.status = DelegationStatus.COMPLETED + delegation.result = entry.get("response_preview", "") + await _notify_completion(task_id, delegation.workspace_id, "completed") + return True + if status == "failed": + delegation.status = DelegationStatus.FAILED + delegation.error = entry.get("error", "") + await _notify_completion(task_id, delegation.workspace_id, "failed") + return True + # status == "queued" / "pending" / "dispatched": platform hasn't + # resolved yet; leave local state unchanged so the next poll + # retries. Don't break — keep scanning in case there's a newer + # entry for the same task_id (possible if the same delegation + # was retried). + return False + + async def _update_delegation_on_platform(task_id: str, status: str, error: str = "", response_preview: str = ""): """Mirror status changes to the platform's activity_logs (#64 fix). @@ -212,6 +282,39 @@ async def _execute_delegation(task_id: str, workspace_id: str, task: str): }, ) + # HTTP 202 + {queued: true} = peer's a2a-proxy + # accepted the request but the peer's runtime is + # mid-task. Platform-side drain will deliver the + # reply asynchronously. Mark QUEUED locally so + # check_delegation_status can surface that state + # to the LLM with explicit "wait, don't bypass" + # guidance. Do NOT mark FAILED — the request is + # alive in the platform's queue, not lost. + # + # Without this branch, the loop falls through, the + # `if "error" in result` line below references an + # unbound `result`, and the eventual FAILED status + # leads the LLM to conclude the peer is permanently + # unavailable — at which point it does the delegated + # work itself, defeating the whole orchestration. + if a2a_resp.status_code == 202: + try: + queued_body = a2a_resp.json() + except Exception: + queued_body = {} + if queued_body.get("queued") is True: + delegation.status = DelegationStatus.QUEUED + log_event( + event_type="delegation", action="delegate", + resource=workspace_id, outcome="queued", + trace_id=task_id, attempt=attempt + 1, + ) + await _notify_completion(task_id, workspace_id, "queued") + await _update_delegation_on_platform( + task_id, "queued", "", "", + ) + return + if a2a_resp.status_code == 200: try: result = a2a_resp.json() @@ -324,6 +427,23 @@ async def check_delegation_status( ) -> dict: """Check the status of a delegated task, or list all active delegations. 
+
+    Status semantics — IMPORTANT:
+
+    - "pending" / "in_progress" → peer is actively working. Wait and check again.
+    - "queued" → peer's a2a-proxy accepted the call but the peer is
+      processing a prior task. The reply WILL arrive — the platform's
+      drain re-dispatches when the peer is free. This tool transparently
+      polls the platform for the eventual outcome on each call, so
+      keep polling check_delegation_status periodically and you'll see
+      the status flip to "completed" / "failed" automatically.
+      Do NOT retry the delegation. Do NOT do the work yourself.
+      Acknowledge to the user that the peer is busy and will reply,
+      then continue with other delegations or check back later.
+    - "completed" → result is in the `result` field.
+    - "failed" → real failure (network, peer crashed, etc.). The
+      `error` field has the cause. Only fall back to doing the work
+      yourself if status is "failed", never if status is "queued".
+
     Args:
         task_id: The task_id returned by delegate_to_workspace.
             If empty, lists all delegations.
@@ -351,6 +471,16 @@ async def check_delegation_status(
     if not delegation:
         return {"error": f"No delegation found with task_id {task_id}"}
 
+    # Lazy refresh for QUEUED entries: the platform's drain stitch
+    # updates its activity_logs row when the queued delegation
+    # eventually completes, but doesn't push back to this runtime.
+    # Without this refresh, the LLM polling here would see "queued"
+    # forever even after the result is available — exactly the bug
+    # the upstream director-bypass docstring guidance warned against.
+    if delegation.status == DelegationStatus.QUEUED:
+        await _refresh_queued_from_platform(task_id)
+        # `delegation` is the same object stored in _delegations;
+        # _refresh_queued_from_platform mutates it in place.
+
     result = {
         "task_id": task_id,
         "workspace_id": delegation.workspace_id,
diff --git a/workspace/tests/test_delegation.py b/workspace/tests/test_delegation.py
index 3b9721cb..33d4f982 100644
--- a/workspace/tests/test_delegation.py
+++ b/workspace/tests/test_delegation.py
@@ -283,6 +283,248 @@ class TestA2ASuccess:
         assert "artifact text" in status["result"]
 
+
+class TestA2AQueued:
+    """HTTP 202 + {queued: true} comes back when the peer's a2a-proxy
+    accepted the request but the peer is mid-task. Pre-fix the runtime
+    treated this as 'no 200 → fall through to FAILED', which led the
+    LLM to conclude the peer was permanently unavailable and bypass
+    delegation entirely. Post-fix the status is QUEUED and the LLM
+    sees explicit guidance to wait."""
+
+    @pytest.mark.asyncio
+    async def test_queued_marks_status_queued_not_failed(self, delegation_mocks):
+        mod, *_ = delegation_mocks
+        _, mock_cls = _make_mock_client(
+            a2a_status=202,
+            a2a_payload={"queued": True, "summary": "Delegation queued — target at capacity"},
+        )
+
+        with patch("httpx.AsyncClient", mock_cls):
+            status = await _invoke_and_wait(mod)
+
+        assert status["status"] == "queued", f"expected queued, got {status}"
+        # No 'error' field on queued (it's not a failure)
+        assert "error" not in status or not status.get("error")
+
+    @pytest.mark.asyncio
+    async def test_queued_does_not_retry(self, delegation_mocks):
+        # The retry loop is for transient transport errors. A 202+queued
+        # is NOT a failure to retry against — the platform's drain will
+        # deliver the eventual reply. Retrying would just re-queue the
+        # same task and double-count it.
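+        # For reference, the exchange being simulated here (shape per
+        # the 202 branch in _execute_delegation):
+        #   -> POST <target>/a2a
+        #   <- HTTP/1.1 202 Accepted
+        #      {"queued": true}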
+        mod, *_ = delegation_mocks
+        client, mock_cls = _make_mock_client(
+            a2a_status=202,
+            a2a_payload={"queued": True},
+        )
+
+        with patch("httpx.AsyncClient", mock_cls):
+            await _invoke_and_wait(mod)
+
+        # The mock is shared across all AsyncClient calls (record, A2A,
+        # notify, update), so the total post count includes platform-sync
+        # bookkeeping POSTs too. Only count the A2A POST itself —
+        # identified by URL matching the target's /a2a endpoint.
+        a2a_calls = [
+            c for c in client.post.await_args_list
+            if c.args and c.args[0] == "http://peer:8000"
+        ]
+        assert len(a2a_calls) == 1, (
+            f"queued should not retry the A2A POST; got {len(a2a_calls)} A2A calls"
+        )
+
+    @pytest.mark.asyncio
+    async def test_202_without_queued_flag_falls_through(self, delegation_mocks):
+        # A bare 202 with no {queued: true} marker is NOT the platform's
+        # queue signal — it could be a misbehaving proxy or a future
+        # protocol revision. Don't treat it as queued; fall through to
+        # the existing retry-then-FAILED path.
+        mod, *_ = delegation_mocks
+        _, mock_cls = _make_mock_client(
+            a2a_status=202,
+            a2a_payload={"some_other_field": "value"},
+        )
+
+        with patch("httpx.AsyncClient", mock_cls):
+            status = await _invoke_and_wait(mod)
+
+        assert status["status"] == "failed", (
+            f"bare 202 should not be treated as queued, expected failed, got {status}"
+        )
+
+
+class TestQueuedLazyRefresh:
+    """When a delegation is QUEUED, check_delegation_status must lazily
+    refresh from the platform's GET /delegations to pick up drain-stitch
+    completions. Without this refresh, the LLM sees "queued" forever
+    because the platform never pushes back to the runtime.
+
+    Pre-fix the docstring told the LLM to wait on QUEUED. With no refresh
+    path, "wait" was permanent. These tests pin the refresh behavior so
+    the docstring is actually load-bearing."""
+
+    @pytest.mark.asyncio
+    async def test_queued_resolves_to_completed_via_lazy_refresh(self, delegation_mocks):
+        mod, *_ = delegation_mocks
+        # Step 1: invoke delegation, peer returns 202+queued, local
+        # status becomes QUEUED.
+        _, mock_cls_queued = _make_mock_client(
+            a2a_status=202,
+            a2a_payload={"queued": True},
+        )
+        with patch("httpx.AsyncClient", mock_cls_queued):
+            initial = await _invoke_and_wait(mod)
+        assert initial["status"] == "queued"
+        task_id = next(iter(mod._delegations))
+
+        # Step 2: simulate the platform's drain having stitched a
+        # completed result. GET /workspaces/<id>/delegations now returns
+        # a 'completed' delegate_result row matching our task_id.
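+        # The refresh issues a single request, GET
+        # {PLATFORM_URL}/workspaces/{WORKSPACE_ID}/delegations (see
+        # _refresh_queued_from_platform), so returning this canned
+        # list for every GET on the mock is safe.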
+        list_response = MagicMock()
+        list_response.status_code = 200
+        list_response.json.return_value = [
+            {
+                "delegation_id": task_id,
+                "type": "delegation",
+                "status": "completed",
+                "summary": "Delegation completed (peer reply)",
+                "response_preview": "the peer's actual reply text",
+                "source_id": "ws-self",
+                "target_id": "target",
+            },
+        ]
+        refresh_client = AsyncMock()
+        refresh_client.get = AsyncMock(return_value=list_response)
+        refresh_client.post = AsyncMock(return_value=MagicMock(status_code=200))
+        refresh_cls = MagicMock()
+        refresh_cls.return_value.__aenter__ = AsyncMock(return_value=refresh_client)
+        refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        with patch("httpx.AsyncClient", refresh_cls):
+            fn = mod.check_delegation_status
+            if hasattr(fn, "ainvoke"):
+                refreshed = await fn.ainvoke({"task_id": task_id})
+            else:
+                refreshed = await fn(task_id=task_id)
+
+        assert refreshed["status"] == "completed", (
+            f"lazy refresh should advance QUEUED → completed; got {refreshed}"
+        )
+        assert refreshed.get("result") == "the peer's actual reply text"
+
+    @pytest.mark.asyncio
+    async def test_queued_resolves_to_failed_via_lazy_refresh(self, delegation_mocks):
+        mod, *_ = delegation_mocks
+        _, mock_cls_queued = _make_mock_client(
+            a2a_status=202,
+            a2a_payload={"queued": True},
+        )
+        with patch("httpx.AsyncClient", mock_cls_queued):
+            await _invoke_and_wait(mod)
+        task_id = next(iter(mod._delegations))
+
+        list_response = MagicMock()
+        list_response.status_code = 200
+        list_response.json.return_value = [
+            {
+                "delegation_id": task_id,
+                "type": "delegation",
+                "status": "failed",
+                "error": "peer timed out after 30 min",
+                "source_id": "ws-self",
+                "target_id": "target",
+            },
+        ]
+        refresh_client = AsyncMock()
+        refresh_client.get = AsyncMock(return_value=list_response)
+        refresh_client.post = AsyncMock(return_value=MagicMock(status_code=200))
+        refresh_cls = MagicMock()
+        refresh_cls.return_value.__aenter__ = AsyncMock(return_value=refresh_client)
+        refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        with patch("httpx.AsyncClient", refresh_cls):
+            fn = mod.check_delegation_status
+            if hasattr(fn, "ainvoke"):
+                refreshed = await fn.ainvoke({"task_id": task_id})
+            else:
+                refreshed = await fn(task_id=task_id)
+
+        assert refreshed["status"] == "failed"
+        assert refreshed.get("error") == "peer timed out after 30 min"
+
+    @pytest.mark.asyncio
+    async def test_queued_stays_queued_when_platform_not_resolved(self, delegation_mocks):
+        # Realistic case: LLM polls before platform's drain has fired.
+        # Refresh sees only the queued row → no state change. Subsequent
+        # poll will retry.
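+        # (_refresh_queued_from_platform returns False here and leaves
+        # delegation.status untouched: the scan finds no terminal row.)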
+        mod, *_ = delegation_mocks
+        _, mock_cls_queued = _make_mock_client(
+            a2a_status=202,
+            a2a_payload={"queued": True},
+        )
+        with patch("httpx.AsyncClient", mock_cls_queued):
+            await _invoke_and_wait(mod)
+        task_id = next(iter(mod._delegations))
+
+        list_response = MagicMock()
+        list_response.status_code = 200
+        list_response.json.return_value = [
+            {
+                "delegation_id": task_id,
+                "type": "delegation",
+                "status": "queued",  # not yet resolved
+                "summary": "Delegation queued — target at capacity",
+                "source_id": "ws-self",
+                "target_id": "target",
+            },
+        ]
+        refresh_client = AsyncMock()
+        refresh_client.get = AsyncMock(return_value=list_response)
+        refresh_client.post = AsyncMock(return_value=MagicMock(status_code=200))
+        refresh_cls = MagicMock()
+        refresh_cls.return_value.__aenter__ = AsyncMock(return_value=refresh_client)
+        refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        with patch("httpx.AsyncClient", refresh_cls):
+            fn = mod.check_delegation_status
+            if hasattr(fn, "ainvoke"):
+                refreshed = await fn.ainvoke({"task_id": task_id})
+            else:
+                refreshed = await fn(task_id=task_id)
+
+        assert refreshed["status"] == "queued"
+
+    @pytest.mark.asyncio
+    async def test_refresh_is_safe_when_platform_unreachable(self, delegation_mocks):
+        # Platform GET fails (network blip). Refresh must not raise —
+        # local state stays QUEUED so the next poll retries.
+        mod, *_ = delegation_mocks
+        _, mock_cls_queued = _make_mock_client(
+            a2a_status=202,
+            a2a_payload={"queued": True},
+        )
+        with patch("httpx.AsyncClient", mock_cls_queued):
+            await _invoke_and_wait(mod)
+        task_id = next(iter(mod._delegations))
+
+        refresh_client = AsyncMock()
+        refresh_client.get = AsyncMock(side_effect=httpx.ConnectError("network down"))
+        refresh_client.post = AsyncMock(return_value=MagicMock(status_code=200))
+        refresh_cls = MagicMock()
+        refresh_cls.return_value.__aenter__ = AsyncMock(return_value=refresh_client)
+        refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        with patch("httpx.AsyncClient", refresh_cls):
+            fn = mod.check_delegation_status
+            if hasattr(fn, "ainvoke"):
+                refreshed = await fn.ainvoke({"task_id": task_id})
+            else:
+                refreshed = await fn(task_id=task_id)
+
+        # Doesn't raise; local state preserved.
+        assert refreshed["status"] == "queued"
+
+
 class TestA2AErrors:
     @pytest.mark.asyncio