forked from molecule-ai/molecule-core
Splits the standalone molecule-mcp wrapper into three single-concern
modules per the OSS-shape refactor program:
* mcp_heartbeat.py — register POST + heartbeat loop + auth-failure
escalation + inbound-secret persistence
* mcp_workspace_resolver.py — single + multi-workspace env validation
+ on-disk token-file read + operator-help printer
* mcp_inbox_pollers.py — activate inbox singleton + spawn one daemon
poller per workspace
mcp_cli.py becomes a 193-LOC orchestrator: validates env, calls each
module's helpers, hands off to a2a_mcp_server.cli_main. The console-
script entry molecule-mcp = molecule_runtime.mcp_cli:main is preserved.
Back-compat aliases (mcp_cli._build_agent_card, _heartbeat_loop,
_resolve_workspaces, etc.) re-export the new modules' authoritative
functions so existing tests + wheel_smoke.py + any downstream caller
keeps working unchanged. A new test file pins each alias as the
exact same callable (drift gate via `is`).
Tests:
* 62 existing test_mcp_cli.py + test_mcp_cli_multi_workspace.py
pass against the split.
* Two heartbeat-loop persist tests + the auth-escalation caplog
setup updated to target mcp_heartbeat (the module where the loop
body now lives) instead of mcp_cli (still works through aliases
for direct calls, but Python's name resolution inside the loop
body uses the new module's namespace).
* test_mcp_cli_split.py adds 11 new tests: alias drift gate +
inbox-poller single + multi-workspace branches + degraded
inbox-import logging path (none of those existed before).
Refs RFC #2873.
64 lines
2.5 KiB
Python
64 lines
2.5 KiB
Python
"""Inbox-poller spawn helpers for the standalone ``molecule-mcp`` wrapper.
|
|
|
|
Extracted from ``mcp_cli.py`` (RFC #2873 iter 3). The poller is the
|
|
INBOUND side of the standalone path — without it, the universal MCP
|
|
server is outbound-only (can call ``delegate_task`` /
|
|
``send_message_to_user``, never observes canvas-user / peer-agent
|
|
messages).
|
|
|
|
Public surface:
|
|
|
|
* ``start_inbox_pollers(platform_url, workspace_ids)`` — activate the
|
|
inbox singleton and spawn one daemon poller per workspace.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def start_inbox_pollers(platform_url: str, workspace_ids: list[str]) -> None:
|
|
"""Activate the inbox singleton + spawn one poller daemon thread per workspace.
|
|
|
|
Done lazily here (not at module import) because importing inbox
|
|
pulls in platform_auth, which only resolves cleanly AFTER env
|
|
validation succeeds. Activation is idempotent within a process,
|
|
so a stray double-call (e.g. test harness re-entering main) is
|
|
harmless.
|
|
|
|
The poller threads are daemon=True — die with the main process.
|
|
|
|
Single-workspace path: one poller, single cursor file at the legacy
|
|
location (``.mcp_inbox_cursor``). Cursor-key resolution falls back
|
|
to the empty string for back-compat with operators whose existing
|
|
on-disk cursor was written by the pre-multi-workspace code.
|
|
|
|
Multi-workspace path: N pollers, each with its own cursor file
|
|
keyed by ``workspace_id[:8]``. Cursors live next to each other in
|
|
configs_dir so an operator inspecting state sees all of them
|
|
together.
|
|
"""
|
|
try:
|
|
import inbox
|
|
except ImportError as exc:
|
|
logger.warning("molecule-mcp: inbox module unavailable: %s", exc)
|
|
return
|
|
|
|
if len(workspace_ids) <= 1:
|
|
# Back-compat exact: single-workspace mode reuses the legacy
|
|
# cursor filename + cursor_path constructor arg, so an existing
|
|
# operator's on-disk state isn't invalidated by upgrade.
|
|
wsid = workspace_ids[0]
|
|
state = inbox.InboxState(cursor_path=inbox.default_cursor_path())
|
|
inbox.activate(state)
|
|
inbox.start_poller_thread(state, platform_url, wsid)
|
|
return
|
|
|
|
# Multi-workspace: per-workspace cursor file, one shared queue.
|
|
cursor_paths = {wsid: inbox.default_cursor_path(wsid) for wsid in workspace_ids}
|
|
state = inbox.InboxState(cursor_paths=cursor_paths)
|
|
inbox.activate(state)
|
|
for wsid in workspace_ids:
|
|
inbox.start_poller_thread(state, platform_url, wsid)
|