Merge pull request #2189 from Molecule-AI/fix/delegate-task-retry-transient
fix(a2a): auto-retry transient transport errors in send_a2a_message (up to 5x)
This commit is contained in:
commit
44dc3c6943
@ -4,8 +4,11 @@ Shared constants (WORKSPACE_ID, PLATFORM_URL) live here so that
|
||||
a2a_tools and a2a_mcp_server can import them from a single place.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import httpx
|
||||
@ -47,14 +50,109 @@ async def discover_peer(target_id: str) -> dict | None:
|
||||
return None
|
||||
|
||||
|
||||
# httpx exception classes that indicate a transient transport-layer
|
||||
# failure worth retrying — the request never produced an application
|
||||
# response, so a fresh attempt has a real chance of succeeding. Any
|
||||
# error not in this tuple is treated as deterministic (HTTP-status,
|
||||
# JSON parse, runtime-returned JSON-RPC error, etc.) and surfaced to
|
||||
# the caller on the first try.
|
||||
#
|
||||
# Why each one belongs here:
|
||||
# - ConnectError / ConnectTimeout: peer's listening socket wasn't
|
||||
# ready (mid-restart, not yet bound). Fast failure, fast recovery.
|
||||
# - RemoteProtocolError: peer closed the TCP connection without
|
||||
# writing a response — observed on 2026-04-27 when a peer's prior
|
||||
# in-flight Claude SDK session aborted and the new request's
|
||||
# connection was reset mid-handler.
|
||||
# - ReadError / WriteError: TCP read/write socket error mid-flight,
|
||||
# typically a network blip on the Docker bridge or a peer worker
|
||||
# crash.
|
||||
# - ReadTimeout: peer didn't write ANY response bytes within the
|
||||
# 300s read budget. Distinct from "peer is slow but progressing"
|
||||
# (which httpx surfaces as a successful read with chunked bytes).
|
||||
# Retry budget caps the worst case — see _DELEGATE_TOTAL_BUDGET_S.
|
||||
_TRANSIENT_HTTP_ERRORS: tuple[type[Exception], ...] = (
|
||||
httpx.ConnectError,
|
||||
httpx.ConnectTimeout,
|
||||
httpx.ReadError,
|
||||
httpx.WriteError,
|
||||
httpx.RemoteProtocolError,
|
||||
httpx.ReadTimeout,
|
||||
)
|
||||
|
||||
# Retry budget. Up to 5 attempts (1 initial + 4 retries) with
|
||||
# exponential backoff (1, 2, 4, 8 seconds), each backoff jittered ±25%
|
||||
# to prevent synchronized retry storms across siblings if a peer flaps.
|
||||
# _DELEGATE_TOTAL_BUDGET_S caps cumulative wall-clock so a string of
|
||||
# ReadTimeouts can't make the caller wait 25 minutes — once the
|
||||
# deadline elapses we stop retrying even if attempts remain. 600s = 10
|
||||
# minutes is the agreed worst case the caller can tolerate before
|
||||
# falling back to "peer unavailable" handling in tool_delegate_task.
|
||||
_DELEGATE_MAX_ATTEMPTS = 5
|
||||
_DELEGATE_BACKOFF_BASE_S = 1.0
|
||||
_DELEGATE_BACKOFF_CAP_S = 16.0
|
||||
_DELEGATE_TOTAL_BUDGET_S = 600.0
|
||||
|
||||
|
||||
def _delegate_backoff_seconds(attempt_zero_indexed: int) -> float:
|
||||
"""Return the (jittered) backoff delay before retrying after the
|
||||
given attempt index (0 = backoff before retry #1).
|
||||
|
||||
Pure function so the schedule is unit-testable without monkey-
|
||||
patching asyncio.sleep. Jitter is symmetric ±25% on top of the
|
||||
capped exponential — enough to break sync across simultaneous
|
||||
callers without making the schedule unpredictable.
|
||||
"""
|
||||
base = min(_DELEGATE_BACKOFF_BASE_S * (2 ** attempt_zero_indexed), _DELEGATE_BACKOFF_CAP_S)
|
||||
jitter = base * (0.5 * random.random() - 0.25)
|
||||
return max(0.0, base + jitter)
|
||||
|
||||
|
||||
def _format_a2a_error(exc: BaseException, target_url: str) -> str:
|
||||
"""Format an httpx exception as an [A2A_ERROR] string.
|
||||
|
||||
Some httpx exceptions stringify to empty (RemoteProtocolError,
|
||||
ConnectionReset variants) — the canvas would then render
|
||||
"[A2A_ERROR] " with no detail and the operator has no signal to
|
||||
act on. Always include the exception class name and the target
|
||||
URL so the activity log + Agent Comms panel have actionable
|
||||
information without a trip through container logs.
|
||||
"""
|
||||
msg = str(exc).strip()
|
||||
type_name = type(exc).__name__
|
||||
if not msg:
|
||||
detail = f"{type_name} (no message — likely connection reset or silent timeout)"
|
||||
elif msg.startswith(f"{type_name}:") or msg.startswith(f"{type_name} "):
|
||||
# Already prefixed with the type — don't double-prefix.
|
||||
# Prefix-anchored check (not substring) so a message that
|
||||
# happens to mention some OTHER class name mid-string
|
||||
# (e.g. "got OSError on read") doesn't suppress our own
|
||||
# type prefix and lose the diagnostic signal.
|
||||
detail = msg
|
||||
else:
|
||||
detail = f"{type_name}: {msg}"
|
||||
return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"
|
||||
|
||||
|
||||
async def send_a2a_message(target_url: str, message: str) -> str:
|
||||
"""Send an A2A message/send to a target workspace."""
|
||||
"""Send an A2A message/send to a target workspace.
|
||||
|
||||
Auto-retries up to _DELEGATE_MAX_ATTEMPTS times on transient
|
||||
transport-layer errors (RemoteProtocolError, ConnectError,
|
||||
ReadTimeout, etc.) with exponential-backoff + jitter, capped by
|
||||
_DELEGATE_TOTAL_BUDGET_S. Application-level failures (HTTP 4xx,
|
||||
JSON-RPC error response, malformed JSON) are NOT retried — they
|
||||
indicate a deterministic problem retry won't fix.
|
||||
"""
|
||||
# Fix F (Cycle 5 / H2 — flagged 5 consecutive audits): timeout=None allowed
|
||||
# a hung upstream to block the agent indefinitely. Use a generous but bounded
|
||||
# timeout: 30s connect + 300s read (long enough for slow LLM responses).
|
||||
async with httpx.AsyncClient(
|
||||
timeout=httpx.Timeout(connect=30.0, read=300.0, write=30.0, pool=30.0)
|
||||
) as client:
|
||||
timeout_cfg = httpx.Timeout(connect=30.0, read=300.0, write=30.0, pool=30.0)
|
||||
deadline = time.monotonic() + _DELEGATE_TOTAL_BUDGET_S
|
||||
last_exc: BaseException | None = None
|
||||
|
||||
for attempt in range(_DELEGATE_MAX_ATTEMPTS):
|
||||
async with httpx.AsyncClient(timeout=timeout_cfg) as client:
|
||||
try:
|
||||
# self_source_headers() includes X-Workspace-ID so the
|
||||
# platform's a2a_receive logger records source_id =
|
||||
@ -100,27 +198,35 @@ async def send_a2a_message(target_url: str, message: str) -> str:
|
||||
detail = "JSON-RPC error with no message"
|
||||
return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"
|
||||
return f"{_A2A_ERROR_PREFIX}unexpected response shape (no result, no error): {str(data)[:200]} [target={target_url}]"
|
||||
except _TRANSIENT_HTTP_ERRORS as e:
|
||||
last_exc = e
|
||||
attempts_remaining = _DELEGATE_MAX_ATTEMPTS - (attempt + 1)
|
||||
if attempts_remaining <= 0 or time.monotonic() >= deadline:
|
||||
# Out of attempts OR out of total budget — surface
|
||||
# the last error to the caller.
|
||||
break
|
||||
delay = _delegate_backoff_seconds(attempt)
|
||||
# Don't sleep past the deadline — clamp.
|
||||
remaining = deadline - time.monotonic()
|
||||
if delay > remaining:
|
||||
delay = max(0.0, remaining)
|
||||
logger.warning(
|
||||
"send_a2a_message: transient %s on attempt %d/%d, retrying in %.1fs (target=%s)",
|
||||
type(e).__name__,
|
||||
attempt + 1,
|
||||
_DELEGATE_MAX_ATTEMPTS,
|
||||
delay,
|
||||
target_url,
|
||||
)
|
||||
await asyncio.sleep(delay)
|
||||
continue
|
||||
except Exception as e:
|
||||
# Some httpx exceptions stringify to empty (RemoteProtocolError,
|
||||
# ConnectionReset variants) — the canvas would then render
|
||||
# "[A2A_ERROR] " with no detail and the operator has no signal
|
||||
# to act on. Always include the exception class name and the
|
||||
# target URL so the activity log + Agent Comms panel have
|
||||
# actionable information without a trip through container logs.
|
||||
msg = str(e).strip()
|
||||
type_name = type(e).__name__
|
||||
if not msg:
|
||||
detail = f"{type_name} (no message — likely connection reset or silent timeout)"
|
||||
elif msg.startswith(f"{type_name}:") or msg.startswith(f"{type_name} "):
|
||||
# Already prefixed with the type — don't double-prefix.
|
||||
# Prefix-anchored check (not substring) so a message that
|
||||
# happens to mention some OTHER class name mid-string
|
||||
# (e.g. "got OSError on read") doesn't suppress our own
|
||||
# type prefix and lose the diagnostic signal.
|
||||
detail = msg
|
||||
else:
|
||||
detail = f"{type_name}: {msg}"
|
||||
return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"
|
||||
# Non-transient (HTTP-status, JSON parse, etc.) — don't retry.
|
||||
return _format_a2a_error(e, target_url)
|
||||
# Retries exhausted (or budget elapsed). last_exc must be set
|
||||
# because we only break out of the loop after assigning it.
|
||||
assert last_exc is not None # noqa: S101
|
||||
return _format_a2a_error(last_exc, target_url)
|
||||
|
||||
|
||||
async def get_peers() -> list[dict]:
|
||||
|
||||
@ -306,6 +306,196 @@ class TestSendA2AMessage:
|
||||
assert result == ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# send_a2a_message — transient-error retry behaviour
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_seq_mock_client(post_side_effect):
|
||||
"""Build an AsyncClient mock whose .post() returns a different result
|
||||
on each successive call (matching httpx.AsyncClient's per-request
|
||||
semantics — each AsyncClient context-manager opens fresh in the
|
||||
retry loop, so the sequence is observed across attempts).
|
||||
|
||||
A new AsyncClient context is opened for every retry attempt in the
|
||||
SUT, so we route AsyncClient(...) to a single mock that hands back
|
||||
the same client on every __aenter__ but the .post side-effect list
|
||||
is shared and consumed sequentially across attempts.
|
||||
"""
|
||||
mock_client = AsyncMock()
|
||||
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
|
||||
mock_client.__aexit__ = AsyncMock(return_value=False)
|
||||
mock_client.post = AsyncMock(side_effect=post_side_effect)
|
||||
return mock_client
|
||||
|
||||
|
||||
class TestSendA2AMessageRetry:
|
||||
"""Verify auto-retry on transient transport errors (RemoteProtocolError,
|
||||
ConnectError, ReadTimeout, etc.) up to _DELEGATE_MAX_ATTEMPTS times.
|
||||
Application-level errors (HTTP-status errors, JSON-RPC error in
|
||||
response body) MUST NOT be retried — they're deterministic and
|
||||
re-trying just wastes wall-clock.
|
||||
|
||||
asyncio.sleep is patched to a no-op so tests don't actually wait
|
||||
out the exponential backoff.
|
||||
"""
|
||||
|
||||
async def test_retry_succeeds_after_two_remote_protocol_errors(self):
|
||||
"""Two RemoteProtocolErrors followed by a 200 → returns the 200's text."""
|
||||
import a2a_client
|
||||
import httpx
|
||||
|
||||
success = _make_response(200, {"result": {"parts": [{"kind": "text", "text": "OK"}]}})
|
||||
side_effects = [
|
||||
httpx.RemoteProtocolError("Server disconnected"),
|
||||
httpx.RemoteProtocolError("Server disconnected"),
|
||||
success,
|
||||
]
|
||||
mock_client = _make_seq_mock_client(side_effects)
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
|
||||
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
|
||||
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
|
||||
|
||||
assert result == "OK"
|
||||
assert mock_client.post.await_count == 3
|
||||
|
||||
async def test_retry_succeeds_after_connect_error(self):
|
||||
"""Single ConnectError then 200 → returns the 200's text."""
|
||||
import a2a_client
|
||||
import httpx
|
||||
|
||||
success = _make_response(200, {"result": {"parts": [{"kind": "text", "text": "OK"}]}})
|
||||
side_effects = [
|
||||
httpx.ConnectError("connection refused"),
|
||||
success,
|
||||
]
|
||||
mock_client = _make_seq_mock_client(side_effects)
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
|
||||
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
|
||||
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
|
||||
|
||||
assert result == "OK"
|
||||
assert mock_client.post.await_count == 2
|
||||
|
||||
async def test_all_attempts_fail_returns_last_error(self):
|
||||
"""5 RemoteProtocolErrors → returns the last error formatted with target URL."""
|
||||
import a2a_client
|
||||
import httpx
|
||||
|
||||
side_effects = [httpx.RemoteProtocolError("Server disconnected")] * 5
|
||||
mock_client = _make_seq_mock_client(side_effects)
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
|
||||
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
|
||||
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
|
||||
|
||||
assert mock_client.post.await_count == 5 # _DELEGATE_MAX_ATTEMPTS
|
||||
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
|
||||
assert "RemoteProtocolError" in result
|
||||
assert "target=http://target/a2a" in result
|
||||
|
||||
async def test_caps_at_max_attempts(self):
|
||||
"""If transient errors keep coming, we MUST stop at _DELEGATE_MAX_ATTEMPTS,
|
||||
not retry forever. Pin the exact attempt count so a future tweak to
|
||||
the constant has to update this test in lockstep."""
|
||||
import a2a_client
|
||||
import httpx
|
||||
|
||||
side_effects = [httpx.ReadTimeout("timeout")] * 20 # way more than max
|
||||
mock_client = _make_seq_mock_client(side_effects)
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
|
||||
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
|
||||
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
|
||||
|
||||
assert mock_client.post.await_count == a2a_client._DELEGATE_MAX_ATTEMPTS
|
||||
assert mock_client.post.await_count == 5
|
||||
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
|
||||
|
||||
async def test_application_error_not_retried(self):
|
||||
"""JSON-RPC error response (application-level) is deterministic —
|
||||
retrying just wastes wall-clock. Must return on the first attempt."""
|
||||
import a2a_client
|
||||
|
||||
resp = _make_response(200, {
|
||||
"error": {"code": -32603, "message": "Internal error"}
|
||||
})
|
||||
mock_client = _make_seq_mock_client([resp, resp, resp])
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
|
||||
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
|
||||
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
|
||||
|
||||
assert mock_client.post.await_count == 1 # NO retry
|
||||
assert "Internal error" in result
|
||||
|
||||
async def test_non_transient_exception_not_retried(self):
|
||||
"""A non-httpx exception (programmer bug, JSON parse, etc.) must
|
||||
not trigger retry — surface immediately so the bug is loud."""
|
||||
import a2a_client
|
||||
|
||||
# A plain ValueError isn't in _TRANSIENT_HTTP_ERRORS.
|
||||
side_effects = [ValueError("malformed something")] * 3
|
||||
mock_client = _make_seq_mock_client(side_effects)
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
|
||||
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
|
||||
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
|
||||
|
||||
assert mock_client.post.await_count == 1 # NO retry
|
||||
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
|
||||
assert "ValueError" in result
|
||||
|
||||
async def test_total_budget_caps_retry_loop(self, monkeypatch):
|
||||
"""Total wall-clock budget caps the retry loop even if attempts
|
||||
remain — protects against a string of 5×300s ReadTimeouts.
|
||||
Simulate elapsed time advancing past the budget on attempt 2."""
|
||||
import a2a_client
|
||||
import httpx
|
||||
|
||||
side_effects = [httpx.ReadTimeout("timeout")] * 5
|
||||
mock_client = _make_seq_mock_client(side_effects)
|
||||
|
||||
# Make time.monotonic() jump forward past the budget after the
|
||||
# second attempt — the retry loop should detect the deadline
|
||||
# and stop, even though _DELEGATE_MAX_ATTEMPTS is 5.
|
||||
call_count = {"n": 0}
|
||||
original_budget = a2a_client._DELEGATE_TOTAL_BUDGET_S
|
||||
|
||||
def fake_monotonic():
|
||||
call_count["n"] += 1
|
||||
# First call (deadline computation) → 0
|
||||
# Subsequent calls → 0 until attempt 3, then jump past budget
|
||||
if call_count["n"] <= 4:
|
||||
return 0.0
|
||||
return original_budget + 1.0
|
||||
|
||||
monkeypatch.setattr(a2a_client.time, "monotonic", fake_monotonic)
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
|
||||
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
|
||||
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
|
||||
|
||||
# Stopped before exhausting all 5 attempts.
|
||||
assert mock_client.post.await_count < 5
|
||||
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
|
||||
|
||||
|
||||
def test_delegate_backoff_seconds_grows_exponentially_with_jitter():
|
||||
"""Schedule: ~1s, ~2s, ~4s, ~8s, then capped at 16s. ±25% jitter
|
||||
means each delay falls in [base*0.75, base*1.25]."""
|
||||
import a2a_client
|
||||
|
||||
# Run a bunch to sample the jitter distribution; assert each value
|
||||
# falls in the expected window.
|
||||
for attempt, base in [(0, 1.0), (1, 2.0), (2, 4.0), (3, 8.0), (4, 16.0), (10, 16.0)]:
|
||||
for _ in range(20):
|
||||
d = a2a_client._delegate_backoff_seconds(attempt)
|
||||
assert d >= base * 0.75 - 1e-9, f"attempt {attempt}: {d} < lower"
|
||||
assert d <= base * 1.25 + 1e-9, f"attempt {attempt}: {d} > upper"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# get_peers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
Loading…
Reference in New Issue
Block a user