Staging E2E for PR #32 surfaced a workspace boot failure: the deployed image's hermes gateway never bound :8645, so adapter.setup()'s /a2a/health probe got httpx.ConnectError and the workspace went status=failed at ~498s. Root cause is image-side install/discovery of the molecule-a2a plugin, NOT the executor wire shape. Local scripts/e2e_full_chain.py runs against a venv where I'd already installed the plugin manually — it didn't catch the deployment-shape divergence. Flip the default off to restore the legacy /v1/chat/completions fallback (no session continuity, but works). Plugin path stays opt-in via MOLECULE_A2A_PLATFORM_ENABLED=true so debugging can continue per-workspace without rolling the whole image again. Re-enabling will require: - An image-build smoke test that verifies pip show hermes-platform-molecule-a2a + hermes config show inside the built container (filed separately) - Verifying the molecule-a2a config stanza actually lands in ~/.hermes/config.yaml inside the running container Tests updated: 37 pass. Plugin-path tests now opt-in via the helper's default; default-detection test asserts the new chat_completions fallback. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
786 lines
26 KiB
Python
786 lines
26 KiB
Python
"""Tests for the hermes plugin transport in executor.py.
|
|
|
|
Coverage targets:
|
|
- Lifecycle: start() boots reply server; stop() tears it down and
|
|
fails any in-flight pending Futures.
|
|
- Inbound: execute() POSTs to a stub /a2a/inbound, registers a
|
|
Future keyed by message_id, awaits it.
|
|
- Outbound: a stub plugin POSTs to our reply server with reply_to
|
|
matching the inbound message_id; the awaiting execute() resolves
|
|
and emits on the event_queue.
|
|
- Auth: shared_secret is sent on outbound POST and required on
|
|
inbound reply POST.
|
|
- Error paths: empty prompt, hermes-side POST failure, reply timeout,
|
|
late/duplicate replies for unknown message_id.
|
|
- Fallback: when MOLECULE_A2A_PLATFORM_ENABLED=false the executor
|
|
falls through to the chat_completions transport.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import os
|
|
import socket
|
|
import sys
|
|
from typing import Any, Dict, List, Optional
|
|
from unittest.mock import MagicMock
|
|
|
|
import httpx
|
|
import pytest
|
|
from aiohttp import ClientSession, ClientTimeout, web
|
|
|
|
from executor import HermesAgentProxyExecutor, SECRET_HEADER
|
|
from molecule_runtime.adapters.base import AdapterConfig
|
|
|
|
|
|
# ---- helpers --------------------------------------------------------
|
|
|
|
|
|
def _free_port() -> int:
|
|
with socket.socket() as s:
|
|
s.bind(("127.0.0.1", 0))
|
|
return s.getsockname()[1]
|
|
|
|
|
|
def _make_executor(monkeypatch, **env: str) -> HermesAgentProxyExecutor:
|
|
"""Test helper. Defaults MOLECULE_A2A_PLATFORM_ENABLED=true so the
|
|
plugin-path tests below operate in plugin mode by default. Tests
|
|
that exercise the chat_completions fallback override this."""
|
|
env.setdefault("MOLECULE_A2A_PLATFORM_ENABLED", "true")
|
|
for k, v in env.items():
|
|
monkeypatch.setenv(k, v)
|
|
cfg = AdapterConfig(
|
|
model="hermes-test",
|
|
system_prompt="you are helpful",
|
|
)
|
|
return HermesAgentProxyExecutor(cfg)
|
|
|
|
|
|
class _CapturingQueue:
|
|
"""Stand-in for a2a.server.events.EventQueue."""
|
|
|
|
def __init__(self) -> None:
|
|
self.events: List[Any] = []
|
|
|
|
async def enqueue_event(self, event: Any) -> None:
|
|
self.events.append(event)
|
|
|
|
|
|
def _build_context(text: str, *, task_id: str = "task-1"):
|
|
"""A minimal RequestContext stub the executor's helper logic
|
|
can extract text + task_id from. The real a2a-sdk class has many
|
|
fields; we only need the ones executor.py touches."""
|
|
ctx = MagicMock()
|
|
ctx.task_id = task_id
|
|
ctx.session_id = None
|
|
ctx.context_id = None
|
|
msg = MagicMock()
|
|
msg.task_id = task_id
|
|
# Stand up a `parts` list of `TextPart`-shaped objects so
|
|
# extract_message_text(msg) returns our prompt.
|
|
text_part = MagicMock()
|
|
text_part.text = text
|
|
text_part.kind = "text"
|
|
msg.parts = [text_part]
|
|
ctx.message = msg
|
|
return ctx
|
|
|
|
|
|
# ---- structural -----------------------------------------------------
|
|
|
|
|
|
def test_executor_init_defaults_to_chat_completions(monkeypatch):
|
|
"""Default is the safe legacy /v1/chat/completions transport while
|
|
the image-side plugin install is being debugged. Plugin path is
|
|
opt-in via MOLECULE_A2A_PLATFORM_ENABLED=true. Port defaults still
|
|
apply for when the plugin path is enabled.
|
|
|
|
Built without _make_executor so the helper's plugin-on default
|
|
doesn't mask the prod default we want to assert here."""
|
|
monkeypatch.delenv("MOLECULE_A2A_PLATFORM_ENABLED", raising=False)
|
|
cfg = AdapterConfig(model="hermes-test", system_prompt="you are helpful")
|
|
ex = HermesAgentProxyExecutor(cfg)
|
|
assert ex._use_plugin is False
|
|
# Defaults still set so plugin enable just needs the one env flip.
|
|
assert ex._plugin_port == 8645
|
|
assert ex._callback_port == 8646
|
|
|
|
|
|
def test_executor_enables_plugin_path_when_opted_in(monkeypatch):
|
|
ex = _make_executor(monkeypatch, MOLECULE_A2A_PLATFORM_ENABLED="true")
|
|
assert ex._use_plugin is True
|
|
|
|
|
|
def test_executor_disabled_explicitly(monkeypatch):
|
|
ex = _make_executor(monkeypatch, MOLECULE_A2A_PLATFORM_ENABLED="false")
|
|
assert ex._use_plugin is False
|
|
|
|
|
|
def test_executor_respects_port_overrides(monkeypatch):
|
|
ex = _make_executor(
|
|
monkeypatch,
|
|
MOLECULE_A2A_PLATFORM_PORT="9999",
|
|
MOLECULE_A2A_CALLBACK_PORT="9000",
|
|
)
|
|
assert ex._plugin_port == 9999
|
|
assert ex._callback_port == 9000
|
|
|
|
|
|
# ---- lifecycle ------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_start_boots_reply_server_and_stop_tears_it_down(monkeypatch):
|
|
cb_port = _free_port()
|
|
ex = _make_executor(
|
|
monkeypatch,
|
|
MOLECULE_A2A_CALLBACK_PORT=str(cb_port),
|
|
)
|
|
try:
|
|
await ex.start()
|
|
# Server is up — POSTing should reach the handler (and fail
|
|
# validation since no reply_to/content; but the connection
|
|
# itself proves the listener exists).
|
|
async with ClientSession(timeout=ClientTimeout(total=2)) as s:
|
|
async with s.post(
|
|
f"http://127.0.0.1:{cb_port}/a2a/reply", json={}
|
|
) as r:
|
|
assert r.status == 400
|
|
finally:
|
|
await ex.stop()
|
|
# After stop, connection should refuse.
|
|
async with ClientSession(timeout=ClientTimeout(total=1)) as s:
|
|
with pytest.raises(Exception):
|
|
async with s.post(
|
|
f"http://127.0.0.1:{cb_port}/a2a/reply", json={}
|
|
) as r:
|
|
pass
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_start_idempotent(monkeypatch):
|
|
cb_port = _free_port()
|
|
ex = _make_executor(monkeypatch, MOLECULE_A2A_CALLBACK_PORT=str(cb_port))
|
|
try:
|
|
await ex.start()
|
|
await ex.start() # should be a no-op, not a port-in-use error
|
|
finally:
|
|
await ex.stop()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_stop_fails_pending_futures(monkeypatch):
|
|
ex = _make_executor(
|
|
monkeypatch, MOLECULE_A2A_CALLBACK_PORT=str(_free_port())
|
|
)
|
|
await ex.start()
|
|
loop = asyncio.get_running_loop()
|
|
fut: asyncio.Future = loop.create_future()
|
|
ex._pending["msg-1"] = fut
|
|
await ex.stop()
|
|
assert fut.done()
|
|
with pytest.raises(RuntimeError, match="executor shutting down"):
|
|
fut.result()
|
|
assert ex._pending == {}
|
|
|
|
|
|
# ---- happy path -----------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_execute_via_plugin_round_trips(monkeypatch):
|
|
"""Stand up a fake hermes plugin that ack's the inbound POST and
|
|
asynchronously POSTs back a reply matched by reply_to. Confirm
|
|
execute() emits the reply text on the event queue."""
|
|
|
|
plugin_port = _free_port()
|
|
cb_port = _free_port()
|
|
inbound_received: List[Dict[str, Any]] = []
|
|
|
|
async def fake_inbound(request: web.Request) -> web.Response:
|
|
body = await request.json()
|
|
inbound_received.append({
|
|
"headers": dict(request.headers),
|
|
"body": body,
|
|
})
|
|
# Simulate hermes processing — POST a reply back to the
|
|
# callback URL the executor sent us.
|
|
async def _delayed_reply():
|
|
await asyncio.sleep(0.05)
|
|
async with ClientSession(timeout=ClientTimeout(total=2)) as s:
|
|
await s.post(
|
|
body["callback_url"],
|
|
json={
|
|
"chat_id": body["chat_id"],
|
|
"content": "hello back from hermes",
|
|
"reply_to": body["message_id"],
|
|
"metadata": {},
|
|
},
|
|
)
|
|
asyncio.create_task(_delayed_reply())
|
|
return web.json_response({"ok": True, "queued": True})
|
|
|
|
plugin_app = web.Application()
|
|
plugin_app.router.add_post("/a2a/inbound", fake_inbound)
|
|
plugin_runner = web.AppRunner(plugin_app)
|
|
await plugin_runner.setup()
|
|
plugin_site = web.TCPSite(plugin_runner, "127.0.0.1", plugin_port)
|
|
await plugin_site.start()
|
|
|
|
ex = _make_executor(
|
|
monkeypatch,
|
|
MOLECULE_A2A_PLATFORM_PORT=str(plugin_port),
|
|
MOLECULE_A2A_CALLBACK_PORT=str(cb_port),
|
|
)
|
|
queue = _CapturingQueue()
|
|
try:
|
|
await ex.start()
|
|
ctx = _build_context("hello hermes")
|
|
await ex.execute(ctx, queue)
|
|
finally:
|
|
await ex.stop()
|
|
await plugin_site.stop()
|
|
await plugin_runner.cleanup()
|
|
|
|
assert len(inbound_received) == 1
|
|
assert inbound_received[0]["body"]["content"] == "hello hermes"
|
|
assert inbound_received[0]["body"]["chat_id"] == "task-1"
|
|
assert "callback_url" in inbound_received[0]["body"]
|
|
assert "message_id" in inbound_received[0]["body"]
|
|
|
|
assert len(queue.events) == 1
|
|
# The event is the a2a-sdk new_text_message envelope; confirm the
|
|
# text we set survived the round trip somewhere in its repr.
|
|
repr_event = repr(queue.events[0])
|
|
assert "hello back from hermes" in repr_event
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_execute_empty_prompt_short_circuits(monkeypatch):
|
|
ex = _make_executor(
|
|
monkeypatch, MOLECULE_A2A_CALLBACK_PORT=str(_free_port())
|
|
)
|
|
queue = _CapturingQueue()
|
|
try:
|
|
await ex.start()
|
|
ctx = _build_context(" ") # whitespace only
|
|
await ex.execute(ctx, queue)
|
|
finally:
|
|
await ex.stop()
|
|
|
|
assert len(queue.events) == 1
|
|
assert "empty prompt" in repr(queue.events[0])
|
|
|
|
|
|
# ---- error paths ----------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_execute_handles_inbound_post_failure(monkeypatch):
|
|
"""No plugin listening at the configured port — execute() should
|
|
emit an error message rather than hang."""
|
|
|
|
closed_port = _free_port() # nothing binds to it
|
|
ex = _make_executor(
|
|
monkeypatch,
|
|
MOLECULE_A2A_PLATFORM_PORT=str(closed_port),
|
|
MOLECULE_A2A_CALLBACK_PORT=str(_free_port()),
|
|
)
|
|
queue = _CapturingQueue()
|
|
try:
|
|
await ex.start()
|
|
ctx = _build_context("anybody home")
|
|
await ex.execute(ctx, queue)
|
|
finally:
|
|
await ex.stop()
|
|
|
|
assert len(queue.events) == 1
|
|
repr_event = repr(queue.events[0])
|
|
assert "hermes plugin POST error" in repr_event
|
|
# No leaked pending entry.
|
|
assert ex._pending == {}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_execute_handles_reply_timeout(monkeypatch):
|
|
"""Inbound POST succeeds but no reply comes — execute() emits a
|
|
timeout message after the configured deadline."""
|
|
|
|
plugin_port = _free_port()
|
|
|
|
async def silent_inbound(request: web.Request) -> web.Response:
|
|
await request.json()
|
|
return web.json_response({"ok": True, "queued": True}) # no callback
|
|
|
|
plugin_app = web.Application()
|
|
plugin_app.router.add_post("/a2a/inbound", silent_inbound)
|
|
plugin_runner = web.AppRunner(plugin_app)
|
|
await plugin_runner.setup()
|
|
plugin_site = web.TCPSite(plugin_runner, "127.0.0.1", plugin_port)
|
|
await plugin_site.start()
|
|
|
|
# Patch the timeout so the test runs in <1s instead of 600s.
|
|
import executor as ex_mod
|
|
monkeypatch.setattr(ex_mod, "_PLUGIN_REPLY_TIMEOUT", 0.5)
|
|
|
|
ex = _make_executor(
|
|
monkeypatch,
|
|
MOLECULE_A2A_PLATFORM_PORT=str(plugin_port),
|
|
MOLECULE_A2A_CALLBACK_PORT=str(_free_port()),
|
|
)
|
|
queue = _CapturingQueue()
|
|
try:
|
|
await ex.start()
|
|
ctx = _build_context("waiting for nothing")
|
|
await ex.execute(ctx, queue)
|
|
finally:
|
|
await ex.stop()
|
|
await plugin_site.stop()
|
|
await plugin_runner.cleanup()
|
|
|
|
assert len(queue.events) == 1
|
|
assert "reply timeout" in repr(queue.events[0])
|
|
assert ex._pending == {}
|
|
|
|
|
|
# ---- shared-secret enforcement --------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_shared_secret_sent_on_inbound_and_required_on_reply(monkeypatch):
|
|
"""When MOLECULE_A2A_PLATFORM_SHARED_SECRET is set, the executor
|
|
must include the X-Molecule-A2A-Secret header on outbound POSTs
|
|
and require it on incoming reply POSTs."""
|
|
|
|
plugin_port = _free_port()
|
|
cb_port = _free_port()
|
|
inbound_headers: Dict[str, str] = {}
|
|
|
|
async def fake_inbound(request: web.Request) -> web.Response:
|
|
body = await request.json()
|
|
inbound_headers.update(dict(request.headers))
|
|
# Reply WITHOUT the secret header — should be rejected 401.
|
|
async def _bad_then_good():
|
|
await asyncio.sleep(0.02)
|
|
async with ClientSession(timeout=ClientTimeout(total=2)) as s:
|
|
# First: no header → 401 (and the future stays pending).
|
|
async with s.post(
|
|
body["callback_url"],
|
|
json={
|
|
"chat_id": body["chat_id"],
|
|
"content": "should be rejected",
|
|
"reply_to": body["message_id"],
|
|
},
|
|
) as r1:
|
|
assert r1.status == 401
|
|
# Second: with header → 200, future resolves.
|
|
await s.post(
|
|
body["callback_url"],
|
|
headers={SECRET_HEADER: "topsecret"},
|
|
json={
|
|
"chat_id": body["chat_id"],
|
|
"content": "authorized reply",
|
|
"reply_to": body["message_id"],
|
|
},
|
|
)
|
|
asyncio.create_task(_bad_then_good())
|
|
return web.json_response({"ok": True, "queued": True})
|
|
|
|
plugin_app = web.Application()
|
|
plugin_app.router.add_post("/a2a/inbound", fake_inbound)
|
|
plugin_runner = web.AppRunner(plugin_app)
|
|
await plugin_runner.setup()
|
|
plugin_site = web.TCPSite(plugin_runner, "127.0.0.1", plugin_port)
|
|
await plugin_site.start()
|
|
|
|
ex = _make_executor(
|
|
monkeypatch,
|
|
MOLECULE_A2A_PLATFORM_PORT=str(plugin_port),
|
|
MOLECULE_A2A_CALLBACK_PORT=str(cb_port),
|
|
MOLECULE_A2A_PLATFORM_SHARED_SECRET="topsecret",
|
|
)
|
|
queue = _CapturingQueue()
|
|
try:
|
|
await ex.start()
|
|
await ex.execute(_build_context("auth me"), queue)
|
|
finally:
|
|
await ex.stop()
|
|
await plugin_site.stop()
|
|
await plugin_runner.cleanup()
|
|
|
|
assert inbound_headers.get(SECRET_HEADER) == "topsecret"
|
|
assert "authorized reply" in repr(queue.events[0])
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_reply_for_unknown_message_id_acks_stale(monkeypatch):
|
|
"""A reply for a message_id we don't have pending should ack
|
|
{ok: true, stale: true} so the plugin doesn't retry forever."""
|
|
|
|
cb_port = _free_port()
|
|
ex = _make_executor(monkeypatch, MOLECULE_A2A_CALLBACK_PORT=str(cb_port))
|
|
try:
|
|
await ex.start()
|
|
async with ClientSession(timeout=ClientTimeout(total=2)) as s:
|
|
async with s.post(
|
|
f"http://127.0.0.1:{cb_port}/a2a/reply",
|
|
json={"reply_to": "ghost-id", "content": "anybody home"},
|
|
) as r:
|
|
assert r.status == 200
|
|
body = await r.json()
|
|
assert body == {"ok": True, "stale": True}
|
|
finally:
|
|
await ex.stop()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_reply_validates_required_fields(monkeypatch):
|
|
cb_port = _free_port()
|
|
ex = _make_executor(monkeypatch, MOLECULE_A2A_CALLBACK_PORT=str(cb_port))
|
|
try:
|
|
await ex.start()
|
|
async with ClientSession(timeout=ClientTimeout(total=2)) as s:
|
|
# Missing both
|
|
async with s.post(
|
|
f"http://127.0.0.1:{cb_port}/a2a/reply", json={}
|
|
) as r:
|
|
assert r.status == 400
|
|
# Missing content
|
|
async with s.post(
|
|
f"http://127.0.0.1:{cb_port}/a2a/reply",
|
|
json={"reply_to": "x"},
|
|
) as r:
|
|
assert r.status == 400
|
|
# Bad JSON
|
|
async with s.post(
|
|
f"http://127.0.0.1:{cb_port}/a2a/reply",
|
|
data="not json",
|
|
) as r:
|
|
assert r.status == 400
|
|
finally:
|
|
await ex.stop()
|
|
|
|
|
|
# ---- chat_completions fallback --------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_fallback_chat_completions_path(monkeypatch):
|
|
"""With MOLECULE_A2A_PLATFORM_ENABLED=false, the executor must POST
|
|
to /v1/chat/completions and emit the assistant text."""
|
|
|
|
api_port = _free_port()
|
|
|
|
async def fake_chat_completions(request: web.Request) -> web.Response:
|
|
body = await request.json()
|
|
assert body["model"] == "hermes-test"
|
|
return web.json_response({
|
|
"choices": [
|
|
{"message": {"role": "assistant", "content": "fallback reply"}}
|
|
]
|
|
})
|
|
|
|
api_app = web.Application()
|
|
api_app.router.add_post("/v1/chat/completions", fake_chat_completions)
|
|
api_runner = web.AppRunner(api_app)
|
|
await api_runner.setup()
|
|
api_site = web.TCPSite(api_runner, "127.0.0.1", api_port)
|
|
await api_site.start()
|
|
|
|
ex = _make_executor(
|
|
monkeypatch,
|
|
MOLECULE_A2A_PLATFORM_ENABLED="false",
|
|
HERMES_API_BASE=f"http://127.0.0.1:{api_port}/v1",
|
|
)
|
|
queue = _CapturingQueue()
|
|
try:
|
|
await ex.start() # no-op when plugin disabled
|
|
await ex.execute(_build_context("legacy"), queue)
|
|
finally:
|
|
await ex.stop()
|
|
await api_site.stop()
|
|
await api_runner.cleanup()
|
|
|
|
assert len(queue.events) == 1
|
|
assert "fallback reply" in repr(queue.events[0])
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_fallback_handles_chat_completions_http_error(monkeypatch):
|
|
api_port = _free_port()
|
|
|
|
async def err500(request: web.Request) -> web.Response:
|
|
return web.Response(status=500, text="boom")
|
|
|
|
api_app = web.Application()
|
|
api_app.router.add_post("/v1/chat/completions", err500)
|
|
api_runner = web.AppRunner(api_app)
|
|
await api_runner.setup()
|
|
api_site = web.TCPSite(api_runner, "127.0.0.1", api_port)
|
|
await api_site.start()
|
|
|
|
ex = _make_executor(
|
|
monkeypatch,
|
|
MOLECULE_A2A_PLATFORM_ENABLED="false",
|
|
HERMES_API_BASE=f"http://127.0.0.1:{api_port}/v1",
|
|
)
|
|
queue = _CapturingQueue()
|
|
try:
|
|
await ex.start()
|
|
await ex.execute(_build_context("trigger 500"), queue)
|
|
finally:
|
|
await ex.stop()
|
|
await api_site.stop()
|
|
await api_runner.cleanup()
|
|
|
|
assert "hermes-agent error 500" in repr(queue.events[0])
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_fallback_handles_unreachable_chat_completions(monkeypatch):
|
|
closed_port = _free_port()
|
|
ex = _make_executor(
|
|
monkeypatch,
|
|
MOLECULE_A2A_PLATFORM_ENABLED="false",
|
|
HERMES_API_BASE=f"http://127.0.0.1:{closed_port}/v1",
|
|
)
|
|
queue = _CapturingQueue()
|
|
try:
|
|
await ex.start()
|
|
await ex.execute(_build_context("nowhere"), queue)
|
|
finally:
|
|
await ex.stop()
|
|
|
|
assert "hermes-agent unreachable" in repr(queue.events[0])
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_fallback_handles_unexpected_response_shape(monkeypatch):
|
|
api_port = _free_port()
|
|
|
|
async def junk_response(request: web.Request) -> web.Response:
|
|
return web.json_response({"not": "what we expected"})
|
|
|
|
api_app = web.Application()
|
|
api_app.router.add_post("/v1/chat/completions", junk_response)
|
|
api_runner = web.AppRunner(api_app)
|
|
await api_runner.setup()
|
|
api_site = web.TCPSite(api_runner, "127.0.0.1", api_port)
|
|
await api_site.start()
|
|
|
|
ex = _make_executor(
|
|
monkeypatch,
|
|
MOLECULE_A2A_PLATFORM_ENABLED="false",
|
|
HERMES_API_BASE=f"http://127.0.0.1:{api_port}/v1",
|
|
)
|
|
queue = _CapturingQueue()
|
|
try:
|
|
await ex.start()
|
|
await ex.execute(_build_context("junk"), queue)
|
|
finally:
|
|
await ex.stop()
|
|
await api_site.stop()
|
|
await api_runner.cleanup()
|
|
|
|
assert "no content" in repr(queue.events[0])
|
|
|
|
|
|
# ---- chat_id derivation ----------------------------------------------
|
|
|
|
|
|
def test_derive_chat_id_prefers_task_id():
|
|
ctx = MagicMock()
|
|
ctx.task_id = "task-A"
|
|
assert HermesAgentProxyExecutor._derive_chat_id(ctx) == "task-A"
|
|
|
|
|
|
def test_derive_chat_id_falls_back_to_session_id():
|
|
ctx = MagicMock()
|
|
ctx.task_id = None
|
|
ctx.session_id = "sess-B"
|
|
assert HermesAgentProxyExecutor._derive_chat_id(ctx) == "sess-B"
|
|
|
|
|
|
def test_derive_chat_id_synthesizes_when_nothing_present():
|
|
ctx = MagicMock()
|
|
ctx.task_id = None
|
|
ctx.session_id = None
|
|
ctx.context_id = None
|
|
ctx.message = None
|
|
derived = HermesAgentProxyExecutor._derive_chat_id(ctx)
|
|
assert derived.startswith("adhoc-")
|
|
|
|
|
|
def test_derive_chat_id_falls_back_to_message_attrs():
|
|
"""When ctx.task_id/session_id/context_id are all None but
|
|
ctx.message has one of them, derive_chat_id should use that."""
|
|
ctx = MagicMock()
|
|
ctx.task_id = None
|
|
ctx.session_id = None
|
|
ctx.context_id = None
|
|
msg = MagicMock()
|
|
msg.task_id = None
|
|
msg.session_id = None
|
|
msg.context_id = "ctx-from-message"
|
|
ctx.message = msg
|
|
assert HermesAgentProxyExecutor._derive_chat_id(ctx) == "ctx-from-message"
|
|
|
|
|
|
def test_derive_chat_id_uses_message_id_camelcase():
|
|
"""The a2a-sdk uses messageId on the Message object — ensure we
|
|
pick it up as a last fallback before synthesizing an adhoc id."""
|
|
ctx = MagicMock()
|
|
ctx.task_id = None
|
|
ctx.session_id = None
|
|
ctx.context_id = None
|
|
msg = MagicMock()
|
|
msg.task_id = None
|
|
msg.session_id = None
|
|
msg.context_id = None
|
|
msg.messageId = "msg-camelcase"
|
|
ctx.message = msg
|
|
assert HermesAgentProxyExecutor._derive_chat_id(ctx) == "msg-camelcase"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cancel_returns_none():
|
|
"""cancel() is a noop today — confirm it doesn't raise and returns
|
|
None so a2a-sdk's contract is satisfied."""
|
|
cfg = AdapterConfig(model="hermes-test")
|
|
ex = HermesAgentProxyExecutor(cfg)
|
|
result = await ex.cancel(MagicMock(), _CapturingQueue())
|
|
assert result is None
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_plugin_path_resolves_with_runtime_error(monkeypatch):
|
|
"""If something inside the awaited future raises a non-timeout
|
|
exception (e.g., the executor was stopped mid-flight), execute()
|
|
should emit '[hermes plugin error] ...' rather than propagate."""
|
|
|
|
plugin_port = _free_port()
|
|
|
|
async def fake_inbound(request: web.Request) -> web.Response:
|
|
return web.json_response({"ok": True, "queued": True}) # no callback
|
|
|
|
plugin_app = web.Application()
|
|
plugin_app.router.add_post("/a2a/inbound", fake_inbound)
|
|
plugin_runner = web.AppRunner(plugin_app)
|
|
await plugin_runner.setup()
|
|
plugin_site = web.TCPSite(plugin_runner, "127.0.0.1", plugin_port)
|
|
await plugin_site.start()
|
|
|
|
ex = _make_executor(
|
|
monkeypatch,
|
|
MOLECULE_A2A_PLATFORM_PORT=str(plugin_port),
|
|
MOLECULE_A2A_CALLBACK_PORT=str(_free_port()),
|
|
)
|
|
queue = _CapturingQueue()
|
|
try:
|
|
await ex.start()
|
|
|
|
# Race: send the request, then concurrently fail every pending
|
|
# future with a custom RuntimeError.
|
|
async def _execute():
|
|
await ex.execute(_build_context("simulate failure"), queue)
|
|
|
|
async def _trip():
|
|
# Wait for execute() to register its pending future.
|
|
for _ in range(50):
|
|
if ex._pending:
|
|
break
|
|
await asyncio.sleep(0.01)
|
|
for fut in list(ex._pending.values()):
|
|
if not fut.done():
|
|
fut.set_exception(RuntimeError("simulated mid-flight failure"))
|
|
|
|
await asyncio.gather(_execute(), _trip())
|
|
finally:
|
|
await ex.stop()
|
|
await plugin_site.stop()
|
|
await plugin_runner.cleanup()
|
|
|
|
assert "hermes plugin error" in repr(queue.events[0])
|
|
assert "simulated mid-flight failure" in repr(queue.events[0])
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_chat_completions_includes_bearer_when_key_set(monkeypatch):
|
|
"""When API_SERVER_KEY is set, the legacy fallback must add an
|
|
Authorization header to /v1/chat/completions calls."""
|
|
|
|
api_port = _free_port()
|
|
received_headers: Dict[str, str] = {}
|
|
|
|
async def fake_chat_completions(request: web.Request) -> web.Response:
|
|
received_headers.update(dict(request.headers))
|
|
return web.json_response({
|
|
"choices": [{"message": {"role": "assistant", "content": "ok"}}]
|
|
})
|
|
|
|
api_app = web.Application()
|
|
api_app.router.add_post("/v1/chat/completions", fake_chat_completions)
|
|
api_runner = web.AppRunner(api_app)
|
|
await api_runner.setup()
|
|
api_site = web.TCPSite(api_runner, "127.0.0.1", api_port)
|
|
await api_site.start()
|
|
|
|
ex = _make_executor(
|
|
monkeypatch,
|
|
MOLECULE_A2A_PLATFORM_ENABLED="false",
|
|
HERMES_API_BASE=f"http://127.0.0.1:{api_port}/v1",
|
|
API_SERVER_KEY="test-bearer-token",
|
|
)
|
|
queue = _CapturingQueue()
|
|
try:
|
|
await ex.start()
|
|
await ex.execute(_build_context("authed"), queue)
|
|
finally:
|
|
await ex.stop()
|
|
await api_site.stop()
|
|
await api_runner.cleanup()
|
|
|
|
assert received_headers.get("Authorization") == "Bearer test-bearer-token"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_plugin_path_reply_handler_exception_path(monkeypatch):
|
|
"""If something inside our reply handler raises (e.g., the future
|
|
is cancelled mid-set), we should log and not crash the executor."""
|
|
|
|
cb_port = _free_port()
|
|
ex = _make_executor(monkeypatch, MOLECULE_A2A_CALLBACK_PORT=str(cb_port))
|
|
try:
|
|
await ex.start()
|
|
# Pre-resolve a future then send a reply for it — the handler
|
|
# tries to call set_result on a done future. We guard against
|
|
# that with the not future.done() check, so this should ack OK.
|
|
loop = asyncio.get_running_loop()
|
|
fut: asyncio.Future = loop.create_future()
|
|
fut.set_result("already-done")
|
|
ex._pending["pre-resolved"] = fut
|
|
|
|
async with ClientSession(timeout=ClientTimeout(total=2)) as s:
|
|
async with s.post(
|
|
f"http://127.0.0.1:{cb_port}/a2a/reply",
|
|
json={
|
|
"reply_to": "pre-resolved",
|
|
"content": "late delivery",
|
|
},
|
|
) as r:
|
|
assert r.status == 200
|
|
body = await r.json()
|
|
assert body == {"ok": True}
|
|
finally:
|
|
await ex.stop()
|
|
|
|
|
|
# ---- pyproject-style configuration -----------------------------------
|
|
|
|
|
|
def test_pytest_can_collect_module():
|
|
"""Trivial sanity check that conftest.py wires sys.path correctly
|
|
and the test module imports cleanly without monkeypatch fixtures."""
|
|
import executor # noqa: F401
|
|
assert hasattr(executor, "HermesAgentProxyExecutor")
|