molecule-core/workspace-template/a2a_client.py
Backend Engineer 9649311d51 fix(security): N1 — add auth headers to all platform calls in Python callers
IMPACT WITHOUT THIS FIX: deploying PR #31 (WorkspaceAuth middleware on
/workspaces/*) without this patch causes EVERY delegation cycle to silently
break — the heartbeat poll returns 401, the self-message A2A POST returns
401, agents never wake up after task completion, and memory consolidation
stops. The entire multi-agent coordination system degrades to single-shot
interactions with no result delivery.

Changes (all using the existing platform_auth.auth_headers() pattern
already used for POST /registry/heartbeat):

heartbeat.py — 5 calls fixed:
  - GET  /workspaces/:id/delegations     (delegation poll)
  - GET  /workspaces/:id                 (self workspace info for parent lookup)
  - GET  /workspaces/{parent_id}         (parent workspace name lookup)
  - POST /workspaces/:id/a2a             (self-message to wake agent on results)
  - POST /workspaces/:id/notify          (canvas delegation result notification)
  Also moved `from platform_auth import auth_headers` from inline (per-call)
  to module-level import so _check_delegations() can use it without re-importing.

consolidation.py — 4 calls fixed:
  - GET    /workspaces/:id/memories      (fetch memories for consolidation)
  - POST   /workspaces/:id/memories      (write consolidated summary — agent path)
  - DELETE /workspaces/:id/memories/:id  (delete original memories post-consolidation)
  - POST   /workspaces/:id/memories      (write consolidated summary — fallback path)

a2a_client.py — 1 call fixed:
  - GET /workspaces/:id                  (get_workspace_info())

⚠️  DEPLOYMENT NOTE: This PR MUST be merged and deployed at the same time as
PR #31 (WorkspaceAuth middleware). Deploying #31 without this fix will
immediately break all delegation result delivery.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 08:37:50 +00:00

108 lines
3.9 KiB
Python

"""A2A protocol client — peer discovery, messaging, and workspace info.
Shared constants (WORKSPACE_ID, PLATFORM_URL) live here so that
a2a_tools and a2a_mcp_server can import them from a single place.
"""
import logging
import os
import uuid
import httpx
from platform_auth import auth_headers
logger = logging.getLogger(__name__)
WORKSPACE_ID = os.environ.get("WORKSPACE_ID", "")
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://platform:8080")
# Cache workspace ID → name mappings (populated by list_peers calls)
_peer_names: dict[str, str] = {}
# Sentinel prefix for errors originating from send_a2a_message / child agents.
# Used by delegate_task to distinguish real errors from normal response text.
_A2A_ERROR_PREFIX = "[A2A_ERROR] "
async def discover_peer(target_id: str) -> dict | None:
"""Discover a peer workspace's URL via the platform registry."""
async with httpx.AsyncClient(timeout=10.0) as client:
try:
resp = await client.get(
f"{PLATFORM_URL}/registry/discover/{target_id}",
headers={"X-Workspace-ID": WORKSPACE_ID},
)
if resp.status_code == 200:
return resp.json()
return None
except Exception as e:
logger.error(f"Discovery failed for {target_id}: {e}")
return None
async def send_a2a_message(target_url: str, message: str) -> str:
"""Send an A2A message/send to a target workspace."""
# Fix F (Cycle 5 / H2 — flagged 5 consecutive audits): timeout=None allowed
# a hung upstream to block the agent indefinitely. Use a generous but bounded
# timeout: 30s connect + 300s read (long enough for slow LLM responses).
async with httpx.AsyncClient(
timeout=httpx.Timeout(connect=30.0, read=300.0, write=30.0, pool=30.0)
) as client:
try:
resp = await client.post(
target_url,
json={
"jsonrpc": "2.0",
"id": str(uuid.uuid4()),
"method": "message/send",
"params": {
"message": {
"role": "user",
"messageId": str(uuid.uuid4()),
"parts": [{"kind": "text", "text": message}],
}
},
},
)
data = resp.json()
if "result" in data:
parts = data["result"].get("parts", [])
text = parts[0].get("text", "") if parts else "(no response)"
# Tag child-reported errors so the caller can detect them reliably
if text.startswith("Agent error:"):
return f"{_A2A_ERROR_PREFIX}{text}"
return text
elif "error" in data:
return f"{_A2A_ERROR_PREFIX}{data['error'].get('message', 'unknown')}"
return str(data)
except Exception as e:
return f"{_A2A_ERROR_PREFIX}{e}"
async def get_peers() -> list[dict]:
"""Get this workspace's peers from the platform registry."""
async with httpx.AsyncClient(timeout=10.0) as client:
try:
resp = await client.get(f"{PLATFORM_URL}/registry/{WORKSPACE_ID}/peers")
if resp.status_code == 200:
return resp.json()
return []
except Exception:
return []
async def get_workspace_info() -> dict:
"""Get this workspace's info from the platform."""
async with httpx.AsyncClient(timeout=10.0) as client:
try:
resp = await client.get(
f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}",
headers=auth_headers(),
)
if resp.status_code == 200:
return resp.json()
return {"error": "not found"}
except Exception as e:
return {"error": str(e)}