From b9b75b1bc697eb52d51663806a608bb8ca65da3e Mon Sep 17 00:00:00 2001 From: Molecule AI Infra-Runtime-BE Date: Wed, 13 May 2026 04:30:47 +0000 Subject: [PATCH] feat(workspace): add HTTP/SSE transport to a2a_mcp_server MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Port HTTP/SSE transport (from workspace-runtime PR #16) to the canonical monorepo source. This enables the Hermes MCP-native runtime to communicate with the A2A platform tools via HTTP/SSE instead of stdio. Changes: - Add _run_http_server(port) async function using starlette + uvicorn - POST /mcp: receive JSON-RPC requests, return JSON or queue for SSE - GET /mcp/stream: SSE stream for push-based responses - GET /health: returns {"ok": true, "transport": "http+sse", "port": N} - cli_main() now accepts (transport, port) params for dual-mode startup - argparse added to __main__ guard for CLI: --transport stdio|http, --port N uvicorn/starlette are lazy-imported inside _run_http_server() so the stdio path (the common case) doesn't fail if they're not installed. Closes #16 (workspace-runtime) — fix now goes through monorepo. Co-Authored-By: Claude Opus 4.7 --- workspace/a2a_mcp_server.py | 155 ++++++++++++++++++++++++++++++++++-- 1 file changed, 148 insertions(+), 7 deletions(-) diff --git a/workspace/a2a_mcp_server.py b/workspace/a2a_mcp_server.py index 07f04f32..f02c4ba0 100644 --- a/workspace/a2a_mcp_server.py +++ b/workspace/a2a_mcp_server.py @@ -12,12 +12,14 @@ Environment variables (set by the workspace container): PLATFORM_URL — platform API base URL (e.g. http://platform:8080) """ +import argparse import asyncio import json import logging import os import stat import sys +import uuid from typing import Callable # Top-level (not inside main()) so the wheel rewriter expands this to @@ -765,24 +767,163 @@ async def main(): # pragma: no cover break -def cli_main() -> None: # pragma: no cover - """Synchronous wrapper around the async MCP stdio loop. +# --- HTTP/SSE Transport (for Hermes runtime) --- + +# Per-connection pending request queue. +# Maps connection-id → asyncio.Queue of JSON-RPC responses. +_http_connection_queues: dict[str, asyncio.Queue] = {} +_http_connection_lock = asyncio.Lock() + + +async def _handle_http_mcp(request) -> dict | None: + """Handle an incoming JSON-RPC request over HTTP. Returns the JSON-RPC response dict, or None for notifications.""" + try: + body = await request.json() + except Exception: + return {"jsonrpc": "2.0", "id": None, "error": {"code": -32700, "message": "Parse error"}} + + req_id = body.get("id") + method = body.get("method", "") + + if method == "initialize": + return { + "jsonrpc": "2.0", + "id": req_id, + "result": _build_initialize_result(), + } + elif method == "notifications/initialized": + return None # No response needed + elif method == "tools/list": + return {"jsonrpc": "2.0", "id": req_id, "result": {"tools": TOOLS}} + elif method == "tools/call": + params = body.get("params", {}) + tool_name = params.get("name", "") + tool_args = params.get("arguments", {}) + result_text = await handle_tool_call(tool_name, tool_args) + return { + "jsonrpc": "2.0", + "id": req_id, + "result": {"content": [{"type": "text", "text": result_text}]}, + } + else: + return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32601, "message": f"Method not found: {method}"}} + + +async def _run_http_server(port: int) -> None: + """Run MCP server over HTTP/SSE — compatible with Hermes MCP-native agents.""" + try: + from starlette.applications import Starlette # noqa: F401 + from starlette.routing import Route # noqa: F401 + from starlette.responses import JSONResponse, Response # noqa: F401 + except ImportError: + logger.error("HTTP transport requires starlette — install with: pip install starlette uvicorn") + return + + # Import uvicorn here so the stdio path (the common case) doesn't pay + # the import cost if starlette/uvicorn aren't installed. + import uvicorn # noqa: F401 + + _http_connection_queues.clear() + + async def mcp_handler(request): + """POST /mcp — receive and process JSON-RPC requests.""" + conn_id = request.headers.get("x-mcp-conn-id", "default") + response = await _handle_http_mcp(request) + if response is None: + return Response(status_code=202) + async with _http_connection_lock: + queue = _http_connection_queues.get(conn_id) + if queue is not None and not queue.full(): + await queue.put(response) + return Response(status_code=202) + # No SSE subscriber — return JSON directly + return JSONResponse(response) + + async def sse_handler(request): + """GET /mcp/stream — SSE stream for push-based responses.""" + conn_id = str(uuid.uuid4()) + queue: asyncio.Queue = asyncio.Queue(maxsize=100) + async with _http_connection_lock: + _http_connection_queues[conn_id] = queue + + async def event_stream(): + yield f"event: connected\ndata: {json.dumps({'conn_id': conn_id})}\n\n" + try: + while True: + response = await asyncio.wait_for(queue.get(), timeout=300) + yield f"event: message\ndata: {json.dumps(response)}\n\n" + if queue.empty(): + yield "event: heartbeat\ndata: null\n\n" + except asyncio.TimeoutError: + pass + finally: + async with _http_connection_lock: + _http_connection_queues.pop(conn_id, None) + + return Response( + event_stream(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + + async def health_handler(_request): + return JSONResponse({"ok": True, "transport": "http+sse", "port": port}) + + app = Starlette( + routes=[ + Route("/mcp", mcp_handler, methods=["POST"]), + Route("/mcp/stream", sse_handler, methods=["GET"]), + Route("/health", health_handler), + ] + ) + config = uvicorn.Config(app, host="127.0.0.1", port=port, log_level="warning") + server = uvicorn.Server(config) + logger.info(f"A2A MCP HTTP server listening on http://127.0.0.1:{port}/mcp") + await server.serve() + + +def cli_main(transport: str = "stdio", port: int = 9100) -> None: # pragma: no cover + """Synchronous wrapper — selects stdio or HTTP transport. Called by ``mcp_cli.main`` (the ``molecule-mcp`` console-script entry point in scripts/build_runtime_package.py) AFTER env validation and the standalone register + heartbeat thread setup. Direct callers (in-container code that already validated env and - runs heartbeat.py separately) can also invoke this — it's the - smallest possible "run the MCP stdio JSON-RPC loop" surface. + runs heartbeat.py separately) can also invoke this. Wheel-smoke gates in scripts/wheel_smoke.py pin the importability of this name (alongside ``mcp_cli.main``) so a silent rename can't break every external-runtime operator's MCP install — the 0.1.16 ``main_sync`` rename incident is the cautionary precedent. + + Args: + transport: "stdio" (default) or "http" (HTTP+SSE for Hermes). + port: TCP port for HTTP transport (default 9100). """ - _assert_stdio_is_pipe_compatible() - asyncio.run(main()) + if transport == "http": + asyncio.run(_run_http_server(port)) + else: + _assert_stdio_is_pipe_compatible() + asyncio.run(main()) if __name__ == "__main__": # pragma: no cover - cli_main() + parser = argparse.ArgumentParser(description="A2A MCP Server") + parser.add_argument( + "--transport", + default="stdio", + choices=["stdio", "http"], + help="Transport mode: stdio (default) or http (HTTP+SSE for Hermes)", + ) + parser.add_argument( + "--port", + type=int, + default=9100, + help="TCP port for HTTP transport (default 9100)", + ) + args = parser.parse_args() + cli_main(transport=args.transport, port=args.port) -- 2.45.2