"""In-memory inbox + background poller for the standalone molecule-mcp path. Purpose ------- The universal MCP server (a2a_mcp_server.py) is OUTBOUND-ONLY by default — it gives an MCP-aware agent the same A2A delegation, peer-discovery, and memory tools that container-bound runtimes already have. There is no inbound delivery path: when the canvas user types a message or a peer sends an A2A request, the activity lands on the platform but the standalone agent never sees it. This module closes that gap WITHOUT requiring a tunnel or a public agent URL. A daemon thread polls ``/workspaces/:id/activity?type=a2a_receive`` on the platform and stages new rows in an in-memory deque. Three new MCP tools (``inbox_peek``, ``inbox_pop``, ``wait_for_message``) let the agent observe the queue. Why a poller (not push) ----------------------- runtime=external workspaces have ``delivery_mode="poll"`` — the platform records inbound A2A in ``activity_logs`` but does not call back to the agent. A poller is the only inbound surface that works without the operator exposing a public URL through a tunnel. 5s cadence matches the molecule-mcp-claude-channel plugin's POLL_INTERVAL — it's already proven on staging for the channel-based delivery path. Cursor model ------------ ``activity_logs.id`` is the cursor (server-assigned, monotonic). We persist it to ``${CONFIGS_DIR}/.mcp_inbox_cursor`` so an agent restart doesn't replay the last 10 minutes of inbound traffic and re-act on already-handled messages. On 410 (cursor pruned) we drop back to ``since_secs=600`` for a bounded backlog and let the cursor advance naturally from there. Scope ----- Standalone molecule-mcp ONLY. The in-container runtime has its own push delivery (main.py + canvas WebSocket); we never want both running at once or a single message would be delivered twice. 
The caller (mcp_cli.main) gates activation explicitly via ``activate(state)``; in-container code that imports this module by accident gets a no-op until activate is called. """ from __future__ import annotations import json import logging import os import threading import time from collections import deque from dataclasses import dataclass, field from pathlib import Path from typing import Any, Callable import configs_dir logger = logging.getLogger(__name__) # Poll cadence. 5s mirrors the molecule-mcp-claude-channel plugin's # proven default — fast enough that a canvas user typing "are you # there?" gets picked up before they refresh, slow enough that 12 # requests/min won't trip rate limits or wake mobile devices. POLL_INTERVAL_SECONDS = 5.0 # Initial backlog window for the first poll AND the recovery path # after a stale-cursor 410. 10 minutes is enough to cover a brief # crash/restart without flooding a long-idle workspace with hours of # stale chat. INITIAL_BACKLOG_SECONDS = 600 # Hard cap on the in-memory deque. The poller is bounded by the # server's per-page limit (default 100) and the agent typically pops # faster than the operator types, so an idle workspace shouldn't # exceed a handful. The cap protects against runaway growth if the # agent process stops calling pop. MAX_QUEUED_MESSAGES = 200 @dataclass class InboxMessage: """One inbound A2A message staged for the agent. Mirrors the shape the agent sees via inbox_peek / wait_for_message. Fields are derived from the activity_logs row by ``_from_activity``. """ activity_id: str text: str peer_id: str # empty string = canvas user; non-empty = peer workspace_id method: str # JSON-RPC method ("message/send", "tasks/send", etc.) created_at: str # RFC3339 timestamp from the activity row # Which OF MY workspaces did this message arrive on. Only meaningful # for the multi-workspace external agent (one process registered # against multiple workspaces). 
Empty string = single-workspace # path / pre-multi-workspace caller — back-compat with consumers # that don't set it. Tools like send_message_to_user use this to # know which workspace's identity to reply with. arrival_workspace_id: str = "" def to_dict(self) -> dict[str, Any]: d = { "activity_id": self.activity_id, "text": self.text, "peer_id": self.peer_id, "kind": "peer_agent" if self.peer_id else "canvas_user", "method": self.method, "created_at": self.created_at, } # Only surface arrival_workspace_id when it's set, so single- # workspace consumers don't see a new key in their existing # output. if self.arrival_workspace_id: d["arrival_workspace_id"] = self.arrival_workspace_id return d @dataclass class InboxState: """Thread-safe queue of pending inbound messages. Producer: the poller thread(s), calling ``record(message)``. Consumers: the MCP tool handlers, calling ``peek``, ``pop``, or ``wait``. Synchronization is via a single ``threading.Lock`` (cheap — every operation is O(n) over a small deque) plus an ``Event`` that wakes ``wait`` callers when a new message lands. Cursors are per-workspace. Single-workspace operators construct with ``InboxState(cursor_path=...)`` (back-compat — the path becomes the cursor file for the empty-string workspace_id key). Multi-workspace operators construct with ``InboxState(cursor_paths={wsid: path,...})`` so each poller advances its own cursor independently — one workspace's slow poll can't stall another's, and a 410 on one cursor only resets that one. """ cursor_path: Path | None = None """Single-workspace cursor file. Sets ``cursor_paths[""]`` if ``cursor_paths`` not also supplied. Kept on the dataclass for back-compat — existing callers pass ``cursor_path=`` positionally.""" cursor_paths: dict[str, Path] = field(default_factory=dict) """Per-workspace cursor files keyed by workspace_id. 
Multi-workspace pollers each own their own row here.""" _queue: deque[InboxMessage] = field(default_factory=lambda: deque(maxlen=MAX_QUEUED_MESSAGES)) _lock: threading.Lock = field(default_factory=threading.Lock) _arrival: threading.Event = field(default_factory=threading.Event) _cursors: dict[str, str | None] = field(default_factory=dict) _cursors_loaded: dict[str, bool] = field(default_factory=dict) def __post_init__(self) -> None: # Back-compat: single-workspace constructor passes # cursor_path=Path(...). Promote it into the dict under the # empty-string key so the lookup APIs are uniform. if self.cursor_path is not None and "" not in self.cursor_paths: self.cursor_paths[""] = self.cursor_path def _path_for(self, workspace_id: str) -> Path | None: """Resolve the cursor path for a workspace_id key, or None.""" return self.cursor_paths.get(workspace_id or "") def load_cursor(self, workspace_id: str = "") -> str | None: """Read the persisted cursor from disk. Cached after first call. Missing/unreadable file → None (poller will fall back to the initial-backlog window). We never raise: a corrupt cursor is less bad than the inbox refusing to start. ``workspace_id=""`` is the single-workspace path, untouched. """ path = self._path_for(workspace_id) with self._lock: if self._cursors_loaded.get(workspace_id): return self._cursors.get(workspace_id) cursor: str | None = None if path is not None: try: if path.is_file(): cursor = path.read_text().strip() or None except OSError as exc: logger.warning("inbox: failed to read cursor %s: %s", path, exc) cursor = None self._cursors[workspace_id] = cursor self._cursors_loaded[workspace_id] = True return cursor def save_cursor(self, activity_id: str, workspace_id: str = "") -> None: """Persist the cursor. Best-effort — log + continue on failure. Loss of the cursor on a write failure means an extra page of backlog after restart, never a stuck poller. 
Silent-fail would mask a permission misconfiguration on the operator's configs dir; warn loudly so they can fix it. """ path = self._path_for(workspace_id) with self._lock: self._cursors[workspace_id] = activity_id self._cursors_loaded[workspace_id] = True if path is None: return try: path.parent.mkdir(parents=True, exist_ok=True) tmp = path.with_suffix(path.suffix + ".tmp") tmp.write_text(activity_id) tmp.replace(path) except OSError as exc: logger.warning("inbox: failed to persist cursor to %s: %s", path, exc) def reset_cursor(self, workspace_id: str = "") -> None: """Forget the cursor. Used after a 410 from the activity API.""" path = self._path_for(workspace_id) with self._lock: self._cursors[workspace_id] = None self._cursors_loaded[workspace_id] = True if path is None: return try: if path.is_file(): path.unlink() except OSError as exc: logger.warning("inbox: failed to delete cursor %s: %s", path, exc) def record(self, message: InboxMessage) -> None: """Append a message, wake any waiter, and fire the notification callback (if registered) for push-UX-capable hosts. Skips a row whose activity_id we've already queued — defensive against the poller racing with the consumer + cursor save. The dedupe short-circuits BEFORE the notification fires, so a notification-capable host doesn't see duplicate push events on backlog overlap. """ with self._lock: for existing in self._queue: if existing.activity_id == message.activity_id: return self._queue.append(message) self._arrival.set() # Fire notification AFTER releasing the lock so the callback # is free to do anything (including calling back into inbox) # without deadlock. Best-effort: a raising callback must not # prevent the message from landing in the queue — observability # is more important than push delivery. 
cb = _NOTIFICATION_CALLBACK if cb is not None: try: cb(message.to_dict()) except Exception: logger.warning( "inbox: notification callback raised", exc_info=True ) def peek(self, limit: int = 10) -> list[InboxMessage]: """Return up to ``limit`` pending messages without removing them.""" if limit <= 0: limit = 10 with self._lock: return list(self._queue)[:limit] def pop(self, activity_id: str) -> InboxMessage | None: """Remove a specific message. Idempotent; returns None if absent. We require the caller to specify which message it handled rather than auto-popping the head — preserves observability when the agent reads several but only handles one. """ with self._lock: for existing in list(self._queue): if existing.activity_id == activity_id: self._queue.remove(existing) if not self._queue: self._arrival.clear() return existing return None def wait(self, timeout_secs: float) -> InboxMessage | None: """Block until a message is available or timeout elapses. Returns the head message WITHOUT popping; the caller decides whether to pop after acting on it. Same shape as Python's Queue.get with timeout, but non-destructive so a peek-style agent can still inspect with peek/pop. """ # Fast path: queue already has something. with self._lock: if self._queue: return self._queue[0] self._arrival.clear() triggered = self._arrival.wait(timeout=max(0.0, timeout_secs)) if not triggered: return None with self._lock: return self._queue[0] if self._queue else None # --------------------------------------------------------------------------- # Module singleton — set by mcp_cli before MCP server starts. # --------------------------------------------------------------------------- # # In-container callers don't activate; the inbox tools detect the # unset singleton and return an informational error rather than # breaking the dispatch path. 
_STATE: InboxState | None = None

# Notification bridge — set by the universal MCP server (a2a_mcp_server.py)
# at startup so that new inbox arrivals can be pushed to notification-
# capable hosts (Claude Code) as MCP `notifications/claude/channel`
# events. Kept module-level (rather than a method on InboxState) so the
# inbox doesn't need to know about MCP — a thin pluggable seam.
#
# Defaults to None: in-container runtimes that don't activate the inbox
# also don't push notifications, and tests start clean. The wheel's
# wiring is exercised by tests/test_a2a_mcp_server.py + the bridge
# tests below.
_NOTIFICATION_CALLBACK: Callable[[dict], None] | None = None


def set_notification_callback(cb: Callable[[dict], None] | None) -> None:
    """Register (or clear) the per-message notification callback.

    The callback receives ``InboxMessage.to_dict()`` for each new
    arrival — same shape ``inbox_peek`` returns to the agent, so a
    bridge can build its MCP notification payload without re-deriving
    fields. Best-effort: a raising callback does NOT prevent the message
    from landing in the queue (see ``InboxState.record``).

    Pass ``None`` to clear (used by tests + the wheel's shutdown path).
    """
    global _NOTIFICATION_CALLBACK
    _NOTIFICATION_CALLBACK = cb


def activate(state: InboxState) -> None:
    """Register an InboxState as the singleton this module exposes.

    Idempotent within a process: re-activating with the same state is a
    no-op; activating with a DIFFERENT state replaces the singleton +
    logs at WARNING (the only legitimate caller is mcp_cli at startup;
    double-activate usually means a test/runtime mix-up).
    """
    global _STATE
    if _STATE is state:
        return
    if _STATE is not None:
        logger.warning("inbox: replacing existing singleton state")
    _STATE = state


def get_state() -> InboxState | None:
    """Return the active InboxState, or None if the runtime never
    activated.

    Tool implementations call this and surface a clear "(inbox not
    enabled)" message to the agent when None — keeps the in-container
    path's tool dispatch from raising on an inbox-tool call that the
    agent shouldn't have made anyway.
    """
    return _STATE


# ---------------------------------------------------------------------------
# Activity → InboxMessage adapter
# ---------------------------------------------------------------------------
#
# The platform's a2a_proxy logs request_body as the JSON-RPC envelope
# it forwarded to the workspace. Three shapes have been observed in
# the wild (verified against workspace-server's logA2ASuccess in
# a2a_proxy_helpers.go on 2026-04-29) — handle all three before
# falling back to summary so a peer message at least surfaces SOMETHING.


def _extract_text(request_body: Any, summary: str | None) -> str:
    """Pull the human-readable text out of an A2A activity row.

    Mirrors molecule-mcp-claude-channel/server.ts:445 (extractText) so
    canvas-user messages and peer-agent messages render identically
    across both inbound channels.
    """
    if not isinstance(request_body, dict):
        return summary or "(empty A2A message)"
    # Candidate "parts" lists, in priority order:
    # params.message.parts → params.parts → top-level parts.
    candidates: list[Any] = []
    params = (
        request_body.get("params")
        if isinstance(request_body.get("params"), dict)
        else None
    )
    if params:
        message = (
            params.get("message")
            if isinstance(params.get("message"), dict)
            else None
        )
        if message:
            candidates.append(message.get("parts"))
        candidates.append(params.get("parts"))
    candidates.append(request_body.get("parts"))
    # The A2A protocol's part discriminator field varies between SDK
    # versions: a2a-sdk v0 uses ``type``, v1 uses ``kind``. The platform's
    # activity_logs preserves whichever the original sender used, so we
    # accept either. Verified live against a hosted SaaS workspace on
    # 2026-04-30 — every canvas-user message arrived with ``kind`` and
    # the type-only filter was silently falling through to summary.
    for parts in candidates:
        if isinstance(parts, list):
            text = "".join(
                p.get("text", "")
                for p in parts
                if isinstance(p, dict)
                and (p.get("kind") == "text" or p.get("type") == "text")
            )
            if text:
                return text
    return summary or "(empty A2A message)"


def _is_self_notify_row(row: dict[str, Any]) -> bool:
    """Return True if ``row`` is the agent's own send_message_to_user
    POST surfacing back through the activity API.

    The shape (workspace-server handlers/activity.go, ``Notify`` writer):
    method='notify' AND no peer (source_id is None or ''). Matched on
    both fields together so a future caller using ``method='notify'``
    for a different purpose with a real peer_id still passes through.
    """
    if row.get("method") != "notify":
        return False
    source_id = row.get("source_id")
    return source_id is None or source_id == ""


def message_from_activity(row: dict[str, Any]) -> InboxMessage:
    """Convert one /activity row into an InboxMessage.

    Mutates ``row['request_body']`` in-place to swap any
    ``platform-pending:`` URIs to the locally-staged ``workspace:`` URIs
    (see ``inbox_uploads.rewrite_request_body``) — by the time the
    upstream chat message arrives via this path, the upload-receive row
    that staged the bytes has already populated the URI cache (lower
    activity_logs.id, processed earlier in the same poll batch). A cache
    miss leaves the URI untouched; the agent surfaces an unresolvable
    URI rather than the inbox silently dropping the part.
    """
    request_body = row.get("request_body")
    if isinstance(request_body, str):
        # The Go handler returns request_body as json.RawMessage; httpx
        # deserializes that to a dict already. But some legacy paths or
        # mocked servers may return it as a string — handle defensively.
        try:
            request_body = json.loads(request_body)
        except (TypeError, ValueError):
            request_body = None
    # Rewrite platform-pending: URIs → workspace: URIs in-place. Imported
    # at call time to keep the import graph clean for the in-container
    # path that doesn't use this module (also avoids a circular: the
    # uploads module is small enough that re-importing per call is
    # cheap, and the Python import cache makes it free after the first).
    from inbox_uploads import rewrite_request_body

    rewrite_request_body(request_body)
    return InboxMessage(
        activity_id=str(row.get("id", "")),
        text=_extract_text(request_body, row.get("summary")),
        peer_id=row.get("source_id") or "",
        method=row.get("method") or "",
        created_at=str(row.get("created_at", "")),
    )


# ---------------------------------------------------------------------------
# Poller — daemon thread that fills the queue from the activity API
# ---------------------------------------------------------------------------


def _poll_once(
    state: InboxState,
    platform_url: str,
    workspace_id: str,
    headers: dict[str, str],
    timeout_secs: float = 10.0,
) -> int:
    """One poll iteration. Returns number of new messages enqueued.

    Idempotent and stateless apart from the InboxState passed in — safe
    to call from tests with a stub state + a real httpx mock.

    ``workspace_id`` doubles as the cursor key on InboxState — pollers
    for distinct workspaces get distinct cursors and don't trample each
    other. For the single-workspace path the cursor key is the empty
    string (per InboxState.__post_init__'s back-compat promotion of
    ``cursor_path``).
    """
    import httpx

    url = f"{platform_url}/workspaces/{workspace_id}/activity"
    # Dual cursor key resolution: in single-workspace mode the cursor
    # was historically stored under the "" key (back-compat). In
    # multi-workspace mode each poller's cursor lives under its own
    # workspace_id. Try the workspace-specific key first; if absent on
    # this state, fall back to the legacy empty-string slot so existing
    # InboxState-with-cursor_path-only constructors keep working.
    cursor_key = workspace_id if workspace_id in state.cursor_paths else ""
    params: dict[str, str] = {"type": "a2a_receive"}
    cursor = state.load_cursor(cursor_key)
    if cursor:
        params["since_id"] = cursor
    else:
        # No cursor yet (first run, or post-410 reset): bounded backlog.
        params["since_secs"] = str(INITIAL_BACKLOG_SECONDS)
    try:
        with httpx.Client(timeout=timeout_secs) as client:
            resp = client.get(url, params=params, headers=headers)
    except Exception as exc:  # noqa: BLE001
        logger.warning("inbox poller: GET /activity failed: %s", exc)
        return 0
    if resp.status_code == 410:
        # Cursor pruned — drop back to the backlog window. The next
        # poll picks up wherever the activity API has rows now.
        logger.info(
            "inbox poller: cursor %s expired (410); resetting to since_secs=%d",
            cursor,
            INITIAL_BACKLOG_SECONDS,
        )
        state.reset_cursor(cursor_key)
        return 0
    if resp.status_code >= 400:
        logger.warning(
            "inbox poller: HTTP %d from /activity: %s",
            resp.status_code,
            (resp.text or "")[:200],
        )
        return 0
    try:
        rows = resp.json()
    except ValueError as exc:
        logger.warning("inbox poller: non-JSON response: %s", exc)
        return 0
    if not isinstance(rows, list):
        return 0
    # since_id mode returns ASC (oldest first). since_secs mode returns
    # DESC; reverse so we record in chronological order and the cursor
    # we save is the freshest row.
    if cursor is None:
        rows = list(reversed(rows))
    # Imported lazily at use-site so a runtime that never sees an
    # upload-receive row never imports the module. Cheap on the hot
    # path because Python caches the import.
    from inbox_uploads import is_chat_upload_row, BatchFetcher

    new_count = 0
    last_id: str | None = None
    # ``batch_fetcher`` is lazy: a poll batch with no upload rows pays
    # zero overhead. Once the first upload row appears we open one
    # BatchFetcher and submit every subsequent upload row to its thread
    # pool; before processing the FIRST non-upload row we drain the
    # pool (wait_all) so the URI cache is hot when message rewriting
    # runs. Without the barrier, the chat message that references the
    # upload would arrive at the agent with the un-rewritten
    # platform-pending: URI.
    batch_fetcher: BatchFetcher | None = None

    def _drain_uploads(bf: BatchFetcher | None) -> None:
        # Barrier: block until every submitted fetch has completed,
        # then release the fetcher's resources.
        if bf is None:
            return
        bf.wait_all()
        bf.close()

    for row in rows:
        if not isinstance(row, dict):
            continue
        if is_chat_upload_row(row):
            # Side-effect row from the platform's poll-mode chat-upload
            # handler — fetch the bytes, stage to /workspace/.molecule/
            # chat-uploads, ack. NOT enqueued as an InboxMessage; the
            # agent will see the chat message that REFERENCES this
            # upload via a separate (later) activity row, with the
            # pending: URI rewritten to a workspace: URI by
            # message_from_activity. We DO advance the cursor past
            # this row so a permanent network outage on /content
            # doesn't stall the cursor and block real chat traffic.
            if batch_fetcher is None:
                batch_fetcher = BatchFetcher(
                    platform_url=platform_url,
                    workspace_id=workspace_id,
                    headers=headers,
                )
            batch_fetcher.submit(row)
            last_id = str(row.get("id", "")) or last_id
            continue
        # Non-upload row: drain any pending uploads first so the URI
        # cache is populated before we run rewrite_request_body /
        # message_from_activity on a row that may reference one.
        if batch_fetcher is not None:
            _drain_uploads(batch_fetcher)
            batch_fetcher = None
        if _is_self_notify_row(row):
            # The workspace-server's `/notify` handler writes the agent's
            # own send_message_to_user POSTs to activity_logs with
            # activity_type='a2a_receive', method='notify', and no
            # source_id, so the canvas chat-history loader can restore
            # those bubbles after a page reload (handlers/activity.go,
            # comment block at line 428). The activity API exposes that
            # filter only on type, so the same row otherwise lands in
            # this poll and gets pushed back to the agent — confirmed
            # live 2026-05-01: agent observed its own outbound as an
            # inbound `← molecule: Agent message: ...`. Filter here
            # belt-and-braces; the long-term fix is upstream renaming
            # the activity_type to `agent_outbound` (molecule-core
            # #2469). Once that lands, this filter becomes redundant
            # but stays in place because it only excludes rows we never
            # want, so removing it would just be churn.
            #
            # NB: still call save_cursor for these rows below — we
            # advance past them so the next poll doesn't keep re-seeing
            # the same self-notify on every iteration.
            last_id = str(row.get("id", "")) or last_id
            continue
        message = message_from_activity(row)
        if not message.activity_id:
            continue
        # Tag the message with the workspace it arrived on so the agent
        # (and tools like send_message_to_user) can route the reply to
        # the right tenant. Empty-string in single-workspace mode keeps
        # to_dict()'s output shape unchanged for back-compat consumers.
        message.arrival_workspace_id = workspace_id if cursor_key else ""
        state.record(message)
        last_id = message.activity_id
        new_count += 1
    # Drain any uploads still in flight if the batch ended with upload
    # rows (no chat-message row to trigger the inline drain). Without
    # this, a future poll that picks up the chat-message row first
    # would race with the still-running fetches.
    if batch_fetcher is not None:
        _drain_uploads(batch_fetcher)
    if last_id is not None:
        state.save_cursor(last_id, cursor_key)
    return new_count


def _poll_loop(
    state: InboxState,
    platform_url: str,
    workspace_id: str,
    interval: float = POLL_INTERVAL_SECONDS,
    stop_event: threading.Event | None = None,
) -> None:
    """Daemon-thread body: poll forever until stop_event fires.

    auth_headers(workspace_id) is rebuilt every iteration so a token
    rotation via env var, .auth_token file, or per-workspace registry is
    picked up without a restart. Cheap (a dict + an env read).

    Multi-workspace pollers pass the workspace_id so the per-workspace
    bearer token is selected from platform_auth's registry; single-
    workspace pollers fall through to the legacy resolution path
    (workspace_id arg is still passed but the registry lookup misses and
    auth_headers falls back to the cached/file/env token).
    """
    from platform_auth import auth_headers

    while True:
        try:
            _poll_once(
                state, platform_url, workspace_id, auth_headers(workspace_id)
            )
        except Exception as exc:  # noqa: BLE001
            # Never let one bad iteration kill the thread; log + retry.
            logger.warning("inbox poller: iteration crashed: %s", exc)
        # stop_event path doubles as the sleep: wait() returns True on
        # stop (→ exit), False on timeout (→ next poll).
        if stop_event is not None and stop_event.wait(interval):
            return
        if stop_event is None:
            time.sleep(interval)


def start_poller_thread(
    state: InboxState,
    platform_url: str,
    workspace_id: str,
    interval: float = POLL_INTERVAL_SECONDS,
    stop_event: threading.Event | None = None,
) -> threading.Thread:
    """Spawn the poller as a daemon thread. Returns the Thread handle.

    daemon=True so the poller dies with the main process — same
    rationale as mcp_cli's heartbeat thread (no leaks, no stale
    workspace writes after the operator hits Ctrl-C).

    Thread name embeds the workspace_id (truncated) so a multi-workspace
    operator running ``ps -eL`` or eyeballing ``threading.enumerate()``
    can tell which thread is which without reverse-engineering it from
    crash tracebacks.

    Pass ``stop_event`` to enable graceful shutdown — used by tests so
    the daemon thread doesn't outlive the test that started it and race
    with later tests' httpx patches. Production code passes None and
    relies on the daemon flag for process-exit cleanup.
    """
    name = "molecule-mcp-inbox-poller"
    if workspace_id:
        name = f"{name}-{workspace_id[:8]}"
    t = threading.Thread(
        target=_poll_loop,
        args=(state, platform_url, workspace_id, interval, stop_event),
        name=name,
        daemon=True,
    )
    t.start()
    return t


def default_cursor_path(workspace_id: str = "") -> Path:
    """Standard cursor location: ``<configs dir>/.mcp_inbox_cursor``.

    Resolved via configs_dir so the cursor lives next to .auth_token +
    .platform_inbound_secret regardless of whether the runtime is
    in-container (/configs) or external (~/.molecule-workspace).

    Multi-workspace operators pass ``workspace_id`` to get a unique
    cursor file per workspace (``.mcp_inbox_cursor_<wsid prefix>``) so
    pollers don't trample each other's cursors. Single-workspace
    operators omit the arg and keep the legacy filename — back-compat
    with existing on-disk cursors.
    """
    base = configs_dir.resolve() / ".mcp_inbox_cursor"
    if workspace_id:
        # 8-char prefix is enough to disambiguate two workspaces in the
        # same operator's setup (UUID v4 first 32 bits ≈ 4 billion of
        # entropy) without hash-bombing the filename.
        return base.with_name(f".mcp_inbox_cursor_{workspace_id[:8]}")
    return base