Merge pull request #2420 from Molecule-AI/refactor/send-a2a-message-by-peer-id

refactor(workspace-runtime): send_a2a_message takes peer_id + UUID validation [stacks on #2418]
This commit is contained in:
Hongming Wang 2026-05-01 00:44:56 +00:00 committed by GitHub
commit f035482e0a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 163 additions and 71 deletions

View File

@ -8,6 +8,7 @@ import asyncio
import logging
import os
import random
import re
import time
import uuid
@ -33,13 +34,54 @@ _peer_names: dict[str, str] = {}
# Used by delegate_task to distinguish real errors from normal response text.
_A2A_ERROR_PREFIX = "[A2A_ERROR] "
# Workspace IDs are UUIDs everywhere we generate them (platform's
# workspaces.id column, /registry/discover/:id route param, etc.) but
# the agent-facing tool surface receives them as free-form strings via
# tool args. ``_validate_peer_id`` enforces UUID-shape at the
# trust boundary so we never interpolate `..` or `/` into a URL path,
# never silently coerce malformed input into a 404, and surface a
# clear error to the agent rather than letting an HTTP 4xx bubble up
# from the platform with a generic error message.
#
# Lenient on case + whitespace because real-world peer-id strings
# come from list_peers/discover_peer responses (canonical lowercase)
# or hand-typed agent input (mixed-case acceptable). Strict on
# everything else.
_UUID_RE = re.compile(
r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$"
)
def _validate_peer_id(peer_id: str) -> str | None:
"""Return the canonicalised peer_id if valid, else None.
Returning None instead of raising so callers in tool surfaces can
convert to a friendly agent-facing string ("workspace_id is not a
valid UUID") rather than crashing with a stack trace.
"""
if not isinstance(peer_id, str):
return None
pid = peer_id.strip()
if not _UUID_RE.match(pid):
return None
return pid.lower()
async def discover_peer(target_id: str) -> dict | None:
"""Discover a peer workspace's URL via the platform registry."""
"""Discover a peer workspace's URL via the platform registry.
Validates ``target_id`` is a UUID before constructing the URL a
malformed id can't reach the platform handler now, which both
short-circuits an avoidable round-trip AND ensures we never
interpolate path-traversal characters into the URL.
"""
safe_id = _validate_peer_id(target_id)
if safe_id is None:
return None
async with httpx.AsyncClient(timeout=10.0) as client:
try:
resp = await client.get(
f"{PLATFORM_URL}/registry/discover/{target_id}",
f"{PLATFORM_URL}/registry/discover/{safe_id}",
headers={"X-Workspace-ID": WORKSPACE_ID, **auth_headers()},
)
if resp.status_code == 200:
@ -134,8 +176,14 @@ def _format_a2a_error(exc: BaseException, target_url: str) -> str:
return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"
async def send_a2a_message(target_url: str, message: str) -> str:
"""Send an A2A message/send to a target workspace.
async def send_a2a_message(peer_id: str, message: str) -> str:
"""Send an A2A ``message/send`` to a peer workspace via the platform proxy.
The target URL is constructed internally as
``${PLATFORM_URL}/workspaces/{peer_id}/a2a``. Going through the
platform's A2A proxy is the only path that works for both
in-container and external runtimes see
a2a_tools.tool_delegate_task for the rationale.
Auto-retries up to _DELEGATE_MAX_ATTEMPTS times on transient
transport-layer errors (RemoteProtocolError, ConnectError,
@ -144,6 +192,11 @@ async def send_a2a_message(target_url: str, message: str) -> str:
JSON-RPC error response, malformed JSON) are NOT retried they
indicate a deterministic problem retry won't fix.
"""
safe_id = _validate_peer_id(peer_id)
if safe_id is None:
return f"{_A2A_ERROR_PREFIX}invalid peer_id (expected UUID): {peer_id!r}"
target_url = f"{PLATFORM_URL}/workspaces/{safe_id}/a2a"
# Fix F (Cycle 5 / H2 — flagged 5 consecutive audits): timeout=None allowed
# a hung upstream to block the agent indefinitely. Use a generous but bounded
# timeout: 30s connect + 300s read (long enough for slow LLM responses).

View File

@ -188,9 +188,10 @@ async def tool_delegate_task(workspace_id: str, task: str) -> str:
if not workspace_id or not task:
return "Error: workspace_id and task are required"
# Discover the target. We still call discover_peer because it
# enforces access control + tells us whether the peer is online,
# but we DO NOT use the peer's reported URL for routing — see below.
# Discover the target. discover_peer is the access-control gate +
# name/status lookup. The peer's reported ``url`` field is NOT used
# for routing — see send_a2a_message, which constructs the URL via
# the platform's A2A proxy.
peer = await discover_peer(workspace_id)
if not peer:
return f"Error: workspace {workspace_id} not found or not accessible (check access control)"
@ -198,33 +199,16 @@ async def tool_delegate_task(workspace_id: str, task: str) -> str:
if (peer.get("status") or "").lower() == "offline":
return f"Error: workspace {workspace_id} is offline"
# Route through the platform's A2A proxy instead of POSTing
# directly to peer["url"]. The peer's URL is whatever it last
# registered — for in-container peers that's a Docker-internal
# hostname like ``http://ws-X-Y:8000`` which only resolves inside
# the platform's container DNS. External callers (the standalone
# molecule-mcp wrapper running on an operator's laptop) hit
# `[Errno 8] nodename nor servname` every time they try to reach
# an in-container peer that way. The platform's
# ``/workspaces/:peer-id/a2a`` proxy works for BOTH paths: it
# forwards over the Docker network for in-container peers and is
# the only path external runtimes can use, so unifying on it
# makes the universal-MCP path actually universal. In-container
# callers pay one extra HTTP hop on the same bridge — microseconds
# — in exchange for one consistent code path. Verified live on
# 2026-04-30 against workspace 8dad3e29 → 97ac32e9 (Claude Code
# Agent) which replied correctly through the proxy after failing
# via direct connect.
target_url = f"{PLATFORM_URL}/workspaces/{workspace_id}/a2a"
# Report delegation start — include the task text for traceability
peer_name = peer.get("name") or _peer_names.get(workspace_id) or workspace_id[:8]
_peer_names[workspace_id] = peer_name # cache for future use
# Brief summary for canvas display — just the delegation target
await report_activity("a2a_send", workspace_id, f"Delegating to {peer_name}", task_text=task)
# Send A2A message and log the full round-trip
result = await send_a2a_message(target_url, task)
# send_a2a_message routes through ${PLATFORM_URL}/workspaces/{id}/a2a
# (the platform proxy) so the same code works for in-container and
# external (standalone molecule-mcp) callers.
result = await send_a2a_message(workspace_id, task)
# Detect delegation failures — wrap them clearly so the calling agent
# can decide to retry, use another peer, or handle the task itself.

View File

@ -42,6 +42,12 @@ def _make_response(status_code, json_data):
return resp
# Canonical UUID used wherever a test needs a peer_id. send_a2a_message and
# discover_peer reject non-UUID strings at the trust boundary (see
# a2a_client._validate_peer_id), so test inputs must be valid UUIDs.
_TEST_PEER_ID = "11111111-1111-1111-1111-111111111111"
# ---------------------------------------------------------------------------
# Module-level constants (just ensure they exist and have sensible types)
# ---------------------------------------------------------------------------
@ -64,12 +70,12 @@ class TestDiscoverPeer:
"""200 response → returns the JSON body."""
import a2a_client
peer_data = {"id": "ws-abc", "url": "http://ws-abc.svc", "name": "Alpha"}
peer_data = {"id": _TEST_PEER_ID, "url": "http://ws-abc.svc", "name": "Alpha"}
resp = _make_response(200, peer_data)
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.discover_peer("ws-abc")
result = await a2a_client.discover_peer(_TEST_PEER_ID)
assert result == peer_data
@ -81,7 +87,7 @@ class TestDiscoverPeer:
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.discover_peer("ws-missing")
result = await a2a_client.discover_peer(_TEST_PEER_ID)
assert result is None
@ -93,7 +99,7 @@ class TestDiscoverPeer:
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.discover_peer("ws-forbidden")
result = await a2a_client.discover_peer(_TEST_PEER_ID)
assert result is None
@ -104,10 +110,26 @@ class TestDiscoverPeer:
mock_client = _make_mock_client(get_exc=ConnectionError("host unreachable"))
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.discover_peer("ws-down")
result = await a2a_client.discover_peer(_TEST_PEER_ID)
assert result is None
async def test_invalid_peer_id_returns_none_without_http(self):
"""Malformed peer_id is rejected at the trust boundary — no HTTP call.
Path-traversal-shaped input ("../admin"), free-form labels
("ws-abc"), and empty strings all return None and don't reach
the platform. Closes the URL-interpolation class of bug.
"""
import a2a_client
mock_client = _make_mock_client(get_resp=_make_response(200, {}))
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
for bad in ("", "ws-abc", "../admin", "not-a-uuid", "8dad3e29"):
assert await a2a_client.discover_peer(bad) is None
# No GET should have been issued for any of those.
mock_client.get.assert_not_called()
async def test_request_uses_correct_url_and_header(self):
"""GET is called with the right URL and X-Workspace-ID header."""
import a2a_client
@ -116,11 +138,11 @@ class TestDiscoverPeer:
mock_client = _make_mock_client(get_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
await a2a_client.discover_peer("ws-xyz")
await a2a_client.discover_peer(_TEST_PEER_ID)
mock_client.get.assert_called_once()
positional_url = mock_client.get.call_args.args[0]
assert "ws-xyz" in positional_url
assert _TEST_PEER_ID in positional_url
# X-Workspace-ID must be present; bearer token also merged in when available
headers_sent = mock_client.get.call_args.kwargs.get("headers", {})
assert headers_sent.get("X-Workspace-ID") == a2a_client.WORKSPACE_ID
@ -142,7 +164,7 @@ class TestSendA2AMessage:
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
assert result == "Hello!"
@ -154,7 +176,7 @@ class TestSendA2AMessage:
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
assert result == "(no response)"
@ -168,7 +190,7 @@ class TestSendA2AMessage:
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message("http://target/a2a", "task")
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert "Agent error: something bad" in result
@ -183,7 +205,7 @@ class TestSendA2AMessage:
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message("http://target/a2a", "task")
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert "Internal error occurred" in result
@ -196,7 +218,7 @@ class TestSendA2AMessage:
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message("http://target/a2a", "task")
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
# The error includes the JSON-RPC code so the operator can look it
@ -205,7 +227,10 @@ class TestSendA2AMessage:
assert "code=-32600" in result
assert "no message" in result.lower()
# Target URL is included so chained delegations are traceable.
assert "target=http://target/a2a" in result
# Target URL now constructed internally — assert it contains the peer_id
# and the proxy path, not the old hand-passed URL.
assert _TEST_PEER_ID in result
assert "/workspaces/" in result and "/a2a" in result
async def test_jsonrpc_error_with_code_zero_includes_code_in_detail(self):
"""JSON-RPC error code=0 is technically not valid in the spec,
@ -219,7 +244,7 @@ class TestSendA2AMessage:
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message("http://target/a2a", "task")
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert "code=0" in result
@ -234,7 +259,7 @@ class TestSendA2AMessage:
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message("http://target/a2a", "task")
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
# Pre-fix this returned bare str(payload) which the canvas
# rendered as a confusing "looks like a successful response"
@ -243,7 +268,10 @@ class TestSendA2AMessage:
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert "unexpected response shape" in result
assert "abc123" in result # snippet of payload included for context
assert "target=http://target/a2a" in result
# Target URL now constructed internally — assert it contains the peer_id
# and the proxy path, not the old hand-passed URL.
assert _TEST_PEER_ID in result
assert "/workspaces/" in result and "/a2a" in result
async def test_exception_returns_error_prefix_and_message(self):
"""Network exception → returns _A2A_ERROR_PREFIX + exception text."""
@ -252,7 +280,7 @@ class TestSendA2AMessage:
mock_client = _make_mock_client(post_exc=ConnectionError("connection refused"))
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message("http://target/a2a", "task")
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert "connection refused" in result
@ -260,7 +288,10 @@ class TestSendA2AMessage:
# already include it — gives the operator a typed handle to
# search for in container logs.
assert "ConnectionError" in result
assert "target=http://target/a2a" in result
# Target URL now constructed internally — assert it contains the peer_id
# and the proxy path, not the old hand-passed URL.
assert _TEST_PEER_ID in result
assert "/workspaces/" in result and "/a2a" in result
async def test_empty_stringifying_exception_falls_back_to_class_name(self):
"""The user's reported bug: httpx.RemoteProtocolError and similar
@ -280,7 +311,7 @@ class TestSendA2AMessage:
mock_client = _make_mock_client(post_exc=_SilentRemoteProtocolError())
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message("http://target/a2a", "task")
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
# Must NOT be just the bare prefix — that's the regression.
assert result != a2a_client._A2A_ERROR_PREFIX.strip()
@ -288,7 +319,10 @@ class TestSendA2AMessage:
# Must include the class name + something explanatory.
assert "_SilentRemoteProtocolError" in result
assert "no message" in result.lower()
assert "target=http://target/a2a" in result
# Target URL now constructed internally — assert it contains the peer_id
# and the proxy path, not the old hand-passed URL.
assert _TEST_PEER_ID in result
assert "/workspaces/" in result and "/a2a" in result
async def test_result_text_part_missing_text_key_returns_empty(self):
"""Part dict without 'text' key → falls back to '' (empty string returned)."""
@ -300,11 +334,30 @@ class TestSendA2AMessage:
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message("http://target/a2a", "task")
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
# Returns "" (empty string — does not start with _A2A_ERROR_PREFIX)
assert result == ""
async def test_invalid_peer_id_short_circuits_without_http(self):
"""Malformed peer_id is rejected at the trust boundary — no POST.
Symmetric coverage with discover_peer's validation gate. Path-traversal
("../admin"), free-form labels ("ws-abc"), and empty strings all
return an _A2A_ERROR_PREFIX message identifying the bad input and
never reach the platform.
"""
import a2a_client
mock_client = _make_mock_client(post_resp=_make_response(200, {}))
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
for bad in ("", "ws-abc", "../admin", "not-a-uuid", "8dad3e29"):
result = await a2a_client.send_a2a_message(bad, "ping")
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert "invalid peer_id" in result
# No POST should have been issued for any of those.
mock_client.post.assert_not_called()
# ---------------------------------------------------------------------------
# send_a2a_message — transient-error retry behaviour
@ -354,7 +407,7 @@ class TestSendA2AMessageRetry:
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
assert result == "OK"
assert mock_client.post.await_count == 3
@ -373,7 +426,7 @@ class TestSendA2AMessageRetry:
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
assert result == "OK"
assert mock_client.post.await_count == 2
@ -388,12 +441,15 @@ class TestSendA2AMessageRetry:
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
assert mock_client.post.await_count == 5 # _DELEGATE_MAX_ATTEMPTS
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert "RemoteProtocolError" in result
assert "target=http://target/a2a" in result
# Target URL now constructed internally — assert it contains the peer_id
# and the proxy path, not the old hand-passed URL.
assert _TEST_PEER_ID in result
assert "/workspaces/" in result and "/a2a" in result
async def test_caps_at_max_attempts(self):
"""If transient errors keep coming, we MUST stop at _DELEGATE_MAX_ATTEMPTS,
@ -407,7 +463,7 @@ class TestSendA2AMessageRetry:
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
assert mock_client.post.await_count == a2a_client._DELEGATE_MAX_ATTEMPTS
assert mock_client.post.await_count == 5
@ -425,7 +481,7 @@ class TestSendA2AMessageRetry:
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
assert mock_client.post.await_count == 1 # NO retry
assert "Internal error" in result
@ -441,7 +497,7 @@ class TestSendA2AMessageRetry:
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
assert mock_client.post.await_count == 1 # NO retry
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
@ -475,7 +531,7 @@ class TestSendA2AMessageRetry:
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client), \
patch("a2a_client.asyncio.sleep", new=AsyncMock()):
result = await a2a_client.send_a2a_message("http://target/a2a", "ping")
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "ping")
# Stopped before exhausting all 5 attempts.
assert mock_client.post.await_count < 5

View File

@ -239,35 +239,34 @@ class TestToolDelegateTask:
result = await a2a_tools.tool_delegate_task("ws-1", "task")
assert "offline" in result.lower()
async def test_routes_through_platform_proxy_not_peer_url(self):
"""tool_delegate_task must POST to ${PLATFORM_URL}/workspaces/:peer-id/a2a,
NOT to peer["url"]. The peer's URL is a Docker-internal hostname for
in-container peers; external molecule-mcp callers cannot resolve it.
Routing through the platform proxy works for both."""
async def test_passes_peer_id_to_send_a2a_message(self):
"""tool_delegate_task forwards the workspace_id directly to
send_a2a_message, which owns URL construction (proxy path).
Verifies the contract: tool_delegate_task does NOT build URLs
from peer["url"], it just hands the id off."""
import a2a_tools
from a2a_client import PLATFORM_URL
peer_id = "11111111-1111-1111-1111-111111111111"
peer = {
"id": "ws-target",
# Internal-only URL — must NOT be used.
"id": peer_id,
# Internal-only URL — must NOT be used as the routing target.
"url": "http://ws-target-internal:8000",
"name": "Worker",
"status": "online",
}
captured = {}
async def fake_send(target_url, message):
captured["target_url"] = target_url
async def fake_send(passed_peer_id, message):
captured["peer_id"] = passed_peer_id
captured["message"] = message
return "ok"
with patch("a2a_tools.discover_peer", return_value=peer), \
patch("a2a_tools.send_a2a_message", side_effect=fake_send), \
patch("a2a_tools.report_activity", new=AsyncMock()):
await a2a_tools.tool_delegate_task("ws-target", "do thing")
await a2a_tools.tool_delegate_task(peer_id, "do thing")
assert captured["target_url"] == f"{PLATFORM_URL}/workspaces/ws-target/a2a"
# Sanity: definitely NOT the peer's reported URL
assert captured["target_url"] != peer["url"]
assert captured["peer_id"] == peer_id
assert captured["message"] == "do thing"
async def test_success_returns_result_text(self):
"""Happy path: peer found with URL, A2A returns a result."""