feat(workspace): add HTTP/SSE transport to a2a_mcp_server
Some checks failed
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 16s
CI / Detect changes (pull_request) Successful in 1m5s
E2E API Smoke Test / detect-changes (pull_request) Successful in 58s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 50s
Harness Replays / detect-changes (pull_request) Successful in 17s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 15s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 53s
publish-runtime-autobump / bump-and-tag (pull_request) Has been skipped
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 11s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 21s
gate-check-v3 / gate-check (pull_request) Successful in 8s
qa-review / approved (pull_request) Successful in 6s
publish-runtime-autobump / pr-validate (pull_request) Successful in 39s
security-review / approved (pull_request) Successful in 6s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m18s
sop-checklist-gate / gate (pull_request) Successful in 8s
sop-tier-check / tier-check (pull_request) Successful in 8s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Successful in 1m39s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 1m44s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 1m27s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 2m0s
audit-force-merge / audit (pull_request) Has been skipped
CI / Shellcheck (E2E scripts) (pull_request) Successful in 4s
CI / Canvas (Next.js) (pull_request) Successful in 6s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 6s
Harness Replays / Harness Replays (pull_request) Successful in 6s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 1m33s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 3m12s
CI / Python Lint & Test (pull_request) Failing after 7m4s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 2m7s
CI / Platform (Go) (pull_request) Successful in 7m49s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
CI / all-required (pull_request) Failing after 1s
Some checks failed
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 16s
CI / Detect changes (pull_request) Successful in 1m5s
E2E API Smoke Test / detect-changes (pull_request) Successful in 58s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 50s
Harness Replays / detect-changes (pull_request) Successful in 17s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 15s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 53s
publish-runtime-autobump / bump-and-tag (pull_request) Has been skipped
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 11s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 21s
gate-check-v3 / gate-check (pull_request) Successful in 8s
qa-review / approved (pull_request) Successful in 6s
publish-runtime-autobump / pr-validate (pull_request) Successful in 39s
security-review / approved (pull_request) Successful in 6s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m18s
sop-checklist-gate / gate (pull_request) Successful in 8s
sop-tier-check / tier-check (pull_request) Successful in 8s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Successful in 1m39s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 1m44s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 1m27s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 2m0s
audit-force-merge / audit (pull_request) Has been skipped
CI / Shellcheck (E2E scripts) (pull_request) Successful in 4s
CI / Canvas (Next.js) (pull_request) Successful in 6s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 6s
Harness Replays / Harness Replays (pull_request) Successful in 6s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 1m33s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 3m12s
CI / Python Lint & Test (pull_request) Failing after 7m4s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 2m7s
CI / Platform (Go) (pull_request) Successful in 7m49s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
CI / all-required (pull_request) Failing after 1s
Port HTTP/SSE transport (from workspace-runtime PR #16) to the canonical monorepo source. This enables the Hermes MCP-native runtime to communicate with the A2A platform tools via HTTP/SSE instead of stdio. Changes: - Add _run_http_server(port) async function using starlette + uvicorn - POST /mcp: receive JSON-RPC requests, return JSON or queue for SSE - GET /mcp/stream: SSE stream for push-based responses - GET /health: returns {"ok": true, "transport": "http+sse", "port": N} - cli_main() now accepts (transport, port) params for dual-mode startup - argparse added to __main__ guard for CLI: --transport stdio|http, --port N uvicorn/starlette are lazy-imported inside _run_http_server() so the stdio path (the common case) doesn't fail if they're not installed. Closes #16 (workspace-runtime) — fix now goes through monorepo. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
06af0bbeb3
commit
b9b75b1bc6
@ -12,12 +12,14 @@ Environment variables (set by the workspace container):
|
||||
PLATFORM_URL — platform API base URL (e.g. http://platform:8080)
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import stat
|
||||
import sys
|
||||
import uuid
|
||||
from typing import Callable
|
||||
|
||||
# Top-level (not inside main()) so the wheel rewriter expands this to
|
||||
@ -765,24 +767,163 @@ async def main(): # pragma: no cover
|
||||
break
|
||||
|
||||
|
||||
def cli_main() -> None: # pragma: no cover
|
||||
"""Synchronous wrapper around the async MCP stdio loop.
|
||||
# --- HTTP/SSE Transport (for Hermes runtime) ---
|
||||
|
||||
# Per-connection pending request queue.
|
||||
# Maps connection-id → asyncio.Queue of JSON-RPC responses.
|
||||
_http_connection_queues: dict[str, asyncio.Queue] = {}
|
||||
_http_connection_lock = asyncio.Lock()
|
||||
|
||||
|
||||
async def _handle_http_mcp(request) -> dict | None:
|
||||
"""Handle an incoming JSON-RPC request over HTTP. Returns the JSON-RPC response dict, or None for notifications."""
|
||||
try:
|
||||
body = await request.json()
|
||||
except Exception:
|
||||
return {"jsonrpc": "2.0", "id": None, "error": {"code": -32700, "message": "Parse error"}}
|
||||
|
||||
req_id = body.get("id")
|
||||
method = body.get("method", "")
|
||||
|
||||
if method == "initialize":
|
||||
return {
|
||||
"jsonrpc": "2.0",
|
||||
"id": req_id,
|
||||
"result": _build_initialize_result(),
|
||||
}
|
||||
elif method == "notifications/initialized":
|
||||
return None # No response needed
|
||||
elif method == "tools/list":
|
||||
return {"jsonrpc": "2.0", "id": req_id, "result": {"tools": TOOLS}}
|
||||
elif method == "tools/call":
|
||||
params = body.get("params", {})
|
||||
tool_name = params.get("name", "")
|
||||
tool_args = params.get("arguments", {})
|
||||
result_text = await handle_tool_call(tool_name, tool_args)
|
||||
return {
|
||||
"jsonrpc": "2.0",
|
||||
"id": req_id,
|
||||
"result": {"content": [{"type": "text", "text": result_text}]},
|
||||
}
|
||||
else:
|
||||
return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32601, "message": f"Method not found: {method}"}}
|
||||
|
||||
|
||||
async def _run_http_server(port: int) -> None:
|
||||
"""Run MCP server over HTTP/SSE — compatible with Hermes MCP-native agents."""
|
||||
try:
|
||||
from starlette.applications import Starlette # noqa: F401
|
||||
from starlette.routing import Route # noqa: F401
|
||||
from starlette.responses import JSONResponse, Response # noqa: F401
|
||||
except ImportError:
|
||||
logger.error("HTTP transport requires starlette — install with: pip install starlette uvicorn")
|
||||
return
|
||||
|
||||
# Import uvicorn here so the stdio path (the common case) doesn't pay
|
||||
# the import cost if starlette/uvicorn aren't installed.
|
||||
import uvicorn # noqa: F401
|
||||
|
||||
_http_connection_queues.clear()
|
||||
|
||||
async def mcp_handler(request):
|
||||
"""POST /mcp — receive and process JSON-RPC requests."""
|
||||
conn_id = request.headers.get("x-mcp-conn-id", "default")
|
||||
response = await _handle_http_mcp(request)
|
||||
if response is None:
|
||||
return Response(status_code=202)
|
||||
async with _http_connection_lock:
|
||||
queue = _http_connection_queues.get(conn_id)
|
||||
if queue is not None and not queue.full():
|
||||
await queue.put(response)
|
||||
return Response(status_code=202)
|
||||
# No SSE subscriber — return JSON directly
|
||||
return JSONResponse(response)
|
||||
|
||||
async def sse_handler(request):
|
||||
"""GET /mcp/stream — SSE stream for push-based responses."""
|
||||
conn_id = str(uuid.uuid4())
|
||||
queue: asyncio.Queue = asyncio.Queue(maxsize=100)
|
||||
async with _http_connection_lock:
|
||||
_http_connection_queues[conn_id] = queue
|
||||
|
||||
async def event_stream():
|
||||
yield f"event: connected\ndata: {json.dumps({'conn_id': conn_id})}\n\n"
|
||||
try:
|
||||
while True:
|
||||
response = await asyncio.wait_for(queue.get(), timeout=300)
|
||||
yield f"event: message\ndata: {json.dumps(response)}\n\n"
|
||||
if queue.empty():
|
||||
yield "event: heartbeat\ndata: null\n\n"
|
||||
except asyncio.TimeoutError:
|
||||
pass
|
||||
finally:
|
||||
async with _http_connection_lock:
|
||||
_http_connection_queues.pop(conn_id, None)
|
||||
|
||||
return Response(
|
||||
event_stream(),
|
||||
media_type="text/event-stream",
|
||||
headers={
|
||||
"Cache-Control": "no-cache",
|
||||
"Connection": "keep-alive",
|
||||
"X-Accel-Buffering": "no",
|
||||
},
|
||||
)
|
||||
|
||||
async def health_handler(_request):
|
||||
return JSONResponse({"ok": True, "transport": "http+sse", "port": port})
|
||||
|
||||
app = Starlette(
|
||||
routes=[
|
||||
Route("/mcp", mcp_handler, methods=["POST"]),
|
||||
Route("/mcp/stream", sse_handler, methods=["GET"]),
|
||||
Route("/health", health_handler),
|
||||
]
|
||||
)
|
||||
config = uvicorn.Config(app, host="127.0.0.1", port=port, log_level="warning")
|
||||
server = uvicorn.Server(config)
|
||||
logger.info(f"A2A MCP HTTP server listening on http://127.0.0.1:{port}/mcp")
|
||||
await server.serve()
|
||||
|
||||
|
||||
def cli_main(transport: str = "stdio", port: int = 9100) -> None: # pragma: no cover
|
||||
"""Synchronous wrapper — selects stdio or HTTP transport.
|
||||
|
||||
Called by ``mcp_cli.main`` (the ``molecule-mcp`` console-script
|
||||
entry point in scripts/build_runtime_package.py) AFTER env
|
||||
validation and the standalone register + heartbeat thread setup.
|
||||
Direct callers (in-container code that already validated env and
|
||||
runs heartbeat.py separately) can also invoke this — it's the
|
||||
smallest possible "run the MCP stdio JSON-RPC loop" surface.
|
||||
runs heartbeat.py separately) can also invoke this.
|
||||
|
||||
Wheel-smoke gates in scripts/wheel_smoke.py pin the importability
|
||||
of this name (alongside ``mcp_cli.main``) so a silent rename can't
|
||||
break every external-runtime operator's MCP install — the 0.1.16
|
||||
``main_sync`` rename incident is the cautionary precedent.
|
||||
|
||||
Args:
|
||||
transport: "stdio" (default) or "http" (HTTP+SSE for Hermes).
|
||||
port: TCP port for HTTP transport (default 9100).
|
||||
"""
|
||||
_assert_stdio_is_pipe_compatible()
|
||||
asyncio.run(main())
|
||||
if transport == "http":
|
||||
asyncio.run(_run_http_server(port))
|
||||
else:
|
||||
_assert_stdio_is_pipe_compatible()
|
||||
asyncio.run(main())
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
cli_main()
|
||||
parser = argparse.ArgumentParser(description="A2A MCP Server")
|
||||
parser.add_argument(
|
||||
"--transport",
|
||||
default="stdio",
|
||||
choices=["stdio", "http"],
|
||||
help="Transport mode: stdio (default) or http (HTTP+SSE for Hermes)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--port",
|
||||
type=int,
|
||||
default=9100,
|
||||
help="TCP port for HTTP transport (default 9100)",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
cli_main(transport=args.transport, port=args.port)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user