fix(delegation): lazy-refresh QUEUED state from platform; live DELEGATION_* events
Critical follow-up to PR #2126's review. Two real bugs:

1. **Runtime QUEUED never resolved.** The platform's drain stitch updates
   its delegate_result row when a queued delegation finally completes, but
   never pushes back to the runtime. The LLM polling check_delegation_status
   saw status="queued" forever — combined with the new docstring guidance
   ("queued → wait, peer will reply"), the model would wait indefinitely on
   a state that never resolves. Strictly worse than pre-PR behavior, where
   it would at least have bypassed.

2. **Live updates dead code.** delegation.go writes activity rows via direct
   INSERT INTO activity_logs, bypassing the LogActivity helper that fires
   ACTIVITY_LOGGED. Adding "delegation" to the canvas's ACTIVITY_LOGGED
   filter (PR #2126 first cut) was therefore inert — the initial GET worked,
   but live updates did not.

Fix:

(1) Runtime side, workspace/builtin_tools/delegation.py:

- New `_refresh_queued_from_platform(task_id)` async helper that pulls
  /workspaces/<self>/delegations and finds the platform-side delegate_result
  row for our task_id.
- check_delegation_status calls the refresh when local status is QUEUED, so
  the LLM's own poll drives state convergence.
- Best-effort: a GET failure leaves local state untouched; the next poll
  retries.
- Docstring updated to reflect the actual behavior ("polls transparently —
  keep polling and you'll see the flip").
- 4 new tests cover: QUEUED → completed via refresh; QUEUED → failed via
  refresh; refresh keeps QUEUED when the platform hasn't resolved yet;
  refresh swallows network errors safely.

(2) Canvas side, AgentCommsPanel.tsx WS push handler:

- Listens for DELEGATION_SENT / DELEGATION_STATUS / DELEGATION_COMPLETE /
  DELEGATION_FAILED in addition to ACTIVITY_LOGGED.
- Each event's payload is synthesized into an ActivityEntry shape so that
  toCommMessage's existing delegation branch maps it. Status derivation:
  STATUS uses payload.status, COMPLETE → "completed", FAILED → "failed",
  SENT → "pending".
- The ACTIVITY_LOGGED branch still accepts the "delegation" type as a
  no-op-today / future-proof path: if the delegation handlers are ever
  refactored to call LogActivity, this lights up automatically without
  another canvas change.

Unchanged by design: the docstring guidance ("queued → wait, don't bypass")
is now actually load-bearing, because the refresh path will deliver the
eventual outcome. Without the refresh, that guidance was a trap.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
commit 5071454074 (parent 057876cb0c)
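For reviewers, a minimal sketch of the convergence this commit enables, assuming only the check_delegation_status surface shown in the diff below (an async tool returning a dict with status / result / error). The polling wrapper itself is hypothetical, not code from this commit:

```python
# Hypothetical polling wrapper, for illustration only. Assumes the
# check_delegation_status tool shown in the diff below: an async
# callable returning {"status": ..., "result": ..., "error": ...}.
import asyncio

async def wait_for_delegation(check_delegation_status, task_id: str,
                              poll_interval: float = 5.0) -> dict:
    # Pre-fix, a QUEUED delegation never left "queued": the platform's
    # drain stitch updated only its own activity_logs row, so a loop
    # like this would spin forever. Post-fix, every call lazily
    # refreshes QUEUED state from the platform, so the loop terminates
    # once the drain resolves the delegation.
    while True:
        state = await check_delegation_status(task_id=task_id)
        if state.get("status") in ("completed", "failed"):
            return state  # terminal: `result` or `error` is populated
        await asyncio.sleep(poll_interval)  # queued / pending: poll again
```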
@@ -213,14 +213,34 @@ export function AgentCommsPanel({ workspaceId }: { workspaceId: string }) {
     ws.onmessage = (event) => {
       try {
        const msg = JSON.parse(event.data);
-        if (msg.event === "ACTIVITY_LOGGED" && msg.workspace_id === workspaceId) {
+        if (msg.workspace_id !== workspaceId) return;
+        // Two live-update paths:
+        // 1. ACTIVITY_LOGGED — fired by the LogActivity helper for
+        //    a2a_send / a2a_receive (and delegation rows IF the
+        //    delegation handler is ever refactored to use it). Today
+        //    the platform's delegation handlers do direct INSERT
+        //    INTO activity_logs WITHOUT firing ACTIVITY_LOGGED, so
+        //    the delegation branch here is defensive — it'd light
+        //    up automatically the day delegation handlers are
+        //    refactored to call LogActivity.
+        // 2. DELEGATION_SENT / DELEGATION_STATUS / DELEGATION_COMPLETE
+        //    / DELEGATION_FAILED — fired by the platform's delegation
+        //    handlers directly. These are the ONLY live signals the
+        //    panel currently has for delegation rows; the GET on
+        //    mount (which reads from activity_logs) shows them, but
+        //    without this branch, nothing surfaced live until the
+        //    next remount. Synthesise an ActivityEntry from the
+        //    payload so toCommMessage's existing delegation branch
+        //    handles them identically to the GET path.
+        let entry: ActivityEntry | null = null;
+        if (msg.event === "ACTIVITY_LOGGED") {
          const p = msg.payload || {};
          const type = p.activity_type as string;
          const sourceId = p.source_id as string | null;
          if (!sourceId) return; // canvas-initiated, not agent comms
          if (type !== "a2a_send" && type !== "a2a_receive" && type !== "delegation") return;
 
-          const entry: ActivityEntry = {
+          entry = {
            id: p.id as string || crypto.randomUUID(),
            activity_type: type,
            source_id: sourceId,
@@ -232,13 +252,56 @@ export function AgentCommsPanel({ workspaceId }: { workspaceId: string }) {
            status: p.status as string || "ok",
            created_at: msg.timestamp || new Date().toISOString(),
          };
-          const m = toCommMessage(entry, workspaceId);
-          if (m) {
-            const key = `${m.timestamp}:${m.flow}:${m.peerId}`;
-            if (seenKeys.current.has(key)) return;
-            seenKeys.current.add(key);
-            setMessages((prev) => [...prev, m]);
-          }
-        }
+        } else if (
+          msg.event === "DELEGATION_SENT" ||
+          msg.event === "DELEGATION_STATUS" ||
+          msg.event === "DELEGATION_COMPLETE" ||
+          msg.event === "DELEGATION_FAILED"
+        ) {
+          const p = msg.payload || {};
+          const targetId = (p.target_id as string) || "";
+          if (!targetId) return;
+          // Map event → status. DELEGATION_STATUS payload includes its
+          // own `status` field (queued / dispatched). Other events have
+          // implicit status: SENT → pending, COMPLETE → completed,
+          // FAILED → failed.
+          let status: string;
+          let summary: string;
+          if (msg.event === "DELEGATION_STATUS") {
+            status = (p.status as string) || "queued";
+            summary = `Delegation ${status}`;
+          } else if (msg.event === "DELEGATION_COMPLETE") {
+            status = "completed";
+            summary = `Delegation completed (${(p.response_preview as string)?.slice(0, 60) || ""})`;
+          } else if (msg.event === "DELEGATION_FAILED") {
+            status = "failed";
+            summary = `Delegation failed: ${(p.error as string) || "unknown"}`;
+          } else {
+            status = "pending";
+            summary = `Delegating to ${(p.target_id as string)?.slice(0, 8) || "peer"}`;
+          }
+          entry = {
+            id: (p.delegation_id as string) || crypto.randomUUID(),
+            activity_type: "delegation",
+            source_id: workspaceId,
+            target_id: targetId,
+            method: msg.event === "DELEGATION_SENT" ? "delegate" : "delegate_result",
+            summary,
+            request_body: null,
+            response_body: null,
+            status,
+            created_at: msg.timestamp || new Date().toISOString(),
+          };
+        } else {
+          return;
+        }
+
+        const m = toCommMessage(entry, workspaceId);
+        if (m) {
+          const key = `${m.timestamp}:${m.flow}:${m.peerId}`;
+          if (seenKeys.current.has(key)) return;
+          seenKeys.current.add(key);
+          setMessages((prev) => [...prev, m]);
+        }
      } catch { /* ignore */ }
     };
@@ -126,6 +126,69 @@ async def _record_delegation_on_platform(task_id: str, target_workspace_id: str,
         logger.debug("Delegation record failed (best-effort): %s", e)
 
 
+async def _refresh_queued_from_platform(task_id: str) -> bool:
+    """Lazy-refresh a QUEUED delegation's local state from the platform.
+
+    Called by check_delegation_status when local status is QUEUED. The
+    platform's drain stitch (a2a_queue.go) updates the delegate_result
+    activity_logs row when a queued delegation eventually completes,
+    but it has no callback to this runtime — without this lazy refresh,
+    the LLM polling check_delegation_status would see "queued" forever
+    even after the platform has the result.
+
+    Returns True if the local delegation was updated to a terminal state
+    (completed/failed), False otherwise. Best-effort — network/parse
+    errors leave the local state untouched and let the next call retry.
+    """
+    delegation = _delegations.get(task_id)
+    if not delegation:
+        return False
+    try:
+        async with httpx.AsyncClient(timeout=10) as client:
+            resp = await client.get(
+                f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/delegations",
+                headers={},
+            )
+        if resp.status_code != 200:
+            return False
+        entries = resp.json()
+        if not isinstance(entries, list):
+            return False
+    except Exception as e:
+        logger.debug("refresh queued delegation %s: %s", task_id, e)
+        return False
+    # Find the latest delegate_result row matching our task_id.
+    # Platform list is newest-first; the first match is the freshest.
+    for entry in entries:
+        if entry.get("delegation_id") != task_id:
+            continue
+        if entry.get("type") != "delegation":
+            continue
+        # Only delegate_result rows carry the eventual outcome; the
+        # initial 'delegate' row stays at status='pending' even after
+        # the result lands. Filtering on summary text is brittle, but
+        # the rows from the LIST endpoint don't include `method`. The
+        # `delegate_result` rows are the ones with `error` (failure)
+        # or `response_preview` (success) populated — pick those.
+        status = entry.get("status", "")
+        if status == "completed":
+            delegation.status = DelegationStatus.COMPLETED
+            delegation.result = entry.get("response_preview", "")
+            await _notify_completion(task_id, delegation.workspace_id, "completed")
+            return True
+        if status == "failed":
+            delegation.status = DelegationStatus.FAILED
+            delegation.error = entry.get("error", "")
+            await _notify_completion(task_id, delegation.workspace_id, "failed")
+            return True
+        # status == "queued" / "pending" / "dispatched": platform hasn't
+        # resolved yet; leave local state unchanged so the next poll
+        # retries. Don't break — keep scanning in case there's a newer
+        # entry for the same task_id (possible if the same delegation
+        # was retried).
+    return False
+
+
 async def _update_delegation_on_platform(task_id: str, status: str, error: str = "", response_preview: str = ""):
     """Mirror status changes to the platform's activity_logs (#64 fix).
 
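A side note on the brittleness the in-code comment above concedes: a stricter way to pick delegate_result rows would key off the `error` / `response_preview` fields the comment names rather than the status value. A hypothetical predicate, not part of this commit:

```python
# Hypothetical helper, not in this commit. Identifies delegate_result
# rows by the fields the in-diff comment names: only those rows have
# `error` (failure) or `response_preview` (success) populated. The
# committed code instead keys off status == "completed"/"failed",
# which is equivalent for terminal rows.
def _is_delegate_result(entry: dict) -> bool:
    return bool(entry.get("error")) or bool(entry.get("response_preview"))
```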
@@ -369,10 +432,13 @@ async def check_delegation_status(
     - "pending" / "in_progress" → peer is actively working. Wait and check again.
     - "queued" → peer's a2a-proxy accepted the call but the peer is
       processing a prior task. The reply WILL arrive — the platform's
-      drain re-dispatches when the peer is free. Do NOT retry the
-      delegation. Do NOT do the work yourself. Acknowledge to the user
-      that the peer is busy and will reply, then continue with other
-      delegations or check back later.
+      drain re-dispatches when the peer is free. This tool transparently
+      polls the platform for the eventual outcome on each call, so
+      keep polling check_delegation_status periodically and you'll see
+      the status flip to "completed" / "failed" automatically.
+      Do NOT retry the delegation. Do NOT do the work yourself.
+      Acknowledge to the user that the peer is busy and will reply,
+      then continue with other delegations or check back later.
     - "completed" → result is in the `result` field.
     - "failed" → real failure (network, peer crashed, etc.). The
       `error` field has the cause. Only fall back to doing the work
@@ -405,6 +471,16 @@ async def check_delegation_status(
     if not delegation:
         return {"error": f"No delegation found with task_id {task_id}"}
 
+    # Lazy refresh for QUEUED entries: the platform's drain stitch
+    # updates its activity_logs row when the queued delegation
+    # eventually completes, but doesn't push back to this runtime.
+    # Without this refresh, the LLM polling here would see "queued"
+    # forever even after the result is available — exactly the bug
+    # the upstream director-bypass docstring guidance warned against.
+    if delegation.status == DelegationStatus.QUEUED:
+        await _refresh_queued_from_platform(task_id)
+        # delegation is the same dict entry — _refresh mutates in-place.
+
     result = {
         "task_id": task_id,
         "workspace_id": delegation.workspace_id,
@@ -353,6 +353,178 @@ class TestA2AQueued:
         )
 
 
+class TestQueuedLazyRefresh:
+    """When a delegation is QUEUED, check_delegation_status must lazily
+    refresh from the platform's GET /delegations to pick up drain-stitch
+    completions. Without this refresh, the LLM sees "queued" forever
+    because the platform never pushes back to the runtime.
+
+    Pre-fix the docstring told the LLM to wait on QUEUED. With no refresh
+    path, "wait" was permanent. These tests pin the refresh behavior so
+    the docstring is actually load-bearing."""
+
+    @pytest.mark.asyncio
+    async def test_queued_resolves_to_completed_via_lazy_refresh(self, delegation_mocks):
+        mod, *_ = delegation_mocks
+        # Step 1: invoke delegation, peer returns 202+queued, local
+        # status becomes QUEUED.
+        _, mock_cls_queued = _make_mock_client(
+            a2a_status=202,
+            a2a_payload={"queued": True},
+        )
+        with patch("httpx.AsyncClient", mock_cls_queued):
+            initial = await _invoke_and_wait(mod)
+        assert initial["status"] == "queued"
+        task_id = next(iter(mod._delegations))
+
+        # Step 2: simulate platform's drain having stitched a completed
+        # result. GET /workspaces/<self>/delegations now returns a
+        # 'completed' delegate_result row matching our task_id.
+        list_response = MagicMock()
+        list_response.status_code = 200
+        list_response.json.return_value = [
+            {
+                "delegation_id": task_id,
+                "type": "delegation",
+                "status": "completed",
+                "summary": "Delegation completed (peer reply)",
+                "response_preview": "the peer's actual reply text",
+                "source_id": "ws-self",
+                "target_id": "target",
+            },
+        ]
+        refresh_client = AsyncMock()
+        refresh_client.get = AsyncMock(return_value=list_response)
+        refresh_client.post = AsyncMock(return_value=MagicMock(status_code=200))
+        refresh_cls = MagicMock()
+        refresh_cls.return_value.__aenter__ = AsyncMock(return_value=refresh_client)
+        refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        with patch("httpx.AsyncClient", refresh_cls):
+            fn = mod.check_delegation_status
+            if hasattr(fn, "ainvoke"):
+                refreshed = await fn.ainvoke({"task_id": task_id})
+            else:
+                refreshed = await fn(task_id=task_id)
+
+        assert refreshed["status"] == "completed", (
+            f"lazy refresh should advance QUEUED → completed; got {refreshed}"
+        )
+        assert refreshed.get("result") == "the peer's actual reply text"
+
+    @pytest.mark.asyncio
+    async def test_queued_resolves_to_failed_via_lazy_refresh(self, delegation_mocks):
+        mod, *_ = delegation_mocks
+        _, mock_cls_queued = _make_mock_client(
+            a2a_status=202,
+            a2a_payload={"queued": True},
+        )
+        with patch("httpx.AsyncClient", mock_cls_queued):
+            await _invoke_and_wait(mod)
+        task_id = next(iter(mod._delegations))
+
+        list_response = MagicMock()
+        list_response.status_code = 200
+        list_response.json.return_value = [
+            {
+                "delegation_id": task_id,
+                "type": "delegation",
+                "status": "failed",
+                "error": "peer timed out after 30 min",
+                "source_id": "ws-self",
+                "target_id": "target",
+            },
+        ]
+        refresh_client = AsyncMock()
+        refresh_client.get = AsyncMock(return_value=list_response)
+        refresh_client.post = AsyncMock(return_value=MagicMock(status_code=200))
+        refresh_cls = MagicMock()
+        refresh_cls.return_value.__aenter__ = AsyncMock(return_value=refresh_client)
+        refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        with patch("httpx.AsyncClient", refresh_cls):
+            fn = mod.check_delegation_status
+            if hasattr(fn, "ainvoke"):
+                refreshed = await fn.ainvoke({"task_id": task_id})
+            else:
+                refreshed = await fn(task_id=task_id)
+
+        assert refreshed["status"] == "failed"
+        assert refreshed.get("error") == "peer timed out after 30 min"
+
+    @pytest.mark.asyncio
+    async def test_queued_stays_queued_when_platform_not_resolved(self, delegation_mocks):
+        # Realistic case: LLM polls before platform's drain has fired.
+        # Refresh sees only the queued row → no state change. Subsequent
+        # poll will retry.
+        mod, *_ = delegation_mocks
+        _, mock_cls_queued = _make_mock_client(
+            a2a_status=202,
+            a2a_payload={"queued": True},
+        )
+        with patch("httpx.AsyncClient", mock_cls_queued):
+            await _invoke_and_wait(mod)
+        task_id = next(iter(mod._delegations))
+
+        list_response = MagicMock()
+        list_response.status_code = 200
+        list_response.json.return_value = [
+            {
+                "delegation_id": task_id,
+                "type": "delegation",
+                "status": "queued",  # not yet resolved
+                "summary": "Delegation queued — target at capacity",
+                "source_id": "ws-self",
+                "target_id": "target",
+            },
+        ]
+        refresh_client = AsyncMock()
+        refresh_client.get = AsyncMock(return_value=list_response)
+        refresh_client.post = AsyncMock(return_value=MagicMock(status_code=200))
+        refresh_cls = MagicMock()
+        refresh_cls.return_value.__aenter__ = AsyncMock(return_value=refresh_client)
+        refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        with patch("httpx.AsyncClient", refresh_cls):
+            fn = mod.check_delegation_status
+            if hasattr(fn, "ainvoke"):
+                refreshed = await fn.ainvoke({"task_id": task_id})
+            else:
+                refreshed = await fn(task_id=task_id)
+
+        assert refreshed["status"] == "queued"
+
+    @pytest.mark.asyncio
+    async def test_refresh_is_safe_when_platform_unreachable(self, delegation_mocks):
+        # Platform GET fails (network blip). Refresh must not raise —
+        # local state stays QUEUED so the next poll retries.
+        mod, *_ = delegation_mocks
+        _, mock_cls_queued = _make_mock_client(
+            a2a_status=202,
+            a2a_payload={"queued": True},
+        )
+        with patch("httpx.AsyncClient", mock_cls_queued):
+            await _invoke_and_wait(mod)
+        task_id = next(iter(mod._delegations))
+
+        refresh_client = AsyncMock()
+        refresh_client.get = AsyncMock(side_effect=httpx.ConnectError("network down"))
+        refresh_client.post = AsyncMock(return_value=MagicMock(status_code=200))
+        refresh_cls = MagicMock()
+        refresh_cls.return_value.__aenter__ = AsyncMock(return_value=refresh_client)
+        refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False)
+
+        with patch("httpx.AsyncClient", refresh_cls):
+            fn = mod.check_delegation_status
+            if hasattr(fn, "ainvoke"):
+                refreshed = await fn.ainvoke({"task_id": task_id})
+            else:
+                refreshed = await fn(task_id=task_id)
+
+        # Doesn't raise; local state preserved.
+        assert refreshed["status"] == "queued"
+
+
 class TestA2AErrors:
 
     @pytest.mark.asyncio