forked from molecule-ai/molecule-core
Merge pull request #2415 from Molecule-AI/feat/molecule-mcp-inbox-polling
feat(workspace-runtime): inbox polling for standalone molecule-mcp
This commit is contained in:
commit
cc58e87393
@ -64,6 +64,7 @@ TOP_LEVEL_MODULES = {
|
|||||||
"events",
|
"events",
|
||||||
"executor_helpers",
|
"executor_helpers",
|
||||||
"heartbeat",
|
"heartbeat",
|
||||||
|
"inbox",
|
||||||
"initial_prompt",
|
"initial_prompt",
|
||||||
"internal_chat_uploads",
|
"internal_chat_uploads",
|
||||||
"internal_file_read",
|
"internal_file_read",
|
||||||
|
|||||||
@ -43,6 +43,21 @@ def smoke_imports_and_invariants() -> None:
|
|||||||
assert callable(cli_main), "a2a_mcp_server.cli_main must be callable"
|
assert callable(cli_main), "a2a_mcp_server.cli_main must be callable"
|
||||||
assert callable(mcp_cli_main), "mcp_cli.main must be callable"
|
assert callable(mcp_cli_main), "mcp_cli.main must be callable"
|
||||||
|
|
||||||
|
# inbox.activate / get_state / start_poller_thread form the inbound
|
||||||
|
# delivery path for the standalone molecule-mcp wrapper. mcp_cli.main
|
||||||
|
# imports + activates these at startup; if a wheel ships without
|
||||||
|
# them, the standalone agent silently loses the wait_for_message /
|
||||||
|
# inbox_peek / inbox_pop tools and reverts to outbound-only.
|
||||||
|
from molecule_runtime.inbox import ( # noqa: F401
|
||||||
|
InboxState,
|
||||||
|
activate as inbox_activate,
|
||||||
|
get_state as inbox_get_state,
|
||||||
|
start_poller_thread as inbox_start_poller_thread,
|
||||||
|
)
|
||||||
|
assert callable(inbox_activate), "inbox.activate must be callable"
|
||||||
|
assert callable(inbox_get_state), "inbox.get_state must be callable"
|
||||||
|
assert callable(inbox_start_poller_thread), "inbox.start_poller_thread must be callable"
|
||||||
|
|
||||||
assert a2a_client._A2A_ERROR_PREFIX, "a2a_client missing error sentinel"
|
assert a2a_client._A2A_ERROR_PREFIX, "a2a_client missing error sentinel"
|
||||||
assert callable(get_adapter), "adapters.get_adapter must be callable"
|
assert callable(get_adapter), "adapters.get_adapter must be callable"
|
||||||
assert hasattr(BaseAdapter, "name"), "BaseAdapter interface broken"
|
assert hasattr(BaseAdapter, "name"), "BaseAdapter interface broken"
|
||||||
|
|||||||
@ -23,9 +23,12 @@ from a2a_tools import (
|
|||||||
tool_delegate_task,
|
tool_delegate_task,
|
||||||
tool_delegate_task_async,
|
tool_delegate_task_async,
|
||||||
tool_get_workspace_info,
|
tool_get_workspace_info,
|
||||||
|
tool_inbox_peek,
|
||||||
|
tool_inbox_pop,
|
||||||
tool_list_peers,
|
tool_list_peers,
|
||||||
tool_recall_memory,
|
tool_recall_memory,
|
||||||
tool_send_message_to_user,
|
tool_send_message_to_user,
|
||||||
|
tool_wait_for_message,
|
||||||
)
|
)
|
||||||
from platform_tools.registry import TOOLS as _PLATFORM_TOOL_SPECS
|
from platform_tools.registry import TOOLS as _PLATFORM_TOOL_SPECS
|
||||||
|
|
||||||
@ -112,6 +115,18 @@ async def handle_tool_call(name: str, arguments: dict) -> str:
|
|||||||
arguments.get("query", ""),
|
arguments.get("query", ""),
|
||||||
arguments.get("scope", ""),
|
arguments.get("scope", ""),
|
||||||
)
|
)
|
||||||
|
elif name == "wait_for_message":
|
||||||
|
return await tool_wait_for_message(
|
||||||
|
arguments.get("timeout_secs", 60.0),
|
||||||
|
)
|
||||||
|
elif name == "inbox_peek":
|
||||||
|
return await tool_inbox_peek(
|
||||||
|
arguments.get("limit", 10),
|
||||||
|
)
|
||||||
|
elif name == "inbox_pop":
|
||||||
|
return await tool_inbox_pop(
|
||||||
|
arguments.get("activity_id", ""),
|
||||||
|
)
|
||||||
return f"Unknown tool: {name}"
|
return f"Unknown tool: {name}"
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -526,3 +526,84 @@ async def tool_recall_memory(query: str = "", scope: str = "") -> str:
|
|||||||
return json.dumps(data)
|
return json.dumps(data)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return f"Error recalling memory: {e}"
|
return f"Error recalling memory: {e}"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Inbox tools — inbound delivery for the standalone molecule-mcp path.
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
#
|
||||||
|
# The InboxState singleton is set by mcp_cli before the MCP server starts
|
||||||
|
# (see workspace/inbox.py for the rationale). In-container runtimes never
|
||||||
|
# call ``inbox.activate(...)``, so ``inbox.get_state()`` returns None and
|
||||||
|
# these tools surface an informational error rather than raising.
|
||||||
|
#
|
||||||
|
# When-to-use guidance (mirrored in platform_tools/registry.py): agents
|
||||||
|
# in standalone-runtime mode should call ``wait_for_message`` to block
|
||||||
|
# on the next inbound message after they've emitted a reply, forming
|
||||||
|
# the loop ``wait → respond → wait``. ``inbox_peek`` is for inspecting
|
||||||
|
# the queue without consuming; ``inbox_pop`` removes a handled message.
|
||||||
|
|
||||||
|
_INBOX_NOT_ENABLED_MSG = (
|
||||||
|
"Error: inbox polling is not enabled in this runtime. The standalone "
|
||||||
|
"molecule-mcp wrapper activates it; in-container runtimes receive "
|
||||||
|
"messages via push delivery and do not need these tools."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def tool_inbox_peek(limit: int = 10) -> str:
    """List pending inbound messages as JSON, leaving the queue untouched.

    Args:
        limit: Maximum number of messages to request from the queue. Any
            value that is not an ``int`` is replaced by 10.

    Returns:
        A JSON array holding ``to_dict()`` of every message returned by
        ``state.peek``, or the "inbox not enabled" notice when
        ``inbox.get_state()`` returns ``None``.
    """
    # Imported at call time (not module load) to sidestep a circular import.
    import inbox

    inbox_state = inbox.get_state()
    if inbox_state is None:
        return _INBOX_NOT_ENABLED_MSG

    if not isinstance(limit, int):
        limit = 10
    pending = inbox_state.peek(limit=limit)
    return json.dumps([entry.to_dict() for entry in pending])
|
||||||
|
|
||||||
|
|
||||||
|
async def tool_inbox_pop(activity_id: str) -> str:
    """Drop one message from the inbox queue, identified by ``activity_id``.

    Args:
        activity_id: Identifier of the message to remove. Must be a
            non-empty string.

    Returns:
        ``{"removed": <bool>, "activity_id": <id>}`` as JSON, where
        ``removed`` is false when ``state.pop`` found nothing to remove.
        Returns an error string instead when the inbox is not active or
        ``activity_id`` is missing.
    """
    import inbox

    inbox_state = inbox.get_state()
    if inbox_state is None:
        return _INBOX_NOT_ENABLED_MSG
    if not (isinstance(activity_id, str) and activity_id):
        return "Error: activity_id is required."

    was_removed = inbox_state.pop(activity_id) is not None
    return json.dumps({"removed": was_removed, "activity_id": activity_id})
|
||||||
|
|
||||||
|
|
||||||
|
async def tool_wait_for_message(timeout_secs: float = 60.0) -> str:
    """Block until a new message arrives or ``timeout_secs`` elapses.

    Returns the head message non-destructively; the agent decides
    whether to ``inbox_pop`` it after acting.

    Args:
        timeout_secs: Requested wait in seconds. The value is coerced to
            ``float`` and clamped to ``[0, 300]``. A value that cannot be
            converted falls back to 60 seconds; an integer too large for
            a float saturates to the nearest bound instead of raising.

    Returns:
        JSON of the head message's ``to_dict()``, or
        ``{"timeout": true, "timeout_secs": <effective timeout>}`` when
        ``state.wait`` returned nothing. Returns the "inbox not enabled"
        notice when ``inbox.get_state()`` returns ``None``.
    """
    import asyncio

    import inbox

    state = inbox.get_state()
    if state is None:
        return _INBOX_NOT_ENABLED_MSG

    try:
        timeout = float(timeout_secs)
    except OverflowError:
        # JSON integers are unbounded, so float() can overflow on a huge
        # int. Saturate by sign; the clamp below then pins it to a bound.
        timeout = 300.0 if timeout_secs > 0 else 0.0
    except (TypeError, ValueError):
        timeout = 60.0
    # Cap at 300s — Claude Code's default tool timeout is ~10min, and
    # blocking longer than 5min wastes the prompt cache window for
    # nothing useful. Operators who want longer can call repeatedly.
    timeout = max(0.0, min(timeout, 300.0))

    # The threading.Event-based wait would block the asyncio loop.
    # Run it on the default executor so the MCP server can keep
    # processing other JSON-RPC requests while we sleep.
    loop = asyncio.get_running_loop()
    message = await loop.run_in_executor(None, state.wait, timeout)
    if message is None:
        return json.dumps({"timeout": True, "timeout_secs": timeout})
    return json.dumps(message.to_dict())
|
||||||
|
|||||||
@ -334,6 +334,14 @@ _CLI_A2A_COMMAND_KEYWORDS: dict[str, str | None] = {
|
|||||||
# grows a `say` or `message` subcommand, change `None` to that
|
# grows a `say` or `message` subcommand, change `None` to that
|
||||||
# keyword and the alignment test will start passing.
|
# keyword and the alignment test will start passing.
|
||||||
"send_message_to_user": None,
|
"send_message_to_user": None,
|
||||||
|
# Inbox tools live in the standalone molecule-mcp wrapper only;
|
||||||
|
# CLI-subprocess runtimes have their own delivery loop and never
|
||||||
|
# invoke these. The alignment test allows None entries — they
|
||||||
|
# appear in registry.TOOLS for adapter consistency without
|
||||||
|
# forcing a CLI subcommand.
|
||||||
|
"wait_for_message": None,
|
||||||
|
"inbox_peek": None,
|
||||||
|
"inbox_pop": None,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
480
workspace/inbox.py
Normal file
480
workspace/inbox.py
Normal file
@ -0,0 +1,480 @@
|
|||||||
|
"""In-memory inbox + background poller for the standalone molecule-mcp path.
|
||||||
|
|
||||||
|
Purpose
|
||||||
|
-------
|
||||||
|
The universal MCP server (a2a_mcp_server.py) is OUTBOUND-ONLY by default —
|
||||||
|
it gives an MCP-aware agent the same A2A delegation, peer-discovery, and
|
||||||
|
memory tools that container-bound runtimes already have. There is no
|
||||||
|
inbound delivery path: when the canvas user types a message or a peer
|
||||||
|
sends an A2A request, the activity lands on the platform but the
|
||||||
|
standalone agent never sees it.
|
||||||
|
|
||||||
|
This module closes that gap WITHOUT requiring a tunnel or a public agent
|
||||||
|
URL. A daemon thread polls ``/workspaces/:id/activity?type=a2a_receive``
|
||||||
|
on the platform and stages new rows in an in-memory deque. Three new MCP
|
||||||
|
tools (``inbox_peek``, ``inbox_pop``, ``wait_for_message``) let the
|
||||||
|
agent observe the queue.
|
||||||
|
|
||||||
|
Why a poller (not push)
|
||||||
|
-----------------------
|
||||||
|
runtime=external workspaces have ``delivery_mode="poll"`` — the platform
|
||||||
|
records inbound A2A in ``activity_logs`` but does not call back to the
|
||||||
|
agent. A poller is the only inbound surface that works without the
|
||||||
|
operator exposing a public URL through a tunnel. 5s cadence matches
|
||||||
|
the molecule-mcp-claude-channel plugin's POLL_INTERVAL — it's already
|
||||||
|
proven on staging for the channel-based delivery path.
|
||||||
|
|
||||||
|
Cursor model
|
||||||
|
------------
|
||||||
|
``activity_logs.id`` is the cursor (server-assigned, monotonic). We
|
||||||
|
persist it to ``${CONFIGS_DIR}/.mcp_inbox_cursor`` so an agent restart
|
||||||
|
doesn't replay the last 10 minutes of inbound traffic and re-act on
|
||||||
|
already-handled messages. On 410 (cursor pruned) we drop back to
|
||||||
|
``since_secs=600`` for a bounded backlog and let the cursor advance
|
||||||
|
naturally from there.
|
||||||
|
|
||||||
|
Scope
|
||||||
|
-----
|
||||||
|
Standalone molecule-mcp ONLY. The in-container runtime has its own
|
||||||
|
push delivery (main.py + canvas WebSocket); we never want both
|
||||||
|
running at once or a single message would be delivered twice. The
|
||||||
|
caller (mcp_cli.main) gates activation explicitly via
|
||||||
|
``activate(state)``; in-container code that imports this module by
|
||||||
|
accident gets a no-op until activate is called.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from collections import deque
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Poll cadence. 5s mirrors the molecule-mcp-claude-channel plugin's
|
||||||
|
# proven default — fast enough that a canvas user typing "are you
|
||||||
|
# there?" gets picked up before they refresh, slow enough that 12
|
||||||
|
# requests/min won't trip rate limits or wake mobile devices.
|
||||||
|
POLL_INTERVAL_SECONDS = 5.0
|
||||||
|
|
||||||
|
# Initial backlog window for the first poll AND the recovery path
|
||||||
|
# after a stale-cursor 410. 10 minutes is enough to cover a brief
|
||||||
|
# crash/restart without flooding a long-idle workspace with hours of
|
||||||
|
# stale chat.
|
||||||
|
INITIAL_BACKLOG_SECONDS = 600
|
||||||
|
|
||||||
|
# Hard cap on the in-memory deque. The poller is bounded by the
|
||||||
|
# server's per-page limit (default 100) and the agent typically pops
|
||||||
|
# faster than the operator types, so an idle workspace shouldn't
|
||||||
|
# exceed a handful. The cap protects against runaway growth if the
|
||||||
|
# agent process stops calling pop.
|
||||||
|
MAX_QUEUED_MESSAGES = 200
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
class InboxMessage:
    """A single inbound A2A message held for the agent.

    ``to_dict`` produces the shape the agent receives from the
    inbox_peek / wait_for_message tools.
    """

    activity_id: str
    text: str
    peer_id: str  # "" means the canvas user; anything else is the peer's workspace_id
    method: str  # JSON-RPC method name ("message/send", "tasks/send", etc.)
    created_at: str  # RFC3339 timestamp taken from the activity row

    def to_dict(self) -> dict[str, Any]:
        """Return the message as a plain dict, adding a derived ``kind``.

        ``kind`` is ``"peer_agent"`` when ``peer_id`` is non-empty and
        ``"canvas_user"`` otherwise.
        """
        sender_kind = "canvas_user"
        if self.peer_id:
            sender_kind = "peer_agent"
        return dict(
            activity_id=self.activity_id,
            text=self.text,
            peer_id=self.peer_id,
            kind=sender_kind,
            method=self.method,
            created_at=self.created_at,
        )
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
class InboxState:
    """Thread-safe queue of pending inbound messages.

    Producer: the poller thread, calling ``record(message)``.
    Consumers: the MCP tool handlers, calling ``peek``, ``pop``,
    or ``wait``. Synchronization is via a single ``threading.Lock``
    (cheap — every operation is O(n) over a small deque) plus an
    ``Event`` that wakes ``wait`` callers when a new message lands.

    The instance also owns the poll cursor: ``load_cursor``,
    ``save_cursor`` and ``reset_cursor`` keep an in-memory copy in sync
    with the file at ``cursor_path``.
    """

    # File that persists ``activity_logs.id`` of the most recently
    # observed row, so a restart doesn't replay backlog.
    cursor_path: Path

    _queue: deque[InboxMessage] = field(default_factory=lambda: deque(maxlen=MAX_QUEUED_MESSAGES))
    _lock: threading.Lock = field(default_factory=threading.Lock)
    _arrival: threading.Event = field(default_factory=threading.Event)
    _cursor: str | None = None
    _cursor_loaded: bool = False

    def load_cursor(self) -> str | None:
        """Read the persisted cursor from disk. Cached after first call.

        Missing, unreadable, or undecodable file → None (poller will
        fall back to the initial-backlog window). We never raise: a
        corrupt cursor is less bad than the inbox refusing to start.
        """
        with self._lock:
            if self._cursor_loaded:
                return self._cursor
            try:
                if self.cursor_path.is_file():
                    raw = self.cursor_path.read_text(encoding="utf-8")
                    self._cursor = raw.strip() or None
            except (OSError, UnicodeDecodeError) as exc:
                # UnicodeDecodeError is a ValueError, not an OSError, so a
                # cursor file holding garbage bytes must be caught
                # explicitly to honor the "never raise" contract.
                logger.warning("inbox: failed to read cursor %s: %s", self.cursor_path, exc)
                self._cursor = None
            self._cursor_loaded = True
            return self._cursor

    def save_cursor(self, activity_id: str) -> None:
        """Persist the cursor. Best-effort — log + continue on failure.

        The in-memory cursor is updated first, so a failed write only
        affects what a later process start will read. The file is
        written to a sibling ``.tmp`` path and then renamed over the
        real one, so a reader never sees a half-written cursor.
        """
        with self._lock:
            self._cursor = activity_id
            self._cursor_loaded = True
            try:
                self.cursor_path.parent.mkdir(parents=True, exist_ok=True)
                tmp = self.cursor_path.with_suffix(self.cursor_path.suffix + ".tmp")
                tmp.write_text(activity_id, encoding="utf-8")
                tmp.replace(self.cursor_path)
            except OSError as exc:
                # Warn rather than swallow: a silent failure would hide a
                # permission problem on the configs directory.
                logger.warning("inbox: failed to persist cursor to %s: %s", self.cursor_path, exc)

    def reset_cursor(self) -> None:
        """Forget the cursor. Used after a 410 from the activity API.

        Clears the in-memory value and deletes the cursor file if it
        exists; a failed delete is logged and otherwise ignored.
        """
        with self._lock:
            self._cursor = None
            self._cursor_loaded = True
            try:
                if self.cursor_path.is_file():
                    self.cursor_path.unlink()
            except OSError as exc:
                logger.warning("inbox: failed to delete cursor %s: %s", self.cursor_path, exc)

    def record(self, message: InboxMessage) -> None:
        """Append a message and wake any waiter.

        Skips a row whose activity_id is already queued — defensive
        against the poller racing with the consumer + cursor save.
        When the bounded deque is already full, appending evicts the
        oldest entry; that loss is logged instead of passing silently.
        """
        with self._lock:
            if any(queued.activity_id == message.activity_id for queued in self._queue):
                return
            capacity = self._queue.maxlen
            if capacity is not None and len(self._queue) >= capacity:
                logger.warning(
                    "inbox: queue full (%d messages); dropping oldest message %s",
                    capacity,
                    self._queue[0].activity_id,
                )
            self._queue.append(message)
            self._arrival.set()

    def peek(self, limit: int = 10) -> list[InboxMessage]:
        """Return up to ``limit`` pending messages without removing them.

        A ``limit`` of zero or less is treated as 10.
        """
        if limit <= 0:
            limit = 10
        with self._lock:
            return list(self._queue)[:limit]

    def pop(self, activity_id: str) -> InboxMessage | None:
        """Remove a specific message. Idempotent; returns None if absent.

        We require the caller to specify which message it handled
        rather than auto-popping the head — preserves observability
        when the agent reads several but only handles one. The arrival
        event is cleared once the queue becomes empty.
        """
        with self._lock:
            for index, queued in enumerate(self._queue):
                if queued.activity_id == activity_id:
                    # Safe to mutate mid-iteration: we return immediately.
                    del self._queue[index]
                    if not self._queue:
                        self._arrival.clear()
                    return queued
            return None

    def wait(self, timeout_secs: float) -> InboxMessage | None:
        """Block until a message is available or timeout elapses.

        Returns the head message WITHOUT popping; the caller decides
        whether to pop after acting on it. Returns None only once the
        full timeout has elapsed with an empty queue: if a wake-up finds
        the queue already drained by a concurrent ``pop``, the wait
        resumes for the remaining time instead of reporting a premature
        timeout.
        """
        deadline = time.monotonic() + max(0.0, timeout_secs)
        while True:
            with self._lock:
                if self._queue:
                    return self._queue[0]
                # Cleared under the lock, so a record() that lands after
                # this point is guaranteed to set the event again.
                self._arrival.clear()
            remaining = deadline - time.monotonic()
            if remaining <= 0 or not self._arrival.wait(timeout=remaining):
                return None
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Module singleton — set by mcp_cli before MCP server starts.
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
#
|
||||||
|
# In-container callers don't activate; the inbox tools detect the
|
||||||
|
# unset singleton and return an informational error rather than
|
||||||
|
# breaking the dispatch path.
|
||||||
|
|
||||||
|
_STATE: InboxState | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def activate(state: InboxState) -> None:
    """Install ``state`` as the module-wide inbox singleton.

    Passing the object that is already installed does nothing. Passing
    a different object while one is installed logs a WARNING and then
    swaps it in — the expected caller is mcp_cli at startup, so a
    second, different activation usually points at a test/runtime
    mix-up.
    """
    global _STATE
    current = _STATE
    if current is state:
        return
    if current is not None:
        logger.warning("inbox: replacing existing singleton state")
    _STATE = state
|
||||||
|
|
||||||
|
|
||||||
|
def get_state() -> InboxState | None:
    """Return the installed InboxState, or None when nothing was activated.

    Callers are expected to handle the None case themselves (the inbox
    tools answer with an "inbox not enabled" message) so that an
    inbox-tool call in a runtime without an active inbox does not raise.
    """
    return _STATE
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Activity → InboxMessage adapter
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
#
|
||||||
|
# The platform's a2a_proxy logs request_body as the JSON-RPC envelope
|
||||||
|
# it forwarded to the workspace. Three shapes have been observed in
|
||||||
|
# the wild (verified against workspace-server's logA2ASuccess in
|
||||||
|
# a2a_proxy_helpers.go on 2026-04-29) — handle all three before
|
||||||
|
# falling back to summary so a peer message at least surfaces SOMETHING.
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_text(request_body: Any, summary: str | None) -> str:
|
||||||
|
"""Pull the human-readable text out of an A2A activity row.
|
||||||
|
|
||||||
|
Mirrors molecule-mcp-claude-channel/server.ts:445 (extractText) so
|
||||||
|
canvas-user messages and peer-agent messages render identically
|
||||||
|
across both inbound channels.
|
||||||
|
"""
|
||||||
|
if not isinstance(request_body, dict):
|
||||||
|
return summary or "(empty A2A message)"
|
||||||
|
|
||||||
|
candidates: list[Any] = []
|
||||||
|
params = request_body.get("params") if isinstance(request_body.get("params"), dict) else None
|
||||||
|
if params:
|
||||||
|
message = params.get("message") if isinstance(params.get("message"), dict) else None
|
||||||
|
if message:
|
||||||
|
candidates.append(message.get("parts"))
|
||||||
|
candidates.append(params.get("parts"))
|
||||||
|
candidates.append(request_body.get("parts"))
|
||||||
|
|
||||||
|
# The A2A protocol's part discriminator field varies between SDK
|
||||||
|
# versions: a2a-sdk v0 uses ``type``, v1 uses ``kind``. The platform's
|
||||||
|
# activity_logs preserves whichever the original sender used, so we
|
||||||
|
# accept either. Verified live against a hosted SaaS workspace on
|
||||||
|
# 2026-04-30 — every canvas-user message arrived with ``kind`` and
|
||||||
|
# the type-only filter was silently falling through to summary.
|
||||||
|
for parts in candidates:
|
||||||
|
if isinstance(parts, list):
|
||||||
|
text = "".join(
|
||||||
|
p.get("text", "")
|
||||||
|
for p in parts
|
||||||
|
if isinstance(p, dict)
|
||||||
|
and (p.get("kind") == "text" or p.get("type") == "text")
|
||||||
|
)
|
||||||
|
if text:
|
||||||
|
return text
|
||||||
|
return summary or "(empty A2A message)"
|
||||||
|
|
||||||
|
|
||||||
|
def message_from_activity(row: dict[str, Any]) -> InboxMessage:
    """Convert one /activity row into an InboxMessage.

    Args:
        row: A decoded activity row. The keys read are ``id``,
            ``request_body``, ``summary``, ``source_id``, ``method`` and
            ``created_at``; all are optional.

    Returns:
        An InboxMessage. A missing or null ``id`` / ``created_at``
        becomes an empty string, so callers can detect an unusable row
        by checking for an empty ``activity_id``.
    """
    request_body = row.get("request_body")
    if isinstance(request_body, str):
        # The Go handler returns request_body as json.RawMessage; httpx
        # deserializes that to a dict already. But some legacy paths or
        # mocked servers may return it as a string — handle defensively.
        try:
            request_body = json.loads(request_body)
        except (TypeError, ValueError):
            request_body = None

    def _as_text(value: Any) -> str:
        # str(None) would yield the literal "None", which is truthy and
        # would be treated (and persisted) as a real id or timestamp.
        return "" if value is None else str(value)

    return InboxMessage(
        activity_id=_as_text(row.get("id")),
        text=_extract_text(request_body, row.get("summary")),
        peer_id=row.get("source_id") or "",
        method=row.get("method") or "",
        created_at=_as_text(row.get("created_at")),
    )
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Poller — daemon thread that fills the queue from the activity API
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def _poll_once(
    state: InboxState,
    platform_url: str,
    workspace_id: str,
    headers: dict[str, str],
    timeout_secs: float = 10.0,
) -> int:
    """Run one poll iteration against the activity API.

    Holds no state of its own beyond the InboxState passed in, so it is
    safe to call from tests with a stub state + an httpx mock.

    Args:
        state: Receives the fetched messages and the advanced cursor.
        platform_url: Base URL of the platform API.
        workspace_id: Workspace whose activity is polled.
        headers: Request headers for the GET.
        timeout_secs: httpx client timeout for the request.

    Returns:
        The number of rows handed to ``state.record``. Rows that
        ``record`` skips as already queued are still counted. Returns 0
        on a transport failure, any HTTP status >= 400, or a body that
        is not a JSON list.
    """
    import httpx

    url = f"{platform_url}/workspaces/{workspace_id}/activity"
    params: dict[str, str] = {"type": "a2a_receive"}
    cursor = state.load_cursor()
    # One predicate drives both the request mode and the reversal below,
    # so an empty cursor can never select since_secs yet skip reversing.
    use_cursor = bool(cursor)
    if use_cursor:
        params["since_id"] = cursor
    else:
        params["since_secs"] = str(INITIAL_BACKLOG_SECONDS)

    try:
        with httpx.Client(timeout=timeout_secs) as client:
            resp = client.get(url, params=params, headers=headers)
    except Exception as exc:  # noqa: BLE001
        logger.warning("inbox poller: GET /activity failed: %s", exc)
        return 0

    if resp.status_code == 410:
        # Cursor pruned — drop back to the backlog window. The next
        # poll picks up wherever the activity API has rows now.
        logger.info(
            "inbox poller: cursor %s expired (410); resetting to since_secs=%d",
            cursor,
            INITIAL_BACKLOG_SECONDS,
        )
        state.reset_cursor()
        return 0

    if resp.status_code >= 400:
        logger.warning(
            "inbox poller: HTTP %d from /activity: %s",
            resp.status_code,
            (resp.text or "")[:200],
        )
        return 0

    try:
        rows = resp.json()
    except ValueError as exc:
        logger.warning("inbox poller: non-JSON response: %s", exc)
        return 0
    if not isinstance(rows, list):
        return 0

    # since_id mode returns ASC (oldest first). since_secs mode returns
    # DESC; reverse so we record in chronological order and the cursor
    # we save is the freshest row.
    if not use_cursor:
        rows = list(reversed(rows))

    new_count = 0
    last_id: str | None = None
    for row in rows:
        if not isinstance(row, dict):
            continue
        message = message_from_activity(row)
        if not message.activity_id:
            continue
        state.record(message)
        last_id = message.activity_id
        new_count += 1

    if last_id is not None:
        state.save_cursor(last_id)
    return new_count
|
||||||
|
|
||||||
|
|
||||||
|
def _poll_loop(
    state: InboxState,
    platform_url: str,
    workspace_id: str,
    interval: float = POLL_INTERVAL_SECONDS,
    stop_event: threading.Event | None = None,
) -> None:
    """Thread body: call ``_poll_once`` repeatedly, pausing between calls.

    ``auth_headers()`` is invoked afresh for every iteration, so a token
    rotated via env var or .auth_token file is picked up without a
    restart. An exception from one iteration is logged and the loop
    carries on.

    With a ``stop_event`` the pause is ``stop_event.wait(interval)`` and
    the loop exits as soon as the event is set; without one the pause is
    a plain sleep and the loop never returns.
    """
    from platform_auth import auth_headers

    while True:
        try:
            _poll_once(state, platform_url, workspace_id, auth_headers())
        except Exception as exc:  # noqa: BLE001
            logger.warning("inbox poller: iteration crashed: %s", exc)

        if stop_event is None:
            time.sleep(interval)
        elif stop_event.wait(interval):
            return
|
||||||
|
|
||||||
|
|
||||||
|
def start_poller_thread(
    state: InboxState,
    platform_url: str,
    workspace_id: str,
    interval: float = POLL_INTERVAL_SECONDS,
    stop_event: threading.Event | None = None,
) -> threading.Thread:
    """Spawn the poller as a daemon thread. Returns the Thread handle.

    daemon=True so the poller dies with the main process — same
    rationale as mcp_cli's heartbeat thread (no leaks, no stale
    workspace writes after the operator hits Ctrl-C).

    Args:
        state: InboxState the poller fills.
        platform_url: Base URL of the platform API.
        workspace_id: Workspace whose activity is polled.
        interval: Pause between polls, in seconds.
        stop_event: Optional event forwarded to ``_poll_loop``; setting
            it makes the loop return at its next pause. Omitted by
            default, in which case the thread runs until the process
            exits.

    Returns:
        The already-started thread.
    """
    t = threading.Thread(
        target=_poll_loop,
        args=(state, platform_url, workspace_id, interval, stop_event),
        name="molecule-mcp-inbox-poller",
        daemon=True,
    )
    t.start()
    return t
|
||||||
|
|
||||||
|
|
||||||
|
def default_cursor_path() -> Path:
    """Return the default cursor file: ``${CONFIGS_DIR}/.mcp_inbox_cursor``.

    The directory comes from the ``CONFIGS_DIR`` environment variable
    and defaults to ``/configs`` when the variable is unset.
    """
    base_dir = os.environ.get("CONFIGS_DIR", "/configs")
    return Path(base_dir, ".mcp_inbox_cursor")
|
||||||
@ -273,6 +273,19 @@ def main() -> None:
|
|||||||
_platform_register(platform_url, workspace_id, token)
|
_platform_register(platform_url, workspace_id, token)
|
||||||
_start_heartbeat_thread(platform_url, workspace_id, token)
|
_start_heartbeat_thread(platform_url, workspace_id, token)
|
||||||
|
|
||||||
|
# Inbox poller — the inbound side of the standalone path. Without
|
||||||
|
# this thread, the universal MCP server is OUTBOUND-ONLY: an agent
|
||||||
|
# can call delegate_task / send_message_to_user but never observe
|
||||||
|
# canvas-user or peer-agent messages. The poller fills an in-memory
|
||||||
|
# queue from the platform's /activity?type=a2a_receive endpoint;
|
||||||
|
# the agent reads via wait_for_message / inbox_peek / inbox_pop.
|
||||||
|
#
|
||||||
|
# Same disable pattern as heartbeat: in-container callers (with
|
||||||
|
# push delivery via canvas WebSocket) skip this to avoid duplicate
|
||||||
|
# delivery; tests use the env to keep imports cheap.
|
||||||
|
if not os.environ.get("MOLECULE_MCP_DISABLE_INBOX", "").strip():
|
||||||
|
_start_inbox_poller(platform_url, workspace_id)
|
||||||
|
|
||||||
# Env is valid — safe to import the heavy module now. Importing
|
# Env is valid — safe to import the heavy module now. Importing
|
||||||
# earlier would trigger a2a_client.py:22's module-level RuntimeError
|
# earlier would trigger a2a_client.py:22's module-level RuntimeError
|
||||||
# before our friendly help reaches the user.
|
# before our friendly help reaches the user.
|
||||||
@ -280,6 +293,28 @@ def main() -> None:
|
|||||||
cli_main()
|
cli_main()
|
||||||
|
|
||||||
|
|
||||||
|
def _start_inbox_poller(platform_url: str, workspace_id: str) -> None:
    """Activate the inbox singleton + spawn the poller daemon thread.

    The ``inbox`` module is imported here rather than at module import;
    if that import fails, a warning is logged and the process carries
    on without inbound delivery.

    Idempotent within a process: when an inbox state is already active,
    the call returns without building a second state or starting a
    second poller thread, so a stray double-call (e.g. a test harness
    re-entering main) is harmless.

    The poller thread is daemon=True — dies with the main process.

    Args:
        platform_url: Base URL of the platform API, passed to the poller.
        workspace_id: Workspace whose activity the poller reads.
    """
    try:
        import inbox
    except ImportError as exc:
        logger.warning("molecule-mcp: inbox module unavailable: %s", exc)
        return

    if inbox.get_state() is not None:
        # Activating a fresh state here would replace the singleton and
        # leave two pollers running, one of them feeding an orphaned queue.
        logger.debug("molecule-mcp: inbox already active; not starting another poller")
        return

    state = inbox.InboxState(cursor_path=inbox.default_cursor_path())
    inbox.activate(state)
    inbox.start_poller_thread(state, platform_url, workspace_id)
|
||||||
|
|
||||||
|
|
||||||
def _read_token_file() -> str:
|
def _read_token_file() -> str:
|
||||||
"""Read the token from ${CONFIGS_DIR}/.auth_token if present.
|
"""Read the token from ${CONFIGS_DIR}/.auth_token if present.
|
||||||
|
|
||||||
|
|||||||
@ -56,9 +56,12 @@ from a2a_tools import (
|
|||||||
tool_delegate_task,
|
tool_delegate_task,
|
||||||
tool_delegate_task_async,
|
tool_delegate_task_async,
|
||||||
tool_get_workspace_info,
|
tool_get_workspace_info,
|
||||||
|
tool_inbox_peek,
|
||||||
|
tool_inbox_pop,
|
||||||
tool_list_peers,
|
tool_list_peers,
|
||||||
tool_recall_memory,
|
tool_recall_memory,
|
||||||
tool_send_message_to_user,
|
tool_send_message_to_user,
|
||||||
|
tool_wait_for_message,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Section name maps to the heading in the agent-facing system prompt.
|
# Section name maps to the heading in the agent-facing system prompt.
|
||||||
@ -299,6 +302,94 @@ _SEND_MESSAGE_TO_USER = ToolSpec(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Inbox — inbound delivery for the standalone molecule-mcp path.
|
||||||
|
#
|
||||||
|
# These tools observe a poller-fed in-memory queue (see workspace/inbox.py).
|
||||||
|
# They are universally registered so docs + adapters stay aligned, but
|
||||||
|
# they only return real data in the standalone molecule-mcp runtime;
|
||||||
|
# in-container runtimes return an informational "not enabled" message
|
||||||
|
# because their delivery loop is push-based via the canvas WebSocket.
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
# Spec for the blocking "wait for the next inbound message" tool.
# ``short`` and ``when_to_use`` are agent-facing prose; the same wording
# appears in the agent-facing tool documentation, so edit both together.
_WAIT_FOR_MESSAGE = ToolSpec(
    name="wait_for_message",
    short=(
        "Block until the next inbound message (canvas user OR peer "
        "agent) arrives, or until ``timeout_secs`` elapses."
    ),
    when_to_use=(
        "Standalone-runtime ONLY (molecule-mcp wrapper). After "
        "you reply, call this to wait for the next message — forms "
        "the loop ``wait_for_message → respond → wait_for_message``. "
        "Returns the head message non-destructively; call inbox_pop "
        "with the activity_id once you've handled it. In-container "
        "runtimes receive messages via push and should not call this."
    ),
    # JSON-schema for the tool arguments. ``timeout_secs`` is optional
    # (there is no "required" list).
    # NOTE(review): the 300s cap and 60s default are only stated in the
    # description here — presumably enforced by tool_wait_for_message; confirm.
    input_schema={
        "type": "object",
        "properties": {
            "timeout_secs": {
                "type": "number",
                "description": (
                    "Max seconds to block. Capped at 300. "
                    "Default 60."
                ),
            },
        },
    },
    impl=tool_wait_for_message,
    section=A2A_SECTION,
)
|
||||||
|
|
||||||
|
# Spec for the non-destructive "list queued inbound messages" tool.
# ``short`` and ``when_to_use`` are agent-facing prose that also appears
# in the agent-facing tool documentation.
_INBOX_PEEK = ToolSpec(
    name="inbox_peek",
    short="List pending inbound messages without removing them.",
    when_to_use=(
        "Standalone-runtime ONLY. Use to inspect what's queued "
        "before deciding which to handle. Non-destructive — pair "
        "with inbox_pop to consume after replying."
    ),
    # ``limit`` is optional (no "required" list).
    # NOTE(review): the default of 10 is only stated in the description —
    # presumably applied by tool_inbox_peek; confirm.
    input_schema={
        "type": "object",
        "properties": {
            "limit": {
                "type": "integer",
                "description": "Max messages to return. Default 10.",
            },
        },
    },
    impl=tool_inbox_peek,
    section=A2A_SECTION,
)
|
||||||
|
|
||||||
|
# Spec for the "drop a handled message from the inbox queue" tool.
# ``short`` and ``when_to_use`` are agent-facing prose that also appears
# in the agent-facing tool documentation.
_INBOX_POP = ToolSpec(
    name="inbox_pop",
    short="Remove a handled message from the inbox queue by activity_id.",
    when_to_use=(
        "Standalone-runtime ONLY. Call after you've replied to a "
        "message returned from wait_for_message or inbox_peek to "
        "drop it from the queue. Idempotent — popping a missing "
        "id reports removed=false without erroring."
    ),
    # ``activity_id`` is the only argument and it is mandatory.
    input_schema={
        "type": "object",
        "properties": {
            "activity_id": {
                "type": "string",
                "description": (
                    "activity_id of the message to remove (from "
                    "inbox_peek / wait_for_message output)."
                ),
            },
        },
        "required": ["activity_id"],
    },
    impl=tool_inbox_pop,
    section=A2A_SECTION,
)
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# HMA — hierarchical persistent memory
|
# HMA — hierarchical persistent memory
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@ -374,6 +465,10 @@ TOOLS: list[ToolSpec] = [
|
|||||||
_LIST_PEERS,
|
_LIST_PEERS,
|
||||||
_GET_WORKSPACE_INFO,
|
_GET_WORKSPACE_INFO,
|
||||||
_SEND_MESSAGE_TO_USER,
|
_SEND_MESSAGE_TO_USER,
|
||||||
|
# Inbox (standalone-only; in-container returns informational error)
|
||||||
|
_WAIT_FOR_MESSAGE,
|
||||||
|
_INBOX_PEEK,
|
||||||
|
_INBOX_POP,
|
||||||
# HMA
|
# HMA
|
||||||
_COMMIT_MEMORY,
|
_COMMIT_MEMORY,
|
||||||
_RECALL_MEMORY,
|
_RECALL_MEMORY,
|
||||||
|
|||||||
@ -6,6 +6,9 @@
|
|||||||
- **list_peers**: List the workspaces this agent can communicate with — name, ID, status, role for each.
|
- **list_peers**: List the workspaces this agent can communicate with — name, ID, status, role for each.
|
||||||
- **get_workspace_info**: Get this workspace's own info — ID, name, role, tier, parent, status.
|
- **get_workspace_info**: Get this workspace's own info — ID, name, role, tier, parent, status.
|
||||||
- **send_message_to_user**: Send a message directly to the user's canvas chat — pushed instantly via WebSocket. Use this to: (1) acknowledge a task immediately ('Got it, I'll start working on this'), (2) send interim progress updates while doing long work, (3) deliver follow-up results after delegation completes, (4) attach files (zip, pdf, csv, image) for the user to download via the `attachments` field (NEVER paste file URLs in `message`). The message appears in the user's chat as if you're proactively reaching out.
|
- **send_message_to_user**: Send a message directly to the user's canvas chat — pushed instantly via WebSocket. Use this to: (1) acknowledge a task immediately ('Got it, I'll start working on this'), (2) send interim progress updates while doing long work, (3) deliver follow-up results after delegation completes, (4) attach files (zip, pdf, csv, image) for the user to download via the `attachments` field (NEVER paste file URLs in `message`). The message appears in the user's chat as if you're proactively reaching out.
|
||||||
|
- **wait_for_message**: Block until the next inbound message (canvas user OR peer agent) arrives, or until ``timeout_secs`` elapses.
|
||||||
|
- **inbox_peek**: List pending inbound messages without removing them.
|
||||||
|
- **inbox_pop**: Remove a handled message from the inbox queue by activity_id.
|
||||||
|
|
||||||
### delegate_task
|
### delegate_task
|
||||||
Use for QUICK questions and small sub-tasks where you can afford to wait inline. Returns the peer's response text directly. For longer-running work (research, multi-minute jobs) use delegate_task_async + check_task_status instead so you don't hold this workspace busy waiting.
|
Use for QUICK questions and small sub-tasks where you can afford to wait inline. Returns the peer's response text directly. For longer-running work (research, multi-minute jobs) use delegate_task_async + check_task_status instead so you don't hold this workspace busy waiting.
|
||||||
@ -25,4 +28,13 @@ Use to introspect your own identity (e.g. before reporting back to the user, or
|
|||||||
### send_message_to_user
|
### send_message_to_user
|
||||||
Use proactively across the lifecycle of a task — early to acknowledge, mid-flight to update, late to deliver. Never paste file URLs in the message body — always pass absolute paths in `attachments` so the platform serves them as download chips (works on SaaS where external file hosts are unreachable).
|
Use proactively across the lifecycle of a task — early to acknowledge, mid-flight to update, late to deliver. Never paste file URLs in the message body — always pass absolute paths in `attachments` so the platform serves them as download chips (works on SaaS where external file hosts are unreachable).
|
||||||
|
|
||||||
|
### wait_for_message
|
||||||
|
Standalone-runtime ONLY (molecule-mcp wrapper). After you reply, call this to wait for the next message — forms the loop ``wait_for_message → respond → wait_for_message``. Returns the head message non-destructively; call inbox_pop with the activity_id once you've handled it. In-container runtimes receive messages via push and should not call this.
|
||||||
|
|
||||||
|
### inbox_peek
|
||||||
|
Standalone-runtime ONLY. Use to inspect what's queued before deciding which to handle. Non-destructive — pair with inbox_pop to consume after replying.
|
||||||
|
|
||||||
|
### inbox_pop
|
||||||
|
Standalone-runtime ONLY. Call after you've replied to a message returned from wait_for_message or inbox_peek to drop it from the queue. Idempotent — popping a missing id reports removed=false without erroring.
|
||||||
|
|
||||||
Always use list_peers first to discover available workspace IDs. Access control is enforced — you can only reach siblings and parent/children. If a delegation returns a DELEGATION FAILED message, do NOT forward the raw error to the user. Instead: (1) try a different peer, (2) handle the task yourself, or (3) tell the user which peer is unavailable and provide your own best answer.
|
Always use list_peers first to discover available workspace IDs. Access control is enforced — you can only reach siblings and parent/children. If a delegation returns a DELEGATION FAILED message, do NOT forward the raw error to the user. Instead: (1) try a different peer, (2) handle the task yourself, or (3) tell the user which peer is unavailable and provide your own best answer.
|
||||||
|
|||||||
444
workspace/tests/test_inbox.py
Normal file
444
workspace/tests/test_inbox.py
Normal file
@ -0,0 +1,444 @@
|
|||||||
|
"""Tests for workspace/inbox.py — InboxState + activity API poller.
|
||||||
|
|
||||||
|
Covers the round-trip from a /activity row to an InboxMessage that the
|
||||||
|
agent observes via the three new MCP tools, plus the cursor-persistence
|
||||||
|
+ 410-recovery behavior that keeps the standalone molecule-mcp from
|
||||||
|
re-delivering already-handled messages after a restart.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
import inbox
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
def _reset_singleton():
    """Clear the module-level inbox singleton around every test.

    The singleton is wiped before the test body runs and again
    afterwards, so a state activated by one test is never visible to
    the next one.
    """
    setattr(inbox, "_STATE", None)  # clean slate going in
    yield
    setattr(inbox, "_STATE", None)  # and nothing left behind
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture()
def state(tmp_path: Path) -> inbox.InboxState:
    """Provide a fresh InboxState whose cursor file lives under ``tmp_path``."""
    cursor_file = tmp_path / ".mcp_inbox_cursor"
    return inbox.InboxState(cursor_path=cursor_file)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# _extract_text — envelope shape coverage
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_extract_text_jsonrpc_message_wrapper():
    """Text nested under ``params.message.parts`` of a JSON-RPC envelope is extracted."""
    text_part = {"type": "text", "text": "hello"}
    envelope = {
        "jsonrpc": "2.0",
        "method": "message/send",
        "params": {"message": {"parts": [text_part]}},
    }
    extracted = inbox._extract_text(envelope, None)
    assert extracted == "hello"
|
||||||
|
|
||||||
|
|
||||||
|
def test_extract_text_a2a_v1_kind_field():
    """Parts discriminated by ``kind`` (A2A SDK v1) are read like ``type`` parts.

    Hosted SaaS workspaces send the v1 shape today, so this is what a
    live canvas-user message looks like in activity_logs.request_body.
    """
    v1_message = {
        "role": "user",
        "parts": [{"kind": "text", "text": "hello from canvas"}],
    }
    envelope = {"params": {"message": v1_message}}
    extracted = inbox._extract_text(envelope, None)
    assert extracted == "hello from canvas"
|
||||||
|
|
||||||
|
|
||||||
|
def test_extract_text_jsonrpc_params_parts():
    """Parts placed directly under ``params`` (no ``message`` wrapper) are extracted."""
    peer_parts = [{"type": "text", "text": "from peer"}]
    envelope = {"params": {"parts": peer_parts}}
    extracted = inbox._extract_text(envelope, None)
    assert extracted == "from peer"
|
||||||
|
|
||||||
|
|
||||||
|
def test_extract_text_shorthand_parts():
    """A body that is just a top-level ``parts`` list is extracted."""
    envelope = {"parts": [{"type": "text", "text": "shorthand"}]}
    extracted = inbox._extract_text(envelope, None)
    assert extracted == "shorthand"
|
||||||
|
|
||||||
|
|
||||||
|
def test_extract_text_concatenates_multiple_parts():
    """Text parts are joined in order; the non-text (image) part adds nothing."""
    mixed_parts = [
        {"type": "text", "text": "hello "},
        {"type": "text", "text": "world"},
        {"type": "image", "url": "https://example.invalid/x.png"},
    ]
    extracted = inbox._extract_text({"parts": mixed_parts}, None)
    assert extracted == "hello world"
|
||||||
|
|
||||||
|
|
||||||
|
def test_extract_text_falls_back_to_summary():
    """When the body yields no text, the summary argument is returned instead."""
    # Both a missing body and a body without any parts must fall back.
    for unusable_body in (None, {"unrelated": True}):
        assert inbox._extract_text(unusable_body, "fallback") == "fallback"
|
||||||
|
|
||||||
|
|
||||||
|
def test_extract_text_returns_placeholder_when_nothing_usable():
    """With neither a body nor a summary, a fixed placeholder string is returned."""
    extracted = inbox._extract_text(None, None)
    assert extracted == "(empty A2A message)"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# message_from_activity
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_message_from_activity_canvas_user():
    """A row without ``source_id`` becomes a ``canvas_user`` message.

    The text comes from the request body (the row's summary is not
    used here) and the peer id is the empty string.
    """
    request_body = {
        "params": {"message": {"parts": [{"type": "text", "text": "hi"}]}}
    }
    activity_row = {
        "id": "act-1",
        "source_id": None,
        "method": "message/send",
        "summary": "ignored",
        "request_body": request_body,
        "created_at": "2026-04-30T22:00:00Z",
    }
    message = inbox.message_from_activity(activity_row)
    assert message.activity_id == "act-1"
    assert message.text == "hi"
    assert message.peer_id == ""
    assert message.method == "message/send"
    serialized = message.to_dict()
    assert serialized["kind"] == "canvas_user"
|
||||||
|
|
||||||
|
|
||||||
|
def test_message_from_activity_peer_agent():
    """A row carrying a ``source_id`` becomes a ``peer_agent`` message from that peer."""
    activity_row = {
        "id": "act-2",
        "source_id": "ws-peer-uuid",
        "method": "tasks/send",
        "summary": "delegate",
        "request_body": {"parts": [{"type": "text", "text": "do task"}]},
        "created_at": "2026-04-30T22:01:00Z",
    }
    message = inbox.message_from_activity(activity_row)
    assert message.peer_id == "ws-peer-uuid"
    serialized = message.to_dict()
    assert serialized["kind"] == "peer_agent"
|
||||||
|
|
||||||
|
|
||||||
|
def test_message_from_activity_handles_string_request_body():
    """A ``request_body`` delivered as a JSON string is still parsed for its text."""
    raw_body = '{"parts": [{"type": "text", "text": "json string"}]}'
    activity_row = {
        "id": "act-3",
        "source_id": None,
        "method": "message/send",
        "summary": None,
        "request_body": raw_body,
        "created_at": "2026-04-30T22:02:00Z",
    }
    message = inbox.message_from_activity(activity_row)
    assert message.text == "json string"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# InboxState — queue + wait/peek/pop semantics
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def _msg(activity_id: str, text: str = "", peer_id: str = "") -> inbox.InboxMessage:
    """Build an InboxMessage for queue tests.

    An empty ``text`` defaults to the activity id; method and timestamp
    are fixed values.
    """
    body_text = text if text else activity_id
    fields = {
        "activity_id": activity_id,
        "text": body_text,
        "peer_id": peer_id,
        "method": "message/send",
        "created_at": "2026-04-30T22:00:00Z",
    }
    return inbox.InboxMessage(**fields)
|
||||||
|
|
||||||
|
|
||||||
|
def test_record_then_peek(state: inbox.InboxState):
    """Recorded messages come back from peek in insertion order."""
    for activity_id in ("a", "b"):
        state.record(_msg(activity_id))
    queued = state.peek(limit=10)
    queued_ids = [message.activity_id for message in queued]
    assert queued_ids == ["a", "b"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_record_dedupes_by_activity_id(state: inbox.InboxState):
    """Recording the same activity id twice leaves a single queued message."""
    for _ in range(2):
        # Second iteration repeats the id and must be dropped.
        state.record(_msg("a"))
    queued = state.peek(10)
    assert len(queued) == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_pop_removes_specific_message(state: inbox.InboxState):
    """pop returns the named message and leaves the others queued."""
    for activity_id in ("a", "b"):
        state.record(_msg(activity_id))
    popped = state.pop("a")
    assert popped is not None and popped.activity_id == "a"
    leftover_ids = [message.activity_id for message in state.peek(10)]
    assert leftover_ids == ["b"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_pop_missing_id_returns_none(state: inbox.InboxState):
    """Popping an unknown id returns None and does not disturb the queue."""
    state.record(_msg("a"))
    # Call pop on its own line: ``python -O`` strips assert statements,
    # and the call itself must still execute.
    outcome = state.pop("does-not-exist")
    assert outcome is None
    # The message recorded above is still queued.
    still_queued = state.peek(10)
    assert len(still_queued) == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_wait_returns_existing_head_immediately(state: inbox.InboxState):
    """wait returns the queued head right away instead of sitting out the timeout."""
    state.record(_msg("a"))
    began = time.monotonic()
    msg = state.wait(timeout_secs=5.0)
    elapsed = time.monotonic() - began
    assert msg is not None and msg.activity_id == "a"
    assert elapsed < 0.5, f"wait should not block when queue non-empty (took {elapsed:.2f}s)"
|
||||||
|
|
||||||
|
|
||||||
|
def test_wait_blocks_until_message_arrives(state: inbox.InboxState):
    """wait on an empty queue returns the message a background thread records later."""

    def deliver_late():
        # Give the main thread time to enter wait() first.
        time.sleep(0.05)
        state.record(_msg("late"))

    background = threading.Thread(target=deliver_late, daemon=True)
    background.start()
    received = state.wait(timeout_secs=2.0)
    assert received is not None and received.activity_id == "late"
|
||||||
|
|
||||||
|
|
||||||
|
def test_wait_returns_none_on_timeout(state: inbox.InboxState):
    """wait on an empty queue gives up with None once the timeout elapses."""
    outcome = state.wait(timeout_secs=0.05)
    assert outcome is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_wait_does_not_pop(state: inbox.InboxState):
    """Repeated wait calls leave the message queued; removal is the caller's job (inbox_pop)."""
    state.record(_msg("a"))
    for _ in range(2):
        state.wait(timeout_secs=1.0)
    queued = state.peek(10)
    assert len(queued) == 1
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Cursor persistence
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_load_cursor_returns_none_when_file_absent(state: inbox.InboxState):
    """With no cursor file on disk, load_cursor reports None."""
    loaded = state.load_cursor()
    assert loaded is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_save_then_load_cursor_round_trip(state: inbox.InboxState):
    """A saved cursor is read back after the in-memory copy is cleared."""
    state.save_cursor("act-cursor-1")
    # Drop the cached value and its "loaded" flag so load_cursor has to
    # go back to the persisted copy.
    state._cursor = None
    state._cursor_loaded = False
    reloaded = state.load_cursor()
    assert reloaded == "act-cursor-1"
|
||||||
|
|
||||||
|
|
||||||
|
def test_save_cursor_creates_parent_directory(tmp_path: Path):
    """save_cursor writes the cursor even when its parent directories do not exist yet."""
    cursor_file = tmp_path / "nested" / "configs" / ".mcp_inbox_cursor"
    nested_state = inbox.InboxState(cursor_path=cursor_file)
    nested_state.save_cursor("act-x")
    written = cursor_file.read_text()
    assert written == "act-x"
|
||||||
|
|
||||||
|
|
||||||
|
def test_reset_cursor_deletes_file(state: inbox.InboxState):
    """reset_cursor removes the cursor file and load_cursor then reports None."""
    state.save_cursor("act-y")
    existed_before = state.cursor_path.is_file()
    assert existed_before
    state.reset_cursor()
    exists_after = state.cursor_path.is_file()
    assert not exists_after
    assert state.load_cursor() is None
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Module singleton
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_state_returns_none_before_activate():
    """Before any activate call, the module reports no active state."""
    current = inbox.get_state()
    assert current is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_activate_then_get_state(state: inbox.InboxState):
    """get_state hands back the very object that was activated."""
    inbox.activate(state)
    current = inbox.get_state()
    assert current is state
|
||||||
|
|
||||||
|
|
||||||
|
def test_activate_idempotent(state: inbox.InboxState):
    """Activating the same state twice keeps that state active."""
    for _ in range(2):
        # The repeat call passes the same object: a no-op, no warning expected.
        inbox.activate(state)
    current = inbox.get_state()
    assert current is state
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# _poll_once — HTTP behavior
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def _make_response(status_code: int, json_body: Any = None, text: str = "") -> MagicMock:
    """Build a stand-in HTTP response for the poller tests.

    ``.json()`` returns ``json_body`` when one is given; with no body it
    raises ``ValueError`` instead.
    """
    response = MagicMock()
    response.status_code = status_code
    response.text = text
    if json_body is None:
        response.json.side_effect = ValueError("no json")
    else:
        response.json.return_value = json_body
    return response
|
||||||
|
|
||||||
|
|
||||||
|
def _patch_httpx(returning: MagicMock):
    """Prepare a patch of ``httpx.Client`` whose ``.get()`` yields ``returning``.

    Returns ``(patcher, client)``: the not-yet-started patcher and the
    fake client, which works as a context manager and records its GET
    calls so tests can inspect the arguments.
    """
    fake_client = MagicMock()
    fake_client.get = MagicMock(return_value=returning)
    # ``with httpx.Client(...) as c`` must hand back the same fake.
    fake_client.__enter__ = MagicMock(return_value=fake_client)
    fake_client.__exit__ = MagicMock(return_value=False)
    patcher = patch("httpx.Client", return_value=fake_client)
    return patcher, fake_client
|
||||||
|
|
||||||
|
|
||||||
|
def test_poll_once_fresh_start_uses_since_secs(state: inbox.InboxState):
    """Without a saved cursor the request filters by ``since_secs``, not ``since_id``."""
    empty_page = _make_response(200, [])
    patcher, fake_client = _patch_httpx(empty_page)
    with patcher:
        recorded = inbox._poll_once(state, "http://platform", "ws-1", {})
    assert recorded == 0
    query = fake_client.get.call_args[1]["params"]
    assert query["type"] == "a2a_receive"
    assert "since_secs" in query
    assert "since_id" not in query
|
||||||
|
|
||||||
|
|
||||||
|
def test_poll_once_with_cursor_uses_since_id(state: inbox.InboxState):
    """With a saved cursor the request filters by ``since_id`` and omits ``since_secs``."""
    state.save_cursor("act-existing")
    empty_page = _make_response(200, [])
    patcher, fake_client = _patch_httpx(empty_page)
    with patcher:
        inbox._poll_once(state, "http://platform", "ws-1", {})
    query = fake_client.get.call_args[1]["params"]
    assert query["since_id"] == "act-existing"
    assert "since_secs" not in query
|
||||||
|
|
||||||
|
|
||||||
|
def test_poll_once_410_resets_cursor(state: inbox.InboxState):
    """An HTTP 410 reply clears the saved cursor, both in memory and on disk."""
    state.save_cursor("act-stale")
    gone = _make_response(410, text="cursor pruned")
    patcher, _unused_client = _patch_httpx(gone)
    with patcher:
        inbox._poll_once(state, "http://platform", "ws-1", {})
    cursor_after = state.load_cursor()
    assert cursor_after is None
    assert not state.cursor_path.is_file()
|
||||||
|
|
||||||
|
|
||||||
|
def test_poll_once_records_messages_and_advances_cursor(state: inbox.InboxState):
    """Every returned row is queued in order and the cursor moves to the last row."""

    def activity_row(row_id, source_id, method, text, created_at):
        # Same key order and shape as a row of the /activity payload.
        return {
            "id": row_id,
            "source_id": source_id,
            "method": method,
            "summary": None,
            "request_body": {"parts": [{"type": "text", "text": text}]},
            "created_at": created_at,
        }

    state.save_cursor("act-old")
    page = [
        activity_row("act-1", None, "message/send", "first", "2026-04-30T22:00:00Z"),
        activity_row("act-2", "ws-peer", "tasks/send", "second", "2026-04-30T22:00:01Z"),
    ]
    patcher, _unused_client = _patch_httpx(_make_response(200, page))
    with patcher:
        recorded = inbox._poll_once(state, "http://platform", "ws-1", {})
    assert recorded == 2
    queued_ids = [message.activity_id for message in state.peek(10)]
    assert queued_ids == ["act-1", "act-2"]
    assert state.load_cursor() == "act-2"
|
||||||
|
|
||||||
|
|
||||||
|
def test_poll_once_500_does_not_raise(state: inbox.InboxState):
    """A 500 reply is absorbed: nothing is recorded and the cursor stays unset."""
    server_error = _make_response(500, text="boom")
    patcher, _unused_client = _patch_httpx(server_error)
    with patcher:
        recorded = inbox._poll_once(state, "http://platform", "ws-1", {})
    assert recorded == 0
    # The cursor must not have been touched.
    cursor_after = state.load_cursor()
    assert cursor_after is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_poll_once_handles_non_list_payload(state: inbox.InboxState):
    """A 200 reply whose JSON is not a list records nothing."""
    odd_payload = _make_response(200, {"error": "unexpected"})
    patcher, _unused_client = _patch_httpx(odd_payload)
    with patcher:
        recorded = inbox._poll_once(state, "http://platform", "ws-1", {})
    assert recorded == 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_poll_once_initial_backlog_reverses_to_chronological(state: inbox.InboxState):
    """The first poll (no cursor) turns a newest-first page into oldest-first.

    When no cursor is set, /activity returns rows in descending order;
    the poller must reverse them so messages are recorded
    chronologically and the saved cursor ends up on the freshest row.
    """

    def canvas_row(row_id, text, created_at):
        return {
            "id": row_id,
            "source_id": None,
            "method": "message/send",
            "summary": None,
            "request_body": {"parts": [{"type": "text", "text": text}]},
            "created_at": created_at,
        }

    newest_first = [
        canvas_row("act-newest", "newest", "2026-04-30T22:00:02Z"),
        canvas_row("act-oldest", "oldest", "2026-04-30T22:00:00Z"),
    ]
    patcher, _unused_client = _patch_httpx(_make_response(200, newest_first))
    with patcher:
        inbox._poll_once(state, "http://platform", "ws-1", {})
    queued_ids = [message.activity_id for message in state.peek(10)]
    assert queued_ids == ["act-oldest", "act-newest"]
    # The cursor sits on the newest row, so the next poll only fetches
    # rows that arrived after it.
    assert state.load_cursor() == "act-newest"
|
||||||
|
|
||||||
|
|
||||||
|
def test_start_poller_thread_is_daemon(state: inbox.InboxState):
    """The thread returned by start_poller_thread is a live daemon thread.

    Daemon flag is required so the poller dies with the parent
    process; a non-daemon poller would leak across `claude` restarts
    and write to a stale workspace.

    NOTE(review): the poller thread is never stopped here, so it
    presumably keeps polling after the patches below are lifted —
    confirm whether start_poller_thread offers a stop hook.
    """
    resp = _make_response(200, [])
    p, _ = _patch_httpx(resp)
    # Both httpx.Client and platform_auth.auth_headers are stubbed while
    # the thread is started.
    with p, patch("platform_auth.auth_headers", return_value={}):
        # Use a very short interval so the loop body runs at least once
        # before we exit the test.
        t = inbox.start_poller_thread(state, "http://platform", "ws-1", interval=0.01)
        time.sleep(0.05)
    assert t.daemon is True
    assert t.is_alive()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# default_cursor_path respects CONFIGS_DIR
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_default_cursor_path_uses_configs_dir(monkeypatch, tmp_path: Path):
    """When CONFIGS_DIR is set, the default cursor file lives inside it."""
    monkeypatch.setenv("CONFIGS_DIR", str(tmp_path))
    expected = tmp_path / ".mcp_inbox_cursor"
    assert inbox.default_cursor_path() == expected
|
||||||
|
|
||||||
|
|
||||||
|
def test_default_cursor_path_falls_back_to_default(monkeypatch):
    """With CONFIGS_DIR unset, the default cursor file is under ``/configs``."""
    monkeypatch.delenv("CONFIGS_DIR", raising=False)
    expected = Path("/configs") / ".mcp_inbox_cursor"
    assert inbox.default_cursor_path() == expected
|
||||||
Loading…
Reference in New Issue
Block a user