forked from molecule-ai/molecule-core
feat(workspace-runtime): add inbox polling for standalone molecule-mcp path
The universal MCP server (a2a_mcp_server.py) was outbound-only — agents
in standalone runtimes (Claude Code, hermes, codex, etc.) could
delegate, list peers, and write memories, but never observed the
canvas-user or peer-agent messages addressed to them. This blocked
"constantly responding" loops without forcing operators back onto a
runtime-specific channel plugin.
This PR closes the inbound gap with a poller-fed in-memory queue and
three new MCP tools:
- wait_for_message(timeout_secs?) — block until next message arrives
- inbox_peek(limit?) — list pending messages (non-destructive)
- inbox_pop(activity_id) — drop a handled message
A daemon thread polls /workspaces/:id/activity?type=a2a_receive every
5s, fills the queue from the cursor (since_id), and persists the cursor
to ${CONFIGS_DIR}/.mcp_inbox_cursor so a restart doesn't replay backlog.
On 410 (cursor pruned) we fall back to since_secs=600 for a bounded
recovery window. Activity-row → InboxMessage extraction mirrors the
molecule-mcp-claude-channel plugin's extractText (envelope shapes #1-3
+ summary fallback).
mcp_cli.main starts the poller alongside the existing register +
heartbeat threads. In-container runtimes (which have push delivery via
canvas WebSocket) skip activation, so inbox tools return an
informational "(inbox not enabled)" message instead of double-delivery.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
d00c8be8c9
commit
b47d4ceb00
@ -64,6 +64,7 @@ TOP_LEVEL_MODULES = {
|
||||
"events",
|
||||
"executor_helpers",
|
||||
"heartbeat",
|
||||
"inbox",
|
||||
"initial_prompt",
|
||||
"internal_chat_uploads",
|
||||
"internal_file_read",
|
||||
|
||||
@ -43,6 +43,21 @@ def smoke_imports_and_invariants() -> None:
|
||||
assert callable(cli_main), "a2a_mcp_server.cli_main must be callable"
|
||||
assert callable(mcp_cli_main), "mcp_cli.main must be callable"
|
||||
|
||||
# inbox.activate / get_state / start_poller_thread form the inbound
|
||||
# delivery path for the standalone molecule-mcp wrapper. mcp_cli.main
|
||||
# imports + activates these at startup; if a wheel ships without
|
||||
# them, the standalone agent silently loses the wait_for_message /
|
||||
# inbox_peek / inbox_pop tools and reverts to outbound-only.
|
||||
from molecule_runtime.inbox import ( # noqa: F401
|
||||
InboxState,
|
||||
activate as inbox_activate,
|
||||
get_state as inbox_get_state,
|
||||
start_poller_thread as inbox_start_poller_thread,
|
||||
)
|
||||
assert callable(inbox_activate), "inbox.activate must be callable"
|
||||
assert callable(inbox_get_state), "inbox.get_state must be callable"
|
||||
assert callable(inbox_start_poller_thread), "inbox.start_poller_thread must be callable"
|
||||
|
||||
assert a2a_client._A2A_ERROR_PREFIX, "a2a_client missing error sentinel"
|
||||
assert callable(get_adapter), "adapters.get_adapter must be callable"
|
||||
assert hasattr(BaseAdapter, "name"), "BaseAdapter interface broken"
|
||||
|
||||
@ -23,9 +23,12 @@ from a2a_tools import (
|
||||
tool_delegate_task,
|
||||
tool_delegate_task_async,
|
||||
tool_get_workspace_info,
|
||||
tool_inbox_peek,
|
||||
tool_inbox_pop,
|
||||
tool_list_peers,
|
||||
tool_recall_memory,
|
||||
tool_send_message_to_user,
|
||||
tool_wait_for_message,
|
||||
)
|
||||
from platform_tools.registry import TOOLS as _PLATFORM_TOOL_SPECS
|
||||
|
||||
@ -112,6 +115,18 @@ async def handle_tool_call(name: str, arguments: dict) -> str:
|
||||
arguments.get("query", ""),
|
||||
arguments.get("scope", ""),
|
||||
)
|
||||
elif name == "wait_for_message":
|
||||
return await tool_wait_for_message(
|
||||
arguments.get("timeout_secs", 60.0),
|
||||
)
|
||||
elif name == "inbox_peek":
|
||||
return await tool_inbox_peek(
|
||||
arguments.get("limit", 10),
|
||||
)
|
||||
elif name == "inbox_pop":
|
||||
return await tool_inbox_pop(
|
||||
arguments.get("activity_id", ""),
|
||||
)
|
||||
return f"Unknown tool: {name}"
|
||||
|
||||
|
||||
|
||||
@ -526,3 +526,84 @@ async def tool_recall_memory(query: str = "", scope: str = "") -> str:
|
||||
return json.dumps(data)
|
||||
except Exception as e:
|
||||
return f"Error recalling memory: {e}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Inbox tools — inbound delivery for the standalone molecule-mcp path.
|
||||
# ---------------------------------------------------------------------------
|
||||
#
|
||||
# The InboxState singleton is set by mcp_cli before the MCP server starts
|
||||
# (see workspace/inbox.py for the rationale). In-container runtimes never
|
||||
# call ``inbox.activate(...)``, so ``inbox.get_state()`` returns None and
|
||||
# these tools surface an informational error rather than raising.
|
||||
#
|
||||
# When-to-use guidance (mirrored in platform_tools/registry.py): agents
|
||||
# in standalone-runtime mode should call ``wait_for_message`` to block
|
||||
# on the next inbound message after they've emitted a reply, forming
|
||||
# the loop ``wait → respond → wait``. ``inbox_peek`` is for inspecting
|
||||
# the queue without consuming; ``inbox_pop`` removes a handled message.
|
||||
|
||||
_INBOX_NOT_ENABLED_MSG = (
|
||||
"Error: inbox polling is not enabled in this runtime. The standalone "
|
||||
"molecule-mcp wrapper activates it; in-container runtimes receive "
|
||||
"messages via push delivery and do not need these tools."
|
||||
)
|
||||
|
||||
|
||||
async def tool_inbox_peek(limit: int = 10) -> str:
|
||||
"""Return up to ``limit`` pending inbound messages without removing them."""
|
||||
import inbox # local import — avoids a circular dep at module load
|
||||
|
||||
state = inbox.get_state()
|
||||
if state is None:
|
||||
return _INBOX_NOT_ENABLED_MSG
|
||||
messages = state.peek(limit=limit if isinstance(limit, int) else 10)
|
||||
return json.dumps([m.to_dict() for m in messages])
|
||||
|
||||
|
||||
async def tool_inbox_pop(activity_id: str) -> str:
|
||||
"""Remove a message from the inbox queue by activity_id."""
|
||||
import inbox
|
||||
|
||||
state = inbox.get_state()
|
||||
if state is None:
|
||||
return _INBOX_NOT_ENABLED_MSG
|
||||
if not isinstance(activity_id, str) or not activity_id:
|
||||
return "Error: activity_id is required."
|
||||
removed = state.pop(activity_id)
|
||||
if removed is None:
|
||||
return json.dumps({"removed": False, "activity_id": activity_id})
|
||||
return json.dumps({"removed": True, "activity_id": activity_id})
|
||||
|
||||
|
||||
async def tool_wait_for_message(timeout_secs: float = 60.0) -> str:
|
||||
"""Block until a new message arrives or ``timeout_secs`` elapses.
|
||||
|
||||
Returns the head message non-destructively; the agent decides
|
||||
whether to ``inbox_pop`` it after acting.
|
||||
"""
|
||||
import asyncio
|
||||
|
||||
import inbox
|
||||
|
||||
state = inbox.get_state()
|
||||
if state is None:
|
||||
return _INBOX_NOT_ENABLED_MSG
|
||||
|
||||
try:
|
||||
timeout = float(timeout_secs)
|
||||
except (TypeError, ValueError):
|
||||
timeout = 60.0
|
||||
# Cap at 300s — Claude Code's default tool timeout is ~10min, and
|
||||
# blocking longer than 5min wastes the prompt cache window for
|
||||
# nothing useful. Operators who want longer can call repeatedly.
|
||||
timeout = max(0.0, min(timeout, 300.0))
|
||||
|
||||
# The threading.Event-based wait would block the asyncio loop.
|
||||
# Run it on the default executor so the MCP server can keep
|
||||
# processing other JSON-RPC requests while we sleep.
|
||||
loop = asyncio.get_running_loop()
|
||||
message = await loop.run_in_executor(None, state.wait, timeout)
|
||||
if message is None:
|
||||
return json.dumps({"timeout": True, "timeout_secs": timeout})
|
||||
return json.dumps(message.to_dict())
|
||||
|
||||
@ -334,6 +334,14 @@ _CLI_A2A_COMMAND_KEYWORDS: dict[str, str | None] = {
|
||||
# grows a `say` or `message` subcommand, change `None` to that
|
||||
# keyword and the alignment test will start passing.
|
||||
"send_message_to_user": None,
|
||||
# Inbox tools live in the standalone molecule-mcp wrapper only;
|
||||
# CLI-subprocess runtimes have their own delivery loop and never
|
||||
# invoke these. The alignment test allows None entries — they
|
||||
# appear in registry.TOOLS for adapter consistency without
|
||||
# forcing a CLI subcommand.
|
||||
"wait_for_message": None,
|
||||
"inbox_peek": None,
|
||||
"inbox_pop": None,
|
||||
}
|
||||
|
||||
|
||||
|
||||
480
workspace/inbox.py
Normal file
480
workspace/inbox.py
Normal file
@ -0,0 +1,480 @@
|
||||
"""In-memory inbox + background poller for the standalone molecule-mcp path.
|
||||
|
||||
Purpose
|
||||
-------
|
||||
The universal MCP server (a2a_mcp_server.py) is OUTBOUND-ONLY by default —
|
||||
it gives an MCP-aware agent the same A2A delegation, peer-discovery, and
|
||||
memory tools that container-bound runtimes already have. There is no
|
||||
inbound delivery path: when the canvas user types a message or a peer
|
||||
sends an A2A request, the activity lands on the platform but the
|
||||
standalone agent never sees it.
|
||||
|
||||
This module closes that gap WITHOUT requiring a tunnel or a public agent
|
||||
URL. A daemon thread polls ``/workspaces/:id/activity?type=a2a_receive``
|
||||
on the platform and stages new rows in an in-memory deque. Three new MCP
|
||||
tools (``inbox_peek``, ``inbox_pop``, ``wait_for_message``) let the
|
||||
agent observe the queue.
|
||||
|
||||
Why a poller (not push)
|
||||
-----------------------
|
||||
runtime=external workspaces have ``delivery_mode="poll"`` — the platform
|
||||
records inbound A2A in ``activity_logs`` but does not call back to the
|
||||
agent. A poller is the only inbound surface that works without the
|
||||
operator exposing a public URL through a tunnel. 5s cadence matches
|
||||
the molecule-mcp-claude-channel plugin's POLL_INTERVAL — it's already
|
||||
proven on staging for the channel-based delivery path.
|
||||
|
||||
Cursor model
|
||||
------------
|
||||
``activity_logs.id`` is the cursor (server-assigned, monotonic). We
|
||||
persist it to ``${CONFIGS_DIR}/.mcp_inbox_cursor`` so an agent restart
|
||||
doesn't replay the last 10 minutes of inbound traffic and re-act on
|
||||
already-handled messages. On 410 (cursor pruned) we drop back to
|
||||
``since_secs=600`` for a bounded backlog and let the cursor advance
|
||||
naturally from there.
|
||||
|
||||
Scope
|
||||
-----
|
||||
Standalone molecule-mcp ONLY. The in-container runtime has its own
|
||||
push delivery (main.py + canvas WebSocket); we never want both
|
||||
running at once or a single message would be delivered twice. The
|
||||
caller (mcp_cli.main) gates activation explicitly via
|
||||
``activate(state)``; in-container code that imports this module by
|
||||
accident gets a no-op until activate is called.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from collections import deque
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Poll cadence. 5s mirrors the molecule-mcp-claude-channel plugin's
|
||||
# proven default — fast enough that a canvas user typing "are you
|
||||
# there?" gets picked up before they refresh, slow enough that 12
|
||||
# requests/min won't trip rate limits or wake mobile devices.
|
||||
POLL_INTERVAL_SECONDS = 5.0
|
||||
|
||||
# Initial backlog window for the first poll AND the recovery path
|
||||
# after a stale-cursor 410. 10 minutes is enough to cover a brief
|
||||
# crash/restart without flooding a long-idle workspace with hours of
|
||||
# stale chat.
|
||||
INITIAL_BACKLOG_SECONDS = 600
|
||||
|
||||
# Hard cap on the in-memory deque. The poller is bounded by the
|
||||
# server's per-page limit (default 100) and the agent typically pops
|
||||
# faster than the operator types, so an idle workspace shouldn't
|
||||
# exceed a handful. The cap protects against runaway growth if the
|
||||
# agent process stops calling pop.
|
||||
MAX_QUEUED_MESSAGES = 200
|
||||
|
||||
|
||||
@dataclass
|
||||
class InboxMessage:
|
||||
"""One inbound A2A message staged for the agent.
|
||||
|
||||
Mirrors the shape the agent sees via inbox_peek / wait_for_message.
|
||||
Fields are derived from the activity_logs row by ``_from_activity``.
|
||||
"""
|
||||
|
||||
activity_id: str
|
||||
text: str
|
||||
peer_id: str # empty string = canvas user; non-empty = peer workspace_id
|
||||
method: str # JSON-RPC method ("message/send", "tasks/send", etc.)
|
||||
created_at: str # RFC3339 timestamp from the activity row
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
return {
|
||||
"activity_id": self.activity_id,
|
||||
"text": self.text,
|
||||
"peer_id": self.peer_id,
|
||||
"kind": "peer_agent" if self.peer_id else "canvas_user",
|
||||
"method": self.method,
|
||||
"created_at": self.created_at,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class InboxState:
|
||||
"""Thread-safe queue of pending inbound messages.
|
||||
|
||||
Producer: the poller thread, calling ``record(message)``.
|
||||
Consumers: the MCP tool handlers, calling ``peek``, ``pop``,
|
||||
or ``wait``. Synchronization is via a single ``threading.Lock``
|
||||
(cheap — every operation is O(n) over a small deque) plus an
|
||||
``Event`` that wakes ``wait`` callers when a new message lands.
|
||||
"""
|
||||
|
||||
cursor_path: Path
|
||||
"""File path that persists ``activity_logs.id`` of the most
|
||||
recently observed row, so a restart doesn't replay backlog."""
|
||||
|
||||
_queue: deque[InboxMessage] = field(default_factory=lambda: deque(maxlen=MAX_QUEUED_MESSAGES))
|
||||
_lock: threading.Lock = field(default_factory=threading.Lock)
|
||||
_arrival: threading.Event = field(default_factory=threading.Event)
|
||||
_cursor: str | None = None
|
||||
_cursor_loaded: bool = False
|
||||
|
||||
def load_cursor(self) -> str | None:
|
||||
"""Read the persisted cursor from disk. Cached after first call.
|
||||
|
||||
Missing/unreadable file → None (poller will fall back to the
|
||||
initial-backlog window). We never raise: a corrupt cursor is
|
||||
less bad than the inbox refusing to start.
|
||||
"""
|
||||
with self._lock:
|
||||
if self._cursor_loaded:
|
||||
return self._cursor
|
||||
try:
|
||||
if self.cursor_path.is_file():
|
||||
self._cursor = self.cursor_path.read_text().strip() or None
|
||||
except OSError as exc:
|
||||
logger.warning("inbox: failed to read cursor %s: %s", self.cursor_path, exc)
|
||||
self._cursor = None
|
||||
self._cursor_loaded = True
|
||||
return self._cursor
|
||||
|
||||
def save_cursor(self, activity_id: str) -> None:
|
||||
"""Persist the cursor. Best-effort — log + continue on failure.
|
||||
|
||||
Loss of the cursor on a write failure means an extra page of
|
||||
backlog after restart, never a stuck poller. Silent-fail
|
||||
would mask a permission misconfiguration on the operator's
|
||||
configs dir; warn loudly so they can fix it.
|
||||
"""
|
||||
with self._lock:
|
||||
self._cursor = activity_id
|
||||
self._cursor_loaded = True
|
||||
try:
|
||||
self.cursor_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
tmp = self.cursor_path.with_suffix(self.cursor_path.suffix + ".tmp")
|
||||
tmp.write_text(activity_id)
|
||||
tmp.replace(self.cursor_path)
|
||||
except OSError as exc:
|
||||
logger.warning("inbox: failed to persist cursor to %s: %s", self.cursor_path, exc)
|
||||
|
||||
def reset_cursor(self) -> None:
|
||||
"""Forget the cursor. Used after a 410 from the activity API."""
|
||||
with self._lock:
|
||||
self._cursor = None
|
||||
self._cursor_loaded = True
|
||||
try:
|
||||
if self.cursor_path.is_file():
|
||||
self.cursor_path.unlink()
|
||||
except OSError as exc:
|
||||
logger.warning("inbox: failed to delete cursor %s: %s", self.cursor_path, exc)
|
||||
|
||||
def record(self, message: InboxMessage) -> None:
|
||||
"""Append a message and wake any waiter.
|
||||
|
||||
Skips a row whose activity_id we've already queued — defensive
|
||||
against the poller racing with the consumer + cursor save.
|
||||
"""
|
||||
with self._lock:
|
||||
for existing in self._queue:
|
||||
if existing.activity_id == message.activity_id:
|
||||
return
|
||||
self._queue.append(message)
|
||||
self._arrival.set()
|
||||
|
||||
def peek(self, limit: int = 10) -> list[InboxMessage]:
|
||||
"""Return up to ``limit`` pending messages without removing them."""
|
||||
if limit <= 0:
|
||||
limit = 10
|
||||
with self._lock:
|
||||
return list(self._queue)[:limit]
|
||||
|
||||
def pop(self, activity_id: str) -> InboxMessage | None:
|
||||
"""Remove a specific message. Idempotent; returns None if absent.
|
||||
|
||||
We require the caller to specify which message it handled
|
||||
rather than auto-popping the head — preserves observability
|
||||
when the agent reads several but only handles one.
|
||||
"""
|
||||
with self._lock:
|
||||
for existing in list(self._queue):
|
||||
if existing.activity_id == activity_id:
|
||||
self._queue.remove(existing)
|
||||
if not self._queue:
|
||||
self._arrival.clear()
|
||||
return existing
|
||||
return None
|
||||
|
||||
def wait(self, timeout_secs: float) -> InboxMessage | None:
|
||||
"""Block until a message is available or timeout elapses.
|
||||
|
||||
Returns the head message WITHOUT popping; the caller decides
|
||||
whether to pop after acting on it. Same shape as Python's
|
||||
Queue.get with timeout, but non-destructive so a peek-style
|
||||
agent can still inspect with peek/pop.
|
||||
"""
|
||||
# Fast path: queue already has something.
|
||||
with self._lock:
|
||||
if self._queue:
|
||||
return self._queue[0]
|
||||
self._arrival.clear()
|
||||
|
||||
triggered = self._arrival.wait(timeout=max(0.0, timeout_secs))
|
||||
if not triggered:
|
||||
return None
|
||||
with self._lock:
|
||||
return self._queue[0] if self._queue else None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Module singleton — set by mcp_cli before MCP server starts.
|
||||
# ---------------------------------------------------------------------------
|
||||
#
|
||||
# In-container callers don't activate; the inbox tools detect the
|
||||
# unset singleton and return an informational error rather than
|
||||
# breaking the dispatch path.
|
||||
|
||||
_STATE: InboxState | None = None
|
||||
|
||||
|
||||
def activate(state: InboxState) -> None:
|
||||
"""Register an InboxState as the singleton this module exposes.
|
||||
|
||||
Idempotent within a process: re-activating with the same state is
|
||||
a no-op; activating with a DIFFERENT state replaces the singleton
|
||||
+ logs at WARNING (the only legitimate caller is mcp_cli at
|
||||
startup; double-activate usually means a test/runtime mix-up).
|
||||
"""
|
||||
global _STATE
|
||||
if _STATE is state:
|
||||
return
|
||||
if _STATE is not None:
|
||||
logger.warning("inbox: replacing existing singleton state")
|
||||
_STATE = state
|
||||
|
||||
|
||||
def get_state() -> InboxState | None:
|
||||
"""Return the active InboxState, or None if the runtime never activated.
|
||||
|
||||
Tool implementations call this and surface a clear "(inbox not
|
||||
enabled)" message to the agent when None — keeps the in-container
|
||||
path's tool dispatch from raising on an inbox-tool call that the
|
||||
agent shouldn't have made anyway.
|
||||
"""
|
||||
return _STATE
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Activity → InboxMessage adapter
|
||||
# ---------------------------------------------------------------------------
|
||||
#
|
||||
# The platform's a2a_proxy logs request_body as the JSON-RPC envelope
|
||||
# it forwarded to the workspace. Three shapes have been observed in
|
||||
# the wild (verified against workspace-server's logA2ASuccess in
|
||||
# a2a_proxy_helpers.go on 2026-04-29) — handle all three before
|
||||
# falling back to summary so a peer message at least surfaces SOMETHING.
|
||||
|
||||
|
||||
def _extract_text(request_body: Any, summary: str | None) -> str:
|
||||
"""Pull the human-readable text out of an A2A activity row.
|
||||
|
||||
Mirrors molecule-mcp-claude-channel/server.ts:445 (extractText) so
|
||||
canvas-user messages and peer-agent messages render identically
|
||||
across both inbound channels.
|
||||
"""
|
||||
if not isinstance(request_body, dict):
|
||||
return summary or "(empty A2A message)"
|
||||
|
||||
candidates: list[Any] = []
|
||||
params = request_body.get("params") if isinstance(request_body.get("params"), dict) else None
|
||||
if params:
|
||||
message = params.get("message") if isinstance(params.get("message"), dict) else None
|
||||
if message:
|
||||
candidates.append(message.get("parts"))
|
||||
candidates.append(params.get("parts"))
|
||||
candidates.append(request_body.get("parts"))
|
||||
|
||||
# The A2A protocol's part discriminator field varies between SDK
|
||||
# versions: a2a-sdk v0 uses ``type``, v1 uses ``kind``. The platform's
|
||||
# activity_logs preserves whichever the original sender used, so we
|
||||
# accept either. Verified live against a hosted SaaS workspace on
|
||||
# 2026-04-30 — every canvas-user message arrived with ``kind`` and
|
||||
# the type-only filter was silently falling through to summary.
|
||||
for parts in candidates:
|
||||
if isinstance(parts, list):
|
||||
text = "".join(
|
||||
p.get("text", "")
|
||||
for p in parts
|
||||
if isinstance(p, dict)
|
||||
and (p.get("kind") == "text" or p.get("type") == "text")
|
||||
)
|
||||
if text:
|
||||
return text
|
||||
return summary or "(empty A2A message)"
|
||||
|
||||
|
||||
def message_from_activity(row: dict[str, Any]) -> InboxMessage:
|
||||
"""Convert one /activity row into an InboxMessage."""
|
||||
request_body = row.get("request_body")
|
||||
if isinstance(request_body, str):
|
||||
# The Go handler returns request_body as json.RawMessage; httpx
|
||||
# deserializes that to a dict already. But some legacy paths or
|
||||
# mocked servers may return it as a string — handle defensively.
|
||||
try:
|
||||
request_body = json.loads(request_body)
|
||||
except (TypeError, ValueError):
|
||||
request_body = None
|
||||
|
||||
return InboxMessage(
|
||||
activity_id=str(row.get("id", "")),
|
||||
text=_extract_text(request_body, row.get("summary")),
|
||||
peer_id=row.get("source_id") or "",
|
||||
method=row.get("method") or "",
|
||||
created_at=str(row.get("created_at", "")),
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Poller — daemon thread that fills the queue from the activity API
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _poll_once(
|
||||
state: InboxState,
|
||||
platform_url: str,
|
||||
workspace_id: str,
|
||||
headers: dict[str, str],
|
||||
timeout_secs: float = 10.0,
|
||||
) -> int:
|
||||
"""One poll iteration. Returns number of new messages enqueued.
|
||||
|
||||
Idempotent and stateless apart from the InboxState passed in —
|
||||
safe to call from tests with a stub state + a real httpx mock.
|
||||
"""
|
||||
import httpx
|
||||
|
||||
url = f"{platform_url}/workspaces/{workspace_id}/activity"
|
||||
params: dict[str, str] = {"type": "a2a_receive"}
|
||||
cursor = state.load_cursor()
|
||||
if cursor:
|
||||
params["since_id"] = cursor
|
||||
else:
|
||||
params["since_secs"] = str(INITIAL_BACKLOG_SECONDS)
|
||||
|
||||
try:
|
||||
with httpx.Client(timeout=timeout_secs) as client:
|
||||
resp = client.get(url, params=params, headers=headers)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.warning("inbox poller: GET /activity failed: %s", exc)
|
||||
return 0
|
||||
|
||||
if resp.status_code == 410:
|
||||
# Cursor pruned — drop back to the backlog window. The next
|
||||
# poll picks up wherever the activity API has rows now.
|
||||
logger.info(
|
||||
"inbox poller: cursor %s expired (410); resetting to since_secs=%d",
|
||||
cursor,
|
||||
INITIAL_BACKLOG_SECONDS,
|
||||
)
|
||||
state.reset_cursor()
|
||||
return 0
|
||||
|
||||
if resp.status_code >= 400:
|
||||
logger.warning(
|
||||
"inbox poller: HTTP %d from /activity: %s",
|
||||
resp.status_code,
|
||||
(resp.text or "")[:200],
|
||||
)
|
||||
return 0
|
||||
|
||||
try:
|
||||
rows = resp.json()
|
||||
except ValueError as exc:
|
||||
logger.warning("inbox poller: non-JSON response: %s", exc)
|
||||
return 0
|
||||
if not isinstance(rows, list):
|
||||
return 0
|
||||
|
||||
# since_id mode returns ASC (oldest first). since_secs mode returns
|
||||
# DESC; reverse so we record in chronological order and the cursor
|
||||
# we save is the freshest row.
|
||||
if cursor is None:
|
||||
rows = list(reversed(rows))
|
||||
|
||||
new_count = 0
|
||||
last_id: str | None = None
|
||||
for row in rows:
|
||||
if not isinstance(row, dict):
|
||||
continue
|
||||
message = message_from_activity(row)
|
||||
if not message.activity_id:
|
||||
continue
|
||||
state.record(message)
|
||||
last_id = message.activity_id
|
||||
new_count += 1
|
||||
|
||||
if last_id is not None:
|
||||
state.save_cursor(last_id)
|
||||
return new_count
|
||||
|
||||
|
||||
def _poll_loop(
|
||||
state: InboxState,
|
||||
platform_url: str,
|
||||
workspace_id: str,
|
||||
interval: float = POLL_INTERVAL_SECONDS,
|
||||
stop_event: threading.Event | None = None,
|
||||
) -> None:
|
||||
"""Daemon-thread body: poll forever until stop_event fires.
|
||||
|
||||
auth_headers() is rebuilt every iteration so a token rotation via
|
||||
env var or .auth_token file is picked up without a restart. Cheap
|
||||
(a dict + an env read).
|
||||
"""
|
||||
from platform_auth import auth_headers
|
||||
|
||||
while True:
|
||||
try:
|
||||
_poll_once(state, platform_url, workspace_id, auth_headers())
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.warning("inbox poller: iteration crashed: %s", exc)
|
||||
if stop_event is not None and stop_event.wait(interval):
|
||||
return
|
||||
if stop_event is None:
|
||||
time.sleep(interval)
|
||||
|
||||
|
||||
def start_poller_thread(
|
||||
state: InboxState,
|
||||
platform_url: str,
|
||||
workspace_id: str,
|
||||
interval: float = POLL_INTERVAL_SECONDS,
|
||||
) -> threading.Thread:
|
||||
"""Spawn the poller as a daemon thread. Returns the Thread handle.
|
||||
|
||||
daemon=True so the poller dies with the main process — same
|
||||
rationale as mcp_cli's heartbeat thread (no leaks, no stale
|
||||
workspace writes after the operator hits Ctrl-C).
|
||||
"""
|
||||
t = threading.Thread(
|
||||
target=_poll_loop,
|
||||
args=(state, platform_url, workspace_id, interval),
|
||||
name="molecule-mcp-inbox-poller",
|
||||
daemon=True,
|
||||
)
|
||||
t.start()
|
||||
return t
|
||||
|
||||
|
||||
def default_cursor_path() -> Path:
|
||||
"""Standard cursor location: ``${CONFIGS_DIR}/.mcp_inbox_cursor``.
|
||||
|
||||
Mirrors mcp_cli's CONFIGS_DIR resolution so a single
|
||||
operator-facing env var controls every persisted state file
|
||||
(.auth_token + .mcp_inbox_cursor).
|
||||
"""
|
||||
configs_dir = Path(os.environ.get("CONFIGS_DIR", "/configs"))
|
||||
return configs_dir / ".mcp_inbox_cursor"
|
||||
@ -273,6 +273,19 @@ def main() -> None:
|
||||
_platform_register(platform_url, workspace_id, token)
|
||||
_start_heartbeat_thread(platform_url, workspace_id, token)
|
||||
|
||||
# Inbox poller — the inbound side of the standalone path. Without
|
||||
# this thread, the universal MCP server is OUTBOUND-ONLY: an agent
|
||||
# can call delegate_task / send_message_to_user but never observe
|
||||
# canvas-user or peer-agent messages. The poller fills an in-memory
|
||||
# queue from the platform's /activity?type=a2a_receive endpoint;
|
||||
# the agent reads via wait_for_message / inbox_peek / inbox_pop.
|
||||
#
|
||||
# Same disable pattern as heartbeat: in-container callers (with
|
||||
# push delivery via canvas WebSocket) skip this to avoid duplicate
|
||||
# delivery; tests use the env to keep imports cheap.
|
||||
if not os.environ.get("MOLECULE_MCP_DISABLE_INBOX", "").strip():
|
||||
_start_inbox_poller(platform_url, workspace_id)
|
||||
|
||||
# Env is valid — safe to import the heavy module now. Importing
|
||||
# earlier would trigger a2a_client.py:22's module-level RuntimeError
|
||||
# before our friendly help reaches the user.
|
||||
@ -280,6 +293,28 @@ def main() -> None:
|
||||
cli_main()
|
||||
|
||||
|
||||
def _start_inbox_poller(platform_url: str, workspace_id: str) -> None:
|
||||
"""Activate the inbox singleton + spawn the poller daemon thread.
|
||||
|
||||
Done lazily here (not at module import) because importing inbox
|
||||
pulls in platform_auth, which only resolves cleanly AFTER env
|
||||
validation succeeds. Activation is idempotent within a process,
|
||||
so a stray double-call (e.g. test harness re-entering main) is
|
||||
harmless.
|
||||
|
||||
The poller thread is daemon=True — dies with the main process.
|
||||
"""
|
||||
try:
|
||||
import inbox
|
||||
except ImportError as exc:
|
||||
logger.warning("molecule-mcp: inbox module unavailable: %s", exc)
|
||||
return
|
||||
|
||||
state = inbox.InboxState(cursor_path=inbox.default_cursor_path())
|
||||
inbox.activate(state)
|
||||
inbox.start_poller_thread(state, platform_url, workspace_id)
|
||||
|
||||
|
||||
def _read_token_file() -> str:
|
||||
"""Read the token from ${CONFIGS_DIR}/.auth_token if present.
|
||||
|
||||
|
||||
@ -56,9 +56,12 @@ from a2a_tools import (
|
||||
tool_delegate_task,
|
||||
tool_delegate_task_async,
|
||||
tool_get_workspace_info,
|
||||
tool_inbox_peek,
|
||||
tool_inbox_pop,
|
||||
tool_list_peers,
|
||||
tool_recall_memory,
|
||||
tool_send_message_to_user,
|
||||
tool_wait_for_message,
|
||||
)
|
||||
|
||||
# Section name maps to the heading in the agent-facing system prompt.
|
||||
@ -299,6 +302,94 @@ _SEND_MESSAGE_TO_USER = ToolSpec(
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Inbox — inbound delivery for the standalone molecule-mcp path.
|
||||
#
|
||||
# These tools observe a poller-fed in-memory queue (see workspace/inbox.py).
|
||||
# They are universally registered so docs + adapters stay aligned, but
|
||||
# they only return real data in the standalone molecule-mcp runtime;
|
||||
# in-container runtimes return an informational "not enabled" message
|
||||
# because their delivery loop is push-based via the canvas WebSocket.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_WAIT_FOR_MESSAGE = ToolSpec(
|
||||
name="wait_for_message",
|
||||
short=(
|
||||
"Block until the next inbound message (canvas user OR peer "
|
||||
"agent) arrives, or until ``timeout_secs`` elapses."
|
||||
),
|
||||
when_to_use=(
|
||||
"Standalone-runtime ONLY (molecule-mcp wrapper). After "
|
||||
"you reply, call this to wait for the next message — forms "
|
||||
"the loop ``wait_for_message → respond → wait_for_message``. "
|
||||
"Returns the head message non-destructively; call inbox_pop "
|
||||
"with the activity_id once you've handled it. In-container "
|
||||
"runtimes receive messages via push and should not call this."
|
||||
),
|
||||
input_schema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"timeout_secs": {
|
||||
"type": "number",
|
||||
"description": (
|
||||
"Max seconds to block. Capped at 300. "
|
||||
"Default 60."
|
||||
),
|
||||
},
|
||||
},
|
||||
},
|
||||
impl=tool_wait_for_message,
|
||||
section=A2A_SECTION,
|
||||
)
|
||||
|
||||
_INBOX_PEEK = ToolSpec(
|
||||
name="inbox_peek",
|
||||
short="List pending inbound messages without removing them.",
|
||||
when_to_use=(
|
||||
"Standalone-runtime ONLY. Use to inspect what's queued "
|
||||
"before deciding which to handle. Non-destructive — pair "
|
||||
"with inbox_pop to consume after replying."
|
||||
),
|
||||
input_schema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"limit": {
|
||||
"type": "integer",
|
||||
"description": "Max messages to return. Default 10.",
|
||||
},
|
||||
},
|
||||
},
|
||||
impl=tool_inbox_peek,
|
||||
section=A2A_SECTION,
|
||||
)
|
||||
|
||||
_INBOX_POP = ToolSpec(
|
||||
name="inbox_pop",
|
||||
short="Remove a handled message from the inbox queue by activity_id.",
|
||||
when_to_use=(
|
||||
"Standalone-runtime ONLY. Call after you've replied to a "
|
||||
"message returned from wait_for_message or inbox_peek to "
|
||||
"drop it from the queue. Idempotent — popping a missing "
|
||||
"id reports removed=false without erroring."
|
||||
),
|
||||
input_schema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"activity_id": {
|
||||
"type": "string",
|
||||
"description": (
|
||||
"activity_id of the message to remove (from "
|
||||
"inbox_peek / wait_for_message output)."
|
||||
),
|
||||
},
|
||||
},
|
||||
"required": ["activity_id"],
|
||||
},
|
||||
impl=tool_inbox_pop,
|
||||
section=A2A_SECTION,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# HMA — hierarchical persistent memory
|
||||
# ---------------------------------------------------------------------------
|
||||
@ -374,6 +465,10 @@ TOOLS: list[ToolSpec] = [
|
||||
_LIST_PEERS,
|
||||
_GET_WORKSPACE_INFO,
|
||||
_SEND_MESSAGE_TO_USER,
|
||||
# Inbox (standalone-only; in-container returns informational error)
|
||||
_WAIT_FOR_MESSAGE,
|
||||
_INBOX_PEEK,
|
||||
_INBOX_POP,
|
||||
# HMA
|
||||
_COMMIT_MEMORY,
|
||||
_RECALL_MEMORY,
|
||||
|
||||
@ -6,6 +6,9 @@
|
||||
- **list_peers**: List the workspaces this agent can communicate with — name, ID, status, role for each.
|
||||
- **get_workspace_info**: Get this workspace's own info — ID, name, role, tier, parent, status.
|
||||
- **send_message_to_user**: Send a message directly to the user's canvas chat — pushed instantly via WebSocket. Use this to: (1) acknowledge a task immediately ('Got it, I'll start working on this'), (2) send interim progress updates while doing long work, (3) deliver follow-up results after delegation completes, (4) attach files (zip, pdf, csv, image) for the user to download via the `attachments` field (NEVER paste file URLs in `message`). The message appears in the user's chat as if you're proactively reaching out.
|
||||
- **wait_for_message**: Block until the next inbound message (canvas user OR peer agent) arrives, or until ``timeout_secs`` elapses.
|
||||
- **inbox_peek**: List pending inbound messages without removing them.
|
||||
- **inbox_pop**: Remove a handled message from the inbox queue by activity_id.
|
||||
|
||||
### delegate_task
|
||||
Use for QUICK questions and small sub-tasks where you can afford to wait inline. Returns the peer's response text directly. For longer-running work (research, multi-minute jobs) use delegate_task_async + check_task_status instead so you don't hold this workspace busy waiting.
|
||||
@ -25,4 +28,13 @@ Use to introspect your own identity (e.g. before reporting back to the user, or
|
||||
### send_message_to_user
|
||||
Use proactively across the lifecycle of a task — early to acknowledge, mid-flight to update, late to deliver. Never paste file URLs in the message body — always pass absolute paths in `attachments` so the platform serves them as download chips (works on SaaS where external file hosts are unreachable).
|
||||
|
||||
### wait_for_message
|
||||
Standalone-runtime ONLY (molecule-mcp wrapper). After you reply, call this to wait for the next message — forms the loop ``wait_for_message → respond → wait_for_message``. Returns the head message non-destructively; call inbox_pop with the activity_id once you've handled it. In-container runtimes receive messages via push and should not call this.
|
||||
|
||||
### inbox_peek
|
||||
Standalone-runtime ONLY. Use to inspect what's queued before deciding which to handle. Non-destructive — pair with inbox_pop to consume after replying.
|
||||
|
||||
### inbox_pop
|
||||
Standalone-runtime ONLY. Call after you've replied to a message returned from wait_for_message or inbox_peek to drop it from the queue. Idempotent — popping a missing id reports removed=false without erroring.
|
||||
|
||||
Always use list_peers first to discover available workspace IDs. Access control is enforced — you can only reach siblings and parent/children. If a delegation returns a DELEGATION FAILED message, do NOT forward the raw error to the user. Instead: (1) try a different peer, (2) handle the task yourself, or (3) tell the user which peer is unavailable and provide your own best answer.
|
||||
|
||||
441
workspace/tests/test_inbox.py
Normal file
441
workspace/tests/test_inbox.py
Normal file
@ -0,0 +1,441 @@
|
||||
"""Tests for workspace/inbox.py — InboxState + activity API poller.
|
||||
|
||||
Covers the round-trip from a /activity row to an InboxMessage that the
|
||||
agent observes via the three new MCP tools, plus the cursor-persistence
|
||||
+ 410-recovery behavior that keeps the standalone molecule-mcp from
|
||||
re-delivering already-handled messages after a restart.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
import inbox
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _reset_singleton():
|
||||
"""Each test starts with a clean module singleton + a fresh
|
||||
InboxState. Activation in one test must not leak into the next."""
|
||||
inbox._STATE = None
|
||||
yield
|
||||
inbox._STATE = None
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def state(tmp_path: Path) -> inbox.InboxState:
|
||||
return inbox.InboxState(cursor_path=tmp_path / ".mcp_inbox_cursor")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _extract_text — envelope shape coverage
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_extract_text_jsonrpc_message_wrapper():
|
||||
body = {
|
||||
"jsonrpc": "2.0",
|
||||
"method": "message/send",
|
||||
"params": {"message": {"parts": [{"type": "text", "text": "hello"}]}},
|
||||
}
|
||||
assert inbox._extract_text(body, None) == "hello"
|
||||
|
||||
|
||||
def test_extract_text_a2a_v1_kind_field():
|
||||
"""A2A SDK v1 uses ``kind`` instead of ``type`` as the part
|
||||
discriminator. Hosted SaaS workspaces send the v1 shape today —
|
||||
this case is what live canvas-user messages look like in
|
||||
activity_logs.request_body."""
|
||||
body = {
|
||||
"params": {
|
||||
"message": {
|
||||
"role": "user",
|
||||
"parts": [{"kind": "text", "text": "hello from canvas"}],
|
||||
}
|
||||
}
|
||||
}
|
||||
assert inbox._extract_text(body, None) == "hello from canvas"
|
||||
|
||||
|
||||
def test_extract_text_jsonrpc_params_parts():
|
||||
body = {"params": {"parts": [{"type": "text", "text": "from peer"}]}}
|
||||
assert inbox._extract_text(body, None) == "from peer"
|
||||
|
||||
|
||||
def test_extract_text_shorthand_parts():
|
||||
body = {"parts": [{"type": "text", "text": "shorthand"}]}
|
||||
assert inbox._extract_text(body, None) == "shorthand"
|
||||
|
||||
|
||||
def test_extract_text_concatenates_multiple_parts():
|
||||
body = {
|
||||
"parts": [
|
||||
{"type": "text", "text": "hello "},
|
||||
{"type": "text", "text": "world"},
|
||||
{"type": "image", "url": "https://example.invalid/x.png"},
|
||||
]
|
||||
}
|
||||
assert inbox._extract_text(body, None) == "hello world"
|
||||
|
||||
|
||||
def test_extract_text_falls_back_to_summary():
|
||||
assert inbox._extract_text(None, "fallback") == "fallback"
|
||||
assert inbox._extract_text({"unrelated": True}, "fallback") == "fallback"
|
||||
|
||||
|
||||
def test_extract_text_returns_placeholder_when_nothing_usable():
|
||||
assert inbox._extract_text(None, None) == "(empty A2A message)"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# message_from_activity
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_message_from_activity_canvas_user():
|
||||
row = {
|
||||
"id": "act-1",
|
||||
"source_id": None,
|
||||
"method": "message/send",
|
||||
"summary": "ignored",
|
||||
"request_body": {
|
||||
"params": {"message": {"parts": [{"type": "text", "text": "hi"}]}}
|
||||
},
|
||||
"created_at": "2026-04-30T22:00:00Z",
|
||||
}
|
||||
msg = inbox.message_from_activity(row)
|
||||
assert msg.activity_id == "act-1"
|
||||
assert msg.text == "hi"
|
||||
assert msg.peer_id == ""
|
||||
assert msg.method == "message/send"
|
||||
d = msg.to_dict()
|
||||
assert d["kind"] == "canvas_user"
|
||||
|
||||
|
||||
def test_message_from_activity_peer_agent():
|
||||
row = {
|
||||
"id": "act-2",
|
||||
"source_id": "ws-peer-uuid",
|
||||
"method": "tasks/send",
|
||||
"summary": "delegate",
|
||||
"request_body": {"parts": [{"type": "text", "text": "do task"}]},
|
||||
"created_at": "2026-04-30T22:01:00Z",
|
||||
}
|
||||
msg = inbox.message_from_activity(row)
|
||||
assert msg.peer_id == "ws-peer-uuid"
|
||||
assert msg.to_dict()["kind"] == "peer_agent"
|
||||
|
||||
|
||||
def test_message_from_activity_handles_string_request_body():
|
||||
row = {
|
||||
"id": "act-3",
|
||||
"source_id": None,
|
||||
"method": "message/send",
|
||||
"summary": None,
|
||||
"request_body": '{"parts": [{"type": "text", "text": "json string"}]}',
|
||||
"created_at": "2026-04-30T22:02:00Z",
|
||||
}
|
||||
assert inbox.message_from_activity(row).text == "json string"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# InboxState — queue + wait/peek/pop semantics
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _msg(activity_id: str, text: str = "", peer_id: str = "") -> inbox.InboxMessage:
|
||||
return inbox.InboxMessage(
|
||||
activity_id=activity_id,
|
||||
text=text or activity_id,
|
||||
peer_id=peer_id,
|
||||
method="message/send",
|
||||
created_at="2026-04-30T22:00:00Z",
|
||||
)
|
||||
|
||||
|
||||
def test_record_then_peek(state: inbox.InboxState):
|
||||
state.record(_msg("a"))
|
||||
state.record(_msg("b"))
|
||||
out = state.peek(limit=10)
|
||||
assert [m.activity_id for m in out] == ["a", "b"]
|
||||
|
||||
|
||||
def test_record_dedupes_by_activity_id(state: inbox.InboxState):
|
||||
state.record(_msg("a"))
|
||||
state.record(_msg("a")) # same id — must drop the second
|
||||
assert len(state.peek(10)) == 1
|
||||
|
||||
|
||||
def test_pop_removes_specific_message(state: inbox.InboxState):
|
||||
state.record(_msg("a"))
|
||||
state.record(_msg("b"))
|
||||
removed = state.pop("a")
|
||||
assert removed is not None and removed.activity_id == "a"
|
||||
remaining = state.peek(10)
|
||||
assert [m.activity_id for m in remaining] == ["b"]
|
||||
|
||||
|
||||
def test_pop_missing_id_returns_none(state: inbox.InboxState):
|
||||
state.record(_msg("a"))
|
||||
assert state.pop("does-not-exist") is None
|
||||
# Original message still present
|
||||
assert len(state.peek(10)) == 1
|
||||
|
||||
|
||||
def test_wait_returns_existing_head_immediately(state: inbox.InboxState):
|
||||
state.record(_msg("a"))
|
||||
start = time.monotonic()
|
||||
msg = state.wait(timeout_secs=5.0)
|
||||
elapsed = time.monotonic() - start
|
||||
assert msg is not None and msg.activity_id == "a"
|
||||
assert elapsed < 0.5, f"wait should not block when queue non-empty (took {elapsed:.2f}s)"
|
||||
|
||||
|
||||
def test_wait_blocks_until_message_arrives(state: inbox.InboxState):
|
||||
def producer():
|
||||
time.sleep(0.05)
|
||||
state.record(_msg("late"))
|
||||
|
||||
threading.Thread(target=producer, daemon=True).start()
|
||||
msg = state.wait(timeout_secs=2.0)
|
||||
assert msg is not None and msg.activity_id == "late"
|
||||
|
||||
|
||||
def test_wait_returns_none_on_timeout(state: inbox.InboxState):
|
||||
msg = state.wait(timeout_secs=0.05)
|
||||
assert msg is None
|
||||
|
||||
|
||||
def test_wait_does_not_pop(state: inbox.InboxState):
|
||||
"""wait() is non-destructive — caller decides when to inbox_pop."""
|
||||
state.record(_msg("a"))
|
||||
state.wait(timeout_secs=1.0)
|
||||
state.wait(timeout_secs=1.0)
|
||||
assert len(state.peek(10)) == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Cursor persistence
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_load_cursor_returns_none_when_file_absent(state: inbox.InboxState):
|
||||
assert state.load_cursor() is None
|
||||
|
||||
|
||||
def test_save_then_load_cursor_round_trip(state: inbox.InboxState):
|
||||
state.save_cursor("act-cursor-1")
|
||||
# Reset the cached flag to force a re-read
|
||||
state._cursor_loaded = False
|
||||
state._cursor = None
|
||||
assert state.load_cursor() == "act-cursor-1"
|
||||
|
||||
|
||||
def test_save_cursor_creates_parent_directory(tmp_path: Path):
|
||||
nested = tmp_path / "nested" / "configs" / ".mcp_inbox_cursor"
|
||||
state = inbox.InboxState(cursor_path=nested)
|
||||
state.save_cursor("act-x")
|
||||
assert nested.read_text() == "act-x"
|
||||
|
||||
|
||||
def test_reset_cursor_deletes_file(state: inbox.InboxState):
|
||||
state.save_cursor("act-y")
|
||||
assert state.cursor_path.is_file()
|
||||
state.reset_cursor()
|
||||
assert not state.cursor_path.is_file()
|
||||
assert state.load_cursor() is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Module singleton
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_get_state_returns_none_before_activate():
|
||||
assert inbox.get_state() is None
|
||||
|
||||
|
||||
def test_activate_then_get_state(state: inbox.InboxState):
|
||||
inbox.activate(state)
|
||||
assert inbox.get_state() is state
|
||||
|
||||
|
||||
def test_activate_idempotent(state: inbox.InboxState):
|
||||
inbox.activate(state)
|
||||
inbox.activate(state) # same state — no-op, no warning expected
|
||||
assert inbox.get_state() is state
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _poll_once — HTTP behavior
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _make_response(status_code: int, json_body: Any = None, text: str = "") -> MagicMock:
|
||||
resp = MagicMock()
|
||||
resp.status_code = status_code
|
||||
if json_body is not None:
|
||||
resp.json.return_value = json_body
|
||||
else:
|
||||
resp.json.side_effect = ValueError("no json")
|
||||
resp.text = text
|
||||
return resp
|
||||
|
||||
|
||||
def _patch_httpx(returning: MagicMock):
|
||||
"""Replace httpx.Client with a context-manager mock that returns
|
||||
``returning`` from .get(). Captures the GET call args for assertion."""
|
||||
client = MagicMock()
|
||||
client.__enter__ = MagicMock(return_value=client)
|
||||
client.__exit__ = MagicMock(return_value=False)
|
||||
client.get = MagicMock(return_value=returning)
|
||||
return patch("httpx.Client", return_value=client), client
|
||||
|
||||
|
||||
def test_poll_once_fresh_start_uses_since_secs(state: inbox.InboxState):
|
||||
resp = _make_response(200, [])
|
||||
p, client = _patch_httpx(resp)
|
||||
with p:
|
||||
n = inbox._poll_once(state, "http://platform", "ws-1", {})
|
||||
assert n == 0
|
||||
_, kwargs = client.get.call_args
|
||||
assert kwargs["params"]["type"] == "a2a_receive"
|
||||
assert "since_secs" in kwargs["params"]
|
||||
assert "since_id" not in kwargs["params"]
|
||||
|
||||
|
||||
def test_poll_once_with_cursor_uses_since_id(state: inbox.InboxState):
|
||||
state.save_cursor("act-existing")
|
||||
resp = _make_response(200, [])
|
||||
p, client = _patch_httpx(resp)
|
||||
with p:
|
||||
inbox._poll_once(state, "http://platform", "ws-1", {})
|
||||
_, kwargs = client.get.call_args
|
||||
assert kwargs["params"]["since_id"] == "act-existing"
|
||||
assert "since_secs" not in kwargs["params"]
|
||||
|
||||
|
||||
def test_poll_once_410_resets_cursor(state: inbox.InboxState):
|
||||
state.save_cursor("act-stale")
|
||||
resp = _make_response(410, text="cursor pruned")
|
||||
p, _ = _patch_httpx(resp)
|
||||
with p:
|
||||
inbox._poll_once(state, "http://platform", "ws-1", {})
|
||||
assert state.load_cursor() is None
|
||||
assert not state.cursor_path.is_file()
|
||||
|
||||
|
||||
def test_poll_once_records_messages_and_advances_cursor(state: inbox.InboxState):
|
||||
state.save_cursor("act-old")
|
||||
rows = [
|
||||
{
|
||||
"id": "act-1",
|
||||
"source_id": None,
|
||||
"method": "message/send",
|
||||
"summary": None,
|
||||
"request_body": {"parts": [{"type": "text", "text": "first"}]},
|
||||
"created_at": "2026-04-30T22:00:00Z",
|
||||
},
|
||||
{
|
||||
"id": "act-2",
|
||||
"source_id": "ws-peer",
|
||||
"method": "tasks/send",
|
||||
"summary": None,
|
||||
"request_body": {"parts": [{"type": "text", "text": "second"}]},
|
||||
"created_at": "2026-04-30T22:00:01Z",
|
||||
},
|
||||
]
|
||||
resp = _make_response(200, rows)
|
||||
p, _ = _patch_httpx(resp)
|
||||
with p:
|
||||
n = inbox._poll_once(state, "http://platform", "ws-1", {})
|
||||
assert n == 2
|
||||
queue = state.peek(10)
|
||||
assert [m.activity_id for m in queue] == ["act-1", "act-2"]
|
||||
assert state.load_cursor() == "act-2"
|
||||
|
||||
|
||||
def test_poll_once_500_does_not_raise(state: inbox.InboxState):
|
||||
resp = _make_response(500, text="boom")
|
||||
p, _ = _patch_httpx(resp)
|
||||
with p:
|
||||
n = inbox._poll_once(state, "http://platform", "ws-1", {})
|
||||
assert n == 0
|
||||
# Cursor untouched
|
||||
assert state.load_cursor() is None
|
||||
|
||||
|
||||
def test_poll_once_handles_non_list_payload(state: inbox.InboxState):
|
||||
resp = _make_response(200, {"error": "unexpected"})
|
||||
p, _ = _patch_httpx(resp)
|
||||
with p:
|
||||
n = inbox._poll_once(state, "http://platform", "ws-1", {})
|
||||
assert n == 0
|
||||
|
||||
|
||||
def test_poll_once_initial_backlog_reverses_to_chronological(state: inbox.InboxState):
|
||||
"""When no cursor is set, /activity returns DESC; the poller must
|
||||
reverse so the saved cursor is the freshest row + record order
|
||||
is chronological."""
|
||||
rows_desc = [
|
||||
{
|
||||
"id": "act-newest",
|
||||
"source_id": None,
|
||||
"method": "message/send",
|
||||
"summary": None,
|
||||
"request_body": {"parts": [{"type": "text", "text": "newest"}]},
|
||||
"created_at": "2026-04-30T22:00:02Z",
|
||||
},
|
||||
{
|
||||
"id": "act-oldest",
|
||||
"source_id": None,
|
||||
"method": "message/send",
|
||||
"summary": None,
|
||||
"request_body": {"parts": [{"type": "text", "text": "oldest"}]},
|
||||
"created_at": "2026-04-30T22:00:00Z",
|
||||
},
|
||||
]
|
||||
resp = _make_response(200, rows_desc)
|
||||
p, _ = _patch_httpx(resp)
|
||||
with p:
|
||||
inbox._poll_once(state, "http://platform", "ws-1", {})
|
||||
queue = state.peek(10)
|
||||
assert [m.activity_id for m in queue] == ["act-oldest", "act-newest"]
|
||||
# Cursor is the newest row, so the next poll picks up only what's
|
||||
# newer — re-restoring forward chronological progression.
|
||||
assert state.load_cursor() == "act-newest"
|
||||
|
||||
|
||||
def test_start_poller_thread_is_daemon(state: inbox.InboxState):
|
||||
"""Daemon flag is required so the poller dies with the parent
|
||||
process; a non-daemon poller would leak across `claude` restarts
|
||||
and write to a stale workspace."""
|
||||
resp = _make_response(200, [])
|
||||
p, _ = _patch_httpx(resp)
|
||||
with p, patch("platform_auth.auth_headers", return_value={}):
|
||||
# Use a very short interval so the loop body runs at least once
|
||||
# before we exit the test.
|
||||
t = inbox.start_poller_thread(state, "http://platform", "ws-1", interval=0.01)
|
||||
time.sleep(0.05)
|
||||
assert t.daemon is True
|
||||
assert t.is_alive()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# default_cursor_path respects CONFIGS_DIR
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_default_cursor_path_uses_configs_dir(monkeypatch, tmp_path: Path):
|
||||
monkeypatch.setenv("CONFIGS_DIR", str(tmp_path))
|
||||
assert inbox.default_cursor_path() == tmp_path / ".mcp_inbox_cursor"
|
||||
|
||||
|
||||
def test_default_cursor_path_falls_back_to_default(monkeypatch):
|
||||
monkeypatch.delenv("CONFIGS_DIR", raising=False)
|
||||
assert inbox.default_cursor_path() == Path("/configs") / ".mcp_inbox_cursor"
|
||||
Loading…
Reference in New Issue
Block a user