fix(delegation): runtime handles 202+queued; canvas surfaces delegation rows

Two bugs that compounded into the "Director does the work itself" UX:

1. workspace/builtin_tools/delegation.py: _execute_delegation only
   handled HTTP 200 in the response branch. When the peer's a2a-proxy
   returned HTTP 202 + {queued: true} (single-SDK-session bottleneck
   on the peer), the loop fell through. Two iterations later the
   `if "error" in result` check tried to access an unbound `result`,
   the background asyncio task died quietly, and the delegation stayed
   at FAILED with error="None". The LLM checking status saw "failed"
   plus the platform's "Delegation queued — target at capacity" log
   line in chat context, concluded the peer was permanently
   unavailable, and bypassed delegation to do the work itself.
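
For clarity, a minimal sketch of the pre-fix failure mode (the loop
shape and names here are illustrative, not the repo's literal code):

    async def pre_fix_delegation_sketch(client, url, payload, retries=3):
        # Pre-fix shape: `result` is bound ONLY inside the 200 branch.
        for attempt in range(retries):
            a2a_resp = await client.post(url, json=payload)
            if a2a_resp.status_code == 200:
                result = a2a_resp.json()
                break
            # 202 + {queued: true} matches nothing; the loop just retries.
        # Once retries are exhausted on a 202, the line below raises
        # UnboundLocalError because `result` was never assigned. The
        # exception dies inside the fire-and-forget task, leaving the
        # delegation marked FAILED with error="None".
        if "error" in result:
            return "failed"
        return "completed"
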
Fix: explicit 202+queued branch. Adds DelegationStatus.QUEUED,
marks the local delegation as QUEUED, mirrors to the platform,
and returns cleanly without retrying. The retry loop is for
transient transport errors — queueing is a real ack, not a failure
to retry against (retrying would just re-queue the same task).

check_delegation_status docstring extended with explicit per-status
guidance: pending/in_progress → wait, queued → wait (peer busy on
prior task, reply WILL arrive), completed → use result, failed →
real error in error field; only fall back on failed, never queued.

2. canvas/src/components/tabs/chat/AgentCommsPanel.tsx: filter dropped
   every delegation row because it whitelisted only a2a_send /
   a2a_receive. activity_type='delegation' rows (written by the
   platform's /delegate handler with method='delegate' or
   'delegate_result') never reached toCommMessage. User saw "No
   agent-to-agent communications yet" while 6+ delegations existed
   in the DB.

Fix: include "delegation" in both the initial filter and the
WS push filter, plus a delegation branch in toCommMessage that
maps the row as outbound (always — platform proxies on our behalf)
and uses summary as the primary text source.

Tests:
- 3 new Python tests cover the 202+queued path: status becomes
  QUEUED not FAILED; no retry on queued (counted by URL match
  against the A2A target, since the mock is shared across all
  AsyncClient calls; see the sketch after this list); bare 202
  without {queued:true} still falls through to the existing
  retry-then-FAILED path.
- 3 new TS tests cover the delegation mapper: 'delegate' row
  maps as outbound to target with summary text; queued
  'delegate_result' preserves status='queued' (load-bearing for
  the LLM's wait-vs-bypass decision); missing target_id returns
  null instead of rendering a ghost.
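
The shared-mock constraint above is worth a sketch. This is a
hypothetical stand-in for _make_mock_client (its body is not part of
this diff); it only illustrates the property the URL-match counting
relies on: a single AsyncMock serves every httpx.AsyncClient
construction, so raw post-call counts mix A2A and platform-sync
bookkeeping traffic.

    from unittest.mock import AsyncMock, MagicMock

    def make_mock_client_sketch(a2a_status: int, a2a_payload: dict):
        """Hypothetical: (client, cls) where cls() always yields the same client."""
        resp = MagicMock()
        resp.status_code = a2a_status
        resp.json.return_value = a2a_payload  # httpx's .json() is sync

        client = AsyncMock()
        client.post.return_value = resp           # every POST, any URL, same response
        client.__aenter__.return_value = client   # supports `async with AsyncClient()`

        cls = MagicMock(return_value=client)      # patch("httpx.AsyncClient", cls)
        return client, cls

Filtering client.post.await_args_list by the A2A target URL is then
the only way to count the delegation POST alone.
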
Does NOT solve: the underlying single-SDK-session bottleneck that
causes peers to queue in the first place. Tracked as task #102
(parallel SDK sessions per workspace) — real architectural work.
This PR makes the runtime handle the queueing correctly so the LLM
doesn't bail out, and makes the delegations visible in Agent Comms
so operators can see what's happening.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@@ -59,6 +59,34 @@ function resolveName(id: string): string {
 }
 
 export function toCommMessage(entry: ActivityEntry, workspaceId: string): CommMessage | null {
+  // delegation activity rows are written by the platform's /delegate
+  // handler. They're always outbound from this workspace's POV (the
+  // platform proxies the A2A on our behalf). Two methods:
+  // - "delegate" — the initial outbound; status pending/dispatched
+  // - "delegate_result" — the eventual reply; status completed/queued/failed
+  // We surface them in Agent Comms because they ARE agent-to-agent
+  // calls; without this branch they'd be dropped by the activity_type
+  // filter and the user would see "No agent-to-agent communications yet"
+  // even when the director made delegations.
+  if (entry.activity_type === "delegation") {
+    const peerId = entry.target_id || "";
+    if (!peerId) return null;
+    return {
+      id: entry.id,
+      flow: "out",
+      peerName: resolveName(peerId),
+      peerId,
+      // Prefer summary (set by the platform with a human-readable
+      // string like "Delegating to X" or "Delegation queued — target
+      // at capacity"). Fall back to request body for older rows that
+      // pre-date the summary column being populated.
+      text: entry.summary || extractRequestText(entry.request_body) || "(delegation)",
+      responseText: entry.response_body ? extractResponseText(entry.response_body) : null,
+      status: entry.status || "ok",
+      timestamp: entry.created_at,
+    };
+  }
+
   // a2a_receive activity rows come in two shapes:
   //
   // 1. Real incoming call (a peer called us): source_id = the peer,
@@ -132,7 +160,11 @@ export function AgentCommsPanel({ workspaceId }: { workspaceId: string }) {
     api.get<ActivityEntry[]>(`/workspaces/${workspaceId}/activity?source=agent&limit=50`)
       .then((entries) => {
         const filtered = (entries ?? [])
-          .filter((e) => e.activity_type === "a2a_send" || e.activity_type === "a2a_receive")
+          .filter((e) =>
+            e.activity_type === "a2a_send" ||
+            e.activity_type === "a2a_receive" ||
+            e.activity_type === "delegation",
+          )
           .reverse();
         const msgs: CommMessage[] = [];
         for (const e of filtered) {
@@ -186,7 +218,7 @@ export function AgentCommsPanel({ workspaceId }: { workspaceId: string }) {
       const type = p.activity_type as string;
       const sourceId = p.source_id as string | null;
       if (!sourceId) return; // canvas-initiated, not agent comms
-      if (type !== "a2a_send" && type !== "a2a_receive") return;
+      if (type !== "a2a_send" && type !== "a2a_receive" && type !== "delegation") return;
 
       const entry: ActivityEntry = {
         id: p.id as string || crypto.randomUUID(),
@@ -110,4 +110,65 @@ describe("toCommMessage — flow derivation", () => {
     );
     expect(m!.status).toBe("error");
   });
+
+  // --- delegation rows ---
+  // The platform's /delegate handler writes activity_type='delegation'
+  // for both the initial outbound (method='delegate') and the eventual
+  // reply (method='delegate_result', status=queued|completed|failed).
+  // Pre-fix the panel filtered these out and showed "no agent comms"
+  // even when 6+ delegations existed in the DB.
+
+  it("delegation 'delegate' row maps as outbound to target", () => {
+    const m = toCommMessage(
+      makeEntry({
+        activity_type: "delegation",
+        method: "delegate",
+        source_id: SELF,
+        target_id: PEER,
+        summary: "Delegating to ws-peer",
+        status: "pending",
+      }),
+      SELF,
+    );
+    expect(m).toBeTruthy();
+    expect(m!.flow).toBe("out");
+    expect(m!.peerId).toBe(PEER);
+    expect(m!.peerName).toBe("Peer Agent");
+    expect(m!.text).toBe("Delegating to ws-peer");
+    expect(m!.status).toBe("pending");
+  });
+
+  it("delegation 'delegate_result' queued row preserves status='queued'", () => {
+    // The "queued" status is the load-bearing signal the LLM uses to
+    // decide whether to wait or fall back. If toCommMessage drops or
+    // rewrites it, the UI loses the ability to show the "peer busy,
+    // will reply" affordance.
+    const m = toCommMessage(
+      makeEntry({
+        activity_type: "delegation",
+        method: "delegate_result",
+        source_id: SELF,
+        target_id: PEER,
+        summary: "Delegation queued — target at capacity",
+        status: "queued",
+      }),
+      SELF,
+    );
+    expect(m!.status).toBe("queued");
+    expect(m!.text).toContain("queued");
+  });
+
+  it("delegation row with no target_id returns null", () => {
+    // Defensive: a delegation row missing target_id can't be rendered
+    // (we wouldn't know which peer to attribute it to). Drop instead
+    // of rendering a ghost.
+    const m = toCommMessage(
+      makeEntry({
+        activity_type: "delegation",
+        target_id: null,
+      }),
+      SELF,
+    );
+    expect(m).toBeNull();
+  });
 });
@@ -39,6 +39,13 @@ DELEGATION_TIMEOUT = float(os.environ.get("DELEGATION_TIMEOUT", "300.0"))
 class DelegationStatus(str, Enum):
     PENDING = "pending"
     IN_PROGRESS = "in_progress"
+    # QUEUED: peer's a2a-proxy returned HTTP 202 + {queued: true}, meaning
+    # the peer is mid-task and the request was placed in a drain queue.
+    # The reply will arrive via the platform's stitch path when the
+    # peer finishes its current work. The LLM should WAIT, not retry,
+    # and definitely not fall back to doing the work itself — see the
+    # check_delegation_status docstring for the prompt-side guidance.
+    QUEUED = "queued"
    COMPLETED = "completed"
     FAILED = "failed"
 
@@ -212,6 +219,39 @@ async def _execute_delegation(task_id: str, workspace_id: str, task: str):
                     },
                 )
 
+                # HTTP 202 + {queued: true} = peer's a2a-proxy
+                # accepted the request but the peer's runtime is
+                # mid-task. Platform-side drain will deliver the
+                # reply asynchronously. Mark QUEUED locally so
+                # check_delegation_status can surface that state
+                # to the LLM with explicit "wait, don't bypass"
+                # guidance. Do NOT mark FAILED — the request is
+                # alive in the platform's queue, not lost.
+                #
+                # Without this branch, the loop falls through, the
+                # `if "error" in result` line below references an
+                # unbound `result`, and the eventual FAILED status
+                # leads the LLM to conclude the peer is permanently
+                # unavailable — at which point it does the delegated
+                # work itself, defeating the whole orchestration.
+                if a2a_resp.status_code == 202:
+                    try:
+                        queued_body = a2a_resp.json()
+                    except Exception:
+                        queued_body = {}
+                    if queued_body.get("queued") is True:
+                        delegation.status = DelegationStatus.QUEUED
+                        log_event(
+                            event_type="delegation", action="delegate",
+                            resource=workspace_id, outcome="queued",
+                            trace_id=task_id, attempt=attempt + 1,
+                        )
+                        await _notify_completion(task_id, workspace_id, "queued")
+                        await _update_delegation_on_platform(
+                            task_id, "queued", "", "",
+                        )
+                        return
+
                 if a2a_resp.status_code == 200:
                     try:
                         result = a2a_resp.json()
@@ -324,6 +364,20 @@ async def check_delegation_status(
 ) -> dict:
     """Check the status of a delegated task, or list all active delegations.
 
+    Status semantics — IMPORTANT:
+
+    - "pending" / "in_progress" → peer is actively working. Wait and check again.
+    - "queued" → peer's a2a-proxy accepted the call but the peer is
+      processing a prior task. The reply WILL arrive — the platform's
+      drain re-dispatches when the peer is free. Do NOT retry the
+      delegation. Do NOT do the work yourself. Acknowledge to the user
+      that the peer is busy and will reply, then continue with other
+      delegations or check back later.
+    - "completed" → result is in the `result` field.
+    - "failed" → real failure (network, peer crashed, etc.). The
+      `error` field has the cause. Only fall back to doing the work
+      yourself if status is "failed", never if status is "queued".
+
     Args:
         task_id: The task_id returned by delegate_to_workspace. If empty, lists all delegations.
@@ -283,6 +283,76 @@ class TestA2ASuccess:
         assert "artifact text" in status["result"]
 
 
+class TestA2AQueued:
+    """HTTP 202 + {queued: true} comes back when the peer's a2a-proxy
+    accepted the request but the peer is mid-task. Pre-fix the runtime
+    treated this as 'no 200 → fall through to FAILED', which led the
+    LLM to conclude the peer was permanently unavailable and bypass
+    delegation entirely. Post-fix the status is QUEUED and the LLM
+    sees explicit guidance to wait."""
+
+    @pytest.mark.asyncio
+    async def test_queued_marks_status_queued_not_failed(self, delegation_mocks):
+        mod, *_ = delegation_mocks
+        _, mock_cls = _make_mock_client(
+            a2a_status=202,
+            a2a_payload={"queued": True, "summary": "Delegation queued — target at capacity"},
+        )
+
+        with patch("httpx.AsyncClient", mock_cls):
+            status = await _invoke_and_wait(mod)
+
+        assert status["status"] == "queued", f"expected queued, got {status}"
+        # No 'error' field on queued (it's not a failure)
+        assert "error" not in status or not status.get("error")
+
+    @pytest.mark.asyncio
+    async def test_queued_does_not_retry(self, delegation_mocks):
+        # The retry loop is for transient transport errors. A 202+queued
+        # is NOT a failure to retry against — the platform's drain will
+        # deliver the eventual reply. Retrying would just re-queue the
+        # same task and double-count it.
+        mod, *_ = delegation_mocks
+        client, mock_cls = _make_mock_client(
+            a2a_status=202,
+            a2a_payload={"queued": True},
+        )
+
+        with patch("httpx.AsyncClient", mock_cls):
+            await _invoke_and_wait(mod)
+
+        # The mock is shared across all AsyncClient calls (record, A2A,
+        # notify, update), so the total post count includes platform-sync
+        # bookkeeping POSTs too. Only count the A2A POST itself —
+        # identified by URL matching the target's /a2a endpoint.
+        a2a_calls = [
+            c for c in client.post.await_args_list
+            if c.args and c.args[0] == "http://peer:8000"
+        ]
+        assert len(a2a_calls) == 1, (
+            f"queued should not retry the A2A POST; got {len(a2a_calls)} A2A calls"
+        )
+
+    @pytest.mark.asyncio
+    async def test_202_without_queued_flag_falls_through(self, delegation_mocks):
+        # A bare 202 with no {queued: true} marker is NOT the platform's
+        # queue signal — could be a misbehaving proxy or a future protocol
+        # revision. Don't treat it as queued. Falls through to the existing
+        # retry-then-FAILED path.
+        mod, *_ = delegation_mocks
+        _, mock_cls = _make_mock_client(
+            a2a_status=202,
+            a2a_payload={"some_other_field": "value"},
+        )
+
+        with patch("httpx.AsyncClient", mock_cls):
+            status = await _invoke_and_wait(mod)
+
+        assert status["status"] == "failed", (
+            f"bare 202 should not be treated as queued, expected failed, got {status}"
+        )
+
+
 class TestA2AErrors:
 
     @pytest.mark.asyncio