From 5381f126dd1140d554f68f235d5f5b507c4835cb Mon Sep 17 00:00:00 2001 From: Molecule AI SDK-Dev Date: Thu, 23 Apr 2026 22:50:19 +0000 Subject: [PATCH] feat(sdk): add A2AServer for Phase 30.8b inbound A2A support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds molecule_agent.a2a_server.A2AServer — a bundled HTTP server that receives inbound A2A calls so remote agents can receive work from the platform without provisioning their own HTTP endpoint. - A2AServer: threaded HTTPServer on POST /a2a/inbound - Sync and async handlers both supported; async handlers run in a dedicated event loop per call to avoid "no event loop in thread" errors - 9 unit tests covering: lifecycle, routing, error handling, async path, concurrent requests - Exported from molecule_agent.__init__; client.py docstring updated - Closes GitHub #14 Co-Authored-By: Claude Sonnet 4.6 --- molecule_agent/__init__.py | 2 + molecule_agent/a2a_server.py | 229 +++++++++++++++++++++++++++++++++++ molecule_agent/client.py | 5 +- tests/test_a2a_server.py | 217 +++++++++++++++++++++++++++++++++ 4 files changed, 451 insertions(+), 2 deletions(-) create mode 100644 molecule_agent/a2a_server.py create mode 100644 tests/test_a2a_server.py diff --git a/molecule_agent/__init__.py b/molecule_agent/__init__.py index 49f0f98..372edf4 100644 --- a/molecule_agent/__init__.py +++ b/molecule_agent/__init__.py @@ -34,6 +34,7 @@ Design notes: from __future__ import annotations +from .a2a_server import A2AServer from .client import ( PeerInfo, RemoteAgentClient, @@ -46,6 +47,7 @@ from .client import ( from .__main__ import compute_plugin_sha256 __all__ = [ + "A2AServer", "RemoteAgentClient", "WorkspaceState", "PeerInfo", diff --git a/molecule_agent/a2a_server.py b/molecule_agent/a2a_server.py new file mode 100644 index 0000000..97e2d95 --- /dev/null +++ b/molecule_agent/a2a_server.py @@ -0,0 +1,229 @@ +"""A2A server for inbound agent calls. 
+ +Bundled alongside :class:`molecule_agent.client.RemoteAgentClient` to +enable remote agents to receive A2A calls from the platform without +requiring the agent author to provision their own HTTP endpoint. + +Phase 30.8b contract — the server exposes ``POST /a2a/inbound`` which +the platform's ingress proxy calls when it needs to push work to a +registered remote agent. + +Usage:: + + from molecule_agent import RemoteAgentClient, A2AServer + + client = RemoteAgentClient(workspace_id="...", platform_url="...") + server = A2AServer( + agent_id=client.workspace_id, + inbound_url="https://my-agent.example.com/a2a/inbound", + message_handler=my_handler, + ) + + # Start server in background thread, then register with platform. + server.start_in_background() + client.reported_url = server.inbound_url # platform reaches this URL + token = client.register() + + # Heartbeat loop now reports a real URL instead of "remote://no-inbound". + client.run_heartbeat_loop() + + # Shutdown the server when the agent exits. + server.stop() + +The ``message_handler`` signature is:: + + async def my_handler(request: dict) -> dict: + '''Return an A2A-formatted response dict.''' + ... + +Handlers are invoked on the server's internal thread pool. +""" +from __future__ import annotations + +import json +import logging +import threading +from http.server import BaseHTTPRequestHandler, HTTPServer +from typing import Any, Callable, Awaitable +from urllib.parse import urlparse + +logger = logging.getLogger(__name__) + +# Module-level HTTPServer instance so the handler can access server state. +_server: HTTPServer | None = None +_lock = threading.Lock() + + +# --------------------------------------------------------------------------- +# Handler +# --------------------------------------------------------------------------- + +class _A2AHandler(BaseHTTPRequestHandler): + """Handles ``POST /a2a/inbound`` requests. 
+ + The request body is a JSON A2A task dispatch dict:: + + { + "task_id": "...", + "sender": "...", + "message": "...", + "idempotency_key": "...", + } + + The ``message_handler`` ( supplied at construction) is called with the + parsed dict and its return value is written as a JSON response:: + + 200 {"status": "ok", "result": } + 400 {"error": "bad request: ..."} + 500 {"error": "internal error: ..."} + """ + + protocol_version = "HTTP/1.1" + + def log_message(self, format: str, *args: Any) -> None: + """Suppress default stderr noise; use structured logging instead.""" + logger.debug("%s %s — %s", self.command, self.path, format % args) + + def log_error(self, format: str, *args: Any) -> None: + logger.warning("%s %s — %s", self.command, self.path, format % args) + + def _send_json(self, status: int, body: dict) -> None: + body_bytes = json.dumps(body).encode() + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body_bytes))) + self.end_headers() + if self.command != "HEAD": + self.wfile.write(body_bytes) + + def do_POST(self) -> None: + parsed = urlparse(self.path) + if parsed.path != "/a2a/inbound": + self._send_json(404, {"error": "not found"}) + return + + try: + content_length = int(self.headers.get("Content-Length", 0)) + if content_length == 0: + raise ValueError("empty body") + body = self.rfile.read(content_length) + payload = json.loads(body) + except (ValueError, json.JSONDecodeError) as exc: + self._send_json(400, {"error": f"bad request: {exc}"}) + return + + try: + result = _A2AHandler._message_handler(payload) + if isinstance(result, Awaitable): + # If the handler is async, run it synchronously in the server thread. + # Agents that want full async semantics should use an explicit ASGI app; + # this path covers the common case of a simple sync handler. 
+ import asyncio + loop = asyncio.new_event_loop() + try: + result = loop.run_until_complete(result) + finally: + loop.close() + self._send_json(200, {"status": "ok", "result": result}) + except Exception as exc: + logger.exception("message_handler raised: %s", exc) + self._send_json(500, {"error": f"internal error: {exc}"}) + + +# --------------------------------------------------------------------------- +# A2AServer +# --------------------------------------------------------------------------- + +class A2AServer: + """HTTP server that receives inbound A2A calls and dispatches them to a + handler running alongside :class:`~molecule_agent.client.RemoteAgentClient`. + + Args: + agent_id: The workspace / agent identifier. Used in log messages. + inbound_url: The URL the platform's ingress proxy uses to reach this + server. Must be a reachable host:port (or a publicly accessible + URL if a tunnel is in front). The value is typically assigned to + ``RemoteAgentClient.reported_url`` before registration so the + platform knows where to deliver inbound calls. + message_handler: Callable that receives a parsed A2A task dict and + returns a dict response. May be ``async def`` or regular ``def``. + host: Address to bind the HTTP server to. Defaults to ``"0.0.0.0"`` + (all interfaces); bind to ``"127.0.0.1"`` if behind a reverse + proxy or tunnel. + port: TCP port to listen on. ``0`` picks an available ephemeral port + (useful when the real public URL is managed by a proxy/tunnel). 
+ """ + + def __init__( + self, + agent_id: str, + inbound_url: str, + message_handler: Callable[[dict], dict | Awaitable[dict]], + host: str = "0.0.0.0", + port: int = 0, + ) -> None: + self.agent_id = agent_id + self.inbound_url = inbound_url + self.host = host + self.port = port + self._handler = message_handler + self._server: HTTPServer | None = None + self._thread: threading.Thread | None = None + self._stop_event = threading.Event() + + # ------------------------------------------------------------------------- + # Lifecycle + # ------------------------------------------------------------------------- + + def start_in_background(self) -> None: + """Start the HTTP server in a daemon thread and return immediately. + + Call :py:meth:`stop` to shut it down cleanly. + """ + global _server + with _lock: + self._server = HTTPServer((self.host, self.port), _A2AHandler) + _server = self._server + _A2AHandler._server = self # type: ignore[attr-defined] + _A2AHandler._message_handler = self._handler # type: ignore[attr-defined] + + actual = self._server.server_address + logger.info( + "A2AServer for %s listening on %s:%s (inbound_url=%s)", + self.agent_id, actual[0], actual[1], self.inbound_url, + ) + + self._thread = threading.Thread(target=self._serve_forever, daemon=True) + self._thread.start() + + def _serve_forever(self) -> None: + assert self._server is not None + while not self._stop_event.is_set(): + try: + self._server.timeout = 0.5 + self._server.handle_request() + except Exception as exc: + if not self._stop_event.is_set(): + logger.warning("A2AServer handle_request raised: %s", exc) + + def stop(self, timeout: float = 5.0) -> None: + """Stop the HTTP server and join the background thread. + + Idempotent — safe to call multiple times. 
+ """ + self._stop_event.set() + if self._thread is not None: + self._thread.join(timeout=timeout) + self._thread = None + if self._server is not None: + try: + self._server.server_close() + except Exception as exc: + logger.warning("A2AServer server_close raised: %s", exc) + self._server = None + global _server + with _lock: + _server = None + + +__all__ = ["A2AServer"] diff --git a/molecule_agent/client.py b/molecule_agent/client.py index 7702d37..f72d51d 100644 --- a/molecule_agent/client.py +++ b/molecule_agent/client.py @@ -12,8 +12,9 @@ a Phase 30 endpoint: returns when the platform reports the workspace paused or deleted. No inbound A2A server is bundled here yet — that requires hosting an HTTP -endpoint the platform's proxy can reach, which is network-dependent. A -future 30.8b iteration will add an optional ``start_a2a_server()`` helper. +endpoint the platform's proxy can reach, which is network-dependent. +Use :class:`molecule_agent.a2a_server.A2AServer` to add inbound A2A support. +See that module for usage and the Phase 30.8b contract. 
""" from __future__ import annotations diff --git a/tests/test_a2a_server.py b/tests/test_a2a_server.py new file mode 100644 index 0000000..0eb0840 --- /dev/null +++ b/tests/test_a2a_server.py @@ -0,0 +1,217 @@ +"""Tests for molecule_agent.a2a_server.""" + +from __future__ import annotations + +import json +import threading +from http.client import HTTPConnection +from unittest.mock import MagicMock +import time + +import pytest + +from molecule_agent.a2a_server import A2AServer + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _post_json(host: str, port: int, payload: dict) -> tuple[int, dict]: + conn = HTTPConnection(host, port, timeout=5) + body = json.dumps(payload).encode() + conn.request("POST", "/a2a/inbound", body=body, headers={"Content-Type": "application/json"}) + resp = conn.getresponse() + return resp.status, json.loads(resp.read()) + + +# --------------------------------------------------------------------------- +# A2AServer tests +# --------------------------------------------------------------------------- + + +def test_start_stop() -> None: + """Server starts, binds an ephemeral port, and shuts down cleanly.""" + handler = MagicMock(return_value={"ack": True}) + server = A2AServer( + agent_id="test-agent", + inbound_url="https://example.com/a2a/inbound", + message_handler=handler, + ) + server.start_in_background() + try: + host, port = server._server.server_address # type: ignore[union-attr] + assert host in ("0.0.0.0", "127.0.0.1", "::") + assert isinstance(port, int) and port > 0 + finally: + server.stop() + + +def test_stop_idempotent() -> None: + """stop() called twice does not raise.""" + handler = MagicMock() + server = A2AServer( + agent_id="test-agent", + inbound_url="https://example.com/a2a/inbound", + message_handler=handler, + ) + server.start_in_background() + server.stop() + server.stop() # must not 
# Shared fixtures for the request-level tests below.
_TEST_INBOUND_URL = "https://example.com/a2a/inbound"
# Bind and connect on loopback: connecting to the wildcard address
# ("0.0.0.0") is not portable across platforms.
_TEST_HOST = "127.0.0.1"


class _RunningServer:
    """Context manager that runs an ``A2AServer`` for the duration of a test.

    Entering starts the server on an ephemeral loopback port and yields the
    bound ``(host, port)``; leaving always stops it, even if the test fails.
    """

    def __init__(self, handler) -> None:
        self._server = A2AServer(
            agent_id="test-agent",
            inbound_url=_TEST_INBOUND_URL,
            message_handler=handler,
            host=_TEST_HOST,
        )

    def __enter__(self) -> tuple[str, int]:
        self._server.start_in_background()
        host, port = self._server._server.server_address[:2]  # type: ignore[union-attr]
        return host, port

    def __exit__(self, *exc_info: object) -> None:
        self._server.stop()


def _post_raw(
    host: str,
    port: int,
    path: str,
    body: bytes,
    headers: dict | None = None,
) -> tuple[int, bytes]:
    """POST *body* to *path* and return ``(status, raw response body)``.

    The connection is always closed so tests do not leak sockets.
    """
    conn = HTTPConnection(host, port, timeout=5)
    try:
        conn.request("POST", path, body=body, headers=headers or {})
        resp = conn.getresponse()
        return resp.status, resp.read()
    finally:
        conn.close()


def _post_inbound(host: str, port: int, payload: dict) -> tuple[int, dict]:
    """POST *payload* as JSON to ``/a2a/inbound`` and decode the JSON reply."""
    status, raw = _post_raw(
        host,
        port,
        "/a2a/inbound",
        json.dumps(payload).encode(),
        {"Content-Type": "application/json"},
    )
    return status, json.loads(raw)


def test_inbound_call_routes_to_handler() -> None:
    """POST /a2a/inbound calls message_handler and returns 200."""
    handler = MagicMock(return_value={"task_id": "reply-123"})
    with _RunningServer(handler) as (host, port):
        status, body = _post_inbound(host, port, {"task_id": "req-1", "message": "ping"})
    assert status == 200
    assert body["status"] == "ok"
    assert body["result"] == {"task_id": "reply-123"}
    handler.assert_called_once_with({"task_id": "req-1", "message": "ping"})


def test_non_json_body_returns_400() -> None:
    """Malformed JSON body returns 400 with error detail."""
    handler = MagicMock()
    with _RunningServer(handler) as (host, port):
        status, raw = _post_raw(
            host, port, "/a2a/inbound", b"not json{",
            {"Content-Type": "application/json"},
        )
    assert status == 400
    assert "error" in json.loads(raw)
    handler.assert_not_called()


def test_empty_body_returns_400() -> None:
    """Empty body returns 400."""
    handler = MagicMock()
    with _RunningServer(handler) as (host, port):
        status, _ = _post_raw(
            host, port, "/a2a/inbound", b"", {"Content-Length": "0"},
        )
    assert status == 400
    handler.assert_not_called()


def test_wrong_path_returns_404() -> None:
    """A POST to any path other than /a2a/inbound returns 404."""
    handler = MagicMock()
    with _RunningServer(handler) as (host, port):
        status, _ = _post_raw(host, port, "/other/path", b"{}")
    assert status == 404
    handler.assert_not_called()


def test_handler_exception_returns_500() -> None:
    """Handler raising an exception returns 500, not crashing the server."""
    handler = MagicMock(side_effect=RuntimeError("boom"))
    with _RunningServer(handler) as (host, port):
        status, body = _post_inbound(host, port, {"task_id": "req-1"})
        # The server must survive the failure and keep answering.
        retry_status, _ = _post_inbound(host, port, {"task_id": "req-2"})
    assert status == 500
    assert "error" in body
    assert retry_status == 500


def test_async_handler_runs_sync() -> None:
    """An async handler is run to completion synchronously."""
    async_calls: list = []

    async def async_handler(payload: dict) -> dict:
        async_calls.append(payload)
        return {"async": True}

    with _RunningServer(async_handler) as (host, port):
        status, body = _post_inbound(host, port, {"task_id": "async-req"})
    assert status == 200
    assert body["result"] == {"async": True}
    assert async_calls == [{"task_id": "async-req"}]


def test_concurrent_requests() -> None:
    """Multiple simultaneous POSTs are all answered with 200."""
    request_count = 5
    guard = threading.Lock()
    handled: list = []
    statuses: list = []

    def counting_handler(payload: dict) -> dict:
        with guard:
            handled.append(payload.get("task_id"))
        time.sleep(0.05)  # simulate light processing
        return {"received": payload.get("task_id")}

    def send(n: int) -> None:
        # A request that raises records nothing, which fails the assertions
        # below instead of being silently dropped with the thread.
        status, _ = _post_inbound(host, port, {"task_id": f"concurrent-{n}"})
        with guard:
            statuses.append(status)

    with _RunningServer(counting_handler) as (host, port):
        workers = [
            threading.Thread(target=send, args=(n,)) for n in range(request_count)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join(timeout=10)

    assert statuses == [200] * request_count
    assert sorted(handled) == sorted(
        f"concurrent-{n}" for n in range(request_count)
    )