Merge pull request #2126 from Molecule-AI/fix/director-bypass-and-agent-comms
fix(delegation): runtime handles 202+queued; canvas surfaces delegation rows
commit fdf8b65c59
@@ -59,6 +59,34 @@ function resolveName(id: string): string {
 }
 
 export function toCommMessage(entry: ActivityEntry, workspaceId: string): CommMessage | null {
+  // delegation activity rows are written by the platform's /delegate
+  // handler. They're always outbound from this workspace's POV (the
+  // platform proxies the A2A on our behalf). Two methods:
+  //   - "delegate" — the initial outbound; status pending/dispatched
+  //   - "delegate_result" — the eventual reply; status completed/queued/failed
+  // We surface them in Agent Comms because they ARE agent-to-agent
+  // calls; without this branch they'd be dropped by the activity_type
+  // filter and the user would see "No agent-to-agent communications yet"
+  // even when the director made delegations.
+  if (entry.activity_type === "delegation") {
+    const peerId = entry.target_id || "";
+    if (!peerId) return null;
+    return {
+      id: entry.id,
+      flow: "out",
+      peerName: resolveName(peerId),
+      peerId,
+      // Prefer summary (set by the platform with a human-readable
+      // string like "Delegating to X" or "Delegation queued — target
+      // at capacity"). Fall back to request body for older rows that
+      // pre-date the summary column being populated.
+      text: entry.summary || extractRequestText(entry.request_body) || "(delegation)",
+      responseText: entry.response_body ? extractResponseText(entry.response_body) : null,
+      status: entry.status || "ok",
+      timestamp: entry.created_at,
+    };
+  }
+
   // a2a_receive activity rows come in two shapes:
   //
   // 1. Real incoming call (a peer called us): source_id = the peer,
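For orientation, the two delegation row shapes this branch consumes look roughly like the following. A minimal sketch with hypothetical field values; the real rows come from the platform's activity_logs table:

```python
# Illustrative delegation activity rows (values hypothetical), the two
# shapes toCommMessage's delegation branch handles.
delegate_row = {
    "activity_type": "delegation",
    "method": "delegate",               # initial outbound
    "target_id": "ws-peer",
    "summary": "Delegating to ws-peer",
    "status": "pending",
}
delegate_result_row = {
    "activity_type": "delegation",
    "method": "delegate_result",        # eventual reply
    "target_id": "ws-peer",
    "summary": "Delegation queued — target at capacity",
    "status": "queued",
}
```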
@@ -132,7 +160,11 @@ export function AgentCommsPanel({ workspaceId }: { workspaceId: string }) {
     api.get<ActivityEntry[]>(`/workspaces/${workspaceId}/activity?source=agent&limit=50`)
       .then((entries) => {
         const filtered = (entries ?? [])
-          .filter((e) => e.activity_type === "a2a_send" || e.activity_type === "a2a_receive")
+          .filter((e) =>
+            e.activity_type === "a2a_send" ||
+            e.activity_type === "a2a_receive" ||
+            e.activity_type === "delegation",
+          )
           .reverse();
         const msgs: CommMessage[] = [];
         for (const e of filtered) {
@@ -181,14 +213,34 @@ export function AgentCommsPanel({ workspaceId }: { workspaceId: string }) {
     ws.onmessage = (event) => {
       try {
         const msg = JSON.parse(event.data);
-        if (msg.event === "ACTIVITY_LOGGED" && msg.workspace_id === workspaceId) {
+        if (msg.workspace_id !== workspaceId) return;
+
+        // Two live-update paths:
+        // 1. ACTIVITY_LOGGED — fired by the LogActivity helper for
+        //    a2a_send / a2a_receive (and delegation rows IF the
+        //    delegation handler is ever refactored to use it). Today
+        //    the platform's delegation handlers do direct INSERT
+        //    INTO activity_logs WITHOUT firing ACTIVITY_LOGGED, so
+        //    the delegation branch here is defensive — it'd light
+        //    up automatically the day delegation handlers are
+        //    refactored to call LogActivity.
+        // 2. DELEGATION_SENT / DELEGATION_STATUS / DELEGATION_COMPLETE
+        //    / DELEGATION_FAILED — fired by the platform's delegation
+        //    handlers directly. These are the ONLY live signals the
+        //    panel currently has for delegation rows; the GET on
+        //    mount (which reads from activity_logs) shows them, but
+        //    without this branch, nothing surfaced live until the
+        //    next remount. Synthesise an ActivityEntry from the
+        //    payload so toCommMessage's existing delegation branch
+        //    handles them identically to the GET path.
+        let entry: ActivityEntry | null = null;
+        if (msg.event === "ACTIVITY_LOGGED") {
           const p = msg.payload || {};
           const type = p.activity_type as string;
           const sourceId = p.source_id as string | null;
           if (!sourceId) return; // canvas-initiated, not agent comms
-          if (type !== "a2a_send" && type !== "a2a_receive") return;
-
-          const entry: ActivityEntry = {
+          if (type !== "a2a_send" && type !== "a2a_receive" && type !== "delegation") return;
+          entry = {
            id: p.id as string || crypto.randomUUID(),
            activity_type: type,
            source_id: sourceId,
@@ -200,13 +252,56 @@ export function AgentCommsPanel({ workspaceId }: { workspaceId: string }) {
            status: p.status as string || "ok",
            created_at: msg.timestamp || new Date().toISOString(),
          };
-          const m = toCommMessage(entry, workspaceId);
-          if (m) {
-            const key = `${m.timestamp}:${m.flow}:${m.peerId}`;
-            if (seenKeys.current.has(key)) return;
-            seenKeys.current.add(key);
-            setMessages((prev) => [...prev, m]);
-          }
-        }
+        } else if (
+          msg.event === "DELEGATION_SENT" ||
+          msg.event === "DELEGATION_STATUS" ||
+          msg.event === "DELEGATION_COMPLETE" ||
+          msg.event === "DELEGATION_FAILED"
+        ) {
+          const p = msg.payload || {};
+          const targetId = (p.target_id as string) || "";
+          if (!targetId) return;
+          // Map event → status. DELEGATION_STATUS payload includes its
+          // own `status` field (queued / dispatched). Other events have
+          // implicit status: SENT → pending, COMPLETE → completed,
+          // FAILED → failed.
+          let status: string;
+          let summary: string;
+          if (msg.event === "DELEGATION_STATUS") {
+            status = (p.status as string) || "queued";
+            summary = `Delegation ${status}`;
+          } else if (msg.event === "DELEGATION_COMPLETE") {
+            status = "completed";
+            summary = `Delegation completed (${(p.response_preview as string)?.slice(0, 60) || ""})`;
+          } else if (msg.event === "DELEGATION_FAILED") {
+            status = "failed";
+            summary = `Delegation failed: ${(p.error as string) || "unknown"}`;
+          } else {
+            status = "pending";
+            summary = `Delegating to ${(p.target_id as string)?.slice(0, 8) || "peer"}`;
+          }
+          entry = {
+            id: (p.delegation_id as string) || crypto.randomUUID(),
+            activity_type: "delegation",
+            source_id: workspaceId,
+            target_id: targetId,
+            method: msg.event === "DELEGATION_SENT" ? "delegate" : "delegate_result",
+            summary,
+            request_body: null,
+            response_body: null,
+            status,
+            created_at: msg.timestamp || new Date().toISOString(),
+          };
+        } else {
+          return;
+        }
+
+        const m = toCommMessage(entry, workspaceId);
+        if (m) {
+          const key = `${m.timestamp}:${m.flow}:${m.peerId}`;
+          if (seenKeys.current.has(key)) return;
+          seenKeys.current.add(key);
+          setMessages((prev) => [...prev, m]);
+        }
       } catch { /* ignore */ }
     };
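The event → status mapping in the DELEGATION_* branch is worth seeing in isolation. A minimal Python rendering of the same logic, for reference only (the panel itself stays in TypeScript; the function name here is illustrative, not part of any API):

```python
# Mirror of the panel's DELEGATION_* event -> (status, summary) mapping,
# sketched in Python for reference. Not part of the runtime API.
def delegation_event_to_status(event: str, payload: dict) -> tuple[str, str]:
    if event == "DELEGATION_STATUS":
        # payload carries its own status: "queued" or "dispatched"
        status = payload.get("status") or "queued"
        return status, f"Delegation {status}"
    if event == "DELEGATION_COMPLETE":
        preview = (payload.get("response_preview") or "")[:60]
        return "completed", f"Delegation completed ({preview})"
    if event == "DELEGATION_FAILED":
        return "failed", f"Delegation failed: {payload.get('error') or 'unknown'}"
    # DELEGATION_SENT carries an implicit "pending"
    target = (payload.get("target_id") or "peer")[:8]
    return "pending", f"Delegating to {target}"
```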
@@ -110,4 +110,65 @@ describe("toCommMessage — flow derivation", () => {
     );
     expect(m!.status).toBe("error");
   });
+
+  // --- delegation rows ---
+  // The platform's /delegate handler writes activity_type='delegation'
+  // for both the initial outbound (method='delegate') and the eventual
+  // reply (method='delegate_result', status=queued|completed|failed).
+  // Pre-fix the panel filtered these out and showed "no agent comms"
+  // even when 6+ delegations existed in the DB.
+
+  it("delegation 'delegate' row maps as outbound to target", () => {
+    const m = toCommMessage(
+      makeEntry({
+        activity_type: "delegation",
+        method: "delegate",
+        source_id: SELF,
+        target_id: PEER,
+        summary: "Delegating to ws-peer",
+        status: "pending",
+      }),
+      SELF,
+    );
+    expect(m).toBeTruthy();
+    expect(m!.flow).toBe("out");
+    expect(m!.peerId).toBe(PEER);
+    expect(m!.peerName).toBe("Peer Agent");
+    expect(m!.text).toBe("Delegating to ws-peer");
+    expect(m!.status).toBe("pending");
+  });
+
+  it("delegation 'delegate_result' queued row preserves status='queued'", () => {
+    // The "queued" status is the load-bearing signal the LLM uses to
+    // decide whether to wait or fall back. If toCommMessage drops or
+    // rewrites it, the UI loses the ability to show the "peer busy,
+    // will reply" affordance.
+    const m = toCommMessage(
+      makeEntry({
+        activity_type: "delegation",
+        method: "delegate_result",
+        source_id: SELF,
+        target_id: PEER,
+        summary: "Delegation queued — target at capacity",
+        status: "queued",
+      }),
+      SELF,
+    );
+    expect(m!.status).toBe("queued");
+    expect(m!.text).toContain("queued");
+  });
+
+  it("delegation row with no target_id returns null", () => {
+    // Defensive: a delegation row missing target_id can't be rendered
+    // (we wouldn't know which peer to attribute it to). Drop instead
+    // of rendering a ghost.
+    const m = toCommMessage(
+      makeEntry({
+        activity_type: "delegation",
+        target_id: null,
+      }),
+      SELF,
+    );
+    expect(m).toBeNull();
+  });
 });
@@ -39,6 +39,13 @@ DELEGATION_TIMEOUT = float(os.environ.get("DELEGATION_TIMEOUT", "300.0"))
 class DelegationStatus(str, Enum):
     PENDING = "pending"
     IN_PROGRESS = "in_progress"
+    # QUEUED: peer's a2a-proxy returned HTTP 202 + {queued: true}, meaning
+    # the peer is mid-task and the request was placed in a drain queue.
+    # The reply will arrive via the platform's stitch path when the
+    # peer finishes its current work. The LLM should WAIT, not retry,
+    # and definitely not fall back to doing the work itself — see the
+    # check_delegation_status docstring for the prompt-side guidance.
+    QUEUED = "queued"
    COMPLETED = "completed"
    FAILED = "failed"
 
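Condensed, the response classification the runtime performs (shown in full in the _execute_delegation hunk below) is: 200 means parse the result, 202 plus {queued: true} means QUEUED, anything else goes to the retry/FAILED path. A simplified sketch, assuming only the status codes that appear in this diff (the real code also parses the 200 body and drives a retry loop):

```python
import httpx

# Simplified sketch of the response classification; illustrative only.
def classify_a2a_response(resp: httpx.Response) -> DelegationStatus:
    if resp.status_code == 200:
        return DelegationStatus.COMPLETED  # body carries the peer's result
    if resp.status_code == 202:
        try:
            body = resp.json()
        except Exception:
            body = {}
        if body.get("queued") is True:
            # platform's queue signal: wait, the drain delivers the reply
            return DelegationStatus.QUEUED
    # bare 202, 4xx/5xx, etc. -> existing retry-then-FAILED path
    return DelegationStatus.FAILED
```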
@@ -119,6 +126,69 @@ async def _record_delegation_on_platform(task_id: str, target_workspace_id: str,
         logger.debug("Delegation record failed (best-effort): %s", e)
 
 
+async def _refresh_queued_from_platform(task_id: str) -> bool:
+    """Lazy-refresh a QUEUED delegation's local state from the platform.
+
+    Called by check_delegation_status when local status is QUEUED. The
+    platform's drain stitch (a2a_queue.go) updates the delegate_result
+    activity_logs row when a queued delegation eventually completes,
+    but it has no callback to this runtime — without this lazy refresh,
+    the LLM polling check_delegation_status would see "queued" forever
+    even after the platform has the result.
+
+    Returns True if the local delegation was updated to a terminal state
+    (completed/failed), False otherwise. Best-effort — network/parse
+    errors leave the local state untouched and let the next call retry.
+    """
+    delegation = _delegations.get(task_id)
+    if not delegation:
+        return False
+    try:
+        async with httpx.AsyncClient(timeout=10) as client:
+            resp = await client.get(
+                f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/delegations",
+                headers={},
+            )
+            if resp.status_code != 200:
+                return False
+            entries = resp.json()
+            if not isinstance(entries, list):
+                return False
+    except Exception as e:
+        logger.debug("refresh queued delegation %s: %s", task_id, e)
+        return False
+    # Find the latest delegate_result row matching our task_id.
+    # Platform list is newest-first; the first terminal match is the freshest.
+    for entry in entries:
+        if entry.get("delegation_id") != task_id:
+            continue
+        if entry.get("type") != "delegation":
+            continue
+        # Only delegate_result rows carry the eventual outcome; the
+        # initial 'delegate' row stays at status='pending' even after
+        # the result lands. Filtering on summary text is brittle, and
+        # the rows from the LIST endpoint don't include `method`, so
+        # key off terminal status instead: `delegate_result` rows are
+        # the ones with `error` (failure) or `response_preview` (success) populated.
+        status = entry.get("status", "")
+        if status == "completed":
+            delegation.status = DelegationStatus.COMPLETED
+            delegation.result = entry.get("response_preview", "")
+            await _notify_completion(task_id, delegation.workspace_id, "completed")
+            return True
+        if status == "failed":
+            delegation.status = DelegationStatus.FAILED
+            delegation.error = entry.get("error", "")
+            await _notify_completion(task_id, delegation.workspace_id, "failed")
+            return True
+        # status == "queued" / "pending" / "dispatched": platform hasn't
+        # resolved yet; leave local state unchanged so the next poll
+        # retries. Don't break — keep scanning in case another entry
+        # for the same task_id carries the terminal status (possible
+        # if the same delegation was retried).
+    return False
+
+
 async def _update_delegation_on_platform(task_id: str, status: str, error: str = "", response_preview: str = ""):
     """Mirror status changes to the platform's activity_logs (#64 fix).
 
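From the caller's side, this lazy refresh means a plain poll loop is enough to ride out a queued delegation. A hypothetical usage sketch; `mod` and the `ainvoke` fallback mirror the test helpers elsewhere in this PR, and `wait_for_delegation` is not a real function in the codebase:

```python
import asyncio

# Hypothetical caller-side loop: keep polling check_delegation_status;
# the tool's lazy refresh flips QUEUED to a terminal state by itself.
async def wait_for_delegation(mod, task_id: str, interval: float = 15.0) -> dict:
    while True:
        fn = mod.check_delegation_status
        if hasattr(fn, "ainvoke"):           # tool-wrapped variant
            status = await fn.ainvoke({"task_id": task_id})
        else:                                # plain coroutine variant
            status = await fn(task_id=task_id)
        if status.get("status") in ("completed", "failed"):
            return status
        await asyncio.sleep(interval)        # queued/pending: wait, don't retry
```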
@@ -212,6 +282,39 @@ async def _execute_delegation(task_id: str, workspace_id: str, task: str):
                     },
                 )
 
+                # HTTP 202 + {queued: true} = peer's a2a-proxy
+                # accepted the request but the peer's runtime is
+                # mid-task. Platform-side drain will deliver the
+                # reply asynchronously. Mark QUEUED locally so
+                # check_delegation_status can surface that state
+                # to the LLM with explicit "wait, don't bypass"
+                # guidance. Do NOT mark FAILED — the request is
+                # alive in the platform's queue, not lost.
+                #
+                # Without this branch, the loop falls through, the
+                # `if "error" in result` line below references an
+                # unbound `result`, and the eventual FAILED status
+                # leads the LLM to conclude the peer is permanently
+                # unavailable — at which point it does the delegated
+                # work itself, defeating the whole orchestration.
+                if a2a_resp.status_code == 202:
+                    try:
+                        queued_body = a2a_resp.json()
+                    except Exception:
+                        queued_body = {}
+                    if queued_body.get("queued") is True:
+                        delegation.status = DelegationStatus.QUEUED
+                        log_event(
+                            event_type="delegation", action="delegate",
+                            resource=workspace_id, outcome="queued",
+                            trace_id=task_id, attempt=attempt + 1,
+                        )
+                        await _notify_completion(task_id, workspace_id, "queued")
+                        await _update_delegation_on_platform(
+                            task_id, "queued", "", "",
+                        )
+                        return
+
                 if a2a_resp.status_code == 200:
                     try:
                         result = a2a_resp.json()
@@ -324,6 +427,23 @@ async def check_delegation_status(
 ) -> dict:
     """Check the status of a delegated task, or list all active delegations.
 
+    Status semantics — IMPORTANT:
+
+    - "pending" / "in_progress" → peer is actively working. Wait and check again.
+    - "queued" → peer's a2a-proxy accepted the call but the peer is
+      processing a prior task. The reply WILL arrive — the platform's
+      drain re-dispatches when the peer is free. This tool transparently
+      polls the platform for the eventual outcome on each call, so
+      keep polling check_delegation_status periodically and you'll see
+      the status flip to "completed" / "failed" automatically.
+      Do NOT retry the delegation. Do NOT do the work yourself.
+      Acknowledge to the user that the peer is busy and will reply,
+      then continue with other delegations or check back later.
+    - "completed" → result is in the `result` field.
+    - "failed" → real failure (network, peer crashed, etc.). The
+      `error` field has the cause. Only fall back to doing the work
+      yourself if status is "failed", never if status is "queued".
+
     Args:
         task_id: The task_id returned by delegate_to_workspace. If empty, lists all delegations.
 
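Spelled out as code, the docstring's policy reduces to the following. Illustrative only; the real decision is made by the LLM reading the docstring, and `next_action` is not a real function:

```python
# Illustrative rendering of the docstring's status policy.
def next_action(status: str) -> str:
    if status in ("pending", "in_progress"):
        return "wait and check again"
    if status == "queued":
        return "wait; the drain will deliver the reply. Do NOT retry or self-serve"
    if status == "completed":
        return "read the `result` field"
    if status == "failed":
        return "read `error`; only now may the director fall back to doing the work"
    return "unknown status; treat as pending"
```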
@@ -351,6 +471,16 @@ async def check_delegation_status(
     if not delegation:
         return {"error": f"No delegation found with task_id {task_id}"}
 
+    # Lazy refresh for QUEUED entries: the platform's drain stitch
+    # updates its activity_logs row when the queued delegation
+    # eventually completes, but doesn't push back to this runtime.
+    # Without this refresh, the LLM polling here would see "queued"
+    # forever even after the result is available — exactly the bug
+    # the upstream director-bypass docstring guidance warned against.
+    if delegation.status == DelegationStatus.QUEUED:
+        await _refresh_queued_from_platform(task_id)
+        # delegation is the same dict entry — _refresh mutates it in place.
+
     result = {
         "task_id": task_id,
         "workspace_id": delegation.workspace_id,
@@ -283,6 +283,248 @@ class TestA2ASuccess:
         assert "artifact text" in status["result"]
 
 
+class TestA2AQueued:
+    """HTTP 202 + {queued: true} comes back when the peer's a2a-proxy
+    accepted the request but the peer is mid-task. Pre-fix the runtime
+    treated this as 'no 200 → fall through to FAILED', which led the
+    LLM to conclude the peer was permanently unavailable and bypass
+    delegation entirely. Post-fix the status is QUEUED and the LLM
+    sees explicit guidance to wait."""
+
+    @pytest.mark.asyncio
+    async def test_queued_marks_status_queued_not_failed(self, delegation_mocks):
+        mod, *_ = delegation_mocks
+        _, mock_cls = _make_mock_client(
+            a2a_status=202,
+            a2a_payload={"queued": True, "summary": "Delegation queued — target at capacity"},
+        )
+
+        with patch("httpx.AsyncClient", mock_cls):
+            status = await _invoke_and_wait(mod)
+
+        assert status["status"] == "queued", f"expected queued, got {status}"
+        # No 'error' field on queued (it's not a failure)
+        assert "error" not in status or not status.get("error")
+
+    @pytest.mark.asyncio
+    async def test_queued_does_not_retry(self, delegation_mocks):
+        # The retry loop is for transient transport errors. A 202+queued
+        # is NOT a failure to retry against — the platform's drain will
+        # deliver the eventual reply. Retrying would just re-queue the
+        # same task and double-count it.
+        mod, *_ = delegation_mocks
+        client, mock_cls = _make_mock_client(
+            a2a_status=202,
+            a2a_payload={"queued": True},
+        )
+
+        with patch("httpx.AsyncClient", mock_cls):
+            await _invoke_and_wait(mod)
+
+        # The mock is shared across all AsyncClient calls (record, A2A,
+        # notify, update), so the total post count includes platform-sync
+        # bookkeeping POSTs too. Only count the A2A POST itself,
+        # identified by URL matching the target's /a2a endpoint.
+        a2a_calls = [
+            c for c in client.post.await_args_list
+            if c.args and c.args[0] == "http://peer:8000"
+        ]
+        assert len(a2a_calls) == 1, (
+            f"queued should not retry the A2A POST; got {len(a2a_calls)} A2A calls"
+        )
+
+    @pytest.mark.asyncio
+    async def test_202_without_queued_flag_falls_through(self, delegation_mocks):
+        # A bare 202 with no {queued: true} marker is NOT the platform's
+        # queue signal — it could be a misbehaving proxy or a future
+        # protocol revision. Don't treat it as queued; let it fall through
+        # to the existing retry-then-FAILED path.
+        mod, *_ = delegation_mocks
+        _, mock_cls = _make_mock_client(
+            a2a_status=202,
+            a2a_payload={"some_other_field": "value"},
+        )
+
+        with patch("httpx.AsyncClient", mock_cls):
+            status = await _invoke_and_wait(mod)
+
+        assert status["status"] == "failed", (
+            f"bare 202 should not be treated as queued, expected failed, got {status}"
+        )
+
+
+class TestQueuedLazyRefresh:
+    """When a delegation is QUEUED, check_delegation_status must lazily
+    refresh from the platform's GET /delegations to pick up drain-stitch
+    completions. Without this refresh, the LLM sees "queued" forever
+    because the platform never pushes back to the runtime.
+
+    Pre-fix the docstring told the LLM to wait on QUEUED. With no refresh
+    path, "wait" was permanent. These tests pin the refresh behavior so
+    the docstring is actually load-bearing."""
+
+    @pytest.mark.asyncio
+    async def test_queued_resolves_to_completed_via_lazy_refresh(self, delegation_mocks):
+        mod, *_ = delegation_mocks
+        # Step 1: invoke delegation, peer returns 202+queued, local
+        # status becomes QUEUED.
+        _, mock_cls_queued = _make_mock_client(
+            a2a_status=202,
+            a2a_payload={"queued": True},
+        )
+        with patch("httpx.AsyncClient", mock_cls_queued):
+            initial = await _invoke_and_wait(mod)
+        assert initial["status"] == "queued"
+        task_id = next(iter(mod._delegations))
+
+        # Step 2: simulate the platform's drain having stitched a completed
+        # result. GET /workspaces/<self>/delegations now returns a
+        # 'completed' delegate_result row matching our task_id.
+        list_response = MagicMock()
+        list_response.status_code = 200
+        list_response.json.return_value = [
+            {
+                "delegation_id": task_id,
+                "type": "delegation",
+                "status": "completed",
+                "summary": "Delegation completed (peer reply)",
+                "response_preview": "the peer's actual reply text",
+                "source_id": "ws-self",
+                "target_id": "target",
+            },
+        ]
+        refresh_client = AsyncMock()
+        refresh_client.get = AsyncMock(return_value=list_response)
+        refresh_client.post = AsyncMock(return_value=MagicMock(status_code=200))
+        refresh_cls = MagicMock()
+        refresh_cls.return_value.__aenter__ = AsyncMock(return_value=refresh_client)
+        refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        with patch("httpx.AsyncClient", refresh_cls):
+            fn = mod.check_delegation_status
+            if hasattr(fn, "ainvoke"):
+                refreshed = await fn.ainvoke({"task_id": task_id})
+            else:
+                refreshed = await fn(task_id=task_id)
+
+        assert refreshed["status"] == "completed", (
+            f"lazy refresh should advance QUEUED → completed; got {refreshed}"
+        )
+        assert refreshed.get("result") == "the peer's actual reply text"
+
+    @pytest.mark.asyncio
+    async def test_queued_resolves_to_failed_via_lazy_refresh(self, delegation_mocks):
+        mod, *_ = delegation_mocks
+        _, mock_cls_queued = _make_mock_client(
+            a2a_status=202,
+            a2a_payload={"queued": True},
+        )
+        with patch("httpx.AsyncClient", mock_cls_queued):
+            await _invoke_and_wait(mod)
+        task_id = next(iter(mod._delegations))
+
+        list_response = MagicMock()
+        list_response.status_code = 200
+        list_response.json.return_value = [
+            {
+                "delegation_id": task_id,
+                "type": "delegation",
+                "status": "failed",
+                "error": "peer timed out after 30 min",
+                "source_id": "ws-self",
+                "target_id": "target",
+            },
+        ]
+        refresh_client = AsyncMock()
+        refresh_client.get = AsyncMock(return_value=list_response)
+        refresh_client.post = AsyncMock(return_value=MagicMock(status_code=200))
+        refresh_cls = MagicMock()
+        refresh_cls.return_value.__aenter__ = AsyncMock(return_value=refresh_client)
+        refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        with patch("httpx.AsyncClient", refresh_cls):
+            fn = mod.check_delegation_status
+            if hasattr(fn, "ainvoke"):
+                refreshed = await fn.ainvoke({"task_id": task_id})
+            else:
+                refreshed = await fn(task_id=task_id)
+
+        assert refreshed["status"] == "failed"
+        assert refreshed.get("error") == "peer timed out after 30 min"
+
+    @pytest.mark.asyncio
+    async def test_queued_stays_queued_when_platform_not_resolved(self, delegation_mocks):
+        # Realistic case: the LLM polls before the platform's drain has
+        # fired. Refresh sees only the queued row → no state change. A
+        # subsequent poll will retry.
+        mod, *_ = delegation_mocks
+        _, mock_cls_queued = _make_mock_client(
+            a2a_status=202,
+            a2a_payload={"queued": True},
+        )
+        with patch("httpx.AsyncClient", mock_cls_queued):
+            await _invoke_and_wait(mod)
+        task_id = next(iter(mod._delegations))
+
+        list_response = MagicMock()
+        list_response.status_code = 200
+        list_response.json.return_value = [
+            {
+                "delegation_id": task_id,
+                "type": "delegation",
+                "status": "queued",  # not yet resolved
+                "summary": "Delegation queued — target at capacity",
+                "source_id": "ws-self",
+                "target_id": "target",
+            },
+        ]
+        refresh_client = AsyncMock()
+        refresh_client.get = AsyncMock(return_value=list_response)
+        refresh_client.post = AsyncMock(return_value=MagicMock(status_code=200))
+        refresh_cls = MagicMock()
+        refresh_cls.return_value.__aenter__ = AsyncMock(return_value=refresh_client)
+        refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        with patch("httpx.AsyncClient", refresh_cls):
+            fn = mod.check_delegation_status
+            if hasattr(fn, "ainvoke"):
+                refreshed = await fn.ainvoke({"task_id": task_id})
+            else:
+                refreshed = await fn(task_id=task_id)
+
+        assert refreshed["status"] == "queued"
+
+    @pytest.mark.asyncio
+    async def test_refresh_is_safe_when_platform_unreachable(self, delegation_mocks):
+        # Platform GET fails (network blip). Refresh must not raise —
+        # local state stays QUEUED so the next poll retries.
+        mod, *_ = delegation_mocks
+        _, mock_cls_queued = _make_mock_client(
+            a2a_status=202,
+            a2a_payload={"queued": True},
+        )
+        with patch("httpx.AsyncClient", mock_cls_queued):
+            await _invoke_and_wait(mod)
+        task_id = next(iter(mod._delegations))
+
+        refresh_client = AsyncMock()
+        refresh_client.get = AsyncMock(side_effect=httpx.ConnectError("network down"))
+        refresh_client.post = AsyncMock(return_value=MagicMock(status_code=200))
+        refresh_cls = MagicMock()
+        refresh_cls.return_value.__aenter__ = AsyncMock(return_value=refresh_client)
+        refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        with patch("httpx.AsyncClient", refresh_cls):
+            fn = mod.check_delegation_status
+            if hasattr(fn, "ainvoke"):
+                refreshed = await fn.ainvoke({"task_id": task_id})
+            else:
+                refreshed = await fn(task_id=task_id)
+
+        # Doesn't raise; local state preserved.
+        assert refreshed["status"] == "queued"
+
+
 class TestA2AErrors:
 
     @pytest.mark.asyncio
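`_make_mock_client` and `_invoke_and_wait` are module-level test helpers that this diff does not touch. For orientation only, a plausible shape of `_make_mock_client` inferred from its call sites above; entirely hypothetical, the real definition lives elsewhere in the test module:

```python
from unittest.mock import AsyncMock, MagicMock

# Hypothetical reconstruction of _make_mock_client, inferred from how
# the tests above use it; not the actual helper from the repository.
def _make_mock_client(a2a_status: int, a2a_payload: dict):
    a2a_response = MagicMock()
    a2a_response.status_code = a2a_status
    a2a_response.json.return_value = a2a_payload
    client = AsyncMock()
    client.post = AsyncMock(return_value=a2a_response)  # shared by A2A + bookkeeping POSTs
    client.get = AsyncMock(return_value=MagicMock(status_code=200))
    cls = MagicMock()  # stands in for httpx.AsyncClient as an async context manager
    cls.return_value.__aenter__ = AsyncMock(return_value=client)
    cls.return_value.__aexit__ = AsyncMock(return_value=False)
    return client, cls
```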