Audited every a2a-sdk surface in workspace/ against the installed
1.0.2 wheel. Found and fixed:
main.py (the live workspace startup path):
• create_jsonrpc_routes(rpc_url='/', enable_v0_3_compat=True) —
rpc_url required in 1.x; v0.3 compat enables inbound legacy
clients (`"role": "user"` lowercase) without forcing them to
upgrade. Pairs with the outbound rename below.
a2a_executor.py:
• TextPart/FilePart/FileWithUri removed in 1.x. Part is now a
flat proto message: Part(text=…) / Part(url=…, filename=…,
media_type=…). Updated the file-attachment branch (only
reachable when an agent emits files; the harness's PONG path
didn't exercise this, but it's a latent crash).
• Message field names: messageId/taskId/contextId →
message_id/task_id/context_id (proto3 snake_case).
• Role enum: Role.agent → Role.ROLE_AGENT (proto enum).
Outbound JSON-RPC payloads (8 files):
• "role": "user" → "role": "ROLE_USER" — proto3 JSON serialization
is strict about enum values. Sites: a2a_client, a2a_cli, main
(initial+idle prompts), heartbeat, builtin_tools/a2a_tools,
builtin_tools/delegation. Wire JSON keys stay camelCase
(proto3 default), only the role enum value changed.
google-adk/adapter.py:
• new_agent_text_message → new_text_message (4 sites). This
adapter's directory has a hyphen, so it can't be imported as a
Python module — effectively dead code, but the wheel ships the
file and a future fix should keep it correct against 1.x.
Why one PR instead of seven: every previous a2a-sdk migration find
landed as its own publish → cascade → harness → next-bug cycle.
Today's audit ran every a2a-sdk symbol/type/method in workspace/
against the installed 1.0.2 wheel in a single sweep + tested the
critical paths (Message construction, Part construction, Role enum
parsing) against the actual SDK. Should be the last migration PR.
Verified locally:
python3 scripts/build_runtime_package.py --version 0.1.99 \
--out /tmp/build-final
pip install /tmp/build-final
python -c "import molecule_runtime.main; \
from molecule_runtime.a2a_executor import LangGraphA2AExecutor"
→ ✓ all imports clean against a2a-sdk 1.0.2
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
91 lines
3.4 KiB
Python
91 lines
3.4 KiB
Python
"""A2A communication tools — framework-agnostic delegation and peer discovery.
|
|
|
|
These are plain async functions that any adapter can wrap in its native tool format.
|
|
The LangChain @tool versions are in tools/delegation.py.
|
|
"""
|
|
|
|
import os
|
|
import uuid
|
|
|
|
import httpx
|
|
|
|
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
|
|
WORKSPACE_ID = os.environ.get("WORKSPACE_ID", "")
|
|
|
|
|
|
async def list_peers() -> list[dict]:
|
|
"""Get this workspace's peers from the platform registry."""
|
|
async with httpx.AsyncClient(timeout=10.0) as client:
|
|
try:
|
|
resp = await client.get(f"{PLATFORM_URL}/registry/{WORKSPACE_ID}/peers")
|
|
if resp.status_code == 200:
|
|
return resp.json()
|
|
return []
|
|
except Exception:
|
|
return []
|
|
|
|
|
|
async def delegate_task(workspace_id: str, task: str) -> str:
|
|
"""Send a task to a peer workspace via A2A and return the response text."""
|
|
async with httpx.AsyncClient(timeout=120.0) as client:
|
|
# Discover target URL
|
|
try:
|
|
resp = await client.get(
|
|
f"{PLATFORM_URL}/registry/discover/{workspace_id}",
|
|
headers={"X-Workspace-ID": WORKSPACE_ID},
|
|
)
|
|
if resp.status_code != 200:
|
|
return f"Error: cannot reach workspace {workspace_id} (status {resp.status_code})"
|
|
target_url = resp.json().get("url", "")
|
|
if not target_url:
|
|
return f"Error: workspace {workspace_id} has no URL"
|
|
except Exception as e:
|
|
return f"Error discovering workspace: {e}"
|
|
|
|
# Send A2A message. X-Workspace-ID identifies us as the source —
|
|
# without it the platform's a2a_receive logger writes
|
|
# source_id=NULL and the recipient's My Chat tab renders the
|
|
# delegation as if a human user typed it. Same hazard fixed
|
|
# in heartbeat.py / a2a_client.py / main.py initial+idle flows.
|
|
try:
|
|
a2a_resp = await client.post(
|
|
target_url,
|
|
headers={"X-Workspace-ID": WORKSPACE_ID},
|
|
json={
|
|
"jsonrpc": "2.0",
|
|
"id": str(uuid.uuid4()),
|
|
"method": "message/send",
|
|
"params": {
|
|
"message": {
|
|
"role": "ROLE_USER",
|
|
"messageId": str(uuid.uuid4()),
|
|
"parts": [{"kind": "text", "text": task}],
|
|
},
|
|
},
|
|
},
|
|
)
|
|
data = a2a_resp.json()
|
|
if "result" in data:
|
|
parts = data["result"].get("parts", [])
|
|
return parts[0].get("text", "(no text)") if parts else str(data["result"])
|
|
elif "error" in data:
|
|
return f"Error: {data['error'].get('message', str(data['error']))}"
|
|
return str(data)
|
|
except Exception as e:
|
|
return f"Error sending A2A message: {e}"
|
|
|
|
|
|
async def get_peers_summary() -> str:
|
|
"""Return a formatted string of available peers for system prompts."""
|
|
peers = await list_peers()
|
|
if not peers:
|
|
return "No peers available."
|
|
lines = []
|
|
for p in peers:
|
|
name = p.get("name", "Unknown")
|
|
pid = p.get("id", "")
|
|
role = p.get("role", "")
|
|
status = p.get("status", "")
|
|
lines.append(f"- {name} (ID: {pid}) — {role} [{status}]")
|
|
return "Available peers:\n" + "\n".join(lines)
|