Splits the standalone molecule-mcp wrapper into three single-concern
modules per the OSS-shape refactor program:
* mcp_heartbeat.py — register POST + heartbeat loop + auth-failure
escalation + inbound-secret persistence
* mcp_workspace_resolver.py — single + multi-workspace env validation
+ on-disk token-file read + operator-help printer
* mcp_inbox_pollers.py — activate inbox singleton + spawn one daemon
poller per workspace
mcp_cli.py becomes a 193-LOC orchestrator: validates env, calls each
module's helpers, hands off to a2a_mcp_server.cli_main. The console-
script entry molecule-mcp = molecule_runtime.mcp_cli:main is preserved.
Back-compat aliases (mcp_cli._build_agent_card, _heartbeat_loop,
_resolve_workspaces, etc.) re-export the new modules' authoritative
functions so existing tests, wheel_smoke.py, and any downstream callers
keep working unchanged. A new test file pins each alias to the exact
same callable (a drift gate using `is`).
Tests:
* All 62 existing tests in test_mcp_cli.py and
test_mcp_cli_multi_workspace.py pass against the split.
* Two heartbeat-loop persist tests and the auth-escalation caplog
setup now target mcp_heartbeat (where the loop body lives) instead
of mcp_cli. Direct calls through the mcp_cli aliases still work,
but names looked up inside the loop body resolve in mcp_heartbeat's
namespace, so patches must be applied there.
* test_mcp_cli_split.py adds 11 new tests: alias drift gate +
inbox-poller single + multi-workspace branches + degraded
inbox-import logging path (none of those existed before).
Refs RFC #2873.
326 lines
13 KiB
Python
326 lines
13 KiB
Python
"""Heartbeat + register thread for the standalone ``molecule-mcp`` wrapper.

Extracted from ``mcp_cli.py`` (RFC #2873 iter 3) so the heartbeat /
register concern lives in its own module. The console-script entry
``mcp_cli:main`` still drives the spawn, but the loop body, auth-failure
escalation, and inbound-secret persistence now live here so they can be
read, tested, and replaced independently of the orchestrator.

Public surface:

* ``HEARTBEAT_INTERVAL_SECONDS`` — cadence constant.
* ``build_agent_card(workspace_id)`` — payload helper.
* ``platform_register(platform_url, workspace_id, token)`` — one-shot
  POST /registry/register at startup.
* ``start_heartbeat_thread(platform_url, workspace_id, token)`` — spawn
  the daemon thread.

The thread body (``heartbeat_loop``) and its helpers
(``log_heartbeat_auth_failure``, ``persist_inbound_secret_from_heartbeat``)
are module-level too. Tests that patch names used inside the loop must
target this module, not ``mcp_cli``.
"""
from __future__ import annotations

import logging
import os
import sys
import threading
import time

# Module-level logger shared by the functions below.
logger = logging.getLogger(__name__)

# Heartbeat cadence, in seconds; the default ``interval`` of
# ``heartbeat_loop``. Must be tighter than healthsweep's stale window
# (currently 60-90s — see registry/healthsweep.go) by a comfortable
# margin so a single missed heartbeat doesn't flip awaiting_agent.
# At 20s the operator's network gets 3 attempts even within the 60s end
# of that window; long enough that it doesn't spam, short enough to
# recover quickly after laptop sleep.
HEARTBEAT_INTERVAL_SECONDS = 20.0

# After this many consecutive 401/403 heartbeats, escalate from
# WARNING to ERROR with re-onboard guidance. 3 ticks at 20s = ~1 minute
# of sustained auth failure — enough to rule out a transient platform
# blip but quick enough that an operator doesn't sit puzzled for 10
# minutes wondering why their MCP tools 401.
HEARTBEAT_AUTH_LOUD_THRESHOLD = 3

# Once past the threshold, the ERROR is re-logged every this-many
# additional consecutive auth failures (20 ticks at 20s ≈ 7 min) so a
# long-running session that missed the first ERROR still sees the
# message. Failures between re-log points are not logged at all.
HEARTBEAT_AUTH_RELOG_INTERVAL = 20
def build_agent_card(workspace_id: str) -> dict:
    """Return the ``agent_card`` dict posted to /registry/register.

    Operators can customise identity and capabilities through three
    optional environment variables (each whitespace-stripped before use):

    * ``MOLECULE_AGENT_NAME`` — display name shown on canvas workspace
      cards and in ``list_peers`` output. Blank or unset falls back to
      ``molecule-mcp-{workspace_id[:8]}``.
    * ``MOLECULE_AGENT_DESCRIPTION`` — one-line purpose rendered in the
      canvas Details + Skills tabs. The ``description`` key is only added
      when this is non-empty.
    * ``MOLECULE_AGENT_SKILLS`` — comma-separated skill names such as
      ``research,code-review,memory-curation``. Every non-blank entry
      becomes ``{"name": <entry>}``, the minimal skill shape accepted by
      both ``shared_runtime.summarize_peers`` (reads ``s["name"]``) and the
      canvas SkillsTab.tsx schema (id falls back to name when omitted).

    With none of the variables set, the card is
    ``{"name": "molecule-mcp-<first 8 chars of id>", "skills": []}``, the
    same payload the previous hardcoded behaviour produced.
    """

    def _env(key: str) -> str:
        # Unset and empty both collapse to "" before stripping.
        return (os.environ.get(key) or "").strip()

    card: dict = {
        # ``or`` short-circuits, so the fallback is only built when needed.
        "name": _env("MOLECULE_AGENT_NAME") or f"molecule-mcp-{workspace_id[:8]}",
        "skills": [
            {"name": entry.strip()}
            for entry in _env("MOLECULE_AGENT_SKILLS").split(",")
            if entry.strip()
        ],
    }

    summary = _env("MOLECULE_AGENT_DESCRIPTION")
    if summary:
        card["description"] = summary
    return card
def platform_register(platform_url: str, workspace_id: str, token: str) -> None:
    """Register this workspace with the platform once, at startup.

    POSTs to ``{platform_url}/registry/register`` so an operator who never
    ran the curl-register snippet still sees the workspace move from
    ``awaiting_agent`` to ``online``. Calling it again is harmless: the
    platform's register handler is an upsert that just refreshes ``url``,
    ``agent_card`` and ``status``.

    Outcomes:

    * HTTP 401 / 403 — the token is wrong. Guidance is printed to stderr
      and the process exits with ``sys.exit(3)``. Looping on in a broken
      state would be hard to diagnose, because every MCP tool call would
      401 as well.
    * Any other HTTP status >= 400 — WARNING with up to 200 chars of the
      response body. Startup continues, and the heartbeat thread surfaces
      persistent failures.
    * Network / transport errors (any ``Exception``) — WARNING; continue.
    * Success — INFO.
    * ``httpx`` not importable — return silently (see comment below).

    The ``Origin`` header is sent because the SaaS edge WAF requires it on
    the heartbeat path. /registry/register is currently on the WAF
    allowlist, but both calls share one consistent header set.
    """
    try:
        import httpx
    except ImportError:
        # httpx arrives transitively via a2a-sdk; without it the MCP server
        # cannot import either, so leave the real error to that later import.
        return

    register_body = {
        "id": workspace_id,
        "url": "",
        "agent_card": build_agent_card(workspace_id),
        "delivery_mode": "poll",
    }
    register_headers = {
        "Authorization": f"Bearer {token}",
        "Origin": platform_url,
        "Content-Type": "application/json",
    }
    endpoint = f"{platform_url}/registry/register"

    try:
        with httpx.Client(timeout=10.0) as client:
            resp = client.post(endpoint, json=register_body, headers=register_headers)
            status = resp.status_code
            if status in (401, 403):
                print(
                    f"molecule-mcp: register rejected with HTTP {status} — "
                    "the token in MOLECULE_WORKSPACE_TOKEN is invalid for workspace "
                    f"{workspace_id}. Regenerate from the canvas → Tokens tab.",
                    file=sys.stderr,
                )
                sys.exit(3)
            if status >= 400:
                logger.warning(
                    "molecule-mcp: register POST returned HTTP %d: %s",
                    status,
                    (resp.text or "")[:200],
                )
            else:
                logger.info(
                    "molecule-mcp: registered workspace %s with platform",
                    workspace_id,
                )
    except Exception as exc:  # noqa: BLE001
        # SystemExit derives from BaseException, not Exception, so the
        # sys.exit(3) above propagates straight past this handler.
        logger.warning("molecule-mcp: register POST failed: %s", exc)
def heartbeat_loop(
    platform_url: str,
    workspace_id: str,
    token: str,
    interval: float = HEARTBEAT_INTERVAL_SECONDS,
) -> None:
    """Daemon-thread body: POST /registry/heartbeat, then sleep ``interval``s, forever.

    Never returns on its own. The only early return is immediate, when
    ``httpx`` cannot be imported. Per tick:

    * HTTP 401 / 403 — increment the consecutive auth-failure counter and
      pass it to ``log_heartbeat_auth_failure``. That logs WARNING at first
      and escalates to ERROR once ``HEARTBEAT_AUTH_LOUD_THRESHOLD`` is
      reached.
    * Any other HTTP status >= 400 — WARNING. The auth counter is left
      alone: 5xx / 429 blips say nothing about token validity.
    * Success (< 400) — reset the auth counter and persist any
      ``platform_inbound_secret`` carried in the response body.
    * Transport error or any other ``Exception`` — WARNING; the counter is
      unchanged and the loop continues.

    The request body (with a fresh ``uptime_seconds``) is rebuilt every
    tick. ``token`` is captured once from the argument, though. The loop
    never re-reads the environment, so a rotated token only takes effect
    once the thread is restarted with the new value.

    Args:
        platform_url: Platform base URL; also sent as the ``Origin`` header.
        workspace_id: Workspace reported in every heartbeat body.
        token: Bearer token for the ``Authorization`` header.
        interval: Seconds to sleep after each heartbeat attempt.
    """
    try:
        import httpx
    except ImportError:
        return

    # Built once: every input is a plain argument that cannot change while
    # the thread runs.
    headers = {
        "Authorization": f"Bearer {token}",
        "Origin": platform_url,
        "Content-Type": "application/json",
    }
    start_time = time.time()
    consecutive_auth_failures = 0
    while True:
        body = {
            "workspace_id": workspace_id,
            "error_rate": 0.0,
            "sample_error": "",
            "active_tasks": 0,
            "uptime_seconds": int(time.time() - start_time),
        }
        try:
            with httpx.Client(timeout=10.0) as client:
                resp = client.post(
                    f"{platform_url}/registry/heartbeat",
                    json=body,
                    headers=headers,
                )
            if resp.status_code in (401, 403):
                consecutive_auth_failures += 1
                log_heartbeat_auth_failure(
                    consecutive_auth_failures, workspace_id, resp.status_code,
                )
            elif resp.status_code >= 400:
                # Non-auth HTTP error — log, but DO NOT touch the
                # auth-failure counter (5xx blips, 429, etc. are
                # transient and unrelated to token validity).
                logger.warning(
                    "molecule-mcp: heartbeat HTTP %d: %s",
                    resp.status_code,
                    (resp.text or "")[:200],
                )
            else:
                consecutive_auth_failures = 0
                persist_inbound_secret_from_heartbeat(resp)
        except Exception as exc:  # noqa: BLE001
            logger.warning("molecule-mcp: heartbeat failed: %s", exc)
        time.sleep(interval)
def log_heartbeat_auth_failure(count: int, workspace_id: str, status_code: int) -> None:
    """Log the ``count``-th consecutive heartbeat 401/403, louder as it persists.

    This exists so the operator's first sign of trouble is not "tools 401
    with no explanation". That was the failure mode seen when a workspace
    was deleted server-side, revoking its tokens while the runtime kept
    heartbeating in silence.

    Schedule, with T = ``HEARTBEAT_AUTH_LOUD_THRESHOLD`` and
    R = ``HEARTBEAT_AUTH_RELOG_INTERVAL``:

    * ``count < T`` — WARNING on every tick. It could still be a platform
      blip, so don't shout yet.
    * ``count == T`` — ERROR with re-onboard instructions: the first
      signal the operator can't miss.
    * ``count > T`` — the same ERROR again whenever ``count - T`` is a
      multiple of R, so a session that started after the first ERROR still
      sees it scroll past. Nothing is logged in between.
    """
    threshold = HEARTBEAT_AUTH_LOUD_THRESHOLD

    if count < threshold:
        logger.warning(
            "molecule-mcp: heartbeat HTTP %d (auth failure %d/%d) — "
            "token may be revoked. Will retry; if persistent, regenerate "
            "from canvas → Tokens.",
            status_code, count, threshold,
        )
        return

    # Between re-log points, stay quiet rather than flood the log.
    if count != threshold and (count - threshold) % HEARTBEAT_AUTH_RELOG_INTERVAL != 0:
        return

    logger.error(
        "molecule-mcp: %d consecutive heartbeat auth failures (HTTP %d) — "
        "the token in MOLECULE_WORKSPACE_TOKEN has been REVOKED, likely "
        "because workspace %s was deleted server-side. The MCP server is "
        "still running but every platform call will fail. Regenerate the "
        "workspace + token from the canvas (Tokens tab), update your MCP "
        "config, and restart your runtime.",
        count, status_code, workspace_id,
    )
def persist_inbound_secret_from_heartbeat(resp: object) -> None:
    """Save the ``platform_inbound_secret`` from a heartbeat response, when present.

    The platform's heartbeat handler returns the secret on every beat,
    mirroring /registry/register. A workspace whose secret was lazily
    healed on the platform side therefore picks it up within one tick,
    without a runtime restart. The typical case is a row left with a NULL
    ``platform_inbound_secret`` after a partial bootstrap.

    Without this path, the chat-upload 503 message ("secret was just
    minted, will pick up on next heartbeat") would be false. The workspace
    would stay 401 until the operator restarted the runtime; this was
    observed on 2026-04-30 on the hongmingwang tenant.

    Everything here is best-effort and nothing is raised to the caller.
    The function simply returns if ``resp.json()`` fails, the body isn't a
    dict, or the field is missing/empty. A failed import or save is logged
    at WARNING. Either way, the next heartbeat gets another chance,
    mirroring the cold-start register flow in main.py.
    """
    try:
        payload = resp.json()
    except Exception:  # noqa: BLE001
        return

    secret = payload.get("platform_inbound_secret") if isinstance(payload, dict) else None
    if not secret:
        return

    try:
        from platform_inbound_auth import save_inbound_secret

        save_inbound_secret(secret)
    except Exception as err:  # noqa: BLE001
        logger.warning(
            "molecule-mcp: persist inbound secret from heartbeat failed: %s", err
        )
def start_heartbeat_thread(
    platform_url: str,
    workspace_id: str,
    token: str,
) -> threading.Thread:
    """Spawn and start the heartbeat daemon thread, returning its handle.

    The MCP stdio server owns the foreground (asyncio) while this thread
    heartbeats alongside it. It is created with ``daemon=True`` so it dies
    with the process when the operator hits Ctrl-C or closes the runtime,
    instead of leaking and writing to a stale workspace.
    """
    loop_args = (platform_url, workspace_id, token)
    heartbeat = threading.Thread(
        target=heartbeat_loop,
        args=loop_args,
        name="molecule-mcp-heartbeat",
        daemon=True,
    )
    heartbeat.start()
    return heartbeat