molecule-ai-workspace-templ.../executor.py
Hongming Wang f8ee17e2e3 hotfix(adapter): default MOLECULE_A2A_PLATFORM_ENABLED=false
Staging E2E for PR #32 surfaced a workspace boot failure: the deployed
image's hermes gateway never bound :8645, so adapter.setup()'s
/a2a/health probe got httpx.ConnectError and the workspace went
status=failed at ~498s.

Root cause is image-side install/discovery of the molecule-a2a plugin,
NOT the executor wire shape. Local scripts/e2e_full_chain.py runs
against a venv where I'd already installed the plugin manually — it
didn't catch the deployment-shape divergence.

Flip the default off to restore the legacy /v1/chat/completions
fallback (no session continuity, but works). Plugin path stays
opt-in via MOLECULE_A2A_PLATFORM_ENABLED=true so debugging can
continue per-workspace without rolling the whole image again.

Re-enabling will require:
  - An image-build smoke test that verifies pip show
    hermes-platform-molecule-a2a + hermes config show inside the
    built container (filed separately)
  - Verifying the molecule-a2a config stanza actually lands in
    ~/.hermes/config.yaml inside the running container

Tests updated: 37 pass. Plugin-path tests now opt-in via the helper's
default; default-detection test asserts the new chat_completions
fallback.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 18:51:00 -07:00

400 lines
15 KiB
Python

"""A2A → hermes-agent bridge with two transports.
Default transport (``MOLECULE_A2A_PLATFORM_ENABLED=false``)
==========================================================
POST to ``http://127.0.0.1:8642/v1/chat/completions`` synchronously,
parse the OpenAI-shaped response, emit. No session continuity but no
plugin dependency.
This is the safe default while the plugin install path inside the
deployed image is being debugged: the staging E2E for PR #32 surfaced
a workspace boot failure where ``hermes gateway run`` did not bind
``:8645`` inside the container (root cause TBD; local
``scripts/e2e_full_chain.py`` runs against my laptop venv where the
plugin was already installed manually, so it didn't catch the
deployment-shape divergence). Flip back to plugin path with
``MOLECULE_A2A_PLATFORM_ENABLED=true`` once the image-side install
is verified.
Plugin transport (``MOLECULE_A2A_PLATFORM_ENABLED=true``)
=========================================================
POST each A2A turn to the in-container hermes-agent platform plugin's
``/a2a/inbound`` endpoint. Hermes processes the message through its full
pipeline (sessions, skills, tools, hooks) and POSTs the agent's reply
back to a callback server we run inside this executor. A correlation
table maps the inbound ``message_id`` to an ``asyncio.Future`` that the
``execute()`` call awaits — so the A2A response is delivered as soon as
hermes calls ``send()``, not by polling. Earns single-session continuity
for peer agents.
Wire shape
==========
The plugin POSTs replies to:
POST <callback_url>
Content-Type: application/json
{"chat_id": "...", "content": "...",
"reply_to": "<inbound message_id>", "metadata": {...}}
We correlate by ``reply_to`` and resolve the matching pending Future.
``chat_id`` is intentionally *not* used for correlation: it's coarser
than message_id (multiple in-flight messages on the same chat would
race).
"""
from __future__ import annotations
import asyncio
import logging
import os
import socket
import uuid
from typing import Any, Dict, Optional
import httpx
try:
from aiohttp import web
AIOHTTP_AVAILABLE = True
except ImportError: # pragma: no cover
AIOHTTP_AVAILABLE = False
web = None # type: ignore[assignment]
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.helpers import new_text_message
from molecule_runtime.adapters.base import AdapterConfig
from molecule_runtime.executor_helpers import extract_message_text
logger = logging.getLogger(__name__)
# --- legacy chat-completions config -----------------------------------
_DEFAULT_BASE = "http://127.0.0.1:8642/v1"
_REQUEST_TIMEOUT = 600.0
# --- plugin-path config ----------------------------------------------
_DEFAULT_PLUGIN_HOST = "127.0.0.1"
_DEFAULT_PLUGIN_PORT = 8645
_DEFAULT_CALLBACK_HOST = "127.0.0.1"
_DEFAULT_CALLBACK_PORT = 8646
_DEFAULT_CALLBACK_PATH = "/a2a/reply"
SECRET_HEADER = "X-Molecule-A2A-Secret"
# Same generous bound as the chat-completions path. Prevents a hung
# hermes daemon from wedging the A2A queue forever.
_PLUGIN_REPLY_TIMEOUT = 600.0
def _bool_env(name: str, default: bool) -> bool:
raw = os.environ.get(name, "").strip().lower()
if not raw:
return default
return raw in ("1", "true", "yes", "on")
class HermesAgentProxyExecutor(AgentExecutor):
"""Forwards every A2A turn to hermes-agent."""
def __init__(self, config: AdapterConfig):
self._config = config
# Legacy transport state.
self._base = os.environ.get("HERMES_API_BASE", _DEFAULT_BASE).rstrip("/")
# Plugin transport state. The reply server only boots if the
# plugin path is enabled; otherwise the executor degrades to
# the legacy proxy below.
# Default false until the image-side plugin install is verified
# — see module docstring. Operators flip on per workspace via env.
self._use_plugin = _bool_env("MOLECULE_A2A_PLATFORM_ENABLED", False)
self._plugin_host = os.environ.get(
"MOLECULE_A2A_PLATFORM_HOST", _DEFAULT_PLUGIN_HOST
)
self._plugin_port = int(
os.environ.get("MOLECULE_A2A_PLATFORM_PORT", _DEFAULT_PLUGIN_PORT)
)
self._callback_host = os.environ.get(
"MOLECULE_A2A_CALLBACK_HOST", _DEFAULT_CALLBACK_HOST
)
self._callback_port = int(
os.environ.get("MOLECULE_A2A_CALLBACK_PORT", _DEFAULT_CALLBACK_PORT)
)
self._shared_secret = (
os.environ.get("MOLECULE_A2A_PLATFORM_SHARED_SECRET", "") or ""
)
self._pending: Dict[str, asyncio.Future] = {}
self._reply_runner: Optional["web.AppRunner"] = None
self._reply_site: Optional["web.TCPSite"] = None
self._started: bool = False
# ------------------------------------------------------------------
# Lifecycle (called by adapter.create_executor)
# ------------------------------------------------------------------
async def start(self) -> None:
if self._started:
return
self._started = True
if not self._use_plugin:
logger.info(
"Hermes plugin path disabled; using /v1/chat/completions"
)
return
if not AIOHTTP_AVAILABLE:
raise RuntimeError(
"MOLECULE_A2A_PLATFORM_ENABLED=true but aiohttp is not "
"installed — add aiohttp to requirements.txt or set "
"MOLECULE_A2A_PLATFORM_ENABLED=false to use the legacy path."
)
await self._start_reply_server()
async def stop(self) -> None:
if self._reply_site is not None:
try:
await self._reply_site.stop()
except Exception:
logger.exception("hermes plugin: reply site stop failed")
self._reply_site = None
if self._reply_runner is not None:
try:
await self._reply_runner.cleanup()
except Exception:
logger.exception("hermes plugin: reply runner cleanup failed")
self._reply_runner = None
# Cancel all pending futures so any in-flight execute() calls
# surface a clear shutdown error rather than hanging until the
# 600s timeout fires.
for fut in list(self._pending.values()):
if not fut.done():
fut.set_exception(RuntimeError("executor shutting down"))
self._pending.clear()
async def _start_reply_server(self) -> None:
app = web.Application()
app.router.add_post(_DEFAULT_CALLBACK_PATH, self._handle_reply)
self._reply_runner = web.AppRunner(app)
await self._reply_runner.setup()
self._reply_site = web.TCPSite(
self._reply_runner, self._callback_host, self._callback_port
)
await self._reply_site.start()
logger.info(
"hermes plugin reply server listening on http://%s:%d%s",
self._callback_host, self._callback_port, _DEFAULT_CALLBACK_PATH,
)
# ------------------------------------------------------------------
# AgentExecutor contract
# ------------------------------------------------------------------
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
prompt = extract_message_text(context.message) or ""
if not prompt.strip():
await event_queue.enqueue_event(
new_text_message("(empty prompt — nothing to do)")
)
return
if self._use_plugin:
await self._execute_via_plugin(context, event_queue, prompt)
else:
await self._execute_via_chat_completions(event_queue, prompt)
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
# Both transports rely on a wall-clock timeout in execute() to
# bound a hung hermes-side run. No per-request cancel API is
# exposed by either path today. Revisit when hermes adds a
# turn/interrupt RPC for the chat_completions path.
return None
# ------------------------------------------------------------------
# Plugin transport
# ------------------------------------------------------------------
async def _execute_via_plugin(
self,
context: RequestContext,
event_queue: EventQueue,
prompt: str,
) -> None:
message_id = uuid.uuid4().hex
chat_id = self._derive_chat_id(context)
callback_url = (
f"http://{self._callback_host}:{self._callback_port}{_DEFAULT_CALLBACK_PATH}"
)
loop = asyncio.get_running_loop()
future: asyncio.Future = loop.create_future()
self._pending[message_id] = future
payload = {
"chat_id": chat_id,
"peer_id": chat_id,
"peer_name": chat_id,
"content": prompt,
"message_id": message_id,
"callback_url": callback_url,
}
headers = {"Content-Type": "application/json"}
if self._shared_secret:
headers[SECRET_HEADER] = self._shared_secret
inbound_url = (
f"http://{self._plugin_host}:{self._plugin_port}/a2a/inbound"
)
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.post(inbound_url, json=payload, headers=headers)
resp.raise_for_status()
except httpx.HTTPError as exc:
self._pending.pop(message_id, None)
logger.exception("hermes plugin POST failed")
await event_queue.enqueue_event(
new_text_message(f"[hermes plugin POST error] {exc!s}")
)
return
try:
text = await asyncio.wait_for(future, timeout=_PLUGIN_REPLY_TIMEOUT)
except asyncio.TimeoutError:
logger.error(
"hermes plugin: reply timeout for message_id=%s", message_id
)
await event_queue.enqueue_event(
new_text_message(
f"[hermes plugin reply timeout after {_PLUGIN_REPLY_TIMEOUT:.0f}s]"
)
)
return
except Exception as exc:
await event_queue.enqueue_event(
new_text_message(f"[hermes plugin error] {exc!s}")
)
return
finally:
self._pending.pop(message_id, None)
await event_queue.enqueue_event(new_text_message(text))
async def _handle_reply(self, request: "web.Request") -> "web.Response":
if self._shared_secret:
provided = request.headers.get(SECRET_HEADER, "")
if provided != self._shared_secret:
return web.json_response(
{"ok": False, "error": "unauthorized"}, status=401
)
try:
body = await request.json()
except Exception:
return web.json_response(
{"ok": False, "error": "invalid json"}, status=400
)
reply_to = body.get("reply_to")
content = body.get("content")
if not reply_to or not isinstance(content, str):
return web.json_response(
{"ok": False, "error": "reply_to and content required"},
status=400,
)
future = self._pending.get(reply_to)
if future is None:
# Late or duplicate delivery — the original execute() either
# already timed out or never registered. Acknowledge so the
# plugin doesn't retry forever.
logger.warning(
"hermes plugin: reply for unknown message_id=%s", reply_to
)
return web.json_response({"ok": True, "stale": True})
if not future.done():
future.set_result(content)
return web.json_response({"ok": True})
@staticmethod
def _derive_chat_id(context: RequestContext) -> str:
# Prefer task_id for stable per-conversation identity. Fall back
# to session/message attributes the a2a-sdk exposes; last resort
# is a synthetic ID so we always pass something hermes can use
# to key its session store.
for attr in ("task_id", "session_id", "context_id"):
value = getattr(context, attr, None)
if value:
return str(value)
message = getattr(context, "message", None)
if message is not None:
for attr in ("task_id", "session_id", "context_id", "messageId"):
value = getattr(message, attr, None)
if value:
return str(value)
return f"adhoc-{uuid.uuid4().hex[:12]}"
# ------------------------------------------------------------------
# Legacy chat_completions transport (fallback)
# ------------------------------------------------------------------
async def _execute_via_chat_completions(
self,
event_queue: EventQueue,
prompt: str,
) -> None:
payload = self._build_chat_completions_payload(prompt)
headers = self._build_chat_completions_headers()
try:
async with httpx.AsyncClient(timeout=_REQUEST_TIMEOUT) as client:
resp = await client.post(
f"{self._base}/chat/completions",
json=payload,
headers=headers,
)
resp.raise_for_status()
data = resp.json()
except httpx.HTTPStatusError as exc:
body = exc.response.text[:500] if exc.response is not None else ""
logger.error("hermes-agent %s: %s", exc.response.status_code, body)
await event_queue.enqueue_event(
new_text_message(
f"[hermes-agent error {exc.response.status_code}] {body}"
)
)
return
except httpx.RequestError as exc:
logger.exception("hermes-agent transport error")
await event_queue.enqueue_event(
new_text_message(f"[hermes-agent unreachable] {exc!s}")
)
return
text = self._extract_assistant_text(data)
await event_queue.enqueue_event(new_text_message(text))
def _build_chat_completions_payload(self, user_text: str) -> dict[str, Any]:
messages: list[dict[str, str]] = []
if self._config.system_prompt:
messages.append({"role": "system", "content": self._config.system_prompt})
messages.append({"role": "user", "content": user_text})
return {
"model": self._config.model or "hermes-agent",
"messages": messages,
"stream": False,
}
def _build_chat_completions_headers(self) -> dict[str, str]:
headers = {"Content-Type": "application/json"}
key = os.environ.get("API_SERVER_KEY", "")
if key:
headers["Authorization"] = f"Bearer {key}"
return headers
@staticmethod
def _extract_assistant_text(data: dict[str, Any]) -> str:
try:
return data["choices"][0]["message"]["content"] or ""
except (KeyError, IndexError, TypeError):
logger.warning("Unexpected hermes-agent response shape: %r", data)
return "(hermes-agent returned no content)"