hermes-channel-molecule/hermes_channel_molecule/adapter.py
Hongming Wang 754d162d99
feat: dual-mode for upstream register_platform (post-#17751) + legacy register_platform_adapter
NousResearch/hermes-agent#17751 (merged 2026-04-30) shipped a
comprehensive pluggable-platform system with:

- ctx.register_platform(name, label, adapter_factory, check_fn, ...)
- Open Platform enum (Platform('molecule') creates a pseudo-member
  via _missing_() when the platform_registry knows about it)

That supersedes my upstream PR #18775 (which used a narrower
register_platform_adapter shape with a closed enum + custom
PluginPlatformIdentifier). Closing #18775 as redundant.

This plugin previously coupled to my fork's API. Migration:

- __init__.py register() now prefers ctx.register_platform when
  available; falls back to ctx.register_platform_adapter on legacy
  forks (template-hermes' baked-in fork until it migrates).
- adapter.py constructs Platform(name) when the enum accepts
  'molecule', else falls back to PluginPlatformIdentifier(name).
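
The dual-mode register() described above can be sketched as follows. This is a minimal illustration, not the plugin's actual `__init__.py`: the `ctx.register_platform` / `ctx.register_platform_adapter` names follow the shapes named in this message, but the exact keyword set is an assumption.

```python
def register(ctx, adapter_factory, check_fn):
    """Register the molecule platform against whichever API the host ships."""
    if hasattr(ctx, "register_platform"):
        # Post-#17751 upstream: open platform registry with label + check hook.
        ctx.register_platform(
            "molecule",
            label="Molecule",
            adapter_factory=adapter_factory,
            check_fn=check_fn,
        )
    else:
        # Legacy fork: narrower registration surface, no check hook.
        ctx.register_platform_adapter("molecule", adapter_factory)
```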

Same wheel installs cleanly on stock hermes-agent (post-#17751)
AND on the legacy template-hermes fork build. Removed the test
stub of PluginPlatformIdentifier; tests now stub the open-enum
Platform shape with the same _missing_() behavior the upstream
ships.
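
The open-enum shape the tests stub can be reproduced in a few lines. The member names and the caching detail below are illustrative assumptions; only the `_missing_()` pseudo-member behavior comes from the upstream change described above.

```python
import enum


class Platform(str, enum.Enum):
    # Illustrative built-in members; upstream's actual set differs.
    TELEGRAM = "telegram"
    DISCORD = "discord"

    @classmethod
    def _missing_(cls, value):
        # Mint a pseudo-member for an unknown platform name, so
        # Platform("molecule") succeeds without editing the enum.
        member = str.__new__(cls, value)
        member._name_ = str(value).upper()
        member._value_ = value
        # Cache so repeated lookups return the same object.
        cls._value2member_map_[value] = member
        return member
```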

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 07:47:51 -07:00


"""molecule-a2a hermes platform adapter.
Connects hermes to the molecule platform via the molecule-runtime A2A MCP
server. The MCP server is spawned as a subprocess on ``connect()``; we
talk to it over stdio JSON-RPC.
Inbound flow
------------
A background long-poll task calls ``wait_for_message`` on the MCP server.
When a real message lands (canvas user typing in the molecule canvas, or
a peer workspace delegating to us), we build a hermes ``MessageEvent``
and dispatch via ``self.handle_message(event)``. After the gateway hands
back a reply (or decides not to), we ack with ``inbox_pop(activity_id)``.
Outbound flow
-------------
``send(chat_id, content)`` parses the prefix:
- ``canvas:<workspace_id>`` → ``send_message_to_user(message=content)``
- ``peer:<peer_workspace_id>`` → ``delegate_task(workspace_id=<peer>, task=content)``
The chat_id encoding is what we hand to hermes inside the
``MessageEvent.source`` so the gateway's session bookkeeping has a stable
key per canvas/peer; the adapter is the only thing that decodes it.
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import shutil
import subprocess
import sys
from typing import Any, Dict, Optional
from gateway.platforms.base import (
    BasePlatformAdapter,
    MessageEvent,
    MessageType,
    SendResult,
)
from gateway.config import Platform


def _platform_identity(name: str):
    """Pick the right Platform-shaped identity for the installed hermes.

    Upstream #17751 made Platform an open enum (``Platform("molecule")``
    works via ``_missing_()``). Legacy forks have a closed enum and ship
    ``PluginPlatformIdentifier`` for plugin-supplied platforms instead.
    Detect at call time so the same plugin works on both.
    """
    try:
        return Platform(name)
    except ValueError:
        # Closed enum (legacy fork) — fall back to the fork's plugin
        # identifier shape. Import lazily so a stock hermes-agent doesn't
        # need this symbol to exist.
        from hermes_cli.plugins import PluginPlatformIdentifier

        return PluginPlatformIdentifier(name)


logger = logging.getLogger(__name__)
PLUGIN_NAME = "molecule"
# How long the long-poll task asks the MCP server to block per turn.
# 20s keeps the inbound latency floor tight while spending almost no CPU
# (one JSON-RPC roundtrip every ~20s when idle). The MCP server caps
# wait_for_message at 300s anyway, so picking lower than that is just a
# trade-off between wakeups-per-minute and inbound latency.
_LONG_POLL_SECS = 20


def _resolve_python() -> str:
    """Return the python interpreter that can ``import molecule_runtime``.

    Order of preference matches the codex/hermes templates:

    1. ``MOLECULE_MCP_PYTHON`` env override
    2. ``/opt/molecule-venv/bin/python3`` (the workspace runtime venv —
       only present on platform-managed EC2 workspaces)
    3. ``python3`` / ``python`` from PATH

    Returns the first candidate for which ``-c "import molecule_runtime"``
    succeeds; falls back to the last real candidate as a last resort so
    the failure surface is a subprocess error (visible) rather than a
    silent no-op (invisible).
    """
    override = os.environ.get("MOLECULE_MCP_PYTHON", "").strip()
    if override and os.access(override, os.X_OK):
        return override
    candidates = [
        "/opt/molecule-venv/bin/python3",
        "/opt/molecule-venv/bin/python",
        shutil.which("python3") or "",
        shutil.which("python") or "",
    ]
    last_real = ""
    for cand in candidates:
        if not cand or not os.access(cand, os.X_OK):
            continue
        last_real = cand
        try:
            subprocess.run(
                [cand, "-c", "import molecule_runtime"],
                check=True,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                timeout=10,
            )
            return cand
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError):
            continue
    return last_real or "/opt/molecule-venv/bin/python3"


def check_molecule_requirements() -> bool:
    """Return True when the plugin has everything it needs to start.

    Mirrors the ``check_<platform>_requirements()`` pattern in the
    in-tree adapters. The gateway will skip adapter creation (with a
    warning) if this returns False.
    """
    if not os.environ.get("MOLECULE_WORKSPACE_ID"):
        logger.warning(
            "%s: MOLECULE_WORKSPACE_ID is not set; cannot connect", PLUGIN_NAME
        )
        return False
    if not os.environ.get("MOLECULE_WORKSPACE_TOKEN"):
        # Soft-warn rather than block: the runtime will fall back to
        # /configs/.auth_token if present (in-container scenario where
        # this plugin is unusual but not impossible). External runs —
        # the common case — need the env var.
        logger.warning(
            "%s: MOLECULE_WORKSPACE_TOKEN is not set; outbound platform "
            "calls will be unauthenticated unless /configs/.auth_token "
            "exists. Generate a token from the canvas → Tokens tab and "
            "export MOLECULE_WORKSPACE_TOKEN=...", PLUGIN_NAME,
        )
    python = _resolve_python()
    try:
        subprocess.run(
            [python, "-c", "import molecule_runtime"],
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=10,
        )
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError) as exc:
        logger.warning(
            "%s: %s cannot import molecule_runtime: %s",
            PLUGIN_NAME, python, exc,
        )
        return False
    return True


class MoleculeAdapter(BasePlatformAdapter):
    """Hermes platform adapter for the molecule platform via A2A MCP."""

    def __init__(self, config) -> None:
        super().__init__(config, _platform_identity(PLUGIN_NAME))
        self._workspace_id = os.environ.get("MOLECULE_WORKSPACE_ID", "")
        self._platform_url = os.environ.get(
            "MOLECULE_PLATFORM_URL", "http://platform:8080"
        )
        self._org_id = os.environ.get("MOLECULE_ORG_ID", "")
        self._configs_dir = os.environ.get("MOLECULE_CONFIGS_DIR", "/configs")
        # Per-workspace platform credential. Inside a molecule-managed
        # container the runtime reads this from /configs/.auth_token (the
        # platform writes it on provision); external runtimes (this case)
        # have no /configs volume and must supply it via env. Without it
        # every outbound platform call goes unauthenticated and gets
        # rejected. See molecule_runtime.platform_auth.get_token.
        self._workspace_token = os.environ.get("MOLECULE_WORKSPACE_TOKEN", "")
        self._python = _resolve_python()
        self._proc: Optional[asyncio.subprocess.Process] = None
        self._reader_task: Optional[asyncio.Task] = None
        self._poll_task: Optional[asyncio.Task] = None
        self._stderr_task: Optional[asyncio.Task] = None
        # JSON-RPC bookkeeping: monotonic id, demux table, write lock.
        self._next_id = 1
        self._pending: Dict[int, asyncio.Future] = {}
        self._write_lock = asyncio.Lock()

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    async def connect(self) -> bool:
        """Spawn the MCP server, complete the JSON-RPC handshake, and
        start the long-poll loop. Returns True on success.
        """
        env = os.environ.copy()
        # Ensure the subprocess sees the molecule_runtime configuration
        # even when hermes was launched from a shell that doesn't have
        # those vars exported.
        env["WORKSPACE_ID"] = self._workspace_id
        env["PLATFORM_URL"] = self._platform_url
        if self._org_id:
            env["MOLECULE_ORG_ID"] = self._org_id
        if self._configs_dir:
            env["CONFIGS_DIR"] = self._configs_dir
        if self._workspace_token:
            env["MOLECULE_WORKSPACE_TOKEN"] = self._workspace_token
        try:
            self._proc = await asyncio.create_subprocess_exec(
                self._python,
                "-m",
                "molecule_runtime.a2a_mcp_server",
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                env=env,
            )
        except OSError as exc:  # covers FileNotFoundError too
            self._set_fatal_error(
                "spawn_failed",
                f"failed to spawn {self._python} -m molecule_runtime.a2a_mcp_server: {exc}",
                retryable=False,
            )
            return False
        self._reader_task = asyncio.create_task(
            self._reader_loop(), name=f"{PLUGIN_NAME}-reader"
        )
        self._stderr_task = asyncio.create_task(
            self._stderr_loop(), name=f"{PLUGIN_NAME}-stderr"
        )
        try:
            init_result = await asyncio.wait_for(
                self._call(
                    "initialize",
                    {
                        "protocolVersion": "2024-11-05",
                        "capabilities": {},
                        "clientInfo": {"name": PLUGIN_NAME, "version": "0.1.0"},
                    },
                ),
                timeout=15.0,
            )
        except asyncio.TimeoutError:
            await self.disconnect()
            self._set_fatal_error(
                "init_timeout",
                "MCP initialize handshake timed out after 15s",
                retryable=True,
            )
            return False
        except Exception as exc:  # noqa: BLE001 — surface every init failure
            await self.disconnect()
            self._set_fatal_error(
                "init_failed",
                f"MCP initialize raised: {exc}",
                retryable=True,
            )
            return False
        logger.info(
            "%s: MCP initialize ok (server=%s)",
            PLUGIN_NAME,
            (init_result or {}).get("serverInfo", {}).get("name", "?"),
        )
        # The "initialized" notification is fire-and-forget — no response.
        await self._notify("notifications/initialized", {})
        self._poll_task = asyncio.create_task(
            self._long_poll_loop(), name=f"{PLUGIN_NAME}-poll"
        )
        self._mark_connected()
        return True

    async def disconnect(self) -> None:
        """Cancel background tasks and tear down the subprocess."""
        for task in (self._poll_task, self._reader_task, self._stderr_task):
            if task is not None and not task.done():
                task.cancel()
        for task in (self._poll_task, self._reader_task, self._stderr_task):
            if task is None:
                continue
            try:
                await task
            except (asyncio.CancelledError, Exception):  # noqa: BLE001
                pass
        self._poll_task = None
        self._reader_task = None
        self._stderr_task = None
        if self._proc is not None:
            try:
                if self._proc.returncode is None:
                    self._proc.terminate()
                    try:
                        await asyncio.wait_for(self._proc.wait(), timeout=5.0)
                    except asyncio.TimeoutError:
                        self._proc.kill()
                        await self._proc.wait()
            except ProcessLookupError:
                pass
            self._proc = None
        # Fail any inflight calls so callers don't hang forever.
        for fut in self._pending.values():
            if not fut.done():
                fut.set_exception(ConnectionError("MCP subprocess closed"))
        self._pending.clear()
        self._mark_disconnected()

    # ------------------------------------------------------------------
    # Outbound
    # ------------------------------------------------------------------
    async def send(
        self,
        chat_id: str,
        content: str,
        reply_to: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> SendResult:
        """Route an outbound message based on the chat_id prefix."""
        kind, target = self._decode_chat_id(chat_id)
        try:
            if kind == "canvas":
                payload = await self._call_tool(
                    "send_message_to_user", {"message": content}
                )
            elif kind == "peer":
                payload = await self._call_tool(
                    "delegate_task",
                    {"workspace_id": target, "task": content},
                )
            else:
                return SendResult(
                    success=False,
                    error=f"unknown chat_id prefix: {chat_id!r}",
                )
        except Exception as exc:  # noqa: BLE001 — convert to SendResult
            return SendResult(
                success=False,
                error=str(exc),
                retryable=isinstance(exc, (ConnectionError, asyncio.TimeoutError)),
            )
        return SendResult(success=True, raw_response=payload)

    async def send_typing(self, chat_id: str, metadata=None) -> None:
        """No-op — the molecule platform doesn't surface typing indicators."""
        return None

    async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
        kind, target = self._decode_chat_id(chat_id)
        if kind == "canvas":
            return {"name": "molecule canvas", "type": "dm", "chat_id": chat_id}
        if kind == "peer":
            return {
                "name": f"peer:{target[:8]}",
                "type": "dm",
                "chat_id": chat_id,
            }
        return {"name": chat_id, "type": "dm", "chat_id": chat_id}

    # ------------------------------------------------------------------
    # Inbound — long-poll loop
    # ------------------------------------------------------------------
    async def _long_poll_loop(self) -> None:
        """Drain the molecule inbox, dispatching messages to the gateway."""
        while True:
            try:
                raw = await self._call_tool(
                    "wait_for_message", {"timeout_secs": _LONG_POLL_SECS}
                )
            except asyncio.CancelledError:
                raise
            except ConnectionError:
                # Subprocess died — disconnect path will reset state.
                return
            except Exception as exc:  # noqa: BLE001
                logger.warning("%s: wait_for_message raised: %s", PLUGIN_NAME, exc)
                await asyncio.sleep(2.0)
                continue
            message = self._extract_inbox_message(raw)
            if message is None:
                continue  # timeout tick
            try:
                await self._dispatch(message)
            except Exception:  # noqa: BLE001 — never let a bad event kill the loop
                logger.exception("%s: dispatch failed for %r", PLUGIN_NAME, message)
            activity_id = message.get("activity_id", "")
            if activity_id:
                try:
                    await self._call_tool(
                        "inbox_pop", {"activity_id": activity_id}
                    )
                except Exception as exc:  # noqa: BLE001
                    # Worst case: the same message redelivers next poll.
                    # That's a duplicate, not a lost message — log and
                    # continue rather than tear down the loop.
                    logger.warning(
                        "%s: inbox_pop(%s) failed: %s",
                        PLUGIN_NAME, activity_id, exc,
                    )

    async def _dispatch(self, message: Dict[str, Any]) -> None:
        """Build a hermes MessageEvent and hand it to the gateway."""
        kind = message.get("kind", "canvas_user")
        peer_id = message.get("peer_id", "") or ""
        text = message.get("text", "") or ""
        activity_id = message.get("activity_id", "")
        if kind == "peer_agent" and peer_id:
            chat_id = self._encode_chat_id("peer", peer_id)
            chat_name = f"peer:{peer_id[:8]}"
            user_id = peer_id
            user_name = peer_id
        else:
            chat_id = self._encode_chat_id("canvas", self._workspace_id)
            chat_name = "molecule canvas"
            user_id = "canvas"
            user_name = "canvas-user"
        source = self.build_source(
            chat_id=chat_id,
            chat_name=chat_name,
            chat_type="dm",
            user_id=user_id,
            user_name=user_name,
        )
        event = MessageEvent(
            text=text,
            message_type=MessageType.TEXT,
            source=source,
            raw_message=message,
            message_id=activity_id or None,
        )
        await self.handle_message(event)

    # ------------------------------------------------------------------
    # JSON-RPC helpers
    # ------------------------------------------------------------------
    async def _reader_loop(self) -> None:
        """Read stdout lines from the MCP server, demux to pending futures."""
        assert self._proc is not None and self._proc.stdout is not None
        stdout = self._proc.stdout
        while True:
            line = await stdout.readline()
            if not line:
                # Subprocess closed stdout — fail every pending call.
                for fut in self._pending.values():
                    if not fut.done():
                        fut.set_exception(ConnectionError("MCP stdout closed"))
                self._pending.clear()
                return
            try:
                msg = json.loads(line.decode("utf-8", errors="replace"))
            except json.JSONDecodeError:
                continue
            req_id = msg.get("id")
            if req_id is None:
                # Notification from the server — ignore for now (push tags
                # would arrive here once we wire the channel notification).
                continue
            fut = self._pending.pop(req_id, None)
            if fut is None or fut.done():
                continue
            if "error" in msg:
                fut.set_exception(RuntimeError(str(msg["error"])))
            else:
                fut.set_result(msg.get("result"))

    async def _stderr_loop(self) -> None:
        """Mirror MCP server stderr into our logger so operators can debug."""
        assert self._proc is not None and self._proc.stderr is not None
        stderr = self._proc.stderr
        while True:
            line = await stderr.readline()
            if not line:
                return
            text = line.decode("utf-8", errors="replace").rstrip()
            if text:
                logger.info("[mcp-server] %s", text)

    async def _call(self, method: str, params: dict) -> Any:
        """Send a JSON-RPC request and await the response."""
        if self._proc is None or self._proc.stdin is None:
            raise ConnectionError("MCP subprocess not running")
        req_id = self._next_id
        self._next_id += 1
        fut: asyncio.Future = asyncio.get_running_loop().create_future()
        self._pending[req_id] = fut
        payload = json.dumps(
            {"jsonrpc": "2.0", "id": req_id, "method": method, "params": params}
        ) + "\n"
        async with self._write_lock:
            self._proc.stdin.write(payload.encode("utf-8"))
            await self._proc.stdin.drain()
        return await fut

    async def _notify(self, method: str, params: dict) -> None:
        """Send a JSON-RPC notification (no id, no response)."""
        if self._proc is None or self._proc.stdin is None:
            raise ConnectionError("MCP subprocess not running")
        payload = json.dumps(
            {"jsonrpc": "2.0", "method": method, "params": params}
        ) + "\n"
        async with self._write_lock:
            self._proc.stdin.write(payload.encode("utf-8"))
            await self._proc.stdin.drain()

    async def _call_tool(self, tool_name: str, arguments: dict) -> Any:
        """Convenience wrapper: tools/call → return the parsed result."""
        result = await self._call(
            "tools/call",
            {"name": tool_name, "arguments": arguments},
        )
        return self._extract_tool_payload(result)

    @staticmethod
    def _extract_tool_payload(result: Any) -> Any:
        """Pull the JSON payload out of an MCP tools/call response.

        MCP wraps tool returns as ``{"content": [{"type": "text",
        "text": "..."}]}``. Most molecule tools return a JSON string
        inside that text field; some (commit_memory, send_message_to_user)
        return a plain status string. Try JSON-decode first, fall back
        to the raw text.
        """
        if not isinstance(result, dict):
            return result
        content = result.get("content")
        if not isinstance(content, list) or not content:
            return result
        text = content[0].get("text", "") if isinstance(content[0], dict) else ""
        if not text:
            return result
        try:
            return json.loads(text)
        except (json.JSONDecodeError, TypeError):
            return text

    @staticmethod
    def _extract_inbox_message(payload: Any) -> Optional[Dict[str, Any]]:
        """Return the inbox-message dict from a wait_for_message result.

        Returns None for the timeout sentinel ``{"timeout": true, ...}``.
        """
        if not isinstance(payload, dict):
            return None
        if payload.get("timeout"):
            return None
        if "activity_id" not in payload:
            return None
        return payload

    # ------------------------------------------------------------------
    # chat_id helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _encode_chat_id(kind: str, target: str) -> str:
        return f"{kind}:{target}"

    @staticmethod
    def _decode_chat_id(chat_id: str) -> tuple[str, str]:
        if ":" not in chat_id:
            return ("", chat_id)
        kind, _, target = chat_id.partition(":")
        return (kind, target)