forked from molecule-ai/molecule-core
Merge pull request #2425 from Molecule-AI/fix/heartbeat-detect-401
fix(mcp_cli): escalate heartbeat 401s with re-onboard guidance
This commit is contained in:
commit
2a5669788c
@ -51,6 +51,16 @@ logger = logging.getLogger(__name__)
|
|||||||
# laptop sleep.
|
# laptop sleep.
|
||||||
HEARTBEAT_INTERVAL_SECONDS = 20.0
|
HEARTBEAT_INTERVAL_SECONDS = 20.0
|
||||||
|
|
||||||
|
# After this many consecutive 401/403 heartbeats, escalate from
|
||||||
|
# WARNING to ERROR with re-onboard guidance. 3 ticks at 20s = ~1 minute
|
||||||
|
# of sustained auth failure — enough to rule out a transient platform
|
||||||
|
# blip but quick enough that an operator doesn't sit puzzled for 10
|
||||||
|
# minutes wondering why their MCP tools 401. Same threshold used for
|
||||||
|
# repeat-logging at 20-tick (~7 min) intervals so a long-running
|
||||||
|
# session that missed the first ERROR still sees the message.
|
||||||
|
_HEARTBEAT_AUTH_LOUD_THRESHOLD = 3
|
||||||
|
_HEARTBEAT_AUTH_RELOG_INTERVAL = 20
|
||||||
|
|
||||||
|
|
||||||
def _platform_register(platform_url: str, workspace_id: str, token: str) -> None:
|
def _platform_register(platform_url: str, workspace_id: str, token: str) -> None:
|
||||||
"""One-shot register at startup; fails fast on auth errors.
|
"""One-shot register at startup; fails fast on auth errors.
|
||||||
@ -145,6 +155,7 @@ def _heartbeat_loop(
|
|||||||
return
|
return
|
||||||
|
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
|
consecutive_auth_failures = 0
|
||||||
while True:
|
while True:
|
||||||
body = {
|
body = {
|
||||||
"workspace_id": workspace_id,
|
"workspace_id": workspace_id,
|
||||||
@ -165,19 +176,69 @@ def _heartbeat_loop(
|
|||||||
json=body,
|
json=body,
|
||||||
headers=headers,
|
headers=headers,
|
||||||
)
|
)
|
||||||
if resp.status_code >= 400:
|
if resp.status_code in (401, 403):
|
||||||
|
consecutive_auth_failures += 1
|
||||||
|
_log_heartbeat_auth_failure(
|
||||||
|
consecutive_auth_failures, workspace_id, resp.status_code,
|
||||||
|
)
|
||||||
|
elif resp.status_code >= 400:
|
||||||
|
# Non-auth HTTP error — log, but DO NOT touch the
|
||||||
|
# auth-failure counter (5xx blips, 429, etc. are
|
||||||
|
# transient and unrelated to token validity).
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"molecule-mcp: heartbeat HTTP %d: %s",
|
"molecule-mcp: heartbeat HTTP %d: %s",
|
||||||
resp.status_code,
|
resp.status_code,
|
||||||
(resp.text or "")[:200],
|
(resp.text or "")[:200],
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
|
consecutive_auth_failures = 0
|
||||||
_persist_inbound_secret_from_heartbeat(resp)
|
_persist_inbound_secret_from_heartbeat(resp)
|
||||||
except Exception as exc: # noqa: BLE001
|
except Exception as exc: # noqa: BLE001
|
||||||
logger.warning("molecule-mcp: heartbeat failed: %s", exc)
|
logger.warning("molecule-mcp: heartbeat failed: %s", exc)
|
||||||
time.sleep(interval)
|
time.sleep(interval)
|
||||||
|
|
||||||
|
|
||||||
|
def _log_heartbeat_auth_failure(count: int, workspace_id: str, status_code: int) -> None:
|
||||||
|
"""Escalate consecutive heartbeat 401/403s from quiet WARNING to
|
||||||
|
actionable ERROR.
|
||||||
|
|
||||||
|
The operator's first sign of trouble shouldn't be "tools 401 with no
|
||||||
|
explanation" — that was the failure mode that motivated this code,
|
||||||
|
triggered by a workspace being deleted server-side and its tokens
|
||||||
|
revoked while the runtime kept heartbeating in silence.
|
||||||
|
|
||||||
|
Cadence:
|
||||||
|
* count < threshold: WARNING per tick (transient — could be a
|
||||||
|
platform blip, don't shout yet)
|
||||||
|
* count == threshold: ERROR with re-onboard instructions
|
||||||
|
(the first signal the operator can't miss)
|
||||||
|
* count > threshold and (count - threshold) % relog == 0: re-log
|
||||||
|
ERROR (so a session that started after the first ERROR still
|
||||||
|
sees the message scrolling past in their logs)
|
||||||
|
"""
|
||||||
|
if count < _HEARTBEAT_AUTH_LOUD_THRESHOLD:
|
||||||
|
logger.warning(
|
||||||
|
"molecule-mcp: heartbeat HTTP %d (auth failure %d/%d) — "
|
||||||
|
"token may be revoked. Will retry; if persistent, regenerate "
|
||||||
|
"from canvas → Tokens.",
|
||||||
|
status_code, count, _HEARTBEAT_AUTH_LOUD_THRESHOLD,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
# At or past the threshold — this is the loud actionable error.
|
||||||
|
if count == _HEARTBEAT_AUTH_LOUD_THRESHOLD or (
|
||||||
|
count - _HEARTBEAT_AUTH_LOUD_THRESHOLD
|
||||||
|
) % _HEARTBEAT_AUTH_RELOG_INTERVAL == 0:
|
||||||
|
logger.error(
|
||||||
|
"molecule-mcp: %d consecutive heartbeat auth failures (HTTP %d) — "
|
||||||
|
"the token in MOLECULE_WORKSPACE_TOKEN has been REVOKED, likely "
|
||||||
|
"because workspace %s was deleted server-side. The MCP server is "
|
||||||
|
"still running but every platform call will fail. Regenerate the "
|
||||||
|
"workspace + token from the canvas (Tokens tab), update your MCP "
|
||||||
|
"config, and restart your runtime.",
|
||||||
|
count, status_code, workspace_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _persist_inbound_secret_from_heartbeat(resp: object) -> None:
|
def _persist_inbound_secret_from_heartbeat(resp: object) -> None:
|
||||||
"""Persist ``platform_inbound_secret`` from a heartbeat response, if any.
|
"""Persist ``platform_inbound_secret`` from a heartbeat response, if any.
|
||||||
|
|
||||||
|
|||||||
@ -703,3 +703,167 @@ def test_heartbeat_loop_skips_persist_on_4xx(monkeypatch):
|
|||||||
)
|
)
|
||||||
|
|
||||||
assert saw == [], "4xx response must NOT trigger persist call"
|
assert saw == [], "4xx response must NOT trigger persist call"
|
||||||
|
|
||||||
|
|
||||||
|
# ============== Heartbeat auth-failure escalation (2026-05-01) ==============
|
||||||
|
# When a workspace is deleted server-side (DELETE /workspaces/:id), the
|
||||||
|
# platform revokes the workspace's auth token. The heartbeat starts
|
||||||
|
# 401-ing. The previous behavior just logged WARNING on every tick — a
|
||||||
|
# user tailing logs might miss it, and there was no actionable signal
|
||||||
|
# anywhere. Escalate after a small number of consecutive auth failures
|
||||||
|
# so the operator gets a clear "token revoked, re-onboard" message and
|
||||||
|
# isn't left to puzzle out why their MCP tools 401.
|
||||||
|
#
|
||||||
|
# Pairs with the register-time 401 hard-fail path that already exists
|
||||||
|
# at mcp_cli.py:104-111.
|
||||||
|
|
||||||
|
|
||||||
|
def _multi_iter_runner(monkeypatch, response_status_codes):
|
||||||
|
"""Run _heartbeat_loop for ``len(response_status_codes)`` iterations.
|
||||||
|
|
||||||
|
Each call to FakeClient.post returns a response with the next status
|
||||||
|
code from ``response_status_codes``. After all responses are consumed,
|
||||||
|
the next sleep raises SystemExit to break the loop.
|
||||||
|
"""
|
||||||
|
import types
|
||||||
|
|
||||||
|
iterations = {"count": 0}
|
||||||
|
target = len(response_status_codes)
|
||||||
|
|
||||||
|
class FakeResp:
|
||||||
|
def __init__(self, status_code):
|
||||||
|
self.status_code = status_code
|
||||||
|
self.text = "" if status_code < 400 else '{"error":"invalid workspace auth token"}'
|
||||||
|
|
||||||
|
def json(self):
|
||||||
|
if self.status_code >= 400:
|
||||||
|
return {"error": "invalid workspace auth token"}
|
||||||
|
return {"status": "ok"}
|
||||||
|
|
||||||
|
class FakeClient:
|
||||||
|
def __init__(self, **_kw): pass
|
||||||
|
def __enter__(self): return self
|
||||||
|
def __exit__(self, *_a): return False
|
||||||
|
def post(self, *_a, **_kw):
|
||||||
|
i = iterations["count"]
|
||||||
|
sc = response_status_codes[i] if i < len(response_status_codes) else 200
|
||||||
|
return FakeResp(sc)
|
||||||
|
|
||||||
|
fake_httpx = types.ModuleType("httpx")
|
||||||
|
fake_httpx.Client = FakeClient
|
||||||
|
monkeypatch.setitem(sys.modules, "httpx", fake_httpx)
|
||||||
|
|
||||||
|
def fake_sleep(_):
|
||||||
|
iterations["count"] += 1
|
||||||
|
if iterations["count"] >= target:
|
||||||
|
raise SystemExit
|
||||||
|
|
||||||
|
monkeypatch.setattr("time.sleep", fake_sleep)
|
||||||
|
|
||||||
|
with pytest.raises(SystemExit):
|
||||||
|
mcp_cli._heartbeat_loop(
|
||||||
|
"https://test.moleculesai.app",
|
||||||
|
"ws-deleted-12345678",
|
||||||
|
"stale-token",
|
||||||
|
interval=20.0,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_heartbeat_single_401_logs_warning_not_error(monkeypatch, caplog):
|
||||||
|
"""One 401 alone is not enough to declare the token dead — could be a
|
||||||
|
transient platform blip. Log at WARNING; don't shout."""
|
||||||
|
import logging
|
||||||
|
|
||||||
|
caplog.set_level(logging.WARNING, logger="mcp_cli")
|
||||||
|
|
||||||
|
_multi_iter_runner(monkeypatch, [401])
|
||||||
|
|
||||||
|
auth_records = [r for r in caplog.records if "401" in r.message
|
||||||
|
or "auth" in r.message.lower()
|
||||||
|
or "revoked" in r.message.lower()]
|
||||||
|
# At least the WARNING-level mention of HTTP 401 must appear.
|
||||||
|
assert any(r.levelno == logging.WARNING for r in auth_records), (
|
||||||
|
f"expected at least one WARNING about 401, got: "
|
||||||
|
f"{[(r.levelname, r.message) for r in auth_records]}"
|
||||||
|
)
|
||||||
|
# Crucially, NOT escalated to ERROR yet — only one failure.
|
||||||
|
assert not any(r.levelno >= logging.ERROR for r in auth_records), (
|
||||||
|
"single 401 must not escalate to ERROR — premature alarm"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_heartbeat_three_consecutive_401s_escalates_to_error(monkeypatch, caplog):
|
||||||
|
"""Token-revoked is the canonical failure mode after a workspace is
|
||||||
|
deleted server-side. After 3 consecutive 401s the operator gets a
|
||||||
|
LOUD ERROR with re-onboard guidance — not buried at WARNING."""
|
||||||
|
import logging
|
||||||
|
|
||||||
|
caplog.set_level(logging.WARNING, logger="mcp_cli")
|
||||||
|
|
||||||
|
_multi_iter_runner(monkeypatch, [401, 401, 401])
|
||||||
|
|
||||||
|
error_records = [r for r in caplog.records if r.levelno >= logging.ERROR]
|
||||||
|
assert error_records, (
|
||||||
|
f"expected ERROR after 3 consecutive 401s, got only: "
|
||||||
|
f"{[(r.levelname, r.message[:80]) for r in caplog.records]}"
|
||||||
|
)
|
||||||
|
# The message must be actionable — operator needs to know what to do.
|
||||||
|
msg = " ".join(r.message for r in error_records).lower()
|
||||||
|
assert "revoked" in msg or "deleted" in msg, (
|
||||||
|
f"ERROR must explain WHY (token revoked / workspace deleted), got: {msg}"
|
||||||
|
)
|
||||||
|
assert "regenerate" in msg or "re-onboard" in msg or "tokens" in msg, (
|
||||||
|
f"ERROR must point at the canvas Tokens tab so operator knows how to recover, got: {msg}"
|
||||||
|
)
|
||||||
|
# The workspace_id should appear so the operator knows which one is dead.
|
||||||
|
assert "ws-deleted" in msg, f"ERROR must name the dead workspace_id, got: {msg}"
|
||||||
|
|
||||||
|
|
||||||
|
def test_heartbeat_403_treated_same_as_401(monkeypatch, caplog):
|
||||||
|
"""403 Forbidden is the other auth-failure shape (token valid but
|
||||||
|
not authorized for this workspace). Same escalation path."""
|
||||||
|
import logging
|
||||||
|
|
||||||
|
caplog.set_level(logging.WARNING, logger="mcp_cli")
|
||||||
|
|
||||||
|
_multi_iter_runner(monkeypatch, [403, 403, 403])
|
||||||
|
|
||||||
|
error_records = [r for r in caplog.records if r.levelno >= logging.ERROR]
|
||||||
|
assert error_records, "expected ERROR after 3 consecutive 403s"
|
||||||
|
|
||||||
|
|
||||||
|
def test_heartbeat_recovery_resets_consecutive_counter(monkeypatch, caplog):
|
||||||
|
"""If the platform comes back to 200 in the middle of an outage,
|
||||||
|
the auth-failure counter must reset. A subsequent isolated 401
|
||||||
|
later should NOT immediately escalate."""
|
||||||
|
import logging
|
||||||
|
|
||||||
|
caplog.set_level(logging.WARNING, logger="mcp_cli")
|
||||||
|
|
||||||
|
# Two 401s, then 200, then one 401. If counter resets correctly,
|
||||||
|
# the final 401 is "1 consecutive" and should NOT escalate.
|
||||||
|
_multi_iter_runner(monkeypatch, [401, 401, 200, 401])
|
||||||
|
|
||||||
|
error_records = [r for r in caplog.records if r.levelno >= logging.ERROR]
|
||||||
|
assert not error_records, (
|
||||||
|
f"recovered (200) → reset counter → final isolated 401 must NOT "
|
||||||
|
f"escalate. Got ERRORs: {[r.message[:80] for r in error_records]}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_heartbeat_500_does_not_increment_auth_counter(monkeypatch, caplog):
|
||||||
|
"""5xx is a server-side blip, not auth. Three consecutive 500s
|
||||||
|
must NOT trigger the 'token revoked' escalation — that would be
|
||||||
|
misleading the operator."""
|
||||||
|
import logging
|
||||||
|
|
||||||
|
caplog.set_level(logging.WARNING, logger="mcp_cli")
|
||||||
|
|
||||||
|
_multi_iter_runner(monkeypatch, [500, 500, 500])
|
||||||
|
|
||||||
|
error_records = [r for r in caplog.records if r.levelno >= logging.ERROR]
|
||||||
|
revoked_errors = [r for r in error_records if "revoked" in r.message.lower()]
|
||||||
|
assert not revoked_errors, (
|
||||||
|
f"5xx must NOT be classified as auth failure — would mislead operator. "
|
||||||
|
f"Got 'revoked' ERRORs: {[r.message[:80] for r in revoked_errors]}"
|
||||||
|
)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user