molecule-ai-workspace-templ.../tests/test_executor_plugin_path.py
Hongming Wang f8ee17e2e3 hotfix(adapter): default MOLECULE_A2A_PLATFORM_ENABLED=false
Staging E2E for PR #32 surfaced a workspace boot failure: the deployed
image's hermes gateway never bound :8645, so adapter.setup()'s
/a2a/health probe got httpx.ConnectError and the workspace went
status=failed at ~498s.

Root cause is image-side install/discovery of the molecule-a2a plugin,
NOT the executor wire shape. Local scripts/e2e_full_chain.py runs
against a venv where I'd already installed the plugin manually — it
didn't catch the deployment-shape divergence.

Flip the default off to restore the legacy /v1/chat/completions
fallback (no session continuity, but works). Plugin path stays
opt-in via MOLECULE_A2A_PLATFORM_ENABLED=true so debugging can
continue per-workspace without rolling the whole image again.

Re-enabling will require:
  - An image-build smoke test that verifies pip show
    hermes-platform-molecule-a2a + hermes config show inside the
    built container (filed separately)
  - Verifying the molecule-a2a config stanza actually lands in
    ~/.hermes/config.yaml inside the running container

Tests updated: 37 pass. Plugin-path tests now opt-in via the helper's
default; default-detection test asserts the new chat_completions
fallback.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 18:51:00 -07:00

786 lines
26 KiB
Python

"""Tests for the hermes plugin transport in executor.py.
Coverage targets:
- Lifecycle: start() boots reply server; stop() tears it down and
fails any in-flight pending Futures.
- Inbound: execute() POSTs to a stub /a2a/inbound, registers a
Future keyed by message_id, awaits it.
- Outbound: a stub plugin POSTs to our reply server with reply_to
matching the inbound message_id; the awaiting execute() resolves
and emits on the event_queue.
- Auth: shared_secret is sent on outbound POST and required on
inbound reply POST.
- Error paths: empty prompt, hermes-side POST failure, reply timeout,
late/duplicate replies for unknown message_id.
- Fallback: when MOLECULE_A2A_PLATFORM_ENABLED=false the executor
falls through to the chat_completions transport.
"""
from __future__ import annotations
import asyncio
import os
import socket
import sys
from typing import Any, Dict, List, Optional
from unittest.mock import MagicMock
import httpx
import pytest
from aiohttp import ClientSession, ClientTimeout, web
from executor import HermesAgentProxyExecutor, SECRET_HEADER
from molecule_runtime.adapters.base import AdapterConfig
# ---- helpers --------------------------------------------------------
def _free_port() -> int:
with socket.socket() as s:
s.bind(("127.0.0.1", 0))
return s.getsockname()[1]
def _make_executor(monkeypatch, **env: str) -> HermesAgentProxyExecutor:
"""Test helper. Defaults MOLECULE_A2A_PLATFORM_ENABLED=true so the
plugin-path tests below operate in plugin mode by default. Tests
that exercise the chat_completions fallback override this."""
env.setdefault("MOLECULE_A2A_PLATFORM_ENABLED", "true")
for k, v in env.items():
monkeypatch.setenv(k, v)
cfg = AdapterConfig(
model="hermes-test",
system_prompt="you are helpful",
)
return HermesAgentProxyExecutor(cfg)
class _CapturingQueue:
"""Stand-in for a2a.server.events.EventQueue."""
def __init__(self) -> None:
self.events: List[Any] = []
async def enqueue_event(self, event: Any) -> None:
self.events.append(event)
def _build_context(text: str, *, task_id: str = "task-1"):
"""A minimal RequestContext stub the executor's helper logic
can extract text + task_id from. The real a2a-sdk class has many
fields; we only need the ones executor.py touches."""
ctx = MagicMock()
ctx.task_id = task_id
ctx.session_id = None
ctx.context_id = None
msg = MagicMock()
msg.task_id = task_id
# Stand up a `parts` list of `TextPart`-shaped objects so
# extract_message_text(msg) returns our prompt.
text_part = MagicMock()
text_part.text = text
text_part.kind = "text"
msg.parts = [text_part]
ctx.message = msg
return ctx
# ---- structural -----------------------------------------------------
def test_executor_init_defaults_to_chat_completions(monkeypatch):
"""Default is the safe legacy /v1/chat/completions transport while
the image-side plugin install is being debugged. Plugin path is
opt-in via MOLECULE_A2A_PLATFORM_ENABLED=true. Port defaults still
apply for when the plugin path is enabled.
Built without _make_executor so the helper's plugin-on default
doesn't mask the prod default we want to assert here."""
monkeypatch.delenv("MOLECULE_A2A_PLATFORM_ENABLED", raising=False)
cfg = AdapterConfig(model="hermes-test", system_prompt="you are helpful")
ex = HermesAgentProxyExecutor(cfg)
assert ex._use_plugin is False
# Defaults still set so plugin enable just needs the one env flip.
assert ex._plugin_port == 8645
assert ex._callback_port == 8646
def test_executor_enables_plugin_path_when_opted_in(monkeypatch):
ex = _make_executor(monkeypatch, MOLECULE_A2A_PLATFORM_ENABLED="true")
assert ex._use_plugin is True
def test_executor_disabled_explicitly(monkeypatch):
ex = _make_executor(monkeypatch, MOLECULE_A2A_PLATFORM_ENABLED="false")
assert ex._use_plugin is False
def test_executor_respects_port_overrides(monkeypatch):
ex = _make_executor(
monkeypatch,
MOLECULE_A2A_PLATFORM_PORT="9999",
MOLECULE_A2A_CALLBACK_PORT="9000",
)
assert ex._plugin_port == 9999
assert ex._callback_port == 9000
# ---- lifecycle ------------------------------------------------------
@pytest.mark.asyncio
async def test_start_boots_reply_server_and_stop_tears_it_down(monkeypatch):
cb_port = _free_port()
ex = _make_executor(
monkeypatch,
MOLECULE_A2A_CALLBACK_PORT=str(cb_port),
)
try:
await ex.start()
# Server is up — POSTing should reach the handler (and fail
# validation since no reply_to/content; but the connection
# itself proves the listener exists).
async with ClientSession(timeout=ClientTimeout(total=2)) as s:
async with s.post(
f"http://127.0.0.1:{cb_port}/a2a/reply", json={}
) as r:
assert r.status == 400
finally:
await ex.stop()
# After stop, connection should refuse.
async with ClientSession(timeout=ClientTimeout(total=1)) as s:
with pytest.raises(Exception):
async with s.post(
f"http://127.0.0.1:{cb_port}/a2a/reply", json={}
) as r:
pass
@pytest.mark.asyncio
async def test_start_idempotent(monkeypatch):
cb_port = _free_port()
ex = _make_executor(monkeypatch, MOLECULE_A2A_CALLBACK_PORT=str(cb_port))
try:
await ex.start()
await ex.start() # should be a no-op, not a port-in-use error
finally:
await ex.stop()
@pytest.mark.asyncio
async def test_stop_fails_pending_futures(monkeypatch):
ex = _make_executor(
monkeypatch, MOLECULE_A2A_CALLBACK_PORT=str(_free_port())
)
await ex.start()
loop = asyncio.get_running_loop()
fut: asyncio.Future = loop.create_future()
ex._pending["msg-1"] = fut
await ex.stop()
assert fut.done()
with pytest.raises(RuntimeError, match="executor shutting down"):
fut.result()
assert ex._pending == {}
# ---- happy path -----------------------------------------------------
@pytest.mark.asyncio
async def test_execute_via_plugin_round_trips(monkeypatch):
"""Stand up a fake hermes plugin that ack's the inbound POST and
asynchronously POSTs back a reply matched by reply_to. Confirm
execute() emits the reply text on the event queue."""
plugin_port = _free_port()
cb_port = _free_port()
inbound_received: List[Dict[str, Any]] = []
async def fake_inbound(request: web.Request) -> web.Response:
body = await request.json()
inbound_received.append({
"headers": dict(request.headers),
"body": body,
})
# Simulate hermes processing — POST a reply back to the
# callback URL the executor sent us.
async def _delayed_reply():
await asyncio.sleep(0.05)
async with ClientSession(timeout=ClientTimeout(total=2)) as s:
await s.post(
body["callback_url"],
json={
"chat_id": body["chat_id"],
"content": "hello back from hermes",
"reply_to": body["message_id"],
"metadata": {},
},
)
asyncio.create_task(_delayed_reply())
return web.json_response({"ok": True, "queued": True})
plugin_app = web.Application()
plugin_app.router.add_post("/a2a/inbound", fake_inbound)
plugin_runner = web.AppRunner(plugin_app)
await plugin_runner.setup()
plugin_site = web.TCPSite(plugin_runner, "127.0.0.1", plugin_port)
await plugin_site.start()
ex = _make_executor(
monkeypatch,
MOLECULE_A2A_PLATFORM_PORT=str(plugin_port),
MOLECULE_A2A_CALLBACK_PORT=str(cb_port),
)
queue = _CapturingQueue()
try:
await ex.start()
ctx = _build_context("hello hermes")
await ex.execute(ctx, queue)
finally:
await ex.stop()
await plugin_site.stop()
await plugin_runner.cleanup()
assert len(inbound_received) == 1
assert inbound_received[0]["body"]["content"] == "hello hermes"
assert inbound_received[0]["body"]["chat_id"] == "task-1"
assert "callback_url" in inbound_received[0]["body"]
assert "message_id" in inbound_received[0]["body"]
assert len(queue.events) == 1
# The event is the a2a-sdk new_text_message envelope; confirm the
# text we set survived the round trip somewhere in its repr.
repr_event = repr(queue.events[0])
assert "hello back from hermes" in repr_event
@pytest.mark.asyncio
async def test_execute_empty_prompt_short_circuits(monkeypatch):
ex = _make_executor(
monkeypatch, MOLECULE_A2A_CALLBACK_PORT=str(_free_port())
)
queue = _CapturingQueue()
try:
await ex.start()
ctx = _build_context(" ") # whitespace only
await ex.execute(ctx, queue)
finally:
await ex.stop()
assert len(queue.events) == 1
assert "empty prompt" in repr(queue.events[0])
# ---- error paths ----------------------------------------------------
@pytest.mark.asyncio
async def test_execute_handles_inbound_post_failure(monkeypatch):
"""No plugin listening at the configured port — execute() should
emit an error message rather than hang."""
closed_port = _free_port() # nothing binds to it
ex = _make_executor(
monkeypatch,
MOLECULE_A2A_PLATFORM_PORT=str(closed_port),
MOLECULE_A2A_CALLBACK_PORT=str(_free_port()),
)
queue = _CapturingQueue()
try:
await ex.start()
ctx = _build_context("anybody home")
await ex.execute(ctx, queue)
finally:
await ex.stop()
assert len(queue.events) == 1
repr_event = repr(queue.events[0])
assert "hermes plugin POST error" in repr_event
# No leaked pending entry.
assert ex._pending == {}
@pytest.mark.asyncio
async def test_execute_handles_reply_timeout(monkeypatch):
"""Inbound POST succeeds but no reply comes — execute() emits a
timeout message after the configured deadline."""
plugin_port = _free_port()
async def silent_inbound(request: web.Request) -> web.Response:
await request.json()
return web.json_response({"ok": True, "queued": True}) # no callback
plugin_app = web.Application()
plugin_app.router.add_post("/a2a/inbound", silent_inbound)
plugin_runner = web.AppRunner(plugin_app)
await plugin_runner.setup()
plugin_site = web.TCPSite(plugin_runner, "127.0.0.1", plugin_port)
await plugin_site.start()
# Patch the timeout so the test runs in <1s instead of 600s.
import executor as ex_mod
monkeypatch.setattr(ex_mod, "_PLUGIN_REPLY_TIMEOUT", 0.5)
ex = _make_executor(
monkeypatch,
MOLECULE_A2A_PLATFORM_PORT=str(plugin_port),
MOLECULE_A2A_CALLBACK_PORT=str(_free_port()),
)
queue = _CapturingQueue()
try:
await ex.start()
ctx = _build_context("waiting for nothing")
await ex.execute(ctx, queue)
finally:
await ex.stop()
await plugin_site.stop()
await plugin_runner.cleanup()
assert len(queue.events) == 1
assert "reply timeout" in repr(queue.events[0])
assert ex._pending == {}
# ---- shared-secret enforcement --------------------------------------
@pytest.mark.asyncio
async def test_shared_secret_sent_on_inbound_and_required_on_reply(monkeypatch):
"""When MOLECULE_A2A_PLATFORM_SHARED_SECRET is set, the executor
must include the X-Molecule-A2A-Secret header on outbound POSTs
and require it on incoming reply POSTs."""
plugin_port = _free_port()
cb_port = _free_port()
inbound_headers: Dict[str, str] = {}
async def fake_inbound(request: web.Request) -> web.Response:
body = await request.json()
inbound_headers.update(dict(request.headers))
# Reply WITHOUT the secret header — should be rejected 401.
async def _bad_then_good():
await asyncio.sleep(0.02)
async with ClientSession(timeout=ClientTimeout(total=2)) as s:
# First: no header → 401 (and the future stays pending).
async with s.post(
body["callback_url"],
json={
"chat_id": body["chat_id"],
"content": "should be rejected",
"reply_to": body["message_id"],
},
) as r1:
assert r1.status == 401
# Second: with header → 200, future resolves.
await s.post(
body["callback_url"],
headers={SECRET_HEADER: "topsecret"},
json={
"chat_id": body["chat_id"],
"content": "authorized reply",
"reply_to": body["message_id"],
},
)
asyncio.create_task(_bad_then_good())
return web.json_response({"ok": True, "queued": True})
plugin_app = web.Application()
plugin_app.router.add_post("/a2a/inbound", fake_inbound)
plugin_runner = web.AppRunner(plugin_app)
await plugin_runner.setup()
plugin_site = web.TCPSite(plugin_runner, "127.0.0.1", plugin_port)
await plugin_site.start()
ex = _make_executor(
monkeypatch,
MOLECULE_A2A_PLATFORM_PORT=str(plugin_port),
MOLECULE_A2A_CALLBACK_PORT=str(cb_port),
MOLECULE_A2A_PLATFORM_SHARED_SECRET="topsecret",
)
queue = _CapturingQueue()
try:
await ex.start()
await ex.execute(_build_context("auth me"), queue)
finally:
await ex.stop()
await plugin_site.stop()
await plugin_runner.cleanup()
assert inbound_headers.get(SECRET_HEADER) == "topsecret"
assert "authorized reply" in repr(queue.events[0])
@pytest.mark.asyncio
async def test_reply_for_unknown_message_id_acks_stale(monkeypatch):
"""A reply for a message_id we don't have pending should ack
{ok: true, stale: true} so the plugin doesn't retry forever."""
cb_port = _free_port()
ex = _make_executor(monkeypatch, MOLECULE_A2A_CALLBACK_PORT=str(cb_port))
try:
await ex.start()
async with ClientSession(timeout=ClientTimeout(total=2)) as s:
async with s.post(
f"http://127.0.0.1:{cb_port}/a2a/reply",
json={"reply_to": "ghost-id", "content": "anybody home"},
) as r:
assert r.status == 200
body = await r.json()
assert body == {"ok": True, "stale": True}
finally:
await ex.stop()
@pytest.mark.asyncio
async def test_reply_validates_required_fields(monkeypatch):
cb_port = _free_port()
ex = _make_executor(monkeypatch, MOLECULE_A2A_CALLBACK_PORT=str(cb_port))
try:
await ex.start()
async with ClientSession(timeout=ClientTimeout(total=2)) as s:
# Missing both
async with s.post(
f"http://127.0.0.1:{cb_port}/a2a/reply", json={}
) as r:
assert r.status == 400
# Missing content
async with s.post(
f"http://127.0.0.1:{cb_port}/a2a/reply",
json={"reply_to": "x"},
) as r:
assert r.status == 400
# Bad JSON
async with s.post(
f"http://127.0.0.1:{cb_port}/a2a/reply",
data="not json",
) as r:
assert r.status == 400
finally:
await ex.stop()
# ---- chat_completions fallback --------------------------------------
@pytest.mark.asyncio
async def test_fallback_chat_completions_path(monkeypatch):
"""With MOLECULE_A2A_PLATFORM_ENABLED=false, the executor must POST
to /v1/chat/completions and emit the assistant text."""
api_port = _free_port()
async def fake_chat_completions(request: web.Request) -> web.Response:
body = await request.json()
assert body["model"] == "hermes-test"
return web.json_response({
"choices": [
{"message": {"role": "assistant", "content": "fallback reply"}}
]
})
api_app = web.Application()
api_app.router.add_post("/v1/chat/completions", fake_chat_completions)
api_runner = web.AppRunner(api_app)
await api_runner.setup()
api_site = web.TCPSite(api_runner, "127.0.0.1", api_port)
await api_site.start()
ex = _make_executor(
monkeypatch,
MOLECULE_A2A_PLATFORM_ENABLED="false",
HERMES_API_BASE=f"http://127.0.0.1:{api_port}/v1",
)
queue = _CapturingQueue()
try:
await ex.start() # no-op when plugin disabled
await ex.execute(_build_context("legacy"), queue)
finally:
await ex.stop()
await api_site.stop()
await api_runner.cleanup()
assert len(queue.events) == 1
assert "fallback reply" in repr(queue.events[0])
@pytest.mark.asyncio
async def test_fallback_handles_chat_completions_http_error(monkeypatch):
api_port = _free_port()
async def err500(request: web.Request) -> web.Response:
return web.Response(status=500, text="boom")
api_app = web.Application()
api_app.router.add_post("/v1/chat/completions", err500)
api_runner = web.AppRunner(api_app)
await api_runner.setup()
api_site = web.TCPSite(api_runner, "127.0.0.1", api_port)
await api_site.start()
ex = _make_executor(
monkeypatch,
MOLECULE_A2A_PLATFORM_ENABLED="false",
HERMES_API_BASE=f"http://127.0.0.1:{api_port}/v1",
)
queue = _CapturingQueue()
try:
await ex.start()
await ex.execute(_build_context("trigger 500"), queue)
finally:
await ex.stop()
await api_site.stop()
await api_runner.cleanup()
assert "hermes-agent error 500" in repr(queue.events[0])
@pytest.mark.asyncio
async def test_fallback_handles_unreachable_chat_completions(monkeypatch):
closed_port = _free_port()
ex = _make_executor(
monkeypatch,
MOLECULE_A2A_PLATFORM_ENABLED="false",
HERMES_API_BASE=f"http://127.0.0.1:{closed_port}/v1",
)
queue = _CapturingQueue()
try:
await ex.start()
await ex.execute(_build_context("nowhere"), queue)
finally:
await ex.stop()
assert "hermes-agent unreachable" in repr(queue.events[0])
@pytest.mark.asyncio
async def test_fallback_handles_unexpected_response_shape(monkeypatch):
api_port = _free_port()
async def junk_response(request: web.Request) -> web.Response:
return web.json_response({"not": "what we expected"})
api_app = web.Application()
api_app.router.add_post("/v1/chat/completions", junk_response)
api_runner = web.AppRunner(api_app)
await api_runner.setup()
api_site = web.TCPSite(api_runner, "127.0.0.1", api_port)
await api_site.start()
ex = _make_executor(
monkeypatch,
MOLECULE_A2A_PLATFORM_ENABLED="false",
HERMES_API_BASE=f"http://127.0.0.1:{api_port}/v1",
)
queue = _CapturingQueue()
try:
await ex.start()
await ex.execute(_build_context("junk"), queue)
finally:
await ex.stop()
await api_site.stop()
await api_runner.cleanup()
assert "no content" in repr(queue.events[0])
# ---- chat_id derivation ----------------------------------------------
def test_derive_chat_id_prefers_task_id():
ctx = MagicMock()
ctx.task_id = "task-A"
assert HermesAgentProxyExecutor._derive_chat_id(ctx) == "task-A"
def test_derive_chat_id_falls_back_to_session_id():
ctx = MagicMock()
ctx.task_id = None
ctx.session_id = "sess-B"
assert HermesAgentProxyExecutor._derive_chat_id(ctx) == "sess-B"
def test_derive_chat_id_synthesizes_when_nothing_present():
ctx = MagicMock()
ctx.task_id = None
ctx.session_id = None
ctx.context_id = None
ctx.message = None
derived = HermesAgentProxyExecutor._derive_chat_id(ctx)
assert derived.startswith("adhoc-")
def test_derive_chat_id_falls_back_to_message_attrs():
"""When ctx.task_id/session_id/context_id are all None but
ctx.message has one of them, derive_chat_id should use that."""
ctx = MagicMock()
ctx.task_id = None
ctx.session_id = None
ctx.context_id = None
msg = MagicMock()
msg.task_id = None
msg.session_id = None
msg.context_id = "ctx-from-message"
ctx.message = msg
assert HermesAgentProxyExecutor._derive_chat_id(ctx) == "ctx-from-message"
def test_derive_chat_id_uses_message_id_camelcase():
"""The a2a-sdk uses messageId on the Message object — ensure we
pick it up as a last fallback before synthesizing an adhoc id."""
ctx = MagicMock()
ctx.task_id = None
ctx.session_id = None
ctx.context_id = None
msg = MagicMock()
msg.task_id = None
msg.session_id = None
msg.context_id = None
msg.messageId = "msg-camelcase"
ctx.message = msg
assert HermesAgentProxyExecutor._derive_chat_id(ctx) == "msg-camelcase"
@pytest.mark.asyncio
async def test_cancel_returns_none():
"""cancel() is a noop today — confirm it doesn't raise and returns
None so a2a-sdk's contract is satisfied."""
cfg = AdapterConfig(model="hermes-test")
ex = HermesAgentProxyExecutor(cfg)
result = await ex.cancel(MagicMock(), _CapturingQueue())
assert result is None
@pytest.mark.asyncio
async def test_plugin_path_resolves_with_runtime_error(monkeypatch):
"""If something inside the awaited future raises a non-timeout
exception (e.g., the executor was stopped mid-flight), execute()
should emit '[hermes plugin error] ...' rather than propagate."""
plugin_port = _free_port()
async def fake_inbound(request: web.Request) -> web.Response:
return web.json_response({"ok": True, "queued": True}) # no callback
plugin_app = web.Application()
plugin_app.router.add_post("/a2a/inbound", fake_inbound)
plugin_runner = web.AppRunner(plugin_app)
await plugin_runner.setup()
plugin_site = web.TCPSite(plugin_runner, "127.0.0.1", plugin_port)
await plugin_site.start()
ex = _make_executor(
monkeypatch,
MOLECULE_A2A_PLATFORM_PORT=str(plugin_port),
MOLECULE_A2A_CALLBACK_PORT=str(_free_port()),
)
queue = _CapturingQueue()
try:
await ex.start()
# Race: send the request, then concurrently fail every pending
# future with a custom RuntimeError.
async def _execute():
await ex.execute(_build_context("simulate failure"), queue)
async def _trip():
# Wait for execute() to register its pending future.
for _ in range(50):
if ex._pending:
break
await asyncio.sleep(0.01)
for fut in list(ex._pending.values()):
if not fut.done():
fut.set_exception(RuntimeError("simulated mid-flight failure"))
await asyncio.gather(_execute(), _trip())
finally:
await ex.stop()
await plugin_site.stop()
await plugin_runner.cleanup()
assert "hermes plugin error" in repr(queue.events[0])
assert "simulated mid-flight failure" in repr(queue.events[0])
@pytest.mark.asyncio
async def test_chat_completions_includes_bearer_when_key_set(monkeypatch):
"""When API_SERVER_KEY is set, the legacy fallback must add an
Authorization header to /v1/chat/completions calls."""
api_port = _free_port()
received_headers: Dict[str, str] = {}
async def fake_chat_completions(request: web.Request) -> web.Response:
received_headers.update(dict(request.headers))
return web.json_response({
"choices": [{"message": {"role": "assistant", "content": "ok"}}]
})
api_app = web.Application()
api_app.router.add_post("/v1/chat/completions", fake_chat_completions)
api_runner = web.AppRunner(api_app)
await api_runner.setup()
api_site = web.TCPSite(api_runner, "127.0.0.1", api_port)
await api_site.start()
ex = _make_executor(
monkeypatch,
MOLECULE_A2A_PLATFORM_ENABLED="false",
HERMES_API_BASE=f"http://127.0.0.1:{api_port}/v1",
API_SERVER_KEY="test-bearer-token",
)
queue = _CapturingQueue()
try:
await ex.start()
await ex.execute(_build_context("authed"), queue)
finally:
await ex.stop()
await api_site.stop()
await api_runner.cleanup()
assert received_headers.get("Authorization") == "Bearer test-bearer-token"
@pytest.mark.asyncio
async def test_plugin_path_reply_handler_exception_path(monkeypatch):
"""If something inside our reply handler raises (e.g., the future
is cancelled mid-set), we should log and not crash the executor."""
cb_port = _free_port()
ex = _make_executor(monkeypatch, MOLECULE_A2A_CALLBACK_PORT=str(cb_port))
try:
await ex.start()
# Pre-resolve a future then send a reply for it — the handler
# tries to call set_result on a done future. We guard against
# that with the not future.done() check, so this should ack OK.
loop = asyncio.get_running_loop()
fut: asyncio.Future = loop.create_future()
fut.set_result("already-done")
ex._pending["pre-resolved"] = fut
async with ClientSession(timeout=ClientTimeout(total=2)) as s:
async with s.post(
f"http://127.0.0.1:{cb_port}/a2a/reply",
json={
"reply_to": "pre-resolved",
"content": "late delivery",
},
) as r:
assert r.status == 200
body = await r.json()
assert body == {"ok": True}
finally:
await ex.stop()
# ---- pyproject-style configuration -----------------------------------
def test_pytest_can_collect_module():
"""Trivial sanity check that conftest.py wires sys.path correctly
and the test module imports cleanly without monkeypatch fixtures."""
import executor # noqa: F401
assert hasattr(executor, "HermesAgentProxyExecutor")