When an exception's str() is empty (bare TimeoutError(), BrokenPipeError(),
some httpx transport errors), `f"{_A2A_ERROR_PREFIX}{e}"` produced
`"[A2A_ERROR] "` with a trailing space and zero diagnostic context,
masking the real cause of peer-delegation failures in activity_logs.
Observed on main monorepo: 22+ occurrences in 75 min across 7 leads
during the MiniMax M2.7 trial rate-limit episode — zero breadcrumbs
to route the debug from.
Fix:
- Exception branch: fall back to `type(e).__name__` when str(e) is empty
- Error branch: include JSON-RPC `error.code` alongside message when present
Tests: test_a2a_error_observability.py covers both the bare-exception
path (must surface class name) and the message-passthrough path (must
preserve existing useful messages).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
131 lines
5.0 KiB
Python
131 lines
5.0 KiB
Python
"""A2A protocol client — peer discovery, messaging, and workspace info.
|
|
|
|
Shared constants (WORKSPACE_ID, PLATFORM_URL) live here so that
|
|
a2a_tools and a2a_mcp_server can import them from a single place.
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import uuid
|
|
|
|
import httpx
|
|
|
|
from .builtin_tools.validation import WorkspaceIdValidationError, get_validated_workspace_id
|
|
from .platform_auth import auth_headers
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# This workspace's ID and the platform base URL, read from the environment
# once at import time (empty string / in-cluster default when unset).
WORKSPACE_ID = os.getenv("WORKSPACE_ID", "")
PLATFORM_URL = os.getenv("PLATFORM_URL", "http://platform:8080")

# Cache workspace ID → name mappings (populated by list_peers calls)
_peer_names: dict[str, str] = {}

# Sentinel prefix for errors originating from send_a2a_message / child agents.
# Used by delegate_task to distinguish real errors from normal response text.
_A2A_ERROR_PREFIX = "[A2A_ERROR] "
|
|
|
|
|
|
async def discover_peer(target_id: str) -> dict | None:
    """Discover a peer workspace's URL via the platform registry.

    Issues ``GET {PLATFORM_URL}/registry/discover/{target_id}`` with this
    workspace's ID and the platform auth headers, using a 10 second timeout.

    Args:
        target_id: Identifier of the peer workspace to look up.

    Returns:
        The decoded JSON body when the registry answers HTTP 200; ``None``
        for any other status code or when the request (or JSON decoding)
        raises. Failures that raise are logged at error level.
    """
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            resp = await client.get(
                f"{PLATFORM_URL}/registry/discover/{target_id}",
                headers={"X-Workspace-ID": WORKSPACE_ID, **auth_headers()},
            )
            if resp.status_code == 200:
                return resp.json()
            return None
        except Exception as e:
            # str(e) is empty for exceptions raised without a message (e.g. a
            # bare TimeoutError()); fall back to the class name so the log
            # line never ends in a dangling ": " with no diagnostic.
            detail = str(e) or type(e).__name__
            logger.error("Discovery failed for %s: %s", target_id, detail)
            return None
|
|
|
|
|
|
async def send_a2a_message(target_url: str, message: str) -> str:
    """POST a JSON-RPC ``message/send`` request to a target workspace.

    Args:
        target_url: URL of the peer workspace's A2A endpoint.
        message: Text sent as the single ``text`` part of a user message.

    Returns:
        The text of the first part of the peer's ``result``, or
        ``"(no response)"`` when the result carries no parts. Replies that
        start with ``"Agent error:"``, JSON-RPC ``error`` payloads, and any
        exception raised along the way are returned as strings prefixed with
        ``_A2A_ERROR_PREFIX``. A body with neither ``result`` nor ``error``
        is returned as ``str(body)``.
    """
    # Fix F (Cycle 5 / H2 — flagged 5 consecutive audits): timeout=None allowed
    # a hung upstream to block the agent indefinitely. Use a generous but bounded
    # timeout: 30s connect + 300s read (long enough for slow LLM responses).
    bounded_timeout = httpx.Timeout(connect=30.0, read=300.0, write=30.0, pool=30.0)
    async with httpx.AsyncClient(timeout=bounded_timeout) as client:
        try:
            request_headers = auth_headers()
            rpc_request = {
                "jsonrpc": "2.0",
                "id": str(uuid.uuid4()),
                "method": "message/send",
                "params": {
                    "message": {
                        "role": "user",
                        "messageId": str(uuid.uuid4()),
                        "parts": [{"kind": "text", "text": message}],
                    }
                },
            }
            resp = await client.post(
                target_url,
                headers=request_headers,
                json=rpc_request,
            )
            body = resp.json()

            if "result" in body:
                reply_parts = body["result"].get("parts", [])
                if not reply_parts:
                    reply = "(no response)"
                else:
                    reply = reply_parts[0].get("text", "")
                # Tag child-reported errors so the caller can detect them reliably
                if reply.startswith("Agent error:"):
                    return f"{_A2A_ERROR_PREFIX}{reply}"
                return reply

            if "error" in body:
                rpc_error = body["error"]
                reason = rpc_error.get("message") or "unknown"
                rpc_code = rpc_error.get("code")
                # Surface the JSON-RPC error code alongside the message when present.
                code_tag = "" if rpc_code is None else f"[code={rpc_code}] "
                return f"{_A2A_ERROR_PREFIX}{code_tag}{reason}"

            return str(body)
        except Exception as exc:
            # #51: str(exc) is empty for bare TimeoutError(), BrokenPipeError(),
            # and several httpx transport errors — leaving "[A2A_ERROR] " with
            # no diagnostic. Fall back to the exception class name so logs
            # always carry at least one actionable breadcrumb.
            breadcrumb = str(exc) or type(exc).__name__
            return f"{_A2A_ERROR_PREFIX}{breadcrumb}"
|
|
|
|
|
|
async def get_peers() -> list[dict]:
    """Get this workspace's peers from the platform registry.

    Issues ``GET {PLATFORM_URL}/registry/{ws_id}/peers`` with the validated
    workspace ID and the platform auth headers, using a 10 second timeout.

    Returns:
        The decoded JSON body when the registry answers HTTP 200. An empty
        list when workspace-ID validation fails, when the registry answers
        with any other status code, or when the request raises (the latter
        is logged at warning level).
    """
    try:
        ws_id = get_validated_workspace_id(caller="a2a_client.get_peers")
    except WorkspaceIdValidationError:
        return []
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            resp = await client.get(
                f"{PLATFORM_URL}/registry/{ws_id}/peers",
                headers={"X-Workspace-ID": ws_id, **auth_headers()},
            )
            if resp.status_code == 200:
                return resp.json()
            return []
        except Exception as e:
            # Still best-effort (callers get an empty list), but leave a
            # breadcrumb instead of swallowing the failure silently. str(e)
            # can be empty, so fall back to the exception class name.
            logger.warning(
                "Peer listing failed for %s: %s",
                ws_id,
                str(e) or type(e).__name__,
            )
            return []
|
|
|
|
|
|
async def get_workspace_info() -> dict:
    """Get this workspace's info from the platform.

    Issues ``GET {PLATFORM_URL}/workspaces/{ws_id}`` with the validated
    workspace ID and the platform auth headers, using a 10 second timeout.

    Returns:
        The decoded JSON body when the platform answers HTTP 200. Otherwise
        a dict with a single ``"error"`` key:
        ``"WORKSPACE_ID validation failed"`` when validation fails,
        ``"not found"`` for any non-200 status, or a description of the
        exception when the request raises.
    """
    try:
        ws_id = get_validated_workspace_id(caller="a2a_client.get_workspace_info")
    except WorkspaceIdValidationError:
        return {"error": "WORKSPACE_ID validation failed"}
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            resp = await client.get(
                f"{PLATFORM_URL}/workspaces/{ws_id}",
                headers=auth_headers(),
            )
            if resp.status_code == 200:
                return resp.json()
            return {"error": "not found"}
        except Exception as e:
            # str(e) is empty for exceptions raised without a message (e.g. a
            # bare TimeoutError()), which would yield {"error": ""} — a falsy,
            # undiagnosable value. Fall back to the exception class name.
            return {"error": str(e) or type(e).__name__}
|