fix(mcp): apply review fixes to HTTP/SSE transport (PR #5 follow-up) #10

Closed
infra-runtime-be wants to merge 1 commits from runtime/http-mcp-review-fixes into main

View File

@ -314,7 +314,7 @@ async def _handle_stdio():
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {"listChanged": False}},
"serverInfo": {"name": "a2a-delegation", "version": "1.0.0"},
"serverInfo": {"name": "molecule", "version": "1.0.0"},
},
})
@ -361,15 +361,7 @@ _connection_queues: dict[str, asyncio.Queue] = {}
_connection_lock = asyncio.Lock()
async def _sse_broadcaster(request_id: str, response: dict, conn_id: str):
"""Send a JSON-RPC response to a specific SSE connection."""
async with _connection_lock:
queue = _connection_queues.get(conn_id)
if queue is not None:
await queue.put(response)
async def _handle_http_mcp(request) -> dict:
async def _handle_http_mcp(request) -> dict | None:
"""Handle an incoming JSON-RPC request over HTTP. Returns the JSON-RPC response."""
try:
body = await request.json()
@ -386,7 +378,7 @@ async def _handle_http_mcp(request) -> dict:
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {"listChanged": False}},
"serverInfo": {"name": "a2a-delegation", "version": "1.0.0"},
"serverInfo": {"name": "molecule", "version": "1.0.0"},
},
}
@ -440,7 +432,7 @@ async def _run_http_server(port: int):
async def sse_handler(request):
"""GET endpoint — SSE stream for push-based responses."""
conn_id = str(uuid.uuid4())[:8]
conn_id = str(uuid.uuid4()) # full UUID to avoid collision across connections
queue: asyncio.Queue = asyncio.Queue(maxsize=100)
async with _connection_lock:
@ -452,8 +444,9 @@ async def _run_http_server(port: int):
while True:
response = await asyncio.wait_for(queue.get(), timeout=300)
yield f"event: message\ndata: {json.dumps(response)}\n\n"
# Emit a heartbeat when the queue is drained (connection alive but idle)
if queue.empty():
yield "event: heartbeat\ndata: {}\n\n"
yield "event: heartbeat\ndata: null\n\n"
except asyncio.TimeoutError:
pass
finally: