feat(mcp): notifications/claude/channel for push-feel inbox UX

Adds a notification seam to the universal molecule-mcp wheel so push-
notification-capable MCP hosts (Claude Code today; any compliant
client tomorrow) get inbound A2A messages as conversation interrupts
instead of having to poll wait_for_message / inbox_peek.

Wire-up:
- inbox.py: module-level _NOTIFICATION_CALLBACK + set_notification_callback()
  Fires from InboxState.record() AFTER lock release, with same dict
  shape inbox_peek returns. Best-effort — a raising callback never
  prevents the message from landing in the queue.
- a2a_mcp_server.py: _build_channel_notification() pure helper +
  bridge wiring in main() that schedules notifications via
  asyncio.run_coroutine_threadsafe (poller is a daemon thread, MCP
  loop is asyncio).
- Method name 'notifications/claude/channel' matches the contract
  documented in molecule-mcp-claude-channel/server.ts:509.
- wheel_smoke.py: pin set_notification_callback as a published name,
  same regression class as the 0.1.16 main_sync incident.

Pollers (wait_for_message / inbox_peek) keep working unchanged for
runtimes without notification support.

Tests: 6 new in test_inbox.py (callback fires once on record, dedupe
short-circuits before fire, raising cb doesn't break inbox, set/clear
semantics), 5 new in test_a2a_mcp_server.py (method name pin, content
mapping, meta routing, no-id JSON-RPC notification spec, missing-
field tolerance). All 59 combined tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hongming Wang 2026-04-30 20:10:01 -07:00
parent 0c51df989b
commit 0a3ec53f34
5 changed files with 327 additions and 3 deletions

View File

@ -52,11 +52,13 @@ def smoke_imports_and_invariants() -> None:
InboxState,
activate as inbox_activate,
get_state as inbox_get_state,
set_notification_callback as inbox_set_notification_callback,
start_poller_thread as inbox_start_poller_thread,
)
assert callable(inbox_activate), "inbox.activate must be callable"
assert callable(inbox_get_state), "inbox.get_state must be callable"
assert callable(inbox_start_poller_thread), "inbox.start_poller_thread must be callable"
assert callable(inbox_set_notification_callback), "inbox.set_notification_callback must be callable"
assert a2a_client._A2A_ERROR_PREFIX, "a2a_client missing error sentinel"
assert callable(get_adapter), "adapters.get_adapter must be callable"

View File

@ -130,6 +130,44 @@ async def handle_tool_call(name: str, arguments: dict) -> str:
return f"Unknown tool: {name}"
# --- MCP Notification bridge ---
# `notifications/claude/channel` matches the contract used by the
# molecule-mcp-claude-channel bun bridge (server.ts:509). Claude Code's
# MCP runtime treats this method as a conversation interrupt — `content`
# becomes the agent turn, `meta` is structured metadata. Notification-
# capable hosts (Claude Code today; any compliant client tomorrow)
# get push UX automatically; pollers (`wait_for_message` / `inbox_peek`)
# still work unchanged. See task #46 + the deprecation path documented
# in workspace/inbox.py:set_notification_callback.
_CHANNEL_NOTIFICATION_METHOD = "notifications/claude/channel"
def _build_channel_notification(msg: dict) -> dict:
"""Transform an ``InboxMessage.to_dict()`` into the MCP notification
envelope expected by Claude Code's channel-bridge contract.
Pure function so the wire shape is unit-testable without spinning
up an asyncio loop. The wire-up in ``main()`` just composes this
with ``asyncio.run_coroutine_threadsafe``.
"""
return {
"jsonrpc": "2.0",
"method": _CHANNEL_NOTIFICATION_METHOD,
"params": {
"content": msg.get("text", ""),
"meta": {
"source": "molecule",
"kind": msg.get("kind", ""),
"peer_id": msg.get("peer_id", ""),
"method": msg.get("method", ""),
"activity_id": msg.get("activity_id", ""),
"ts": msg.get("created_at", ""),
},
},
}
# --- MCP Server (JSON-RPC over stdio) ---
async def main(): # pragma: no cover
@ -148,6 +186,35 @@ async def main(): # pragma: no cover
writer.write(data.encode())
await writer.drain()
# Wire the inbox → MCP notification bridge. Inbox poller (daemon
# thread) calls into here when a new activity row lands; we
# schedule the notification onto the asyncio loop and best-effort
# fire it on the same stdout the responses go to.
loop = asyncio.get_running_loop()
async def _emit_notification(payload: dict) -> None:
data = json.dumps(payload) + "\n"
writer.write(data.encode())
try:
await writer.drain()
except Exception: # noqa: BLE001
# Closed pipe (host disconnected) shouldn't crash the
# inbox poller; let it sit until the host reconnects.
pass
def _on_inbox_message(msg: dict) -> None:
try:
asyncio.run_coroutine_threadsafe(
_emit_notification(_build_channel_notification(msg)),
loop,
)
except RuntimeError:
# Loop closed during shutdown — best-effort, swallow.
pass
import inbox as _inbox_module
_inbox_module.set_notification_callback(_on_inbox_message)
buffer = ""
while True:
try:

View File

@ -53,7 +53,7 @@ import time
from collections import deque
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from typing import Any, Callable
logger = logging.getLogger(__name__)
@ -173,10 +173,14 @@ class InboxState:
logger.warning("inbox: failed to delete cursor %s: %s", self.cursor_path, exc)
def record(self, message: InboxMessage) -> None:
"""Append a message and wake any waiter.
"""Append a message, wake any waiter, and fire the notification
callback (if registered) for push-UX-capable hosts.
Skips a row whose activity_id we've already queued — defensive
against the poller racing with the consumer + cursor save.
against the poller racing with the consumer + cursor save. The
dedupe short-circuits BEFORE the notification fires, so a
notification-capable host doesn't see duplicate push events on
backlog overlap.
"""
with self._lock:
for existing in self._queue:
@ -184,6 +188,19 @@ class InboxState:
return
self._queue.append(message)
self._arrival.set()
# Fire notification AFTER releasing the lock so the callback
# is free to do anything (including calling back into inbox)
# without deadlock. Best-effort: a raising callback must not
# prevent the message from landing in the queue — observability
# is more important than push delivery.
cb = _NOTIFICATION_CALLBACK
if cb is not None:
try:
cb(message.to_dict())
except Exception:
logger.warning(
"inbox: notification callback raised", exc_info=True
)
def peek(self, limit: int = 10) -> list[InboxMessage]:
"""Return up to ``limit`` pending messages without removing them."""
@ -240,6 +257,35 @@ class InboxState:
_STATE: InboxState | None = None
# Notification bridge — set by the universal MCP server (a2a_mcp_server.py)
# at startup so that new inbox arrivals can be pushed to notification-
# capable hosts (Claude Code) as MCP `notifications/claude/channel`
# events. Kept module-level (rather than a method on InboxState) so the
# inbox doesn't need to know about MCP — a thin pluggable seam.
#
# Defaults to None: in-container runtimes that don't activate the inbox
# also don't push notifications, and tests start clean. The wheel's
# wiring is exercised by tests/test_a2a_mcp_server.py + the bridge
# tests below.
_NOTIFICATION_CALLBACK: Callable[[dict], None] | None = None
def set_notification_callback(cb: Callable[[dict], None] | None) -> None:
"""Register (or clear) the per-message notification callback.
The callback receives ``InboxMessage.to_dict()`` for each new
arrival same shape ``inbox_peek`` returns to the agent, so a
bridge can build its MCP notification payload without re-deriving
fields.
Best-effort: a raising callback does NOT prevent the message from
landing in the queue (see ``InboxState.record``). Pass ``None`` to
clear (used by tests + the wheel's shutdown path).
"""
global _NOTIFICATION_CALLBACK
_NOTIFICATION_CALLBACK = cb
def activate(state: InboxState) -> None:
"""Register an InboxState as the singleton this module exposes.

View File

@ -138,3 +138,102 @@ def test_attachments_param_description_emphasizes_REQUIRED():
assert forbidden in desc, (
f"`attachments` description must call out {forbidden!r} as a wrong alternative"
)
# ============== Inbox → MCP notification bridge (2026-05-01) ==============
# Notification-capable hosts (Claude Code) get push UX when a new inbound
# message lands; pollers (wait_for_message/inbox_peek) keep working.
# `_build_channel_notification` is the pure shape transformer — wire-up
# in main() composes it with asyncio.run_coroutine_threadsafe.
def test_build_channel_notification_method_matches_claude_contract():
"""Method MUST be `notifications/claude/channel` exactly — that's
what Claude Code's MCP runtime listens for as a conversation
interrupt. Same string as the bun channel bridge sends
(server.ts:509) so this is a drop-in replacement."""
from a2a_mcp_server import _build_channel_notification
payload = _build_channel_notification({
"activity_id": "act-1",
"text": "hello",
"peer_id": "",
"kind": "canvas_user",
"method": "message/send",
"created_at": "2026-05-01T00:00:00Z",
})
assert payload["method"] == "notifications/claude/channel"
assert payload["jsonrpc"] == "2.0"
def test_build_channel_notification_content_is_message_text():
"""`content` is what becomes the agent conversation turn —
pulled directly from the inbox message text."""
from a2a_mcp_server import _build_channel_notification
payload = _build_channel_notification({
"activity_id": "act-1",
"text": "hello from canvas",
"peer_id": "",
"kind": "canvas_user",
"method": "message/send",
"created_at": "2026-05-01T00:00:00Z",
})
assert payload["params"]["content"] == "hello from canvas"
def test_build_channel_notification_meta_carries_routing_fields():
"""Meta must include kind, peer_id, method, activity_id, ts —
fields the agent or downstream tooling needs to route a reply
(canvas_user /notify, peer_agent /a2a) and to acknowledge
via inbox_pop."""
from a2a_mcp_server import _build_channel_notification
payload = _build_channel_notification({
"activity_id": "act-7",
"text": "ping",
"peer_id": "ws-peer-uuid",
"kind": "peer_agent",
"method": "message/send",
"created_at": "2026-05-01T01:23:45Z",
})
meta = payload["params"]["meta"]
assert meta["source"] == "molecule"
assert meta["kind"] == "peer_agent"
assert meta["peer_id"] == "ws-peer-uuid"
assert meta["method"] == "message/send"
assert meta["activity_id"] == "act-7"
assert meta["ts"] == "2026-05-01T01:23:45Z"
def test_build_channel_notification_no_id_field():
"""Notifications MUST NOT carry a JSON-RPC `id` field — that's
what distinguishes them from requests. A notification with `id`
would be mis-interpreted as a request and clients would wait
for a response that never comes."""
from a2a_mcp_server import _build_channel_notification
payload = _build_channel_notification({"text": "x"})
assert "id" not in payload, (
"notifications must omit `id` per JSON-RPC 2.0 spec — "
"presence would make MCP clients await a phantom response"
)
def test_build_channel_notification_handles_missing_fields_gracefully():
"""Some fields may be absent on edge-case messages (e.g. cursor
bootstrapping with no created_at yet). Default to empty strings
so the wire shape stays valid JSON instead of crashing."""
from a2a_mcp_server import _build_channel_notification
payload = _build_channel_notification({})
assert payload["params"]["content"] == ""
meta = payload["params"]["meta"]
assert meta["activity_id"] == ""
assert meta["peer_id"] == ""
assert meta["kind"] == ""

View File

@ -442,3 +442,113 @@ def test_default_cursor_path_uses_configs_dir(monkeypatch, tmp_path: Path):
def test_default_cursor_path_falls_back_to_default(monkeypatch):
monkeypatch.delenv("CONFIGS_DIR", raising=False)
assert inbox.default_cursor_path() == Path("/configs") / ".mcp_inbox_cursor"
# ---------------------------------------------------------------------------
# Notification callback bridge — push UX for notification-capable hosts
# ---------------------------------------------------------------------------
#
# `record()` is called from the poller daemon thread when a new activity
# row arrives. Notification-capable MCP hosts (Claude Code) want to be
# pushed a notification — the universal wheel registers a callback via
# `set_notification_callback()` that fires the MCP notification. Pollers
# (`wait_for_message`/`inbox_peek`) keep working unchanged.
@pytest.fixture(autouse=True)
def _reset_notification_callback():
"""Each test starts with no callback registered. Notification
state must not leak across tests same pattern as _reset_singleton."""
inbox.set_notification_callback(None)
yield
inbox.set_notification_callback(None)
def test_record_fires_notification_callback_with_message_dict(state: inbox.InboxState):
"""When a callback is registered, record() invokes it with the
canonical to_dict() shape same shape inbox_peek returns to the
agent. Callers can build MCP notification payloads from this
without re-deriving fields."""
received: list[dict] = []
inbox.set_notification_callback(received.append)
state.record(_msg("act-1", peer_id="ws-peer", text="hello"))
assert len(received) == 1
payload = received[0]
assert payload["activity_id"] == "act-1"
assert payload["text"] == "hello"
assert payload["peer_id"] == "ws-peer"
assert payload["kind"] == "peer_agent" # to_dict derives this
assert payload["method"] == "message/send"
def test_record_dedupe_does_not_refire_callback(state: inbox.InboxState):
"""The activity_id dedupe path must short-circuit BEFORE invoking
the callback otherwise a notification-capable host would see
duplicate push events on poller backlog overlap."""
received: list[dict] = []
inbox.set_notification_callback(received.append)
state.record(_msg("act-1"))
state.record(_msg("act-1")) # dedupe — same id
assert len(received) == 1, (
f"expected 1 callback (dedupe), got {len(received)}"
f"would cause duplicate Claude conversation interrupts"
)
def test_record_callback_exception_does_not_break_inbox(state: inbox.InboxState):
"""A raising callback (e.g. asyncio loop closed mid-shutdown,
serialization error on an exotic message) must NOT prevent the
message from landing in the queue. Notification delivery is
best-effort; inbox correctness is not negotiable."""
def boom(_payload):
raise RuntimeError("simulated callback failure")
inbox.set_notification_callback(boom)
# Must not raise, must still queue the message.
state.record(_msg("act-1"))
queued = state.peek(10)
assert len(queued) == 1
assert queued[0].activity_id == "act-1"
def test_record_no_callback_registered_is_no_op(state: inbox.InboxState):
"""When no callback is set (in-container path, or before
activation), record() proceeds normally no None-call crash."""
# No set_notification_callback() in this test — autouse fixture
# cleared any previous registration.
state.record(_msg("act-1"))
assert len(state.peek(10)) == 1
def test_set_notification_callback_replaces_previous(state: inbox.InboxState):
"""Re-registering the callback replaces the previous — only the
latest callback fires. Test ensures the universal wheel can update
the bridge if its asyncio loop is replaced (e.g. graceful restart)."""
first: list[dict] = []
second: list[dict] = []
inbox.set_notification_callback(first.append)
inbox.set_notification_callback(second.append)
state.record(_msg("act-1"))
assert len(first) == 0, "first callback should be unregistered"
assert len(second) == 1, "second callback should receive the event"
def test_set_notification_callback_none_clears(state: inbox.InboxState):
"""Setting None clears the callback — used by tests + the wheel's
shutdown path."""
received: list[dict] = []
inbox.set_notification_callback(received.append)
inbox.set_notification_callback(None)
state.record(_msg("act-1"))
assert received == []