Heartbeat now echoes the workspace's platform_inbound_secret on every
beat (mirroring /registry/register), and the molecule-mcp client
persists it to /configs/.platform_inbound_secret on receipt.
Symptom (2026-04-30, hongmingwang tenant): chat upload returned 503
"workspace will pick it up on its next heartbeat" and then 401 on
retry — permanent until workspace restart. The 503 message was a lie:
heartbeat used to discard the platform_inbound_secret entirely; only
register delivered it, and register fires once at startup.
Server (Go):
- Heartbeat handler reuses readOrLazyHealInboundSecret (the same
helper chat_files + register use), so heartbeat-time recovery
covers the rotate / mid-life NULL-column case the existing
register-time heal can't reach.
- Failure is non-fatal: liveness contract trumps secret delivery,
chat_files retries lazy-heal on its own next request.
Client (Python):
- _persist_inbound_secret_from_heartbeat parses the heartbeat 200
response and persists via platform_inbound_auth.save_inbound_secret.
- All exceptions swallowed — heartbeat liveness > secret persistence;
next tick (≤20s) retries.
Tests:
- Server: pin secret-present, lazy-heal-mint-on-NULL, and heal-
failure-omits-field branches.
- Client: pin persist-on-200, skip-on-empty, skip-on-non-dict-body,
skip-on-401, swallow-save-OSError.
706 lines
24 KiB
Python
706 lines
24 KiB
Python
"""Tests for workspace/mcp_cli.py — the molecule-mcp console-script
|
|
entry-point validator.
|
|
|
|
The wrapper exists to surface a friendly missing-env error before
|
|
a2a_client.py:22's module-level RuntimeError fires. Regressions here
|
|
ship a poor first-run UX to every external-runtime operator.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
import mcp_cli
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _isolate(monkeypatch, tmp_path):
|
|
"""Each test starts with no Molecule env vars set + a fresh
|
|
CONFIGS_DIR pointing at an empty tmpdir. The heartbeat thread is
|
|
disabled by default so happy-path tests don't spawn a background
|
|
POST loop against a fake URL — individual tests opt back in via
|
|
monkeypatch.delenv when they want to assert heartbeat behavior."""
|
|
for var in ("WORKSPACE_ID", "PLATFORM_URL", "MOLECULE_WORKSPACE_TOKEN"):
|
|
monkeypatch.delenv(var, raising=False)
|
|
monkeypatch.setenv("CONFIGS_DIR", str(tmp_path))
|
|
monkeypatch.setenv("MOLECULE_MCP_DISABLE_HEARTBEAT", "1")
|
|
yield
|
|
|
|
|
|
def _run_main_capturing_exit(capsys) -> tuple[int, str]:
|
|
"""Call mcp_cli.main and return (exit_code, stderr).
|
|
|
|
main() is supposed to sys.exit on missing env. Any non-exit return
|
|
means it tried to run the real MCP loop, which we don't want in a
|
|
unit test (and which would also fail because we never set the
|
|
mandatory env).
|
|
"""
|
|
with pytest.raises(SystemExit) as exc_info:
|
|
mcp_cli.main()
|
|
captured = capsys.readouterr()
|
|
code = exc_info.value.code if isinstance(exc_info.value.code, int) else 1
|
|
return code, captured.err
|
|
|
|
|
|
def test_missing_workspace_id_exits_with_message(capsys):
|
|
code, err = _run_main_capturing_exit(capsys)
|
|
assert code == 2, f"expected exit code 2, got {code}"
|
|
assert "WORKSPACE_ID" in err
|
|
assert "PLATFORM_URL" in err # also missing
|
|
assert "MOLECULE_WORKSPACE_TOKEN" in err # also missing
|
|
|
|
|
|
def test_only_workspace_id_missing(capsys, monkeypatch):
|
|
monkeypatch.setenv("PLATFORM_URL", "http://localhost:8080")
|
|
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "tok")
|
|
code, err = _run_main_capturing_exit(capsys)
|
|
assert code == 2
|
|
# Only WORKSPACE_ID should appear in the "currently missing" list.
|
|
assert "Currently missing: WORKSPACE_ID" in err
|
|
|
|
|
|
def test_only_platform_url_missing(capsys, monkeypatch):
|
|
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
|
|
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "tok")
|
|
code, err = _run_main_capturing_exit(capsys)
|
|
assert code == 2
|
|
assert "Currently missing: PLATFORM_URL" in err
|
|
|
|
|
|
def test_only_token_missing(capsys, monkeypatch):
|
|
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
|
|
monkeypatch.setenv("PLATFORM_URL", "http://localhost:8080")
|
|
code, err = _run_main_capturing_exit(capsys)
|
|
assert code == 2
|
|
assert "MOLECULE_WORKSPACE_TOKEN" in err
|
|
|
|
|
|
def test_token_file_satisfies_token_requirement(capsys, monkeypatch, tmp_path):
|
|
"""Token from CONFIGS_DIR/.auth_token must be accepted (in-container
|
|
path)."""
|
|
(tmp_path / ".auth_token").write_text("file-token")
|
|
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
|
|
monkeypatch.setenv("PLATFORM_URL", "http://localhost:8080")
|
|
# No MOLECULE_WORKSPACE_TOKEN — but file exists. Validation should
|
|
# pass; we then short-circuit before importing the heavy module by
|
|
# patching the import to a no-op spy.
|
|
|
|
spy_called: dict[str, bool] = {"called": False}
|
|
|
|
def fake_cli_main():
|
|
spy_called["called"] = True
|
|
|
|
# Patch the heavy import to avoid actually running the MCP server.
|
|
# mcp_cli does the import lazily inside main(), so we monkeypatch
|
|
# sys.modules to inject a fake a2a_mcp_server.
|
|
import types
|
|
fake_module = types.ModuleType("a2a_mcp_server")
|
|
fake_module.cli_main = fake_cli_main
|
|
monkeypatch.setitem(sys.modules, "a2a_mcp_server", fake_module)
|
|
|
|
mcp_cli.main() # should NOT exit
|
|
assert spy_called["called"], "expected cli_main to be invoked when env+file are valid"
|
|
|
|
|
|
def test_env_token_satisfies_token_requirement(capsys, monkeypatch):
|
|
"""Token from env must be accepted (external-runtime path)."""
|
|
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
|
|
monkeypatch.setenv("PLATFORM_URL", "http://localhost:8080")
|
|
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "env-token")
|
|
|
|
spy_called: dict[str, bool] = {"called": False}
|
|
|
|
def fake_cli_main():
|
|
spy_called["called"] = True
|
|
|
|
import types
|
|
fake_module = types.ModuleType("a2a_mcp_server")
|
|
fake_module.cli_main = fake_cli_main
|
|
monkeypatch.setitem(sys.modules, "a2a_mcp_server", fake_module)
|
|
|
|
mcp_cli.main()
|
|
assert spy_called["called"]
|
|
|
|
|
|
def test_whitespace_only_env_treated_as_missing(capsys, monkeypatch):
|
|
"""An accidentally-empty env var (WORKSPACE_ID=" ") must NOT be
|
|
considered set — otherwise the error would surface deep inside an
|
|
HTTP call instead of in this validator."""
|
|
monkeypatch.setenv("WORKSPACE_ID", " ")
|
|
monkeypatch.setenv("PLATFORM_URL", "http://localhost:8080")
|
|
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "tok")
|
|
code, err = _run_main_capturing_exit(capsys)
|
|
assert code == 2
|
|
assert "WORKSPACE_ID" in err
|
|
|
|
|
|
def test_help_lists_canvas_tokens_tab_pointer(capsys):
|
|
"""Operator must know WHERE to get a token. The help mentions the
|
|
canvas Tokens tab so they can self-recover without asking on
|
|
Slack."""
|
|
code, err = _run_main_capturing_exit(capsys)
|
|
assert code == 2
|
|
assert "Tokens tab" in err or "canvas" in err.lower()
|
|
|
|
|
|
# ==================== Standalone register + heartbeat ====================
|
|
# molecule-mcp must be a single-process standalone runtime: it registers
|
|
# the workspace at startup AND continuously heartbeats so the platform
|
|
# healthsweep doesn't flip status back to awaiting_agent. Without these,
|
|
# the operator sees "OFFLINE — Restart" in the canvas within ~60s of
|
|
# launching the agent, which was the bug that motivated this PR.
|
|
|
|
|
|
def test_register_called_at_startup(monkeypatch):
|
|
"""When env is valid and heartbeat enabled, register fires once
|
|
before the MCP loop starts."""
|
|
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
|
|
monkeypatch.setenv("PLATFORM_URL", "https://test.moleculesai.app")
|
|
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "tok")
|
|
monkeypatch.delenv("MOLECULE_MCP_DISABLE_HEARTBEAT", raising=False)
|
|
|
|
register_calls: list[tuple[str, str, str]] = []
|
|
|
|
def fake_register(platform_url, workspace_id, token):
|
|
register_calls.append((platform_url, workspace_id, token))
|
|
|
|
def fake_start_thread(*_args, **_kwargs):
|
|
# Return a dummy thread-shaped object so the caller's reference
|
|
# is harmless. Real thread spawning is asserted separately.
|
|
class _Stub:
|
|
def join(self): pass
|
|
return _Stub()
|
|
|
|
monkeypatch.setattr(mcp_cli, "_platform_register", fake_register)
|
|
monkeypatch.setattr(mcp_cli, "_start_heartbeat_thread", fake_start_thread)
|
|
|
|
spy_called: dict[str, bool] = {"called": False}
|
|
|
|
def fake_cli_main():
|
|
spy_called["called"] = True
|
|
|
|
import types
|
|
fake_module = types.ModuleType("a2a_mcp_server")
|
|
fake_module.cli_main = fake_cli_main
|
|
monkeypatch.setitem(sys.modules, "a2a_mcp_server", fake_module)
|
|
|
|
mcp_cli.main()
|
|
|
|
assert register_calls == [
|
|
("https://test.moleculesai.app", "00000000-0000-0000-0000-000000000000", "tok"),
|
|
]
|
|
assert spy_called["called"], "MCP loop must run AFTER register"
|
|
|
|
|
|
def test_heartbeat_thread_started(monkeypatch):
|
|
"""The heartbeat daemon thread must start before the MCP loop runs."""
|
|
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
|
|
monkeypatch.setenv("PLATFORM_URL", "https://test.moleculesai.app")
|
|
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "tok")
|
|
monkeypatch.delenv("MOLECULE_MCP_DISABLE_HEARTBEAT", raising=False)
|
|
|
|
monkeypatch.setattr(mcp_cli, "_platform_register", lambda *a, **k: None)
|
|
|
|
thread_started: dict[str, bool] = {"started": False}
|
|
|
|
def fake_start_thread(platform_url, workspace_id, token):
|
|
thread_started["started"] = True
|
|
thread_started["args"] = (platform_url, workspace_id, token)
|
|
class _Stub:
|
|
def join(self): pass
|
|
return _Stub()
|
|
|
|
monkeypatch.setattr(mcp_cli, "_start_heartbeat_thread", fake_start_thread)
|
|
|
|
import types
|
|
fake_module = types.ModuleType("a2a_mcp_server")
|
|
fake_module.cli_main = lambda: None
|
|
monkeypatch.setitem(sys.modules, "a2a_mcp_server", fake_module)
|
|
|
|
mcp_cli.main()
|
|
|
|
assert thread_started["started"], "heartbeat thread must be spawned"
|
|
assert thread_started["args"][1] == "00000000-0000-0000-0000-000000000000"
|
|
assert thread_started["args"][2] == "tok"
|
|
|
|
|
|
def test_heartbeat_disable_env_skips_both(monkeypatch):
|
|
"""MOLECULE_MCP_DISABLE_HEARTBEAT=1 (the test fixture default + the
|
|
in-container escape hatch) must skip BOTH register and heartbeat,
|
|
so the in-container heartbeat loop in heartbeat.py doesn't compete
|
|
with this thread."""
|
|
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
|
|
monkeypatch.setenv("PLATFORM_URL", "https://test.moleculesai.app")
|
|
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "tok")
|
|
# MOLECULE_MCP_DISABLE_HEARTBEAT=1 is set by the autouse fixture.
|
|
|
|
register_called: dict[str, bool] = {"called": False}
|
|
thread_started: dict[str, bool] = {"started": False}
|
|
|
|
monkeypatch.setattr(
|
|
mcp_cli, "_platform_register",
|
|
lambda *a, **k: register_called.update(called=True),
|
|
)
|
|
monkeypatch.setattr(
|
|
mcp_cli, "_start_heartbeat_thread",
|
|
lambda *a, **k: thread_started.update(started=True),
|
|
)
|
|
|
|
import types
|
|
fake_module = types.ModuleType("a2a_mcp_server")
|
|
fake_module.cli_main = lambda: None
|
|
monkeypatch.setitem(sys.modules, "a2a_mcp_server", fake_module)
|
|
|
|
mcp_cli.main()
|
|
|
|
assert register_called["called"] is False, "disable env must skip register"
|
|
assert thread_started["started"] is False, "disable env must skip heartbeat thread"
|
|
|
|
|
|
def test_token_resolved_from_env_when_no_file(monkeypatch):
|
|
"""Operator without a /configs volume — token comes from env var."""
|
|
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
|
|
monkeypatch.setenv("PLATFORM_URL", "https://test.moleculesai.app")
|
|
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "env-token")
|
|
monkeypatch.delenv("MOLECULE_MCP_DISABLE_HEARTBEAT", raising=False)
|
|
|
|
captured_token: dict[str, str] = {}
|
|
|
|
def fake_register(platform_url, workspace_id, token):
|
|
captured_token["t"] = token
|
|
|
|
monkeypatch.setattr(mcp_cli, "_platform_register", fake_register)
|
|
monkeypatch.setattr(mcp_cli, "_start_heartbeat_thread", lambda *a, **k: None)
|
|
|
|
import types
|
|
fake_module = types.ModuleType("a2a_mcp_server")
|
|
fake_module.cli_main = lambda: None
|
|
monkeypatch.setitem(sys.modules, "a2a_mcp_server", fake_module)
|
|
|
|
mcp_cli.main()
|
|
|
|
assert captured_token["t"] == "env-token"
|
|
|
|
|
|
def test_token_resolved_from_file_when_no_env(monkeypatch, tmp_path):
|
|
"""In-container parity: token comes from /configs/.auth_token when
|
|
env is unset. Mirrors platform_auth.get_token resolution order."""
|
|
(tmp_path / ".auth_token").write_text("file-token")
|
|
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
|
|
monkeypatch.setenv("PLATFORM_URL", "https://test.moleculesai.app")
|
|
monkeypatch.delenv("MOLECULE_WORKSPACE_TOKEN", raising=False)
|
|
monkeypatch.delenv("MOLECULE_MCP_DISABLE_HEARTBEAT", raising=False)
|
|
|
|
captured_token: dict[str, str] = {}
|
|
|
|
def fake_register(platform_url, workspace_id, token):
|
|
captured_token["t"] = token
|
|
|
|
monkeypatch.setattr(mcp_cli, "_platform_register", fake_register)
|
|
monkeypatch.setattr(mcp_cli, "_start_heartbeat_thread", lambda *a, **k: None)
|
|
|
|
import types
|
|
fake_module = types.ModuleType("a2a_mcp_server")
|
|
fake_module.cli_main = lambda: None
|
|
monkeypatch.setitem(sys.modules, "a2a_mcp_server", fake_module)
|
|
|
|
mcp_cli.main()
|
|
|
|
assert captured_token["t"] == "file-token"
|
|
|
|
|
|
def test_register_401_exits_with_actionable_error(monkeypatch, capsys):
|
|
"""Bad token at startup must hard-fail. Otherwise the operator
|
|
sees no error in their MCP client (which spawns the binary in a
|
|
subprocess), the heartbeat thread silently 401's forever, and
|
|
every tool call also 401's — needle-in-haystack debugging.
|
|
Hard-exiting prints a clear pointer to the canvas Tokens tab."""
|
|
|
|
class FakeResp:
|
|
status_code = 401
|
|
text = "invalid workspace auth token"
|
|
|
|
class FakeClient:
|
|
def __init__(self, **_kwargs): pass
|
|
def __enter__(self): return self
|
|
def __exit__(self, *_a): return False
|
|
def post(self, *_a, **_kw): return FakeResp()
|
|
|
|
import types
|
|
fake_httpx = types.ModuleType("httpx")
|
|
fake_httpx.Client = FakeClient
|
|
monkeypatch.setitem(sys.modules, "httpx", fake_httpx)
|
|
|
|
with pytest.raises(SystemExit) as exc_info:
|
|
mcp_cli._platform_register(
|
|
"https://test.moleculesai.app",
|
|
"ws-bad-token",
|
|
"wrong-token",
|
|
)
|
|
assert exc_info.value.code == 3
|
|
err = capsys.readouterr().err
|
|
assert "401" in err
|
|
assert "ws-bad-token" in err
|
|
assert "Tokens tab" in err or "canvas" in err.lower()
|
|
|
|
|
|
def test_register_403_also_exits(monkeypatch, capsys):
|
|
"""403 is the C18 hijack-prevention rejection — same operator
|
|
action (regenerate token) as 401."""
|
|
|
|
class FakeResp:
|
|
status_code = 403
|
|
text = "C18: live tokens exist; bearer didn't match"
|
|
|
|
class FakeClient:
|
|
def __init__(self, **_kwargs): pass
|
|
def __enter__(self): return self
|
|
def __exit__(self, *_a): return False
|
|
def post(self, *_a, **_kw): return FakeResp()
|
|
|
|
import types
|
|
fake_httpx = types.ModuleType("httpx")
|
|
fake_httpx.Client = FakeClient
|
|
monkeypatch.setitem(sys.modules, "httpx", fake_httpx)
|
|
|
|
with pytest.raises(SystemExit) as exc_info:
|
|
mcp_cli._platform_register(
|
|
"https://test.moleculesai.app",
|
|
"ws-hijack",
|
|
"stolen-token",
|
|
)
|
|
assert exc_info.value.code == 3
|
|
|
|
|
|
def test_register_500_does_not_exit(monkeypatch):
|
|
"""Transient platform errors (500, 503) must NOT hard-fail —
|
|
those clear on retry and the heartbeat thread will surface
|
|
persistent failures via warning logs."""
|
|
|
|
class FakeResp:
|
|
status_code = 503
|
|
text = "service unavailable"
|
|
|
|
class FakeClient:
|
|
def __init__(self, **_kwargs): pass
|
|
def __enter__(self): return self
|
|
def __exit__(self, *_a): return False
|
|
def post(self, *_a, **_kw): return FakeResp()
|
|
|
|
import types
|
|
fake_httpx = types.ModuleType("httpx")
|
|
fake_httpx.Client = FakeClient
|
|
monkeypatch.setitem(sys.modules, "httpx", fake_httpx)
|
|
|
|
# Should return cleanly, no SystemExit raised
|
|
mcp_cli._platform_register(
|
|
"https://test.moleculesai.app",
|
|
"ws-ok",
|
|
"tok",
|
|
)
|
|
|
|
|
|
def test_register_payload_shape(monkeypatch):
|
|
"""The register POST body must use the field names the workspace-
|
|
server expects (id/url/agent_card/delivery_mode), and must include
|
|
the Origin header for the SaaS edge WAF."""
|
|
captured: dict[str, object] = {}
|
|
|
|
class FakeResp:
|
|
status_code = 200
|
|
text = ""
|
|
|
|
class FakeClient:
|
|
def __init__(self, **_kwargs): pass
|
|
def __enter__(self): return self
|
|
def __exit__(self, *_a): return False
|
|
def post(self, url, json=None, headers=None):
|
|
captured["url"] = url
|
|
captured["json"] = json
|
|
captured["headers"] = headers
|
|
return FakeResp()
|
|
|
|
import types
|
|
fake_httpx = types.ModuleType("httpx")
|
|
fake_httpx.Client = FakeClient
|
|
monkeypatch.setitem(sys.modules, "httpx", fake_httpx)
|
|
|
|
mcp_cli._platform_register(
|
|
"https://test.moleculesai.app",
|
|
"ws-abc",
|
|
"tok",
|
|
)
|
|
|
|
assert captured["url"] == "https://test.moleculesai.app/registry/register"
|
|
body = captured["json"]
|
|
assert body["id"] == "ws-abc"
|
|
assert body["delivery_mode"] == "poll"
|
|
assert body["url"] == ""
|
|
assert "agent_card" in body
|
|
headers = captured["headers"]
|
|
assert headers["Authorization"] == "Bearer tok"
|
|
assert headers["Origin"] == "https://test.moleculesai.app"
|
|
|
|
|
|
def test_heartbeat_loop_posts_to_correct_endpoint(monkeypatch):
|
|
"""Heartbeat thread must POST to /registry/heartbeat with the
|
|
workspace_id + Origin/Authorization headers."""
|
|
captured: dict[str, object] = {}
|
|
|
|
class FakeResp:
|
|
status_code = 200
|
|
text = ""
|
|
|
|
class FakeClient:
|
|
def __init__(self, **_kwargs): pass
|
|
def __enter__(self): return self
|
|
def __exit__(self, *_a): return False
|
|
def post(self, url, json=None, headers=None):
|
|
captured["url"] = url
|
|
captured["json"] = json
|
|
captured["headers"] = headers
|
|
return FakeResp()
|
|
|
|
import types
|
|
fake_httpx = types.ModuleType("httpx")
|
|
fake_httpx.Client = FakeClient
|
|
monkeypatch.setitem(sys.modules, "httpx", fake_httpx)
|
|
|
|
# Patch sleep so the loop exits after one tick (raise to break out).
|
|
sleep_calls: list[float] = []
|
|
|
|
def fake_sleep(seconds):
|
|
sleep_calls.append(seconds)
|
|
raise SystemExit # break out of the infinite loop
|
|
|
|
monkeypatch.setattr("time.sleep", fake_sleep)
|
|
|
|
with pytest.raises(SystemExit):
|
|
mcp_cli._heartbeat_loop(
|
|
"https://test.moleculesai.app",
|
|
"ws-abc",
|
|
"tok",
|
|
interval=20.0,
|
|
)
|
|
|
|
assert captured["url"] == "https://test.moleculesai.app/registry/heartbeat"
|
|
assert captured["json"]["workspace_id"] == "ws-abc"
|
|
assert captured["headers"]["Authorization"] == "Bearer tok"
|
|
assert captured["headers"]["Origin"] == "https://test.moleculesai.app"
|
|
assert sleep_calls == [20.0], "heartbeat must sleep the configured interval"
|
|
|
|
|
|
# ============== Heartbeat persists platform_inbound_secret (2026-04-30) ==============
|
|
# Heartbeat loop must persist the platform_inbound_secret returned by
|
|
# the platform. Without this, a workspace that lazy-healed the secret
|
|
# on the platform side recovers only on a runtime restart — chat upload
|
|
# 401-forever. Pairs with the server-side
|
|
# TestHeartbeatHandler_DeliversPlatformInboundSecret pin.
|
|
|
|
|
|
def test_heartbeat_persists_inbound_secret_from_response(monkeypatch, tmp_path):
|
|
"""Heartbeat 200 with platform_inbound_secret in body → save_inbound_secret called."""
|
|
|
|
class FakeResp:
|
|
status_code = 200
|
|
text = ""
|
|
|
|
def json(self):
|
|
return {"status": "ok", "platform_inbound_secret": "fresh-secret"}
|
|
|
|
saved: list[str] = []
|
|
import platform_inbound_auth
|
|
|
|
monkeypatch.setattr(platform_inbound_auth, "save_inbound_secret", saved.append)
|
|
|
|
mcp_cli._persist_inbound_secret_from_heartbeat(FakeResp())
|
|
|
|
assert saved == ["fresh-secret"], (
|
|
"expected save_inbound_secret called once with the platform's secret"
|
|
)
|
|
|
|
|
|
def test_heartbeat_persist_skips_when_secret_absent(monkeypatch):
|
|
"""Heartbeat 200 without platform_inbound_secret → no persist call."""
|
|
|
|
class FakeResp:
|
|
def json(self):
|
|
return {"status": "ok"}
|
|
|
|
saved: list[str] = []
|
|
import platform_inbound_auth
|
|
|
|
monkeypatch.setattr(platform_inbound_auth, "save_inbound_secret", saved.append)
|
|
|
|
mcp_cli._persist_inbound_secret_from_heartbeat(FakeResp())
|
|
|
|
assert saved == [], "no secret in body → must NOT call save_inbound_secret"
|
|
|
|
|
|
def test_heartbeat_persist_skips_on_empty_secret(monkeypatch):
|
|
"""Heartbeat 200 with empty-string platform_inbound_secret → no persist."""
|
|
|
|
class FakeResp:
|
|
def json(self):
|
|
return {"status": "ok", "platform_inbound_secret": ""}
|
|
|
|
saved: list[str] = []
|
|
import platform_inbound_auth
|
|
|
|
monkeypatch.setattr(platform_inbound_auth, "save_inbound_secret", saved.append)
|
|
|
|
mcp_cli._persist_inbound_secret_from_heartbeat(FakeResp())
|
|
|
|
assert saved == [], "empty secret string → must NOT call save_inbound_secret"
|
|
|
|
|
|
def test_heartbeat_persist_swallows_non_json_body(monkeypatch):
|
|
"""Heartbeat with unparseable body must not raise — logs + returns."""
|
|
|
|
class FakeResp:
|
|
def json(self):
|
|
raise ValueError("not json")
|
|
|
|
saved: list[str] = []
|
|
import platform_inbound_auth
|
|
|
|
monkeypatch.setattr(platform_inbound_auth, "save_inbound_secret", saved.append)
|
|
|
|
# Must not raise; non-JSON body is treated as "no secret to deliver".
|
|
mcp_cli._persist_inbound_secret_from_heartbeat(FakeResp())
|
|
assert saved == []
|
|
|
|
|
|
def test_heartbeat_persist_handles_non_dict_body(monkeypatch):
|
|
"""Heartbeat returning a list (not a dict) is silently ignored."""
|
|
|
|
class FakeResp:
|
|
def json(self):
|
|
return ["unexpected", "list"]
|
|
|
|
saved: list[str] = []
|
|
import platform_inbound_auth
|
|
|
|
monkeypatch.setattr(platform_inbound_auth, "save_inbound_secret", saved.append)
|
|
|
|
mcp_cli._persist_inbound_secret_from_heartbeat(FakeResp())
|
|
assert saved == []
|
|
|
|
|
|
def test_heartbeat_persist_swallows_save_exceptions(monkeypatch, caplog):
|
|
"""save_inbound_secret raising must not crash the heartbeat loop."""
|
|
|
|
class FakeResp:
|
|
def json(self):
|
|
return {"platform_inbound_secret": "x"}
|
|
|
|
def boom(_secret):
|
|
raise OSError("disk full")
|
|
|
|
import platform_inbound_auth
|
|
|
|
monkeypatch.setattr(platform_inbound_auth, "save_inbound_secret", boom)
|
|
|
|
# Must not raise — heartbeat liveness > secret persistence.
|
|
mcp_cli._persist_inbound_secret_from_heartbeat(FakeResp())
|
|
|
|
|
|
def test_heartbeat_loop_calls_persist_on_success(monkeypatch):
|
|
"""End-to-end: heartbeat loop on 200 invokes the persist helper."""
|
|
saw: list[object] = []
|
|
|
|
def fake_persist(resp):
|
|
saw.append(resp)
|
|
|
|
monkeypatch.setattr(
|
|
mcp_cli, "_persist_inbound_secret_from_heartbeat", fake_persist
|
|
)
|
|
|
|
class FakeResp:
|
|
status_code = 200
|
|
text = ""
|
|
|
|
class FakeClient:
|
|
def __init__(self, **_kwargs):
|
|
pass
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, *_a):
|
|
return False
|
|
|
|
def post(self, *_a, **_k):
|
|
return FakeResp()
|
|
|
|
import types
|
|
|
|
fake_httpx = types.ModuleType("httpx")
|
|
fake_httpx.Client = FakeClient
|
|
monkeypatch.setitem(sys.modules, "httpx", fake_httpx)
|
|
|
|
def fake_sleep(_):
|
|
raise SystemExit
|
|
|
|
monkeypatch.setattr("time.sleep", fake_sleep)
|
|
|
|
with pytest.raises(SystemExit):
|
|
mcp_cli._heartbeat_loop(
|
|
"https://test.moleculesai.app",
|
|
"ws-abc",
|
|
"tok",
|
|
interval=20.0,
|
|
)
|
|
|
|
assert len(saw) == 1, "persist helper must be called once per successful heartbeat"
|
|
|
|
|
|
def test_heartbeat_loop_skips_persist_on_4xx(monkeypatch):
|
|
"""Heartbeat 4xx error path must NOT invoke persist (no body to trust)."""
|
|
saw: list[object] = []
|
|
monkeypatch.setattr(
|
|
mcp_cli,
|
|
"_persist_inbound_secret_from_heartbeat",
|
|
lambda r: saw.append(r),
|
|
)
|
|
|
|
class FakeResp:
|
|
status_code = 401
|
|
text = "unauthorized"
|
|
|
|
class FakeClient:
|
|
def __init__(self, **_kwargs):
|
|
pass
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, *_a):
|
|
return False
|
|
|
|
def post(self, *_a, **_k):
|
|
return FakeResp()
|
|
|
|
import types
|
|
|
|
fake_httpx = types.ModuleType("httpx")
|
|
fake_httpx.Client = FakeClient
|
|
monkeypatch.setitem(sys.modules, "httpx", fake_httpx)
|
|
|
|
def fake_sleep(_):
|
|
raise SystemExit
|
|
|
|
monkeypatch.setattr("time.sleep", fake_sleep)
|
|
|
|
with pytest.raises(SystemExit):
|
|
mcp_cli._heartbeat_loop(
|
|
"https://test.moleculesai.app",
|
|
"ws-abc",
|
|
"tok",
|
|
interval=20.0,
|
|
)
|
|
|
|
assert saw == [], "4xx response must NOT trigger persist call"
|