diff --git a/scripts/build_runtime_package.py b/scripts/build_runtime_package.py index 74db6b8d..366b86c2 100755 --- a/scripts/build_runtime_package.py +++ b/scripts/build_runtime_package.py @@ -64,6 +64,7 @@ TOP_LEVEL_MODULES = { "events", "executor_helpers", "heartbeat", + "inbox", "initial_prompt", "internal_chat_uploads", "internal_file_read", diff --git a/scripts/wheel_smoke.py b/scripts/wheel_smoke.py index b4f14b03..04d5235c 100644 --- a/scripts/wheel_smoke.py +++ b/scripts/wheel_smoke.py @@ -43,6 +43,21 @@ def smoke_imports_and_invariants() -> None: assert callable(cli_main), "a2a_mcp_server.cli_main must be callable" assert callable(mcp_cli_main), "mcp_cli.main must be callable" + # inbox.activate / get_state / start_poller_thread form the inbound + # delivery path for the standalone molecule-mcp wrapper. mcp_cli.main + # imports + activates these at startup; if a wheel ships without + # them, the standalone agent silently loses the wait_for_message / + # inbox_peek / inbox_pop tools and reverts to outbound-only. 
+ from molecule_runtime.inbox import ( # noqa: F401 + InboxState, + activate as inbox_activate, + get_state as inbox_get_state, + start_poller_thread as inbox_start_poller_thread, + ) + assert callable(inbox_activate), "inbox.activate must be callable" + assert callable(inbox_get_state), "inbox.get_state must be callable" + assert callable(inbox_start_poller_thread), "inbox.start_poller_thread must be callable" + assert a2a_client._A2A_ERROR_PREFIX, "a2a_client missing error sentinel" assert callable(get_adapter), "adapters.get_adapter must be callable" assert hasattr(BaseAdapter, "name"), "BaseAdapter interface broken" diff --git a/workspace/a2a_mcp_server.py b/workspace/a2a_mcp_server.py index 4e75efc2..9e488f42 100644 --- a/workspace/a2a_mcp_server.py +++ b/workspace/a2a_mcp_server.py @@ -23,9 +23,12 @@ from a2a_tools import ( tool_delegate_task, tool_delegate_task_async, tool_get_workspace_info, + tool_inbox_peek, + tool_inbox_pop, tool_list_peers, tool_recall_memory, tool_send_message_to_user, + tool_wait_for_message, ) from platform_tools.registry import TOOLS as _PLATFORM_TOOL_SPECS @@ -112,6 +115,18 @@ async def handle_tool_call(name: str, arguments: dict) -> str: arguments.get("query", ""), arguments.get("scope", ""), ) + elif name == "wait_for_message": + return await tool_wait_for_message( + arguments.get("timeout_secs", 60.0), + ) + elif name == "inbox_peek": + return await tool_inbox_peek( + arguments.get("limit", 10), + ) + elif name == "inbox_pop": + return await tool_inbox_pop( + arguments.get("activity_id", ""), + ) return f"Unknown tool: {name}" diff --git a/workspace/a2a_tools.py b/workspace/a2a_tools.py index d5be00bd..6cce6d62 100644 --- a/workspace/a2a_tools.py +++ b/workspace/a2a_tools.py @@ -526,3 +526,84 @@ async def tool_recall_memory(query: str = "", scope: str = "") -> str: return json.dumps(data) except Exception as e: return f"Error recalling memory: {e}" + + +# 
# ---------------------------------------------------------------------------
# Inbox tools — inbound delivery for the standalone molecule-mcp path.
# ---------------------------------------------------------------------------
#
# The InboxState singleton is set by mcp_cli before the MCP server starts
# (see workspace/inbox.py for the rationale). In-container runtimes never
# call ``inbox.activate(...)``, so ``inbox.get_state()`` returns None and
# these tools surface an informational error rather than raising.
#
# When-to-use guidance (mirrored in platform_tools/registry.py): agents
# in standalone-runtime mode should call ``wait_for_message`` to block
# on the next inbound message after they've emitted a reply, forming
# the loop ``wait → respond → wait``. ``inbox_peek`` is for inspecting
# the queue without consuming; ``inbox_pop`` removes a handled message.

_INBOX_NOT_ENABLED_MSG = (
    "Error: inbox polling is not enabled in this runtime. The standalone "
    "molecule-mcp wrapper activates it; in-container runtimes receive "
    "messages via push delivery and do not need these tools."
)

# Fallback page size for ``inbox_peek`` when the caller's value is unusable.
_DEFAULT_PEEK_LIMIT = 10


def _coerce_peek_limit(limit: object) -> int:
    """Normalise a caller-supplied ``limit`` to an int.

    Tool arguments arrive as decoded JSON, so an "integer" may show up as
    ``5.0``. ``bool`` is rejected explicitly because it is a subclass of
    ``int`` and ``True`` would otherwise be read as a limit of 1. Anything
    else falls back to ``_DEFAULT_PEEK_LIMIT``.
    """
    if isinstance(limit, bool):
        return _DEFAULT_PEEK_LIMIT
    if isinstance(limit, int):
        return limit
    # ``is_integer`` is False for NaN and infinities, so they fall through.
    if isinstance(limit, float) and limit.is_integer():
        return int(limit)
    return _DEFAULT_PEEK_LIMIT


async def tool_inbox_peek(limit: int = 10) -> str:
    """Return up to ``limit`` pending inbound messages without removing them.

    Returns a JSON array of message dicts, or the "not enabled" message
    when no inbox has been activated in this process.
    """
    import inbox  # local import — avoids a circular dep at module load

    state = inbox.get_state()
    if state is None:
        return _INBOX_NOT_ENABLED_MSG
    pending = state.peek(limit=_coerce_peek_limit(limit))
    return json.dumps([message.to_dict() for message in pending])


async def tool_inbox_pop(activity_id: str) -> str:
    """Remove a message from the inbox queue by activity_id.

    Returns ``{"removed": bool, "activity_id": ...}`` as JSON. A missing id
    reports ``removed: false`` rather than an error.
    """
    import inbox

    state = inbox.get_state()
    if state is None:
        return _INBOX_NOT_ENABLED_MSG
    if not isinstance(activity_id, str) or not activity_id:
        return "Error: activity_id is required."
    removed = state.pop(activity_id)
    return json.dumps({"removed": removed is not None, "activity_id": activity_id})


async def tool_wait_for_message(timeout_secs: float = 60.0) -> str:
    """Block until a new message arrives or ``timeout_secs`` elapses.

    Returns the head message non-destructively; the agent decides
    whether to ``inbox_pop`` it after acting. On timeout the result is
    ``{"timeout": true, "timeout_secs": <effective timeout>}``.
    """
    import asyncio

    import inbox

    state = inbox.get_state()
    if state is None:
        return _INBOX_NOT_ENABLED_MSG

    try:
        timeout = float(timeout_secs)
    except (TypeError, ValueError):
        timeout = 60.0
    # Cap at 300s — Claude Code's default tool timeout is ~10min, and
    # blocking longer than 5min wastes the prompt cache window for
    # nothing useful. Operators who want longer can call repeatedly.
    timeout = max(0.0, min(timeout, 300.0))

    # The threading.Event-based wait would block the asyncio loop.
    # Run it on the default executor so the MCP server can keep
    # processing other JSON-RPC requests while we sleep.
    loop = asyncio.get_running_loop()
    message = await loop.run_in_executor(None, state.wait, timeout)
    if message is None:
        return json.dumps({"timeout": True, "timeout_secs": timeout})
    return json.dumps(message.to_dict())
"""In-memory inbox + background poller for the standalone molecule-mcp path.

Purpose
-------
The universal MCP server (a2a_mcp_server.py) is OUTBOUND-ONLY by default —
it gives an MCP-aware agent the same A2A delegation, peer-discovery, and
memory tools that container-bound runtimes already have. There is no
inbound delivery path: when the canvas user types a message or a peer
sends an A2A request, the activity lands on the platform but the
standalone agent never sees it.

This module closes that gap WITHOUT requiring a tunnel or a public agent
URL. A daemon thread polls ``/workspaces/:id/activity?type=a2a_receive``
on the platform and stages new rows in an in-memory deque. Three new MCP
tools (``inbox_peek``, ``inbox_pop``, ``wait_for_message``) let the
agent observe the queue.

Why a poller (not push)
-----------------------
runtime=external workspaces have ``delivery_mode="poll"`` — the platform
records inbound A2A in ``activity_logs`` but does not call back to the
agent. A poller is the only inbound surface that works without the
operator exposing a public URL through a tunnel. 5s cadence matches
the molecule-mcp-claude-channel plugin's POLL_INTERVAL — it's already
proven on staging for the channel-based delivery path.

Cursor model
------------
``activity_logs.id`` is the cursor (server-assigned, monotonic). We
persist it to ``${CONFIGS_DIR}/.mcp_inbox_cursor`` so an agent restart
doesn't replay the last 10 minutes of inbound traffic and re-act on
already-handled messages. On 410 (cursor pruned) we drop back to
``since_secs=600`` for a bounded backlog and let the cursor advance
naturally from there.

Scope
-----
Standalone molecule-mcp ONLY. The in-container runtime has its own
push delivery (main.py + canvas WebSocket); we never want both
running at once or a single message would be delivered twice. The
caller (mcp_cli.main) gates activation explicitly via
``activate(state)``; in-container code that imports this module by
accident gets a no-op until activate is called.
"""

from __future__ import annotations

import json
import logging
import os
import threading
import time
from collections import deque
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

logger = logging.getLogger(__name__)

# Poll cadence. 5s mirrors the molecule-mcp-claude-channel plugin's
# proven default — fast enough that a canvas user typing "are you
# there?" gets picked up before they refresh, slow enough that 12
# requests/min won't trip rate limits or wake mobile devices.
POLL_INTERVAL_SECONDS = 5.0

# Initial backlog window for the first poll AND the recovery path
# after a stale-cursor 410. 10 minutes is enough to cover a brief
# crash/restart without flooding a long-idle workspace with hours of
# stale chat.
INITIAL_BACKLOG_SECONDS = 600

# Hard cap on the in-memory deque. The poller is bounded by the
# server's per-page limit (default 100) and the agent typically pops
# faster than the operator types, so an idle workspace shouldn't
# exceed a handful. The cap protects against runaway growth if the
# agent process stops calling pop.
MAX_QUEUED_MESSAGES = 200

# Text surfaced when neither the envelope nor the summary carries anything.
_EMPTY_MESSAGE_PLACEHOLDER = "(empty A2A message)"


@dataclass
class InboxMessage:
    """One inbound A2A message staged for the agent.

    Mirrors the shape the agent sees via inbox_peek / wait_for_message.
    Fields are derived from the activity_logs row by
    ``message_from_activity``.
    """

    activity_id: str
    text: str
    peer_id: str  # empty string = canvas user; non-empty = peer workspace_id
    method: str  # JSON-RPC method ("message/send", "tasks/send", etc.)
    created_at: str  # RFC3339 timestamp from the activity row

    def to_dict(self) -> dict[str, Any]:
        """Return the JSON-serialisable form handed to the agent."""
        return {
            "activity_id": self.activity_id,
            "text": self.text,
            "peer_id": self.peer_id,
            "kind": "peer_agent" if self.peer_id else "canvas_user",
            "method": self.method,
            "created_at": self.created_at,
        }


@dataclass
class InboxState:
    """Thread-safe queue of pending inbound messages.

    Producer: the poller thread, calling ``record(message)``.
    Consumers: the MCP tool handlers, calling ``peek``, ``pop``,
    or ``wait``. Synchronization is via a single ``threading.Lock``
    (cheap — every operation is O(n) over a small deque) plus an
    ``Event`` that wakes ``wait`` callers when a new message lands.
    """

    # File path that persists ``activity_logs.id`` of the most recently
    # observed row, so a restart doesn't replay backlog.
    cursor_path: Path

    _queue: deque[InboxMessage] = field(
        default_factory=lambda: deque(maxlen=MAX_QUEUED_MESSAGES)
    )
    _lock: threading.Lock = field(default_factory=threading.Lock)
    _arrival: threading.Event = field(default_factory=threading.Event)
    _cursor: str | None = None
    _cursor_loaded: bool = False

    def load_cursor(self) -> str | None:
        """Read the persisted cursor from disk. Cached after first call.

        Missing/unreadable/undecodable file → None (poller will fall back
        to the initial-backlog window). We never raise: a corrupt cursor
        is less bad than the inbox refusing to start.
        """
        with self._lock:
            if self._cursor_loaded:
                return self._cursor
            try:
                if self.cursor_path.is_file():
                    raw = self.cursor_path.read_text(encoding="utf-8")
                    self._cursor = raw.strip() or None
            except (OSError, ValueError) as exc:
                # ValueError covers UnicodeDecodeError from a garbled file,
                # which OSError alone would let escape.
                logger.warning(
                    "inbox: failed to read cursor %s: %s", self.cursor_path, exc
                )
                self._cursor = None
            self._cursor_loaded = True
            return self._cursor

    def save_cursor(self, activity_id: str) -> None:
        """Persist the cursor. Best-effort — log + continue on failure.

        Loss of the cursor on a write failure means an extra page of
        backlog after restart, never a stuck poller. Silent-fail
        would mask a permission misconfiguration on the operator's
        configs dir; warn loudly so they can fix it.
        """
        with self._lock:
            self._cursor = activity_id
            self._cursor_loaded = True
            try:
                self.cursor_path.parent.mkdir(parents=True, exist_ok=True)
                # Write-then-rename so a crash mid-write never leaves a
                # truncated cursor behind.
                tmp = self.cursor_path.with_suffix(self.cursor_path.suffix + ".tmp")
                tmp.write_text(activity_id, encoding="utf-8")
                tmp.replace(self.cursor_path)
            except OSError as exc:
                logger.warning(
                    "inbox: failed to persist cursor to %s: %s",
                    self.cursor_path,
                    exc,
                )

    def reset_cursor(self) -> None:
        """Forget the cursor. Used after a 410 from the activity API."""
        with self._lock:
            self._cursor = None
            self._cursor_loaded = True
            try:
                if self.cursor_path.is_file():
                    self.cursor_path.unlink()
            except OSError as exc:
                logger.warning(
                    "inbox: failed to delete cursor %s: %s", self.cursor_path, exc
                )

    def record(self, message: InboxMessage) -> bool:
        """Append a message and wake any waiter.

        Skips a row whose activity_id we've already queued — defensive
        against the poller racing with the consumer + cursor save.

        Returns True when the message was enqueued, False when it was a
        duplicate of one already queued.
        """
        with self._lock:
            if any(m.activity_id == message.activity_id for m in self._queue):
                return False
            capacity = self._queue.maxlen
            if capacity is not None and len(self._queue) >= capacity:
                # A bounded deque evicts from the left on append; say so
                # instead of losing the message silently.
                logger.warning(
                    "inbox: queue full (%d); dropping oldest message %s",
                    capacity,
                    self._queue[0].activity_id,
                )
            self._queue.append(message)
            self._arrival.set()
            return True

    def peek(self, limit: int = 10) -> list[InboxMessage]:
        """Return up to ``limit`` pending messages without removing them.

        A non-positive ``limit`` is treated as the default of 10.
        """
        if limit <= 0:
            limit = 10
        with self._lock:
            return list(self._queue)[:limit]

    def pop(self, activity_id: str) -> InboxMessage | None:
        """Remove a specific message. Idempotent; returns None if absent.

        We require the caller to specify which message it handled
        rather than auto-popping the head — preserves observability
        when the agent reads several but only handles one.
        """
        with self._lock:
            # Iterate a snapshot: the deque is mutated inside the loop.
            for existing in list(self._queue):
                if existing.activity_id == activity_id:
                    self._queue.remove(existing)
                    if not self._queue:
                        self._arrival.clear()
                    return existing
            return None

    def wait(self, timeout_secs: float) -> InboxMessage | None:
        """Block until a message is available or timeout elapses.

        Returns the head message WITHOUT popping; the caller decides
        whether to pop after acting on it. Same shape as Python's
        Queue.get with timeout, but non-destructive so a peek-style
        agent can still inspect with peek/pop.

        Returns None only once the full timeout has elapsed. If the
        wake-up finds the queue already drained by another consumer, the
        wait resumes for the remaining time rather than reporting a
        premature timeout.
        """
        deadline = time.monotonic() + max(0.0, timeout_secs)
        while True:
            with self._lock:
                if self._queue:
                    return self._queue[0]
                # Clear under the lock so a record() landing right after
                # this point sets the event and wakes the wait below.
                self._arrival.clear()
            remaining = deadline - time.monotonic()
            if remaining <= 0 or not self._arrival.wait(timeout=remaining):
                return None


# ---------------------------------------------------------------------------
# Module singleton — set by mcp_cli before MCP server starts.
# ---------------------------------------------------------------------------
#
# In-container callers don't activate; the inbox tools detect the
# unset singleton and return an informational error rather than
# breaking the dispatch path.

_STATE: InboxState | None = None


def activate(state: InboxState) -> None:
    """Register an InboxState as the singleton this module exposes.

    Idempotent within a process: re-activating with the same state is
    a no-op; activating with a DIFFERENT state replaces the singleton
    + logs at WARNING (the only legitimate caller is mcp_cli at
    startup; double-activate usually means a test/runtime mix-up).
    """
    global _STATE
    if _STATE is state:
        return
    if _STATE is not None:
        logger.warning("inbox: replacing existing singleton state")
    _STATE = state


def get_state() -> InboxState | None:
    """Return the active InboxState, or None if the runtime never activated.

    Tool implementations call this and surface a clear "(inbox not
    enabled)" message to the agent when None — keeps the in-container
    path's tool dispatch from raising on an inbox-tool call that the
    agent shouldn't have made anyway.
    """
    return _STATE


# ---------------------------------------------------------------------------
# Activity → InboxMessage adapter
# ---------------------------------------------------------------------------
#
# The platform's a2a_proxy logs request_body as the JSON-RPC envelope
# it forwarded to the workspace. Three shapes have been observed in
# the wild (verified against workspace-server's logA2ASuccess in
# a2a_proxy_helpers.go on 2026-04-29) — handle all three before
# falling back to summary so a peer message at least surfaces SOMETHING.


def _is_text_part(part: Any) -> bool:
    """Return True for a dict part tagged as text that carries a str payload."""
    return (
        isinstance(part, dict)
        and (part.get("kind") == "text" or part.get("type") == "text")
        and isinstance(part.get("text"), str)
    )


def _extract_text(request_body: Any, summary: str | None) -> str:
    """Pull the human-readable text out of an A2A activity row.

    Mirrors molecule-mcp-claude-channel/server.ts:445 (extractText) so
    canvas-user messages and peer-agent messages render identically
    across both inbound channels.

    Falls back to ``summary``, then to a fixed placeholder, when no
    text part is found. Parts whose ``text`` is missing or not a string
    are ignored instead of raising.
    """
    fallback = summary or _EMPTY_MESSAGE_PLACEHOLDER
    if not isinstance(request_body, dict):
        return fallback

    # Candidate locations, most specific first:
    # params.message.parts → params.parts → parts.
    candidates: list[Any] = []
    params = request_body.get("params")
    if isinstance(params, dict):
        message = params.get("message")
        if isinstance(message, dict):
            candidates.append(message.get("parts"))
        candidates.append(params.get("parts"))
    candidates.append(request_body.get("parts"))

    # The A2A protocol's part discriminator field varies between SDK
    # versions: a2a-sdk v0 uses ``type``, v1 uses ``kind``. The platform's
    # activity_logs preserves whichever the original sender used, so we
    # accept either. Verified live against a hosted SaaS workspace on
    # 2026-04-30 — every canvas-user message arrived with ``kind`` and
    # the type-only filter was silently falling through to summary.
    for parts in candidates:
        if not isinstance(parts, list):
            continue
        text = "".join(part["text"] for part in parts if _is_text_part(part))
        if text:
            return text
    return fallback


def _str_or_empty(value: Any) -> str:
    """Stringify ``value``, mapping None (JSON null / missing key) to ""."""
    return "" if value is None else str(value)


def message_from_activity(row: dict[str, Any]) -> InboxMessage:
    """Convert one /activity row into an InboxMessage.

    Missing or null ``id`` / ``created_at`` become empty strings (never
    the literal ``"None"``), so callers can skip id-less rows with a
    plain truthiness check.
    """
    request_body = row.get("request_body")
    if isinstance(request_body, str):
        # The Go handler returns request_body as json.RawMessage; httpx
        # deserializes that to a dict already. But some legacy paths or
        # mocked servers may return it as a string — handle defensively.
        try:
            request_body = json.loads(request_body)
        except (TypeError, ValueError):
            request_body = None

    source_id = row.get("source_id")
    method = row.get("method")
    return InboxMessage(
        activity_id=_str_or_empty(row.get("id")),
        text=_extract_text(request_body, row.get("summary")),
        peer_id=str(source_id) if source_id else "",
        method=str(method) if method else "",
        created_at=_str_or_empty(row.get("created_at")),
    )


# ---------------------------------------------------------------------------
# Poller — daemon thread that fills the queue from the activity API
# ---------------------------------------------------------------------------


def _poll_once(
    state: InboxState,
    platform_url: str,
    workspace_id: str,
    headers: dict[str, str],
    timeout_secs: float = 10.0,
) -> int:
    """One poll iteration. Returns number of new messages enqueued.

    Idempotent and stateless apart from the InboxState passed in —
    safe to call from tests with a stub state + a real httpx mock.
    Rows already sitting in the queue are not counted, but they still
    advance the cursor.
    """
    import httpx

    url = f"{platform_url.rstrip('/')}/workspaces/{workspace_id}/activity"
    params: dict[str, str] = {"type": "a2a_receive"}
    cursor = state.load_cursor()
    if cursor:
        params["since_id"] = cursor
    else:
        params["since_secs"] = str(INITIAL_BACKLOG_SECONDS)

    try:
        with httpx.Client(timeout=timeout_secs) as client:
            resp = client.get(url, params=params, headers=headers)
    except Exception as exc:  # noqa: BLE001
        logger.warning("inbox poller: GET /activity failed: %s", exc)
        return 0

    if resp.status_code == 410:
        # Cursor pruned — drop back to the backlog window. The next
        # poll picks up wherever the activity API has rows now.
        logger.info(
            "inbox poller: cursor %s expired (410); resetting to since_secs=%d",
            cursor,
            INITIAL_BACKLOG_SECONDS,
        )
        state.reset_cursor()
        return 0

    if resp.status_code >= 400:
        logger.warning(
            "inbox poller: HTTP %d from /activity: %s",
            resp.status_code,
            (resp.text or "")[:200],
        )
        return 0

    try:
        rows = resp.json()
    except ValueError as exc:
        logger.warning("inbox poller: non-JSON response: %s", exc)
        return 0
    if not isinstance(rows, list):
        return 0

    # since_id mode returns ASC (oldest first). since_secs mode returns
    # DESC; reverse so we record in chronological order and the cursor
    # we save is the freshest row.
    if cursor is None:
        rows = list(reversed(rows))

    new_count = 0
    last_id: str | None = None
    for row in rows:
        if not isinstance(row, dict):
            continue
        message = message_from_activity(row)
        if not message.activity_id:
            continue
        if state.record(message):
            new_count += 1
        last_id = message.activity_id

    if last_id is not None:
        state.save_cursor(last_id)
    return new_count


def _poll_loop(
    state: InboxState,
    platform_url: str,
    workspace_id: str,
    interval: float = POLL_INTERVAL_SECONDS,
    stop_event: threading.Event | None = None,
) -> None:
    """Daemon-thread body: poll forever until stop_event fires.

    auth_headers() is rebuilt every iteration so a token rotation via
    env var or .auth_token file is picked up without a restart. Cheap
    (a dict + an env read).
    """
    from platform_auth import auth_headers

    while True:
        try:
            _poll_once(state, platform_url, workspace_id, auth_headers())
        except Exception as exc:  # noqa: BLE001
            # One bad iteration must not kill the inbound path.
            logger.warning("inbox poller: iteration crashed: %s", exc)
        if stop_event is None:
            time.sleep(interval)
        elif stop_event.wait(interval):
            return


def start_poller_thread(
    state: InboxState,
    platform_url: str,
    workspace_id: str,
    interval: float = POLL_INTERVAL_SECONDS,
) -> threading.Thread:
    """Spawn the poller as a daemon thread. Returns the Thread handle.

    daemon=True so the poller dies with the main process — same
    rationale as mcp_cli's heartbeat thread (no leaks, no stale
    workspace writes after the operator hits Ctrl-C).
    """
    t = threading.Thread(
        target=_poll_loop,
        args=(state, platform_url, workspace_id, interval),
        name="molecule-mcp-inbox-poller",
        daemon=True,
    )
    t.start()
    return t


def default_cursor_path() -> Path:
    """Standard cursor location: ``${CONFIGS_DIR}/.mcp_inbox_cursor``.

    Mirrors mcp_cli's CONFIGS_DIR resolution so a single
    operator-facing env var controls every persisted state file
    (.auth_token + .mcp_inbox_cursor).
    """
    configs_dir = Path(os.environ.get("CONFIGS_DIR", "/configs"))
    return configs_dir / ".mcp_inbox_cursor"
def _start_inbox_poller(platform_url: str, workspace_id: str) -> None:
    """Activate the inbox singleton and launch its background poller.

    ``inbox`` is imported here, at call time, rather than at module
    import, so it only loads once ``main`` has reached this point. If
    the module cannot be imported, a warning is logged and the process
    carries on without inbound delivery.

    ``inbox.activate`` is idempotent for the same state object, and the
    poller thread is a daemon, so it ends with the main process.
    """
    try:
        import inbox
    except ImportError as exc:
        logger.warning("molecule-mcp: inbox module unavailable: %s", exc)
    else:
        inbox_state = inbox.InboxState(cursor_path=inbox.default_cursor_path())
        inbox.activate(inbox_state)
        inbox.start_poller_thread(inbox_state, platform_url, workspace_id)
# ---------------------------------------------------------------------------
# Inbox — inbound delivery for the standalone molecule-mcp path.
#
# These tools observe a poller-fed in-memory queue (see workspace/inbox.py).
# They are universally registered so docs + adapters stay aligned, but
# they only return real data in the standalone molecule-mcp runtime;
# in-container runtimes return an informational "not enabled" message
# because their delivery loop is push-based via the canvas WebSocket.
#
# NOTE: ``short`` and ``when_to_use`` are rendered verbatim into
# tests/snapshots/a2a_instructions_mcp.txt (``short`` in the bullet list,
# ``when_to_use`` under the ``### <name>`` heading). Any wording change
# here needs the snapshot regenerated.
# ---------------------------------------------------------------------------

# Blocking read of the queue head. ``timeout_secs`` is optional; the
# description's cap (300) and default (60) restate what the impl enforces.
_WAIT_FOR_MESSAGE = ToolSpec(
    name="wait_for_message",
    short=(
        "Block until the next inbound message (canvas user OR peer "
        "agent) arrives, or until ``timeout_secs`` elapses."
    ),
    when_to_use=(
        "Standalone-runtime ONLY (molecule-mcp wrapper). After "
        "you reply, call this to wait for the next message — forms "
        "the loop ``wait_for_message → respond → wait_for_message``. "
        "Returns the head message non-destructively; call inbox_pop "
        "with the activity_id once you've handled it. In-container "
        "runtimes receive messages via push and should not call this."
    ),
    input_schema={
        "type": "object",
        "properties": {
            "timeout_secs": {
                "type": "number",
                "description": (
                    "Max seconds to block. Capped at 300. "
                    "Default 60."
                ),
            },
        },
    },
    impl=tool_wait_for_message,
    section=A2A_SECTION,
)

# Non-destructive listing of queued messages. ``limit`` is optional.
_INBOX_PEEK = ToolSpec(
    name="inbox_peek",
    short="List pending inbound messages without removing them.",
    when_to_use=(
        "Standalone-runtime ONLY. Use to inspect what's queued "
        "before deciding which to handle. Non-destructive — pair "
        "with inbox_pop to consume after replying."
    ),
    input_schema={
        "type": "object",
        "properties": {
            "limit": {
                "type": "integer",
                "description": "Max messages to return. Default 10.",
            },
        },
    },
    impl=tool_inbox_peek,
    section=A2A_SECTION,
)

# Explicit removal by id. ``activity_id`` is the only required argument
# among the three inbox tools.
_INBOX_POP = ToolSpec(
    name="inbox_pop",
    short="Remove a handled message from the inbox queue by activity_id.",
    when_to_use=(
        "Standalone-runtime ONLY. Call after you've replied to a "
        "message returned from wait_for_message or inbox_peek to "
        "drop it from the queue. Idempotent — popping a missing "
        "id reports removed=false without erroring."
    ),
    input_schema={
        "type": "object",
        "properties": {
            "activity_id": {
                "type": "string",
                "description": (
                    "activity_id of the message to remove (from "
                    "inbox_peek / wait_for_message output)."
                ),
            },
        },
        "required": ["activity_id"],
    },
    impl=tool_inbox_pop,
    section=A2A_SECTION,
)
For longer-running work (research, multi-minute jobs) use delegate_task_async + check_task_status instead so you don't hold this workspace busy waiting. @@ -25,4 +28,13 @@ Use to introspect your own identity (e.g. before reporting back to the user, or ### send_message_to_user Use proactively across the lifecycle of a task — early to acknowledge, mid-flight to update, late to deliver. Never paste file URLs in the message body — always pass absolute paths in `attachments` so the platform serves them as download chips (works on SaaS where external file hosts are unreachable). +### wait_for_message +Standalone-runtime ONLY (molecule-mcp wrapper). After you reply, call this to wait for the next message — forms the loop ``wait_for_message → respond → wait_for_message``. Returns the head message non-destructively; call inbox_pop with the activity_id once you've handled it. In-container runtimes receive messages via push and should not call this. + +### inbox_peek +Standalone-runtime ONLY. Use to inspect what's queued before deciding which to handle. Non-destructive — pair with inbox_pop to consume after replying. + +### inbox_pop +Standalone-runtime ONLY. Call after you've replied to a message returned from wait_for_message or inbox_peek to drop it from the queue. Idempotent — popping a missing id reports removed=false without erroring. + Always use list_peers first to discover available workspace IDs. Access control is enforced — you can only reach siblings and parent/children. If a delegation returns a DELEGATION FAILED message, do NOT forward the raw error to the user. Instead: (1) try a different peer, (2) handle the task yourself, or (3) tell the user which peer is unavailable and provide your own best answer. diff --git a/workspace/tests/test_inbox.py b/workspace/tests/test_inbox.py new file mode 100644 index 00000000..03bcf8a4 --- /dev/null +++ b/workspace/tests/test_inbox.py @@ -0,0 +1,444 @@ +"""Tests for workspace/inbox.py — InboxState + activity API poller. 
+
+Covers the round-trip from a /activity row to an InboxMessage that the
+agent observes via the three new MCP tools, plus the cursor-persistence
++ 410-recovery behavior that keeps the standalone molecule-mcp from
+re-delivering already-handled messages after a restart.
+"""
+from __future__ import annotations
+
+import threading
+import time
+from pathlib import Path
+from typing import Any
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+import inbox
+
+
+@pytest.fixture(autouse=True)
+def _reset_singleton():
+    """Each test starts with a clean module singleton + a fresh
+    InboxState. Activation in one test must not leak into the next."""
+    inbox._STATE = None
+    yield
+    inbox._STATE = None
+
+
+@pytest.fixture()
+def state(tmp_path: Path) -> inbox.InboxState:
+    return inbox.InboxState(cursor_path=tmp_path / ".mcp_inbox_cursor")
+
+
+# ---------------------------------------------------------------------------
+# _extract_text — envelope shape coverage
+# ---------------------------------------------------------------------------
+
+
+def test_extract_text_jsonrpc_message_wrapper():
+    body = {
+        "jsonrpc": "2.0",
+        "method": "message/send",
+        "params": {"message": {"parts": [{"type": "text", "text": "hello"}]}},
+    }
+    assert inbox._extract_text(body, None) == "hello"
+
+
+def test_extract_text_a2a_v1_kind_field():
+    """A2A SDK v1 uses ``kind`` instead of ``type`` as the part
+    discriminator. Hosted SaaS workspaces send the v1 shape today —
+    this case is what live canvas-user messages look like in
+    activity_logs.request_body."""
+    body = {
+        "params": {
+            "message": {
+                "role": "user",
+                "parts": [{"kind": "text", "text": "hello from canvas"}],
+            }
+        }
+    }
+    assert inbox._extract_text(body, None) == "hello from canvas"
+
+
+def test_extract_text_jsonrpc_params_parts():
+    body = {"params": {"parts": [{"type": "text", "text": "from peer"}]}}
+    assert inbox._extract_text(body, None) == "from peer"
+
+
+def test_extract_text_shorthand_parts():
+    body = {"parts": [{"type": "text", "text": "shorthand"}]}
+    assert inbox._extract_text(body, None) == "shorthand"
+
+
+def test_extract_text_concatenates_multiple_parts():
+    body = {
+        "parts": [
+            {"type": "text", "text": "hello "},
+            {"type": "text", "text": "world"},
+            {"type": "image", "url": "https://example.invalid/x.png"},
+        ]
+    }
+    assert inbox._extract_text(body, None) == "hello world"
+
+
+def test_extract_text_falls_back_to_summary():
+    assert inbox._extract_text(None, "fallback") == "fallback"
+    assert inbox._extract_text({"unrelated": True}, "fallback") == "fallback"
+
+
+def test_extract_text_returns_placeholder_when_nothing_usable():
+    assert inbox._extract_text(None, None) == "(empty A2A message)"
+
+
+# ---------------------------------------------------------------------------
+# message_from_activity
+# ---------------------------------------------------------------------------
+
+
+def test_message_from_activity_canvas_user():
+    row = {
+        "id": "act-1",
+        "source_id": None,
+        "method": "message/send",
+        "summary": "ignored",
+        "request_body": {
+            "params": {"message": {"parts": [{"type": "text", "text": "hi"}]}}
+        },
+        "created_at": "2026-04-30T22:00:00Z",
+    }
+    msg = inbox.message_from_activity(row)
+    assert msg.activity_id == "act-1"
+    assert msg.text == "hi"
+    assert msg.peer_id == ""
+    assert msg.method == "message/send"
+    d = msg.to_dict()
+    assert d["kind"] == "canvas_user"
+
+
+def test_message_from_activity_peer_agent():
+    row = {
+        "id": "act-2",
+        "source_id": "ws-peer-uuid",
+        "method": "tasks/send",
+        "summary": "delegate",
+        "request_body": {"parts": [{"type": "text", "text": "do task"}]},
+        "created_at": "2026-04-30T22:01:00Z",
+    }
+    msg = inbox.message_from_activity(row)
+    assert msg.peer_id == "ws-peer-uuid"
+    assert msg.to_dict()["kind"] == "peer_agent"
+
+
+def test_message_from_activity_handles_string_request_body():
+    row = {
+        "id": "act-3",
+        "source_id": None,
+        "method": "message/send",
+        "summary": None,
+        "request_body": '{"parts": [{"type": "text", "text": "json string"}]}',
+        "created_at": "2026-04-30T22:02:00Z",
+    }
+    assert inbox.message_from_activity(row).text == "json string"
+
+
+# ---------------------------------------------------------------------------
+# InboxState — queue + wait/peek/pop semantics
+# ---------------------------------------------------------------------------
+
+
+def _msg(activity_id: str, text: str = "", peer_id: str = "") -> inbox.InboxMessage:
+    return inbox.InboxMessage(
+        activity_id=activity_id,
+        text=text or activity_id,
+        peer_id=peer_id,
+        method="message/send",
+        created_at="2026-04-30T22:00:00Z",
+    )
+
+
+def test_record_then_peek(state: inbox.InboxState):
+    state.record(_msg("a"))
+    state.record(_msg("b"))
+    out = state.peek(limit=10)
+    assert [m.activity_id for m in out] == ["a", "b"]
+
+
+def test_record_dedupes_by_activity_id(state: inbox.InboxState):
+    state.record(_msg("a"))
+    state.record(_msg("a"))  # same id — must drop the second
+    assert len(state.peek(10)) == 1
+
+
+def test_pop_removes_specific_message(state: inbox.InboxState):
+    state.record(_msg("a"))
+    state.record(_msg("b"))
+    removed = state.pop("a")
+    assert removed is not None and removed.activity_id == "a"
+    remaining = state.peek(10)
+    assert [m.activity_id for m in remaining] == ["b"]
+
+
+def test_pop_missing_id_returns_none(state: inbox.InboxState):
+    state.record(_msg("a"))
+    # Bind the result before asserting so the call still runs under
+    # ``python -O`` (which strips bare assert statements).
+    result = state.pop("does-not-exist")
+    assert result is None
+    # Original message still present
+    assert len(state.peek(10)) == 1
+
+
+def test_wait_returns_existing_head_immediately(state: inbox.InboxState):
+    state.record(_msg("a"))
+    start = time.monotonic()
+    msg = state.wait(timeout_secs=5.0)
+    elapsed = time.monotonic() - start
+    assert msg is not None and msg.activity_id == "a"
+    assert elapsed < 0.5, f"wait should not block when queue non-empty (took {elapsed:.2f}s)"
+
+
+def test_wait_blocks_until_message_arrives(state: inbox.InboxState):
+    def producer():
+        time.sleep(0.05)
+        state.record(_msg("late"))
+
+    threading.Thread(target=producer, daemon=True).start()
+    msg = state.wait(timeout_secs=2.0)
+    assert msg is not None and msg.activity_id == "late"
+
+
+def test_wait_returns_none_on_timeout(state: inbox.InboxState):
+    msg = state.wait(timeout_secs=0.05)
+    assert msg is None
+
+
+def test_wait_does_not_pop(state: inbox.InboxState):
+    """wait() is non-destructive — caller decides when to inbox_pop."""
+    state.record(_msg("a"))
+    state.wait(timeout_secs=1.0)
+    state.wait(timeout_secs=1.0)
+    assert len(state.peek(10)) == 1
+
+
+# ---------------------------------------------------------------------------
+# Cursor persistence
+# ---------------------------------------------------------------------------
+
+
+def test_load_cursor_returns_none_when_file_absent(state: inbox.InboxState):
+    assert state.load_cursor() is None
+
+
+def test_save_then_load_cursor_round_trip(state: inbox.InboxState):
+    state.save_cursor("act-cursor-1")
+    # Reset the cached flag to force a re-read
+    state._cursor_loaded = False
+    state._cursor = None
+    assert state.load_cursor() == "act-cursor-1"
+
+
+def test_save_cursor_creates_parent_directory(tmp_path: Path):
+    nested = tmp_path / "nested" / "configs" / ".mcp_inbox_cursor"
+    state = inbox.InboxState(cursor_path=nested)
+    state.save_cursor("act-x")
+    assert nested.read_text() == "act-x"
+
+
+def test_reset_cursor_deletes_file(state: inbox.InboxState):
+    state.save_cursor("act-y")
+    assert state.cursor_path.is_file()
+    state.reset_cursor()
+    assert not state.cursor_path.is_file()
+    assert state.load_cursor() is None
+
+
+# ---------------------------------------------------------------------------
+# Module singleton
+# ---------------------------------------------------------------------------
+
+
+def test_get_state_returns_none_before_activate():
+    assert inbox.get_state() is None
+
+
+def test_activate_then_get_state(state: inbox.InboxState):
+    inbox.activate(state)
+    assert inbox.get_state() is state
+
+
+def test_activate_idempotent(state: inbox.InboxState):
+    inbox.activate(state)
+    inbox.activate(state)  # same state — no-op, no warning expected
+    assert inbox.get_state() is state
+
+
+# ---------------------------------------------------------------------------
+# _poll_once — HTTP behavior
+# ---------------------------------------------------------------------------
+
+
+def _make_response(status_code: int, json_body: Any = None, text: str = "") -> MagicMock:
+    resp = MagicMock()
+    resp.status_code = status_code
+    if json_body is not None:
+        resp.json.return_value = json_body
+    else:
+        resp.json.side_effect = ValueError("no json")
+    resp.text = text
+    return resp
+
+
+def _patch_httpx(returning: MagicMock):
+    """Replace httpx.Client with a context-manager mock that returns
+    ``returning`` from .get(). Captures the GET call args for assertion."""
+    client = MagicMock()
+    client.__enter__ = MagicMock(return_value=client)
+    client.__exit__ = MagicMock(return_value=False)
+    client.get = MagicMock(return_value=returning)
+    return patch("httpx.Client", return_value=client), client
+
+
+def test_poll_once_fresh_start_uses_since_secs(state: inbox.InboxState):
+    resp = _make_response(200, [])
+    p, client = _patch_httpx(resp)
+    with p:
+        n = inbox._poll_once(state, "http://platform", "ws-1", {})
+    assert n == 0
+    _, kwargs = client.get.call_args
+    assert kwargs["params"]["type"] == "a2a_receive"
+    assert "since_secs" in kwargs["params"]
+    assert "since_id" not in kwargs["params"]
+
+
+def test_poll_once_with_cursor_uses_since_id(state: inbox.InboxState):
+    state.save_cursor("act-existing")
+    resp = _make_response(200, [])
+    p, client = _patch_httpx(resp)
+    with p:
+        inbox._poll_once(state, "http://platform", "ws-1", {})
+    _, kwargs = client.get.call_args
+    assert kwargs["params"]["since_id"] == "act-existing"
+    assert "since_secs" not in kwargs["params"]
+
+
+def test_poll_once_410_resets_cursor(state: inbox.InboxState):
+    state.save_cursor("act-stale")
+    resp = _make_response(410, text="cursor pruned")
+    p, _ = _patch_httpx(resp)
+    with p:
+        inbox._poll_once(state, "http://platform", "ws-1", {})
+    assert state.load_cursor() is None
+    assert not state.cursor_path.is_file()
+
+
+def test_poll_once_records_messages_and_advances_cursor(state: inbox.InboxState):
+    state.save_cursor("act-old")
+    rows = [
+        {
+            "id": "act-1",
+            "source_id": None,
+            "method": "message/send",
+            "summary": None,
+            "request_body": {"parts": [{"type": "text", "text": "first"}]},
+            "created_at": "2026-04-30T22:00:00Z",
+        },
+        {
+            "id": "act-2",
+            "source_id": "ws-peer",
+            "method": "tasks/send",
+            "summary": None,
+            "request_body": {"parts": [{"type": "text", "text": "second"}]},
+            "created_at": "2026-04-30T22:00:01Z",
+        },
+    ]
+    resp = _make_response(200, rows)
+    p, _ = _patch_httpx(resp)
+    with p:
+        n = inbox._poll_once(state, "http://platform", "ws-1", {})
+    assert n == 2
+    queue = state.peek(10)
+    assert [m.activity_id for m in queue] == ["act-1", "act-2"]
+    assert state.load_cursor() == "act-2"
+
+
+def test_poll_once_500_does_not_raise(state: inbox.InboxState):
+    resp = _make_response(500, text="boom")
+    p, _ = _patch_httpx(resp)
+    with p:
+        n = inbox._poll_once(state, "http://platform", "ws-1", {})
+    assert n == 0
+    # Cursor untouched
+    assert state.load_cursor() is None
+
+
+def test_poll_once_handles_non_list_payload(state: inbox.InboxState):
+    resp = _make_response(200, {"error": "unexpected"})
+    p, _ = _patch_httpx(resp)
+    with p:
+        n = inbox._poll_once(state, "http://platform", "ws-1", {})
+    assert n == 0
+
+
+def test_poll_once_initial_backlog_reverses_to_chronological(state: inbox.InboxState):
+    """When no cursor is set, /activity returns DESC; the poller must
+    reverse so the saved cursor is the freshest row + record order
+    is chronological."""
+    rows_desc = [
+        {
+            "id": "act-newest",
+            "source_id": None,
+            "method": "message/send",
+            "summary": None,
+            "request_body": {"parts": [{"type": "text", "text": "newest"}]},
+            "created_at": "2026-04-30T22:00:02Z",
+        },
+        {
+            "id": "act-oldest",
+            "source_id": None,
+            "method": "message/send",
+            "summary": None,
+            "request_body": {"parts": [{"type": "text", "text": "oldest"}]},
+            "created_at": "2026-04-30T22:00:00Z",
+        },
+    ]
+    resp = _make_response(200, rows_desc)
+    p, _ = _patch_httpx(resp)
+    with p:
+        inbox._poll_once(state, "http://platform", "ws-1", {})
+    queue = state.peek(10)
+    assert [m.activity_id for m in queue] == ["act-oldest", "act-newest"]
+    # Cursor is the newest row, so the next poll picks up only what's
+    # newer — restoring forward chronological progression.
+    assert state.load_cursor() == "act-newest"
+
+
+def test_start_poller_thread_is_daemon(state: inbox.InboxState):
+    """Daemon flag is required so the poller dies with the parent
+    process; a non-daemon poller would leak across `claude` restarts
+    and write to a stale workspace."""
+    resp = _make_response(200, [])
+    p, _ = _patch_httpx(resp)
+    with p, patch("platform_auth.auth_headers", return_value={}):
+        # Use a very short interval so the loop body runs at least once
+        # before we exit the test.
+        t = inbox.start_poller_thread(state, "http://platform", "ws-1", interval=0.01)
+        time.sleep(0.05)
+        assert t.daemon is True
+        assert t.is_alive()
+
+
+# ---------------------------------------------------------------------------
+# default_cursor_path respects CONFIGS_DIR
+# ---------------------------------------------------------------------------
+
+
+def test_default_cursor_path_uses_configs_dir(monkeypatch, tmp_path: Path):
+    monkeypatch.setenv("CONFIGS_DIR", str(tmp_path))
+    assert inbox.default_cursor_path() == tmp_path / ".mcp_inbox_cursor"
+
+
+def test_default_cursor_path_falls_back_to_default(monkeypatch):
+    monkeypatch.delenv("CONFIGS_DIR", raising=False)
+    assert inbox.default_cursor_path() == Path("/configs") / ".mcp_inbox_cursor"