forked from molecule-ai/molecule-core
Merge pull request #2425 from Molecule-AI/fix/heartbeat-detect-401
fix(mcp_cli): escalate heartbeat 401s with re-onboard guidance
This commit is contained in:
commit
2a5669788c
@ -51,6 +51,16 @@ logger = logging.getLogger(__name__)
|
||||
# laptop sleep.
|
||||
HEARTBEAT_INTERVAL_SECONDS = 20.0
|
||||
|
||||
# After this many consecutive 401/403 heartbeats, escalate from
|
||||
# WARNING to ERROR with re-onboard guidance. 3 ticks at 20s = ~1 minute
|
||||
# of sustained auth failure — enough to rule out a transient platform
|
||||
# blip but quick enough that an operator doesn't sit puzzled for 10
|
||||
# minutes wondering why their MCP tools 401. After the first ERROR, the
|
||||
# message is re-logged every 20 ticks (~7 min) so a long-running
|
||||
# session that missed the first ERROR still sees the message.
|
||||
_HEARTBEAT_AUTH_LOUD_THRESHOLD = 3
|
||||
_HEARTBEAT_AUTH_RELOG_INTERVAL = 20
|
||||
|
||||
|
||||
def _platform_register(platform_url: str, workspace_id: str, token: str) -> None:
|
||||
"""One-shot register at startup; fails fast on auth errors.
|
||||
@ -145,6 +155,7 @@ def _heartbeat_loop(
|
||||
return
|
||||
|
||||
start_time = time.time()
|
||||
consecutive_auth_failures = 0
|
||||
while True:
|
||||
body = {
|
||||
"workspace_id": workspace_id,
|
||||
@ -165,19 +176,69 @@ def _heartbeat_loop(
|
||||
json=body,
|
||||
headers=headers,
|
||||
)
|
||||
if resp.status_code >= 400:
|
||||
if resp.status_code in (401, 403):
|
||||
consecutive_auth_failures += 1
|
||||
_log_heartbeat_auth_failure(
|
||||
consecutive_auth_failures, workspace_id, resp.status_code,
|
||||
)
|
||||
elif resp.status_code >= 400:
|
||||
# Non-auth HTTP error — log, but DO NOT touch the
|
||||
# auth-failure counter (5xx blips, 429, etc. are
|
||||
# transient and unrelated to token validity).
|
||||
logger.warning(
|
||||
"molecule-mcp: heartbeat HTTP %d: %s",
|
||||
resp.status_code,
|
||||
(resp.text or "")[:200],
|
||||
)
|
||||
else:
|
||||
consecutive_auth_failures = 0
|
||||
_persist_inbound_secret_from_heartbeat(resp)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.warning("molecule-mcp: heartbeat failed: %s", exc)
|
||||
time.sleep(interval)
|
||||
|
||||
|
||||
def _log_heartbeat_auth_failure(count: int, workspace_id: str, status_code: int) -> None:
    """Log one heartbeat 401/403, growing louder as failures accumulate.

    Motivation: a workspace deleted server-side has its tokens revoked,
    and the runtime used to keep heartbeating quietly while every tool
    call 401'd with no explanation for the operator.

    Args:
        count: Number of consecutive auth failures so far, including
            this one.
        workspace_id: Workspace named in the ERROR message.
        status_code: The HTTP status of the failing heartbeat.

    Logging schedule:
      * below ``_HEARTBEAT_AUTH_LOUD_THRESHOLD``: WARNING on every call
        (may still be a transient blip).
      * exactly at the threshold: ERROR with recovery instructions.
      * past the threshold: the same ERROR again each time the distance
        from the threshold is a multiple of
        ``_HEARTBEAT_AUTH_RELOG_INTERVAL``; silent on the ticks between.
    """
    ticks_past_threshold = count - _HEARTBEAT_AUTH_LOUD_THRESHOLD

    # Still in the quiet phase: warn and bail out.
    if ticks_past_threshold < 0:
        logger.warning(
            "molecule-mcp: heartbeat HTTP %d (auth failure %d/%d) — "
            "token may be revoked. Will retry; if persistent, regenerate "
            "from canvas → Tokens.",
            status_code, count, _HEARTBEAT_AUTH_LOUD_THRESHOLD,
        )
        return

    # Loud phase. The first loud tick always logs; later ticks log only
    # on the re-log cadence. The zero check comes first so the modulo is
    # evaluated only for ticks strictly past the threshold.
    first_loud_tick = ticks_past_threshold == 0
    if not first_loud_tick and ticks_past_threshold % _HEARTBEAT_AUTH_RELOG_INTERVAL != 0:
        return

    logger.error(
        "molecule-mcp: %d consecutive heartbeat auth failures (HTTP %d) — "
        "the token in MOLECULE_WORKSPACE_TOKEN has been REVOKED, likely "
        "because workspace %s was deleted server-side. The MCP server is "
        "still running but every platform call will fail. Regenerate the "
        "workspace + token from the canvas (Tokens tab), update your MCP "
        "config, and restart your runtime.",
        count, status_code, workspace_id,
    )
|
||||
|
||||
|
||||
def _persist_inbound_secret_from_heartbeat(resp: object) -> None:
|
||||
"""Persist ``platform_inbound_secret`` from a heartbeat response, if any.
|
||||
|
||||
|
||||
@ -703,3 +703,167 @@ def test_heartbeat_loop_skips_persist_on_4xx(monkeypatch):
|
||||
)
|
||||
|
||||
assert saw == [], "4xx response must NOT trigger persist call"
|
||||
|
||||
|
||||
# ============== Heartbeat auth-failure escalation (2026-05-01) ==============
|
||||
# When a workspace is deleted server-side (DELETE /workspaces/:id), the
|
||||
# platform revokes the workspace's auth token. The heartbeat starts
|
||||
# 401-ing. The previous behavior just logged WARNING on every tick — a
|
||||
# user tailing logs might miss it, and there was no actionable signal
|
||||
# anywhere. Escalate after a small number of consecutive auth failures
|
||||
# so the operator gets a clear "token revoked, re-onboard" message and
|
||||
# isn't left to puzzle out why their MCP tools 401.
|
||||
#
|
||||
# Pairs with the register-time 401 hard-fail path that already exists
|
||||
# at mcp_cli.py:104-111.
|
||||
|
||||
|
||||
def _multi_iter_runner(monkeypatch, response_status_codes):
    """Drive ``mcp_cli._heartbeat_loop`` with a scripted series of statuses.

    A stub ``httpx`` module is installed in ``sys.modules`` whose
    ``Client.post`` answers with the status code at the current tick
    index. ``time.sleep`` is replaced by a function that advances the
    tick and raises ``SystemExit`` once the tick count reaches
    ``len(response_status_codes)``, which is how the otherwise endless
    loop is stopped.
    """
    import types

    ticks = 0
    total_ticks = len(response_status_codes)

    class FakeResp:
        """Minimal response: ``status_code``, ``text`` and ``json()``."""

        def __init__(self, status_code):
            self.status_code = status_code
            if status_code < 400:
                self.text = ""
            else:
                self.text = '{"error":"invalid workspace auth token"}'

        def json(self):
            if self.status_code < 400:
                return {"status": "ok"}
            return {"error": "invalid workspace auth token"}

    class FakeClient:
        """Context-manager stand-in for ``httpx.Client``."""

        def __init__(self, **_kw):
            pass

        def __enter__(self):
            return self

        def __exit__(self, *_a):
            return False

        def post(self, *_a, **_kw):
            # Past the end of the script, fall back to a healthy 200.
            if ticks < len(response_status_codes):
                return FakeResp(response_status_codes[ticks])
            return FakeResp(200)

    stub_httpx = types.ModuleType("httpx")
    stub_httpx.Client = FakeClient
    monkeypatch.setitem(sys.modules, "httpx", stub_httpx)

    def fake_sleep(_):
        nonlocal ticks
        ticks += 1
        if ticks >= total_ticks:
            raise SystemExit

    monkeypatch.setattr("time.sleep", fake_sleep)

    with pytest.raises(SystemExit):
        mcp_cli._heartbeat_loop(
            "https://test.moleculesai.app",
            "ws-deleted-12345678",
            "stale-token",
            interval=20.0,
        )
|
||||
|
||||
|
||||
def test_heartbeat_single_401_logs_warning_not_error(monkeypatch, caplog):
    """A lone 401 may be a transient platform blip, so it must be
    reported at WARNING and never escalated to ERROR."""
    import logging

    caplog.set_level(logging.WARNING, logger="mcp_cli")
    _multi_iter_runner(monkeypatch, [401])

    def _is_auth_related(record):
        text = record.message
        lowered = text.lower()
        return "401" in text or "auth" in lowered or "revoked" in lowered

    auth_records = list(filter(_is_auth_related, caplog.records))

    # The WARNING-level mention of the 401 has to be present.
    seen_levels = {r.levelno for r in auth_records}
    assert logging.WARNING in seen_levels, (
        "expected at least one WARNING about 401, got: "
        f"{[(r.levelname, r.message) for r in auth_records]}"
    )
    # One failure is not enough to raise the alarm.
    assert all(r.levelno < logging.ERROR for r in auth_records), (
        "single 401 must not escalate to ERROR — premature alarm"
    )
|
||||
|
||||
|
||||
def test_heartbeat_three_consecutive_401s_escalates_to_error(monkeypatch, caplog):
    """Three 401s in a row (the revoked-token case after a server-side
    workspace delete) must produce an ERROR that says why it happened,
    how to recover, and which workspace is affected."""
    import logging

    caplog.set_level(logging.WARNING, logger="mcp_cli")
    _multi_iter_runner(monkeypatch, [401] * 3)

    error_records = [rec for rec in caplog.records if rec.levelno >= logging.ERROR]
    assert error_records, (
        "expected ERROR after 3 consecutive 401s, got only: "
        f"{[(r.levelname, r.message[:80]) for r in caplog.records]}"
    )

    msg = " ".join(rec.message for rec in error_records).lower()

    # Why: the operator must be told the token/workspace is gone.
    assert any(word in msg for word in ("revoked", "deleted")), (
        f"ERROR must explain WHY (token revoked / workspace deleted), got: {msg}"
    )
    # How: the recovery path must be mentioned.
    assert any(word in msg for word in ("regenerate", "re-onboard", "tokens")), (
        f"ERROR must point at the canvas Tokens tab so operator knows how to recover, got: {msg}"
    )
    # Which: the dead workspace has to be identifiable.
    assert "ws-deleted" in msg, f"ERROR must name the dead workspace_id, got: {msg}"
|
||||
|
||||
|
||||
def test_heartbeat_403_treated_same_as_401(monkeypatch, caplog):
    """403 Forbidden is the other auth-failure status; three in a row
    must escalate to ERROR just as three 401s do."""
    import logging

    caplog.set_level(logging.WARNING, logger="mcp_cli")
    _multi_iter_runner(monkeypatch, [403] * 3)

    error_records = [rec for rec in caplog.records if rec.levelno >= logging.ERROR]
    assert error_records, "expected ERROR after 3 consecutive 403s"
|
||||
|
||||
|
||||
def test_heartbeat_recovery_resets_consecutive_counter(monkeypatch, caplog):
    """A 200 in the middle of a run of 401s must zero the failure
    counter, so a later isolated 401 does not escalate."""
    import logging

    caplog.set_level(logging.WARNING, logger="mcp_cli")

    # 401, 401, recovery, 401: with a correct reset the last failure is
    # only the first of a new streak, well short of the threshold.
    scripted_statuses = [401, 401, 200, 401]
    _multi_iter_runner(monkeypatch, scripted_statuses)

    error_records = [rec for rec in caplog.records if rec.levelno >= logging.ERROR]
    assert len(error_records) == 0, (
        "recovered (200) → reset counter → final isolated 401 must NOT "
        f"escalate. Got ERRORs: {[r.message[:80] for r in error_records]}"
    )
|
||||
|
||||
|
||||
def test_heartbeat_500_does_not_increment_auth_counter(monkeypatch, caplog):
    """Server errors are not auth failures: three 500s in a row must
    not produce the 'token revoked' ERROR, which would mislead the
    operator."""
    import logging

    caplog.set_level(logging.WARNING, logger="mcp_cli")
    _multi_iter_runner(monkeypatch, [500] * 3)

    revoked_errors = [
        rec
        for rec in caplog.records
        if rec.levelno >= logging.ERROR and "revoked" in rec.message.lower()
    ]
    assert not revoked_errors, (
        "5xx must NOT be classified as auth failure — would mislead operator. "
        f"Got 'revoked' ERRORs: {[r.message[:80] for r in revoked_errors]}"
    )
|
||||
|
||||
Loading…
Reference in New Issue
Block a user