molecule-core/workspace/tests/test_mcp_cli.py
Hongming Wang 427300f3a4 feat: make molecule-mcp standalone (built-in register + heartbeat) + recover awaiting_agent on heartbeat
Two paired fixes that together let an external operator run a single
process (molecule-mcp) and see their workspace come up online in the
canvas — the bug surfaced live when status stuck at "awaiting_agent /
OFFLINE" despite an active MCP server.

Platform side (workspace-server/internal/handlers/registry.go):
  Heartbeat handler already auto-recovers offline → online and
  provisioning → online, but NOT awaiting_agent → online. Healthsweep
  flips stale-heartbeat external workspaces TO awaiting_agent, and
  with no recovery path the workspace stays "OFFLINE — Restart" in the
  canvas forever. Add the symmetric branch: if currentStatus ==
  "awaiting_agent" and a heartbeat arrives, flip to online + broadcast
  WORKSPACE_ONLINE. Mirrors the existing offline/provisioning patterns
  exactly. Test: TestHeartbeatHandler_AwaitingAgentToOnline asserts
  the SQL UPDATE fires with the awaiting_agent guard clause.

Wheel side (workspace/mcp_cli.py):
  molecule-mcp was outbound-only — operators had to run a separate
  SDK process to register + heartbeat. Now mcp_cli.main():
    1. Calls /registry/register at startup (idempotent upsert flips
       status awaiting_agent → online via the existing register path).
    2. Spawns a daemon thread that POSTs /registry/heartbeat every
       20s. 20s is comfortably under the healthsweep stale window so
       a single missed beat doesn't cause status churn.
    3. Runs the MCP stdio loop in the foreground.

  Both calls set Origin: ${PLATFORM_URL} so the SaaS edge WAF accepts
  them. Threaded heartbeat (not asyncio) chosen because it doesn't
  need to share an event loop with the MCP stdio server — daemon=True
  cleanly dies when the operator's runtime exits.

  MOLECULE_MCP_DISABLE_HEARTBEAT=1 escape hatch lets in-container
  callers (which have heartbeat.py running already) reuse the entry
  point without double-heartbeating. Default is enabled.

End-to-end verification (live, against
hongmingwang.moleculesai.app, workspace 8dad3e29-...):
  pre-fix:  status=awaiting_agent → canvas shows OFFLINE forever
  post-fix: ran `molecule-mcp` for 5s standalone → canvas state:
            status=online runtime=external agent=molecule-mcp-8dad3e29

Test coverage: 7 new mcp_cli tests (register-at-startup, heartbeat-
thread-spawned, disable-env-skips-both, env-and-file token resolution,
register payload shape, heartbeat endpoint + headers); 1 new platform
test (awaiting_agent → online recovery). Full workspace + handlers
suites green: 1355 Python, full Go handlers passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 15:42:44 -07:00

402 lines
15 KiB
Python

"""Tests for workspace/mcp_cli.py — the molecule-mcp console-script
entry-point validator.
The wrapper exists to surface a friendly missing-env error before
a2a_client.py:22's module-level RuntimeError fires. Regressions here
ship a poor first-run UX to every external-runtime operator.
"""
from __future__ import annotations
import sys
from pathlib import Path
import pytest
import mcp_cli
@pytest.fixture(autouse=True)
def _isolate(monkeypatch, tmp_path):
"""Each test starts with no Molecule env vars set + a fresh
CONFIGS_DIR pointing at an empty tmpdir. The heartbeat thread is
disabled by default so happy-path tests don't spawn a background
POST loop against a fake URL — individual tests opt back in via
monkeypatch.delenv when they want to assert heartbeat behavior."""
for var in ("WORKSPACE_ID", "PLATFORM_URL", "MOLECULE_WORKSPACE_TOKEN"):
monkeypatch.delenv(var, raising=False)
monkeypatch.setenv("CONFIGS_DIR", str(tmp_path))
monkeypatch.setenv("MOLECULE_MCP_DISABLE_HEARTBEAT", "1")
yield
def _run_main_capturing_exit(capsys) -> tuple[int, str]:
"""Call mcp_cli.main and return (exit_code, stderr).
main() is supposed to sys.exit on missing env. Any non-exit return
means it tried to run the real MCP loop, which we don't want in a
unit test (and which would also fail because we never set the
mandatory env).
"""
with pytest.raises(SystemExit) as exc_info:
mcp_cli.main()
captured = capsys.readouterr()
code = exc_info.value.code if isinstance(exc_info.value.code, int) else 1
return code, captured.err
def test_missing_workspace_id_exits_with_message(capsys):
code, err = _run_main_capturing_exit(capsys)
assert code == 2, f"expected exit code 2, got {code}"
assert "WORKSPACE_ID" in err
assert "PLATFORM_URL" in err # also missing
assert "MOLECULE_WORKSPACE_TOKEN" in err # also missing
def test_only_workspace_id_missing(capsys, monkeypatch):
monkeypatch.setenv("PLATFORM_URL", "http://localhost:8080")
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "tok")
code, err = _run_main_capturing_exit(capsys)
assert code == 2
# Only WORKSPACE_ID should appear in the "currently missing" list.
assert "Currently missing: WORKSPACE_ID" in err
def test_only_platform_url_missing(capsys, monkeypatch):
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "tok")
code, err = _run_main_capturing_exit(capsys)
assert code == 2
assert "Currently missing: PLATFORM_URL" in err
def test_only_token_missing(capsys, monkeypatch):
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
monkeypatch.setenv("PLATFORM_URL", "http://localhost:8080")
code, err = _run_main_capturing_exit(capsys)
assert code == 2
assert "MOLECULE_WORKSPACE_TOKEN" in err
def test_token_file_satisfies_token_requirement(capsys, monkeypatch, tmp_path):
"""Token from CONFIGS_DIR/.auth_token must be accepted (in-container
path)."""
(tmp_path / ".auth_token").write_text("file-token")
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
monkeypatch.setenv("PLATFORM_URL", "http://localhost:8080")
# No MOLECULE_WORKSPACE_TOKEN — but file exists. Validation should
# pass; we then short-circuit before importing the heavy module by
# patching the import to a no-op spy.
spy_called: dict[str, bool] = {"called": False}
def fake_cli_main():
spy_called["called"] = True
# Patch the heavy import to avoid actually running the MCP server.
# mcp_cli does the import lazily inside main(), so we monkeypatch
# sys.modules to inject a fake a2a_mcp_server.
import types
fake_module = types.ModuleType("a2a_mcp_server")
fake_module.cli_main = fake_cli_main
monkeypatch.setitem(sys.modules, "a2a_mcp_server", fake_module)
mcp_cli.main() # should NOT exit
assert spy_called["called"], "expected cli_main to be invoked when env+file are valid"
def test_env_token_satisfies_token_requirement(capsys, monkeypatch):
"""Token from env must be accepted (external-runtime path)."""
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
monkeypatch.setenv("PLATFORM_URL", "http://localhost:8080")
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "env-token")
spy_called: dict[str, bool] = {"called": False}
def fake_cli_main():
spy_called["called"] = True
import types
fake_module = types.ModuleType("a2a_mcp_server")
fake_module.cli_main = fake_cli_main
monkeypatch.setitem(sys.modules, "a2a_mcp_server", fake_module)
mcp_cli.main()
assert spy_called["called"]
def test_whitespace_only_env_treated_as_missing(capsys, monkeypatch):
"""An accidentally-empty env var (WORKSPACE_ID=" ") must NOT be
considered set — otherwise the error would surface deep inside an
HTTP call instead of in this validator."""
monkeypatch.setenv("WORKSPACE_ID", " ")
monkeypatch.setenv("PLATFORM_URL", "http://localhost:8080")
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "tok")
code, err = _run_main_capturing_exit(capsys)
assert code == 2
assert "WORKSPACE_ID" in err
def test_help_lists_canvas_tokens_tab_pointer(capsys):
"""Operator must know WHERE to get a token. The help mentions the
canvas Tokens tab so they can self-recover without asking on
Slack."""
code, err = _run_main_capturing_exit(capsys)
assert code == 2
assert "Tokens tab" in err or "canvas" in err.lower()
# ==================== Standalone register + heartbeat ====================
# molecule-mcp must be a single-process standalone runtime: it registers
# the workspace at startup AND continuously heartbeats so the platform
# healthsweep doesn't flip status back to awaiting_agent. Without these,
# the operator sees "OFFLINE — Restart" in the canvas within ~60s of
# launching the agent, which was the bug that motivated this PR.
def test_register_called_at_startup(monkeypatch):
"""When env is valid and heartbeat enabled, register fires once
before the MCP loop starts."""
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
monkeypatch.setenv("PLATFORM_URL", "https://test.moleculesai.app")
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "tok")
monkeypatch.delenv("MOLECULE_MCP_DISABLE_HEARTBEAT", raising=False)
register_calls: list[tuple[str, str, str]] = []
def fake_register(platform_url, workspace_id, token):
register_calls.append((platform_url, workspace_id, token))
def fake_start_thread(*_args, **_kwargs):
# Return a dummy thread-shaped object so the caller's reference
# is harmless. Real thread spawning is asserted separately.
class _Stub:
def join(self): pass
return _Stub()
monkeypatch.setattr(mcp_cli, "_platform_register", fake_register)
monkeypatch.setattr(mcp_cli, "_start_heartbeat_thread", fake_start_thread)
spy_called: dict[str, bool] = {"called": False}
def fake_cli_main():
spy_called["called"] = True
import types
fake_module = types.ModuleType("a2a_mcp_server")
fake_module.cli_main = fake_cli_main
monkeypatch.setitem(sys.modules, "a2a_mcp_server", fake_module)
mcp_cli.main()
assert register_calls == [
("https://test.moleculesai.app", "00000000-0000-0000-0000-000000000000", "tok"),
]
assert spy_called["called"], "MCP loop must run AFTER register"
def test_heartbeat_thread_started(monkeypatch):
"""The heartbeat daemon thread must start before the MCP loop runs."""
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
monkeypatch.setenv("PLATFORM_URL", "https://test.moleculesai.app")
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "tok")
monkeypatch.delenv("MOLECULE_MCP_DISABLE_HEARTBEAT", raising=False)
monkeypatch.setattr(mcp_cli, "_platform_register", lambda *a, **k: None)
thread_started: dict[str, bool] = {"started": False}
def fake_start_thread(platform_url, workspace_id, token):
thread_started["started"] = True
thread_started["args"] = (platform_url, workspace_id, token)
class _Stub:
def join(self): pass
return _Stub()
monkeypatch.setattr(mcp_cli, "_start_heartbeat_thread", fake_start_thread)
import types
fake_module = types.ModuleType("a2a_mcp_server")
fake_module.cli_main = lambda: None
monkeypatch.setitem(sys.modules, "a2a_mcp_server", fake_module)
mcp_cli.main()
assert thread_started["started"], "heartbeat thread must be spawned"
assert thread_started["args"][1] == "00000000-0000-0000-0000-000000000000"
assert thread_started["args"][2] == "tok"
def test_heartbeat_disable_env_skips_both(monkeypatch):
"""MOLECULE_MCP_DISABLE_HEARTBEAT=1 (the test fixture default + the
in-container escape hatch) must skip BOTH register and heartbeat,
so the in-container heartbeat loop in heartbeat.py doesn't compete
with this thread."""
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
monkeypatch.setenv("PLATFORM_URL", "https://test.moleculesai.app")
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "tok")
# MOLECULE_MCP_DISABLE_HEARTBEAT=1 is set by the autouse fixture.
register_called: dict[str, bool] = {"called": False}
thread_started: dict[str, bool] = {"started": False}
monkeypatch.setattr(
mcp_cli, "_platform_register",
lambda *a, **k: register_called.update(called=True),
)
monkeypatch.setattr(
mcp_cli, "_start_heartbeat_thread",
lambda *a, **k: thread_started.update(started=True),
)
import types
fake_module = types.ModuleType("a2a_mcp_server")
fake_module.cli_main = lambda: None
monkeypatch.setitem(sys.modules, "a2a_mcp_server", fake_module)
mcp_cli.main()
assert register_called["called"] is False, "disable env must skip register"
assert thread_started["started"] is False, "disable env must skip heartbeat thread"
def test_token_resolved_from_env_when_no_file(monkeypatch):
"""Operator without a /configs volume — token comes from env var."""
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
monkeypatch.setenv("PLATFORM_URL", "https://test.moleculesai.app")
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "env-token")
monkeypatch.delenv("MOLECULE_MCP_DISABLE_HEARTBEAT", raising=False)
captured_token: dict[str, str] = {}
def fake_register(platform_url, workspace_id, token):
captured_token["t"] = token
monkeypatch.setattr(mcp_cli, "_platform_register", fake_register)
monkeypatch.setattr(mcp_cli, "_start_heartbeat_thread", lambda *a, **k: None)
import types
fake_module = types.ModuleType("a2a_mcp_server")
fake_module.cli_main = lambda: None
monkeypatch.setitem(sys.modules, "a2a_mcp_server", fake_module)
mcp_cli.main()
assert captured_token["t"] == "env-token"
def test_token_resolved_from_file_when_no_env(monkeypatch, tmp_path):
"""In-container parity: token comes from /configs/.auth_token when
env is unset. Mirrors platform_auth.get_token resolution order."""
(tmp_path / ".auth_token").write_text("file-token")
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
monkeypatch.setenv("PLATFORM_URL", "https://test.moleculesai.app")
monkeypatch.delenv("MOLECULE_WORKSPACE_TOKEN", raising=False)
monkeypatch.delenv("MOLECULE_MCP_DISABLE_HEARTBEAT", raising=False)
captured_token: dict[str, str] = {}
def fake_register(platform_url, workspace_id, token):
captured_token["t"] = token
monkeypatch.setattr(mcp_cli, "_platform_register", fake_register)
monkeypatch.setattr(mcp_cli, "_start_heartbeat_thread", lambda *a, **k: None)
import types
fake_module = types.ModuleType("a2a_mcp_server")
fake_module.cli_main = lambda: None
monkeypatch.setitem(sys.modules, "a2a_mcp_server", fake_module)
mcp_cli.main()
assert captured_token["t"] == "file-token"
def test_register_payload_shape(monkeypatch):
"""The register POST body must use the field names the workspace-
server expects (id/url/agent_card/delivery_mode), and must include
the Origin header for the SaaS edge WAF."""
captured: dict[str, object] = {}
class FakeResp:
status_code = 200
text = ""
class FakeClient:
def __init__(self, **_kwargs): pass
def __enter__(self): return self
def __exit__(self, *_a): return False
def post(self, url, json=None, headers=None):
captured["url"] = url
captured["json"] = json
captured["headers"] = headers
return FakeResp()
import types
fake_httpx = types.ModuleType("httpx")
fake_httpx.Client = FakeClient
monkeypatch.setitem(sys.modules, "httpx", fake_httpx)
mcp_cli._platform_register(
"https://test.moleculesai.app",
"ws-abc",
"tok",
)
assert captured["url"] == "https://test.moleculesai.app/registry/register"
body = captured["json"]
assert body["id"] == "ws-abc"
assert body["delivery_mode"] == "poll"
assert body["url"] == ""
assert "agent_card" in body
headers = captured["headers"]
assert headers["Authorization"] == "Bearer tok"
assert headers["Origin"] == "https://test.moleculesai.app"
def test_heartbeat_loop_posts_to_correct_endpoint(monkeypatch):
"""Heartbeat thread must POST to /registry/heartbeat with the
workspace_id + Origin/Authorization headers."""
captured: dict[str, object] = {}
class FakeResp:
status_code = 200
text = ""
class FakeClient:
def __init__(self, **_kwargs): pass
def __enter__(self): return self
def __exit__(self, *_a): return False
def post(self, url, json=None, headers=None):
captured["url"] = url
captured["json"] = json
captured["headers"] = headers
return FakeResp()
import types
fake_httpx = types.ModuleType("httpx")
fake_httpx.Client = FakeClient
monkeypatch.setitem(sys.modules, "httpx", fake_httpx)
# Patch sleep so the loop exits after one tick (raise to break out).
sleep_calls: list[float] = []
def fake_sleep(seconds):
sleep_calls.append(seconds)
raise SystemExit # break out of the infinite loop
monkeypatch.setattr("time.sleep", fake_sleep)
with pytest.raises(SystemExit):
mcp_cli._heartbeat_loop(
"https://test.moleculesai.app",
"ws-abc",
"tok",
interval=20.0,
)
assert captured["url"] == "https://test.moleculesai.app/registry/heartbeat"
assert captured["json"]["workspace_id"] == "ws-abc"
assert captured["headers"]["Authorization"] == "Bearer tok"
assert captured["headers"]["Origin"] == "https://test.moleculesai.app"
assert sleep_calls == [20.0], "heartbeat must sleep the configured interval"