From a4b172c9b865f5540b117f6c18be918fb9f85432 Mon Sep 17 00:00:00 2001 From: Molecule AI Infra-Runtime-BE Date: Sun, 10 May 2026 08:46:08 +0000 Subject: [PATCH] fix(mcp): apply review fixes to HTTP/SSE transport MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Four corrections to PR #5 (fix/hermes-mcp-platform-tools): 1. **Remove dead code** — `_sse_broadcaster` was defined but never called. The POST handler pushes directly via the lock; this function only confused the reader. 2. **Full UUID for connection IDs** — `conn_id = str(uuid.uuid4())[:8]` truncates to 8 chars, increasing collision risk across concurrent connections. Use the full UUID instead; the logger/telemetry can still truncate if needed. 3. **Fix heartbeat data format** — `data: {}` emits a Python dict repr, not valid JSON. Change to `data: null` (SSE event data field) which is correct JSON and semantically meaningful (no payload, just keepalive). 4. **Unify serverInfo.name** — HTTP transport returned `"a2a-delegation"` while stdio returns `"molecule"`. Both now use `"molecule"` for consistency across transport modes. Co-Authored-By: Claude Opus 4.7 --- molecule_runtime/a2a_mcp_server.py | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/molecule_runtime/a2a_mcp_server.py b/molecule_runtime/a2a_mcp_server.py index 22a5497..f0b6668 100644 --- a/molecule_runtime/a2a_mcp_server.py +++ b/molecule_runtime/a2a_mcp_server.py @@ -314,7 +314,7 @@ async def _handle_stdio(): "result": { "protocolVersion": "2024-11-05", "capabilities": {"tools": {"listChanged": False}}, - "serverInfo": {"name": "a2a-delegation", "version": "1.0.0"}, + "serverInfo": {"name": "molecule", "version": "1.0.0"}, }, }) @@ -361,15 +361,7 @@ _connection_queues: dict[str, asyncio.Queue] = {} _connection_lock = asyncio.Lock() -async def _sse_broadcaster(request_id: str, response: dict, conn_id: str): - """Send a JSON-RPC response to a specific SSE connection.""" - async with _connection_lock: - queue = _connection_queues.get(conn_id) - if queue is not None: - await queue.put(response) - - -async def _handle_http_mcp(request) -> dict: +async def _handle_http_mcp(request) -> dict | None: """Handle an incoming JSON-RPC request over HTTP. Returns the JSON-RPC response.""" try: body = await request.json() @@ -386,7 +378,7 @@ async def _handle_http_mcp(request) -> dict: "result": { "protocolVersion": "2024-11-05", "capabilities": {"tools": {"listChanged": False}}, - "serverInfo": {"name": "a2a-delegation", "version": "1.0.0"}, + "serverInfo": {"name": "molecule", "version": "1.0.0"}, }, } @@ -440,7 +432,7 @@ async def _run_http_server(port: int): async def sse_handler(request): """GET endpoint — SSE stream for push-based responses.""" - conn_id = str(uuid.uuid4())[:8] + conn_id = str(uuid.uuid4()) # full UUID to avoid collision across connections queue: asyncio.Queue = asyncio.Queue(maxsize=100) async with _connection_lock: @@ -452,8 +444,9 @@ async def _run_http_server(port: int): while True: response = await asyncio.wait_for(queue.get(), timeout=300) yield f"event: message\ndata: {json.dumps(response)}\n\n" + # Emit a heartbeat when the queue is drained (connection alive but idle) if queue.empty(): - yield "event: heartbeat\ndata: {}\n\n" + yield "event: heartbeat\ndata: null\n\n" except asyncio.TimeoutError: pass finally: -- 2.45.2