molecule-core/workspace/tests/test_a2a_client.py
Molecule AI Core-BE 50319b69f2
All checks were successful
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 15s
CI / Detect changes (pull_request) Successful in 44s
E2E API Smoke Test / detect-changes (pull_request) Successful in 47s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 40s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 11s
sop-tier-check / tier-check (pull_request) Successful in 11s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 27s
CI / Platform (Go) (pull_request) Successful in 7s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 6s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 28s
CI / Canvas (Next.js) (pull_request) Successful in 8s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 7s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 7s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 6s
audit-force-merge / audit (pull_request) Successful in 11s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 2m7s
CI / Python Lint & Test (pull_request) Successful in 6m58s
fix(workspace): patch enrich_peer_metadata directly in test
test_blocks_until_inflight_completes used patch("a2a_client.httpx.Client")
to mock the HTTP call, but httpx.Client is created inside the background
worker thread AFTER the patch context manager exits — the executor thread
was created before the patch, so it uses the original httpx module.

The httpx patch approach fails reliably when running with
test_envelope_enrichment_fetches_on_cache_miss (different httpx patch,
different peer ID, same executor thread pool). Fix: directly replace
enrich_peer_metadata on the module so the replacement is visible to the
background worker regardless of thread creation timing.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-11 17:25:46 +00:00

1493 lines
61 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Comprehensive tests for a2a_client.py — 100% statement coverage.
Tests every async function: discover_peer, send_a2a_message, get_peers,
get_workspace_info. Each test covers exactly one execution path so failures
are easy to diagnose.
"""
import sys
import os
import importlib
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_mock_client(*, get_resp=None, post_resp=None, get_exc=None, post_exc=None):
"""Build a reusable AsyncClient context-manager mock."""
mock_client = AsyncMock()
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
if get_exc is not None:
mock_client.get = AsyncMock(side_effect=get_exc)
elif get_resp is not None:
mock_client.get = AsyncMock(return_value=get_resp)
if post_exc is not None:
mock_client.post = AsyncMock(side_effect=post_exc)
elif post_resp is not None:
mock_client.post = AsyncMock(return_value=post_resp)
return mock_client
def _make_response(status_code, json_data):
resp = MagicMock()
resp.status_code = status_code
resp.json = MagicMock(return_value=json_data)
return resp
# Canonical UUID used wherever a test needs a peer_id. send_a2a_message and
# discover_peer reject non-UUID strings at the trust boundary (see
# a2a_client._validate_peer_id), so test inputs must be valid UUIDs.
_TEST_PEER_ID = "11111111-1111-1111-1111-111111111111"
# ---------------------------------------------------------------------------
# Module-level constants (just ensure they exist and have sensible types)
# ---------------------------------------------------------------------------
def test_constants_exist():
import a2a_client
assert isinstance(a2a_client.PLATFORM_URL, str)
assert isinstance(a2a_client.WORKSPACE_ID, str)
assert isinstance(a2a_client._A2A_ERROR_PREFIX, str)
assert isinstance(a2a_client._peer_names, dict)
# ---------------------------------------------------------------------------
# discover_peer
# ---------------------------------------------------------------------------
class TestDiscoverPeer:
async def test_success_returns_json_on_200(self):
"""200 response → returns the JSON body."""
import a2a_client
peer_data = {"id": _TEST_PEER_ID, "url": "http://ws-abc.svc", "name": "Alpha"}
resp = _make_response(200, peer_data)
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.discover_peer(_TEST_PEER_ID)
assert result == peer_data
async def test_non_200_returns_none(self):
"""Non-200 response → returns None."""
import a2a_client
resp = _make_response(404, {"detail": "not found"})
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.discover_peer(_TEST_PEER_ID)
assert result is None
async def test_403_returns_none(self):
"""403 forbidden → returns None (any non-200 code)."""
import a2a_client
resp = _make_response(403, {"detail": "forbidden"})
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.discover_peer(_TEST_PEER_ID)
assert result is None
async def test_exception_returns_none(self):
"""Network exception → returns None (exception swallowed)."""
import a2a_client
mock_client = _make_mock_client(get_exc=ConnectionError("host unreachable"))
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.discover_peer(_TEST_PEER_ID)
assert result is None
async def test_invalid_peer_id_returns_none_without_http(self):
"""Malformed peer_id is rejected at the trust boundary — no HTTP call.
Path-traversal-shaped input ("../admin"), free-form labels
("ws-abc"), and empty strings all return None and don't reach
the platform. Closes the URL-interpolation class of bug.
"""
import a2a_client
mock_client = _make_mock_client(get_resp=_make_response(200, {}))
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
for bad in ("", "ws-abc", "../admin", "not-a-uuid", "8dad3e29"):
assert await a2a_client.discover_peer(bad) is None
# No GET should have been issued for any of those.
mock_client.get.assert_not_called()
async def test_request_uses_correct_url_and_header(self):
"""GET is called with the right URL and X-Workspace-ID header."""
import a2a_client
resp = _make_response(200, {"url": "http://target"})
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
await a2a_client.discover_peer(_TEST_PEER_ID)
mock_client.get.assert_called_once()
positional_url = mock_client.get.call_args.args[0]
assert _TEST_PEER_ID in positional_url
# X-Workspace-ID must be present; bearer token also merged in when available
headers_sent = mock_client.get.call_args.kwargs.get("headers", {})
assert headers_sent.get("X-Workspace-ID") == a2a_client.WORKSPACE_ID
# ---------------------------------------------------------------------------
# send_a2a_message
# ---------------------------------------------------------------------------
class TestSendA2AMessage:
async def test_result_with_text_part_returns_text(self):
"""'result' key with text parts → returns the text."""
import a2a_client
resp = _make_response(200, {
"result": {"parts": [{"kind": "text", "text": "Hello!"}]}
})
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
assert result == "Hello!"
async def test_result_with_empty_parts_returns_no_response(self):
"""'result' key with empty parts list → returns '(no response)'."""
import a2a_client
resp = _make_response(200, {"result": {"parts": []}})
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
assert result == "(no response)"
async def test_result_text_starts_with_agent_error_gets_prefix(self):
"""Text starting with 'Agent error:' gets the _A2A_ERROR_PREFIX prepended."""
import a2a_client
resp = _make_response(200, {
"result": {"parts": [{"kind": "text", "text": "Agent error: something bad"}]}
})
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert "Agent error: something bad" in result
async def test_error_key_returns_error_prefix_and_message(self):
"""'error' key in response → returns _A2A_ERROR_PREFIX + error message."""
import a2a_client
resp = _make_response(200, {
"error": {"code": -32603, "message": "Internal error occurred"}
})
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert "Internal error occurred" in result
async def test_error_key_missing_message_returns_unknown(self):
"""'error' key without 'message' → falls back to 'unknown'."""
import a2a_client
resp = _make_response(200, {"error": {"code": -32600}})
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
# The error includes the JSON-RPC code so the operator can look it
# up; "no message" surfaces the missing-message condition explicitly
# instead of the previous opaque "unknown".
assert "code=-32600" in result
assert "no message" in result.lower()
# Target URL is included so chained delegations are traceable.
# Target URL now constructed internally — assert it contains the peer_id
# and the proxy path, not the old hand-passed URL.
assert _TEST_PEER_ID in result
assert "/workspaces/" in result and "/a2a" in result
async def test_jsonrpc_error_with_code_zero_includes_code_in_detail(self):
"""JSON-RPC error code=0 is technically not valid in the spec,
but a malformed peer can still send it — make sure the code is
preserved in the detail rather than collapsing into the
no-code path. Locks in the `code is not None` semantics over
the truthy-check shortcut."""
import a2a_client
resp = _make_response(200, {"error": {"code": 0, "message": "weird"}})
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert "code=0" in result
assert "weird" in result
async def test_neither_result_nor_error_returns_a2a_error_with_payload(self):
"""Response with neither 'result' nor 'error' → A2A_ERROR + payload context."""
import a2a_client
payload = {"jsonrpc": "2.0", "id": "abc123"}
resp = _make_response(200, payload)
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
# Pre-fix this returned bare str(payload) which the canvas
# rendered as a confusing "looks like a successful response"
# block. Now it's tagged so downstream UI / delegate_task
# routes it through the error path.
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert "unexpected response shape" in result
assert "abc123" in result # snippet of payload included for context
# Target URL now constructed internally — assert it contains the peer_id
# and the proxy path, not the old hand-passed URL.
assert _TEST_PEER_ID in result
assert "/workspaces/" in result and "/a2a" in result
async def test_poll_queued_envelope_returns_success_string(self):
"""Issue #2967: workspace-server's poll-mode short-circuit returns
{status:"queued", delivery_mode:"poll", method:...} when the peer
has no URL to dispatch to. Pre-fix the bare send_a2a_message parser
only knew about JSON-RPC {result, error} keys, so this fell through
to the 'unexpected response shape' error path → callers retried,
peer got duplicate delegations.
Pin: poll-queued envelope returns a string tagged with the
_A2A_QUEUED_PREFIX sentinel (not _A2A_ERROR_PREFIX), so callers
can branch on the typed outcome without substring-sniffing.
Verified discriminating: pre-fix returned _A2A_ERROR_PREFIX so
the not-startswith assertion would FAIL on the old code.
"""
import a2a_client
resp = _make_response(200, {
"status": "queued",
"delivery_mode": "poll",
"method": "message/send",
})
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
# Discriminating: pre-fix returned a string that startswith
# _A2A_ERROR_PREFIX, so this assertion would have FAILED on the
# old code. New code returns the queued-success sentinel.
assert not result.startswith(a2a_client._A2A_ERROR_PREFIX), (
f"poll-queued envelope must not be tagged as A2A error; got: {result!r}"
)
assert result.startswith(a2a_client._A2A_QUEUED_PREFIX), (
f"poll-queued envelope must use the queued sentinel; got: {result!r}"
)
# The method is included so a structured-log scraper can route by
# protocol verb if needed.
assert "message/send" in result
async def test_poll_queued_envelope_with_other_method(self):
"""Same envelope but a different a2a_method (the future could add
message/sendStream or similar). Pin that the parser doesn't hardcode
message/send — whatever method the server echoed is preserved.
"""
import a2a_client
resp = _make_response(200, {
"status": "queued",
"delivery_mode": "poll",
"method": "message/sendStream",
})
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
assert not result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert result.startswith(a2a_client._A2A_QUEUED_PREFIX)
assert "message/sendStream" in result
async def test_status_queued_without_poll_mode_still_falls_through(self):
"""Defensive: only the {status:"queued", delivery_mode:"poll"} pair
triggers the queued-success branch. A response with status:"queued"
but a different delivery_mode (or none) is still 'unexpected'
we don't want to silently swallow a future server bug that emits
a partial envelope. Pin both keys are required.
"""
import a2a_client
resp = _make_response(200, {
"status": "queued",
# delivery_mode missing
"method": "message/send",
})
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
# Falls through — must STILL be tagged as error.
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert "unexpected response shape" in result
async def test_exception_returns_error_prefix_and_message(self):
"""Network exception → returns _A2A_ERROR_PREFIX + exception text."""
import a2a_client
mock_client = _make_mock_client(post_exc=ConnectionError("connection refused"))
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert "connection refused" in result
# Exception class name is prepended when the message doesn't
# already include it — gives the operator a typed handle to
# search for in container logs.
assert "ConnectionError" in result
# Target URL now constructed internally — assert it contains the peer_id
# and the proxy path, not the old hand-passed URL.
assert _TEST_PEER_ID in result
assert "/workspaces/" in result and "/a2a" in result
async def test_empty_stringifying_exception_falls_back_to_class_name(self):
"""The user's reported bug: httpx.RemoteProtocolError and similar
exceptions can stringify to "" — pre-fix the canvas rendered
"[A2A_ERROR] " with no detail. Verify the empty path now
produces an actionable message including the exception type
and the target URL."""
import a2a_client
# Subclass Exception with __str__ → "" to simulate the
# silent-exception variants without depending on a specific
# httpx version's behavior.
class _SilentRemoteProtocolError(Exception):
def __str__(self) -> str:
return ""
mock_client = _make_mock_client(post_exc=_SilentRemoteProtocolError())
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
# Must NOT be just the bare prefix — that's the regression.
assert result != a2a_client._A2A_ERROR_PREFIX.strip()
assert result != f"{a2a_client._A2A_ERROR_PREFIX}"
# Must include the class name + something explanatory.
assert "_SilentRemoteProtocolError" in result
assert "no message" in result.lower()
# Target URL now constructed internally — assert it contains the peer_id
# and the proxy path, not the old hand-passed URL.
assert _TEST_PEER_ID in result
assert "/workspaces/" in result and "/a2a" in result
async def test_result_text_part_missing_text_key_returns_empty(self):
"""Part dict without 'text' key → falls back to '' (empty string returned)."""
import a2a_client
resp = _make_response(200, {
"result": {"parts": [{"kind": "text"}]} # no "text" key
})
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
# Returns "" (empty string — does not start with _A2A_ERROR_PREFIX)
assert result == ""
async def test_invalid_peer_id_short_circuits_without_http(self):
"""Malformed peer_id is rejected at the trust boundary — no POST.
Symmetric coverage with discover_peer's validation gate. Path-traversal
("../admin"), free-form labels ("ws-abc"), and empty strings all
return an _A2A_ERROR_PREFIX message identifying the bad input and
never reach the platform.
"""
import a2a_client
mock_client = _make_mock_client(post_resp=_make_response(200, {}))
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
for bad in ("", "ws-abc", "../admin", "not-a-uuid", "8dad3e29"):
result = await a2a_client.send_a2a_message(bad, "ping")
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert "invalid peer_id" in result
# No POST should have been issued for any of those.
mock_client.post.assert_not_called()
# ---------------------------------------------------------------------------
# send_a2a_message — transient-error retry behaviour
# ---------------------------------------------------------------------------
def _make_seq_mock_client(post_side_effect):
"""Build an AsyncClient mock whose .post() returns a different result
on each successive call (matching httpx.AsyncClient's per-request
semantics — each AsyncClient context-manager opens fresh in the
retry loop, so the sequence is observed across attempts).
A new AsyncClient context is opened for every retry attempt in the
SUT, so we route AsyncClient(...) to a single mock that hands back
the same client on every __aenter__ but the .post side-effect list
is shared and consumed sequentially across attempts.
"""
mock_client = AsyncMock()
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
mock_client.post = AsyncMock(side_effect=post_side_effect)
return mock_client
class TestSendA2AMessagePollMode:
"""Pin the #2967 fix: send_a2a_message recognizes the platform's
poll-mode short-circuit envelope and returns a queued sentinel
instead of an "unexpected response shape" error.
Pre-#2967 the client treated the queued envelope as malformed,
causing the calling agent to retry, which delivered the same
message twice to the (poll-mode) recipient. The Queued sentinel
lets delegate_task fall back to the durable polling path
transparently — see test_delegation_sync_via_polling for the
fallback verification.
"""
async def test_poll_queued_envelope_returns_queued_sentinel(self):
# Workspace-server returns this shape (a2a_proxy.go:402-406)
# when the target workspace is registered as delivery_mode=poll
# (no public URL, typical for external molecule-mcp standalone
# runtimes).
import a2a_client
resp = _make_response(200, {
"status": "queued",
"delivery_mode": "poll",
"method": "message/send",
})
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
# Sentinel + structured payload so callers can branch on it.
assert result.startswith(a2a_client._A2A_QUEUED_PREFIX)
# Critically: NOT the error sentinel. Pre-#2967 it was the error path.
assert not result.startswith(a2a_client._A2A_ERROR_PREFIX)
# Carries enough info for the caller to log meaningfully.
assert _TEST_PEER_ID in result
assert "message/send" in result
async def test_poll_queued_envelope_method_is_recorded(self):
import a2a_client
resp = _make_response(200, {
"status": "queued",
"delivery_mode": "poll",
"method": "notify",
})
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
assert result.startswith(a2a_client._A2A_QUEUED_PREFIX)
assert "notify" in result
async def test_status_queued_without_delivery_mode_is_unexpected_shape(self):
# Server bug: only ``status=queued`` set, ``delivery_mode``
# missing. Surface as the malformed branch (not Queued) — the
# SSOT parser treats this as Malformed because the documented
# contract requires both keys.
import a2a_client
resp = _make_response(200, {"status": "queued", "method": "message/send"})
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert "unexpected response shape" in result
# Must explicitly mention "or queued envelope" so an operator
# debugging this knows the parser HAS a Queued branch and the
# body just didn't match — not that the parser is missing the
# logic entirely (the pre-#2967 confusion).
assert "queued envelope" in result
async def test_platform_error_with_restart_metadata_surfaces_in_message(self):
# The platform error envelope: 503 with restart metadata.
# Surfaced as an error string that includes "restarting" so
# the caller / agent can render a softer error to the user.
import a2a_client
resp = _make_response(200, {
"error": "workspace agent unreachable — container restart triggered",
"restarting": True,
"retry_after": 15,
})
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert "restarting" in result
assert "retry_after=15" in result
class TestSendA2AMessageRetry:
"""Verify auto-retry on transient transport errors (RemoteProtocolError,
ConnectError, ReadTimeout, etc.) up to _DELEGATE_MAX_ATTEMPTS times.
Application-level errors (HTTP-status errors, JSON-RPC error in
response body) MUST NOT be retried — they're deterministic and
re-trying just wastes wall-clock.
asyncio.sleep is patched to a no-op so tests don't actually wait
out the exponential backoff.
"""
async def test_retry_succeeds_after_two_remote_protocol_errors(self):
"""Two RemoteProtocolErrors followed by a 200 → returns the 200's text."""
import a2a_client
import httpx
success = _make_response(200, {"result": {"parts": [{"kind": "text", "text": "OK"}]}})
side_effects = [
httpx.RemoteProtocolError("Server disconnected"),
httpx.RemoteProtocolError("Server disconnected"),
success,
]
mock_client = _make_seq_mock_client(side_effects)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
assert result == "OK"
assert mock_client.post.await_count == 3
async def test_retry_succeeds_after_connect_error(self):
"""Single ConnectError then 200 → returns the 200's text."""
import a2a_client
import httpx
success = _make_response(200, {"result": {"parts": [{"kind": "text", "text": "OK"}]}})
side_effects = [
httpx.ConnectError("connection refused"),
success,
]
mock_client = _make_seq_mock_client(side_effects)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
assert result == "OK"
assert mock_client.post.await_count == 2
async def test_all_attempts_fail_returns_last_error(self):
"""5 RemoteProtocolErrors → returns the last error formatted with target URL."""
import a2a_client
import httpx
side_effects = [httpx.RemoteProtocolError("Server disconnected")] * 5
mock_client = _make_seq_mock_client(side_effects)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
assert mock_client.post.await_count == 5 # _DELEGATE_MAX_ATTEMPTS
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert "RemoteProtocolError" in result
# Target URL now constructed internally — assert it contains the peer_id
# and the proxy path, not the old hand-passed URL.
assert _TEST_PEER_ID in result
assert "/workspaces/" in result and "/a2a" in result
async def test_caps_at_max_attempts(self):
"""If transient errors keep coming, we MUST stop at _DELEGATE_MAX_ATTEMPTS,
not retry forever. Pin the exact attempt count so a future tweak to
the constant has to update this test in lockstep."""
import a2a_client
import httpx
side_effects = [httpx.ReadTimeout("timeout")] * 20 # way more than max
mock_client = _make_seq_mock_client(side_effects)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
assert mock_client.post.await_count == a2a_client._DELEGATE_MAX_ATTEMPTS
assert mock_client.post.await_count == 5
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
async def test_application_error_not_retried(self):
"""JSON-RPC error response (application-level) is deterministic —
retrying just wastes wall-clock. Must return on the first attempt."""
import a2a_client
resp = _make_response(200, {
"error": {"code": -32603, "message": "Internal error"}
})
mock_client = _make_seq_mock_client([resp, resp, resp])
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
assert mock_client.post.await_count == 1 # NO retry
assert "Internal error" in result
async def test_non_transient_exception_not_retried(self):
"""A non-httpx exception (programmer bug, JSON parse, etc.) must
not trigger retry — surface immediately so the bug is loud."""
import a2a_client
# A plain ValueError isn't in _TRANSIENT_HTTP_ERRORS.
side_effects = [ValueError("malformed something")] * 3
mock_client = _make_seq_mock_client(side_effects)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
assert mock_client.post.await_count == 1 # NO retry
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert "ValueError" in result
async def test_total_budget_caps_retry_loop(self, monkeypatch):
"""Total wall-clock budget caps the retry loop even if attempts
remain — protects against a string of 5×300s ReadTimeouts.
Simulate elapsed time advancing past the budget on attempt 2."""
import a2a_client
import httpx
side_effects = [httpx.ReadTimeout("timeout")] * 5
mock_client = _make_seq_mock_client(side_effects)
# Make time.monotonic() jump forward past the budget after the
# second attempt — the retry loop should detect the deadline
# and stop, even though _DELEGATE_MAX_ATTEMPTS is 5.
call_count = {"n": 0}
original_budget = a2a_client._DELEGATE_TOTAL_BUDGET_S
def fake_monotonic():
call_count["n"] += 1
# First call (deadline computation) → 0
# Subsequent calls → 0 until attempt 3, then jump past budget
if call_count["n"] <= 4:
return 0.0
return original_budget + 1.0
monkeypatch.setattr(a2a_client.time, "monotonic", fake_monotonic)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
# Stopped before exhausting all 5 attempts.
assert mock_client.post.await_count < 5
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
def test_delegate_backoff_seconds_grows_exponentially_with_jitter():
"""Schedule: ~1s, ~2s, ~4s, ~8s, then capped at 16s. ±25% jitter
means each delay falls in [base*0.75, base*1.25]."""
import a2a_client
# Run a bunch to sample the jitter distribution; assert each value
# falls in the expected window.
for attempt, base in [(0, 1.0), (1, 2.0), (2, 4.0), (3, 8.0), (4, 16.0), (10, 16.0)]:
for _ in range(20):
d = a2a_client._delegate_backoff_seconds(attempt)
assert d >= base * 0.75 - 1e-9, f"attempt {attempt}: {d} < lower"
assert d <= base * 1.25 + 1e-9, f"attempt {attempt}: {d} > upper"
# ---------------------------------------------------------------------------
# get_peers
# ---------------------------------------------------------------------------
class TestGetPeers:
async def test_success_returns_list_on_200(self):
"""200 response → returns the JSON list."""
import a2a_client
peers = [{"id": "ws-1", "name": "Alpha"}, {"id": "ws-2", "name": "Beta"}]
resp = _make_response(200, peers)
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.get_peers()
assert result == peers
async def test_non_200_returns_empty_list(self):
"""Non-200 response → returns []."""
import a2a_client
resp = _make_response(503, {"detail": "service unavailable"})
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.get_peers()
assert result == []
async def test_404_returns_empty_list(self):
"""404 response → returns []."""
import a2a_client
resp = _make_response(404, {"detail": "not found"})
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.get_peers()
assert result == []
async def test_exception_returns_empty_list(self):
"""Network exception → returns [] (exception swallowed)."""
import a2a_client
mock_client = _make_mock_client(get_exc=TimeoutError("timed out"))
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.get_peers()
assert result == []
async def test_request_url_includes_workspace_id(self):
"""GET URL contains the WORKSPACE_ID."""
import a2a_client
resp = _make_response(200, [])
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
await a2a_client.get_peers()
url = mock_client.get.call_args.args[0]
assert "peers" in url
async def test_request_sends_workspace_id_header(self):
"""GET /registry/:id/peers must send X-Workspace-ID header (Phase 30.6)."""
import a2a_client
resp = _make_response(200, [])
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
await a2a_client.get_peers()
headers_sent = mock_client.get.call_args.kwargs.get("headers", {})
assert headers_sent.get("X-Workspace-ID") == a2a_client.WORKSPACE_ID
# ---------------------------------------------------------------------------
# get_peers_with_diagnostic — issue #2397
#
# Pin: an empty peer list MUST come with an actionable diagnostic on every
# non-200 + every transport failure. The bug was that get_peers swallowed
# every failure mode behind `return []`, leaving the agent's tool wrapper
# with no way to distinguish "you have no peers" from "auth broke" / "404
# from registry" / "platform 5xx" / "network timeout". Each of these
# requires a different operator action.
# ---------------------------------------------------------------------------
class TestGetPeersWithDiagnostic:
async def test_200_returns_peers_and_no_diagnostic(self):
"""200 with valid list → (peers, None). diagnostic stays None on success."""
import a2a_client
peers = [{"id": "ws-1", "name": "Alpha"}]
resp = _make_response(200, peers)
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result, diag = await a2a_client.get_peers_with_diagnostic()
assert result == peers
assert diag is None
async def test_200_empty_list_returns_no_diagnostic(self):
"""200 with [] → (peers=[], diag=None). Truly no peers is success, not error."""
import a2a_client
resp = _make_response(200, [])
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result, diag = await a2a_client.get_peers_with_diagnostic()
assert result == []
assert diag is None
async def test_401_returns_auth_diagnostic(self):
"""401 → diagnostic mentions auth + restart hint."""
import a2a_client
resp = _make_response(401, {"detail": "unauthorized"})
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result, diag = await a2a_client.get_peers_with_diagnostic()
assert result == []
assert diag is not None
assert "401" in diag
assert "Authentication" in diag or "authentication" in diag.lower()
async def test_403_returns_auth_diagnostic(self):
"""403 → same auth-failure diagnostic shape as 401."""
import a2a_client
resp = _make_response(403, {"detail": "forbidden"})
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result, diag = await a2a_client.get_peers_with_diagnostic()
assert result == []
assert diag is not None
assert "403" in diag
async def test_404_returns_registration_diagnostic(self):
"""404 → diagnostic tells operator the workspace ID is missing from the registry."""
import a2a_client
resp = _make_response(404, {"detail": "not found"})
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result, diag = await a2a_client.get_peers_with_diagnostic()
assert result == []
assert diag is not None
assert "404" in diag
assert "registered" in diag.lower() or "registration" in diag.lower()
async def test_500_returns_platform_error_diagnostic(self):
"""5xx → 'Platform error: HTTP <code>.'"""
import a2a_client
resp = _make_response(503, {"detail": "service unavailable"})
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result, diag = await a2a_client.get_peers_with_diagnostic()
assert result == []
assert diag is not None
assert "503" in diag
assert "Platform error" in diag or "platform error" in diag.lower()
async def test_network_exception_returns_unreachable_diagnostic(self):
"""httpx exception → diagnostic mentions PLATFORM_URL + the underlying error."""
import a2a_client
mock_client = _make_mock_client(get_exc=TimeoutError("connection timed out"))
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result, diag = await a2a_client.get_peers_with_diagnostic()
assert result == []
assert diag is not None
assert "Cannot reach platform" in diag or "cannot reach" in diag.lower()
assert "timed out" in diag
async def test_200_with_non_list_body_returns_diagnostic(self):
"""200 but body is a dict → diagnostic flags shape mismatch (regression guard)."""
import a2a_client
resp = _make_response(200, {"oops": "should have been a list"})
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result, diag = await a2a_client.get_peers_with_diagnostic()
assert result == []
assert diag is not None
assert "list" in diag.lower()
async def test_get_peers_shim_preserves_bare_list_contract(self):
"""get_peers() still returns just list[dict] — no API break for non-tool callers."""
import a2a_client
peers = [{"id": "ws-1", "name": "Alpha"}]
resp = _make_response(200, peers)
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.get_peers()
# Must be a list, not a tuple — bare-list shim contract.
assert isinstance(result, list)
assert result == peers
# ---------------------------------------------------------------------------
# get_workspace_info
# ---------------------------------------------------------------------------
class TestGetWorkspaceInfo:
async def test_success_returns_dict_on_200(self):
"""200 response → returns the JSON dict."""
import a2a_client
info = {"id": "ws-test", "name": "Test Workspace", "status": "online"}
resp = _make_response(200, info)
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.get_workspace_info()
assert result == info
async def test_non_200_returns_error_dict(self):
"""Non-200 response → returns {'error': 'not found'}."""
import a2a_client
resp = _make_response(404, {"detail": "no such workspace"})
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.get_workspace_info()
assert result == {"error": "not found"}
async def test_500_returns_error_dict(self):
"""500 response → returns {'error': 'not found'}."""
import a2a_client
resp = _make_response(500, {"detail": "server error"})
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.get_workspace_info()
assert result == {"error": "not found"}
async def test_410_returns_removed_with_hint(self):
"""410 Gone (#2429) → distinct error 'removed' so callers can
prompt re-onboard instead of falling through to 'not found'.
Body shape passes through removed_at + the platform hint."""
import a2a_client
body = {
"error": "workspace removed",
"id": "ws-deleted-uuid",
"removed_at": "2026-04-30T12:00:00Z",
"hint": "Regenerate workspace + token from the canvas → Tokens tab",
}
resp = _make_response(410, body)
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.get_workspace_info()
assert result["error"] == "removed"
assert result["id"] == "ws-deleted-uuid"
assert result["removed_at"] == "2026-04-30T12:00:00Z"
assert "Regenerate" in result["hint"]
async def test_410_with_unparseable_body_falls_back_to_default_hint(self):
"""If the platform's 410 body isn't JSON for some reason, the
default hint still surfaces — the actionable signal must not
depend on body shape parity with the platform."""
import a2a_client
resp = MagicMock()
resp.status_code = 410
resp.json = MagicMock(side_effect=ValueError("not json"))
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.get_workspace_info()
assert result["error"] == "removed"
assert result["id"] == a2a_client.WORKSPACE_ID
assert result["removed_at"] is None
assert "Regenerate" in result["hint"]
async def test_exception_returns_error_dict_with_message(self):
"""Network exception → returns {'error': '<exception message>'}."""
import a2a_client
exc = RuntimeError("network failure")
mock_client = _make_mock_client(get_exc=exc)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.get_workspace_info()
assert "error" in result
assert "network failure" in result["error"]
async def test_request_url_includes_workspaces_path(self):
"""GET URL contains /workspaces/."""
import a2a_client
resp = _make_response(200, {})
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
await a2a_client.get_workspace_info()
url = mock_client.get.call_args.args[0]
assert "/workspaces/" in url
# ---------------------------------------------------------------------------
# enrich_peer_metadata — sync helper, separate from the async path.
# ---------------------------------------------------------------------------
def _make_sync_mock_client(*, get_resp=None, get_exc=None):
"""Build a synchronous httpx.Client context-manager mock for enrich_peer_metadata."""
mock_get = MagicMock()
if get_exc is not None:
mock_get.side_effect = get_exc
elif get_resp is not None:
mock_get.return_value = get_resp
mock_client = MagicMock()
mock_client.get = mock_get
mock_client.__enter__ = MagicMock(return_value=mock_client)
mock_client.__exit__ = MagicMock(return_value=False)
return mock_client
def _make_sync_response(status_code: int, data) -> MagicMock:
"""Build a sync httpx.Response mock."""
resp = MagicMock()
resp.status_code = status_code
resp.json = MagicMock(return_value=data)
return resp
class TestEnrichPeerMetadata:
"""Tests for a2a_client.enrich_peer_metadata.
Uses the same test-ID constant and cache-isolation pattern as the
async tests above.
"""
def _call(self, peer_id, *, source_workspace_id=None, now=None):
import a2a_client
return a2a_client.enrich_peer_metadata(
peer_id,
source_workspace_id=source_workspace_id,
now=now,
)
def test_cache_hit_within_ttl_returns_cached(self):
"""Fresh cache entry → no HTTP call, returns the cached record."""
import a2a_client
peer_data = {"id": _TEST_PEER_ID, "name": "Cached Peer", "url": "http://cached"}
now = 1000.0
# Seed cache with a fresh entry (TTL = 300s, so 1000+100 = 1100 < 1300).
a2a_client._peer_metadata_set(_TEST_PEER_ID, (now, peer_data))
try:
result = self._call(_TEST_PEER_ID, now=now + 100)
assert result == peer_data
finally:
# Clean up so other tests are not polluted.
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
def test_cache_expired_causes_refetch(self):
"""Stale cache entry (TTL exceeded) → HTTP GET issued, cache updated."""
import a2a_client
old_data = {"id": _TEST_PEER_ID, "name": "Old"}
fresh_data = {"id": _TEST_PEER_ID, "name": "Fresh", "url": "http://fresh"}
now = 1000.0
# Seed cache with an expired entry (> 300s ago).
a2a_client._peer_metadata_set(_TEST_PEER_ID, (now - 1000, old_data))
resp = _make_sync_response(200, fresh_data)
mock_client = _make_sync_mock_client(get_resp=resp)
with patch("a2a_client.httpx.Client", return_value=mock_client):
result = self._call(_TEST_PEER_ID, now=now)
assert result == fresh_data
# Cache should now hold the fresh data.
cached = a2a_client._peer_metadata_get(_TEST_PEER_ID)
assert cached is not None
assert cached[1] == fresh_data
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
def test_network_exception_returns_none_negative_cache_set(self):
"""Network failure → returns None, failure cached (negative cache)."""
import a2a_client
now = 1000.0
mock_client = _make_sync_mock_client(get_exc=ConnectionError("unreachable"))
with patch("a2a_client.httpx.Client", return_value=mock_client):
result = self._call(_TEST_PEER_ID, now=now)
assert result is None
# Negative cache: failure stored so we don't re-fetch on every call.
cached = a2a_client._peer_metadata_get(_TEST_PEER_ID)
assert cached is not None
assert cached[1] is None # None sentinel = negative cache
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
def test_non_200_returns_none_negative_cache_set(self):
"""HTTP 404/403/500 → returns None, failure cached."""
import a2a_client
now = 1000.0
resp = _make_sync_response(404, {"detail": "not found"})
mock_client = _make_sync_mock_client(get_resp=resp)
with patch("a2a_client.httpx.Client", return_value=mock_client):
result = self._call(_TEST_PEER_ID, now=now)
assert result is None
cached = a2a_client._peer_metadata_get(_TEST_PEER_ID)
assert cached is not None
assert cached[1] is None
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
def test_non_json_response_returns_none_negative_cache_set(self):
"""Server returns non-JSON body → returns None, failure cached."""
import a2a_client
now = 1000.0
resp = MagicMock()
resp.status_code = 200
resp.json.side_effect = ValueError("invalid json")
mock_client = _make_sync_mock_client(get_resp=resp)
with patch("a2a_client.httpx.Client", return_value=mock_client):
result = self._call(_TEST_PEER_ID, now=now)
assert result is None
cached = a2a_client._peer_metadata_get(_TEST_PEER_ID)
assert cached is not None
assert cached[1] is None
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
def test_non_dict_json_returns_none_negative_cache_set(self):
"""Server returns a JSON array or scalar → returns None, failure cached."""
import a2a_client
now = 1000.0
resp = _make_sync_response(200, ["peer-a", "peer-b"])
mock_client = _make_sync_mock_client(get_resp=resp)
with patch("a2a_client.httpx.Client", return_value=mock_client):
result = self._call(_TEST_PEER_ID, now=now)
assert result is None
cached = a2a_client._peer_metadata_get(_TEST_PEER_ID)
assert cached is not None
assert cached[1] is None
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
def test_invalid_peer_id_returns_none_without_http(self):
"""Path-traversal / malformed peer IDs are rejected at the trust boundary."""
import a2a_client
mock_client = _make_sync_mock_client(get_resp=_make_sync_response(200, {}))
with patch("a2a_client.httpx.Client", return_value=mock_client):
for bad in ("", "ws-abc", "../admin", "not-a-uuid", "8dad3e29"):
assert self._call(bad) is None
# No GET should have been issued for any invalid ID.
mock_client.get.assert_not_called()
def test_happy_path_returns_data_and_caches(self):
"""200 + dict JSON → returns data, cache updated, peer name stored."""
import a2a_client
now = 1000.0
peer_data = {
"id": _TEST_PEER_ID,
"name": "Happy Peer",
"role": "sre",
"url": "http://happy-peer:8080",
}
resp = _make_sync_response(200, peer_data)
mock_client = _make_sync_mock_client(get_resp=resp)
with patch("a2a_client.httpx.Client", return_value=mock_client):
result = self._call(_TEST_PEER_ID, now=now)
assert result == peer_data
# Cache updated.
cached = a2a_client._peer_metadata_get(_TEST_PEER_ID)
assert cached is not None
assert cached[1] == peer_data
# Peer name indexed.
assert a2a_client._peer_names.get(_TEST_PEER_ID) == "Happy Peer"
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
a2a_client._peer_names.clear()
def test_get_url_includes_peer_id_and_workspace_header(self):
"""GET is issued to /registry/discover/<peer_id> with X-Workspace-ID."""
import a2a_client
now = 1000.0
resp = _make_sync_response(200, {"id": _TEST_PEER_ID})
mock_client = _make_sync_mock_client(get_resp=resp)
with patch("a2a_client.httpx.Client", return_value=mock_client):
self._call(_TEST_PEER_ID, now=now)
mock_client.get.assert_called_once()
positional_url = mock_client.get.call_args.args[0]
assert _TEST_PEER_ID in positional_url
assert "/registry/discover/" in positional_url
headers_sent = mock_client.get.call_args.kwargs.get("headers", {})
assert "X-Workspace-ID" in headers_sent
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
def test_source_workspace_id_header_overrides_default(self):
"""Caller can pass source_workspace_id to set X-Workspace-ID header."""
import a2a_client
now = 1000.0
src_id = "22222222-2222-2222-2222-222222222222"
resp = _make_sync_response(200, {"id": _TEST_PEER_ID})
mock_client = _make_sync_mock_client(get_resp=resp)
with patch("a2a_client.httpx.Client", return_value=mock_client):
self._call(_TEST_PEER_ID, source_workspace_id=src_id, now=now)
headers_sent = mock_client.get.call_args.kwargs.get("headers", {})
assert headers_sent.get("X-Workspace-ID") == src_id
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
# ---------------------------------------------------------------------------
# enrich_peer_metadata_nonblocking — background-fetch wrapper
# ---------------------------------------------------------------------------
class TestEnrichPeerMetadataNonblocking:
"""Tests for the nonblocking variant that schedules work in a thread pool."""
def _call(self, peer_id, *, source_workspace_id=None, now=None):
import a2a_client
return a2a_client.enrich_peer_metadata_nonblocking(
peer_id,
source_workspace_id=source_workspace_id,
)
def test_always_returns_none(self):
"""Nonblocking variant always returns None — never blocks on a registry GET.
Callers render the bare peer_id immediately. A background worker
populates the cache asynchronously; subsequent pushes will see the
warm cache and the caller can optionally read it directly.
"""
import a2a_client
a2a_client._peer_metadata.clear()
a2a_client._peer_in_flight_clear_for_testing()
try:
result = self._call(_TEST_PEER_ID)
assert result is None
# The peer should be in the in-flight set (work was scheduled).
with a2a_client._enrich_in_flight_lock:
assert _TEST_PEER_ID in a2a_client._enrich_in_flight
finally:
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
a2a_client._peer_in_flight_clear_for_testing()
def test_in_flight_guard_prevents_duplicate_schedule(self):
"""Same peer pushed twice before first schedule completes → only one in-flight entry."""
import a2a_client
a2a_client._peer_metadata.clear()
a2a_client._peer_in_flight_clear_for_testing()
# Pre-populate in-flight manually to simulate already-scheduled.
with a2a_client._enrich_in_flight_lock:
a2a_client._enrich_in_flight.add(_TEST_PEER_ID)
try:
result = self._call(_TEST_PEER_ID)
# Returns None because a worker is already scheduled.
assert result is None
# Should NOT have added it again (set.add is idempotent).
with a2a_client._enrich_in_flight_lock:
assert _TEST_PEER_ID in a2a_client._enrich_in_flight
finally:
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
a2a_client._peer_in_flight_clear_for_testing()
def test_invalid_peer_id_returns_none_without_schedule(self):
"""Malformed peer IDs are rejected at the trust boundary."""
import a2a_client
a2a_client._peer_in_flight_clear_for_testing()
result = self._call("")
assert result is None
with a2a_client._enrich_in_flight_lock:
assert _TEST_PEER_ID not in a2a_client._enrich_in_flight
# ---------------------------------------------------------------------------
# _enrich_peer_metadata_worker — background thread body
# ---------------------------------------------------------------------------
class TestEnrichPeerMetadataWorker:
"""Tests for the background worker and the test-sync helper."""
def test_worker_runs_sync_function_and_clears_inflight(self):
"""Worker runs enrich_peer_metadata and clears in-flight when done."""
import a2a_client
a2a_client._peer_metadata.clear()
a2a_client._peer_in_flight_clear_for_testing()
peer_data = {"id": _TEST_PEER_ID, "name": "Worker Peer"}
resp = _make_sync_response(200, peer_data)
mock_client = _make_sync_mock_client(get_resp=resp)
# Pre-populate in-flight to simulate a running worker.
with a2a_client._enrich_in_flight_lock:
a2a_client._enrich_in_flight.add(_TEST_PEER_ID)
try:
with patch("a2a_client.httpx.Client", return_value=mock_client):
a2a_client._enrich_peer_metadata_worker(
_TEST_PEER_ID, source_workspace_id=None
)
# In-flight should be cleared after worker finishes.
with a2a_client._enrich_in_flight_lock:
assert _TEST_PEER_ID not in a2a_client._enrich_in_flight
# Cache should be populated.
cached = a2a_client._peer_metadata_get(_TEST_PEER_ID)
assert cached is not None
assert cached[1] == peer_data
finally:
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
def test_worker_exception_in_sync_function_is_swallowed(self):
"""Exception from the sync function is caught by the worker, in-flight cleared."""
import a2a_client
a2a_client._peer_metadata.clear()
a2a_client._peer_in_flight_clear_for_testing()
with a2a_client._enrich_in_flight_lock:
a2a_client._enrich_in_flight.add(_TEST_PEER_ID)
try:
# Patch enrich_peer_metadata to raise so the worker catches it.
with patch.object(
a2a_client, "enrich_peer_metadata", side_effect=RuntimeError("boom")
):
# Should NOT raise — worker swallows it.
a2a_client._enrich_peer_metadata_worker(
_TEST_PEER_ID, source_workspace_id=None
)
# In-flight should still be cleared even on error.
with a2a_client._enrich_in_flight_lock:
assert _TEST_PEER_ID not in a2a_client._enrich_in_flight
finally:
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
# ---------------------------------------------------------------------------
# _wait_for_enrichment_inflight_for_testing — test synchronisation helper
# ---------------------------------------------------------------------------
class TestWaitForEnrichmentInFlight:
"""Tests for the test-only synchronisation helper."""
def test_returns_immediately_when_nothing_inflight(self):
"""Empty in-flight set → returns instantly."""
import a2a_client
a2a_client._peer_in_flight_clear_for_testing()
# Should not raise.
a2a_client._wait_for_enrichment_inflight_for_testing(timeout=0.1)
# Should have returned quickly (not slept the full 0.1s).
# The implementation polls with 10ms sleeps, so if it ran for >50ms
# it would have done multiple polls — the empty-set early-return is
# the fast path.
def test_blocks_until_inflight_completes(self):
"""In-flight entry cleared while waiting → returns."""
import a2a_client
import time as _time
a2a_client._peer_in_flight_clear_for_testing()
a2a_client._peer_metadata.clear()
peer_data = {"id": _TEST_PEER_ID, "name": "Blocker Peer"}
# Replace enrich_peer_metadata with one that bypasses httpx entirely.
# The httpx patch approach fails because the background worker runs
# after the patch context exits (thread-boundary issue: the executor
# thread is created before the patch, so it uses the original httpx).
# Replacing the function itself works across thread boundaries.
fake_enrich = lambda pid, src=None, *, now=None: (
a2a_client._peer_metadata_set(pid, (now or _time.monotonic(), peer_data)),
a2a_client._peer_names.__setitem__(pid, peer_data["name"])
)
orig = a2a_client.enrich_peer_metadata
a2a_client.enrich_peer_metadata = fake_enrich
try:
a2a_client.enrich_peer_metadata_nonblocking(_TEST_PEER_ID)
a2a_client._wait_for_enrichment_inflight_for_testing(timeout=5.0)
cached = a2a_client._peer_metadata_get(_TEST_PEER_ID)
assert cached is not None
assert cached[1] == peer_data
finally:
a2a_client.enrich_peer_metadata = orig
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
a2a_client._peer_in_flight_clear_for_testing()