feat(sdk): add A2AServer for Phase 30.8b inbound A2A support

Adds molecule_agent.a2a_server.A2AServer — a bundled HTTP server that
receives inbound A2A calls so remote agents can receive work from the
platform without provisioning their own HTTP endpoint.

- A2AServer: threaded HTTPServer on POST /a2a/inbound
- Sync and async handlers both supported; async handlers run in a
  dedicated event loop per call to avoid "no event loop in thread" errors
- 9 unit tests covering: lifecycle, routing, error handling, async path,
  concurrent requests
- Exported from molecule_agent.__init__; client.py docstring updated
- Closes GitHub #14

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Molecule AI · sdk-dev 2026-04-23 22:50:19 +00:00
parent 345b44b1bd
commit 5381f126dd
4 changed files with 451 additions and 2 deletions

View File

@ -34,6 +34,7 @@ Design notes:
from __future__ import annotations from __future__ import annotations
from .a2a_server import A2AServer
from .client import ( from .client import (
PeerInfo, PeerInfo,
RemoteAgentClient, RemoteAgentClient,
@ -46,6 +47,7 @@ from .client import (
from .__main__ import compute_plugin_sha256 from .__main__ import compute_plugin_sha256
__all__ = [ __all__ = [
"A2AServer",
"RemoteAgentClient", "RemoteAgentClient",
"WorkspaceState", "WorkspaceState",
"PeerInfo", "PeerInfo",

View File

@ -0,0 +1,229 @@
"""A2A server for inbound agent calls.
Bundled alongside :class:`molecule_agent.client.RemoteAgentClient` to
enable remote agents to receive A2A calls from the platform without
requiring the agent author to provision their own HTTP endpoint.
Phase 30.8b contract the server exposes ``POST /a2a/inbound`` which
the platform's ingress proxy calls when it needs to push work to a
registered remote agent.
Usage::
from molecule_agent import RemoteAgentClient, A2AServer
client = RemoteAgentClient(workspace_id="...", platform_url="...")
server = A2AServer(
agent_id=client.workspace_id,
inbound_url="https://my-agent.example.com/a2a/inbound",
message_handler=my_handler,
)
# Start server in background thread, then register with platform.
server.start_in_background()
client.reported_url = server.inbound_url # platform reaches this URL
token = client.register()
# Heartbeat loop now reports a real URL instead of "remote://no-inbound".
client.run_heartbeat_loop()
# Shutdown the server when the agent exits.
server.stop()
The ``message_handler`` signature is::
async def my_handler(request: dict) -> dict:
'''Return an A2A-formatted response dict.'''
...
Handlers are invoked on the server's internal thread pool.
"""
from __future__ import annotations
import json
import logging
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any, Callable, Awaitable
from urllib.parse import urlparse
logger = logging.getLogger(__name__)
# Module-level HTTPServer instance so the handler can access server state.
_server: HTTPServer | None = None
_lock = threading.Lock()
# ---------------------------------------------------------------------------
# Handler
# ---------------------------------------------------------------------------
class _A2AHandler(BaseHTTPRequestHandler):
"""Handles ``POST /a2a/inbound`` requests.
The request body is a JSON A2A task dispatch dict::
{
"task_id": "...",
"sender": "...",
"message": "...",
"idempotency_key": "...",
}
The ``message_handler`` ( supplied at construction) is called with the
parsed dict and its return value is written as a JSON response::
200 {"status": "ok", "result": <handler-result>}
400 {"error": "bad request: ..."}
500 {"error": "internal error: ..."}
"""
protocol_version = "HTTP/1.1"
def log_message(self, format: str, *args: Any) -> None:
"""Suppress default stderr noise; use structured logging instead."""
logger.debug("%s %s%s", self.command, self.path, format % args)
def log_error(self, format: str, *args: Any) -> None:
logger.warning("%s %s%s", self.command, self.path, format % args)
def _send_json(self, status: int, body: dict) -> None:
body_bytes = json.dumps(body).encode()
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body_bytes)))
self.end_headers()
if self.command != "HEAD":
self.wfile.write(body_bytes)
def do_POST(self) -> None:
parsed = urlparse(self.path)
if parsed.path != "/a2a/inbound":
self._send_json(404, {"error": "not found"})
return
try:
content_length = int(self.headers.get("Content-Length", 0))
if content_length == 0:
raise ValueError("empty body")
body = self.rfile.read(content_length)
payload = json.loads(body)
except (ValueError, json.JSONDecodeError) as exc:
self._send_json(400, {"error": f"bad request: {exc}"})
return
try:
result = _A2AHandler._message_handler(payload)
if isinstance(result, Awaitable):
# If the handler is async, run it synchronously in the server thread.
# Agents that want full async semantics should use an explicit ASGI app;
# this path covers the common case of a simple sync handler.
import asyncio
loop = asyncio.new_event_loop()
try:
result = loop.run_until_complete(result)
finally:
loop.close()
self._send_json(200, {"status": "ok", "result": result})
except Exception as exc:
logger.exception("message_handler raised: %s", exc)
self._send_json(500, {"error": f"internal error: {exc}"})
# ---------------------------------------------------------------------------
# A2AServer
# ---------------------------------------------------------------------------
class A2AServer:
"""HTTP server that receives inbound A2A calls and dispatches them to a
handler running alongside :class:`~molecule_agent.client.RemoteAgentClient`.
Args:
agent_id: The workspace / agent identifier. Used in log messages.
inbound_url: The URL the platform's ingress proxy uses to reach this
server. Must be a reachable host:port (or a publicly accessible
URL if a tunnel is in front). The value is typically assigned to
``RemoteAgentClient.reported_url`` before registration so the
platform knows where to deliver inbound calls.
message_handler: Callable that receives a parsed A2A task dict and
returns a dict response. May be ``async def`` or regular ``def``.
host: Address to bind the HTTP server to. Defaults to ``"0.0.0.0"``
(all interfaces); bind to ``"127.0.0.1"`` if behind a reverse
proxy or tunnel.
port: TCP port to listen on. ``0`` picks an available ephemeral port
(useful when the real public URL is managed by a proxy/tunnel).
"""
def __init__(
self,
agent_id: str,
inbound_url: str,
message_handler: Callable[[dict], dict | Awaitable[dict]],
host: str = "0.0.0.0",
port: int = 0,
) -> None:
self.agent_id = agent_id
self.inbound_url = inbound_url
self.host = host
self.port = port
self._handler = message_handler
self._server: HTTPServer | None = None
self._thread: threading.Thread | None = None
self._stop_event = threading.Event()
# -------------------------------------------------------------------------
# Lifecycle
# -------------------------------------------------------------------------
def start_in_background(self) -> None:
"""Start the HTTP server in a daemon thread and return immediately.
Call :py:meth:`stop` to shut it down cleanly.
"""
global _server
with _lock:
self._server = HTTPServer((self.host, self.port), _A2AHandler)
_server = self._server
_A2AHandler._server = self # type: ignore[attr-defined]
_A2AHandler._message_handler = self._handler # type: ignore[attr-defined]
actual = self._server.server_address
logger.info(
"A2AServer for %s listening on %s:%s (inbound_url=%s)",
self.agent_id, actual[0], actual[1], self.inbound_url,
)
self._thread = threading.Thread(target=self._serve_forever, daemon=True)
self._thread.start()
def _serve_forever(self) -> None:
assert self._server is not None
while not self._stop_event.is_set():
try:
self._server.timeout = 0.5
self._server.handle_request()
except Exception as exc:
if not self._stop_event.is_set():
logger.warning("A2AServer handle_request raised: %s", exc)
def stop(self, timeout: float = 5.0) -> None:
"""Stop the HTTP server and join the background thread.
Idempotent safe to call multiple times.
"""
self._stop_event.set()
if self._thread is not None:
self._thread.join(timeout=timeout)
self._thread = None
if self._server is not None:
try:
self._server.server_close()
except Exception as exc:
logger.warning("A2AServer server_close raised: %s", exc)
self._server = None
global _server
with _lock:
_server = None
__all__ = ["A2AServer"]

View File

@ -12,8 +12,9 @@ a Phase 30 endpoint:
returns when the platform reports the workspace paused or deleted. returns when the platform reports the workspace paused or deleted.
No inbound A2A server is bundled here yet that requires hosting an HTTP No inbound A2A server is bundled here yet that requires hosting an HTTP
endpoint the platform's proxy can reach, which is network-dependent. A endpoint the platform's proxy can reach, which is network-dependent.
future 30.8b iteration will add an optional ``start_a2a_server()`` helper. Use :class:`molecule_agent.a2a_server.A2AServer` to add inbound A2A support.
See that module for usage and the Phase 30.8b contract.
""" """
from __future__ import annotations from __future__ import annotations

217
tests/test_a2a_server.py Normal file
View File

@ -0,0 +1,217 @@
"""Tests for molecule_agent.a2a_server."""
from __future__ import annotations
import json
import threading
from http.client import HTTPConnection
from unittest.mock import MagicMock
import time
import pytest
from molecule_agent.a2a_server import A2AServer
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _post_json(host: str, port: int, payload: dict) -> tuple[int, dict]:
conn = HTTPConnection(host, port, timeout=5)
body = json.dumps(payload).encode()
conn.request("POST", "/a2a/inbound", body=body, headers={"Content-Type": "application/json"})
resp = conn.getresponse()
return resp.status, json.loads(resp.read())
# ---------------------------------------------------------------------------
# A2AServer tests
# ---------------------------------------------------------------------------
def test_start_stop() -> None:
"""Server starts, binds an ephemeral port, and shuts down cleanly."""
handler = MagicMock(return_value={"ack": True})
server = A2AServer(
agent_id="test-agent",
inbound_url="https://example.com/a2a/inbound",
message_handler=handler,
)
server.start_in_background()
try:
host, port = server._server.server_address # type: ignore[union-attr]
assert host in ("0.0.0.0", "127.0.0.1", "::")
assert isinstance(port, int) and port > 0
finally:
server.stop()
def test_stop_idempotent() -> None:
"""stop() called twice does not raise."""
handler = MagicMock()
server = A2AServer(
agent_id="test-agent",
inbound_url="https://example.com/a2a/inbound",
message_handler=handler,
)
server.start_in_background()
server.stop()
server.stop() # must not raise
def test_inbound_call_routes_to_handler() -> None:
"""POST /a2a/inbound calls message_handler and returns 200."""
handler = MagicMock(return_value={"task_id": "reply-123"})
server = A2AServer(
agent_id="test-agent",
inbound_url="https://example.com/a2a/inbound",
message_handler=handler,
)
server.start_in_background()
try:
host, port = server._server.server_address # type: ignore[union-attr]
status, body = _post_json(host, port, {"task_id": "req-1", "message": "ping"})
assert status == 200
assert body["status"] == "ok"
assert body["result"] == {"task_id": "reply-123"}
handler.assert_called_once_with({"task_id": "req-1", "message": "ping"})
finally:
server.stop()
def test_non_json_body_returns_400() -> None:
"""Malformed JSON body returns 400 with error detail."""
handler = MagicMock()
server = A2AServer(
agent_id="test-agent",
inbound_url="https://example.com/a2a/inbound",
message_handler=handler,
)
server.start_in_background()
try:
host, port = server._server.server_address # type: ignore[union-attr]
conn = HTTPConnection(host, port, timeout=5)
conn.request("POST", "/a2a/inbound", body=b"not json{", headers={"Content-Type": "application/json"})
resp = conn.getresponse()
assert resp.status == 400
body = json.loads(resp.read())
assert "error" in body
finally:
server.stop()
def test_empty_body_returns_400() -> None:
"""Empty body returns 400."""
handler = MagicMock()
server = A2AServer(
agent_id="test-agent",
inbound_url="https://example.com/a2a/inbound",
message_handler=handler,
)
server.start_in_background()
try:
host, port = server._server.server_address # type: ignore[union-attr]
conn = HTTPConnection(host, port, timeout=5)
conn.request("POST", "/a2a/inbound", body=b"", headers={"Content-Length": "0"})
resp = conn.getresponse()
assert resp.status == 400
finally:
server.stop()
def test_wrong_path_returns_404() -> None:
"""A POST to any path other than /a2a/inbound returns 404."""
handler = MagicMock()
server = A2AServer(
agent_id="test-agent",
inbound_url="https://example.com/a2a/inbound",
message_handler=handler,
)
server.start_in_background()
try:
host, port = server._server.server_address # type: ignore[union-attr]
conn = HTTPConnection(host, port, timeout=5)
conn.request("POST", "/other/path", body=b"{}")
resp = conn.getresponse()
assert resp.status == 404
handler.assert_not_called()
finally:
server.stop()
def test_handler_exception_returns_500() -> None:
"""Handler raising an exception returns 500, not crashing the server."""
handler = MagicMock(side_effect=RuntimeError("boom"))
server = A2AServer(
agent_id="test-agent",
inbound_url="https://example.com/a2a/inbound",
message_handler=handler,
)
server.start_in_background()
try:
host, port = server._server.server_address # type: ignore[union-attr]
status, body = _post_json(host, port, {"task_id": "req-1"})
assert status == 500
assert "error" in body
finally:
server.stop()
def test_async_handler_runs_sync() -> None:
"""An async handler is run to completion synchronously."""
async_calls: list = []
async def async_handler(payload: dict) -> dict:
async_calls.append(payload)
return {"async": True}
server = A2AServer(
agent_id="test-agent",
inbound_url="https://example.com/a2a/inbound",
message_handler=async_handler,
)
server.start_in_background()
try:
host, port = server._server.server_address # type: ignore[union-attr]
status, body = _post_json(host, port, {"task_id": "async-req"})
assert status == 200
assert body["result"] == {"async": True}
assert len(async_calls) == 1
finally:
server.stop()
def test_concurrent_requests() -> None:
"""Multiple simultaneous POSTs are handled without crashing the server."""
call_count = {"count": 0}
lock = threading.Lock()
def counting_handler(payload: dict) -> dict:
with lock:
call_count["count"] += 1
time.sleep(0.05) # simulate light processing
return {"received": payload.get("task_id")}
server = A2AServer(
agent_id="test-agent",
inbound_url="https://example.com/a2a/inbound",
message_handler=counting_handler,
)
server.start_in_background()
try:
host, port = server._server.server_address # type: ignore[union-attr]
def send(n: int) -> tuple[int, dict]:
return _post_json(host, port, {"task_id": f"concurrent-{n}"})
threads = [threading.Thread(target=send, args=(i,)) for i in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
assert call_count["count"] == 5
finally:
server.stop()