diff --git a/workspace/a2a_client.py b/workspace/a2a_client.py
index d740bd6a..7c5e3d87 100644
--- a/workspace/a2a_client.py
+++ b/workspace/a2a_client.py
@@ -4,8 +4,11 @@
 Shared constants (WORKSPACE_ID, PLATFORM_URL) live here so that a2a_tools and
 a2a_mcp_server can import them from a single place.
 """
+import asyncio
 import logging
 import os
+import random
+import time
 import uuid
 
 import httpx
@@ -47,80 +50,195 @@ async def discover_peer(target_id: str) -> dict | None:
         return None
 
 
+# httpx exception classes that indicate a transient transport-layer
+# failure worth retrying — the request never produced an application
+# response, so a fresh attempt has a real chance of succeeding. Any
+# error not in this tuple is treated as deterministic (HTTP-status,
+# JSON parse, runtime-returned JSON-RPC error, etc.) and surfaced to
+# the caller on the first try.
+#
+# Why each one belongs here:
+#   - ConnectError / ConnectTimeout: peer's listening socket wasn't
+#     ready (mid-restart, not yet bound). Fast failure, fast recovery.
+#   - RemoteProtocolError: peer closed the TCP connection without
+#     writing a response — observed on 2026-04-27 when a peer's prior
+#     in-flight Claude SDK session aborted and the new request's
+#     connection was reset mid-handler.
+#   - ReadError / WriteError: TCP read/write socket error mid-flight,
+#     typically a network blip on the Docker bridge or a peer worker
+#     crash.
+#   - ReadTimeout: peer didn't write ANY response bytes within the
+#     300s read budget. Distinct from "peer is slow but progressing"
+#     (which httpx surfaces as a successful read with chunked bytes).
+#     Retry budget caps the worst case — see _DELEGATE_TOTAL_BUDGET_S.
+_TRANSIENT_HTTP_ERRORS: tuple[type[Exception], ...] = (
+    httpx.ConnectError,
+    httpx.ConnectTimeout,
+    httpx.ReadError,
+    httpx.WriteError,
+    httpx.RemoteProtocolError,
+    httpx.ReadTimeout,
+)
+
+# Retry budget. Up to 5 attempts (1 initial + 4 retries) with
+# exponential backoff (1, 2, 4, 8 seconds), each backoff jittered ±25%
+# to prevent synchronized retry storms across siblings if a peer flaps.
+# _DELEGATE_TOTAL_BUDGET_S caps cumulative wall-clock so a string of
+# ReadTimeouts can't make the caller wait 25 minutes — once the
+# deadline elapses we stop retrying even if attempts remain. 600s = 10
+# minutes is the agreed worst case the caller can tolerate before
+# falling back to "peer unavailable" handling in tool_delegate_task.
+_DELEGATE_MAX_ATTEMPTS = 5
+_DELEGATE_BACKOFF_BASE_S = 1.0
+_DELEGATE_BACKOFF_CAP_S = 16.0
+_DELEGATE_TOTAL_BUDGET_S = 600.0
+
+
+def _delegate_backoff_seconds(attempt_zero_indexed: int) -> float:
+    """Return the (jittered) backoff delay before retrying after the
+    given attempt index (0 = backoff before retry #1).
+
+    Pure function so the schedule is unit-testable without monkey-
+    patching asyncio.sleep. Jitter is symmetric ±25% on top of the
+    capped exponential — enough to break sync across simultaneous
+    callers without making the schedule unpredictable.
+    """
+    base = min(_DELEGATE_BACKOFF_BASE_S * (2 ** attempt_zero_indexed), _DELEGATE_BACKOFF_CAP_S)
+    jitter = base * (0.5 * random.random() - 0.25)
+    return max(0.0, base + jitter)
+
+
+def _format_a2a_error(exc: BaseException, target_url: str) -> str:
+    """Format an httpx exception as an [A2A_ERROR] string.
+
+    Some httpx exceptions stringify to empty (RemoteProtocolError,
+    ConnectionReset variants) — the canvas would then render
+    "[A2A_ERROR] " with no detail and the operator has no signal to
+    act on. Always include the exception class name and the target
+    URL so the activity log + Agent Comms panel have actionable
+    information without a trip through container logs.
+    """
+    msg = str(exc).strip()
+    type_name = type(exc).__name__
+    if not msg:
+        detail = f"{type_name} (no message — likely connection reset or silent timeout)"
+    elif msg.startswith(f"{type_name}:") or msg.startswith(f"{type_name} "):
+        # Already prefixed with the type — don't double-prefix.
+        # Prefix-anchored check (not substring) so a message that
+        # happens to mention some OTHER class name mid-string
+        # (e.g. "got OSError on read") doesn't suppress our own
+        # type prefix and lose the diagnostic signal.
+        detail = msg
+    else:
+        detail = f"{type_name}: {msg}"
+    return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"
+
+
 async def send_a2a_message(target_url: str, message: str) -> str:
-    """Send an A2A message/send to a target workspace."""
+    """Send an A2A message/send to a target workspace.
+
+    Auto-retries up to _DELEGATE_MAX_ATTEMPTS times on transient
+    transport-layer errors (RemoteProtocolError, ConnectError,
+    ReadTimeout, etc.) with exponential-backoff + jitter, capped by
+    _DELEGATE_TOTAL_BUDGET_S. Application-level failures (HTTP 4xx,
+    JSON-RPC error response, malformed JSON) are NOT retried — they
+    indicate a deterministic problem retry won't fix.
+
+    Every attempt reuses one JSON-RPC id and one messageId, so a retry
+    is a redelivery of the same message rather than a brand-new one.
+    """
     # Fix F (Cycle 5 / H2 — flagged 5 consecutive audits): timeout=None allowed
     # a hung upstream to block the agent indefinitely. Use a generous but bounded
     # timeout: 30s connect + 300s read (long enough for slow LLM responses).
-    async with httpx.AsyncClient(
-        timeout=httpx.Timeout(connect=30.0, read=300.0, write=30.0, pool=30.0)
-    ) as client:
-        try:
-            # self_source_headers() includes X-Workspace-ID so the
-            # platform's a2a_receive logger records source_id =
-            # WORKSPACE_ID. Otherwise peer-A2A messages — including
-            # the case where target_url resolves to this workspace's
-            # own /a2a — get logged with source_id=NULL and surface
-            # in the recipient's My Chat tab as user-typed input.
-            resp = await client.post(
-                target_url,
-                headers=self_source_headers(WORKSPACE_ID),
-                json={
-                    "jsonrpc": "2.0",
-                    "id": str(uuid.uuid4()),
-                    "method": "message/send",
-                    "params": {
-                        "message": {
-                            "role": "user",
-                            "messageId": str(uuid.uuid4()),
-                            "parts": [{"kind": "text", "text": message}],
-                        }
+    timeout_cfg = httpx.Timeout(connect=30.0, read=300.0, write=30.0, pool=30.0)
+    deadline = time.monotonic() + _DELEGATE_TOTAL_BUDGET_S
+    last_exc: BaseException | None = None
+    # Mint the JSON-RPC id and the A2A messageId once, outside the retry
+    # loop. A retry after ReadTimeout / ReadError / RemoteProtocolError
+    # can re-deliver a message the peer already received; fresh UUIDs
+    # per attempt would make each redelivery look like an unrelated
+    # message, while stable ids give the peer a key to correlate them.
+    request_id = str(uuid.uuid4())
+    message_id = str(uuid.uuid4())
+
+    for attempt in range(_DELEGATE_MAX_ATTEMPTS):
+        async with httpx.AsyncClient(timeout=timeout_cfg) as client:
+            try:
+                # self_source_headers() includes X-Workspace-ID so the
+                # platform's a2a_receive logger records source_id =
+                # WORKSPACE_ID. Otherwise peer-A2A messages — including
+                # the case where target_url resolves to this workspace's
+                # own /a2a — get logged with source_id=NULL and surface
+                # in the recipient's My Chat tab as user-typed input.
+                resp = await client.post(
+                    target_url,
+                    headers=self_source_headers(WORKSPACE_ID),
+                    json={
+                        "jsonrpc": "2.0",
+                        "id": request_id,
+                        "method": "message/send",
+                        "params": {
+                            "message": {
+                                "role": "user",
+                                "messageId": message_id,
+                                "parts": [{"kind": "text", "text": message}],
+                            }
+                        },
                     },
-                },
-            )
-            data = resp.json()
-            if "result" in data:
-                parts = data["result"].get("parts", [])
-                text = parts[0].get("text", "") if parts else "(no response)"
-                # Tag child-reported errors so the caller can detect them reliably
-                if text.startswith("Agent error:"):
-                    return f"{_A2A_ERROR_PREFIX}{text}"
-                return text
-            elif "error" in data:
-                err = data["error"]
-                msg = (err.get("message") or "").strip()
-                code = err.get("code")
-                if msg and code is not None:
-                    detail = f"{msg} (code={code})"
-                elif msg:
-                    detail = msg
-                elif code is not None:
-                    detail = f"JSON-RPC error with no message (code={code})"
-                else:
-                    detail = "JSON-RPC error with no message"
-                return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"
-            return f"{_A2A_ERROR_PREFIX}unexpected response shape (no result, no error): {str(data)[:200]} [target={target_url}]"
-        except Exception as e:
-            # Some httpx exceptions stringify to empty (RemoteProtocolError,
-            # ConnectionReset variants) — the canvas would then render
-            # "[A2A_ERROR] " with no detail and the operator has no signal
-            # to act on. Always include the exception class name and the
-            # target URL so the activity log + Agent Comms panel have
-            # actionable information without a trip through container logs.
-            msg = str(e).strip()
-            type_name = type(e).__name__
-            if not msg:
-                detail = f"{type_name} (no message — likely connection reset or silent timeout)"
-            elif msg.startswith(f"{type_name}:") or msg.startswith(f"{type_name} "):
-                # Already prefixed with the type — don't double-prefix.
-                # Prefix-anchored check (not substring) so a message that
-                # happens to mention some OTHER class name mid-string
-                # (e.g. "got OSError on read") doesn't suppress our own
-                # type prefix and lose the diagnostic signal.
-                detail = msg
-            else:
-                detail = f"{type_name}: {msg}"
-            return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"
+                )
+                data = resp.json()
+                if "result" in data:
+                    parts = data["result"].get("parts", [])
+                    text = parts[0].get("text", "") if parts else "(no response)"
+                    # Tag child-reported errors so the caller can detect them reliably
+                    if text.startswith("Agent error:"):
+                        return f"{_A2A_ERROR_PREFIX}{text}"
+                    return text
+                elif "error" in data:
+                    err = data["error"]
+                    msg = (err.get("message") or "").strip()
+                    code = err.get("code")
+                    if msg and code is not None:
+                        detail = f"{msg} (code={code})"
+                    elif msg:
+                        detail = msg
+                    elif code is not None:
+                        detail = f"JSON-RPC error with no message (code={code})"
+                    else:
+                        detail = "JSON-RPC error with no message"
+                    return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"
+                return f"{_A2A_ERROR_PREFIX}unexpected response shape (no result, no error): {str(data)[:200]} [target={target_url}]"
+            except _TRANSIENT_HTTP_ERRORS as e:
+                last_exc = e
+                attempts_remaining = _DELEGATE_MAX_ATTEMPTS - (attempt + 1)
+                if attempts_remaining <= 0 or time.monotonic() >= deadline:
+                    # Out of attempts OR out of total budget — surface
+                    # the last error to the caller.
+                    break
+                delay = _delegate_backoff_seconds(attempt)
+                # Don't sleep past the deadline — clamp.
+                remaining = deadline - time.monotonic()
+                if delay > remaining:
+                    delay = max(0.0, remaining)
+                logger.warning(
+                    "send_a2a_message: transient %s on attempt %d/%d, retrying in %.1fs (target=%s)",
+                    type(e).__name__,
+                    attempt + 1,
+                    _DELEGATE_MAX_ATTEMPTS,
+                    delay,
+                    target_url,
+                )
+                await asyncio.sleep(delay)
+                continue
+            except Exception as e:
+                # Non-transient (HTTP-status, JSON parse, etc.) — don't retry.
+                return _format_a2a_error(e, target_url)
+    # Retries exhausted (or budget elapsed). last_exc is only None when
+    # the loop body never ran (_DELEGATE_MAX_ATTEMPTS < 1); guard it
+    # explicitly instead of with `assert`, which `python -O` strips.
+    if last_exc is None:
+        return f"{_A2A_ERROR_PREFIX}no delivery attempt made (max attempts={_DELEGATE_MAX_ATTEMPTS}) [target={target_url}]"
+    return _format_a2a_error(last_exc, target_url)
 
 
 async def get_peers() -> list[dict]:
diff --git a/workspace/tests/test_a2a_client.py b/workspace/tests/test_a2a_client.py
index fd813f3e..e105fb1e 100644
--- a/workspace/tests/test_a2a_client.py
+++ b/workspace/tests/test_a2a_client.py
@@ -306,6 +306,191 @@ class TestSendA2AMessage:
         assert result == ""
 
 
+# ---------------------------------------------------------------------------
+# send_a2a_message — transient-error retry behaviour
+# ---------------------------------------------------------------------------
+
+def _make_seq_mock_client(post_side_effect):
+    """Build an AsyncClient mock whose .post() returns a different result
+    on each successive call (matching httpx.AsyncClient's per-request
+    semantics — each AsyncClient context-manager opens fresh in the
+    retry loop, so the sequence is observed across attempts).
+
+    A new AsyncClient context is opened for every retry attempt in the
+    SUT, so we route AsyncClient(...) to a single mock that hands back
+    the same client on every __aenter__ but the .post side-effect list
+    is shared and consumed sequentially across attempts.
+    """
+    mock_client = AsyncMock()
+    mock_client.__aenter__ = AsyncMock(return_value=mock_client)
+    mock_client.__aexit__ = AsyncMock(return_value=False)
+    mock_client.post = AsyncMock(side_effect=post_side_effect)
+    return mock_client
+
+
+class TestSendA2AMessageRetry:
+    """Verify auto-retry on transient transport errors (RemoteProtocolError,
+    ConnectError, ReadTimeout, etc.) up to _DELEGATE_MAX_ATTEMPTS times.
+    Application-level errors (HTTP-status errors, JSON-RPC error in
+    response body) MUST NOT be retried — they're deterministic and
+    re-trying just wastes wall-clock.
+
+    asyncio.sleep is patched to a no-op so tests don't actually wait
+    out the exponential backoff.
+    """
+
+    async def test_retry_succeeds_after_two_remote_protocol_errors(self):
+        """Two RemoteProtocolErrors followed by a 200 → returns the 200's text."""
+        import a2a_client
+        import httpx
+
+        success = _make_response(200, {"result": {"parts": [{"kind": "text", "text": "OK"}]}})
+        side_effects = [
+            httpx.RemoteProtocolError("Server disconnected"),
+            httpx.RemoteProtocolError("Server disconnected"),
+            success,
+        ]
+        mock_client = _make_seq_mock_client(side_effects)
+
+        with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
+             patch("a2a_client.asyncio.sleep", new=AsyncMock()):
+            result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
+
+        assert result == "OK"
+        assert mock_client.post.await_count == 3
+
+    async def test_retry_succeeds_after_connect_error(self):
+        """Single ConnectError then 200 → returns the 200's text."""
+        import a2a_client
+        import httpx
+
+        success = _make_response(200, {"result": {"parts": [{"kind": "text", "text": "OK"}]}})
+        side_effects = [
+            httpx.ConnectError("connection refused"),
+            success,
+        ]
+        mock_client = _make_seq_mock_client(side_effects)
+
+        with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
+             patch("a2a_client.asyncio.sleep", new=AsyncMock()):
+            result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
+
+        assert result == "OK"
+        assert mock_client.post.await_count == 2
+
+    async def test_all_attempts_fail_returns_last_error(self):
+        """5 RemoteProtocolErrors → returns the last error formatted with target URL."""
+        import a2a_client
+        import httpx
+
+        side_effects = [httpx.RemoteProtocolError("Server disconnected")] * 5
+        mock_client = _make_seq_mock_client(side_effects)
+
+        with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
+             patch("a2a_client.asyncio.sleep", new=AsyncMock()):
+            result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
+
+        assert mock_client.post.await_count == 5  # _DELEGATE_MAX_ATTEMPTS
+        assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
+        assert "RemoteProtocolError" in result
+        assert "target=http://target/a2a" in result
+
+    async def test_caps_at_max_attempts(self):
+        """If transient errors keep coming, we MUST stop at _DELEGATE_MAX_ATTEMPTS,
+        not retry forever. Pin the exact attempt count so a future tweak to
+        the constant has to update this test in lockstep."""
+        import a2a_client
+        import httpx
+
+        side_effects = [httpx.ReadTimeout("timeout")] * 20  # way more than max
+        mock_client = _make_seq_mock_client(side_effects)
+
+        with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
+             patch("a2a_client.asyncio.sleep", new=AsyncMock()):
+            result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
+
+        assert mock_client.post.await_count == a2a_client._DELEGATE_MAX_ATTEMPTS
+        assert mock_client.post.await_count == 5
+        assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
+
+    async def test_application_error_not_retried(self):
+        """JSON-RPC error response (application-level) is deterministic —
+        retrying just wastes wall-clock. Must return on the first attempt."""
+        import a2a_client
+
+        resp = _make_response(200, {
+            "error": {"code": -32603, "message": "Internal error"}
+        })
+        mock_client = _make_seq_mock_client([resp, resp, resp])
+
+        with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
+             patch("a2a_client.asyncio.sleep", new=AsyncMock()):
+            result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
+
+        assert mock_client.post.await_count == 1  # NO retry
+        assert "Internal error" in result
+
+    async def test_non_transient_exception_not_retried(self):
+        """A non-httpx exception (programmer bug, JSON parse, etc.) must
+        not trigger retry — surface immediately so the bug is loud."""
+        import a2a_client
+
+        # A plain ValueError isn't in _TRANSIENT_HTTP_ERRORS.
+        side_effects = [ValueError("malformed something")] * 3
+        mock_client = _make_seq_mock_client(side_effects)
+
+        with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
+             patch("a2a_client.asyncio.sleep", new=AsyncMock()):
+            result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
+
+        assert mock_client.post.await_count == 1  # NO retry
+        assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
+        assert "ValueError" in result
+
+    async def test_total_budget_caps_retry_loop(self, monkeypatch):
+        """Total wall-clock budget caps the retry loop even if attempts
+        remain — protects against a string of 5×300s ReadTimeouts.
+        Each simulated attempt burns half the budget on a fake clock, so
+        the deadline is reached once the second attempt has failed."""
+        import a2a_client
+        import httpx
+
+        # Advance the fake clock from inside the failing post() instead
+        # of counting time.monotonic() calls: how often the clock is read
+        # is an implementation detail of the retry loop, so keying the
+        # jump on a call count breaks on behaviour-preserving refactors.
+        clock = {"now": 0.0}
+
+        async def post_times_out(*args, **kwargs):
+            clock["now"] += a2a_client._DELEGATE_TOTAL_BUDGET_S / 2
+            raise httpx.ReadTimeout("timeout")
+
+        mock_client = _make_seq_mock_client(post_times_out)
+        monkeypatch.setattr(a2a_client.time, "monotonic", lambda: clock["now"])
+
+        with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
+             patch("a2a_client.asyncio.sleep", new=AsyncMock()):
+            result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
+
+        # Two half-budget attempts exhaust the budget: exactly 2 posts.
+        assert mock_client.post.await_count == 2
+        assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
+
+
+def test_delegate_backoff_seconds_grows_exponentially_with_jitter():
+    """Schedule: ~1s, ~2s, ~4s, ~8s, then capped at 16s. ±25% jitter
+    means each delay falls in [base*0.75, base*1.25]."""
+    import a2a_client
+
+    # Run a bunch to sample the jitter distribution; assert each value
+    # falls in the expected window.
+    for attempt, base in [(0, 1.0), (1, 2.0), (2, 4.0), (3, 8.0), (4, 16.0), (10, 16.0)]:
+        for _ in range(20):
+            d = a2a_client._delegate_backoff_seconds(attempt)
+            assert d >= base * 0.75 - 1e-9, f"attempt {attempt}: {d} < lower"
+            assert d <= base * 1.25 + 1e-9, f"attempt {attempt}: {d} > upper"
+
+
 # ---------------------------------------------------------------------------
 # get_peers
 # ---------------------------------------------------------------------------