Merge pull request #2415 from Molecule-AI/feat/molecule-mcp-inbox-polling

feat(workspace-runtime): inbox polling for standalone molecule-mcp
This commit is contained in:
Hongming Wang 2026-04-30 23:41:47 +00:00 committed by GitHub
commit cc58e87393
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 1186 additions and 0 deletions

View File

@ -64,6 +64,7 @@ TOP_LEVEL_MODULES = {
"events",
"executor_helpers",
"heartbeat",
"inbox",
"initial_prompt",
"internal_chat_uploads",
"internal_file_read",

View File

@ -43,6 +43,21 @@ def smoke_imports_and_invariants() -> None:
assert callable(cli_main), "a2a_mcp_server.cli_main must be callable"
assert callable(mcp_cli_main), "mcp_cli.main must be callable"
# inbox.activate / get_state / start_poller_thread form the inbound
# delivery path for the standalone molecule-mcp wrapper. mcp_cli.main
# imports + activates these at startup; if a wheel ships without
# them, the standalone agent silently loses the wait_for_message /
# inbox_peek / inbox_pop tools and reverts to outbound-only.
from molecule_runtime.inbox import ( # noqa: F401
InboxState,
activate as inbox_activate,
get_state as inbox_get_state,
start_poller_thread as inbox_start_poller_thread,
)
assert callable(inbox_activate), "inbox.activate must be callable"
assert callable(inbox_get_state), "inbox.get_state must be callable"
assert callable(inbox_start_poller_thread), "inbox.start_poller_thread must be callable"
assert a2a_client._A2A_ERROR_PREFIX, "a2a_client missing error sentinel"
assert callable(get_adapter), "adapters.get_adapter must be callable"
assert hasattr(BaseAdapter, "name"), "BaseAdapter interface broken"

View File

@ -23,9 +23,12 @@ from a2a_tools import (
tool_delegate_task,
tool_delegate_task_async,
tool_get_workspace_info,
tool_inbox_peek,
tool_inbox_pop,
tool_list_peers,
tool_recall_memory,
tool_send_message_to_user,
tool_wait_for_message,
)
from platform_tools.registry import TOOLS as _PLATFORM_TOOL_SPECS
@ -112,6 +115,18 @@ async def handle_tool_call(name: str, arguments: dict) -> str:
arguments.get("query", ""),
arguments.get("scope", ""),
)
elif name == "wait_for_message":
return await tool_wait_for_message(
arguments.get("timeout_secs", 60.0),
)
elif name == "inbox_peek":
return await tool_inbox_peek(
arguments.get("limit", 10),
)
elif name == "inbox_pop":
return await tool_inbox_pop(
arguments.get("activity_id", ""),
)
return f"Unknown tool: {name}"

View File

@ -526,3 +526,84 @@ async def tool_recall_memory(query: str = "", scope: str = "") -> str:
return json.dumps(data)
except Exception as e:
return f"Error recalling memory: {e}"
# ---------------------------------------------------------------------------
# Inbox tools — inbound delivery for the standalone molecule-mcp path.
# ---------------------------------------------------------------------------
#
# The InboxState singleton is set by mcp_cli before the MCP server starts
# (see workspace/inbox.py for the rationale). In-container runtimes never
# call ``inbox.activate(...)``, so ``inbox.get_state()`` returns None and
# these tools surface an informational error rather than raising.
#
# When-to-use guidance (mirrored in platform_tools/registry.py): agents
# in standalone-runtime mode should call ``wait_for_message`` to block
# on the next inbound message after they've emitted a reply, forming
# the loop ``wait → respond → wait``. ``inbox_peek`` is for inspecting
# the queue without consuming; ``inbox_pop`` removes a handled message.
# Shared sentinel returned by all three inbox tools when the runtime
# never called ``inbox.activate`` (i.e. any non-standalone runtime).
_INBOX_NOT_ENABLED_MSG = (
    "Error: inbox polling is not enabled in this runtime. The standalone "
    "molecule-mcp wrapper activates it; in-container runtimes receive "
    "messages via push delivery and do not need these tools."
)
async def tool_inbox_peek(limit: int = 10) -> str:
    """Return up to ``limit`` pending inbound messages without removing them.

    Args:
        limit: Maximum number of messages to return. Non-numeric,
            boolean, or non-positive values fall back to the default 10.

    Returns:
        JSON array of message dicts, or an informational error string
        when inbox polling is not active in this runtime.
    """
    import inbox  # local import — avoids a circular dep at module load

    state = inbox.get_state()
    if state is None:
        return _INBOX_NOT_ENABLED_MSG
    # MCP arguments arrive as parsed JSON, so a numeric ``limit`` may be
    # a float (e.g. 5.0). The old isinstance(int) check silently dropped
    # such values back to 10; coerce instead. Bools are excluded because
    # ``True`` would otherwise coerce to limit=1.
    try:
        effective_limit = 10 if isinstance(limit, bool) else int(limit)
    except (TypeError, ValueError):
        effective_limit = 10
    messages = state.peek(limit=effective_limit)
    return json.dumps([m.to_dict() for m in messages])
async def tool_inbox_pop(activity_id: str) -> str:
    """Remove a message from the inbox queue by activity_id.

    Idempotent: popping an id that is not queued reports
    ``removed=false`` in the JSON result rather than raising.
    """
    import inbox  # local import — avoids a circular dep at module load

    state = inbox.get_state()
    if state is None:
        return _INBOX_NOT_ENABLED_MSG
    if not (isinstance(activity_id, str) and activity_id):
        return "Error: activity_id is required."
    was_removed = state.pop(activity_id) is not None
    return json.dumps({"removed": was_removed, "activity_id": activity_id})
async def tool_wait_for_message(timeout_secs: float = 60.0) -> str:
    """Block until a new message arrives or ``timeout_secs`` elapses.

    Returns the head message non-destructively; the agent decides
    whether to ``inbox_pop`` it after acting.
    """
    import asyncio
    import inbox

    state = inbox.get_state()
    if state is None:
        return _INBOX_NOT_ENABLED_MSG
    try:
        requested = float(timeout_secs)
    except (TypeError, ValueError):
        requested = 60.0
    # Clamp to [0, 300]. Claude Code's default tool timeout is ~10min,
    # and blocking longer than 5min wastes the prompt cache window for
    # nothing useful. Operators who want longer can call repeatedly.
    timeout = min(max(requested, 0.0), 300.0)
    # state.wait blocks on a threading.Event, which would stall the
    # asyncio loop. Run it on the default executor so the MCP server
    # can keep processing other JSON-RPC requests while we sleep.
    message = await asyncio.get_running_loop().run_in_executor(
        None, state.wait, timeout
    )
    if message is None:
        return json.dumps({"timeout": True, "timeout_secs": timeout})
    return json.dumps(message.to_dict())

View File

@ -334,6 +334,14 @@ _CLI_A2A_COMMAND_KEYWORDS: dict[str, str | None] = {
# grows a `say` or `message` subcommand, change `None` to that
# keyword and the alignment test will start passing.
"send_message_to_user": None,
# Inbox tools live in the standalone molecule-mcp wrapper only;
# CLI-subprocess runtimes have their own delivery loop and never
# invoke these. The alignment test allows None entries — they
# appear in registry.TOOLS for adapter consistency without
# forcing a CLI subcommand.
"wait_for_message": None,
"inbox_peek": None,
"inbox_pop": None,
}

480
workspace/inbox.py Normal file
View File

@ -0,0 +1,480 @@
"""In-memory inbox + background poller for the standalone molecule-mcp path.
Purpose
-------
The universal MCP server (a2a_mcp_server.py) is OUTBOUND-ONLY by default —
it gives an MCP-aware agent the same A2A delegation, peer-discovery, and
memory tools that container-bound runtimes already have. There is no
inbound delivery path: when the canvas user types a message or a peer
sends an A2A request, the activity lands on the platform but the
standalone agent never sees it.
This module closes that gap WITHOUT requiring a tunnel or a public agent
URL. A daemon thread polls ``/workspaces/:id/activity?type=a2a_receive``
on the platform and stages new rows in an in-memory deque. Three new MCP
tools (``inbox_peek``, ``inbox_pop``, ``wait_for_message``) let the
agent observe the queue.
Why a poller (not push)
-----------------------
runtime=external workspaces have ``delivery_mode="poll"`` — the platform
records inbound A2A in ``activity_logs`` but does not call back to the
agent. A poller is the only inbound surface that works without the
operator exposing a public URL through a tunnel. 5s cadence matches
the molecule-mcp-claude-channel plugin's POLL_INTERVAL — it's already
proven on staging for the channel-based delivery path.
Cursor model
------------
``activity_logs.id`` is the cursor (server-assigned, monotonic). We
persist it to ``${CONFIGS_DIR}/.mcp_inbox_cursor`` so an agent restart
doesn't replay the last 10 minutes of inbound traffic and re-act on
already-handled messages. On 410 (cursor pruned) we drop back to
``since_secs=600`` for a bounded backlog and let the cursor advance
naturally from there.
Scope
-----
Standalone molecule-mcp ONLY. The in-container runtime has its own
push delivery (main.py + canvas WebSocket); we never want both
running at once or a single message would be delivered twice. The
caller (mcp_cli.main) gates activation explicitly via
``activate(state)``; in-container code that imports this module by
accident gets a no-op until activate is called.
"""
from __future__ import annotations
import json
import logging
import os
import threading
import time
from collections import deque
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)

# Poll cadence (seconds). 5s mirrors the molecule-mcp-claude-channel
# plugin's proven default — fast enough that a canvas user typing "are
# you there?" gets picked up before they refresh, slow enough that 12
# requests/min won't trip rate limits or wake mobile devices.
POLL_INTERVAL_SECONDS = 5.0

# Initial backlog window (seconds) for the first poll AND the recovery
# path after a stale-cursor 410. 10 minutes is enough to cover a brief
# crash/restart without flooding a long-idle workspace with hours of
# stale chat.
INITIAL_BACKLOG_SECONDS = 600

# Hard cap on the in-memory deque (used as deque maxlen, so the oldest
# entry is dropped silently once exceeded). The poller is bounded by
# the server's per-page limit (default 100) and the agent typically
# pops faster than the operator types, so an idle workspace shouldn't
# exceed a handful. The cap protects against runaway growth if the
# agent process stops calling pop.
MAX_QUEUED_MESSAGES = 200
@dataclass
class InboxMessage:
    """One inbound A2A message staged for the agent.

    Mirrors the shape the agent sees via inbox_peek / wait_for_message.
    Fields are derived from the activity_logs row by ``_from_activity``.
    """

    activity_id: str
    text: str
    peer_id: str  # empty string = canvas user; non-empty = peer workspace_id
    method: str  # JSON-RPC method ("message/send", "tasks/send", etc.)
    created_at: str  # RFC3339 timestamp from the activity row

    def to_dict(self) -> dict[str, Any]:
        """Serialize for the JSON payloads returned by the inbox tools."""
        sender_kind = "peer_agent" if self.peer_id else "canvas_user"
        return {
            "activity_id": self.activity_id,
            "text": self.text,
            "peer_id": self.peer_id,
            "kind": sender_kind,
            "method": self.method,
            "created_at": self.created_at,
        }
@dataclass
class InboxState:
    """Thread-safe queue of pending inbound messages.

    Producer: the poller thread, calling ``record(message)``.
    Consumers: the MCP tool handlers, calling ``peek``, ``pop``,
    or ``wait``. Synchronization is via a single ``threading.Lock``
    (cheap — every operation is O(n) over a small deque) plus an
    ``Event`` that wakes ``wait`` callers when a new message lands.
    """

    cursor_path: Path
    """File path that persists ``activity_logs.id`` of the most
    recently observed row, so a restart doesn't replay backlog."""

    # maxlen makes the deque self-bounding: the oldest entry is dropped
    # silently once MAX_QUEUED_MESSAGES is exceeded.
    _queue: deque[InboxMessage] = field(default_factory=lambda: deque(maxlen=MAX_QUEUED_MESSAGES))
    _lock: threading.Lock = field(default_factory=threading.Lock)
    _arrival: threading.Event = field(default_factory=threading.Event)
    _cursor: str | None = None
    _cursor_loaded: bool = False

    def load_cursor(self) -> str | None:
        """Read the persisted cursor from disk. Cached after first call.

        Missing/unreadable file → None (poller will fall back to the
        initial-backlog window). We never raise: a corrupt cursor is
        less bad than the inbox refusing to start.
        """
        with self._lock:
            if self._cursor_loaded:
                return self._cursor
            try:
                if self.cursor_path.is_file():
                    # Empty file normalizes to None, same as missing.
                    self._cursor = self.cursor_path.read_text().strip() or None
            except OSError as exc:
                logger.warning("inbox: failed to read cursor %s: %s", self.cursor_path, exc)
                self._cursor = None
            self._cursor_loaded = True
            return self._cursor

    def save_cursor(self, activity_id: str) -> None:
        """Persist the cursor. Best-effort — log + continue on failure.

        Loss of the cursor on a write failure means an extra page of
        backlog after restart, never a stuck poller. Silent-fail
        would mask a permission misconfiguration on the operator's
        configs dir; warn loudly so they can fix it.
        """
        with self._lock:
            self._cursor = activity_id
            self._cursor_loaded = True
            try:
                self.cursor_path.parent.mkdir(parents=True, exist_ok=True)
                # Write-to-tmp + rename so a crash mid-write can't leave
                # a truncated cursor file behind.
                tmp = self.cursor_path.with_suffix(self.cursor_path.suffix + ".tmp")
                tmp.write_text(activity_id)
                tmp.replace(self.cursor_path)
            except OSError as exc:
                logger.warning("inbox: failed to persist cursor to %s: %s", self.cursor_path, exc)

    def reset_cursor(self) -> None:
        """Forget the cursor. Used after a 410 from the activity API."""
        with self._lock:
            self._cursor = None
            self._cursor_loaded = True
            try:
                if self.cursor_path.is_file():
                    self.cursor_path.unlink()
            except OSError as exc:
                logger.warning("inbox: failed to delete cursor %s: %s", self.cursor_path, exc)

    def record(self, message: InboxMessage) -> None:
        """Append a message and wake any waiter.

        Skips a row whose activity_id we've already queued — defensive
        against the poller racing with the consumer + cursor save.
        """
        with self._lock:
            # Linear dedup scan is fine: the deque is capped small.
            for existing in self._queue:
                if existing.activity_id == message.activity_id:
                    return
            self._queue.append(message)
            self._arrival.set()

    def peek(self, limit: int = 10) -> list[InboxMessage]:
        """Return up to ``limit`` pending messages without removing them."""
        # Non-positive limit falls back to the default rather than
        # returning nothing.
        if limit <= 0:
            limit = 10
        with self._lock:
            return list(self._queue)[:limit]

    def pop(self, activity_id: str) -> InboxMessage | None:
        """Remove a specific message. Idempotent; returns None if absent.

        We require the caller to specify which message it handled —
        rather than auto-popping the head — preserves observability
        when the agent reads several but only handles one.
        """
        with self._lock:
            for existing in list(self._queue):
                if existing.activity_id == activity_id:
                    self._queue.remove(existing)
                    # Emptied the queue → clear the arrival flag so the
                    # next wait() blocks instead of returning instantly.
                    if not self._queue:
                        self._arrival.clear()
                    return existing
            return None

    def wait(self, timeout_secs: float) -> InboxMessage | None:
        """Block until a message is available or timeout elapses.

        Returns the head message WITHOUT popping; the caller decides
        whether to pop after acting on it. Same shape as Python's
        Queue.get with timeout, but non-destructive so a peek-style
        agent can still inspect with peek/pop.
        """
        # Fast path: queue already has something.
        with self._lock:
            if self._queue:
                return self._queue[0]
            # Clearing under the same lock record() takes means a
            # message can't land between this check and the wait below
            # without re-setting the event — no lost wakeups.
            self._arrival.clear()
        triggered = self._arrival.wait(timeout=max(0.0, timeout_secs))
        if not triggered:
            return None
        # Re-check under the lock: a concurrent pop() may have emptied
        # the queue between the wakeup and here.
        with self._lock:
            return self._queue[0] if self._queue else None
# ---------------------------------------------------------------------------
# Module singleton — set by mcp_cli before MCP server starts.
# ---------------------------------------------------------------------------
#
# In-container callers don't activate; the inbox tools detect the
# unset singleton and return an informational error rather than
# breaking the dispatch path.
_STATE: InboxState | None = None


def activate(state: InboxState) -> None:
    """Register an InboxState as the singleton this module exposes.

    Idempotent within a process: re-activating with the same state is
    a no-op; activating with a DIFFERENT state replaces the singleton
    and logs at WARNING (the only legitimate caller is mcp_cli at
    startup; double-activate usually means a test/runtime mix-up).
    """
    global _STATE
    if state is _STATE:
        return  # same state — nothing to do
    if _STATE is not None:
        logger.warning("inbox: replacing existing singleton state")
    _STATE = state


def get_state() -> InboxState | None:
    """Return the active InboxState, or None if the runtime never activated.

    Tool implementations call this and surface a clear "(inbox not
    enabled)" message to the agent when None — keeps the in-container
    path's tool dispatch from raising on an inbox-tool call that the
    agent shouldn't have made anyway.
    """
    return _STATE
# ---------------------------------------------------------------------------
# Activity → InboxMessage adapter
# ---------------------------------------------------------------------------
#
# The platform's a2a_proxy logs request_body as the JSON-RPC envelope
# it forwarded to the workspace. Three shapes have been observed in
# the wild (verified against workspace-server's logA2ASuccess in
# a2a_proxy_helpers.go on 2026-04-29) — handle all three before
# falling back to summary so a peer message at least surfaces SOMETHING.
def _extract_text(request_body: Any, summary: str | None) -> str:
"""Pull the human-readable text out of an A2A activity row.
Mirrors molecule-mcp-claude-channel/server.ts:445 (extractText) so
canvas-user messages and peer-agent messages render identically
across both inbound channels.
"""
if not isinstance(request_body, dict):
return summary or "(empty A2A message)"
candidates: list[Any] = []
params = request_body.get("params") if isinstance(request_body.get("params"), dict) else None
if params:
message = params.get("message") if isinstance(params.get("message"), dict) else None
if message:
candidates.append(message.get("parts"))
candidates.append(params.get("parts"))
candidates.append(request_body.get("parts"))
# The A2A protocol's part discriminator field varies between SDK
# versions: a2a-sdk v0 uses ``type``, v1 uses ``kind``. The platform's
# activity_logs preserves whichever the original sender used, so we
# accept either. Verified live against a hosted SaaS workspace on
# 2026-04-30 — every canvas-user message arrived with ``kind`` and
# the type-only filter was silently falling through to summary.
for parts in candidates:
if isinstance(parts, list):
text = "".join(
p.get("text", "")
for p in parts
if isinstance(p, dict)
and (p.get("kind") == "text" or p.get("type") == "text")
)
if text:
return text
return summary or "(empty A2A message)"
def message_from_activity(row: dict[str, Any]) -> InboxMessage:
    """Convert one /activity row into an InboxMessage."""
    body = row.get("request_body")
    if isinstance(body, str):
        # The Go handler returns request_body as json.RawMessage; httpx
        # deserializes that to a dict already. But some legacy paths or
        # mocked servers may return it as a string — handle defensively.
        try:
            body = json.loads(body)
        except (TypeError, ValueError):
            body = None
    return InboxMessage(
        activity_id=str(row.get("id", "")),
        text=_extract_text(body, row.get("summary")),
        peer_id=row.get("source_id") or "",
        method=row.get("method") or "",
        created_at=str(row.get("created_at", "")),
    )
# ---------------------------------------------------------------------------
# Poller — daemon thread that fills the queue from the activity API
# ---------------------------------------------------------------------------
def _poll_once(
    state: InboxState,
    platform_url: str,
    workspace_id: str,
    headers: dict[str, str],
    timeout_secs: float = 10.0,
) -> int:
    """One poll iteration. Returns number of new messages enqueued.

    Idempotent and stateless apart from the InboxState passed in —
    safe to call from tests with a stub state + a real httpx mock.
    """
    import httpx  # deferred so importing this module stays cheap

    url = f"{platform_url}/workspaces/{workspace_id}/activity"
    params: dict[str, str] = {"type": "a2a_receive"}
    cursor = state.load_cursor()
    if cursor:
        params["since_id"] = cursor
    else:
        # No cursor yet (first run, or post-410 reset): ask for a
        # bounded backlog window instead of the full history.
        params["since_secs"] = str(INITIAL_BACKLOG_SECONDS)
    try:
        with httpx.Client(timeout=timeout_secs) as client:
            resp = client.get(url, params=params, headers=headers)
    except Exception as exc:  # noqa: BLE001
        logger.warning("inbox poller: GET /activity failed: %s", exc)
        return 0
    if resp.status_code == 410:
        # Cursor pruned — drop back to the backlog window. The next
        # poll picks up wherever the activity API has rows now.
        logger.info(
            "inbox poller: cursor %s expired (410); resetting to since_secs=%d",
            cursor,
            INITIAL_BACKLOG_SECONDS,
        )
        state.reset_cursor()
        return 0
    if resp.status_code >= 400:
        logger.warning(
            "inbox poller: HTTP %d from /activity: %s",
            resp.status_code,
            (resp.text or "")[:200],
        )
        return 0
    try:
        rows = resp.json()
    except ValueError as exc:
        logger.warning("inbox poller: non-JSON response: %s", exc)
        return 0
    if not isinstance(rows, list):
        return 0
    # since_id mode returns ASC (oldest first). since_secs mode returns
    # DESC; reverse so we record in chronological order and the cursor
    # we save is the freshest row.
    if cursor is None:
        rows = list(reversed(rows))
    new_count = 0
    last_id: str | None = None
    for row in rows:
        if not isinstance(row, dict):
            continue
        message = message_from_activity(row)
        if not message.activity_id:
            # A row without a server-assigned id can't advance the
            # cursor; skip it.
            continue
        state.record(message)
        last_id = message.activity_id
        new_count += 1
    if last_id is not None:
        state.save_cursor(last_id)
    return new_count
def _poll_loop(
    state: InboxState,
    platform_url: str,
    workspace_id: str,
    interval: float = POLL_INTERVAL_SECONDS,
    stop_event: threading.Event | None = None,
) -> None:
    """Daemon-thread body: poll forever until stop_event fires.

    auth_headers() is rebuilt every iteration so a token rotation via
    env var or .auth_token file is picked up without a restart. Cheap
    (a dict + an env read).
    """
    from platform_auth import auth_headers

    while True:
        try:
            _poll_once(state, platform_url, workspace_id, auth_headers())
        except Exception as exc:  # noqa: BLE001
            # Never let one bad iteration kill the thread.
            logger.warning("inbox poller: iteration crashed: %s", exc)
        if stop_event is None:
            time.sleep(interval)
        elif stop_event.wait(interval):
            return
def start_poller_thread(
    state: InboxState,
    platform_url: str,
    workspace_id: str,
    interval: float = POLL_INTERVAL_SECONDS,
) -> threading.Thread:
    """Spawn the poller as a daemon thread. Returns the Thread handle.

    daemon=True so the poller dies with the main process — same
    rationale as mcp_cli's heartbeat thread (no leaks, no stale
    workspace writes after the operator hits Ctrl-C).
    """
    poller = threading.Thread(
        target=_poll_loop,
        name="molecule-mcp-inbox-poller",
        args=(state, platform_url, workspace_id, interval),
        daemon=True,
    )
    poller.start()
    return poller
def default_cursor_path() -> Path:
    """Standard cursor location: ``${CONFIGS_DIR}/.mcp_inbox_cursor``.

    Mirrors mcp_cli's CONFIGS_DIR resolution so a single
    operator-facing env var controls every persisted state file
    (.auth_token + .mcp_inbox_cursor).
    """
    base = os.environ.get("CONFIGS_DIR", "/configs")
    return Path(base) / ".mcp_inbox_cursor"

View File

@ -273,6 +273,19 @@ def main() -> None:
_platform_register(platform_url, workspace_id, token)
_start_heartbeat_thread(platform_url, workspace_id, token)
# Inbox poller — the inbound side of the standalone path. Without
# this thread, the universal MCP server is OUTBOUND-ONLY: an agent
# can call delegate_task / send_message_to_user but never observe
# canvas-user or peer-agent messages. The poller fills an in-memory
# queue from the platform's /activity?type=a2a_receive endpoint;
# the agent reads via wait_for_message / inbox_peek / inbox_pop.
#
# Same disable pattern as heartbeat: in-container callers (with
# push delivery via canvas WebSocket) skip this to avoid duplicate
# delivery; tests use the env to keep imports cheap.
if not os.environ.get("MOLECULE_MCP_DISABLE_INBOX", "").strip():
_start_inbox_poller(platform_url, workspace_id)
# Env is valid — safe to import the heavy module now. Importing
# earlier would trigger a2a_client.py:22's module-level RuntimeError
# before our friendly help reaches the user.
@ -280,6 +293,28 @@ def main() -> None:
cli_main()
def _start_inbox_poller(platform_url: str, workspace_id: str) -> None:
    """Activate the inbox singleton + spawn the poller daemon thread.

    Done lazily here (not at module import) because importing inbox
    pulls in platform_auth, which only resolves cleanly AFTER env
    validation succeeds. Activation is idempotent within a process,
    so a stray double-call (e.g. test harness re-entering main) is
    harmless.

    The poller thread is daemon=True — dies with the main process.
    """
    try:
        import inbox
    except ImportError as exc:
        # Best-effort: a wheel shipped without inbox just stays
        # outbound-only instead of crashing startup.
        logger.warning("molecule-mcp: inbox module unavailable: %s", exc)
        return

    inbox_state = inbox.InboxState(cursor_path=inbox.default_cursor_path())
    inbox.activate(inbox_state)
    inbox.start_poller_thread(inbox_state, platform_url, workspace_id)
def _read_token_file() -> str:
"""Read the token from ${CONFIGS_DIR}/.auth_token if present.

View File

@ -56,9 +56,12 @@ from a2a_tools import (
tool_delegate_task,
tool_delegate_task_async,
tool_get_workspace_info,
tool_inbox_peek,
tool_inbox_pop,
tool_list_peers,
tool_recall_memory,
tool_send_message_to_user,
tool_wait_for_message,
)
# Section name maps to the heading in the agent-facing system prompt.
@ -299,6 +302,94 @@ _SEND_MESSAGE_TO_USER = ToolSpec(
)
# ---------------------------------------------------------------------------
# Inbox — inbound delivery for the standalone molecule-mcp path.
#
# These tools observe a poller-fed in-memory queue (see workspace/inbox.py).
# They are universally registered so docs + adapters stay aligned, but
# they only return real data in the standalone molecule-mcp runtime;
# in-container runtimes return an informational "not enabled" message
# because their delivery loop is push-based via the canvas WebSocket.
# ---------------------------------------------------------------------------
# Blocking receive: the core of the standalone wait → respond → wait loop.
_WAIT_FOR_MESSAGE = ToolSpec(
    name="wait_for_message",
    short=(
        "Block until the next inbound message (canvas user OR peer "
        "agent) arrives, or until ``timeout_secs`` elapses."
    ),
    when_to_use=(
        "Standalone-runtime ONLY (molecule-mcp wrapper). After "
        "you reply, call this to wait for the next message — forms "
        "the loop ``wait_for_message → respond → wait_for_message``. "
        "Returns the head message non-destructively; call inbox_pop "
        "with the activity_id once you've handled it. In-container "
        "runtimes receive messages via push and should not call this."
    ),
    input_schema={
        "type": "object",
        "properties": {
            "timeout_secs": {
                "type": "number",
                "description": (
                    "Max seconds to block. Capped at 300. "
                    "Default 60."
                ),
            },
        },
    },
    impl=tool_wait_for_message,
    section=A2A_SECTION,
)

# Non-destructive queue inspection; pairs with inbox_pop to consume.
_INBOX_PEEK = ToolSpec(
    name="inbox_peek",
    short="List pending inbound messages without removing them.",
    when_to_use=(
        "Standalone-runtime ONLY. Use to inspect what's queued "
        "before deciding which to handle. Non-destructive — pair "
        "with inbox_pop to consume after replying."
    ),
    input_schema={
        "type": "object",
        "properties": {
            "limit": {
                "type": "integer",
                "description": "Max messages to return. Default 10.",
            },
        },
    },
    impl=tool_inbox_peek,
    section=A2A_SECTION,
)

# Explicit consume step — the only inbox tool that mutates the queue.
_INBOX_POP = ToolSpec(
    name="inbox_pop",
    short="Remove a handled message from the inbox queue by activity_id.",
    when_to_use=(
        "Standalone-runtime ONLY. Call after you've replied to a "
        "message returned from wait_for_message or inbox_peek to "
        "drop it from the queue. Idempotent — popping a missing "
        "id reports removed=false without erroring."
    ),
    input_schema={
        "type": "object",
        "properties": {
            "activity_id": {
                "type": "string",
                "description": (
                    "activity_id of the message to remove (from "
                    "inbox_peek / wait_for_message output)."
                ),
            },
        },
        "required": ["activity_id"],
    },
    impl=tool_inbox_pop,
    section=A2A_SECTION,
)
# ---------------------------------------------------------------------------
# HMA — hierarchical persistent memory
# ---------------------------------------------------------------------------
@ -374,6 +465,10 @@ TOOLS: list[ToolSpec] = [
_LIST_PEERS,
_GET_WORKSPACE_INFO,
_SEND_MESSAGE_TO_USER,
# Inbox (standalone-only; in-container returns informational error)
_WAIT_FOR_MESSAGE,
_INBOX_PEEK,
_INBOX_POP,
# HMA
_COMMIT_MEMORY,
_RECALL_MEMORY,

View File

@ -6,6 +6,9 @@
- **list_peers**: List the workspaces this agent can communicate with — name, ID, status, role for each.
- **get_workspace_info**: Get this workspace's own info — ID, name, role, tier, parent, status.
- **send_message_to_user**: Send a message directly to the user's canvas chat — pushed instantly via WebSocket. Use this to: (1) acknowledge a task immediately ('Got it, I'll start working on this'), (2) send interim progress updates while doing long work, (3) deliver follow-up results after delegation completes, (4) attach files (zip, pdf, csv, image) for the user to download via the `attachments` field (NEVER paste file URLs in `message`). The message appears in the user's chat as if you're proactively reaching out.
- **wait_for_message**: Block until the next inbound message (canvas user OR peer agent) arrives, or until `timeout_secs` elapses.
- **inbox_peek**: List pending inbound messages without removing them.
- **inbox_pop**: Remove a handled message from the inbox queue by activity_id.
### delegate_task
Use for QUICK questions and small sub-tasks where you can afford to wait inline. Returns the peer's response text directly. For longer-running work (research, multi-minute jobs) use delegate_task_async + check_task_status instead so you don't hold this workspace busy waiting.
@ -25,4 +28,13 @@ Use to introspect your own identity (e.g. before reporting back to the user, or
### send_message_to_user
Use proactively across the lifecycle of a task — early to acknowledge, mid-flight to update, late to deliver. Never paste file URLs in the message body — always pass absolute paths in `attachments` so the platform serves them as download chips (works on SaaS where external file hosts are unreachable).
### wait_for_message
Standalone-runtime ONLY (molecule-mcp wrapper). After you reply, call this to wait for the next message — forms the loop `wait_for_message → respond → wait_for_message`. Returns the head message non-destructively; call inbox_pop with the activity_id once you've handled it. In-container runtimes receive messages via push and should not call this.
### inbox_peek
Standalone-runtime ONLY. Use to inspect what's queued before deciding which to handle. Non-destructive — pair with inbox_pop to consume after replying.
### inbox_pop
Standalone-runtime ONLY. Call after you've replied to a message returned from wait_for_message or inbox_peek to drop it from the queue. Idempotent — popping a missing id reports removed=false without erroring.
Always use list_peers first to discover available workspace IDs. Access control is enforced — you can only reach siblings and parent/children. If a delegation returns a DELEGATION FAILED message, do NOT forward the raw error to the user. Instead: (1) try a different peer, (2) handle the task yourself, or (3) tell the user which peer is unavailable and provide your own best answer.

View File

@ -0,0 +1,444 @@
"""Tests for workspace/inbox.py — InboxState + activity API poller.
Covers the round-trip from a /activity row to an InboxMessage that the
agent observes via the three new MCP tools, plus the cursor-persistence
+ 410-recovery behavior that keeps the standalone molecule-mcp from
re-delivering already-handled messages after a restart.
"""
from __future__ import annotations
import threading
import time
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
import inbox
# autouse: applied to every test in this module without opt-in.
@pytest.fixture(autouse=True)
def _reset_singleton():
    """Each test starts with a clean module singleton + a fresh
    InboxState. Activation in one test must not leak into the next."""
    inbox._STATE = None
    yield
    inbox._STATE = None
@pytest.fixture()
def state(tmp_path: Path) -> inbox.InboxState:
    """Fresh InboxState whose cursor file lives under pytest's tmp dir."""
    return inbox.InboxState(cursor_path=tmp_path / ".mcp_inbox_cursor")
# ---------------------------------------------------------------------------
# _extract_text — envelope shape coverage
# ---------------------------------------------------------------------------
def test_extract_text_jsonrpc_message_wrapper():
    """Full JSON-RPC envelope: text lives at params.message.parts."""
    body = {
        "jsonrpc": "2.0",
        "method": "message/send",
        "params": {"message": {"parts": [{"type": "text", "text": "hello"}]}},
    }
    assert inbox._extract_text(body, None) == "hello"
def test_extract_text_a2a_v1_kind_field():
    """A2A SDK v1 uses ``kind`` instead of ``type`` as the part
    discriminator. Hosted SaaS workspaces send the v1 shape today —
    this case is what live canvas-user messages look like in
    activity_logs.request_body."""
    body = {
        "params": {
            "message": {
                "role": "user",
                "parts": [{"kind": "text", "text": "hello from canvas"}],
            }
        }
    }
    assert inbox._extract_text(body, None) == "hello from canvas"
def test_extract_text_jsonrpc_params_parts():
    """parts directly under params (no message wrapper)."""
    body = {"params": {"parts": [{"type": "text", "text": "from peer"}]}}
    assert inbox._extract_text(body, None) == "from peer"
def test_extract_text_shorthand_parts():
    """Bare top-level ``parts`` shorthand (no JSON-RPC wrapper)."""
    body = {"parts": [{"type": "text", "text": "shorthand"}]}
    assert inbox._extract_text(body, None) == "shorthand"
def test_extract_text_concatenates_multiple_parts():
    """Text parts are joined in order; non-text parts are skipped."""
    mixed_parts = {
        "parts": [
            {"type": "text", "text": "hello "},
            {"type": "text", "text": "world"},
            {"type": "image", "url": "https://example.invalid/x.png"},
        ]
    }
    assert inbox._extract_text(mixed_parts, None) == "hello world"
def test_extract_text_falls_back_to_summary():
    """Missing or unrecognized bodies fall back to the summary string."""
    for body in (None, {"unrelated": True}):
        assert inbox._extract_text(body, "fallback") == "fallback"
def test_extract_text_returns_placeholder_when_nothing_usable():
    """No body and no summary yields the fixed placeholder text."""
    assert inbox._extract_text(None, None) == "(empty A2A message)"
# ---------------------------------------------------------------------------
# message_from_activity
# ---------------------------------------------------------------------------
def test_message_from_activity_canvas_user():
    """A row without source_id becomes a canvas_user message whose text
    is extracted from request_body (the summary field is ignored)."""
    activity_row = {
        "id": "act-1",
        "source_id": None,
        "method": "message/send",
        "summary": "ignored",
        "request_body": {
            "params": {"message": {"parts": [{"type": "text", "text": "hi"}]}}
        },
        "created_at": "2026-04-30T22:00:00Z",
    }
    message = inbox.message_from_activity(activity_row)
    assert message.activity_id == "act-1"
    assert message.text == "hi"
    assert message.peer_id == ""
    assert message.method == "message/send"
    assert message.to_dict()["kind"] == "canvas_user"
def test_message_from_activity_peer_agent():
    """A row carrying a source_id maps to a peer_agent message."""
    activity_row = {
        "id": "act-2",
        "source_id": "ws-peer-uuid",
        "method": "tasks/send",
        "summary": "delegate",
        "request_body": {"parts": [{"type": "text", "text": "do task"}]},
        "created_at": "2026-04-30T22:01:00Z",
    }
    message = inbox.message_from_activity(activity_row)
    assert message.peer_id == "ws-peer-uuid"
    assert message.to_dict()["kind"] == "peer_agent"
def test_message_from_activity_handles_string_request_body():
    """request_body may arrive JSON-encoded as a string; it gets parsed."""
    activity_row = {
        "id": "act-3",
        "source_id": None,
        "method": "message/send",
        "summary": None,
        "request_body": '{"parts": [{"type": "text", "text": "json string"}]}',
        "created_at": "2026-04-30T22:02:00Z",
    }
    assert inbox.message_from_activity(activity_row).text == "json string"
# ---------------------------------------------------------------------------
# InboxState — queue + wait/peek/pop semantics
# ---------------------------------------------------------------------------
def _msg(activity_id: str, text: str = "", peer_id: str = "") -> inbox.InboxMessage:
    """Build a minimal InboxMessage; text defaults to the activity id."""
    return inbox.InboxMessage(
        activity_id=activity_id,
        text=text if text else activity_id,
        peer_id=peer_id,
        method="message/send",
        created_at="2026-04-30T22:00:00Z",
    )
def test_record_then_peek(state: inbox.InboxState):
    """peek returns recorded messages in arrival order."""
    for activity_id in ("a", "b"):
        state.record(_msg(activity_id))
    assert [m.activity_id for m in state.peek(limit=10)] == ["a", "b"]
def test_record_dedupes_by_activity_id(state: inbox.InboxState):
    """Recording the same activity id twice keeps only the first copy."""
    state.record(_msg("a"))
    state.record(_msg("a"))
    assert len(state.peek(10)) == 1
def test_pop_removes_specific_message(state: inbox.InboxState):
    """pop returns the targeted message and leaves the rest queued."""
    state.record(_msg("a"))
    state.record(_msg("b"))
    popped = state.pop("a")
    assert popped is not None
    assert popped.activity_id == "a"
    assert [m.activity_id for m in state.peek(10)] == ["b"]
def test_pop_missing_id_returns_none(state: inbox.InboxState):
    """Popping an unknown id is a no-op that yields None."""
    state.record(_msg("a"))
    # Bind before asserting so the pop() call still executes under
    # ``python -O`` (bare assert statements are stripped there).
    outcome = state.pop("does-not-exist")
    assert outcome is None
    # The original message must remain queued.
    assert len(state.peek(10)) == 1
def test_wait_returns_existing_head_immediately(state: inbox.InboxState):
    """wait must not block when the queue already has a message."""
    state.record(_msg("a"))
    started_at = time.monotonic()
    head = state.wait(timeout_secs=5.0)
    elapsed = time.monotonic() - started_at
    assert head is not None
    assert head.activity_id == "a"
    assert elapsed < 0.5, f"wait should not block when queue non-empty (took {elapsed:.2f}s)"
def test_wait_blocks_until_message_arrives(state: inbox.InboxState):
    """wait parks the caller until a background producer records one."""
    def feed_queue():
        time.sleep(0.05)
        state.record(_msg("late"))

    threading.Thread(target=feed_queue, daemon=True).start()
    received = state.wait(timeout_secs=2.0)
    assert received is not None
    assert received.activity_id == "late"
def test_wait_returns_none_on_timeout(state: inbox.InboxState):
    """An empty queue times out to None rather than raising."""
    assert state.wait(timeout_secs=0.05) is None
def test_wait_does_not_pop(state: inbox.InboxState):
    """wait() only observes — removal is the caller's job via inbox_pop."""
    state.record(_msg("a"))
    for _ in range(2):
        state.wait(timeout_secs=1.0)
    assert len(state.peek(10)) == 1
# ---------------------------------------------------------------------------
# Cursor persistence
# ---------------------------------------------------------------------------
def test_load_cursor_returns_none_when_file_absent(state: inbox.InboxState):
    """No cursor file on disk means no cursor."""
    assert state.load_cursor() is None
def test_save_then_load_cursor_round_trip(state: inbox.InboxState):
    """A saved cursor survives a forced re-read from disk."""
    state.save_cursor("act-cursor-1")
    # Drop the in-memory cache so load_cursor must hit the file again.
    state._cursor_loaded = False
    state._cursor = None
    assert state.load_cursor() == "act-cursor-1"
def test_save_cursor_creates_parent_directory(tmp_path: Path):
    """save_cursor creates missing parent directories instead of failing."""
    deep_path = tmp_path / "nested" / "configs" / ".mcp_inbox_cursor"
    inbox.InboxState(cursor_path=deep_path).save_cursor("act-x")
    assert deep_path.read_text() == "act-x"
def test_reset_cursor_deletes_file(state: inbox.InboxState):
    """reset_cursor removes the file and forgets the stored value."""
    state.save_cursor("act-y")
    assert state.cursor_path.is_file()
    state.reset_cursor()
    assert not state.cursor_path.is_file()
    assert state.load_cursor() is None
# ---------------------------------------------------------------------------
# Module singleton
# ---------------------------------------------------------------------------
def test_get_state_returns_none_before_activate():
    """Without activation the module exposes no inbox state."""
    assert inbox.get_state() is None
def test_activate_then_get_state(state: inbox.InboxState):
    """activate installs the exact instance that get_state hands back."""
    inbox.activate(state)
    assert inbox.get_state() is state
def test_activate_idempotent(state: inbox.InboxState):
    """Re-activating the same state is a silent no-op."""
    for _ in range(2):
        inbox.activate(state)
    assert inbox.get_state() is state
# ---------------------------------------------------------------------------
# _poll_once — HTTP behavior
# ---------------------------------------------------------------------------
def _make_response(status_code: int, json_body: Any = None, text: str = "") -> MagicMock:
resp = MagicMock()
resp.status_code = status_code
if json_body is not None:
resp.json.return_value = json_body
else:
resp.json.side_effect = ValueError("no json")
resp.text = text
return resp
def _patch_httpx(returning: MagicMock):
"""Replace httpx.Client with a context-manager mock that returns
``returning`` from .get(). Captures the GET call args for assertion."""
client = MagicMock()
client.__enter__ = MagicMock(return_value=client)
client.__exit__ = MagicMock(return_value=False)
client.get = MagicMock(return_value=returning)
return patch("httpx.Client", return_value=client), client
def test_poll_once_fresh_start_uses_since_secs(state: inbox.InboxState):
    """With no saved cursor, the poller queries a recent time window."""
    patcher, client = _patch_httpx(_make_response(200, []))
    with patcher:
        count = inbox._poll_once(state, "http://platform", "ws-1", {})
    assert count == 0
    _, kwargs = client.get.call_args
    query = kwargs["params"]
    assert query["type"] == "a2a_receive"
    assert "since_secs" in query
    assert "since_id" not in query
def test_poll_once_with_cursor_uses_since_id(state: inbox.InboxState):
    """A persisted cursor switches the query to since_id paging."""
    state.save_cursor("act-existing")
    patcher, client = _patch_httpx(_make_response(200, []))
    with patcher:
        inbox._poll_once(state, "http://platform", "ws-1", {})
    _, kwargs = client.get.call_args
    query = kwargs["params"]
    assert query["since_id"] == "act-existing"
    assert "since_secs" not in query
def test_poll_once_410_resets_cursor(state: inbox.InboxState):
    """HTTP 410 means the server pruned our cursor: drop it locally so
    the next poll falls back to the time-window query."""
    state.save_cursor("act-stale")
    patcher, _ = _patch_httpx(_make_response(410, text="cursor pruned"))
    with patcher:
        inbox._poll_once(state, "http://platform", "ws-1", {})
    assert state.load_cursor() is None
    assert not state.cursor_path.is_file()
def test_poll_once_records_messages_and_advances_cursor(state: inbox.InboxState):
    """Every returned row is queued and the cursor lands on the final
    row's id."""
    state.save_cursor("act-old")

    def activity_row(act_id, source, method, text, created):
        # One /activity row with a single text part.
        return {
            "id": act_id,
            "source_id": source,
            "method": method,
            "summary": None,
            "request_body": {"parts": [{"type": "text", "text": text}]},
            "created_at": created,
        }

    rows = [
        activity_row("act-1", None, "message/send", "first", "2026-04-30T22:00:00Z"),
        activity_row("act-2", "ws-peer", "tasks/send", "second", "2026-04-30T22:00:01Z"),
    ]
    patcher, _ = _patch_httpx(_make_response(200, rows))
    with patcher:
        count = inbox._poll_once(state, "http://platform", "ws-1", {})
    assert count == 2
    assert [m.activity_id for m in state.peek(10)] == ["act-1", "act-2"]
    assert state.load_cursor() == "act-2"
def test_poll_once_500_does_not_raise(state: inbox.InboxState):
    """Server errors are swallowed: zero messages, cursor untouched."""
    patcher, _ = _patch_httpx(_make_response(500, text="boom"))
    with patcher:
        count = inbox._poll_once(state, "http://platform", "ws-1", {})
    assert count == 0
    assert state.load_cursor() is None
def test_poll_once_handles_non_list_payload(state: inbox.InboxState):
    """A non-list JSON body is treated as zero rows, not an error."""
    patcher, _ = _patch_httpx(_make_response(200, {"error": "unexpected"}))
    with patcher:
        count = inbox._poll_once(state, "http://platform", "ws-1", {})
    assert count == 0
def test_poll_once_initial_backlog_reverses_to_chronological(state: inbox.InboxState):
    """With no cursor set, /activity returns newest-first; the poller
    must flip the rows so the queue is chronological and the saved
    cursor is the freshest id — restoring forward progression on the
    next poll."""

    def activity_row(act_id, text, created):
        # Minimal canvas-user row with a single text part.
        return {
            "id": act_id,
            "source_id": None,
            "method": "message/send",
            "summary": None,
            "request_body": {"parts": [{"type": "text", "text": text}]},
            "created_at": created,
        }

    newest_first = [
        activity_row("act-newest", "newest", "2026-04-30T22:00:02Z"),
        activity_row("act-oldest", "oldest", "2026-04-30T22:00:00Z"),
    ]
    patcher, _ = _patch_httpx(_make_response(200, newest_first))
    with patcher:
        inbox._poll_once(state, "http://platform", "ws-1", {})
    assert [m.activity_id for m in state.peek(10)] == ["act-oldest", "act-newest"]
    # Cursor points at the newest row, so the next poll fetches only
    # strictly newer activity.
    assert state.load_cursor() == "act-newest"
def test_start_poller_thread_is_daemon(state: inbox.InboxState):
    """The poller must run as a daemon so it dies with the parent
    process; a non-daemon poller would leak across `claude` restarts
    and write to a stale workspace."""
    patcher, _ = _patch_httpx(_make_response(200, []))
    with patcher, patch("platform_auth.auth_headers", return_value={}):
        # Tiny interval so the loop body executes at least once while
        # the patches are still in effect.
        poller = inbox.start_poller_thread(state, "http://platform", "ws-1", interval=0.01)
        time.sleep(0.05)
    assert poller.daemon is True
    assert poller.is_alive()
# ---------------------------------------------------------------------------
# default_cursor_path respects CONFIGS_DIR
# ---------------------------------------------------------------------------
def test_default_cursor_path_uses_configs_dir(monkeypatch, tmp_path: Path):
    """A set CONFIGS_DIR anchors the default cursor file location."""
    monkeypatch.setenv("CONFIGS_DIR", str(tmp_path))
    expected = tmp_path / ".mcp_inbox_cursor"
    assert inbox.default_cursor_path() == expected
def test_default_cursor_path_falls_back_to_default(monkeypatch):
    """Without CONFIGS_DIR the cursor lives under /configs."""
    monkeypatch.delenv("CONFIGS_DIR", raising=False)
    expected = Path("/configs") / ".mcp_inbox_cursor"
    assert inbox.default_cursor_path() == expected