molecule-ai-workspace-runtime/molecule_runtime/a2a_client.py
Molecule AI Infra-Runtime-BE d21f8c2064
Some checks failed
ci / mirror-guard (pull_request) Failing after 5s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 7s
fix(runtime): align PLATFORM_URL default to host.docker.internal across all modules
Unified the fallback default for PLATFORM_URL from `http://platform:8080`
(Docker Compose service name) to `http://host.docker.internal:8080`
across all 13 modules that declare it. This matches:
- The provisioner's default (buildContainerEnv injects PLATFORM_URL
  from cfg.PlatformURL, which defaults to host.docker.internal on the
  platform side — main.go:platformURL)
- The molecule-git-token-helper.sh script (already uses host.docker.internal)
- The MCP client (MOLECULE_URL injected by provisioner)

The provisioner always sets PLATFORM_URL in production containers, so
this is a development/Docker-only improvement: without this change,
a workspace started outside the Docker Compose network (e.g. via
`docker run` with `--network host`) would fail platform API calls
with "Connection refused" because `platform:8080` resolves nowhere.

13 modules updated: a2a_cli, a2a_client, a2a_mcp_server, adapters/base,
builtin_tools/a2a_tools, builtin_tools/approval, builtin_tools/delegation,
builtin_tools/hitl, builtin_tools/memory, consolidation, coordinator,
main, molecule_ai_status. All docstrings updated to match.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-11 03:34:24 +00:00

131 lines
5.0 KiB
Python

"""A2A protocol client — peer discovery, messaging, and workspace info.
Shared constants (WORKSPACE_ID, PLATFORM_URL) live here so that
a2a_tools and a2a_mcp_server can import them from a single place.
"""
import logging
import os
import uuid
from urllib.parse import quote

import httpx

from .builtin_tools.validation import WorkspaceIdValidationError, get_validated_workspace_id
from .platform_auth import auth_headers
logger = logging.getLogger(__name__)

# ID of the workspace this runtime serves; empty string when WORKSPACE_ID is unset.
WORKSPACE_ID = os.environ.get("WORKSPACE_ID", "")

# Base URL of the platform API. Callers build request URLs as
# f"{PLATFORM_URL}/registry/...", so trailing slashes are stripped here:
# otherwise a value such as "http://host:8080/" yields "//registry/..." paths.
# An empty (or all-slash) value is treated as unset and falls back to the
# Docker-host default.
_DEFAULT_PLATFORM_URL = "http://host.docker.internal:8080"
PLATFORM_URL = os.environ.get("PLATFORM_URL", "").rstrip("/") or _DEFAULT_PLATFORM_URL

# Cache workspace ID → name mappings (populated by list_peers calls; nothing
# in this module writes to it).
_peer_names: dict[str, str] = {}

# Sentinel prefix for errors originating from send_a2a_message / child agents.
# Used by delegate_task to distinguish real errors from normal response text.
_A2A_ERROR_PREFIX = "[A2A_ERROR] "
async def discover_peer(target_id: str) -> dict | None:
    """Discover a peer workspace's URL via the platform registry.

    Args:
        target_id: ID of the workspace to look up. It is percent-encoded
            (no characters treated as safe) before being placed in the URL
            path, so an ID containing "/" cannot address a different
            platform endpoint with this workspace's auth headers.

    Returns:
        The decoded JSON body of a 200 response, or None when the platform
        answers with any other status or the request/decoding fails. This is
        best-effort: failures are logged, never raised.
    """
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            url = f"{PLATFORM_URL}/registry/discover/{quote(target_id, safe='')}"
            resp = await client.get(
                url,
                headers={"X-Workspace-ID": WORKSPACE_ID, **auth_headers()},
            )
            if resp.status_code == 200:
                return resp.json()
            logger.warning(
                "Discovery for %s returned HTTP %s", target_id, resp.status_code
            )
            return None
        except Exception as e:
            # str(e) is empty for bare TimeoutError() and several httpx
            # transport errors; fall back to the class name so the log line
            # always carries a diagnostic.
            logger.error(
                "Discovery failed for %s: %s", target_id, str(e) or type(e).__name__
            )
            return None
async def send_a2a_message(target_url: str, message: str) -> str:
    """Send an A2A ``message/send`` JSON-RPC request to a target workspace.

    Args:
        target_url: URL of the peer's A2A JSON-RPC endpoint.
        message: Plain text delivered to the peer as a single text part.

    Returns:
        On success, the ``text`` of the first part of the JSON-RPC ``result``
        (or "(no response)" when the result has no parts). Failures are
        returned, not raised, as strings starting with ``_A2A_ERROR_PREFIX``
        so callers can tell them apart from normal replies. These cover
        child-reported "Agent error:" text, JSON-RPC ``error`` objects,
        non-JSON bodies, HTTP 4xx/5xx responses without a JSON-RPC payload,
        and transport exceptions.
    """
    payload = {
        "jsonrpc": "2.0",
        "id": str(uuid.uuid4()),
        "method": "message/send",
        "params": {
            "message": {
                "role": "user",
                "messageId": str(uuid.uuid4()),
                "parts": [{"kind": "text", "text": message}],
            }
        },
    }
    # Fix F (Cycle 5 / H2 — flagged 5 consecutive audits): timeout=None allowed
    # a hung upstream to block the agent indefinitely. Use a generous but bounded
    # timeout: 30s connect + 300s read (long enough for slow LLM responses).
    async with httpx.AsyncClient(
        timeout=httpx.Timeout(connect=30.0, read=300.0, write=30.0, pool=30.0)
    ) as client:
        try:
            resp = await client.post(target_url, headers=auth_headers(), json=payload)
            status = resp.status_code
            try:
                data = resp.json()
            except ValueError:
                # Proxies and crashed peers often answer with HTML or plain
                # text; surface the HTTP status rather than a bare JSON
                # decoder message.
                return f"{_A2A_ERROR_PREFIX}HTTP {status}: non-JSON response"

            if isinstance(data, dict) and "result" in data:
                parts = data["result"].get("parts", [])
                text = parts[0].get("text", "") if parts else "(no response)"
                # Tag child-reported errors so the caller can detect them reliably
                if text.startswith("Agent error:"):
                    return f"{_A2A_ERROR_PREFIX}{text}"
                return text

            if isinstance(data, dict) and "error" in data:
                err = data["error"]
                if isinstance(err, dict):
                    msg = err.get("message") or "unknown"
                    code = err.get("code")
                else:
                    # Non-conforming peers sometimes send a bare string here.
                    msg = str(err) or "unknown"
                    code = None
                if code is not None:
                    return f"{_A2A_ERROR_PREFIX}[code={code}] {msg}"
                return f"{_A2A_ERROR_PREFIX}{msg}"

            # Neither a JSON-RPC result nor error. An HTTP error status (e.g.
            # an auth or gateway failure with a JSON body) must still be
            # tagged, or the caller would mistake it for a normal reply.
            if status >= 400:
                return f"{_A2A_ERROR_PREFIX}HTTP {status}: {data}"
            return str(data)
        except Exception as e:
            # #51: str(e) is empty for bare TimeoutError(), BrokenPipeError(),
            # and several httpx transport errors — leaving "[A2A_ERROR] " with
            # no diagnostic. Fall back to the exception class name so logs
            # always carry at least one actionable breadcrumb.
            detail = str(e) or type(e).__name__
            return f"{_A2A_ERROR_PREFIX}{detail}"
async def get_peers() -> list[dict]:
    """Get this workspace's peers from the platform registry.

    Best-effort: returns an empty list when WORKSPACE_ID fails validation,
    when the platform answers with a non-200 status, when the request or
    JSON decoding fails, or when the body is not a JSON list. Failures after
    validation are logged at WARNING level instead of being swallowed
    silently.
    """
    try:
        ws_id = get_validated_workspace_id(caller="a2a_client.get_peers")
    except WorkspaceIdValidationError:
        return []
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            resp = await client.get(
                f"{PLATFORM_URL}/registry/{ws_id}/peers",
                headers={"X-Workspace-ID": ws_id, **auth_headers()},
            )
            if resp.status_code != 200:
                logger.warning("Peer lookup returned HTTP %s", resp.status_code)
                return []
            peers = resp.json()
        except Exception as e:
            logger.warning("Peer lookup failed: %s", str(e) or type(e).__name__)
            return []
    # Callers iterate the result as a list of peer dicts; reject other shapes.
    if not isinstance(peers, list):
        logger.warning(
            "Peer lookup returned %s, expected a list", type(peers).__name__
        )
        return []
    return peers
async def get_workspace_info() -> dict:
    """Get this workspace's info from the platform.

    Returns:
        The decoded JSON body on HTTP 200. Otherwise a dict with a single
        ``"error"`` key:

        - "WORKSPACE_ID validation failed" when the ID fails validation.
        - "not found" on HTTP 404.
        - "HTTP <status>" on any other non-200 status.
        - The exception message (or its class name when the message is
          empty) on request/decoding failure.
    """
    try:
        ws_id = get_validated_workspace_id(caller="a2a_client.get_workspace_info")
    except WorkspaceIdValidationError:
        return {"error": "WORKSPACE_ID validation failed"}
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            resp = await client.get(
                f"{PLATFORM_URL}/workspaces/{ws_id}",
                headers=auth_headers(),
            )
            if resp.status_code == 200:
                return resp.json()
            if resp.status_code == 404:
                return {"error": "not found"}
            # Auth failures and server errors are not "not found"; report the
            # real status so the caller can diagnose them.
            return {"error": f"HTTP {resp.status_code}"}
        except Exception as e:
            # str(e) is empty for some timeout/transport errors.
            return {"error": str(e) or type(e).__name__}