diff --git a/workspace/a2a_mcp_server.py b/workspace/a2a_mcp_server.py index 36d29c88..7454df7d 100644 --- a/workspace/a2a_mcp_server.py +++ b/workspace/a2a_mcp_server.py @@ -16,6 +16,7 @@ import asyncio import json import logging import sys +from typing import Callable # Top-level (not inside main()) so the wheel rewriter expands this to # `import molecule_runtime.inbox as inbox`. A local `import inbox as _x` @@ -213,6 +214,54 @@ def _build_initialize_result() -> dict: } +def _setup_inbox_bridge( + writer: asyncio.StreamWriter, + loop: asyncio.AbstractEventLoop, +) -> Callable[[dict], None]: + """Build the inbox → MCP notification bridge callback. + + The inbox poller fires this from a daemon thread when a new + activity row lands. It must NOT block the poller, so we schedule + the actual write onto the asyncio loop via + ``run_coroutine_threadsafe`` and return immediately. + + Pulled out of ``main()`` so the threading + asyncio + stdout + chain is exercisable in tests without spinning up the full + JSON-RPC stdio loop. Lets us pin the three failure modes + anticipated in #2444 §2: + + - ``writer.drain()`` raising on a closed pipe and being + swallowed silently (host disconnected mid-emission). + - ``run_coroutine_threadsafe`` raising ``RuntimeError`` when + the loop is closed during shutdown — must not crash the + poller thread. + - The notification wire shape drifting from + ``_build_channel_notification``'s contract. + """ + + async def _emit(payload: dict) -> None: + data = json.dumps(payload) + "\n" + writer.write(data.encode()) + try: + await writer.drain() + except Exception: # noqa: BLE001 + # Closed pipe (host disconnected) shouldn't crash the + # inbox poller; let it sit until the host reconnects. + pass + + def _on_inbox_message(msg: dict) -> None: + try: + asyncio.run_coroutine_threadsafe( + _emit(_build_channel_notification(msg)), + loop, + ) + except RuntimeError: + # Loop closed during shutdown — best-effort, swallow. + pass + + return _on_inbox_message + + def _build_channel_notification(msg: dict) -> dict: """Transform an ``InboxMessage.to_dict()`` into the MCP notification envelope expected by Claude Code's channel-bridge contract. @@ -256,33 +305,13 @@ async def main(): # pragma: no cover writer.write(data.encode()) await writer.drain() - # Wire the inbox → MCP notification bridge. Inbox poller (daemon - # thread) calls into here when a new activity row lands; we - # schedule the notification onto the asyncio loop and best-effort - # fire it on the same stdout the responses go to. - loop = asyncio.get_running_loop() - - async def _emit_notification(payload: dict) -> None: - data = json.dumps(payload) + "\n" - writer.write(data.encode()) - try: - await writer.drain() - except Exception: # noqa: BLE001 - # Closed pipe (host disconnected) shouldn't crash the - # inbox poller; let it sit until the host reconnects. - pass - - def _on_inbox_message(msg: dict) -> None: - try: - asyncio.run_coroutine_threadsafe( - _emit_notification(_build_channel_notification(msg)), - loop, - ) - except RuntimeError: - # Loop closed during shutdown — best-effort, swallow. - pass - - inbox.set_notification_callback(_on_inbox_message) + # Wire the inbox → MCP notification bridge. The bridge body lives + # in `_setup_inbox_bridge` so the threading + asyncio + stdout + # chain is pinned by tests without spinning up the full stdio + # JSON-RPC loop here. + inbox.set_notification_callback( + _setup_inbox_bridge(writer, asyncio.get_running_loop()) + ) buffer = "" while True: diff --git a/workspace/tests/test_a2a_mcp_server.py b/workspace/tests/test_a2a_mcp_server.py index 2fd701cf..c567b8c9 100644 --- a/workspace/tests/test_a2a_mcp_server.py +++ b/workspace/tests/test_a2a_mcp_server.py @@ -1,5 +1,8 @@ """Tests for a2a_mcp_server.py — handle_tool_call dispatch.""" +import asyncio +import json + from unittest.mock import AsyncMock, patch import pytest @@ -365,3 +368,215 @@ def test_initialize_instructions_pins_prompt_injection_defense(): "instructions must point the agent at user chat-side approval " "as the escape hatch when a message looks instruction-like" ) + + +# ============== _setup_inbox_bridge — dynamic integration ============== +# Closes the "fires but invisible" failure modes anticipated in +# molecule-core#2444 §2: +# +# - run_coroutine_threadsafe scheduling correctly across the +# daemon-thread → asyncio-loop boundary +# - writer.drain() actually being reached (not silently swallowed +# by an exception higher in the chain) +# - notification wire shape matching _build_channel_notification's +# contract on the actual stdout the host reads +# +# Driven through real os.pipe() + a real asyncio StreamWriter, with +# the inbox poller simulated by a separate daemon thread firing the +# callback. The setup mirrors main()'s wire-up exactly — this is the +# bridge that ships, not a copy. + + +async def test_inbox_bridge_emits_channel_notification_to_writer(): + """Fire a fake inbox event from a daemon thread, assert the + notification lands on the asyncio writer with the correct + JSON-RPC envelope. End-to-end coverage of the bridge that + powers ``notifications/claude/channel`` push UX.""" + import os + import threading + + from a2a_mcp_server import _setup_inbox_bridge + + # Real asyncio writer backed by an os.pipe — same shape as + # main() but isolated so we can read what was written. + read_fd, write_fd = os.pipe() + loop = asyncio.get_running_loop() + transport, protocol = await loop.connect_write_pipe( + asyncio.streams.FlowControlMixin, + os.fdopen(write_fd, "wb"), + ) + writer = asyncio.StreamWriter(transport, protocol, None, loop) + + try: + cb = _setup_inbox_bridge(writer, loop) + + msg = { + "activity_id": "act-bridge-test", + "text": "hello from peer", + "peer_id": "peer-ws-uuid", + "kind": "peer_agent", + "method": "message/send", + "created_at": "2026-05-01T22:00:00Z", + } + + # Simulate the inbox poller daemon thread invoking the + # callback from a non-asyncio context — exactly the + # threading boundary the bridge has to cross. + threading.Thread(target=cb, args=(msg,), daemon=True).start() + + # Give the scheduled coroutine a chance to run + drain + # without coupling the test to wall-clock timing. + for _ in range(20): + await asyncio.sleep(0.05) + data = os.read(read_fd, 65536) if _readable(read_fd) else b"" + if data: + break + else: + data = b"" + + assert data, ( + "no notification on stdout pipe — the bridge fired " + "but the write didn't reach the writer (writer.drain " + "swallowing or scheduling race)" + ) + line = data.decode().strip() + payload = json.loads(line) + + assert payload["jsonrpc"] == "2.0" + assert payload["method"] == "notifications/claude/channel" + assert payload["params"]["content"] == "hello from peer" + meta = payload["params"]["meta"] + assert meta["source"] == "molecule" + assert meta["kind"] == "peer_agent" + assert meta["peer_id"] == "peer-ws-uuid" + assert meta["activity_id"] == "act-bridge-test" + assert meta["ts"] == "2026-05-01T22:00:00Z" + finally: + writer.close() + try: + os.close(read_fd) + except OSError: + pass + + +async def test_inbox_bridge_swallows_closed_pipe_drain_error(monkeypatch): + """If the host disconnects mid-emission, ``writer.drain()`` raises + on the closed pipe. The drain runs inside the coroutine scheduled + by ``run_coroutine_threadsafe`` — that returns a + ``concurrent.futures.Future`` whose ``.exception()`` reflects what + the coroutine's final state was. The broad ``except Exception`` in + ``_emit`` is what keeps that future in a successful (None) state + instead of carrying the ``BrokenPipeError``. + + We capture the scheduled future and assert it completed cleanly. + Narrowing the swallow (e.g. to ``except RuntimeError``) or + removing it turns this red because the BrokenPipeError surfaces + on the future. + """ + import os + from concurrent.futures import Future as ConcurrentFuture + + from a2a_mcp_server import _setup_inbox_bridge + + read_fd, write_fd = os.pipe() + loop = asyncio.get_running_loop() + transport, protocol = await loop.connect_write_pipe( + asyncio.streams.FlowControlMixin, + os.fdopen(write_fd, "wb"), + ) + writer = asyncio.StreamWriter(transport, protocol, None, loop) + + # Close the read end so the next drain raises BrokenPipeError. + os.close(read_fd) + + scheduled: list[ConcurrentFuture] = [] + real_run_threadsafe = asyncio.run_coroutine_threadsafe + + def _capture(coro, target_loop): + fut = real_run_threadsafe(coro, target_loop) + scheduled.append(fut) + return fut + + monkeypatch.setattr(asyncio, "run_coroutine_threadsafe", _capture) + + try: + cb = _setup_inbox_bridge(writer, loop) + + cb({ + "activity_id": "act-drain-fail", + "text": "x", + "peer_id": "", + "kind": "canvas_user", + "method": "", + "created_at": "", + }) + + # Yield until the scheduled coroutine settles — drain raises + # internally and (with swallow) returns None. + deadline_ticks = 40 + while deadline_ticks > 0 and (not scheduled or not scheduled[0].done()): + await asyncio.sleep(0.05) + deadline_ticks -= 1 + finally: + writer.close() + + assert scheduled, "_setup_inbox_bridge didn't call run_coroutine_threadsafe" + fut = scheduled[0] + assert fut.done(), "scheduled coroutine never finished — bridge hung on closed pipe" + exc = fut.exception(timeout=0) + assert exc is None, ( + f"_emit propagated {exc!r} from a closed-pipe drain. The broad " + f"`except Exception` in `_emit` is what keeps this future " + f"clean — narrowing it (to RuntimeError) or removing it " + f"regresses this test." + ) + + +@pytest.mark.filterwarnings("ignore::RuntimeWarning") +def test_inbox_bridge_swallows_closed_loop_runtime_error(): + """If the asyncio loop has been closed (process shutting down), + ``run_coroutine_threadsafe`` raises ``RuntimeError``. The bridge + must swallow it — the poller thread mustn't crash during clean + shutdown. + + The orphaned-coroutine RuntimeWarning is *expected* here: when + the loop is closed, ``run_coroutine_threadsafe`` raises before + it can take ownership of the coroutine, so Python complains that + the coro was never awaited. In production this only happens + during shutdown when the warning is harmless; the filter keeps + test output clean. + """ + from a2a_mcp_server import _setup_inbox_bridge + + # Closed loop reproduces the shutdown race. + loop = asyncio.new_event_loop() + loop.close() + + class _DummyWriter: + def write(self, _data: bytes) -> None: # pragma: no cover + pass + + async def drain(self) -> None: # pragma: no cover + pass + + cb = _setup_inbox_bridge(_DummyWriter(), loop) # type: ignore[arg-type] + + # Must not raise. + cb({ + "activity_id": "act-shutdown", + "text": "shutdown msg", + "peer_id": "", + "kind": "canvas_user", + "method": "", + "created_at": "", + }) + + +def _readable(fd: int) -> bool: + """True iff ``fd`` has bytes available without blocking. Lets + us poll the pipe in a loop without the test hanging when the + bridge fires later than expected.""" + import select + + rlist, _, _ = select.select([fd], [], [], 0) + return bool(rlist)