hermes-platform-molecule-a2a/scripts/e2e_validate.py
Hongming Wang 8c4f2b7569 Initial: hermes-platform-molecule-a2a v0.1.0
Hermes-agent platform adapter that delivers Molecule A2A peer-agent
messages to a running hermes daemon over HTTP, and POSTs agent replies
back to the molecule-runtime callback URL.

Closes the MCP-style push parity gap for hermes workspaces:
- Single long-lived adapter session (no subprocess per message)
- MessageEvent(internal=True) bypasses per-platform user allowlist
  for trusted A2A peer messages
- Optional shared-secret HMAC enforcement
- Per-chat callback URL learned from inbound payload

Requires hermes-agent ≥ the upstream PR
NousResearch/hermes-agent#18775 (PluginContext.register_platform_adapter
+ GatewayConfig plugin_platforms wiring + PluginPlatformIdentifier
helper + resolve_platform_id deserialization).
2026-05-02 03:06:56 -07:00

239 lines
9.2 KiB
Python

"""End-to-end validation of MoleculeA2APlatformAdapter against a
hermes-agent install that has the ``register_platform_adapter`` patch.
Unlike ``tests/test_adapter.py`` (which unit-tests the adapter in
isolation), this script exercises the full production discovery path:
pip install hermes-platform-molecule-a2a
→ hermes plugin manager scans entry_points group
``hermes_agent.plugins``
→ calls ``register(ctx)`` on the package
→ ``ctx.register_platform_adapter("molecule-a2a", ...)`` lands
in the platform registry
→ ``GatewayConfig.from_dict({"platforms": {"molecule-a2a": ...}})``
routes the entry into ``plugin_platforms``
→ ``GatewayRunner._create_plugin_adapter`` instantiates the
class
→ adapter boots a real HTTP listener on an ephemeral port
→ POST /a2a/inbound dispatches a MessageEvent(internal=True)
into the registered handler
→ ``adapter.send(...)`` POSTs to the per-chat callback URL
learned from the inbound payload
Prerequisites:
1. A hermes-agent checkout containing the ``register_platform_adapter``
method on PluginContext + the GatewayConfig/GatewayRunner wiring.
Set ``HERMES_REPO`` env to override the default (``~/.hermes/hermes-agent``).
2. ``pip install -e .`` of this package into the same venv that
runs hermes (so the entry point is registered).
Exit 0 on success.
"""
from __future__ import annotations
import asyncio
import os
import socket
import sys
from pathlib import Path
from typing import Any, Dict, List
HERMES_REPO = Path(
os.environ.get("HERMES_REPO", str(Path.home() / ".hermes" / "hermes-agent"))
)
if not HERMES_REPO.exists():
print(f"FAIL: hermes-agent checkout not found at {HERMES_REPO}")
print("Set HERMES_REPO env to point at a fork with the "
"register_platform_adapter patch.")
sys.exit(1)
sys.path.insert(0, str(HERMES_REPO))
# Use a clean HERMES_HOME so user-dir plugins don't pollute the test —
# we want to confirm discovery via the pip entry_points path alone.
ROOT = Path(__file__).resolve().parent
os.environ["HERMES_HOME"] = str(ROOT / ".hermes_e2e_home")
(Path(os.environ["HERMES_HOME"]) / "plugins").mkdir(parents=True, exist_ok=True)
def _free_port() -> int:
with socket.socket() as s:
s.bind(("127.0.0.1", 0))
return s.getsockname()[1]
async def _amain() -> int:
from gateway.config import GatewayConfig, PlatformConfig
from gateway.platforms.base import MessageEvent
from gateway.run import GatewayRunner
from hermes_cli.plugins import (
discover_plugins,
get_plugin_manager,
get_plugin_platform_adapter,
)
# --- 1. Discovery via pip entry_points ----------------------------
discover_plugins()
mgr = get_plugin_manager()
loaded = list(mgr._plugins.keys())
assert "molecule_a2a" in loaded or any(
"molecule" in n.lower() for n in loaded
), f"plugin not discovered via entry_points. Loaded: {loaded}. "\
f"Run: pip install -e /Users/hongming/hermes-platform-molecule-a2a"
print(f"OK: plugin discovered via pip entry_points ({len(loaded)} total)")
# --- 2. Registry round-trip ---------------------------------------
entry = get_plugin_platform_adapter("molecule-a2a")
assert entry is not None, "molecule-a2a not in plugin platform registry"
adapter_class, req_check = entry
assert adapter_class.__name__ == "MoleculeA2APlatformAdapter", (
f"unexpected class registered: {adapter_class.__name__}"
)
assert callable(req_check) and req_check() is True
print(f"OK: registry returns {adapter_class.__name__} + requirements_check")
# --- 3. GatewayConfig.from_dict routing ---------------------------
listen_port = _free_port()
cb_port = _free_port()
cb_url = f"http://127.0.0.1:{cb_port}/reply"
cfg_dict = {
"platforms": {
"molecule-a2a": {
"enabled": True,
"extra": {
"host": "127.0.0.1",
"port": listen_port,
"callback_url": cb_url,
},
},
},
}
gc = GatewayConfig.from_dict(cfg_dict)
assert "molecule-a2a" in gc.plugin_platforms, (
f"GatewayConfig.from_dict didn't route molecule-a2a to "
f"plugin_platforms; got {list(gc.plugin_platforms)}"
)
print("OK: GatewayConfig.from_dict routes molecule-a2a to plugin_platforms")
# --- 4. _create_plugin_adapter instantiation ----------------------
create_method = GatewayRunner._create_plugin_adapter
class _Stub:
pass
stub = _Stub()
pc = gc.plugin_platforms["molecule-a2a"]
adapter = create_method(stub, "molecule-a2a", pc)
assert adapter is not None, "_create_plugin_adapter returned None"
assert adapter.__class__.__name__ == "MoleculeA2APlatformAdapter"
print(f"OK: _create_plugin_adapter returned {adapter.__class__.__name__}")
# --- 5. Boot the adapter on a real port ---------------------------
received: List[MessageEvent] = []
posted_replies: List[Dict[str, Any]] = []
async def fake_handler(event: MessageEvent):
received.append(event)
return None # don't auto-trigger send
adapter.set_message_handler(fake_handler)
# Stand up a fake molecule-runtime callback receiver.
from aiohttp import web, ClientSession, ClientTimeout
async def cb_handler(request: web.Request) -> web.Response:
posted_replies.append(await request.json())
return web.json_response({"ok": True})
cb_app = web.Application()
cb_app.router.add_post("/reply", cb_handler)
cb_runner = web.AppRunner(cb_app)
await cb_runner.setup()
cb_site = web.TCPSite(cb_runner, "127.0.0.1", cb_port)
await cb_site.start()
try:
ok = await adapter.connect()
assert ok, "adapter.connect() returned False"
print(f"OK: adapter listening on http://127.0.0.1:{listen_port}/a2a/inbound")
# --- 6. Inbound A2A POST round-trip --------------------------
async with ClientSession(timeout=ClientTimeout(total=5)) as s:
url = f"http://127.0.0.1:{listen_port}/a2a/inbound"
payload = {
"chat_id": "peer-A",
"peer_id": "peer-A",
"peer_name": "ops-agent",
"content": "ping from peer",
"message_id": "m-1",
"callback_url": cb_url,
}
async with s.post(url, json=payload) as r:
assert r.status == 200, f"POST returned {r.status}"
body = await r.json()
assert body == {"ok": True, "queued": True}
# handle_message dispatches in background; wait briefly.
for _ in range(50):
if received:
break
await asyncio.sleep(0.02)
assert len(received) == 1, "inbound message did not reach handler"
ev = received[0]
assert ev.text == "ping from peer"
assert ev.internal is True
assert ev.source.chat_id == "peer-A"
print("OK: inbound POST → MessageEvent(internal=True) → message_handler")
# --- 7. Outbound send() round-trip ---------------------------
result = await adapter.send(
chat_id="peer-A",
content="pong from agent",
reply_to="m-1",
metadata={"source": "test"},
)
assert result.success, f"send() failed: {result.error}"
assert len(posted_replies) == 1
assert posted_replies[0] == {
"chat_id": "peer-A",
"content": "pong from agent",
"reply_to": "m-1",
"metadata": {"source": "test"},
}
print("OK: send() POSTed reply to per-chat callback URL")
finally:
await adapter.disconnect()
await cb_site.stop()
await cb_runner.cleanup()
# --- 8. Session round-trip survives daemon restart ----------------
# Plugin platforms die without resolve_platform_id: SessionSource.
# from_dict calls Platform(data["platform"]) which would raise
# ValueError for "molecule-a2a". Confirm the patched fork accepts it.
from gateway.session import SessionSource
from hermes_cli.plugins import PluginPlatformIdentifier
src = ev.source
serialized = src.to_dict()
assert serialized["platform"] == "molecule-a2a"
restored = SessionSource.from_dict(serialized)
assert isinstance(restored.platform, PluginPlatformIdentifier), (
f"SessionSource.from_dict returned {type(restored.platform).__name__} "
"for plugin platform — resolve_platform_id fallback not in place"
)
assert restored.platform.value == "molecule-a2a"
assert restored.chat_id == "peer-A"
print("OK: SessionSource survives to_dict/from_dict round-trip")
print("\n✓ Full E2E pipeline validated:")
print(" pip entry_points → plugin discovery → platform registry")
print(" → GatewayConfig.from_dict → _create_plugin_adapter")
print(" → live HTTP listener → MessageEvent dispatch → callback POST")
print(" → SessionSource serialization round-trip (plugin-platform-safe)")
return 0
if __name__ == "__main__":
sys.exit(asyncio.run(_amain()))