fix(a2a): auto-retry transient transport errors in send_a2a_message

Three different intermittent failures observed during a single
manual-test session — RemoteProtocolError, ReadTimeout, ConnectError —
each surfaced as a "Failed to deliver to <peer>" error chip in the
canvas Agent Comms panel even though the next attempt would have
succeeded (verified by direct probes from the same source workspace
to the same peer). The error message even told the user "Usually a
transient network blip — retry once," but it left the retry to a
human reading the error message.

The fix: send_a2a_message now retries transient failures itself, up to
5 attempts (1 initial + 4 retries) with exponential backoff (1s, 2s,
4s, 8s, capped at 16s), each delay jittered ±25% to break retry sync
across siblings. Cumulative wall-clock is capped at 600s by
_DELEGATE_TOTAL_BUDGET_S, so a string of 5×300s ReadTimeouts can't
make the caller wait 25 minutes; once the deadline elapses, retries
stop even if attempts remain.
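The backoff schedule reduces to a pure function of the attempt index. A minimal sketch of that shape (illustrative names and module-level constants; the real code uses the _DELEGATE_* constants shown in the diff below):

```python
import random

BASE_S = 1.0   # first backoff
CAP_S = 16.0   # ceiling once doubling would exceed it

def backoff_seconds(attempt: int) -> float:
    """Backoff before the retry that follows `attempt` (0-indexed):
    1, 2, 4, 8, then capped at 16 seconds, each jittered +/-25%."""
    base = min(BASE_S * (2 ** attempt), CAP_S)
    jitter = base * (0.5 * random.random() - 0.25)  # uniform in [-0.25*base, +0.25*base)
    return max(0.0, base + jitter)
```

Because the function is pure (no sleeping, no I/O), the whole schedule can be asserted in a loop without patching asyncio.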

Retry only on transport-layer transients:
  - ConnectError / ConnectTimeout (peer's listening socket not ready)
  - RemoteProtocolError (peer closed TCP without writing — observed
    when a peer's prior in-flight Claude SDK session aborted)
  - ReadError / WriteError (network blip on Docker bridge)
  - ReadTimeout (peer wrote no response in 300s)

Application-level errors are NOT retried — they're deterministic and
retrying just wastes wall-clock:
  - HTTP 4xx (peer rejected the request format)
  - JSON parse failures (peer returned garbage)
  - JSON-RPC error in response body (peer's runtime errored cleanly)
  - Programmer-bug exceptions (ValueError, etc.)
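Putting the two lists together, the control flow is a bounded loop with an early exit for deterministic errors. A synchronous sketch of the shape (hypothetical helper names passed as parameters; the real coroutine awaits httpx and asyncio.sleep, and returns a formatted error string instead of re-raising):

```python
import time

MAX_ATTEMPTS = 5
TOTAL_BUDGET_S = 600.0

def send_with_retry(do_send, backoff, is_transient, sleep):
    """Retry do_send() on transient errors only, bounded by both an
    attempt count and a cumulative wall-clock deadline."""
    deadline = time.monotonic() + TOTAL_BUDGET_S
    last_exc = None
    for attempt in range(MAX_ATTEMPTS):
        try:
            return do_send()
        except Exception as exc:
            if not is_transient(exc):
                raise  # deterministic failure: surface immediately
            last_exc = exc
            if attempt + 1 >= MAX_ATTEMPTS or time.monotonic() >= deadline:
                break  # out of attempts or out of budget
            # Never sleep past the deadline.
            delay = min(backoff(attempt), max(0.0, deadline - time.monotonic()))
            sleep(delay)
    raise last_exc
```

Injecting backoff, is_transient, and sleep keeps the skeleton testable without monkeypatching; the production code hard-wires them instead.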

8 new tests pin the contract:
  - retry succeeds after 2 RemoteProtocolErrors
  - retry succeeds after 1 ConnectError
  - all 5 attempts fail → returns formatted last-error
  - capped at exactly _DELEGATE_MAX_ATTEMPTS (regression cover for
    "did someone bump the constant accidentally?")
  - JSON-RPC error response NOT retried (1 attempt only)
  - non-httpx exception NOT retried (programmer bugs stay loud)
  - total budget caps the loop even if attempts remain
  - backoff schedule grows exponentially with ±25% jitter

Refactor: extracted _format_a2a_error() so the success and exhausted
paths share one error-formatting routine. _delegate_backoff_seconds()
is a pure function so the schedule is unit-testable without monkey-
patching asyncio.sleep.
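The formatter's core concern is that some exceptions stringify to nothing. A minimal sketch of that shape (illustrative; the real _format_a2a_error also explains the likely cause of an empty message and uses the module's error-prefix constant):

```python
def format_error(exc: BaseException, target_url: str) -> str:
    # str(exc) can be "" (e.g. a bare connection reset); always keep the
    # exception class name so the log line stays actionable.
    msg = str(exc).strip()
    name = type(exc).__name__
    if not msg:
        detail = f"{name} (no message)"
    elif msg.startswith(name):
        detail = msg  # already carries its own type prefix
    else:
        detail = f"{name}: {msg}"
    return f"[A2A_ERROR] {detail} [target={target_url}]"
```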

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Hongming Wang 2026-04-27 13:52:01 -07:00
parent 5a7659c54d
commit e87a9c3858
2 changed files with 365 additions and 69 deletions


@@ -4,8 +4,11 @@ Shared constants (WORKSPACE_ID, PLATFORM_URL) live here so that
a2a_tools and a2a_mcp_server can import them from a single place.
"""
import asyncio
import logging
import os
import random
import time
import uuid
import httpx
@@ -47,80 +50,183 @@ async def discover_peer(target_id: str) -> dict | None:
return None
# httpx exception classes that indicate a transient transport-layer
# failure worth retrying — the request never produced an application
# response, so a fresh attempt has a real chance of succeeding. Any
# error not in this tuple is treated as deterministic (HTTP-status,
# JSON parse, runtime-returned JSON-RPC error, etc.) and surfaced to
# the caller on the first try.
#
# Why each one belongs here:
# - ConnectError / ConnectTimeout: peer's listening socket wasn't
# ready (mid-restart, not yet bound). Fast failure, fast recovery.
# - RemoteProtocolError: peer closed the TCP connection without
# writing a response — observed on 2026-04-27 when a peer's prior
# in-flight Claude SDK session aborted and the new request's
# connection was reset mid-handler.
# - ReadError / WriteError: TCP read/write socket error mid-flight,
# typically a network blip on the Docker bridge or a peer worker
# crash.
# - ReadTimeout: peer didn't write ANY response bytes within the
# 300s read budget. Distinct from "peer is slow but progressing"
# (which httpx surfaces as a successful read with chunked bytes).
# Retry budget caps the worst case — see _DELEGATE_TOTAL_BUDGET_S.
_TRANSIENT_HTTP_ERRORS: tuple[type[Exception], ...] = (
httpx.ConnectError,
httpx.ConnectTimeout,
httpx.ReadError,
httpx.WriteError,
httpx.RemoteProtocolError,
httpx.ReadTimeout,
)
# Retry budget. Up to 5 attempts (1 initial + 4 retries) with
# exponential backoff (1, 2, 4, 8 seconds), each backoff jittered ±25%
# to prevent synchronized retry storms across siblings if a peer flaps.
# _DELEGATE_TOTAL_BUDGET_S caps cumulative wall-clock so a string of
# ReadTimeouts can't make the caller wait 25 minutes — once the
# deadline elapses we stop retrying even if attempts remain. 600s = 10
# minutes is the agreed worst case the caller can tolerate before
# falling back to "peer unavailable" handling in tool_delegate_task.
_DELEGATE_MAX_ATTEMPTS = 5
_DELEGATE_BACKOFF_BASE_S = 1.0
_DELEGATE_BACKOFF_CAP_S = 16.0
_DELEGATE_TOTAL_BUDGET_S = 600.0
def _delegate_backoff_seconds(attempt_zero_indexed: int) -> float:
"""Return the (jittered) backoff delay before retrying after the
given attempt index (0 = backoff before retry #1).
Pure function so the schedule is unit-testable without monkey-
patching asyncio.sleep. Jitter is symmetric ±25% on top of the
capped exponential, enough to break sync across simultaneous
callers without making the schedule unpredictable.
"""
base = min(_DELEGATE_BACKOFF_BASE_S * (2 ** attempt_zero_indexed), _DELEGATE_BACKOFF_CAP_S)
jitter = base * (0.5 * random.random() - 0.25)
return max(0.0, base + jitter)
def _format_a2a_error(exc: BaseException, target_url: str) -> str:
"""Format an httpx exception as an [A2A_ERROR] string.
Some httpx exceptions stringify to empty (RemoteProtocolError,
ConnectionReset variants); the canvas would then render
"[A2A_ERROR] " with no detail and the operator has no signal to
act on. Always include the exception class name and the target
URL so the activity log + Agent Comms panel have actionable
information without a trip through container logs.
"""
msg = str(exc).strip()
type_name = type(exc).__name__
if not msg:
detail = f"{type_name} (no message — likely connection reset or silent timeout)"
elif msg.startswith(f"{type_name}:") or msg.startswith(f"{type_name} "):
# Already prefixed with the type — don't double-prefix.
# Prefix-anchored check (not substring) so a message that
# happens to mention some OTHER class name mid-string
# (e.g. "got OSError on read") doesn't suppress our own
# type prefix and lose the diagnostic signal.
detail = msg
else:
detail = f"{type_name}: {msg}"
return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"
async def send_a2a_message(target_url: str, message: str) -> str:
"""Send an A2A message/send to a target workspace."""
"""Send an A2A message/send to a target workspace.
Auto-retries up to _DELEGATE_MAX_ATTEMPTS times on transient
transport-layer errors (RemoteProtocolError, ConnectError,
ReadTimeout, etc.) with exponential-backoff + jitter, capped by
_DELEGATE_TOTAL_BUDGET_S. Application-level failures (HTTP 4xx,
JSON-RPC error response, malformed JSON) are NOT retried; they
indicate a deterministic problem that a retry won't fix.
"""
# Fix F (Cycle 5 / H2 — flagged 5 consecutive audits): timeout=None allowed
# a hung upstream to block the agent indefinitely. Use a generous but bounded
# timeout: 30s connect + 300s read (long enough for slow LLM responses).
async with httpx.AsyncClient(
timeout=httpx.Timeout(connect=30.0, read=300.0, write=30.0, pool=30.0)
) as client:
try:
# self_source_headers() includes X-Workspace-ID so the
# platform's a2a_receive logger records source_id =
# WORKSPACE_ID. Otherwise peer-A2A messages — including
# the case where target_url resolves to this workspace's
# own /a2a — get logged with source_id=NULL and surface
# in the recipient's My Chat tab as user-typed input.
resp = await client.post(
target_url,
headers=self_source_headers(WORKSPACE_ID),
json={
"jsonrpc": "2.0",
"id": str(uuid.uuid4()),
"method": "message/send",
"params": {
"message": {
"role": "user",
"messageId": str(uuid.uuid4()),
"parts": [{"kind": "text", "text": message}],
}
timeout_cfg = httpx.Timeout(connect=30.0, read=300.0, write=30.0, pool=30.0)
deadline = time.monotonic() + _DELEGATE_TOTAL_BUDGET_S
last_exc: BaseException | None = None
for attempt in range(_DELEGATE_MAX_ATTEMPTS):
async with httpx.AsyncClient(timeout=timeout_cfg) as client:
try:
# self_source_headers() includes X-Workspace-ID so the
# platform's a2a_receive logger records source_id =
# WORKSPACE_ID. Otherwise peer-A2A messages — including
# the case where target_url resolves to this workspace's
# own /a2a — get logged with source_id=NULL and surface
# in the recipient's My Chat tab as user-typed input.
resp = await client.post(
target_url,
headers=self_source_headers(WORKSPACE_ID),
json={
"jsonrpc": "2.0",
"id": str(uuid.uuid4()),
"method": "message/send",
"params": {
"message": {
"role": "user",
"messageId": str(uuid.uuid4()),
"parts": [{"kind": "text", "text": message}],
}
},
},
},
)
data = resp.json()
if "result" in data:
parts = data["result"].get("parts", [])
text = parts[0].get("text", "") if parts else "(no response)"
# Tag child-reported errors so the caller can detect them reliably
if text.startswith("Agent error:"):
return f"{_A2A_ERROR_PREFIX}{text}"
return text
elif "error" in data:
err = data["error"]
msg = (err.get("message") or "").strip()
code = err.get("code")
if msg and code is not None:
detail = f"{msg} (code={code})"
elif msg:
detail = msg
elif code is not None:
detail = f"JSON-RPC error with no message (code={code})"
else:
detail = "JSON-RPC error with no message"
return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"
return f"{_A2A_ERROR_PREFIX}unexpected response shape (no result, no error): {str(data)[:200]} [target={target_url}]"
except Exception as e:
# Some httpx exceptions stringify to empty (RemoteProtocolError,
# ConnectionReset variants) — the canvas would then render
# "[A2A_ERROR] " with no detail and the operator has no signal
# to act on. Always include the exception class name and the
# target URL so the activity log + Agent Comms panel have
# actionable information without a trip through container logs.
msg = str(e).strip()
type_name = type(e).__name__
if not msg:
detail = f"{type_name} (no message — likely connection reset or silent timeout)"
elif msg.startswith(f"{type_name}:") or msg.startswith(f"{type_name} "):
# Already prefixed with the type — don't double-prefix.
# Prefix-anchored check (not substring) so a message that
# happens to mention some OTHER class name mid-string
# (e.g. "got OSError on read") doesn't suppress our own
# type prefix and lose the diagnostic signal.
detail = msg
else:
detail = f"{type_name}: {msg}"
return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"
)
data = resp.json()
if "result" in data:
parts = data["result"].get("parts", [])
text = parts[0].get("text", "") if parts else "(no response)"
# Tag child-reported errors so the caller can detect them reliably
if text.startswith("Agent error:"):
return f"{_A2A_ERROR_PREFIX}{text}"
return text
elif "error" in data:
err = data["error"]
msg = (err.get("message") or "").strip()
code = err.get("code")
if msg and code is not None:
detail = f"{msg} (code={code})"
elif msg:
detail = msg
elif code is not None:
detail = f"JSON-RPC error with no message (code={code})"
else:
detail = "JSON-RPC error with no message"
return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"
return f"{_A2A_ERROR_PREFIX}unexpected response shape (no result, no error): {str(data)[:200]} [target={target_url}]"
except _TRANSIENT_HTTP_ERRORS as e:
last_exc = e
attempts_remaining = _DELEGATE_MAX_ATTEMPTS - (attempt + 1)
if attempts_remaining <= 0 or time.monotonic() >= deadline:
# Out of attempts OR out of total budget — surface
# the last error to the caller.
break
delay = _delegate_backoff_seconds(attempt)
# Don't sleep past the deadline — clamp.
remaining = deadline - time.monotonic()
if delay > remaining:
delay = max(0.0, remaining)
logger.warning(
"send_a2a_message: transient %s on attempt %d/%d, retrying in %.1fs (target=%s)",
type(e).__name__,
attempt + 1,
_DELEGATE_MAX_ATTEMPTS,
delay,
target_url,
)
await asyncio.sleep(delay)
continue
except Exception as e:
# Non-transient (HTTP-status, JSON parse, etc.) — don't retry.
return _format_a2a_error(e, target_url)
# Retries exhausted (or budget elapsed). last_exc must be set
# because we only break out of the loop after assigning it.
assert last_exc is not None # noqa: S101
return _format_a2a_error(last_exc, target_url)
async def get_peers() -> list[dict]:


@@ -306,6 +306,196 @@ class TestSendA2AMessage:
assert result == ""
# ---------------------------------------------------------------------------
# send_a2a_message — transient-error retry behaviour
# ---------------------------------------------------------------------------
def _make_seq_mock_client(post_side_effect):
"""Build an AsyncClient mock whose .post() returns a different result
on each successive call (matching httpx.AsyncClient's per-request
semantics: each AsyncClient context-manager opens fresh in the
retry loop, so the sequence is observed across attempts).
A new AsyncClient context is opened for every retry attempt in the
SUT, so we route AsyncClient(...) to a single mock that hands back
the same client on every __aenter__ but the .post side-effect list
is shared and consumed sequentially across attempts.
"""
mock_client = AsyncMock()
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
mock_client.post = AsyncMock(side_effect=post_side_effect)
return mock_client
class TestSendA2AMessageRetry:
"""Verify auto-retry on transient transport errors (RemoteProtocolError,
ConnectError, ReadTimeout, etc.) up to _DELEGATE_MAX_ATTEMPTS times.
Application-level errors (HTTP-status errors, JSON-RPC error in
response body) MUST NOT be retried; they're deterministic and
retrying just wastes wall-clock.
asyncio.sleep is patched to a no-op so tests don't actually wait
out the exponential backoff."""
async def test_retry_succeeds_after_two_remote_protocol_errors(self):
"""Two RemoteProtocolErrors followed by a 200 → returns the 200's text."""
import a2a_client
import httpx
success = _make_response(200, {"result": {"parts": [{"kind": "text", "text": "OK"}]}})
side_effects = [
httpx.RemoteProtocolError("Server disconnected"),
httpx.RemoteProtocolError("Server disconnected"),
success,
]
mock_client = _make_seq_mock_client(side_effects)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
assert result == "OK"
assert mock_client.post.await_count == 3
async def test_retry_succeeds_after_connect_error(self):
"""Single ConnectError then 200 → returns the 200's text."""
import a2a_client
import httpx
success = _make_response(200, {"result": {"parts": [{"kind": "text", "text": "OK"}]}})
side_effects = [
httpx.ConnectError("connection refused"),
success,
]
mock_client = _make_seq_mock_client(side_effects)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
assert result == "OK"
assert mock_client.post.await_count == 2
async def test_all_attempts_fail_returns_last_error(self):
"""5 RemoteProtocolErrors → returns the last error formatted with target URL."""
import a2a_client
import httpx
side_effects = [httpx.RemoteProtocolError("Server disconnected")] * 5
mock_client = _make_seq_mock_client(side_effects)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
assert mock_client.post.await_count == 5 # _DELEGATE_MAX_ATTEMPTS
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert "RemoteProtocolError" in result
assert "target=http://target/a2a" in result
async def test_caps_at_max_attempts(self):
"""If transient errors keep coming, we MUST stop at _DELEGATE_MAX_ATTEMPTS,
not retry forever. Pin the exact attempt count so a future tweak to
the constant has to update this test in lockstep."""
import a2a_client
import httpx
side_effects = [httpx.ReadTimeout("timeout")] * 20 # way more than max
mock_client = _make_seq_mock_client(side_effects)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
assert mock_client.post.await_count == a2a_client._DELEGATE_MAX_ATTEMPTS
assert mock_client.post.await_count == 5
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
async def test_application_error_not_retried(self):
"""JSON-RPC error response (application-level) is deterministic —
retrying just wastes wall-clock. Must return on the first attempt."""
import a2a_client
resp = _make_response(200, {
"error": {"code": -32603, "message": "Internal error"}
})
mock_client = _make_seq_mock_client([resp, resp, resp])
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
assert mock_client.post.await_count == 1 # NO retry
assert "Internal error" in result
async def test_non_transient_exception_not_retried(self):
"""A non-httpx exception (programmer bug, JSON parse, etc.) must
not trigger retry surface immediately so the bug is loud."""
import a2a_client
# A plain ValueError isn't in _TRANSIENT_HTTP_ERRORS.
side_effects = [ValueError("malformed something")] * 3
mock_client = _make_seq_mock_client(side_effects)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
assert mock_client.post.await_count == 1 # NO retry
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert "ValueError" in result
async def test_total_budget_caps_retry_loop(self, monkeypatch):
"""Total wall-clock budget caps the retry loop even if attempts
remain protects against a string of 5×300s ReadTimeouts.
Simulate elapsed time advancing past the budget on attempt 2."""
import a2a_client
import httpx
side_effects = [httpx.ReadTimeout("timeout")] * 5
mock_client = _make_seq_mock_client(side_effects)
# Make time.monotonic() jump forward past the budget after the
# second attempt — the retry loop should detect the deadline
# and stop, even though _DELEGATE_MAX_ATTEMPTS is 5.
call_count = {"n": 0}
original_budget = a2a_client._DELEGATE_TOTAL_BUDGET_S
def fake_monotonic():
call_count["n"] += 1
# First call (deadline computation) → 0
# Subsequent calls → 0 until attempt 3, then jump past budget
if call_count["n"] <= 4:
return 0.0
return original_budget + 1.0
monkeypatch.setattr(a2a_client.time, "monotonic", fake_monotonic)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
# Stopped before exhausting all 5 attempts.
assert mock_client.post.await_count < 5
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
def test_delegate_backoff_seconds_grows_exponentially_with_jitter():
"""Schedule: ~1s, ~2s, ~4s, ~8s, then capped at 16s. ±25% jitter
means each delay falls in [base*0.75, base*1.25]."""
import a2a_client
# Run a bunch to sample the jitter distribution; assert each value
# falls in the expected window.
for attempt, base in [(0, 1.0), (1, 2.0), (2, 4.0), (3, 8.0), (4, 16.0), (10, 16.0)]:
for _ in range(20):
d = a2a_client._delegate_backoff_seconds(attempt)
assert d >= base * 0.75 - 1e-9, f"attempt {attempt}: {d} < lower"
assert d <= base * 1.25 + 1e-9, f"attempt {attempt}: {d} > upper"
# ---------------------------------------------------------------------------
# get_peers
# ---------------------------------------------------------------------------