diff --git a/scripts/wheel_smoke.py b/scripts/wheel_smoke.py index 04d5235c..e32e4a77 100644 --- a/scripts/wheel_smoke.py +++ b/scripts/wheel_smoke.py @@ -52,11 +52,13 @@ def smoke_imports_and_invariants() -> None: InboxState, activate as inbox_activate, get_state as inbox_get_state, + set_notification_callback as inbox_set_notification_callback, start_poller_thread as inbox_start_poller_thread, ) assert callable(inbox_activate), "inbox.activate must be callable" assert callable(inbox_get_state), "inbox.get_state must be callable" assert callable(inbox_start_poller_thread), "inbox.start_poller_thread must be callable" + assert callable(inbox_set_notification_callback), "inbox.set_notification_callback must be callable" assert a2a_client._A2A_ERROR_PREFIX, "a2a_client missing error sentinel" assert callable(get_adapter), "adapters.get_adapter must be callable" diff --git a/workspace/a2a_mcp_server.py b/workspace/a2a_mcp_server.py index 9e488f42..afffb956 100644 --- a/workspace/a2a_mcp_server.py +++ b/workspace/a2a_mcp_server.py @@ -130,6 +130,44 @@ async def handle_tool_call(name: str, arguments: dict) -> str: return f"Unknown tool: {name}" +# --- MCP Notification bridge --- + +# `notifications/claude/channel` matches the contract used by the +# molecule-mcp-claude-channel bun bridge (server.ts:509). Claude Code's +# MCP runtime treats this method as a conversation interrupt — `content` +# becomes the agent turn, `meta` is structured metadata. Notification- +# capable hosts (Claude Code today; any compliant client tomorrow) +# get push UX automatically; pollers (`wait_for_message` / `inbox_peek`) +# still work unchanged. See task #46 + the deprecation path documented +# in workspace/inbox.py:set_notification_callback. +_CHANNEL_NOTIFICATION_METHOD = "notifications/claude/channel" + + +def _build_channel_notification(msg: dict) -> dict: + """Transform an ``InboxMessage.to_dict()`` into the MCP notification + envelope expected by Claude Code's channel-bridge contract. + + Pure function so the wire shape is unit-testable without spinning + up an asyncio loop. The wire-up in ``main()`` just composes this + with ``asyncio.run_coroutine_threadsafe``. + """ + return { + "jsonrpc": "2.0", + "method": _CHANNEL_NOTIFICATION_METHOD, + "params": { + "content": msg.get("text", ""), + "meta": { + "source": "molecule", + "kind": msg.get("kind", ""), + "peer_id": msg.get("peer_id", ""), + "method": msg.get("method", ""), + "activity_id": msg.get("activity_id", ""), + "ts": msg.get("created_at", ""), + }, + }, + } + + # --- MCP Server (JSON-RPC over stdio) --- async def main(): # pragma: no cover @@ -148,6 +186,35 @@ async def main(): # pragma: no cover writer.write(data.encode()) await writer.drain() + # Wire the inbox → MCP notification bridge. Inbox poller (daemon + # thread) calls into here when a new activity row lands; we + # schedule the notification onto the asyncio loop and best-effort + # fire it on the same stdout the responses go to. + loop = asyncio.get_running_loop() + + async def _emit_notification(payload: dict) -> None: + data = json.dumps(payload) + "\n" + writer.write(data.encode()) + try: + await writer.drain() + except Exception: # noqa: BLE001 + # Closed pipe (host disconnected) shouldn't crash the + # inbox poller; let it sit until the host reconnects. + pass + + def _on_inbox_message(msg: dict) -> None: + try: + asyncio.run_coroutine_threadsafe( + _emit_notification(_build_channel_notification(msg)), + loop, + ) + except RuntimeError: + # Loop closed during shutdown — best-effort, swallow. + pass + + import inbox as _inbox_module + _inbox_module.set_notification_callback(_on_inbox_message) + buffer = "" while True: try: diff --git a/workspace/inbox.py b/workspace/inbox.py index 3674a714..524c1eaa 100644 --- a/workspace/inbox.py +++ b/workspace/inbox.py @@ -53,7 +53,7 @@ import time from collections import deque from dataclasses import dataclass, field from pathlib import Path -from typing import Any +from typing import Any, Callable logger = logging.getLogger(__name__) @@ -173,10 +173,14 @@ class InboxState: logger.warning("inbox: failed to delete cursor %s: %s", self.cursor_path, exc) def record(self, message: InboxMessage) -> None: - """Append a message and wake any waiter. + """Append a message, wake any waiter, and fire the notification + callback (if registered) for push-UX-capable hosts. Skips a row whose activity_id we've already queued — defensive - against the poller racing with the consumer + cursor save. + against the poller racing with the consumer + cursor save. The + dedupe short-circuits BEFORE the notification fires, so a + notification-capable host doesn't see duplicate push events on + backlog overlap. """ with self._lock: for existing in self._queue: @@ -184,6 +188,19 @@ class InboxState: return self._queue.append(message) self._arrival.set() + # Fire notification AFTER releasing the lock so the callback + # is free to do anything (including calling back into inbox) + # without deadlock. Best-effort: a raising callback must not + # prevent the message from landing in the queue — observability + # is more important than push delivery. + cb = _NOTIFICATION_CALLBACK + if cb is not None: + try: + cb(message.to_dict()) + except Exception: + logger.warning( + "inbox: notification callback raised", exc_info=True + ) def peek(self, limit: int = 10) -> list[InboxMessage]: """Return up to ``limit`` pending messages without removing them.""" @@ -240,6 +257,35 @@ class InboxState: _STATE: InboxState | None = None +# Notification bridge — set by the universal MCP server (a2a_mcp_server.py) +# at startup so that new inbox arrivals can be pushed to notification- +# capable hosts (Claude Code) as MCP `notifications/claude/channel` +# events. Kept module-level (rather than a method on InboxState) so the +# inbox doesn't need to know about MCP — a thin pluggable seam. +# +# Defaults to None: in-container runtimes that don't activate the inbox +# also don't push notifications, and tests start clean. The wheel's +# wiring is exercised by tests/test_a2a_mcp_server.py + the bridge +# tests below. +_NOTIFICATION_CALLBACK: Callable[[dict], None] | None = None + + +def set_notification_callback(cb: Callable[[dict], None] | None) -> None: + """Register (or clear) the per-message notification callback. + + The callback receives ``InboxMessage.to_dict()`` for each new + arrival — same shape ``inbox_peek`` returns to the agent, so a + bridge can build its MCP notification payload without re-deriving + fields. + + Best-effort: a raising callback does NOT prevent the message from + landing in the queue (see ``InboxState.record``). Pass ``None`` to + clear (used by tests + the wheel's shutdown path). + """ + global _NOTIFICATION_CALLBACK + _NOTIFICATION_CALLBACK = cb + + def activate(state: InboxState) -> None: """Register an InboxState as the singleton this module exposes. diff --git a/workspace/tests/test_a2a_mcp_server.py b/workspace/tests/test_a2a_mcp_server.py index 8969abcb..b08dd3a8 100644 --- a/workspace/tests/test_a2a_mcp_server.py +++ b/workspace/tests/test_a2a_mcp_server.py @@ -138,3 +138,102 @@ def test_attachments_param_description_emphasizes_REQUIRED(): assert forbidden in desc, ( f"`attachments` description must call out {forbidden!r} as a wrong alternative" ) + + +# ============== Inbox → MCP notification bridge (2026-05-01) ============== +# Notification-capable hosts (Claude Code) get push UX when a new inbound +# message lands; pollers (wait_for_message/inbox_peek) keep working. +# `_build_channel_notification` is the pure shape transformer — wire-up +# in main() composes it with asyncio.run_coroutine_threadsafe. + + +def test_build_channel_notification_method_matches_claude_contract(): + """Method MUST be `notifications/claude/channel` exactly — that's + what Claude Code's MCP runtime listens for as a conversation + interrupt. Same string as the bun channel bridge sends + (server.ts:509) so this is a drop-in replacement.""" + from a2a_mcp_server import _build_channel_notification + + payload = _build_channel_notification({ + "activity_id": "act-1", + "text": "hello", + "peer_id": "", + "kind": "canvas_user", + "method": "message/send", + "created_at": "2026-05-01T00:00:00Z", + }) + + assert payload["method"] == "notifications/claude/channel" + assert payload["jsonrpc"] == "2.0" + + +def test_build_channel_notification_content_is_message_text(): + """`content` is what becomes the agent conversation turn — + pulled directly from the inbox message text.""" + from a2a_mcp_server import _build_channel_notification + + payload = _build_channel_notification({ + "activity_id": "act-1", + "text": "hello from canvas", + "peer_id": "", + "kind": "canvas_user", + "method": "message/send", + "created_at": "2026-05-01T00:00:00Z", + }) + + assert payload["params"]["content"] == "hello from canvas" + + +def test_build_channel_notification_meta_carries_routing_fields(): + """Meta must include kind, peer_id, method, activity_id, ts — + fields the agent or downstream tooling needs to route a reply + (canvas_user → /notify, peer_agent → /a2a) and to acknowledge + via inbox_pop.""" + from a2a_mcp_server import _build_channel_notification + + payload = _build_channel_notification({ + "activity_id": "act-7", + "text": "ping", + "peer_id": "ws-peer-uuid", + "kind": "peer_agent", + "method": "message/send", + "created_at": "2026-05-01T01:23:45Z", + }) + meta = payload["params"]["meta"] + + assert meta["source"] == "molecule" + assert meta["kind"] == "peer_agent" + assert meta["peer_id"] == "ws-peer-uuid" + assert meta["method"] == "message/send" + assert meta["activity_id"] == "act-7" + assert meta["ts"] == "2026-05-01T01:23:45Z" + + +def test_build_channel_notification_no_id_field(): + """Notifications MUST NOT carry a JSON-RPC `id` field — that's + what distinguishes them from requests. A notification with `id` + would be mis-interpreted as a request and clients would wait + for a response that never comes.""" + from a2a_mcp_server import _build_channel_notification + + payload = _build_channel_notification({"text": "x"}) + + assert "id" not in payload, ( + "notifications must omit `id` per JSON-RPC 2.0 spec — " + "presence would make MCP clients await a phantom response" + ) + + +def test_build_channel_notification_handles_missing_fields_gracefully(): + """Some fields may be absent on edge-case messages (e.g. cursor + bootstrapping with no created_at yet). Default to empty strings + so the wire shape stays valid JSON instead of crashing.""" + from a2a_mcp_server import _build_channel_notification + + payload = _build_channel_notification({}) + + assert payload["params"]["content"] == "" + meta = payload["params"]["meta"] + assert meta["activity_id"] == "" + assert meta["peer_id"] == "" + assert meta["kind"] == "" diff --git a/workspace/tests/test_inbox.py b/workspace/tests/test_inbox.py index 03bcf8a4..a63297ae 100644 --- a/workspace/tests/test_inbox.py +++ b/workspace/tests/test_inbox.py @@ -442,3 +442,113 @@ def test_default_cursor_path_uses_configs_dir(monkeypatch, tmp_path: Path): def test_default_cursor_path_falls_back_to_default(monkeypatch): monkeypatch.delenv("CONFIGS_DIR", raising=False) assert inbox.default_cursor_path() == Path("/configs") / ".mcp_inbox_cursor" + + +# --------------------------------------------------------------------------- +# Notification callback bridge — push UX for notification-capable hosts +# --------------------------------------------------------------------------- +# +# `record()` is called from the poller daemon thread when a new activity +# row arrives. Notification-capable MCP hosts (Claude Code) want to be +# pushed a notification — the universal wheel registers a callback via +# `set_notification_callback()` that fires the MCP notification. Pollers +# (`wait_for_message`/`inbox_peek`) keep working unchanged. + + +@pytest.fixture(autouse=True) +def _reset_notification_callback(): + """Each test starts with no callback registered. Notification + state must not leak across tests — same pattern as _reset_singleton.""" + inbox.set_notification_callback(None) + yield + inbox.set_notification_callback(None) + + +def test_record_fires_notification_callback_with_message_dict(state: inbox.InboxState): + """When a callback is registered, record() invokes it with the + canonical to_dict() shape — same shape inbox_peek returns to the + agent. Callers can build MCP notification payloads from this + without re-deriving fields.""" + received: list[dict] = [] + inbox.set_notification_callback(received.append) + + state.record(_msg("act-1", peer_id="ws-peer", text="hello")) + + assert len(received) == 1 + payload = received[0] + assert payload["activity_id"] == "act-1" + assert payload["text"] == "hello" + assert payload["peer_id"] == "ws-peer" + assert payload["kind"] == "peer_agent" # to_dict derives this + assert payload["method"] == "message/send" + + +def test_record_dedupe_does_not_refire_callback(state: inbox.InboxState): + """The activity_id dedupe path must short-circuit BEFORE invoking + the callback — otherwise a notification-capable host would see + duplicate push events on poller backlog overlap.""" + received: list[dict] = [] + inbox.set_notification_callback(received.append) + + state.record(_msg("act-1")) + state.record(_msg("act-1")) # dedupe — same id + + assert len(received) == 1, ( + f"expected 1 callback (dedupe), got {len(received)} — " + f"would cause duplicate Claude conversation interrupts" + ) + + +def test_record_callback_exception_does_not_break_inbox(state: inbox.InboxState): + """A raising callback (e.g. asyncio loop closed mid-shutdown, + serialization error on an exotic message) must NOT prevent the + message from landing in the queue. Notification delivery is + best-effort; inbox correctness is not negotiable.""" + + def boom(_payload): + raise RuntimeError("simulated callback failure") + + inbox.set_notification_callback(boom) + + # Must not raise, must still queue the message. + state.record(_msg("act-1")) + + queued = state.peek(10) + assert len(queued) == 1 + assert queued[0].activity_id == "act-1" + + +def test_record_no_callback_registered_is_no_op(state: inbox.InboxState): + """When no callback is set (in-container path, or before + activation), record() proceeds normally — no None-call crash.""" + # No set_notification_callback() in this test — autouse fixture + # cleared any previous registration. + state.record(_msg("act-1")) + assert len(state.peek(10)) == 1 + + +def test_set_notification_callback_replaces_previous(state: inbox.InboxState): + """Re-registering the callback replaces the previous — only the + latest callback fires. Test ensures the universal wheel can update + the bridge if its asyncio loop is replaced (e.g. graceful restart).""" + first: list[dict] = [] + second: list[dict] = [] + inbox.set_notification_callback(first.append) + inbox.set_notification_callback(second.append) + + state.record(_msg("act-1")) + + assert len(first) == 0, "first callback should be unregistered" + assert len(second) == 1, "second callback should receive the event" + + +def test_set_notification_callback_none_clears(state: inbox.InboxState): + """Setting None clears the callback — used by tests + the wheel's + shutdown path.""" + received: list[dict] = [] + inbox.set_notification_callback(received.append) + inbox.set_notification_callback(None) + + state.record(_msg("act-1")) + + assert received == []