From 057876cb0ca9c2adf3fe8c90110622c4f6ef2e77 Mon Sep 17 00:00:00 2001
From: Hongming Wang
Date: Sun, 26 Apr 2026 15:01:50 -0700
Subject: [PATCH 1/2] fix(delegation): runtime handles 202+queued; canvas surfaces delegation rows
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Two bugs that compounded into the "Director does the work itself" UX:

1. workspace/builtin_tools/delegation.py: _execute_delegation only handled HTTP 200 in the response branch. When the peer's a2a-proxy returned HTTP 202 + {queued: true} (single-SDK-session bottleneck on the peer), the loop fell through. Two iterations later the `if "error" in result` check tried to access an unbound `result`, the coroutine ended quietly, and the delegation stayed at FAILED with error="None". The LLM checking status saw "failed" + the platform's "Delegation queued — target at capacity" log line in chat context, concluded the peer was permanently unavailable, and bypassed delegation to do the work itself.

   Fix: explicit 202+queued branch. Adds DelegationStatus.QUEUED, marks the local delegation as QUEUED, mirrors to the platform, and returns cleanly without retrying. The retry loop is for transient transport errors — queueing is a real ack, not a failure to retry against (retrying would just re-queue the same task).

   check_delegation_status docstring extended with explicit per-status guidance: pending/in_progress → wait, queued → wait (peer busy on prior task, reply WILL arrive), completed → use result, failed → real error in error field; only fall back on failed, never queued.

2. canvas/src/components/tabs/chat/AgentCommsPanel.tsx: filter dropped every delegation row because it whitelisted only a2a_send / a2a_receive. activity_type='delegation' rows (written by the platform's /delegate handler with method='delegate' or 'delegate_result') never reached toCommMessage. User saw "No agent-to-agent communications yet" while 6+ delegations existed in the DB.

   Fix: include "delegation" in both the initial filter and the WS push filter, plus a delegation branch in toCommMessage that maps the row as outbound (always — platform proxies on our behalf) and uses summary as the primary text source.

Tests:
- 3 new Python tests cover the 202+queued path: status becomes QUEUED not FAILED; no retry on queued (counted by URL match against the A2A target since the mock is shared across all AsyncClient calls); bare 202 without {queued:true} still falls through to the existing retry-then-FAILED path.
- 3 new TS tests cover the delegation mapper: 'delegate' row maps as outbound to target with summary text; queued 'delegate_result' preserves status='queued' (load-bearing for the LLM's wait-vs-bypass decision); missing target_id returns null instead of rendering a ghost.

Does NOT solve: the underlying single-SDK-session bottleneck that causes peers to queue in the first place. Tracked as task #102 (parallel SDK sessions per workspace) — real architectural work. This PR makes the runtime handle the queueing correctly so the LLM doesn't bail out, and makes the delegations visible in Agent Comms so operators can see what's happening.
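Reviewer aid: a minimal sketch of the classification rule the new branch applies. The helper name is made up for illustration (it does not exist in the codebase) and the payload shape is taken from the new test fixtures, not from a captured peer trace; only the queued flag is load-bearing, summary is display text.

    def is_platform_queue_ack(status_code: int, body: dict) -> bool:
        # Only HTTP 202 *with* {"queued": true} is the platform's queue ack.
        # A bare 202 without the flag keeps the existing retry-then-FAILED path,
        # and 200 takes the normal result-handling branch.
        return status_code == 202 and body.get("queued") is True

    assert is_platform_queue_ack(202, {"queued": True}) is True
    assert is_platform_queue_ack(202, {"some_other_field": "value"}) is False
    assert is_platform_queue_ack(200, {"result": "ok"}) is False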
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../components/tabs/chat/AgentCommsPanel.tsx | 36 +++++++++- .../chat/__tests__/AgentCommsPanel.test.ts | 61 ++++++++++++++++ workspace/builtin_tools/delegation.py | 54 ++++++++++++++ workspace/tests/test_delegation.py | 70 +++++++++++++++++++ 4 files changed, 219 insertions(+), 2 deletions(-) diff --git a/canvas/src/components/tabs/chat/AgentCommsPanel.tsx b/canvas/src/components/tabs/chat/AgentCommsPanel.tsx index 909616bf..03dc97a3 100644 --- a/canvas/src/components/tabs/chat/AgentCommsPanel.tsx +++ b/canvas/src/components/tabs/chat/AgentCommsPanel.tsx @@ -59,6 +59,34 @@ function resolveName(id: string): string { } export function toCommMessage(entry: ActivityEntry, workspaceId: string): CommMessage | null { + // delegation activity rows are written by the platform's /delegate + // handler. They're always outbound from this workspace's POV (the + // platform proxies the A2A on our behalf). Two methods: + // - "delegate" — the initial outbound; status pending/dispatched + // - "delegate_result" — the eventual reply; status completed/queued/failed + // We surface them in Agent Comms because they ARE agent-to-agent + // calls; without this branch they'd be dropped by the activity_type + // filter and the user would see "No agent-to-agent communications yet" + // even when the director made delegations. + if (entry.activity_type === "delegation") { + const peerId = entry.target_id || ""; + if (!peerId) return null; + return { + id: entry.id, + flow: "out", + peerName: resolveName(peerId), + peerId, + // Prefer summary (set by the platform with a human-readable + // string like "Delegating to X" or "Delegation queued — target + // at capacity"). Fall back to request body for older rows that + // pre-date the summary column being populated. + text: entry.summary || extractRequestText(entry.request_body) || "(delegation)", + responseText: entry.response_body ? extractResponseText(entry.response_body) : null, + status: entry.status || "ok", + timestamp: entry.created_at, + }; + } + // a2a_receive activity rows come in two shapes: // // 1. Real incoming call (a peer called us): source_id = the peer, @@ -132,7 +160,11 @@ export function AgentCommsPanel({ workspaceId }: { workspaceId: string }) { api.get(`/workspaces/${workspaceId}/activity?source=agent&limit=50`) .then((entries) => { const filtered = (entries ?? 
[]) - .filter((e) => e.activity_type === "a2a_send" || e.activity_type === "a2a_receive") + .filter((e) => + e.activity_type === "a2a_send" || + e.activity_type === "a2a_receive" || + e.activity_type === "delegation", + ) .reverse(); const msgs: CommMessage[] = []; for (const e of filtered) { @@ -186,7 +218,7 @@ export function AgentCommsPanel({ workspaceId }: { workspaceId: string }) { const type = p.activity_type as string; const sourceId = p.source_id as string | null; if (!sourceId) return; // canvas-initiated, not agent comms - if (type !== "a2a_send" && type !== "a2a_receive") return; + if (type !== "a2a_send" && type !== "a2a_receive" && type !== "delegation") return; const entry: ActivityEntry = { id: p.id as string || crypto.randomUUID(), diff --git a/canvas/src/components/tabs/chat/__tests__/AgentCommsPanel.test.ts b/canvas/src/components/tabs/chat/__tests__/AgentCommsPanel.test.ts index fc2eb70f..2e421ed4 100644 --- a/canvas/src/components/tabs/chat/__tests__/AgentCommsPanel.test.ts +++ b/canvas/src/components/tabs/chat/__tests__/AgentCommsPanel.test.ts @@ -110,4 +110,65 @@ describe("toCommMessage — flow derivation", () => { ); expect(m!.status).toBe("error"); }); + + // --- delegation rows --- + // The platform's /delegate handler writes activity_type='delegation' + // for both the initial outbound (method='delegate') and the eventual + // reply (method='delegate_result', status=queued|completed|failed). + // Pre-fix the panel filtered these out and showed "no agent comms" + // even when 6+ delegations existed in the DB. + + it("delegation 'delegate' row maps as outbound to target", () => { + const m = toCommMessage( + makeEntry({ + activity_type: "delegation", + method: "delegate", + source_id: SELF, + target_id: PEER, + summary: "Delegating to ws-peer", + status: "pending", + }), + SELF, + ); + expect(m).toBeTruthy(); + expect(m!.flow).toBe("out"); + expect(m!.peerId).toBe(PEER); + expect(m!.peerName).toBe("Peer Agent"); + expect(m!.text).toBe("Delegating to ws-peer"); + expect(m!.status).toBe("pending"); + }); + + it("delegation 'delegate_result' queued row preserves status='queued'", () => { + // The "queued" status is the load-bearing signal the LLM uses to + // decide whether to wait or fall back. If toCommMessage drops or + // rewrites it, the UI loses the ability to show the "peer busy, + // will reply" affordance. + const m = toCommMessage( + makeEntry({ + activity_type: "delegation", + method: "delegate_result", + source_id: SELF, + target_id: PEER, + summary: "Delegation queued — target at capacity", + status: "queued", + }), + SELF, + ); + expect(m!.status).toBe("queued"); + expect(m!.text).toContain("queued"); + }); + + it("delegation row with no target_id returns null", () => { + // Defensive: a delegation row missing target_id can't be rendered + // (we wouldn't know which peer to attribute it to). Drop instead + // of rendering a ghost. 
+ const m = toCommMessage( + makeEntry({ + activity_type: "delegation", + target_id: null, + }), + SELF, + ); + expect(m).toBeNull(); + }); }); diff --git a/workspace/builtin_tools/delegation.py b/workspace/builtin_tools/delegation.py index 40b76bf0..9e9f8d09 100644 --- a/workspace/builtin_tools/delegation.py +++ b/workspace/builtin_tools/delegation.py @@ -39,6 +39,13 @@ DELEGATION_TIMEOUT = float(os.environ.get("DELEGATION_TIMEOUT", "300.0")) class DelegationStatus(str, Enum): PENDING = "pending" IN_PROGRESS = "in_progress" + # QUEUED: peer's a2a-proxy returned HTTP 202 + {queued: true}, meaning + # the peer is mid-task and the request was placed in a drain queue. + # The reply will arrive via the platform's stitch path when the + # peer finishes its current work. The LLM should WAIT, not retry, + # and definitely not fall back to doing the work itself — see the + # check_delegation_status docstring for the prompt-side guidance. + QUEUED = "queued" COMPLETED = "completed" FAILED = "failed" @@ -212,6 +219,39 @@ async def _execute_delegation(task_id: str, workspace_id: str, task: str): }, ) + # HTTP 202 + {queued: true} = peer's a2a-proxy + # accepted the request but the peer's runtime is + # mid-task. Platform-side drain will deliver the + # reply asynchronously. Mark QUEUED locally so + # check_delegation_status can surface that state + # to the LLM with explicit "wait, don't bypass" + # guidance. Do NOT mark FAILED — the request is + # alive in the platform's queue, not lost. + # + # Without this branch, the loop falls through, the + # `if "error" in result` line below references an + # unbound `result`, and the eventual FAILED status + # leads the LLM to conclude the peer is permanently + # unavailable — at which point it does the delegated + # work itself, defeating the whole orchestration. + if a2a_resp.status_code == 202: + try: + queued_body = a2a_resp.json() + except Exception: + queued_body = {} + if queued_body.get("queued") is True: + delegation.status = DelegationStatus.QUEUED + log_event( + event_type="delegation", action="delegate", + resource=workspace_id, outcome="queued", + trace_id=task_id, attempt=attempt + 1, + ) + await _notify_completion(task_id, workspace_id, "queued") + await _update_delegation_on_platform( + task_id, "queued", "", "", + ) + return + if a2a_resp.status_code == 200: try: result = a2a_resp.json() @@ -324,6 +364,20 @@ async def check_delegation_status( ) -> dict: """Check the status of a delegated task, or list all active delegations. + Status semantics — IMPORTANT: + + - "pending" / "in_progress" → peer is actively working. Wait and check again. + - "queued" → peer's a2a-proxy accepted the call but the peer is + processing a prior task. The reply WILL arrive — the platform's + drain re-dispatches when the peer is free. Do NOT retry the + delegation. Do NOT do the work yourself. Acknowledge to the user + that the peer is busy and will reply, then continue with other + delegations or check back later. + - "completed" → result is in the `result` field. + - "failed" → real failure (network, peer crashed, etc.). The + `error` field has the cause. Only fall back to doing the work + yourself if status is "failed", never if status is "queued". + Args: task_id: The task_id returned by delegate_to_workspace. If empty, lists all delegations. 
diff --git a/workspace/tests/test_delegation.py b/workspace/tests/test_delegation.py index 3b9721cb..b957cdee 100644 --- a/workspace/tests/test_delegation.py +++ b/workspace/tests/test_delegation.py @@ -283,6 +283,76 @@ class TestA2ASuccess: assert "artifact text" in status["result"] +class TestA2AQueued: + """HTTP 202 + {queued: true} comes back when the peer's a2a-proxy + accepted the request but the peer is mid-task. Pre-fix the runtime + treated this as 'no 200 → fall through to FAILED', which led the + LLM to conclude the peer was permanently unavailable and bypass + delegation entirely. Post-fix the status is QUEUED and the LLM + sees explicit guidance to wait.""" + + @pytest.mark.asyncio + async def test_queued_marks_status_queued_not_failed(self, delegation_mocks): + mod, *_ = delegation_mocks + _, mock_cls = _make_mock_client( + a2a_status=202, + a2a_payload={"queued": True, "summary": "Delegation queued — target at capacity"}, + ) + + with patch("httpx.AsyncClient", mock_cls): + status = await _invoke_and_wait(mod) + + assert status["status"] == "queued", f"expected queued, got {status}" + # No 'error' field on queued (it's not a failure) + assert "error" not in status or not status.get("error") + + @pytest.mark.asyncio + async def test_queued_does_not_retry(self, delegation_mocks): + # The retry loop is for transient transport errors. A 202+queued + # is NOT a failure to retry against — the platform's drain will + # deliver the eventual reply. Retrying would just re-queue the + # same task and double-count it. + mod, *_ = delegation_mocks + client, mock_cls = _make_mock_client( + a2a_status=202, + a2a_payload={"queued": True}, + ) + + with patch("httpx.AsyncClient", mock_cls): + await _invoke_and_wait(mod) + + # The mock is shared across all AsyncClient calls (record, A2A, + # notify, update), so total post count includes platform-sync + # bookkeeping POSTs too. Only count the A2A POST itself — + # identified by URL matching the target's /a2a endpoint. + a2a_calls = [ + c for c in client.post.await_args_list + if c.args and c.args[0] == "http://peer:8000" + ] + assert len(a2a_calls) == 1, ( + f"queued should not retry the A2A POST; got {len(a2a_calls)} A2A calls" + ) + + @pytest.mark.asyncio + async def test_202_without_queued_flag_falls_through(self, delegation_mocks): + # A bare 202 with no {queued: true} marker is NOT the platform's + # queue signal — could be a misbehaving proxy or a future protocol + # revision. Don't treat it as queued. Falls through to the existing + # retry-then-FAILED path. + mod, *_ = delegation_mocks + _, mock_cls = _make_mock_client( + a2a_status=202, + a2a_payload={"some_other_field": "value"}, + ) + + with patch("httpx.AsyncClient", mock_cls): + status = await _invoke_and_wait(mod) + + assert status["status"] == "failed", ( + f"bare 202 should not be treated as queued, expected failed, got {status}" + ) + + class TestA2AErrors: @pytest.mark.asyncio From 5071454074d120af8db04d272802dd6ea6114734 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Sun, 26 Apr 2026 16:05:04 -0700 Subject: [PATCH 2/2] fix(delegation): lazy-refresh QUEUED state from platform; live DELEGATION_* events MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Critical follow-up to PR #2126's review. Two real bugs: 1. **Runtime QUEUED never resolved.** Platform's drain stitch updates the platform's delegate_result row when a queued delegation finally completes, but never pushes back to the runtime. 
The LLM polling check_delegation_status saw status="queued" forever — combined with the new docstring guidance ("queued → wait, peer will reply"), the model would wait indefinitely on a state that never resolves. Strictly worse than pre-PR behavior where it would have at least bypassed. 2. **Live updates dead code.** delegation.go writes activity rows by direct INSERT INTO activity_logs, bypassing the LogActivity helper that fires ACTIVITY_LOGGED. Adding "delegation" to the canvas's ACTIVITY_LOGGED filter (PR #2126 first cut) was inert — initial GET worked, live updates did not. Fix: (1) Runtime side, workspace/builtin_tools/delegation.py: - New `_refresh_queued_from_platform(task_id)` async helper that pulls /workspaces//delegations and finds the platform-side delegate_result row for our task_id. - check_delegation_status calls _refresh when local status is QUEUED, so the LLM's poll itself drives state convergence. - Best-effort: GET failure leaves local state untouched, next poll retries. - Docstring updated to reflect the actual behavior ("polls transparently — keep polling and you'll see the flip"). - 4 new tests cover: QUEUED → completed via refresh; QUEUED → failed via refresh; refresh keeps QUEUED when platform hasn't resolved; refresh swallows network errors safely. (2) Canvas side, AgentCommsPanel.tsx WS push handler: - Listens for DELEGATION_SENT / DELEGATION_STATUS / DELEGATION_COMPLETE / DELEGATION_FAILED in addition to ACTIVITY_LOGGED. - Each event's payload synthesized into an ActivityEntry shape so toCommMessage's existing delegation branch maps it. Status derived: STATUS uses payload.status, COMPLETE → "completed", FAILED → "failed", SENT → "pending". - The ACTIVITY_LOGGED branch keeps the "delegation" type accepted as a no-op-today / future-proof path: if delegation handlers are ever refactored to call LogActivity, this lights up automatically without another canvas change. Doesn't change: the docstring guidance ("queued → wait, don't bypass") is now actually load-bearing because the refresh path will deliver the eventual outcome. Without the refresh, the guidance was a trap. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../components/tabs/chat/AgentCommsPanel.tsx | 81 ++++++++- workspace/builtin_tools/delegation.py | 84 ++++++++- workspace/tests/test_delegation.py | 172 ++++++++++++++++++ 3 files changed, 324 insertions(+), 13 deletions(-) diff --git a/canvas/src/components/tabs/chat/AgentCommsPanel.tsx b/canvas/src/components/tabs/chat/AgentCommsPanel.tsx index 03dc97a3..fd19c3a5 100644 --- a/canvas/src/components/tabs/chat/AgentCommsPanel.tsx +++ b/canvas/src/components/tabs/chat/AgentCommsPanel.tsx @@ -213,14 +213,34 @@ export function AgentCommsPanel({ workspaceId }: { workspaceId: string }) { ws.onmessage = (event) => { try { const msg = JSON.parse(event.data); - if (msg.event === "ACTIVITY_LOGGED" && msg.workspace_id === workspaceId) { + if (msg.workspace_id !== workspaceId) return; + + // Two live-update paths: + // 1. ACTIVITY_LOGGED — fired by the LogActivity helper for + // a2a_send / a2a_receive (and delegation rows IF the + // delegation handler is ever refactored to use it). Today + // the platform's delegation handlers do direct INSERT + // INTO activity_logs WITHOUT firing ACTIVITY_LOGGED, so + // the delegation branch here is defensive — it'd light + // up automatically the day delegation handlers are + // refactored to call LogActivity. + // 2. 
DELEGATION_SENT / DELEGATION_STATUS / DELEGATION_COMPLETE + // / DELEGATION_FAILED — fired by the platform's delegation + // handlers directly. These are the ONLY live signals the + // panel currently has for delegation rows; the GET on + // mount (which reads from activity_logs) shows them, but + // without this branch, nothing surfaced live until the + // next remount. Synthesise an ActivityEntry from the + // payload so toCommMessage's existing delegation branch + // handles them identically to the GET path. + let entry: ActivityEntry | null = null; + if (msg.event === "ACTIVITY_LOGGED") { const p = msg.payload || {}; const type = p.activity_type as string; const sourceId = p.source_id as string | null; if (!sourceId) return; // canvas-initiated, not agent comms if (type !== "a2a_send" && type !== "a2a_receive" && type !== "delegation") return; - - const entry: ActivityEntry = { + entry = { id: p.id as string || crypto.randomUUID(), activity_type: type, source_id: sourceId, @@ -232,13 +252,56 @@ export function AgentCommsPanel({ workspaceId }: { workspaceId: string }) { status: p.status as string || "ok", created_at: msg.timestamp || new Date().toISOString(), }; - const m = toCommMessage(entry, workspaceId); - if (m) { - const key = `${m.timestamp}:${m.flow}:${m.peerId}`; - if (seenKeys.current.has(key)) return; - seenKeys.current.add(key); - setMessages((prev) => [...prev, m]); + } else if ( + msg.event === "DELEGATION_SENT" || + msg.event === "DELEGATION_STATUS" || + msg.event === "DELEGATION_COMPLETE" || + msg.event === "DELEGATION_FAILED" + ) { + const p = msg.payload || {}; + const targetId = (p.target_id as string) || ""; + if (!targetId) return; + // Map event → status. DELEGATION_STATUS payload includes its + // own `status` field (queued / dispatched). Other events have + // implicit status: SENT → pending, COMPLETE → completed, + // FAILED → failed. + let status: string; + let summary: string; + if (msg.event === "DELEGATION_STATUS") { + status = (p.status as string) || "queued"; + summary = `Delegation ${status}`; + } else if (msg.event === "DELEGATION_COMPLETE") { + status = "completed"; + summary = `Delegation completed (${(p.response_preview as string)?.slice(0, 60) || ""})`; + } else if (msg.event === "DELEGATION_FAILED") { + status = "failed"; + summary = `Delegation failed: ${(p.error as string) || "unknown"}`; + } else { + status = "pending"; + summary = `Delegating to ${(p.target_id as string)?.slice(0, 8) || "peer"}`; } + entry = { + id: (p.delegation_id as string) || crypto.randomUUID(), + activity_type: "delegation", + source_id: workspaceId, + target_id: targetId, + method: msg.event === "DELEGATION_SENT" ? 
"delegate" : "delegate_result", + summary, + request_body: null, + response_body: null, + status, + created_at: msg.timestamp || new Date().toISOString(), + }; + } else { + return; + } + + const m = toCommMessage(entry, workspaceId); + if (m) { + const key = `${m.timestamp}:${m.flow}:${m.peerId}`; + if (seenKeys.current.has(key)) return; + seenKeys.current.add(key); + setMessages((prev) => [...prev, m]); } } catch { /* ignore */ } }; diff --git a/workspace/builtin_tools/delegation.py b/workspace/builtin_tools/delegation.py index 9e9f8d09..25d0ae55 100644 --- a/workspace/builtin_tools/delegation.py +++ b/workspace/builtin_tools/delegation.py @@ -126,6 +126,69 @@ async def _record_delegation_on_platform(task_id: str, target_workspace_id: str, logger.debug("Delegation record failed (best-effort): %s", e) +async def _refresh_queued_from_platform(task_id: str) -> bool: + """Lazy-refresh a QUEUED delegation's local state from the platform. + + Called by check_delegation_status when local status is QUEUED. The + platform's drain stitch (a2a_queue.go) updates the delegate_result + activity_logs row when a queued delegation eventually completes, + but it has no callback to this runtime — without this lazy refresh, + the LLM polling check_delegation_status would see "queued" forever + even after the platform has the result. + + Returns True if the local delegation was updated to a terminal state + (completed/failed), False otherwise. Best-effort — network/parse + errors leave the local state untouched and let the next call retry. + """ + delegation = _delegations.get(task_id) + if not delegation: + return False + try: + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.get( + f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/delegations", + headers={}, + ) + if resp.status_code != 200: + return False + entries = resp.json() + if not isinstance(entries, list): + return False + except Exception as e: + logger.debug("refresh queued delegation %s: %s", task_id, e) + return False + # Find the latest delegate_result row matching our task_id. + # Platform list is newest-first; the first match is the freshest. + for entry in entries: + if entry.get("delegation_id") != task_id: + continue + if entry.get("type") != "delegation": + continue + # Only delegate_result rows carry the eventual outcome; the + # initial 'delegate' row stays at status='pending' even after + # the result lands. Filtering on summary text is brittle, but + # the rows from the LIST endpoint don't include `method`. The + # `delegate_result` rows are the ones with `error` (failure) + # or `response_preview` (success) populated — pick those. + status = entry.get("status", "") + if status == "completed": + delegation.status = DelegationStatus.COMPLETED + delegation.result = entry.get("response_preview", "") + await _notify_completion(task_id, delegation.workspace_id, "completed") + return True + if status == "failed": + delegation.status = DelegationStatus.FAILED + delegation.error = entry.get("error", "") + await _notify_completion(task_id, delegation.workspace_id, "failed") + return True + # status == "queued" / "pending" / "dispatched": platform hasn't + # resolved yet; leave local state unchanged so the next poll + # retries. Don't break — keep scanning in case there's a newer + # entry for the same task_id (possible if the same delegation + # was retried). 
+ return False + + async def _update_delegation_on_platform(task_id: str, status: str, error: str = "", response_preview: str = ""): """Mirror status changes to the platform's activity_logs (#64 fix). @@ -369,10 +432,13 @@ async def check_delegation_status( - "pending" / "in_progress" → peer is actively working. Wait and check again. - "queued" → peer's a2a-proxy accepted the call but the peer is processing a prior task. The reply WILL arrive — the platform's - drain re-dispatches when the peer is free. Do NOT retry the - delegation. Do NOT do the work yourself. Acknowledge to the user - that the peer is busy and will reply, then continue with other - delegations or check back later. + drain re-dispatches when the peer is free. This tool transparently + polls the platform for the eventual outcome on each call, so + keep polling check_delegation_status periodically and you'll see + the status flip to "completed" / "failed" automatically. + Do NOT retry the delegation. Do NOT do the work yourself. + Acknowledge to the user that the peer is busy and will reply, + then continue with other delegations or check back later. - "completed" → result is in the `result` field. - "failed" → real failure (network, peer crashed, etc.). The `error` field has the cause. Only fall back to doing the work @@ -405,6 +471,16 @@ async def check_delegation_status( if not delegation: return {"error": f"No delegation found with task_id {task_id}"} + # Lazy refresh for QUEUED entries: the platform's drain stitch + # updates its activity_logs row when the queued delegation + # eventually completes, but doesn't push back to this runtime. + # Without this refresh, the LLM polling here would see "queued" + # forever even after the result is available — exactly the bug + # the upstream director-bypass docstring guidance warned against. + if delegation.status == DelegationStatus.QUEUED: + await _refresh_queued_from_platform(task_id) + # delegation is the same dict entry — _refresh mutates in-place. + result = { "task_id": task_id, "workspace_id": delegation.workspace_id, diff --git a/workspace/tests/test_delegation.py b/workspace/tests/test_delegation.py index b957cdee..33d4f982 100644 --- a/workspace/tests/test_delegation.py +++ b/workspace/tests/test_delegation.py @@ -353,6 +353,178 @@ class TestA2AQueued: ) +class TestQueuedLazyRefresh: + """When a delegation is QUEUED, check_delegation_status must lazily + refresh from the platform's GET /delegations to pick up drain-stitch + completions. Without this refresh, the LLM sees "queued" forever + because the platform never pushes back to the runtime. + + Pre-fix the docstring told the LLM to wait on QUEUED. With no refresh + path, "wait" was permanent. These tests pin the refresh behavior so + the docstring is actually load-bearing.""" + + @pytest.mark.asyncio + async def test_queued_resolves_to_completed_via_lazy_refresh(self, delegation_mocks): + mod, *_ = delegation_mocks + # Step 1: invoke delegation, peer returns 202+queued, local + # status becomes QUEUED. + _, mock_cls_queued = _make_mock_client( + a2a_status=202, + a2a_payload={"queued": True}, + ) + with patch("httpx.AsyncClient", mock_cls_queued): + initial = await _invoke_and_wait(mod) + assert initial["status"] == "queued" + task_id = next(iter(mod._delegations)) + + # Step 2: simulate platform's drain having stitched a completed + # result. GET /workspaces//delegations now returns a + # 'completed' delegate_result row matching our task_id. 
+ list_response = MagicMock() + list_response.status_code = 200 + list_response.json.return_value = [ + { + "delegation_id": task_id, + "type": "delegation", + "status": "completed", + "summary": "Delegation completed (peer reply)", + "response_preview": "the peer's actual reply text", + "source_id": "ws-self", + "target_id": "target", + }, + ] + refresh_client = AsyncMock() + refresh_client.get = AsyncMock(return_value=list_response) + refresh_client.post = AsyncMock(return_value=MagicMock(status_code=200)) + refresh_cls = MagicMock() + refresh_cls.return_value.__aenter__ = AsyncMock(return_value=refresh_client) + refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False) + + with patch("httpx.AsyncClient", refresh_cls): + fn = mod.check_delegation_status + if hasattr(fn, "ainvoke"): + refreshed = await fn.ainvoke({"task_id": task_id}) + else: + refreshed = await fn(task_id=task_id) + + assert refreshed["status"] == "completed", ( + f"lazy refresh should advance QUEUED → completed; got {refreshed}" + ) + assert refreshed.get("result") == "the peer's actual reply text" + + @pytest.mark.asyncio + async def test_queued_resolves_to_failed_via_lazy_refresh(self, delegation_mocks): + mod, *_ = delegation_mocks + _, mock_cls_queued = _make_mock_client( + a2a_status=202, + a2a_payload={"queued": True}, + ) + with patch("httpx.AsyncClient", mock_cls_queued): + await _invoke_and_wait(mod) + task_id = next(iter(mod._delegations)) + + list_response = MagicMock() + list_response.status_code = 200 + list_response.json.return_value = [ + { + "delegation_id": task_id, + "type": "delegation", + "status": "failed", + "error": "peer timed out after 30 min", + "source_id": "ws-self", + "target_id": "target", + }, + ] + refresh_client = AsyncMock() + refresh_client.get = AsyncMock(return_value=list_response) + refresh_client.post = AsyncMock(return_value=MagicMock(status_code=200)) + refresh_cls = MagicMock() + refresh_cls.return_value.__aenter__ = AsyncMock(return_value=refresh_client) + refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False) + + with patch("httpx.AsyncClient", refresh_cls): + fn = mod.check_delegation_status + if hasattr(fn, "ainvoke"): + refreshed = await fn.ainvoke({"task_id": task_id}) + else: + refreshed = await fn(task_id=task_id) + + assert refreshed["status"] == "failed" + assert refreshed.get("error") == "peer timed out after 30 min" + + @pytest.mark.asyncio + async def test_queued_stays_queued_when_platform_not_resolved(self, delegation_mocks): + # Realistic case: LLM polls before platform's drain has fired. + # Refresh sees only the queued row → no state change. Subsequent + # poll will retry. 
+ mod, *_ = delegation_mocks + _, mock_cls_queued = _make_mock_client( + a2a_status=202, + a2a_payload={"queued": True}, + ) + with patch("httpx.AsyncClient", mock_cls_queued): + await _invoke_and_wait(mod) + task_id = next(iter(mod._delegations)) + + list_response = MagicMock() + list_response.status_code = 200 + list_response.json.return_value = [ + { + "delegation_id": task_id, + "type": "delegation", + "status": "queued", # not yet resolved + "summary": "Delegation queued — target at capacity", + "source_id": "ws-self", + "target_id": "target", + }, + ] + refresh_client = AsyncMock() + refresh_client.get = AsyncMock(return_value=list_response) + refresh_client.post = AsyncMock(return_value=MagicMock(status_code=200)) + refresh_cls = MagicMock() + refresh_cls.return_value.__aenter__ = AsyncMock(return_value=refresh_client) + refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False) + + with patch("httpx.AsyncClient", refresh_cls): + fn = mod.check_delegation_status + if hasattr(fn, "ainvoke"): + refreshed = await fn.ainvoke({"task_id": task_id}) + else: + refreshed = await fn(task_id=task_id) + + assert refreshed["status"] == "queued" + + @pytest.mark.asyncio + async def test_refresh_is_safe_when_platform_unreachable(self, delegation_mocks): + # Platform GET fails (network blip). Refresh must not raise — + # local state stays QUEUED so the next poll retries. + mod, *_ = delegation_mocks + _, mock_cls_queued = _make_mock_client( + a2a_status=202, + a2a_payload={"queued": True}, + ) + with patch("httpx.AsyncClient", mock_cls_queued): + await _invoke_and_wait(mod) + task_id = next(iter(mod._delegations)) + + refresh_client = AsyncMock() + refresh_client.get = AsyncMock(side_effect=httpx.ConnectError("network down")) + refresh_client.post = AsyncMock(return_value=MagicMock(status_code=200)) + refresh_cls = MagicMock() + refresh_cls.return_value.__aenter__ = AsyncMock(return_value=refresh_client) + refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False) + + with patch("httpx.AsyncClient", refresh_cls): + fn = mod.check_delegation_status + if hasattr(fn, "ainvoke"): + refreshed = await fn.ainvoke({"task_id": task_id}) + else: + refreshed = await fn(task_id=task_id) + + # Doesn't raise; local state preserved. + assert refreshed["status"] == "queued" + + class TestA2AErrors: @pytest.mark.asyncio