forked from molecule-ai/molecule-core
Merge pull request #2433 from Molecule-AI/feat/mcp-channel-notifications
feat(mcp): notifications/claude/channel for push-feel inbox UX
This commit is contained in:
commit
c901d52ee3
@ -52,11 +52,13 @@ def smoke_imports_and_invariants() -> None:
|
||||
InboxState,
|
||||
activate as inbox_activate,
|
||||
get_state as inbox_get_state,
|
||||
set_notification_callback as inbox_set_notification_callback,
|
||||
start_poller_thread as inbox_start_poller_thread,
|
||||
)
|
||||
assert callable(inbox_activate), "inbox.activate must be callable"
|
||||
assert callable(inbox_get_state), "inbox.get_state must be callable"
|
||||
assert callable(inbox_start_poller_thread), "inbox.start_poller_thread must be callable"
|
||||
assert callable(inbox_set_notification_callback), "inbox.set_notification_callback must be callable"
|
||||
|
||||
assert a2a_client._A2A_ERROR_PREFIX, "a2a_client missing error sentinel"
|
||||
assert callable(get_adapter), "adapters.get_adapter must be callable"
|
||||
|
||||
@ -130,6 +130,44 @@ async def handle_tool_call(name: str, arguments: dict) -> str:
|
||||
return f"Unknown tool: {name}"
|
||||
|
||||
|
||||
# --- MCP Notification bridge ---
|
||||
|
||||
# `notifications/claude/channel` matches the contract used by the
|
||||
# molecule-mcp-claude-channel bun bridge (server.ts:509). Claude Code's
|
||||
# MCP runtime treats this method as a conversation interrupt — `content`
|
||||
# becomes the agent turn, `meta` is structured metadata. Notification-
|
||||
# capable hosts (Claude Code today; any compliant client tomorrow)
|
||||
# get push UX automatically; pollers (`wait_for_message` / `inbox_peek`)
|
||||
# still work unchanged. See task #46 + the deprecation path documented
|
||||
# in workspace/inbox.py:set_notification_callback.
|
||||
_CHANNEL_NOTIFICATION_METHOD = "notifications/claude/channel"
|
||||
|
||||
|
||||
def _build_channel_notification(msg: dict) -> dict:
|
||||
"""Transform an ``InboxMessage.to_dict()`` into the MCP notification
|
||||
envelope expected by Claude Code's channel-bridge contract.
|
||||
|
||||
Pure function so the wire shape is unit-testable without spinning
|
||||
up an asyncio loop. The wire-up in ``main()`` just composes this
|
||||
with ``asyncio.run_coroutine_threadsafe``.
|
||||
"""
|
||||
return {
|
||||
"jsonrpc": "2.0",
|
||||
"method": _CHANNEL_NOTIFICATION_METHOD,
|
||||
"params": {
|
||||
"content": msg.get("text", ""),
|
||||
"meta": {
|
||||
"source": "molecule",
|
||||
"kind": msg.get("kind", ""),
|
||||
"peer_id": msg.get("peer_id", ""),
|
||||
"method": msg.get("method", ""),
|
||||
"activity_id": msg.get("activity_id", ""),
|
||||
"ts": msg.get("created_at", ""),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
# --- MCP Server (JSON-RPC over stdio) ---
|
||||
|
||||
async def main(): # pragma: no cover
|
||||
@ -148,6 +186,35 @@ async def main(): # pragma: no cover
|
||||
writer.write(data.encode())
|
||||
await writer.drain()
|
||||
|
||||
# Wire the inbox → MCP notification bridge. Inbox poller (daemon
|
||||
# thread) calls into here when a new activity row lands; we
|
||||
# schedule the notification onto the asyncio loop and best-effort
|
||||
# fire it on the same stdout the responses go to.
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
async def _emit_notification(payload: dict) -> None:
|
||||
data = json.dumps(payload) + "\n"
|
||||
writer.write(data.encode())
|
||||
try:
|
||||
await writer.drain()
|
||||
except Exception: # noqa: BLE001
|
||||
# Closed pipe (host disconnected) shouldn't crash the
|
||||
# inbox poller; let it sit until the host reconnects.
|
||||
pass
|
||||
|
||||
def _on_inbox_message(msg: dict) -> None:
|
||||
try:
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
_emit_notification(_build_channel_notification(msg)),
|
||||
loop,
|
||||
)
|
||||
except RuntimeError:
|
||||
# Loop closed during shutdown — best-effort, swallow.
|
||||
pass
|
||||
|
||||
import inbox as _inbox_module
|
||||
_inbox_module.set_notification_callback(_on_inbox_message)
|
||||
|
||||
buffer = ""
|
||||
while True:
|
||||
try:
|
||||
|
||||
@ -53,7 +53,7 @@ import time
|
||||
from collections import deque
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from typing import Any, Callable
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -173,10 +173,14 @@ class InboxState:
|
||||
logger.warning("inbox: failed to delete cursor %s: %s", self.cursor_path, exc)
|
||||
|
||||
def record(self, message: InboxMessage) -> None:
|
||||
"""Append a message and wake any waiter.
|
||||
"""Append a message, wake any waiter, and fire the notification
|
||||
callback (if registered) for push-UX-capable hosts.
|
||||
|
||||
Skips a row whose activity_id we've already queued — defensive
|
||||
against the poller racing with the consumer + cursor save.
|
||||
against the poller racing with the consumer + cursor save. The
|
||||
dedupe short-circuits BEFORE the notification fires, so a
|
||||
notification-capable host doesn't see duplicate push events on
|
||||
backlog overlap.
|
||||
"""
|
||||
with self._lock:
|
||||
for existing in self._queue:
|
||||
@ -184,6 +188,19 @@ class InboxState:
|
||||
return
|
||||
self._queue.append(message)
|
||||
self._arrival.set()
|
||||
# Fire notification AFTER releasing the lock so the callback
|
||||
# is free to do anything (including calling back into inbox)
|
||||
# without deadlock. Best-effort: a raising callback must not
|
||||
# prevent the message from landing in the queue — observability
|
||||
# is more important than push delivery.
|
||||
cb = _NOTIFICATION_CALLBACK
|
||||
if cb is not None:
|
||||
try:
|
||||
cb(message.to_dict())
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"inbox: notification callback raised", exc_info=True
|
||||
)
|
||||
|
||||
def peek(self, limit: int = 10) -> list[InboxMessage]:
|
||||
"""Return up to ``limit`` pending messages without removing them."""
|
||||
@ -240,6 +257,35 @@ class InboxState:
|
||||
_STATE: InboxState | None = None
|
||||
|
||||
|
||||
# Notification bridge — set by the universal MCP server (a2a_mcp_server.py)
|
||||
# at startup so that new inbox arrivals can be pushed to notification-
|
||||
# capable hosts (Claude Code) as MCP `notifications/claude/channel`
|
||||
# events. Kept module-level (rather than a method on InboxState) so the
|
||||
# inbox doesn't need to know about MCP — a thin pluggable seam.
|
||||
#
|
||||
# Defaults to None: in-container runtimes that don't activate the inbox
|
||||
# also don't push notifications, and tests start clean. The wheel's
|
||||
# wiring is exercised by tests/test_a2a_mcp_server.py + the bridge
|
||||
# tests below.
|
||||
_NOTIFICATION_CALLBACK: Callable[[dict], None] | None = None
|
||||
|
||||
|
||||
def set_notification_callback(cb: Callable[[dict], None] | None) -> None:
|
||||
"""Register (or clear) the per-message notification callback.
|
||||
|
||||
The callback receives ``InboxMessage.to_dict()`` for each new
|
||||
arrival — same shape ``inbox_peek`` returns to the agent, so a
|
||||
bridge can build its MCP notification payload without re-deriving
|
||||
fields.
|
||||
|
||||
Best-effort: a raising callback does NOT prevent the message from
|
||||
landing in the queue (see ``InboxState.record``). Pass ``None`` to
|
||||
clear (used by tests + the wheel's shutdown path).
|
||||
"""
|
||||
global _NOTIFICATION_CALLBACK
|
||||
_NOTIFICATION_CALLBACK = cb
|
||||
|
||||
|
||||
def activate(state: InboxState) -> None:
|
||||
"""Register an InboxState as the singleton this module exposes.
|
||||
|
||||
|
||||
@ -138,3 +138,102 @@ def test_attachments_param_description_emphasizes_REQUIRED():
|
||||
assert forbidden in desc, (
|
||||
f"`attachments` description must call out {forbidden!r} as a wrong alternative"
|
||||
)
|
||||
|
||||
|
||||
# ============== Inbox → MCP notification bridge (2026-05-01) ==============
|
||||
# Notification-capable hosts (Claude Code) get push UX when a new inbound
|
||||
# message lands; pollers (wait_for_message/inbox_peek) keep working.
|
||||
# `_build_channel_notification` is the pure shape transformer — wire-up
|
||||
# in main() composes it with asyncio.run_coroutine_threadsafe.
|
||||
|
||||
|
||||
def test_build_channel_notification_method_matches_claude_contract():
|
||||
"""Method MUST be `notifications/claude/channel` exactly — that's
|
||||
what Claude Code's MCP runtime listens for as a conversation
|
||||
interrupt. Same string as the bun channel bridge sends
|
||||
(server.ts:509) so this is a drop-in replacement."""
|
||||
from a2a_mcp_server import _build_channel_notification
|
||||
|
||||
payload = _build_channel_notification({
|
||||
"activity_id": "act-1",
|
||||
"text": "hello",
|
||||
"peer_id": "",
|
||||
"kind": "canvas_user",
|
||||
"method": "message/send",
|
||||
"created_at": "2026-05-01T00:00:00Z",
|
||||
})
|
||||
|
||||
assert payload["method"] == "notifications/claude/channel"
|
||||
assert payload["jsonrpc"] == "2.0"
|
||||
|
||||
|
||||
def test_build_channel_notification_content_is_message_text():
|
||||
"""`content` is what becomes the agent conversation turn —
|
||||
pulled directly from the inbox message text."""
|
||||
from a2a_mcp_server import _build_channel_notification
|
||||
|
||||
payload = _build_channel_notification({
|
||||
"activity_id": "act-1",
|
||||
"text": "hello from canvas",
|
||||
"peer_id": "",
|
||||
"kind": "canvas_user",
|
||||
"method": "message/send",
|
||||
"created_at": "2026-05-01T00:00:00Z",
|
||||
})
|
||||
|
||||
assert payload["params"]["content"] == "hello from canvas"
|
||||
|
||||
|
||||
def test_build_channel_notification_meta_carries_routing_fields():
|
||||
"""Meta must include kind, peer_id, method, activity_id, ts —
|
||||
fields the agent or downstream tooling needs to route a reply
|
||||
(canvas_user → /notify, peer_agent → /a2a) and to acknowledge
|
||||
via inbox_pop."""
|
||||
from a2a_mcp_server import _build_channel_notification
|
||||
|
||||
payload = _build_channel_notification({
|
||||
"activity_id": "act-7",
|
||||
"text": "ping",
|
||||
"peer_id": "ws-peer-uuid",
|
||||
"kind": "peer_agent",
|
||||
"method": "message/send",
|
||||
"created_at": "2026-05-01T01:23:45Z",
|
||||
})
|
||||
meta = payload["params"]["meta"]
|
||||
|
||||
assert meta["source"] == "molecule"
|
||||
assert meta["kind"] == "peer_agent"
|
||||
assert meta["peer_id"] == "ws-peer-uuid"
|
||||
assert meta["method"] == "message/send"
|
||||
assert meta["activity_id"] == "act-7"
|
||||
assert meta["ts"] == "2026-05-01T01:23:45Z"
|
||||
|
||||
|
||||
def test_build_channel_notification_no_id_field():
|
||||
"""Notifications MUST NOT carry a JSON-RPC `id` field — that's
|
||||
what distinguishes them from requests. A notification with `id`
|
||||
would be mis-interpreted as a request and clients would wait
|
||||
for a response that never comes."""
|
||||
from a2a_mcp_server import _build_channel_notification
|
||||
|
||||
payload = _build_channel_notification({"text": "x"})
|
||||
|
||||
assert "id" not in payload, (
|
||||
"notifications must omit `id` per JSON-RPC 2.0 spec — "
|
||||
"presence would make MCP clients await a phantom response"
|
||||
)
|
||||
|
||||
|
||||
def test_build_channel_notification_handles_missing_fields_gracefully():
|
||||
"""Some fields may be absent on edge-case messages (e.g. cursor
|
||||
bootstrapping with no created_at yet). Default to empty strings
|
||||
so the wire shape stays valid JSON instead of crashing."""
|
||||
from a2a_mcp_server import _build_channel_notification
|
||||
|
||||
payload = _build_channel_notification({})
|
||||
|
||||
assert payload["params"]["content"] == ""
|
||||
meta = payload["params"]["meta"]
|
||||
assert meta["activity_id"] == ""
|
||||
assert meta["peer_id"] == ""
|
||||
assert meta["kind"] == ""
|
||||
|
||||
@ -442,3 +442,113 @@ def test_default_cursor_path_uses_configs_dir(monkeypatch, tmp_path: Path):
|
||||
def test_default_cursor_path_falls_back_to_default(monkeypatch):
|
||||
monkeypatch.delenv("CONFIGS_DIR", raising=False)
|
||||
assert inbox.default_cursor_path() == Path("/configs") / ".mcp_inbox_cursor"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Notification callback bridge — push UX for notification-capable hosts
|
||||
# ---------------------------------------------------------------------------
|
||||
#
|
||||
# `record()` is called from the poller daemon thread when a new activity
|
||||
# row arrives. Notification-capable MCP hosts (Claude Code) want to be
|
||||
# pushed a notification — the universal wheel registers a callback via
|
||||
# `set_notification_callback()` that fires the MCP notification. Pollers
|
||||
# (`wait_for_message`/`inbox_peek`) keep working unchanged.
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _reset_notification_callback():
|
||||
"""Each test starts with no callback registered. Notification
|
||||
state must not leak across tests — same pattern as _reset_singleton."""
|
||||
inbox.set_notification_callback(None)
|
||||
yield
|
||||
inbox.set_notification_callback(None)
|
||||
|
||||
|
||||
def test_record_fires_notification_callback_with_message_dict(state: inbox.InboxState):
|
||||
"""When a callback is registered, record() invokes it with the
|
||||
canonical to_dict() shape — same shape inbox_peek returns to the
|
||||
agent. Callers can build MCP notification payloads from this
|
||||
without re-deriving fields."""
|
||||
received: list[dict] = []
|
||||
inbox.set_notification_callback(received.append)
|
||||
|
||||
state.record(_msg("act-1", peer_id="ws-peer", text="hello"))
|
||||
|
||||
assert len(received) == 1
|
||||
payload = received[0]
|
||||
assert payload["activity_id"] == "act-1"
|
||||
assert payload["text"] == "hello"
|
||||
assert payload["peer_id"] == "ws-peer"
|
||||
assert payload["kind"] == "peer_agent" # to_dict derives this
|
||||
assert payload["method"] == "message/send"
|
||||
|
||||
|
||||
def test_record_dedupe_does_not_refire_callback(state: inbox.InboxState):
|
||||
"""The activity_id dedupe path must short-circuit BEFORE invoking
|
||||
the callback — otherwise a notification-capable host would see
|
||||
duplicate push events on poller backlog overlap."""
|
||||
received: list[dict] = []
|
||||
inbox.set_notification_callback(received.append)
|
||||
|
||||
state.record(_msg("act-1"))
|
||||
state.record(_msg("act-1")) # dedupe — same id
|
||||
|
||||
assert len(received) == 1, (
|
||||
f"expected 1 callback (dedupe), got {len(received)} — "
|
||||
f"would cause duplicate Claude conversation interrupts"
|
||||
)
|
||||
|
||||
|
||||
def test_record_callback_exception_does_not_break_inbox(state: inbox.InboxState):
|
||||
"""A raising callback (e.g. asyncio loop closed mid-shutdown,
|
||||
serialization error on an exotic message) must NOT prevent the
|
||||
message from landing in the queue. Notification delivery is
|
||||
best-effort; inbox correctness is not negotiable."""
|
||||
|
||||
def boom(_payload):
|
||||
raise RuntimeError("simulated callback failure")
|
||||
|
||||
inbox.set_notification_callback(boom)
|
||||
|
||||
# Must not raise, must still queue the message.
|
||||
state.record(_msg("act-1"))
|
||||
|
||||
queued = state.peek(10)
|
||||
assert len(queued) == 1
|
||||
assert queued[0].activity_id == "act-1"
|
||||
|
||||
|
||||
def test_record_no_callback_registered_is_no_op(state: inbox.InboxState):
|
||||
"""When no callback is set (in-container path, or before
|
||||
activation), record() proceeds normally — no None-call crash."""
|
||||
# No set_notification_callback() in this test — autouse fixture
|
||||
# cleared any previous registration.
|
||||
state.record(_msg("act-1"))
|
||||
assert len(state.peek(10)) == 1
|
||||
|
||||
|
||||
def test_set_notification_callback_replaces_previous(state: inbox.InboxState):
|
||||
"""Re-registering the callback replaces the previous — only the
|
||||
latest callback fires. Test ensures the universal wheel can update
|
||||
the bridge if its asyncio loop is replaced (e.g. graceful restart)."""
|
||||
first: list[dict] = []
|
||||
second: list[dict] = []
|
||||
inbox.set_notification_callback(first.append)
|
||||
inbox.set_notification_callback(second.append)
|
||||
|
||||
state.record(_msg("act-1"))
|
||||
|
||||
assert len(first) == 0, "first callback should be unregistered"
|
||||
assert len(second) == 1, "second callback should receive the event"
|
||||
|
||||
|
||||
def test_set_notification_callback_none_clears(state: inbox.InboxState):
|
||||
"""Setting None clears the callback — used by tests + the wheel's
|
||||
shutdown path."""
|
||||
received: list[dict] = []
|
||||
inbox.set_notification_callback(received.append)
|
||||
inbox.set_notification_callback(None)
|
||||
|
||||
state.record(_msg("act-1"))
|
||||
|
||||
assert received == []
|
||||
|
||||
Loading…
Reference in New Issue
Block a user