diff --git a/canvas/src/components/tabs/chat/AgentCommsPanel.tsx b/canvas/src/components/tabs/chat/AgentCommsPanel.tsx
index 03dc97a3..fd19c3a5 100644
--- a/canvas/src/components/tabs/chat/AgentCommsPanel.tsx
+++ b/canvas/src/components/tabs/chat/AgentCommsPanel.tsx
@@ -213,14 +213,34 @@ export function AgentCommsPanel({ workspaceId }: { workspaceId: string }) {
     ws.onmessage = (event) => {
       try {
         const msg = JSON.parse(event.data);
-        if (msg.event === "ACTIVITY_LOGGED" && msg.workspace_id === workspaceId) {
+        if (msg.workspace_id !== workspaceId) return;
+
+        // Two live-update paths:
+        // 1. ACTIVITY_LOGGED — fired by the LogActivity helper for
+        //    a2a_send / a2a_receive (and delegation rows IF the
+        //    delegation handler is ever refactored to use it). Today
+        //    the platform's delegation handlers do a direct INSERT
+        //    INTO activity_logs WITHOUT firing ACTIVITY_LOGGED, so
+        //    the delegation branch here is defensive — it would light
+        //    up automatically the day the delegation handlers are
+        //    refactored to call LogActivity.
+        // 2. DELEGATION_SENT / DELEGATION_STATUS / DELEGATION_COMPLETE
+        //    / DELEGATION_FAILED — fired by the platform's delegation
+        //    handlers directly. These are the ONLY live signals the
+        //    panel currently has for delegation rows; the GET on
+        //    mount (which reads from activity_logs) shows them, but
+        //    without this branch nothing would surface live until the
+        //    next remount. Synthesise an ActivityEntry from the
+        //    payload so toCommMessage's existing delegation branch
+        //    handles them identically to the GET path.
+        let entry: ActivityEntry | null = null;
+        if (msg.event === "ACTIVITY_LOGGED") {
           const p = msg.payload || {};
           const type = p.activity_type as string;
           const sourceId = p.source_id as string | null;
           if (!sourceId) return; // canvas-initiated, not agent comms
           if (type !== "a2a_send" && type !== "a2a_receive" && type !== "delegation") return;
-
-          const entry: ActivityEntry = {
+          entry = {
             id: p.id as string || crypto.randomUUID(),
             activity_type: type,
             source_id: sourceId,
@@ -232,13 +252,56 @@ export function AgentCommsPanel({ workspaceId }: { workspaceId: string }) {
             status: p.status as string || "ok",
             created_at: msg.timestamp || new Date().toISOString(),
           };
-          const m = toCommMessage(entry, workspaceId);
-          if (m) {
-            const key = `${m.timestamp}:${m.flow}:${m.peerId}`;
-            if (seenKeys.current.has(key)) return;
-            seenKeys.current.add(key);
-            setMessages((prev) => [...prev, m]);
+        } else if (
+          msg.event === "DELEGATION_SENT" ||
+          msg.event === "DELEGATION_STATUS" ||
+          msg.event === "DELEGATION_COMPLETE" ||
+          msg.event === "DELEGATION_FAILED"
+        ) {
+          const p = msg.payload || {};
+          const targetId = (p.target_id as string) || "";
+          if (!targetId) return;
+          // Map event → status. The DELEGATION_STATUS payload carries its
+          // own `status` field (queued / dispatched). The other events have
+          // an implicit status: SENT → pending, COMPLETE → completed,
+          // FAILED → failed.
+          let status: string;
+          let summary: string;
+          if (msg.event === "DELEGATION_STATUS") {
+            status = (p.status as string) || "queued";
+            summary = `Delegation ${status}`;
+          } else if (msg.event === "DELEGATION_COMPLETE") {
+            status = "completed";
+            summary = `Delegation completed (${(p.response_preview as string)?.slice(0, 60) || ""})`;
+          } else if (msg.event === "DELEGATION_FAILED") {
+            status = "failed";
+            summary = `Delegation failed: ${(p.error as string) || "unknown"}`;
+          } else {
+            status = "pending";
+            summary = `Delegating to ${(p.target_id as string)?.slice(0, 8) || "peer"}`;
           }
+          entry = {
+            id: (p.delegation_id as string) || crypto.randomUUID(),
+            activity_type: "delegation",
+            source_id: workspaceId,
+            target_id: targetId,
+            method: msg.event === "DELEGATION_SENT" ? "delegate" : "delegate_result",
+            summary,
+            request_body: null,
+            response_body: null,
+            status,
+            created_at: msg.timestamp || new Date().toISOString(),
+          };
+        } else {
+          return;
+        }
+
+        const m = toCommMessage(entry, workspaceId);
+        if (m) {
+          const key = `${m.timestamp}:${m.flow}:${m.peerId}`;
+          if (seenKeys.current.has(key)) return;
+          seenKeys.current.add(key);
+          setMessages((prev) => [...prev, m]);
         }
       } catch { /* ignore */ }
     };
diff --git a/workspace/builtin_tools/delegation.py b/workspace/builtin_tools/delegation.py
index 9e9f8d09..25d0ae55 100644
--- a/workspace/builtin_tools/delegation.py
+++ b/workspace/builtin_tools/delegation.py
@@ -126,6 +126,69 @@ async def _record_delegation_on_platform(task_id: str, target_workspace_id: str,
         logger.debug("Delegation record failed (best-effort): %s", e)
 
 
+async def _refresh_queued_from_platform(task_id: str) -> bool:
+    """Lazy-refresh a QUEUED delegation's local state from the platform.
+
+    Called by check_delegation_status when the local status is QUEUED. The
+    platform's drain stitch (a2a_queue.go) updates the delegate_result
+    activity_logs row when a queued delegation eventually completes,
+    but it has no callback to this runtime — without this lazy refresh,
+    the LLM polling check_delegation_status would see "queued" forever
+    even after the platform has the result.
+
+    Returns True if the local delegation was updated to a terminal state
+    (completed/failed), False otherwise. Best-effort — network/parse
+    errors leave the local state untouched and let the next call retry.
+    """
+    delegation = _delegations.get(task_id)
+    if not delegation:
+        return False
+    try:
+        async with httpx.AsyncClient(timeout=10) as client:
+            resp = await client.get(
+                f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/delegations",
+                headers={},
+            )
+            if resp.status_code != 200:
+                return False
+            entries = resp.json()
+            if not isinstance(entries, list):
+                return False
+    except Exception as e:
+        logger.debug("refresh queued delegation %s: %s", task_id, e)
+        return False
+    # Find the latest delegate_result row matching our task_id.
+    # The platform list is newest-first; the first match is the freshest.
+    for entry in entries:
+        if entry.get("delegation_id") != task_id:
+            continue
+        if entry.get("type") != "delegation":
+            continue
+        # Only delegate_result rows carry the eventual outcome; the
+        # initial 'delegate' row stays at status='pending' even after
+        # the result lands. The rows from the LIST endpoint don't
+        # include `method`, and filtering on summary text would be
+        # brittle, so go by status: the delegate_result rows are the
+        # ones that reach 'completed' (with `response_preview`) or
+        # 'failed' (with `error`).
+        status = entry.get("status", "")
+        if status == "completed":
+            delegation.status = DelegationStatus.COMPLETED
+            delegation.result = entry.get("response_preview", "")
+            await _notify_completion(task_id, delegation.workspace_id, "completed")
+            return True
+        if status == "failed":
+            delegation.status = DelegationStatus.FAILED
+            delegation.error = entry.get("error", "")
+            await _notify_completion(task_id, delegation.workspace_id, "failed")
+            return True
+        # status == "queued" / "pending" / "dispatched": the platform hasn't
+        # resolved yet; leave local state unchanged so the next poll
+        # retries. Don't break — keep scanning in case there's a newer
+        # entry for the same task_id (possible if the same delegation
+        # was retried).
+    return False
+
+
 async def _update_delegation_on_platform(task_id: str, status: str, error: str = "", response_preview: str = ""):
     """Mirror status changes to the platform's activity_logs (#64 fix).
 
@@ -369,10 +432,13 @@ async def check_delegation_status(
     - "pending" / "in_progress" → peer is actively working. Wait and check again.
     - "queued" → peer's a2a-proxy accepted the call but the peer is
       processing a prior task. The reply WILL arrive — the platform's
-      drain re-dispatches when the peer is free. Do NOT retry the
-      delegation. Do NOT do the work yourself. Acknowledge to the user
-      that the peer is busy and will reply, then continue with other
-      delegations or check back later.
+      drain re-dispatches when the peer is free. This tool transparently
+      polls the platform for the eventual outcome on each call, so
+      keep polling check_delegation_status periodically and you'll see
+      the status flip to "completed" / "failed" automatically.
+      Do NOT retry the delegation. Do NOT do the work yourself.
+      Acknowledge to the user that the peer is busy and will reply,
+      then continue with other delegations or check back later.
     - "completed" → result is in the `result` field.
     - "failed" → real failure (network, peer crashed, etc.). The
       `error` field has the cause. Only fall back to doing the work
@@ -405,6 +471,16 @@ async def check_delegation_status(
     if not delegation:
         return {"error": f"No delegation found with task_id {task_id}"}
 
+    # Lazy refresh for QUEUED entries: the platform's drain stitch
+    # updates its activity_logs row when the queued delegation
+    # eventually completes, but doesn't push back to this runtime.
+    # Without this refresh, the LLM polling here would see "queued"
+    # forever even after the result is available — exactly the bug
+    # the upstream director-bypass docstring guidance warned against.
+    if delegation.status == DelegationStatus.QUEUED:
+        await _refresh_queued_from_platform(task_id)
+        # `delegation` is the same dict entry — the refresh mutates it in place.
+
     result = {
         "task_id": task_id,
         "workspace_id": delegation.workspace_id,
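Reviewer note (not part of the patch): the docstring change above promises that repeated calls to check_delegation_status will eventually flip a QUEUED delegation to "completed" / "failed" via the lazy refresh. A minimal sketch of how an agent-side caller might lean on that, assuming check_delegation_status is callable as a plain coroutine (the tests' fallback path); wait_for_delegation, the 60-second interval and the max_wait default are illustrative names and values, not part of the codebase.

import asyncio

async def wait_for_delegation(check_delegation_status, task_id: str, max_wait: float = 1800.0) -> dict:
    """Poll check_delegation_status until the delegation reaches a terminal state or max_wait elapses."""
    deadline = asyncio.get_running_loop().time() + max_wait
    while True:
        status = await check_delegation_status(task_id=task_id)
        if status.get("status") in ("completed", "failed"):
            return status  # terminal: `result` or `error` is populated
        if asyncio.get_running_loop().time() >= deadline:
            return status  # still pending/queued; the caller decides how to proceed
        # Each call on a QUEUED delegation triggers the platform refresh above,
        # so simply polling again is enough to pick up drain-stitch results.
        await asyncio.sleep(60)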
diff --git a/workspace/tests/test_delegation.py b/workspace/tests/test_delegation.py
index b957cdee..33d4f982 100644
--- a/workspace/tests/test_delegation.py
+++ b/workspace/tests/test_delegation.py
@@ -353,6 +353,178 @@ class TestA2AQueued:
         )
 
 
+class TestQueuedLazyRefresh:
+    """When a delegation is QUEUED, check_delegation_status must lazily
+    refresh from the platform's GET /delegations to pick up drain-stitch
+    completions. Without this refresh, the LLM sees "queued" forever
+    because the platform never pushes back to the runtime.
+
+    Before this fix the docstring told the LLM to wait on QUEUED. With
+    no refresh path, "wait" was permanent. These tests pin the refresh
+    behavior so the docstring is actually load-bearing."""
+
+    @pytest.mark.asyncio
+    async def test_queued_resolves_to_completed_via_lazy_refresh(self, delegation_mocks):
+        mod, *_ = delegation_mocks
+        # Step 1: invoke the delegation; the peer returns 202 + queued, so
+        # the local status becomes QUEUED.
+        _, mock_cls_queued = _make_mock_client(
+            a2a_status=202,
+            a2a_payload={"queued": True},
+        )
+        with patch("httpx.AsyncClient", mock_cls_queued):
+            initial = await _invoke_and_wait(mod)
+        assert initial["status"] == "queued"
+        task_id = next(iter(mod._delegations))
+
+        # Step 2: simulate the platform's drain having stitched a completed
+        # result. GET /workspaces/{id}/delegations now returns a
+        # 'completed' delegate_result row matching our task_id.
+        list_response = MagicMock()
+        list_response.status_code = 200
+        list_response.json.return_value = [
+            {
+                "delegation_id": task_id,
+                "type": "delegation",
+                "status": "completed",
+                "summary": "Delegation completed (peer reply)",
+                "response_preview": "the peer's actual reply text",
+                "source_id": "ws-self",
+                "target_id": "target",
+            },
+        ]
+        refresh_client = AsyncMock()
+        refresh_client.get = AsyncMock(return_value=list_response)
+        refresh_client.post = AsyncMock(return_value=MagicMock(status_code=200))
+        refresh_cls = MagicMock()
+        refresh_cls.return_value.__aenter__ = AsyncMock(return_value=refresh_client)
+        refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        with patch("httpx.AsyncClient", refresh_cls):
+            fn = mod.check_delegation_status
+            if hasattr(fn, "ainvoke"):
+                refreshed = await fn.ainvoke({"task_id": task_id})
+            else:
+                refreshed = await fn(task_id=task_id)
+
+        assert refreshed["status"] == "completed", (
+            f"lazy refresh should advance QUEUED → completed; got {refreshed}"
+        )
+        assert refreshed.get("result") == "the peer's actual reply text"
+
+    @pytest.mark.asyncio
+    async def test_queued_resolves_to_failed_via_lazy_refresh(self, delegation_mocks):
+        mod, *_ = delegation_mocks
+        _, mock_cls_queued = _make_mock_client(
+            a2a_status=202,
+            a2a_payload={"queued": True},
+        )
+        with patch("httpx.AsyncClient", mock_cls_queued):
+            await _invoke_and_wait(mod)
+        task_id = next(iter(mod._delegations))
+
+        list_response = MagicMock()
+        list_response.status_code = 200
+        list_response.json.return_value = [
+            {
+                "delegation_id": task_id,
+                "type": "delegation",
+                "status": "failed",
+                "error": "peer timed out after 30 min",
+                "source_id": "ws-self",
+                "target_id": "target",
+            },
+        ]
+        refresh_client = AsyncMock()
+        refresh_client.get = AsyncMock(return_value=list_response)
+        refresh_client.post = AsyncMock(return_value=MagicMock(status_code=200))
+        refresh_cls = MagicMock()
+        refresh_cls.return_value.__aenter__ = AsyncMock(return_value=refresh_client)
+        refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        with patch("httpx.AsyncClient", refresh_cls):
+            fn = mod.check_delegation_status
+            if hasattr(fn, "ainvoke"):
+                refreshed = await fn.ainvoke({"task_id": task_id})
+            else:
+                refreshed = await fn(task_id=task_id)
+
+        assert refreshed["status"] == "failed"
+        assert refreshed.get("error") == "peer timed out after 30 min"
+
+    @pytest.mark.asyncio
+    async def test_queued_stays_queued_when_platform_not_resolved(self, delegation_mocks):
+        # Realistic case: the LLM polls before the platform's drain has fired.
+        # The refresh sees only the queued row → no state change. A subsequent
+        # poll will retry.
+        mod, *_ = delegation_mocks
+        _, mock_cls_queued = _make_mock_client(
+            a2a_status=202,
+            a2a_payload={"queued": True},
+        )
+        with patch("httpx.AsyncClient", mock_cls_queued):
+            await _invoke_and_wait(mod)
+        task_id = next(iter(mod._delegations))
+
+        list_response = MagicMock()
+        list_response.status_code = 200
+        list_response.json.return_value = [
+            {
+                "delegation_id": task_id,
+                "type": "delegation",
+                "status": "queued",  # not yet resolved
+                "summary": "Delegation queued — target at capacity",
+                "source_id": "ws-self",
+                "target_id": "target",
+            },
+        ]
+        refresh_client = AsyncMock()
+        refresh_client.get = AsyncMock(return_value=list_response)
+        refresh_client.post = AsyncMock(return_value=MagicMock(status_code=200))
+        refresh_cls = MagicMock()
+        refresh_cls.return_value.__aenter__ = AsyncMock(return_value=refresh_client)
+        refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        with patch("httpx.AsyncClient", refresh_cls):
+            fn = mod.check_delegation_status
+            if hasattr(fn, "ainvoke"):
+                refreshed = await fn.ainvoke({"task_id": task_id})
+            else:
+                refreshed = await fn(task_id=task_id)
+
+        assert refreshed["status"] == "queued"
+
+    @pytest.mark.asyncio
+    async def test_refresh_is_safe_when_platform_unreachable(self, delegation_mocks):
+        # Platform GET fails (network blip). Refresh must not raise —
+        # local state stays QUEUED so the next poll retries.
+        mod, *_ = delegation_mocks
+        _, mock_cls_queued = _make_mock_client(
+            a2a_status=202,
+            a2a_payload={"queued": True},
+        )
+        with patch("httpx.AsyncClient", mock_cls_queued):
+            await _invoke_and_wait(mod)
+        task_id = next(iter(mod._delegations))
+
+        refresh_client = AsyncMock()
+        refresh_client.get = AsyncMock(side_effect=httpx.ConnectError("network down"))
+        refresh_client.post = AsyncMock(return_value=MagicMock(status_code=200))
+        refresh_cls = MagicMock()
+        refresh_cls.return_value.__aenter__ = AsyncMock(return_value=refresh_client)
+        refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        with patch("httpx.AsyncClient", refresh_cls):
+            fn = mod.check_delegation_status
+            if hasattr(fn, "ainvoke"):
+                refreshed = await fn.ainvoke({"task_id": task_id})
+            else:
+                refreshed = await fn(task_id=task_id)
+
+        # Doesn't raise; local state preserved.
+        assert refreshed["status"] == "queued"
+
+
 class TestA2AErrors:
     @pytest.mark.asyncio