forked from molecule-ai/molecule-core
Merge pull request #2420 from Molecule-AI/refactor/send-a2a-message-by-peer-id
refactor(workspace-runtime): send_a2a_message takes peer_id + UUID validation [stacks on #2418]
This commit is contained in:
commit
f035482e0a
@ -8,6 +8,7 @@ import asyncio
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
import time
|
||||
import uuid
|
||||
|
||||
@ -33,13 +34,54 @@ _peer_names: dict[str, str] = {}
|
||||
# Used by delegate_task to distinguish real errors from normal response text.
|
||||
_A2A_ERROR_PREFIX = "[A2A_ERROR] "
|
||||
|
||||
# Workspace IDs are UUIDs everywhere we generate them (platform's
|
||||
# workspaces.id column, /registry/discover/:id route param, etc.) but
|
||||
# the agent-facing tool surface receives them as free-form strings via
|
||||
# tool args. ``_validate_peer_id`` enforces UUID-shape at the
|
||||
# trust boundary so we never interpolate `..` or `/` into a URL path,
|
||||
# never silently coerce malformed input into a 404, and surface a
|
||||
# clear error to the agent rather than letting an HTTP 4xx bubble up
|
||||
# from the platform with a generic error message.
|
||||
#
|
||||
# Lenient on case + whitespace because real-world peer-id strings
|
||||
# come from list_peers/discover_peer responses (canonical lowercase)
|
||||
# or hand-typed agent input (mixed-case acceptable). Strict on
|
||||
# everything else.
|
||||
_UUID_RE = re.compile(
|
||||
r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$"
|
||||
)
|
||||
|
||||
|
||||
def _validate_peer_id(peer_id: str) -> str | None:
|
||||
"""Return the canonicalised peer_id if valid, else None.
|
||||
|
||||
Returning None instead of raising so callers in tool surfaces can
|
||||
convert to a friendly agent-facing string ("workspace_id is not a
|
||||
valid UUID") rather than crashing with a stack trace.
|
||||
"""
|
||||
if not isinstance(peer_id, str):
|
||||
return None
|
||||
pid = peer_id.strip()
|
||||
if not _UUID_RE.match(pid):
|
||||
return None
|
||||
return pid.lower()
|
||||
|
||||
|
||||
async def discover_peer(target_id: str) -> dict | None:
|
||||
"""Discover a peer workspace's URL via the platform registry."""
|
||||
"""Discover a peer workspace's URL via the platform registry.
|
||||
|
||||
Validates ``target_id`` is a UUID before constructing the URL — a
|
||||
malformed id can't reach the platform handler now, which both
|
||||
short-circuits an avoidable round-trip AND ensures we never
|
||||
interpolate path-traversal characters into the URL.
|
||||
"""
|
||||
safe_id = _validate_peer_id(target_id)
|
||||
if safe_id is None:
|
||||
return None
|
||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||
try:
|
||||
resp = await client.get(
|
||||
f"{PLATFORM_URL}/registry/discover/{target_id}",
|
||||
f"{PLATFORM_URL}/registry/discover/{safe_id}",
|
||||
headers={"X-Workspace-ID": WORKSPACE_ID, **auth_headers()},
|
||||
)
|
||||
if resp.status_code == 200:
|
||||
@ -134,8 +176,14 @@ def _format_a2a_error(exc: BaseException, target_url: str) -> str:
|
||||
return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"
|
||||
|
||||
|
||||
async def send_a2a_message(target_url: str, message: str) -> str:
|
||||
"""Send an A2A message/send to a target workspace.
|
||||
async def send_a2a_message(peer_id: str, message: str) -> str:
|
||||
"""Send an A2A ``message/send`` to a peer workspace via the platform proxy.
|
||||
|
||||
The target URL is constructed internally as
|
||||
``${PLATFORM_URL}/workspaces/{peer_id}/a2a``. Going through the
|
||||
platform's A2A proxy is the only path that works for both
|
||||
in-container and external runtimes — see
|
||||
a2a_tools.tool_delegate_task for the rationale.
|
||||
|
||||
Auto-retries up to _DELEGATE_MAX_ATTEMPTS times on transient
|
||||
transport-layer errors (RemoteProtocolError, ConnectError,
|
||||
@ -144,6 +192,11 @@ async def send_a2a_message(target_url: str, message: str) -> str:
|
||||
JSON-RPC error response, malformed JSON) are NOT retried — they
|
||||
indicate a deterministic problem retry won't fix.
|
||||
"""
|
||||
safe_id = _validate_peer_id(peer_id)
|
||||
if safe_id is None:
|
||||
return f"{_A2A_ERROR_PREFIX}invalid peer_id (expected UUID): {peer_id!r}"
|
||||
target_url = f"{PLATFORM_URL}/workspaces/{safe_id}/a2a"
|
||||
|
||||
# Fix F (Cycle 5 / H2 — flagged 5 consecutive audits): timeout=None allowed
|
||||
# a hung upstream to block the agent indefinitely. Use a generous but bounded
|
||||
# timeout: 30s connect + 300s read (long enough for slow LLM responses).
|
||||
|
||||
@ -188,9 +188,10 @@ async def tool_delegate_task(workspace_id: str, task: str) -> str:
|
||||
if not workspace_id or not task:
|
||||
return "Error: workspace_id and task are required"
|
||||
|
||||
# Discover the target. We still call discover_peer because it
|
||||
# enforces access control + tells us whether the peer is online,
|
||||
# but we DO NOT use the peer's reported URL for routing — see below.
|
||||
# Discover the target. discover_peer is the access-control gate +
|
||||
# name/status lookup. The peer's reported ``url`` field is NOT used
|
||||
# for routing — see send_a2a_message, which constructs the URL via
|
||||
# the platform's A2A proxy.
|
||||
peer = await discover_peer(workspace_id)
|
||||
if not peer:
|
||||
return f"Error: workspace {workspace_id} not found or not accessible (check access control)"
|
||||
@ -198,33 +199,16 @@ async def tool_delegate_task(workspace_id: str, task: str) -> str:
|
||||
if (peer.get("status") or "").lower() == "offline":
|
||||
return f"Error: workspace {workspace_id} is offline"
|
||||
|
||||
# Route through the platform's A2A proxy instead of POSTing
|
||||
# directly to peer["url"]. The peer's URL is whatever it last
|
||||
# registered — for in-container peers that's a Docker-internal
|
||||
# hostname like ``http://ws-X-Y:8000`` which only resolves inside
|
||||
# the platform's container DNS. External callers (the standalone
|
||||
# molecule-mcp wrapper running on an operator's laptop) hit
|
||||
# `[Errno 8] nodename nor servname` every time they try to reach
|
||||
# an in-container peer that way. The platform's
|
||||
# ``/workspaces/:peer-id/a2a`` proxy works for BOTH paths: it
|
||||
# forwards over the Docker network for in-container peers and is
|
||||
# the only path external runtimes can use, so unifying on it
|
||||
# makes the universal-MCP path actually universal. In-container
|
||||
# callers pay one extra HTTP hop on the same bridge — microseconds
|
||||
# — in exchange for one consistent code path. Verified live on
|
||||
# 2026-04-30 against workspace 8dad3e29 → 97ac32e9 (Claude Code
|
||||
# Agent) which replied correctly through the proxy after failing
|
||||
# via direct connect.
|
||||
target_url = f"{PLATFORM_URL}/workspaces/{workspace_id}/a2a"
|
||||
|
||||
# Report delegation start — include the task text for traceability
|
||||
peer_name = peer.get("name") or _peer_names.get(workspace_id) or workspace_id[:8]
|
||||
_peer_names[workspace_id] = peer_name # cache for future use
|
||||
# Brief summary for canvas display — just the delegation target
|
||||
await report_activity("a2a_send", workspace_id, f"Delegating to {peer_name}", task_text=task)
|
||||
|
||||
# Send A2A message and log the full round-trip
|
||||
result = await send_a2a_message(target_url, task)
|
||||
# send_a2a_message routes through ${PLATFORM_URL}/workspaces/{id}/a2a
|
||||
# (the platform proxy) so the same code works for in-container and
|
||||
# external (standalone molecule-mcp) callers.
|
||||
result = await send_a2a_message(workspace_id, task)
|
||||
|
||||
# Detect delegation failures — wrap them clearly so the calling agent
|
||||
# can decide to retry, use another peer, or handle the task itself.
|
||||
|
||||
@ -42,6 +42,12 @@ def _make_response(status_code, json_data):
|
||||
return resp
|
||||
|
||||
|
||||
# Canonical UUID used wherever a test needs a peer_id. send_a2a_message and
|
||||
# discover_peer reject non-UUID strings at the trust boundary (see
|
||||
# a2a_client._validate_peer_id), so test inputs must be valid UUIDs.
|
||||
_TEST_PEER_ID = "11111111-1111-1111-1111-111111111111"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Module-level constants (just ensure they exist and have sensible types)
|
||||
# ---------------------------------------------------------------------------
|
||||
@ -64,12 +70,12 @@ class TestDiscoverPeer:
|
||||
"""200 response → returns the JSON body."""
|
||||
import a2a_client
|
||||
|
||||
peer_data = {"id": "ws-abc", "url": "http://ws-abc.svc", "name": "Alpha"}
|
||||
peer_data = {"id": _TEST_PEER_ID, "url": "http://ws-abc.svc", "name": "Alpha"}
|
||||
resp = _make_response(200, peer_data)
|
||||
mock_client = _make_mock_client(get_resp=resp)
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
|
||||
result = await a2a_client.discover_peer("ws-abc")
|
||||
result = await a2a_client.discover_peer(_TEST_PEER_ID)
|
||||
|
||||
assert result == peer_data
|
||||
|
||||
@ -81,7 +87,7 @@ class TestDiscoverPeer:
|
||||
mock_client = _make_mock_client(get_resp=resp)
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
|
||||
result = await a2a_client.discover_peer("ws-missing")
|
||||
result = await a2a_client.discover_peer(_TEST_PEER_ID)
|
||||
|
||||
assert result is None
|
||||
|
||||
@ -93,7 +99,7 @@ class TestDiscoverPeer:
|
||||
mock_client = _make_mock_client(get_resp=resp)
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
|
||||
result = await a2a_client.discover_peer("ws-forbidden")
|
||||
result = await a2a_client.discover_peer(_TEST_PEER_ID)
|
||||
|
||||
assert result is None
|
||||
|
||||
@ -104,10 +110,26 @@ class TestDiscoverPeer:
|
||||
mock_client = _make_mock_client(get_exc=ConnectionError("host unreachable"))
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
|
||||
result = await a2a_client.discover_peer("ws-down")
|
||||
result = await a2a_client.discover_peer(_TEST_PEER_ID)
|
||||
|
||||
assert result is None
|
||||
|
||||
async def test_invalid_peer_id_returns_none_without_http(self):
|
||||
"""Malformed peer_id is rejected at the trust boundary — no HTTP call.
|
||||
|
||||
Path-traversal-shaped input ("../admin"), free-form labels
|
||||
("ws-abc"), and empty strings all return None and don't reach
|
||||
the platform. Closes the URL-interpolation class of bug.
|
||||
"""
|
||||
import a2a_client
|
||||
|
||||
mock_client = _make_mock_client(get_resp=_make_response(200, {}))
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
|
||||
for bad in ("", "ws-abc", "../admin", "not-a-uuid", "8dad3e29"):
|
||||
assert await a2a_client.discover_peer(bad) is None
|
||||
# No GET should have been issued for any of those.
|
||||
mock_client.get.assert_not_called()
|
||||
|
||||
async def test_request_uses_correct_url_and_header(self):
|
||||
"""GET is called with the right URL and X-Workspace-ID header."""
|
||||
import a2a_client
|
||||
@ -116,11 +138,11 @@ class TestDiscoverPeer:
|
||||
mock_client = _make_mock_client(get_resp=resp)
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
|
||||
await a2a_client.discover_peer("ws-xyz")
|
||||
await a2a_client.discover_peer(_TEST_PEER_ID)
|
||||
|
||||
mock_client.get.assert_called_once()
|
||||
positional_url = mock_client.get.call_args.args[0]
|
||||
assert "ws-xyz" in positional_url
|
||||
assert _TEST_PEER_ID in positional_url
|
||||
# X-Workspace-ID must be present; bearer token also merged in when available
|
||||
headers_sent = mock_client.get.call_args.kwargs.get("headers", {})
|
||||
assert headers_sent.get("X-Workspace-ID") == a2a_client.WORKSPACE_ID
|
||||
@ -142,7 +164,7 @@ class TestSendA2AMessage:
|
||||
mock_client = _make_mock_client(post_resp=resp)
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
|
||||
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
|
||||
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
|
||||
|
||||
assert result == "Hello!"
|
||||
|
||||
@ -154,7 +176,7 @@ class TestSendA2AMessage:
|
||||
mock_client = _make_mock_client(post_resp=resp)
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
|
||||
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
|
||||
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
|
||||
|
||||
assert result == "(no response)"
|
||||
|
||||
@ -168,7 +190,7 @@ class TestSendA2AMessage:
|
||||
mock_client = _make_mock_client(post_resp=resp)
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
|
||||
result = await a2a_client.send_a2a_message("http://target/a2a", "task")
|
||||
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
|
||||
|
||||
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
|
||||
assert "Agent error: something bad" in result
|
||||
@ -183,7 +205,7 @@ class TestSendA2AMessage:
|
||||
mock_client = _make_mock_client(post_resp=resp)
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
|
||||
result = await a2a_client.send_a2a_message("http://target/a2a", "task")
|
||||
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
|
||||
|
||||
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
|
||||
assert "Internal error occurred" in result
|
||||
@ -196,7 +218,7 @@ class TestSendA2AMessage:
|
||||
mock_client = _make_mock_client(post_resp=resp)
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
|
||||
result = await a2a_client.send_a2a_message("http://target/a2a", "task")
|
||||
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
|
||||
|
||||
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
|
||||
# The error includes the JSON-RPC code so the operator can look it
|
||||
@ -205,7 +227,10 @@ class TestSendA2AMessage:
|
||||
assert "code=-32600" in result
|
||||
assert "no message" in result.lower()
|
||||
# Target URL is included so chained delegations are traceable.
|
||||
assert "target=http://target/a2a" in result
|
||||
# Target URL now constructed internally — assert it contains the peer_id
|
||||
# and the proxy path, not the old hand-passed URL.
|
||||
assert _TEST_PEER_ID in result
|
||||
assert "/workspaces/" in result and "/a2a" in result
|
||||
|
||||
async def test_jsonrpc_error_with_code_zero_includes_code_in_detail(self):
|
||||
"""JSON-RPC error code=0 is technically not valid in the spec,
|
||||
@ -219,7 +244,7 @@ class TestSendA2AMessage:
|
||||
mock_client = _make_mock_client(post_resp=resp)
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
|
||||
result = await a2a_client.send_a2a_message("http://target/a2a", "task")
|
||||
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
|
||||
|
||||
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
|
||||
assert "code=0" in result
|
||||
@ -234,7 +259,7 @@ class TestSendA2AMessage:
|
||||
mock_client = _make_mock_client(post_resp=resp)
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
|
||||
result = await a2a_client.send_a2a_message("http://target/a2a", "task")
|
||||
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
|
||||
|
||||
# Pre-fix this returned bare str(payload) which the canvas
|
||||
# rendered as a confusing "looks like a successful response"
|
||||
@ -243,7 +268,10 @@ class TestSendA2AMessage:
|
||||
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
|
||||
assert "unexpected response shape" in result
|
||||
assert "abc123" in result # snippet of payload included for context
|
||||
assert "target=http://target/a2a" in result
|
||||
# Target URL now constructed internally — assert it contains the peer_id
|
||||
# and the proxy path, not the old hand-passed URL.
|
||||
assert _TEST_PEER_ID in result
|
||||
assert "/workspaces/" in result and "/a2a" in result
|
||||
|
||||
async def test_exception_returns_error_prefix_and_message(self):
|
||||
"""Network exception → returns _A2A_ERROR_PREFIX + exception text."""
|
||||
@ -252,7 +280,7 @@ class TestSendA2AMessage:
|
||||
mock_client = _make_mock_client(post_exc=ConnectionError("connection refused"))
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
|
||||
result = await a2a_client.send_a2a_message("http://target/a2a", "task")
|
||||
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
|
||||
|
||||
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
|
||||
assert "connection refused" in result
|
||||
@ -260,7 +288,10 @@ class TestSendA2AMessage:
|
||||
# already include it — gives the operator a typed handle to
|
||||
# search for in container logs.
|
||||
assert "ConnectionError" in result
|
||||
assert "target=http://target/a2a" in result
|
||||
# Target URL now constructed internally — assert it contains the peer_id
|
||||
# and the proxy path, not the old hand-passed URL.
|
||||
assert _TEST_PEER_ID in result
|
||||
assert "/workspaces/" in result and "/a2a" in result
|
||||
|
||||
async def test_empty_stringifying_exception_falls_back_to_class_name(self):
|
||||
"""The user's reported bug: httpx.RemoteProtocolError and similar
|
||||
@ -280,7 +311,7 @@ class TestSendA2AMessage:
|
||||
mock_client = _make_mock_client(post_exc=_SilentRemoteProtocolError())
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
|
||||
result = await a2a_client.send_a2a_message("http://target/a2a", "task")
|
||||
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
|
||||
|
||||
# Must NOT be just the bare prefix — that's the regression.
|
||||
assert result != a2a_client._A2A_ERROR_PREFIX.strip()
|
||||
@ -288,7 +319,10 @@ class TestSendA2AMessage:
|
||||
# Must include the class name + something explanatory.
|
||||
assert "_SilentRemoteProtocolError" in result
|
||||
assert "no message" in result.lower()
|
||||
assert "target=http://target/a2a" in result
|
||||
# Target URL now constructed internally — assert it contains the peer_id
|
||||
# and the proxy path, not the old hand-passed URL.
|
||||
assert _TEST_PEER_ID in result
|
||||
assert "/workspaces/" in result and "/a2a" in result
|
||||
|
||||
async def test_result_text_part_missing_text_key_returns_empty(self):
|
||||
"""Part dict without 'text' key → falls back to '' (empty string returned)."""
|
||||
@ -300,11 +334,30 @@ class TestSendA2AMessage:
|
||||
mock_client = _make_mock_client(post_resp=resp)
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
|
||||
result = await a2a_client.send_a2a_message("http://target/a2a", "task")
|
||||
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
|
||||
|
||||
# Returns "" (empty string — does not start with _A2A_ERROR_PREFIX)
|
||||
assert result == ""
|
||||
|
||||
async def test_invalid_peer_id_short_circuits_without_http(self):
|
||||
"""Malformed peer_id is rejected at the trust boundary — no POST.
|
||||
|
||||
Symmetric coverage with discover_peer's validation gate. Path-traversal
|
||||
("../admin"), free-form labels ("ws-abc"), and empty strings all
|
||||
return an _A2A_ERROR_PREFIX message identifying the bad input and
|
||||
never reach the platform.
|
||||
"""
|
||||
import a2a_client
|
||||
|
||||
mock_client = _make_mock_client(post_resp=_make_response(200, {}))
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
|
||||
for bad in ("", "ws-abc", "../admin", "not-a-uuid", "8dad3e29"):
|
||||
result = await a2a_client.send_a2a_message(bad, "ping")
|
||||
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
|
||||
assert "invalid peer_id" in result
|
||||
# No POST should have been issued for any of those.
|
||||
mock_client.post.assert_not_called()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# send_a2a_message — transient-error retry behaviour
|
||||
@ -354,7 +407,7 @@ class TestSendA2AMessageRetry:
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
|
||||
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
|
||||
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
|
||||
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
|
||||
|
||||
assert result == "OK"
|
||||
assert mock_client.post.await_count == 3
|
||||
@ -373,7 +426,7 @@ class TestSendA2AMessageRetry:
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
|
||||
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
|
||||
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
|
||||
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
|
||||
|
||||
assert result == "OK"
|
||||
assert mock_client.post.await_count == 2
|
||||
@ -388,12 +441,15 @@ class TestSendA2AMessageRetry:
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
|
||||
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
|
||||
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
|
||||
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
|
||||
|
||||
assert mock_client.post.await_count == 5 # _DELEGATE_MAX_ATTEMPTS
|
||||
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
|
||||
assert "RemoteProtocolError" in result
|
||||
assert "target=http://target/a2a" in result
|
||||
# Target URL now constructed internally — assert it contains the peer_id
|
||||
# and the proxy path, not the old hand-passed URL.
|
||||
assert _TEST_PEER_ID in result
|
||||
assert "/workspaces/" in result and "/a2a" in result
|
||||
|
||||
async def test_caps_at_max_attempts(self):
|
||||
"""If transient errors keep coming, we MUST stop at _DELEGATE_MAX_ATTEMPTS,
|
||||
@ -407,7 +463,7 @@ class TestSendA2AMessageRetry:
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
|
||||
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
|
||||
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
|
||||
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
|
||||
|
||||
assert mock_client.post.await_count == a2a_client._DELEGATE_MAX_ATTEMPTS
|
||||
assert mock_client.post.await_count == 5
|
||||
@ -425,7 +481,7 @@ class TestSendA2AMessageRetry:
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
|
||||
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
|
||||
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
|
||||
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
|
||||
|
||||
assert mock_client.post.await_count == 1 # NO retry
|
||||
assert "Internal error" in result
|
||||
@ -441,7 +497,7 @@ class TestSendA2AMessageRetry:
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
|
||||
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
|
||||
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
|
||||
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
|
||||
|
||||
assert mock_client.post.await_count == 1 # NO retry
|
||||
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
|
||||
@ -475,7 +531,7 @@ class TestSendA2AMessageRetry:
|
||||
|
||||
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
|
||||
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
|
||||
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
|
||||
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
|
||||
|
||||
# Stopped before exhausting all 5 attempts.
|
||||
assert mock_client.post.await_count < 5
|
||||
|
||||
@ -239,35 +239,34 @@ class TestToolDelegateTask:
|
||||
result = await a2a_tools.tool_delegate_task("ws-1", "task")
|
||||
assert "offline" in result.lower()
|
||||
|
||||
async def test_routes_through_platform_proxy_not_peer_url(self):
|
||||
"""tool_delegate_task must POST to ${PLATFORM_URL}/workspaces/:peer-id/a2a,
|
||||
NOT to peer["url"]. The peer's URL is a Docker-internal hostname for
|
||||
in-container peers; external molecule-mcp callers cannot resolve it.
|
||||
Routing through the platform proxy works for both."""
|
||||
async def test_passes_peer_id_to_send_a2a_message(self):
|
||||
"""tool_delegate_task forwards the workspace_id directly to
|
||||
send_a2a_message, which owns URL construction (proxy path).
|
||||
Verifies the contract: tool_delegate_task does NOT build URLs
|
||||
from peer["url"], it just hands the id off."""
|
||||
import a2a_tools
|
||||
from a2a_client import PLATFORM_URL
|
||||
|
||||
peer_id = "11111111-1111-1111-1111-111111111111"
|
||||
peer = {
|
||||
"id": "ws-target",
|
||||
# Internal-only URL — must NOT be used.
|
||||
"id": peer_id,
|
||||
# Internal-only URL — must NOT be used as the routing target.
|
||||
"url": "http://ws-target-internal:8000",
|
||||
"name": "Worker",
|
||||
"status": "online",
|
||||
}
|
||||
captured = {}
|
||||
async def fake_send(target_url, message):
|
||||
captured["target_url"] = target_url
|
||||
async def fake_send(passed_peer_id, message):
|
||||
captured["peer_id"] = passed_peer_id
|
||||
captured["message"] = message
|
||||
return "ok"
|
||||
|
||||
with patch("a2a_tools.discover_peer", return_value=peer), \
|
||||
patch("a2a_tools.send_a2a_message", side_effect=fake_send), \
|
||||
patch("a2a_tools.report_activity", new=AsyncMock()):
|
||||
await a2a_tools.tool_delegate_task("ws-target", "do thing")
|
||||
await a2a_tools.tool_delegate_task(peer_id, "do thing")
|
||||
|
||||
assert captured["target_url"] == f"{PLATFORM_URL}/workspaces/ws-target/a2a"
|
||||
# Sanity: definitely NOT the peer's reported URL
|
||||
assert captured["target_url"] != peer["url"]
|
||||
assert captured["peer_id"] == peer_id
|
||||
assert captured["message"] == "do thing"
|
||||
|
||||
async def test_success_returns_result_text(self):
|
||||
"""Happy path: peer found with URL, A2A returns a result."""
|
||||
|
||||
Loading…
Reference in New Issue
Block a user