molecule-core/workspace/a2a_mcp_server.py
Hongming Wang b54ceb799f fix: address 5-axis review findings on PR #2413
Critical:
- ExternalConnectModal.tsx: filledUniversalMcp substitution searched
  for WORKSPACE_AUTH_TOKEN but the snippet's placeholder is now
  MOLECULE_WORKSPACE_TOKEN (changed in the previous polish commit
  876c0bfc). Operators copy-pasting the MCP tab would have gotten a
  literal "<paste from create response>" instead of the token. Fix
  the substitution to match the new placeholder name.
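
The substitution fix amounts to targeting the renamed placeholder. A minimal sketch (hypothetical snippet text and helper name — not the actual ExternalConnectModal.tsx code):

```python
# Hypothetical sketch: the snippet template's placeholder was renamed to
# MOLECULE_WORKSPACE_TOKEN, so the substitution must replace the new
# placeholder's value or the literal "<paste from create response>"
# leaks into the operator's config.
SNIPPET = 'MOLECULE_WORKSPACE_TOKEN="<paste from create response>"'


def fill_universal_mcp(snippet: str, token: str) -> str:
    """Substitute the operator's real token for the placeholder value."""
    return snippet.replace("<paste from create response>", token)


filled = fill_universal_mcp(SNIPPET, "tok-123")
```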

Important:
- mcp_cli._platform_register: 401/403 from initial register now hard-
  exits with code 3 + an actionable stderr message pointing the
  operator at the canvas Tokens tab. Pre-fix: warning log + continue,
  which made a bad-token startup fail silently (heartbeat 401s
  forever, every tool call also 401s, no clear surfacing in the
  operator's MCP client). 500/503 still log + continue (transient
  platform blips shouldn't abort the MCP loop).
- a2a_mcp_server.cli_main docstring: removed stale claim that this is
  the wheel's console-script entry-point target. The actual target is
  mcp_cli.main since 2026-04-30. Wheel-smoke pins both names so the
  functionality was correct, but the doc was lying.

Test coverage: 3 new mcp_cli tests:
  - register 401 exits code=3 + stderr mentions canvas Tokens tab
  - register 403 (C18 hijack rejection) takes same path
  - register 500/503 does NOT exit — only auth errors hard-fail
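
The hard-fail policy those tests pin down can be sketched as follows (hypothetical helper, not the actual mcp_cli._platform_register code): auth errors exit with code 3 and an actionable stderr message, 5xx logs and continues.

```python
# Sketch of the register error policy: 401/403 means the token itself
# is bad, so hard-exit with code 3 and point the operator at the canvas
# Tokens tab; 500/503 is a transient platform blip, so warn and let the
# MCP loop continue (heartbeat will retry).
import sys


def handle_register_status(status: int) -> None:
    if status in (401, 403):
        print(
            "register failed: token rejected — create a fresh token "
            "in the canvas Tokens tab",
            file=sys.stderr,
        )
        sys.exit(3)
    if status >= 500:
        # Transient: log and continue rather than aborting startup.
        print(f"register returned {status}, continuing", file=sys.stderr)
```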

Findings deferred to follow-up (acceptable per review rubric):
  - Code dedup across mcp_cli / heartbeat.py / molecule_agent SDK
  - Pooled httpx.Client for connection reuse
  - Heartbeat exponential backoff
  - Token-resolution ordering parity (env-first vs file-first)
    between mcp_cli.main and platform_auth.get_token

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 16:06:59 -07:00


#!/usr/bin/env python3
"""A2A MCP Server — runs inside each workspace container.
Exposes A2A delegation, peer discovery, and workspace info as MCP tools
so CLI-based runtimes (Claude Code, Codex) can communicate with other workspaces.
Launched automatically by main.py for CLI runtimes. Runs on stdio transport
and is configured as a local MCP server for the claude --print invocation.
Environment variables (set by the workspace container):
WORKSPACE_ID — this workspace's ID
PLATFORM_URL — platform API base URL (e.g. http://platform:8080)
"""
import asyncio
import json
import logging
import sys

from a2a_tools import (
    tool_check_task_status,
    tool_commit_memory,
    tool_delegate_task,
    tool_delegate_task_async,
    tool_get_workspace_info,
    tool_list_peers,
    tool_recall_memory,
    tool_send_message_to_user,
)
from platform_tools.registry import TOOLS as _PLATFORM_TOOL_SPECS

logger = logging.getLogger(__name__)
# Re-export constants and client functions so existing imports
# (e.g. tests that do `import a2a_mcp_server`) still work.
from a2a_client import (  # noqa: F401, E402
    PLATFORM_URL,
    WORKSPACE_ID,
    _A2A_ERROR_PREFIX,
    _peer_names,
    discover_peer,
    get_peers,
    get_workspace_info,
    send_a2a_message,
)
from a2a_tools import report_activity  # noqa: F401, E402

# --- Tool definitions (schemas) ---
#
# Built once at import time from the platform_tools registry. The MCP
# `description` field is the spec's `short` line — that's the unified
# tool description used by both the MCP tool listing AND the bullet
# rendering in the agent-facing system-prompt section. The deeper
# `when_to_use` guidance is appended to the system prompt only (it's
# too long to live in MCP `description` without bloating every
# tool-list response the model sees).
TOOLS = [
    {
        "name": _spec.name,
        "description": _spec.short,
        "inputSchema": _spec.input_schema,
    }
    for _spec in _PLATFORM_TOOL_SPECS
]


# --- Tool dispatch ---
async def handle_tool_call(name: str, arguments: dict) -> str:
    """Handle a tool call and return the result as text."""
    if name == "delegate_task":
        return await tool_delegate_task(
            arguments.get("workspace_id", ""),
            arguments.get("task", ""),
        )
    elif name == "delegate_task_async":
        return await tool_delegate_task_async(
            arguments.get("workspace_id", ""),
            arguments.get("task", ""),
        )
    elif name == "check_task_status":
        return await tool_check_task_status(
            arguments.get("workspace_id", ""),
            arguments.get("task_id", ""),
        )
    elif name == "send_message_to_user":
        raw_attachments = arguments.get("attachments")
        attachments: list[str] | None = None
        if isinstance(raw_attachments, list):
            # Defensive: filter to strings only — claude-code SDK occasionally
            # emits dicts here when the model misreads the schema. Drop the
            # bad entries rather than 500 the whole call.
            attachments = [p for p in raw_attachments if isinstance(p, str) and p]
        return await tool_send_message_to_user(
            arguments.get("message", ""),
            attachments=attachments,
        )
    elif name == "list_peers":
        return await tool_list_peers()
    elif name == "get_workspace_info":
        return await tool_get_workspace_info()
    elif name == "commit_memory":
        return await tool_commit_memory(
            arguments.get("content", ""),
            arguments.get("scope", "LOCAL"),
        )
    elif name == "recall_memory":
        return await tool_recall_memory(
            arguments.get("query", ""),
            arguments.get("scope", ""),
        )
    return f"Unknown tool: {name}"


# --- MCP Server (JSON-RPC over stdio) ---
async def main():  # pragma: no cover
    """Run MCP server on stdio — reads JSON-RPC requests, writes responses."""
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)
    await asyncio.get_event_loop().connect_read_pipe(lambda: protocol, sys.stdin)
    writer_transport, writer_protocol = await asyncio.get_event_loop().connect_write_pipe(
        asyncio.streams.FlowControlMixin, sys.stdout
    )
    writer = asyncio.StreamWriter(writer_transport, writer_protocol, None, asyncio.get_event_loop())

    async def write_response(response: dict):
        data = json.dumps(response) + "\n"
        writer.write(data.encode())
        await writer.drain()

    buffer = ""
    while True:
        try:
            chunk = await reader.read(65536)
            if not chunk:
                break
            buffer += chunk.decode(errors="replace")
            while "\n" in buffer:
                line, buffer = buffer.split("\n", 1)
                line = line.strip()
                if not line:
                    continue
                try:
                    request = json.loads(line)
                except json.JSONDecodeError:
                    continue
                req_id = request.get("id")
                method = request.get("method", "")
                if method == "initialize":
                    await write_response({
                        "jsonrpc": "2.0",
                        "id": req_id,
                        "result": {
                            "protocolVersion": "2024-11-05",
                            "capabilities": {"tools": {"listChanged": False}},
                            "serverInfo": {"name": "a2a-delegation", "version": "1.0.0"},
                        },
                    })
                elif method == "notifications/initialized":
                    pass  # No response needed
                elif method == "tools/list":
                    await write_response({
                        "jsonrpc": "2.0",
                        "id": req_id,
                        "result": {"tools": TOOLS},
                    })
                elif method == "tools/call":
                    params = request.get("params", {})
                    tool_name = params.get("name", "")
                    tool_args = params.get("arguments", {})
                    result_text = await handle_tool_call(tool_name, tool_args)
                    await write_response({
                        "jsonrpc": "2.0",
                        "id": req_id,
                        "result": {
                            "content": [{"type": "text", "text": result_text}],
                        },
                    })
                else:
                    await write_response({
                        "jsonrpc": "2.0",
                        "id": req_id,
                        "error": {"code": -32601, "message": f"Method not found: {method}"},
                    })
        except Exception as e:
            logger.error(f"MCP server error: {e}")
            break


def cli_main() -> None:  # pragma: no cover
    """Synchronous wrapper around the async MCP stdio loop.

    Called by ``mcp_cli.main`` (the ``molecule-mcp`` console-script
    entry point in scripts/build_runtime_package.py) AFTER env
    validation and the standalone register + heartbeat thread setup.
    Direct callers (in-container code that already validated env and
    runs heartbeat.py separately) can also invoke this — it's the
    smallest possible "run the MCP stdio JSON-RPC loop" surface.

    Wheel-smoke gates in scripts/wheel_smoke.py pin the importability
    of this name (alongside ``mcp_cli.main``) so a silent rename can't
    break every external-runtime operator's MCP install — the 0.1.16
    ``main_sync`` rename incident is the cautionary precedent.
    """
    asyncio.run(main())


if __name__ == "__main__":  # pragma: no cover
    cli_main()
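
For reference, the wire format the loop above speaks is newline-delimited JSON-RPC: one JSON object per line on stdin, one per line on stdout. A minimal client-side sketch (standalone illustration, not part of the server file):

```python
# Sketch of the newline-delimited JSON-RPC framing: a client writes one
# JSON object per line to the server's stdin and reads one response
# object per line from its stdout.
import json


def frame(request: dict) -> bytes:
    """Encode a request as a single newline-terminated JSON line."""
    return (json.dumps(request) + "\n").encode()


req = frame({
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {"name": "list_peers", "arguments": {}},
})

# Parsing a response line back out (example payload shape taken from
# the tools/call branch above):
line = b'{"jsonrpc": "2.0", "id": 1, "result": {"content": [{"type": "text", "text": "ok"}]}}\n'
response = json.loads(line.decode().strip())
```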