feat(delegations): agent-side cutover — sync delegate uses async+poll path (RFC #2829 PR-5)

Behind feature flag DELEGATION_SYNC_VIA_INBOX (default off). When set,
tool_delegate_task no longer holds an HTTP message/send connection
through the platform proxy waiting for the callee's reply. Instead:

  1. POST /workspaces/<src>/delegate (returns 202 + delegation_id)
     — platform's executeDelegation goroutine handles A2A dispatch
     in the background. No client-side timeout dependency on the
     platform holding a connection open.
  2. Poll GET /workspaces/<src>/delegations every 3s for a row with
     matching delegation_id reaching terminal status (completed/failed).
  3. Return the response_preview text on completed; surface the
     wrapped _A2A_ERROR_PREFIX error on failed (so caller error
     detection stays unchanged).
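
The row-matching part of step 2 can be sketched in isolation. This is a minimal illustration, assuming the /delegations payload is a flat list of dicts carrying `delegation_id` and `status`; the helper name `find_terminal_row` and the `TERMINAL_STATUSES` constant are hypothetical and do not appear in the PR.

```python
from typing import Any, Optional

# Statuses treated as terminal, per the commit message (completed/failed).
TERMINAL_STATUSES = ("completed", "failed")


def find_terminal_row(rows: Any, delegation_id: str) -> Optional[dict]:
    """Return the first terminal row for delegation_id, or None if there is none."""
    if not isinstance(rows, list):
        return None  # unexpected payload shape: treat as "nothing yet"
    for row in rows:
        if not isinstance(row, dict):
            continue
        if row.get("delegation_id") != delegation_id:
            continue  # another delegation's row
        status = (row.get("status") or "").lower()
        if status in TERMINAL_STATUSES:
            return row
    return None


rows = [
    {"delegation_id": "deleg-OTHER", "status": "completed", "response_preview": "wrong"},
    {"delegation_id": "deleg-1", "status": "in_progress"},
    {"delegation_id": "deleg-1", "status": "Completed", "response_preview": "right"},
]
match = find_terminal_row(rows, "deleg-1")
```

Rows belonging to other delegations and non-terminal rows for the same delegation are both skipped, so the caller keeps polling until a terminal row for its own id shows up.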

This closes the bug class that broke Hongming's home hermes on
2026-05-05 ("message/send queued but result not available after 600s
timeout" while the callee was actively heartbeating "iteration 14/90").

## Compatibility

Default-off feature flag — flag-off path is byte-identical to the
legacy send_a2a_message behavior, pinned by
TestFlagOffLegacyPath::test_flag_off_uses_send_a2a_message_not_polling.

Idempotency-key derivation matches tool_delegate_task_async (the first
32 hex characters of the SHA-256 of source:target:task), so a
delegation retried after a mid-flight restart sends the same key and
the platform returns the existing delegation_id.
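
As a small sketch of why the key survives a restart: the derivation depends only on its three inputs, so recomputing it yields the same value. The function name `idempotency_key` is hypothetical; the hashing expression mirrors the one in `_delegate_sync_via_polling`.

```python
import hashlib


def idempotency_key(source: str, target: str, task: str) -> str:
    """Derive a deterministic key from source:target:task (truncated SHA-256)."""
    digest = hashlib.sha256(f"{source}:{target}:{task}".encode()).hexdigest()
    return digest[:32]


# Same inputs before and after a (simulated) restart give the same key.
before_restart = idempotency_key("ws-self", "ws-target", "summarize the report")
after_restart = idempotency_key("ws-self", "ws-target", "summarize the report")
different_task = idempotency_key("ws-self", "ws-target", "another task")
```

Any change to the source, target, or task text produces a different key, which the platform would treat as a new delegation.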

## Recovery on timeout

If the polling budget (DELEGATION_TIMEOUT, default 300s) elapses
without a terminal status, the error message includes the
delegation_id + a "call check_task_status('<id>') to retrieve later"
hint. The platform's durable row is still live — work is NOT lost,
just the synchronous wait is over. Caller can poll for the result
later via the existing check_task_status tool.
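
A caller that wants to recover programmatically could pull the delegation_id back out of the timeout message. The sketch below is an assumption-laden illustration: the helper `extract_delegation_id` is hypothetical, the sample prefix `[A2A_ERROR] ` stands in for the real `_A2A_ERROR_PREFIX` value (not shown in this diff), and the sample message is abbreviated to the parenthesized part of the format used by the helper.

```python
import re
from typing import Optional

# Matches the "delegation_id=<id>" fragment inside the timeout message.
_DELEGATION_ID_RE = re.compile(r"delegation_id=([^,\s)]+)")


def extract_delegation_id(message: str) -> Optional[str]:
    """Return the delegation_id embedded in a polling-timeout error, if any."""
    found = _DELEGATION_ID_RE.search(message)
    return found.group(1) if found else None


# Hypothetical prefix; the real _A2A_ERROR_PREFIX value is not shown in the diff.
sample = (
    "[A2A_ERROR] polling timeout after 300.0s "
    "(delegation_id=deleg-1, last_status=in_progress)"
)
recovered = extract_delegation_id(sample)
```

The recovered id is what a later `check_task_status` call would take as its argument.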

## Stack with PR-2

PR-2 added the SERVER-SIDE result-push to the caller's a2a_receive
inbox row. PR-5 (this PR) adds the AGENT-SIDE cutover. Together they
remove the proxy-blocked sync path entirely. Both flags default off, so
merging either PR alone keeps existing behavior. Operators flip both
for full effect after staging burn-in.
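
The interaction of the two flags can be sketched as a small state check. The `== "1"` test for DELEGATION_SYNC_VIA_INBOX matches the diff; treating DELEGATION_RESULT_INBOX_PUSH the same way is an assumption, since PR-2's parsing is not shown here. The function `cutover_state` and its return labels are hypothetical.

```python
from typing import Mapping


def cutover_state(env: Mapping[str, str]) -> str:
    """Classify an environment as 'legacy', 'partial', or 'full' cutover."""
    agent_side = env.get("DELEGATION_SYNC_VIA_INBOX") == "1"
    # Assumption: PR-2's flag is also enabled with the literal value "1".
    server_side = env.get("DELEGATION_RESULT_INBOX_PUSH") == "1"
    if agent_side and server_side:
        return "full"
    if agent_side or server_side:
        return "partial"
    return "legacy"


default_env = cutover_state({})
staging_env = cutover_state(
    {"DELEGATION_SYNC_VIA_INBOX": "1", "DELEGATION_RESULT_INBOX_PUSH": "1"}
)
```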

## Coverage

9 unit tests:

  - flag off → byte-identical to legacy (send_a2a_message called,
    _delegate_sync_via_polling NOT called)
  - dispatch HTTP exception → wrapped error
  - dispatch non-2xx → wrapped error mentioning HTTP code
  - dispatch missing delegation_id → wrapped error
  - completed first poll → response_preview returned
  - failed status → wrapped error with error_detail
  - transient poll error → keeps polling, eventually succeeds
  - deadline exceeded → wrapped timeout error mentions delegation_id +
    check_task_status hint for recovery
  - filters by delegation_id (other delegations' rows ignored)

All passing locally. CI will run the same suite on a clean env.
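
The "transient poll error" case relies on a standard-library mocking pattern: an `AsyncMock` whose `side_effect` is a sequence raises any exception instances in it and returns the other items, one per await. The sketch below shows that pattern on its own; `poll_until_ok` is a hypothetical stand-in for the real polling loop, not the PR's code.

```python
import asyncio
from unittest.mock import AsyncMock


async def poll_until_ok(get, attempts: int) -> str:
    """Hypothetical loop: retry on exceptions, return the first successful payload."""
    last_error = "none"
    for _ in range(attempts):
        try:
            return await get()
        except Exception as exc:  # transient failure: remember it and retry
            last_error = str(exc)
            await asyncio.sleep(0)
    return f"gave up: {last_error}"


# First await raises, second await returns the payload.
fake_get = AsyncMock(side_effect=[ConnectionError("transient"), "eventually ok"])
result = asyncio.run(poll_until_ok(fake_get, attempts=3))
```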

Refs RFC #2829.
Hongming Wang 2026-05-04 21:31:11 -07:00
parent ae05f91bd8
commit 5950d4cd81
2 changed files with 476 additions and 4 deletions


@@ -191,6 +191,145 @@ async def report_activity(
pass # Best-effort — don't block delegation on activity reporting
# RFC #2829 PR-5 cutover constants. The poll cadence + timeout are
# intentionally generous: 3s gives the platform's executeDelegation
# goroutine room to dispatch + the callee to respond + the result to
# write to activity_logs without thrashing the platform with rapid
# polls; the budget matches the legacy DELEGATION_TIMEOUT (300s) so
# operators don't see behavior change beyond "no more 600s timeouts".
_SYNC_POLL_INTERVAL_S = 3.0
_SYNC_POLL_BUDGET_S = float(os.environ.get("DELEGATION_TIMEOUT", "300.0"))


async def _delegate_sync_via_polling(
    workspace_id: str,
    task: str,
    src: str,
) -> str:
    """RFC #2829 PR-5: durable async delegation + poll for terminal status.

    Sidesteps the platform proxy's blocking `message/send` HTTP path that
    hits a hard 600s ceiling. Instead:

      1. POST /workspaces/<src>/delegate (async, returns 202 + delegation_id);
         the platform's executeDelegation goroutine handles A2A dispatch in
         the background. No client-side timeout dependency on the platform
         holding a connection open.
      2. Poll GET /workspaces/<src>/delegations every 3s for a row with
         matching delegation_id reaching terminal status (completed/failed).
      3. Return the response_preview text on completed; surface error_detail
         on failed (with the same _A2A_ERROR_PREFIX wrapping the legacy
         path uses, so caller error-detection logic is unchanged).

    Both /delegate and /delegations are existing endpoints; this helper
    just composes them into a polling synchronous facade. The result is
    available within one poll interval of the platform writing the
    terminal status row, so fast cases see at most a few seconds of
    extra latency vs. the legacy proxy-blocked path.
    """
    import asyncio
    import time

    idem_key = hashlib.sha256(f"{src}:{workspace_id}:{task}".encode()).hexdigest()[:32]

    # 1. Dispatch via /delegate (the async, durable path).
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.post(
                f"{PLATFORM_URL}/workspaces/{src}/delegate",
                json={
                    "target_id": workspace_id,
                    "task": task,
                    "idempotency_key": idem_key,
                },
                headers=_auth_headers_for_heartbeat(src),
            )
    except Exception as e:  # pylint: disable=broad-except
        return f"{_A2A_ERROR_PREFIX}delegate dispatch failed: {e}"
    if resp.status_code != 202 and resp.status_code != 200:
        return f"{_A2A_ERROR_PREFIX}delegate dispatch failed: HTTP {resp.status_code} {resp.text[:200]}"
    try:
        dispatch = resp.json()
    except Exception as e:  # pylint: disable=broad-except
        return f"{_A2A_ERROR_PREFIX}delegate dispatch returned non-JSON: {e}"
    delegation_id = dispatch.get("delegation_id", "")
    if not delegation_id:
        return f"{_A2A_ERROR_PREFIX}delegate dispatch missing delegation_id: {dispatch}"
    # 2. Poll for terminal status with a deadline. Each poll is a cheap
    # /delegations GET, bounded by the platform's existing rate limit.
    deadline = time.monotonic() + _SYNC_POLL_BUDGET_S
    last_status = "unknown"
    while time.monotonic() < deadline:
        try:
            async with httpx.AsyncClient(timeout=10.0) as client:
                poll = await client.get(
                    f"{PLATFORM_URL}/workspaces/{src}/delegations",
                    headers=_auth_headers_for_heartbeat(src),
                )
        except Exception as e:  # pylint: disable=broad-except
            # Transient: keep polling. The platform IS holding the
            # delegation row; we just lost a network request.
            last_status = f"poll-error: {e}"
            await asyncio.sleep(_SYNC_POLL_INTERVAL_S)
            continue
        if poll.status_code != 200:
            last_status = f"poll HTTP {poll.status_code}"
            await asyncio.sleep(_SYNC_POLL_INTERVAL_S)
            continue
        try:
            rows = poll.json()
        except Exception as e:  # pylint: disable=broad-except
            last_status = f"poll non-JSON: {e}"
            await asyncio.sleep(_SYNC_POLL_INTERVAL_S)
            continue
        # /delegations returns a flat list of delegation events. Filter to
        # our delegation_id and take the first terminal row in list order.
        # The list may have multiple rows per delegation_id (one for the
        # original dispatch, one per status update), so non-terminal rows
        # for our id are skipped rather than treated as the answer.
        if not isinstance(rows, list):
            await asyncio.sleep(_SYNC_POLL_INTERVAL_S)
            continue
        terminal = None
        for r in rows:
            if not isinstance(r, dict):
                continue
            if r.get("delegation_id") != delegation_id:
                continue
            status = (r.get("status") or "").lower()
            last_status = status
            if status in ("completed", "failed"):
                terminal = r
                break
        if terminal:
            if (terminal.get("status") or "").lower() == "completed":
                return terminal.get("response_preview") or ""
            err = (
                terminal.get("error_detail")
                or terminal.get("summary")
                or "delegation failed"
            )
            return f"{_A2A_ERROR_PREFIX}{err}"
        await asyncio.sleep(_SYNC_POLL_INTERVAL_S)

    # Budget exhausted: the platform's row is still in flight (or queued).
    # Surface as an error so the caller can decide to retry or fall back;
    # the platform DOES still have the durable row, so the work isn't
    # lost. It will complete eventually and a future check_task_status
    # will surface the result.
    return (
        f"{_A2A_ERROR_PREFIX}polling timeout after {_SYNC_POLL_BUDGET_S}s "
        f"(delegation_id={delegation_id}, last_status={last_status}); "
        f"the platform is still working on it — call check_task_status('{delegation_id}') to retrieve later"
    )


async def tool_delegate_task(
    workspace_id: str,
    task: str,
@@ -229,6 +368,18 @@ async def tool_delegate_task(
    # Brief summary for canvas display: just the delegation target
    await report_activity("a2a_send", workspace_id, f"Delegating to {peer_name}", task_text=task)

    # RFC #2829 PR-5: agent-side cutover. When DELEGATION_SYNC_VIA_INBOX=1,
    # use the platform's durable async delegation API (POST /delegate +
    # poll /delegations) instead of the proxy-blocked message/send path.
    # This sidesteps the 600s message/send timeout class that broke
    # iteration-14/90-style long-running delegations on 2026-05-05.
    #
    # Default off: staging-canary first, flip default after PR-2's
    # result-push flag (DELEGATION_RESULT_INBOX_PUSH) has been on for
    # ≥1 week without incident.
    if os.environ.get("DELEGATION_SYNC_VIA_INBOX") == "1":
        result = await _delegate_sync_via_polling(workspace_id, task, src or WORKSPACE_ID)
    else:
        # send_a2a_message routes through ${PLATFORM_URL}/workspaces/{id}/a2a
        # (the platform proxy) so the same code works for in-container and
        # external (standalone molecule-mcp) callers.


@@ -0,0 +1,321 @@
"""RFC #2829 PR-5: tests for the agent-side cutover that replaces the
proxy-blocked send_a2a_message sync path with delegate-then-poll.

Coverage:
  - Flag off (default) → byte-identical to legacy: tool_delegate_task
    calls send_a2a_message and never touches /delegate.
  - Flag on, dispatch fails (exception or non-2xx) → wrapped error
    returned, no infinite poll.
  - Flag on, dispatch returns no delegation_id → wrapped error.
  - Flag on, completed status on first poll → response_preview returned.
  - Flag on, failed status → wrapped error with error_detail.
  - Flag on, transient poll error → keeps polling, eventually succeeds.
  - Flag on, deadline exceeded → wrapped timeout error mentions
    delegation_id so caller can pick it up via check_task_status later.
  - Flag on, rows for other delegations → ignored; only the matching
    delegation_id is returned.
"""
import json
import os
from unittest.mock import AsyncMock, MagicMock, patch

import httpx
import pytest

# WORKSPACE_ID + PLATFORM_URL are checked at a2a_client import time.
# CI ships them via the workflow env block; for local pytest runs we
# set them here so the test file can import a2a_tools at module scope
# (matching the pattern in test_a2a_tools_impl.py, which relies
# on the same CI env shape).
os.environ.setdefault("WORKSPACE_ID", "00000000-0000-0000-0000-000000000001")
os.environ.setdefault("PLATFORM_URL", "http://localhost:8080")


def _resp(status_code, payload, text=None):
    r = MagicMock()
    r.status_code = status_code
    r.json = MagicMock(return_value=payload)
    r.text = text or json.dumps(payload)
    return r


def _make_client(post_resp=None, get_resps=None, post_exc=None):
    """Build an AsyncClient mock where get() returns a sequence of responses
    (one per call) so we can simulate multiple poll rounds.
    """
    mc = AsyncMock()
    mc.__aenter__ = AsyncMock(return_value=mc)
    mc.__aexit__ = AsyncMock(return_value=False)
    if post_exc is not None:
        mc.post = AsyncMock(side_effect=post_exc)
    else:
        mc.post = AsyncMock(return_value=post_resp or _resp(202, {"delegation_id": "deleg-1"}))
    if get_resps is None:
        get_resps = [_resp(200, [])]
    mc.get = AsyncMock(side_effect=get_resps)
    return mc


# ---------------------------------------------------------------------------
# Flag-off: legacy path is preserved
# ---------------------------------------------------------------------------
class TestFlagOffLegacyPath:
    async def test_flag_off_uses_send_a2a_message_not_polling(self, monkeypatch):
        """With DELEGATION_SYNC_VIA_INBOX unset, tool_delegate_task must
        invoke the legacy send_a2a_message and NEVER call /delegate."""
        monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False)
        import a2a_tools

        send_calls = []

        async def fake_send(workspace_id, task, source_workspace_id=None):
            send_calls.append((workspace_id, task, source_workspace_id))
            return "legacy ok"

        async def fake_discover(*_a, **_kw):
            return {"name": "peer-name", "status": "online"}

        async def fake_report_activity(*_a, **_kw):
            return None

        with patch("a2a_tools.send_a2a_message", side_effect=fake_send), \
             patch("a2a_tools.discover_peer", side_effect=fake_discover), \
             patch("a2a_tools.report_activity", side_effect=fake_report_activity), \
             patch("a2a_tools._delegate_sync_via_polling", new=AsyncMock()) as poll_mock:
            result = await a2a_tools.tool_delegate_task(
                "ws-target", "task body", source_workspace_id="ws-self"
            )
        assert result == "legacy ok", f"expected legacy passthrough, got {result!r}"
        assert send_calls == [("ws-target", "task body", "ws-self")]
        poll_mock.assert_not_called()


# ---------------------------------------------------------------------------
# Flag-on: dispatch failures
# ---------------------------------------------------------------------------
class TestFlagOnDispatchFailures:
    async def test_dispatch_http_exception_returns_wrapped_error(self, monkeypatch):
        monkeypatch.setenv("DELEGATION_SYNC_VIA_INBOX", "1")
        import a2a_tools

        mc = _make_client(post_exc=httpx.ConnectError("network down"))
        with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
            res = await a2a_tools._delegate_sync_via_polling(
                "ws-target", "task", "ws-self"
            )
        assert res.startswith(a2a_tools._A2A_ERROR_PREFIX)
        assert "delegate dispatch failed" in res

    async def test_dispatch_non_2xx_returns_wrapped_error(self, monkeypatch):
        monkeypatch.setenv("DELEGATION_SYNC_VIA_INBOX", "1")
        import a2a_tools

        mc = _make_client(post_resp=_resp(403, {"error": "forbidden"}))
        with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
            res = await a2a_tools._delegate_sync_via_polling(
                "ws-target", "task", "ws-self"
            )
        assert res.startswith(a2a_tools._A2A_ERROR_PREFIX)
        assert "HTTP 403" in res

    async def test_dispatch_missing_delegation_id_returns_wrapped_error(self, monkeypatch):
        monkeypatch.setenv("DELEGATION_SYNC_VIA_INBOX", "1")
        import a2a_tools

        # 202 Accepted but no delegation_id field; defensive shape check.
        mc = _make_client(post_resp=_resp(202, {"status": "delegated"}))
        with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
            res = await a2a_tools._delegate_sync_via_polling(
                "ws-target", "task", "ws-self"
            )
        assert res.startswith(a2a_tools._A2A_ERROR_PREFIX)
        assert "missing delegation_id" in res


# ---------------------------------------------------------------------------
# Flag-on: polling outcomes
# ---------------------------------------------------------------------------
class TestFlagOnPollingOutcomes:
    async def test_completed_first_poll_returns_response_preview(self, monkeypatch):
        monkeypatch.setenv("DELEGATION_SYNC_VIA_INBOX", "1")
        # Tighten budget to a few seconds so the test never blocks long.
        monkeypatch.setenv("DELEGATION_TIMEOUT", "10")
        import importlib
        import a2a_tools
        importlib.reload(a2a_tools)  # pick up new env-driven _SYNC_POLL_BUDGET_S

        completed_row = {
            "delegation_id": "deleg-1",
            "status": "completed",
            "response_preview": "the answer",
        }
        mc = _make_client(
            post_resp=_resp(202, {"delegation_id": "deleg-1"}),
            get_resps=[_resp(200, [completed_row])],
        )
        with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
            res = await a2a_tools._delegate_sync_via_polling(
                "ws-target", "task", "ws-self"
            )
        assert res == "the answer"
        # Cleanup: restore the module to default state for subsequent tests.
        monkeypatch.delenv("DELEGATION_TIMEOUT", raising=False)
        importlib.reload(a2a_tools)

    async def test_failed_status_returns_wrapped_error_with_detail(self, monkeypatch):
        monkeypatch.setenv("DELEGATION_SYNC_VIA_INBOX", "1")
        monkeypatch.setenv("DELEGATION_TIMEOUT", "10")
        import importlib
        import a2a_tools
        importlib.reload(a2a_tools)

        failed_row = {
            "delegation_id": "deleg-1",
            "status": "failed",
            "error_detail": "callee unreachable",
        }
        mc = _make_client(
            post_resp=_resp(202, {"delegation_id": "deleg-1"}),
            get_resps=[_resp(200, [failed_row])],
        )
        with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
            res = await a2a_tools._delegate_sync_via_polling(
                "ws-target", "task", "ws-self"
            )
        assert res.startswith(a2a_tools._A2A_ERROR_PREFIX)
        assert "callee unreachable" in res
        monkeypatch.delenv("DELEGATION_TIMEOUT", raising=False)
        importlib.reload(a2a_tools)

    async def test_transient_poll_error_then_completed_succeeds(self, monkeypatch):
        """A network blip during polling must NOT abort; keep polling."""
        monkeypatch.setenv("DELEGATION_SYNC_VIA_INBOX", "1")
        monkeypatch.setenv("DELEGATION_TIMEOUT", "30")
        import importlib
        import a2a_tools
        importlib.reload(a2a_tools)
        # Speed up: monkey-patch the poll interval to 0.01s so we don't
        # actually wait 3s between rounds in the test.
        monkeypatch.setattr(a2a_tools, "_SYNC_POLL_INTERVAL_S", 0.01)

        completed_row = {
            "delegation_id": "deleg-1",
            "status": "completed",
            "response_preview": "eventually ok",
        }
        # First poll raises, second poll returns completed.
        get_seq = [
            httpx.ConnectError("transient"),
            _resp(200, [completed_row]),
        ]
        mc = _make_client(
            post_resp=_resp(202, {"delegation_id": "deleg-1"}),
            get_resps=get_seq,
        )
        with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
            res = await a2a_tools._delegate_sync_via_polling(
                "ws-target", "task", "ws-self"
            )
        assert res == "eventually ok"
        monkeypatch.delenv("DELEGATION_TIMEOUT", raising=False)
        importlib.reload(a2a_tools)

    async def test_deadline_exceeded_returns_recovery_hint(self, monkeypatch):
        """When the budget runs out without a terminal status, the error
        must surface delegation_id + a check_task_status hint so the
        caller can recover the result."""
        monkeypatch.setenv("DELEGATION_SYNC_VIA_INBOX", "1")
        monkeypatch.setenv("DELEGATION_TIMEOUT", "1")  # 1s budget
        import importlib
        import a2a_tools
        importlib.reload(a2a_tools)
        monkeypatch.setattr(a2a_tools, "_SYNC_POLL_INTERVAL_S", 0.05)

        # In-progress responses for every poll within the 1s budget.
        in_progress_row = {
            "delegation_id": "deleg-1",
            "status": "in_progress",
        }
        get_seq = [_resp(200, [in_progress_row])] * 50
        mc = _make_client(
            post_resp=_resp(202, {"delegation_id": "deleg-1"}),
            get_resps=get_seq,
        )
        with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
            res = await a2a_tools._delegate_sync_via_polling(
                "ws-target", "task", "ws-self"
            )
        assert res.startswith(a2a_tools._A2A_ERROR_PREFIX)
        assert "polling timeout" in res
        assert "deleg-1" in res, "must surface delegation_id for recovery"
        assert "check_task_status" in res, "must hint at the recovery tool"
        monkeypatch.delenv("DELEGATION_TIMEOUT", raising=False)
        importlib.reload(a2a_tools)

    async def test_poll_filters_by_delegation_id_ignoring_other_rows(self, monkeypatch):
        """Other delegations' rows in the response must NOT be picked up
        by mistake; we pin to delegation_id."""
        monkeypatch.setenv("DELEGATION_SYNC_VIA_INBOX", "1")
        monkeypatch.setenv("DELEGATION_TIMEOUT", "10")
        import importlib
        import a2a_tools
        importlib.reload(a2a_tools)
        monkeypatch.setattr(a2a_tools, "_SYNC_POLL_INTERVAL_S", 0.01)

        # First poll: no row matching ours, BUT a completed row for
        # someone else's delegation. We must NOT return that one.
        # Second poll: ours completes.
        first_poll = _resp(200, [
            {"delegation_id": "deleg-OTHER", "status": "completed", "response_preview": "wrong"},
        ])
        second_poll = _resp(200, [
            {"delegation_id": "deleg-OTHER", "status": "completed", "response_preview": "wrong"},
            {"delegation_id": "deleg-1", "status": "completed", "response_preview": "right"},
        ])
        mc = _make_client(
            post_resp=_resp(202, {"delegation_id": "deleg-1"}),
            get_resps=[first_poll, second_poll],
        )
        with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
            res = await a2a_tools._delegate_sync_via_polling(
                "ws-target", "task", "ws-self"
            )
        assert res == "right", f"must filter to delegation_id, got {res!r}"
        monkeypatch.delenv("DELEGATION_TIMEOUT", raising=False)
        importlib.reload(a2a_tools)


# ---------------------------------------------------------------------------
# pytest-asyncio collection marker
# ---------------------------------------------------------------------------
pytestmark = pytest.mark.asyncio