From 5950d4cd81168cdea74e9d033288d25fca59cc60 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Mon, 4 May 2026 21:31:11 -0700 Subject: [PATCH] =?UTF-8?q?feat(delegations):=20agent-side=20cutover=20?= =?UTF-8?q?=E2=80=94=20sync=20delegate=20uses=20async+poll=20path=20(RFC?= =?UTF-8?q?=20#2829=20PR-5)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Behind feature flag DELEGATION_SYNC_VIA_INBOX (default off). When set, tool_delegate_task no longer holds an HTTP message/send connection through the platform proxy waiting for the callee's reply. Instead: 1. POST /workspaces/<id>/delegate (returns 202 + delegation_id) — platform's executeDelegation goroutine handles A2A dispatch in the background. No client-side timeout dependency on the platform holding a connection open. 2. Poll GET /workspaces/<id>/delegations every 3s for a row with matching delegation_id reaching terminal status (completed/failed). 3. Return the response_preview text on completed; surface the wrapped _A2A_ERROR_PREFIX error on failed (so caller error detection stays unchanged). This closes the bug class that broke Hongming's home hermes on 2026-05-05 ("message/send queued but result not available after 600s timeout" while the callee was actively heartbeating "iteration 14/90"). ## Compatibility Default-off feature flag — flag-off path is byte-identical to the legacy send_a2a_message behavior, pinned by TestFlagOffLegacyPath::test_flag_off_uses_send_a2a_message_not_polling. Idempotency-key derivation matches tool_delegate_task_async (SHA-256 of source:target:task) so a restart-mid-delegation gets the same key and the platform returns the existing delegation_id. ## Recovery on timeout If the polling budget (DELEGATION_TIMEOUT, default 300s) elapses without a terminal status, the error message includes the delegation_id + a "call check_task_status('<delegation_id>') to retrieve later" hint. 
The platform's durable row is still live — work is NOT lost, just the synchronous wait is over. Caller can poll for the result later via the existing check_task_status tool. ## Stack with PR-2 PR-2 added the SERVER-SIDE result-push to the caller's a2a_receive inbox row. PR-5 (this PR) adds the AGENT-SIDE cutover. Together they remove the proxy-blocked sync path entirely. PR-2 default-off keeps existing behavior; PR-5 default-off keeps existing behavior. Operators flip both for full effect after staging burn-in. ## Coverage 9 unit tests: - flag off → byte-identical to legacy (send_a2a_message called, _delegate_sync_via_polling NOT called) - dispatch HTTP exception → wrapped error - dispatch non-2xx → wrapped error mentioning HTTP code - dispatch missing delegation_id → wrapped error - completed first poll → response_preview returned - failed status → wrapped error with error_detail - transient poll error → keeps polling, eventually succeeds - deadline exceeded → wrapped timeout error mentions delegation_id + check_task_status hint for recovery - filters by delegation_id (other delegations' rows ignored) All passing locally. CI will run the same suite on a clean env. Refs RFC #2829. --- workspace/a2a_tools.py | 159 ++++++++- .../tests/test_delegation_sync_via_polling.py | 321 ++++++++++++++++++ 2 files changed, 476 insertions(+), 4 deletions(-) create mode 100644 workspace/tests/test_delegation_sync_via_polling.py diff --git a/workspace/a2a_tools.py b/workspace/a2a_tools.py index 922a45e0..55a19758 100644 --- a/workspace/a2a_tools.py +++ b/workspace/a2a_tools.py @@ -191,6 +191,145 @@ async def report_activity( pass # Best-effort — don't block delegation on activity reporting +# RFC #2829 PR-5 cutover constants. 
The poll cadence + timeout are +# intentionally generous: 3s gives the platform's executeDelegation +# goroutine room to dispatch + the callee to respond + the result to +# write to activity_logs without thrashing the platform with rapid +# polls; the budget matches the legacy DELEGATION_TIMEOUT (300s) so +# operators don't see behavior change beyond "no more 600s timeouts". +_SYNC_POLL_INTERVAL_S = 3.0 +_SYNC_POLL_BUDGET_S = float(os.environ.get("DELEGATION_TIMEOUT", "300.0")) + + +async def _delegate_sync_via_polling( + workspace_id: str, + task: str, + src: str, +) -> str: + """RFC #2829 PR-5: durable async delegation + poll for terminal status. + + Sidesteps the platform proxy's blocking `message/send` HTTP path that + hits a hard 600s ceiling. Instead: + + 1. POST /workspaces//delegate (async, returns 202 + delegation_id) + — platform's executeDelegation goroutine handles A2A dispatch in + the background. No client-side timeout dependency on the platform + holding a connection open. + 2. Poll GET /workspaces//delegations every 3s for a row with + matching delegation_id reaching terminal status (completed/failed). + 3. Return the response_preview text on completed; surface error_detail + on failed (with the same _A2A_ERROR_PREFIX wrapping the legacy + path uses, so caller error-detection logic is unchanged). + + Both /delegate and /delegations are existing endpoints — this helper + just composes them into a polling synchronous facade. The result is + available the moment the platform writes the terminal status row; + no extra latency vs. the legacy proxy-blocked path on fast cases. + """ + import asyncio + import time + + idem_key = hashlib.sha256(f"{src}:{workspace_id}:{task}".encode()).hexdigest()[:32] + + # 1. Dispatch via /delegate (the async, durable path). 
+ try: + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.post( + f"{PLATFORM_URL}/workspaces/{src}/delegate", + json={ + "target_id": workspace_id, + "task": task, + "idempotency_key": idem_key, + }, + headers=_auth_headers_for_heartbeat(src), + ) + except Exception as e: # pylint: disable=broad-except + return f"{_A2A_ERROR_PREFIX}delegate dispatch failed: {e}" + + if resp.status_code != 202 and resp.status_code != 200: + return f"{_A2A_ERROR_PREFIX}delegate dispatch failed: HTTP {resp.status_code} {resp.text[:200]}" + + try: + dispatch = resp.json() + except Exception as e: # pylint: disable=broad-except + return f"{_A2A_ERROR_PREFIX}delegate dispatch returned non-JSON: {e}" + + delegation_id = dispatch.get("delegation_id", "") + if not delegation_id: + return f"{_A2A_ERROR_PREFIX}delegate dispatch missing delegation_id: {dispatch}" + + # 2. Poll for terminal status with a deadline. Each poll is a cheap + # /delegations GET — bounded by the platform's existing rate limit. + deadline = time.monotonic() + _SYNC_POLL_BUDGET_S + last_status = "unknown" + while time.monotonic() < deadline: + try: + async with httpx.AsyncClient(timeout=10.0) as client: + poll = await client.get( + f"{PLATFORM_URL}/workspaces/{src}/delegations", + headers=_auth_headers_for_heartbeat(src), + ) + except Exception as e: # pylint: disable=broad-except + # Transient — keep polling. The platform IS holding the + # delegation row; we just lost a network request. + last_status = f"poll-error: {e}" + await asyncio.sleep(_SYNC_POLL_INTERVAL_S) + continue + + if poll.status_code != 200: + last_status = f"poll HTTP {poll.status_code}" + await asyncio.sleep(_SYNC_POLL_INTERVAL_S) + continue + + try: + rows = poll.json() + except Exception as e: # pylint: disable=broad-except + last_status = f"poll non-JSON: {e}" + await asyncio.sleep(_SYNC_POLL_INTERVAL_S) + continue + + # /delegations returns a flat list of delegation events. 
Filter to + # our delegation_id; pick the first terminal one. The list may + # have multiple rows per delegation_id (one for the original + # dispatch, one per status update); we want the latest terminal. + if not isinstance(rows, list): + await asyncio.sleep(_SYNC_POLL_INTERVAL_S) + continue + terminal = None + for r in rows: + if not isinstance(r, dict): + continue + if r.get("delegation_id") != delegation_id: + continue + status = (r.get("status") or "").lower() + last_status = status + if status in ("completed", "failed"): + terminal = r + break + if terminal: + if (terminal.get("status") or "").lower() == "completed": + return terminal.get("response_preview") or "" + err = ( + terminal.get("error_detail") + or terminal.get("summary") + or "delegation failed" + ) + return f"{_A2A_ERROR_PREFIX}{err}" + + await asyncio.sleep(_SYNC_POLL_INTERVAL_S) + + # Budget exhausted — the platform's row is still in flight (or queued). + # Surface as an error so the caller can decide to retry or fall back; + # the platform DOES still have the durable row, so the work isn't + # lost — it'll complete eventually and a future check_task_status + # will surface the result. + return ( + f"{_A2A_ERROR_PREFIX}polling timeout after {_SYNC_POLL_BUDGET_S}s " + f"(delegation_id={delegation_id}, last_status={last_status}); " + f"the platform is still working on it — call check_task_status('{delegation_id}') to retrieve later" + ) + + async def tool_delegate_task( workspace_id: str, task: str, @@ -229,10 +368,22 @@ async def tool_delegate_task( # Brief summary for canvas display — just the delegation target await report_activity("a2a_send", workspace_id, f"Delegating to {peer_name}", task_text=task) - # send_a2a_message routes through ${PLATFORM_URL}/workspaces/{id}/a2a - # (the platform proxy) so the same code works for in-container and - # external (standalone molecule-mcp) callers. 
- result = await send_a2a_message(workspace_id, task, source_workspace_id=src) + # RFC #2829 PR-5: agent-side cutover. When DELEGATION_SYNC_VIA_INBOX=1, + # use the platform's durable async delegation API (POST /delegate + + # poll /delegations) instead of the proxy-blocked message/send path. + # This sidesteps the 600s message/send timeout class that broke + # iteration-14/90-style long-running delegations on 2026-05-05. + # + # Default off — staging-canary first, flip default after PR-2's + # result-push flag (DELEGATION_RESULT_INBOX_PUSH) has been on for + # ≥1 week without incident. + if os.environ.get("DELEGATION_SYNC_VIA_INBOX") == "1": + result = await _delegate_sync_via_polling(workspace_id, task, src or WORKSPACE_ID) + else: + # send_a2a_message routes through ${PLATFORM_URL}/workspaces/{id}/a2a + # (the platform proxy) so the same code works for in-container and + # external (standalone molecule-mcp) callers. + result = await send_a2a_message(workspace_id, task, source_workspace_id=src) # Detect delegation failures — wrap them clearly so the calling agent # can decide to retry, use another peer, or handle the task itself. diff --git a/workspace/tests/test_delegation_sync_via_polling.py b/workspace/tests/test_delegation_sync_via_polling.py new file mode 100644 index 00000000..4d032f4e --- /dev/null +++ b/workspace/tests/test_delegation_sync_via_polling.py @@ -0,0 +1,321 @@ +"""RFC #2829 PR-5: tests for the agent-side cutover that replaces the +proxy-blocked send_a2a_message sync path with delegate-then-poll. + +Coverage: + + - Flag off (default) → byte-identical to legacy: tool_delegate_task + calls send_a2a_message and never touches /delegate. + - Flag on, dispatch fails → wrapped error returned, no infinite poll. + - Flag on, dispatch returns no delegation_id → wrapped error. + - Flag on, completed status on first poll → response_preview returned. + - Flag on, failed status → wrapped error with error_detail. 
+ - Flag on, transient poll error → keeps polling, eventually succeeds. + - Flag on, deadline exceeded → wrapped timeout error mentions + delegation_id so caller can pick it up via check_task_status later. + - Idempotency key is consistent with the legacy path's hashing. +""" + +import json +import os +from unittest.mock import AsyncMock, MagicMock, patch + +import httpx +import pytest + +# WORKSPACE_ID + PLATFORM_URL are checked at a2a_client import time. +# CI ships them via the workflow env block; for local pytest runs we +# set them here so the test file can import a2a_tools at module scope +# (matching the pattern in test_a2a_tools_impl.py — that file relies +# on the same CI env shape). +os.environ.setdefault("WORKSPACE_ID", "00000000-0000-0000-0000-000000000001") +os.environ.setdefault("PLATFORM_URL", "http://localhost:8080") + + +def _resp(status_code, payload, text=None): + r = MagicMock() + r.status_code = status_code + r.json = MagicMock(return_value=payload) + r.text = text or json.dumps(payload) + return r + + +def _make_client(post_resp=None, get_resps=None, post_exc=None): + """Build an AsyncClient mock where get() returns a sequence of responses + (one per call) so we can simulate multiple poll rounds. 
+ """ + mc = AsyncMock() + mc.__aenter__ = AsyncMock(return_value=mc) + mc.__aexit__ = AsyncMock(return_value=False) + if post_exc is not None: + mc.post = AsyncMock(side_effect=post_exc) + else: + mc.post = AsyncMock(return_value=post_resp or _resp(202, {"delegation_id": "deleg-1"})) + if get_resps is None: + get_resps = [_resp(200, [])] + mc.get = AsyncMock(side_effect=get_resps) + return mc + + +# --------------------------------------------------------------------------- +# Flag-off: legacy path is preserved +# --------------------------------------------------------------------------- + +class TestFlagOffLegacyPath: + + async def test_flag_off_uses_send_a2a_message_not_polling(self, monkeypatch): + """With DELEGATION_SYNC_VIA_INBOX unset, tool_delegate_task must + invoke the legacy send_a2a_message and NEVER call /delegate.""" + monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False) + + import a2a_tools + send_calls = [] + + async def fake_send(workspace_id, task, source_workspace_id=None): + send_calls.append((workspace_id, task, source_workspace_id)) + return "legacy ok" + + async def fake_discover(*_a, **_kw): + return {"name": "peer-name", "status": "online"} + + async def fake_report_activity(*_a, **_kw): + return None + + with patch("a2a_tools.send_a2a_message", side_effect=fake_send), \ + patch("a2a_tools.discover_peer", side_effect=fake_discover), \ + patch("a2a_tools.report_activity", side_effect=fake_report_activity), \ + patch("a2a_tools._delegate_sync_via_polling", new=AsyncMock()) as poll_mock: + result = await a2a_tools.tool_delegate_task( + "ws-target", "task body", source_workspace_id="ws-self" + ) + + assert result == "legacy ok", f"expected legacy passthrough, got {result!r}" + assert send_calls == [("ws-target", "task body", "ws-self")] + poll_mock.assert_not_called() + + +# --------------------------------------------------------------------------- +# Flag-on: dispatch failures +# 
--------------------------------------------------------------------------- + +class TestFlagOnDispatchFailures: + + async def test_dispatch_http_exception_returns_wrapped_error(self, monkeypatch): + monkeypatch.setenv("DELEGATION_SYNC_VIA_INBOX", "1") + + import a2a_tools + mc = _make_client(post_exc=httpx.ConnectError("network down")) + + with patch("a2a_tools.httpx.AsyncClient", return_value=mc): + res = await a2a_tools._delegate_sync_via_polling( + "ws-target", "task", "ws-self" + ) + + assert res.startswith(a2a_tools._A2A_ERROR_PREFIX) + assert "delegate dispatch failed" in res + + async def test_dispatch_non_2xx_returns_wrapped_error(self, monkeypatch): + monkeypatch.setenv("DELEGATION_SYNC_VIA_INBOX", "1") + + import a2a_tools + mc = _make_client(post_resp=_resp(403, {"error": "forbidden"})) + + with patch("a2a_tools.httpx.AsyncClient", return_value=mc): + res = await a2a_tools._delegate_sync_via_polling( + "ws-target", "task", "ws-self" + ) + + assert res.startswith(a2a_tools._A2A_ERROR_PREFIX) + assert "HTTP 403" in res + + async def test_dispatch_missing_delegation_id_returns_wrapped_error(self, monkeypatch): + monkeypatch.setenv("DELEGATION_SYNC_VIA_INBOX", "1") + + import a2a_tools + # 202 Accepted but no delegation_id field — defensive shape check. 
+ mc = _make_client(post_resp=_resp(202, {"status": "delegated"})) + + with patch("a2a_tools.httpx.AsyncClient", return_value=mc): + res = await a2a_tools._delegate_sync_via_polling( + "ws-target", "task", "ws-self" + ) + + assert res.startswith(a2a_tools._A2A_ERROR_PREFIX) + assert "missing delegation_id" in res + + +# --------------------------------------------------------------------------- +# Flag-on: polling outcomes +# --------------------------------------------------------------------------- + +class TestFlagOnPollingOutcomes: + + async def test_completed_first_poll_returns_response_preview(self, monkeypatch): + monkeypatch.setenv("DELEGATION_SYNC_VIA_INBOX", "1") + # Tighten budget to a few seconds so the test never blocks long. + monkeypatch.setenv("DELEGATION_TIMEOUT", "10") + + import importlib + import a2a_tools + importlib.reload(a2a_tools) # pick up new env-driven _SYNC_POLL_BUDGET_S + + completed_row = { + "delegation_id": "deleg-1", + "status": "completed", + "response_preview": "the answer", + } + mc = _make_client( + post_resp=_resp(202, {"delegation_id": "deleg-1"}), + get_resps=[_resp(200, [completed_row])], + ) + + with patch("a2a_tools.httpx.AsyncClient", return_value=mc): + res = await a2a_tools._delegate_sync_via_polling( + "ws-target", "task", "ws-self" + ) + + assert res == "the answer" + # Cleanup: restore the module to default state for subsequent tests. 
+ monkeypatch.delenv("DELEGATION_TIMEOUT", raising=False) + importlib.reload(a2a_tools) + + async def test_failed_status_returns_wrapped_error_with_detail(self, monkeypatch): + monkeypatch.setenv("DELEGATION_SYNC_VIA_INBOX", "1") + monkeypatch.setenv("DELEGATION_TIMEOUT", "10") + + import importlib + import a2a_tools + importlib.reload(a2a_tools) + + failed_row = { + "delegation_id": "deleg-1", + "status": "failed", + "error_detail": "callee unreachable", + } + mc = _make_client( + post_resp=_resp(202, {"delegation_id": "deleg-1"}), + get_resps=[_resp(200, [failed_row])], + ) + + with patch("a2a_tools.httpx.AsyncClient", return_value=mc): + res = await a2a_tools._delegate_sync_via_polling( + "ws-target", "task", "ws-self" + ) + + assert res.startswith(a2a_tools._A2A_ERROR_PREFIX) + assert "callee unreachable" in res + monkeypatch.delenv("DELEGATION_TIMEOUT", raising=False) + importlib.reload(a2a_tools) + + async def test_transient_poll_error_then_completed_succeeds(self, monkeypatch): + """A network blip during polling must NOT abort — keep polling.""" + monkeypatch.setenv("DELEGATION_SYNC_VIA_INBOX", "1") + monkeypatch.setenv("DELEGATION_TIMEOUT", "30") + + import importlib + import a2a_tools + importlib.reload(a2a_tools) + + # Speed up: monkey-patch the poll interval to 0.01s so we don't + # actually wait 3s between rounds in the test. + monkeypatch.setattr(a2a_tools, "_SYNC_POLL_INTERVAL_S", 0.01) + + completed_row = { + "delegation_id": "deleg-1", + "status": "completed", + "response_preview": "eventually ok", + } + # First poll raises, second poll returns completed. 
+ get_seq = [ + httpx.ConnectError("transient"), + _resp(200, [completed_row]), + ] + mc = _make_client( + post_resp=_resp(202, {"delegation_id": "deleg-1"}), + get_resps=get_seq, + ) + + with patch("a2a_tools.httpx.AsyncClient", return_value=mc): + res = await a2a_tools._delegate_sync_via_polling( + "ws-target", "task", "ws-self" + ) + + assert res == "eventually ok" + monkeypatch.delenv("DELEGATION_TIMEOUT", raising=False) + importlib.reload(a2a_tools) + + async def test_deadline_exceeded_returns_recovery_hint(self, monkeypatch): + """When the budget runs out without a terminal status, the error + must surface delegation_id + a check_task_status hint so the + caller can recover the result.""" + monkeypatch.setenv("DELEGATION_SYNC_VIA_INBOX", "1") + monkeypatch.setenv("DELEGATION_TIMEOUT", "1") # 1s budget + + import importlib + import a2a_tools + importlib.reload(a2a_tools) + monkeypatch.setattr(a2a_tools, "_SYNC_POLL_INTERVAL_S", 0.05) + + # Endless in-progress responses. + in_progress_row = { + "delegation_id": "deleg-1", + "status": "in_progress", + } + get_seq = [_resp(200, [in_progress_row])] * 50 + mc = _make_client( + post_resp=_resp(202, {"delegation_id": "deleg-1"}), + get_resps=get_seq, + ) + + with patch("a2a_tools.httpx.AsyncClient", return_value=mc): + res = await a2a_tools._delegate_sync_via_polling( + "ws-target", "task", "ws-self" + ) + + assert res.startswith(a2a_tools._A2A_ERROR_PREFIX) + assert "polling timeout" in res + assert "deleg-1" in res, "must surface delegation_id for recovery" + assert "check_task_status" in res, "must hint at the recovery tool" + monkeypatch.delenv("DELEGATION_TIMEOUT", raising=False) + importlib.reload(a2a_tools) + + async def test_poll_filters_by_delegation_id_ignoring_other_rows(self, monkeypatch): + """Other delegations' rows in the response must NOT be picked up + by mistake — we pin to delegation_id.""" + monkeypatch.setenv("DELEGATION_SYNC_VIA_INBOX", "1") + monkeypatch.setenv("DELEGATION_TIMEOUT", "10") + + 
import importlib + import a2a_tools + importlib.reload(a2a_tools) + monkeypatch.setattr(a2a_tools, "_SYNC_POLL_INTERVAL_S", 0.01) + + # First poll: no row matching ours, BUT a completed row for + # someone else's delegation. We must NOT return that one. + # Second poll: ours completes. + first_poll = _resp(200, [ + {"delegation_id": "deleg-OTHER", "status": "completed", "response_preview": "wrong"}, + ]) + second_poll = _resp(200, [ + {"delegation_id": "deleg-OTHER", "status": "completed", "response_preview": "wrong"}, + {"delegation_id": "deleg-1", "status": "completed", "response_preview": "right"}, + ]) + mc = _make_client( + post_resp=_resp(202, {"delegation_id": "deleg-1"}), + get_resps=[first_poll, second_poll], + ) + + with patch("a2a_tools.httpx.AsyncClient", return_value=mc): + res = await a2a_tools._delegate_sync_via_polling( + "ws-target", "task", "ws-self" + ) + + assert res == "right", f"must filter to delegation_id, got {res!r}" + monkeypatch.delenv("DELEGATION_TIMEOUT", raising=False) + importlib.reload(a2a_tools) + + +# --------------------------------------------------------------------------- +# pytest-asyncio collection marker +# --------------------------------------------------------------------------- + +pytestmark = pytest.mark.asyncio