fix(mcp): apply review fixes to HTTP/SSE transport (PR #5 follow-up) #10
@ -314,7 +314,7 @@ async def _handle_stdio():
|
||||
"result": {
|
||||
"protocolVersion": "2024-11-05",
|
||||
"capabilities": {"tools": {"listChanged": False}},
|
||||
"serverInfo": {"name": "a2a-delegation", "version": "1.0.0"},
|
||||
"serverInfo": {"name": "molecule", "version": "1.0.0"},
|
||||
},
|
||||
})
|
||||
|
||||
@ -361,15 +361,7 @@ _connection_queues: dict[str, asyncio.Queue] = {}
|
||||
_connection_lock = asyncio.Lock()
|
||||
|
||||
|
||||
async def _sse_broadcaster(request_id: str, response: dict, conn_id: str):
|
||||
"""Send a JSON-RPC response to a specific SSE connection."""
|
||||
async with _connection_lock:
|
||||
queue = _connection_queues.get(conn_id)
|
||||
if queue is not None:
|
||||
await queue.put(response)
|
||||
|
||||
|
||||
async def _handle_http_mcp(request) -> dict:
|
||||
async def _handle_http_mcp(request) -> dict | None:
|
||||
"""Handle an incoming JSON-RPC request over HTTP. Returns the JSON-RPC response."""
|
||||
try:
|
||||
body = await request.json()
|
||||
@ -386,7 +378,7 @@ async def _handle_http_mcp(request) -> dict:
|
||||
"result": {
|
||||
"protocolVersion": "2024-11-05",
|
||||
"capabilities": {"tools": {"listChanged": False}},
|
||||
"serverInfo": {"name": "a2a-delegation", "version": "1.0.0"},
|
||||
"serverInfo": {"name": "molecule", "version": "1.0.0"},
|
||||
},
|
||||
}
|
||||
|
||||
@ -440,7 +432,7 @@ async def _run_http_server(port: int):
|
||||
|
||||
async def sse_handler(request):
|
||||
"""GET endpoint — SSE stream for push-based responses."""
|
||||
conn_id = str(uuid.uuid4())[:8]
|
||||
conn_id = str(uuid.uuid4()) # full UUID to avoid collision across connections
|
||||
queue: asyncio.Queue = asyncio.Queue(maxsize=100)
|
||||
|
||||
async with _connection_lock:
|
||||
@ -452,8 +444,9 @@ async def _run_http_server(port: int):
|
||||
while True:
|
||||
response = await asyncio.wait_for(queue.get(), timeout=300)
|
||||
yield f"event: message\ndata: {json.dumps(response)}\n\n"
|
||||
# Emit a heartbeat when the queue is drained (connection alive but idle)
|
||||
if queue.empty():
|
||||
yield "event: heartbeat\ndata: {}\n\n"
|
||||
yield "event: heartbeat\ndata: null\n\n"
|
||||
except asyncio.TimeoutError:
|
||||
pass
|
||||
finally:
|
||||
|
||||
Loading…
Reference in New Issue
Block a user