From 427300f3a426c507955ba69b3b28b74a76985f2d Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Thu, 30 Apr 2026 15:42:44 -0700 Subject: [PATCH] feat: make molecule-mcp standalone (built-in register + heartbeat) + recover awaiting_agent on heartbeat MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two paired fixes that together let an external operator run a single process (molecule-mcp) and see their workspace come up online in the canvas — the bug surfaced live when status stuck at "awaiting_agent / OFFLINE" despite an active MCP server. Platform side (workspace-server/internal/handlers/registry.go): Heartbeat handler already auto-recovers offline → online and provisioning → online, but NOT awaiting_agent → online. Healthsweep flips stale-heartbeat external workspaces TO awaiting_agent, and with no recovery path the workspace stays "OFFLINE — Restart" in the canvas forever. Add the symmetric branch: if currentStatus == "awaiting_agent" and a heartbeat arrives, flip to online + broadcast WORKSPACE_ONLINE. Mirrors the existing offline/provisioning patterns exactly. Test: TestHeartbeatHandler_AwaitingAgentToOnline asserts the SQL UPDATE fires with the awaiting_agent guard clause. Wheel side (workspace/mcp_cli.py): molecule-mcp was outbound-only — operators had to run a separate SDK process to register + heartbeat. Now mcp_cli.main(): 1. Calls /registry/register at startup (idempotent upsert flips status awaiting_agent → online via the existing register path). 2. Spawns a daemon thread that POSTs /registry/heartbeat every 20s. 20s is comfortably under the healthsweep stale window so a single missed beat doesn't cause status churn. 3. Runs the MCP stdio loop in the foreground. Both calls set Origin: ${PLATFORM_URL} so the SaaS edge WAF accepts them. Threaded heartbeat (not asyncio) chosen because it doesn't need to share an event loop with the MCP stdio server — daemon=True cleanly dies when the operator's runtime exits. MOLECULE_MCP_DISABLE_HEARTBEAT=1 escape hatch lets in-container callers (which have heartbeat.py running already) reuse the entry point without double-heartbeating. Default is enabled. End-to-end verification (live, against hongmingwang.moleculesai.app, workspace 8dad3e29-...): pre-fix: status=awaiting_agent → canvas shows OFFLINE forever post-fix: ran `molecule-mcp` for 5s standalone → canvas state: status=online runtime=external agent=molecule-mcp-8dad3e29 Test coverage: 7 new mcp_cli tests (register-at-startup, heartbeat- thread-spawned, disable-env-skips-both, env-and-file token resolution, register payload shape, heartbeat endpoint + headers); 1 new platform test (awaiting_agent → online recovery). Full workspace + handlers suites green: 1355 Python, full Go handlers passing. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../internal/handlers/registry.go | 28 ++ .../internal/handlers/registry_test.go | 52 ++++ workspace/mcp_cli.py | 217 ++++++++++++++- workspace/tests/test_mcp_cli.py | 262 +++++++++++++++++- 4 files changed, 555 insertions(+), 4 deletions(-) diff --git a/workspace-server/internal/handlers/registry.go b/workspace-server/internal/handlers/registry.go index 585b124b..4421d1a5 100644 --- a/workspace-server/internal/handlers/registry.go +++ b/workspace-server/internal/handlers/registry.go @@ -720,6 +720,34 @@ func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.Heartbea }) } + // Auto-recovery from awaiting_agent: external workspaces are flipped + // to 'awaiting_agent' by registry/healthsweep when their heartbeat + // goes stale (>staleAfter). When the operator's poller comes back — + // for example when their laptop wakes from sleep — the heartbeat + // resumes but does NOT re-register. Without this branch the + // workspace would stay 'awaiting_agent' forever (visible as OFFLINE + // in the canvas with a "Restart" CTA) even though the agent is + // actively heartbeating. + // + // Discovered while smoke-testing the universal MCP path against a + // freshly-registered external workspace: register set status=online + // + sent one heartbeat → healthsweep then flipped back to + // awaiting_agent because the smoke didn't loop. The molecule-mcp + // console script's built-in heartbeat thread (PR #2413) drives + // continuous heartbeats now, but without THIS branch those + // heartbeats can't lift the workspace out of awaiting_agent on + // their own. + if currentStatus == "awaiting_agent" { + if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2 AND status = 'awaiting_agent'`, models.StatusOnline, payload.WorkspaceID); err != nil { + log.Printf("Heartbeat: failed to recover %s from awaiting_agent: %v", payload.WorkspaceID, err) + } else { + log.Printf("Heartbeat: transitioned %s from awaiting_agent to online (heartbeat received)", payload.WorkspaceID) + } + h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_ONLINE", payload.WorkspaceID, map[string]interface{}{ + "recovered_from": currentStatus, + }) + } + // #1870 Phase 1: drain one queued A2A request if the target reports // spare capacity. The heartbeat's active_tasks field reflects what the // workspace runtime is ACTUALLY running right now, independent of diff --git a/workspace-server/internal/handlers/registry_test.go b/workspace-server/internal/handlers/registry_test.go index ee549637..491d5144 100644 --- a/workspace-server/internal/handlers/registry_test.go +++ b/workspace-server/internal/handlers/registry_test.go @@ -193,6 +193,58 @@ func TestHeartbeatHandler_ProvisioningToOnline(t *testing.T) { } } +// ==================== Heartbeat — awaiting_agent → online recovery ==================== +// External workspaces flip to 'awaiting_agent' via healthsweep when their +// heartbeat goes stale. When the operator's poller comes back, heartbeat +// must lift the workspace out of awaiting_agent the same way it does for +// 'offline' and 'provisioning'. Without this branch, an external workspace +// stays OFFLINE in the canvas forever despite active heartbeats. + +func TestHeartbeatHandler_AwaitingAgentToOnline(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewRegistryHandler(broadcaster) + + mock.ExpectQuery("SELECT COALESCE\\(current_task"). + WithArgs("ws-external"). + WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + + mock.ExpectExec("UPDATE workspaces SET"). + WithArgs("ws-external", 0.0, "", 0, 60, ""). + WillReturnResult(sqlmock.NewResult(0, 1)) + + mock.ExpectQuery("SELECT status FROM workspaces WHERE id ="). + WithArgs("ws-external"). + WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("awaiting_agent")) + + // The new branch — UPDATE ... WHERE status = 'awaiting_agent' + mock.ExpectExec("UPDATE workspaces SET status ="). + WithArgs(models.StatusOnline, "ws-external"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // Broadcast WORKSPACE_ONLINE + mock.ExpectExec("INSERT INTO structure_events"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + + body := `{"workspace_id":"ws-external","error_rate":0.0,"sample_error":"","active_tasks":0,"uptime_seconds":60}` + c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Heartbeat(c) + + if w.Code != http.StatusOK { + t.Errorf("expected status 200, got %d: %s", w.Code, w.Body.String()) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + func TestHeartbeatHandler_BadJSON(t *testing.T) { setupTestDB(t) setupTestRedis(t) diff --git a/workspace/mcp_cli.py b/workspace/mcp_cli.py index 691a9a19..6cd0b83e 100644 --- a/workspace/mcp_cli.py +++ b/workspace/mcp_cli.py @@ -9,16 +9,177 @@ to stderr so an operator running ``molecule-mcp`` for the first time gets the right pointer in the first 3 lines of output instead of a 20-line traceback. -Existing in-container usage (``python -m molecule_runtime.a2a_mcp_server`` -or direct import) is unaffected — those paths bypass this wrapper. Only -the external-runtime ``molecule-mcp`` console script routes through here. +Standalone-runtime contract: this wrapper is responsible for keeping +the workspace ALIVE on the platform side, not just exposing tools. +Concretely it: + 1. Calls ``POST /registry/register`` once at startup (idempotent — + the upsert flips status awaiting_agent → online for an external + workspace whose token matches). + 2. Spawns a daemon heartbeat thread that POSTs to + ``POST /registry/heartbeat`` every 20s. Without continuous + heartbeats the platform's healthsweep flips the workspace back + to awaiting_agent (visible as OFFLINE in the canvas with a + "Restart" CTA) within 60-90s. + 3. Runs the MCP stdio loop in the foreground. + +Why threads + sync requests: the MCP stdio server is async. The +heartbeat work is fire-and-forget HTTP. A daemon thread is the +lowest-friction integration — no asyncio bridging, dies automatically +when the main process exits, and ``requests`` is already a transitive +dependency via ``a2a-sdk``. + +In-container usage (``python -m molecule_runtime.a2a_mcp_server`` or +direct import) bypasses this wrapper — the workspace runtime has its +own heartbeat loop in ``heartbeat.py`` so we don't double-heartbeat. """ from __future__ import annotations +import logging import os import sys +import threading +import time from pathlib import Path +logger = logging.getLogger(__name__) + +# Heartbeat cadence. Must be tighter than healthsweep's stale window +# (currently 60-90s — see registry/healthsweep.go) by a comfortable +# margin so a single missed heartbeat doesn't flip awaiting_agent. +# 20s gives the operator's network 3 attempts within the budget; long +# enough that it doesn't spam, short enough to recover quickly after +# laptop sleep. +HEARTBEAT_INTERVAL_SECONDS = 20.0 + + +def _platform_register(platform_url: str, workspace_id: str, token: str) -> None: + """Best-effort one-shot register at startup. + + Lifts the workspace from ``awaiting_agent`` to ``online`` for + operators who never ran the curl-register snippet. Safe to call + repeatedly: the platform's register handler is an upsert that just + refreshes ``url``, ``agent_card``, and ``status``. Skips silently + on transport/HTTP errors so a misconfigured PLATFORM_URL doesn't + abort the MCP loop — the heartbeat thread will keep retrying and + surface the persistent failure that way. + + Origin header is required by the SaaS edge WAF; without it + /registry/register currently still works (it's on the WAF + allowlist), but the heartbeat path needs Origin and we want one + consistent header set across both calls. + """ + try: + import httpx + except ImportError: + # httpx is a transitive dep via a2a-sdk; if missing, the MCP + # server won't import either. Let the caller's later import + # surface the real error. + return + + payload = { + "id": workspace_id, + "url": "", + "agent_card": {"name": f"molecule-mcp-{workspace_id[:8]}", "skills": []}, + "delivery_mode": "poll", + } + headers = { + "Authorization": f"Bearer {token}", + "Origin": platform_url, + "Content-Type": "application/json", + } + try: + with httpx.Client(timeout=10.0) as client: + resp = client.post( + f"{platform_url}/registry/register", + json=payload, + headers=headers, + ) + if resp.status_code >= 400: + logger.warning( + "molecule-mcp: register POST returned HTTP %d: %s", + resp.status_code, + (resp.text or "")[:200], + ) + else: + logger.info( + "molecule-mcp: registered workspace %s with platform", + workspace_id, + ) + except Exception as exc: # noqa: BLE001 + logger.warning("molecule-mcp: register POST failed: %s", exc) + + +def _heartbeat_loop( + platform_url: str, + workspace_id: str, + token: str, + interval: float = HEARTBEAT_INTERVAL_SECONDS, +) -> None: + """Daemon thread body: POST /registry/heartbeat every ``interval``s. + + Failures are logged at WARNING and the loop continues. The thread + exits when the main process does (daemon=True). Each iteration + rebuilds the payload + headers — cheap and ensures token rotation + via env var (rare but possible) is picked up on the next tick. + """ + try: + import httpx + except ImportError: + return + + start_time = time.time() + while True: + body = { + "workspace_id": workspace_id, + "error_rate": 0.0, + "sample_error": "", + "active_tasks": 0, + "uptime_seconds": int(time.time() - start_time), + } + headers = { + "Authorization": f"Bearer {token}", + "Origin": platform_url, + "Content-Type": "application/json", + } + try: + with httpx.Client(timeout=10.0) as client: + resp = client.post( + f"{platform_url}/registry/heartbeat", + json=body, + headers=headers, + ) + if resp.status_code >= 400: + logger.warning( + "molecule-mcp: heartbeat HTTP %d: %s", + resp.status_code, + (resp.text or "")[:200], + ) + except Exception as exc: # noqa: BLE001 + logger.warning("molecule-mcp: heartbeat failed: %s", exc) + time.sleep(interval) + + +def _start_heartbeat_thread( + platform_url: str, + workspace_id: str, + token: str, +) -> threading.Thread: + """Start the heartbeat daemon thread. Returns the Thread handle. + + The MCP stdio loop runs in the foreground (asyncio); this thread + runs alongside it. ``daemon=True`` so when the operator hits + Ctrl-C / closes the runtime, the heartbeat dies with it instead + of leaking and writing to a stale workspace. + """ + t = threading.Thread( + target=_heartbeat_loop, + args=(platform_url, workspace_id, token), + name="molecule-mcp-heartbeat", + daemon=True, + ) + t.start() + return t + def _print_missing_env_help(missing: list[str], have_token_file: bool) -> None: print("molecule-mcp: missing required environment.\n", file=sys.stderr) @@ -63,6 +224,38 @@ def main() -> None: _print_missing_env_help(missing, have_token_file=has_token_file) sys.exit(2) + # Resolve the effective token: env wins (operator override), then + # the on-disk file (in-container default). Mirrors + # platform_auth.get_token's resolution order so we don't + # double-implement. + token = ( + os.environ.get("MOLECULE_WORKSPACE_TOKEN", "").strip() + or _read_token_file() + ) + workspace_id = os.environ["WORKSPACE_ID"].strip() + platform_url = os.environ["PLATFORM_URL"].strip().rstrip("/") + + # Configure logging so the operator sees register/heartbeat status + # without needing to set up logging themselves. WARNING by default + # keeps the steady-state quiet (only failures); MOLECULE_MCP_VERBOSE=1 + # surfaces register-success + per-tick heartbeat info for debugging. + log_level = ( + logging.INFO + if os.environ.get("MOLECULE_MCP_VERBOSE", "").strip() + else logging.WARNING + ) + logging.basicConfig(level=log_level, format="[molecule-mcp] %(message)s") + + # Standalone-mode register + heartbeat. Skipped via env var so an + # in-container caller (which has its own heartbeat loop) can reuse + # this entry point without double-heartbeating. The wheel's main + # console-script path always runs them; the + # MOLECULE_MCP_DISABLE_HEARTBEAT escape hatch exists for tests + + # the rare embedded use-case. + if not os.environ.get("MOLECULE_MCP_DISABLE_HEARTBEAT", "").strip(): + _platform_register(platform_url, workspace_id, token) + _start_heartbeat_thread(platform_url, workspace_id, token) + # Env is valid — safe to import the heavy module now. Importing # earlier would trigger a2a_client.py:22's module-level RuntimeError # before our friendly help reaches the user. @@ -70,5 +263,23 @@ def main() -> None: cli_main() +def _read_token_file() -> str: + """Read the token from ${CONFIGS_DIR}/.auth_token if present. + + Mirrors platform_auth._token_file but without importing the heavy + module here (that import triggers a2a_client's WORKSPACE_ID guard + which is fine after env validation, but cheaper to inline a 4-line + file read than pull in the whole stack just for the path). + """ + configs_dir = Path(os.environ.get("CONFIGS_DIR", "/configs")) + path = configs_dir / ".auth_token" + if not path.is_file(): + return "" + try: + return path.read_text().strip() + except OSError: + return "" + + if __name__ == "__main__": # pragma: no cover main() diff --git a/workspace/tests/test_mcp_cli.py b/workspace/tests/test_mcp_cli.py index 5e466838..c4732b90 100644 --- a/workspace/tests/test_mcp_cli.py +++ b/workspace/tests/test_mcp_cli.py @@ -18,10 +18,14 @@ import mcp_cli @pytest.fixture(autouse=True) def _isolate(monkeypatch, tmp_path): """Each test starts with no Molecule env vars set + a fresh - CONFIGS_DIR pointing at an empty tmpdir.""" + CONFIGS_DIR pointing at an empty tmpdir. The heartbeat thread is + disabled by default so happy-path tests don't spawn a background + POST loop against a fake URL — individual tests opt back in via + monkeypatch.delenv when they want to assert heartbeat behavior.""" for var in ("WORKSPACE_ID", "PLATFORM_URL", "MOLECULE_WORKSPACE_TOKEN"): monkeypatch.delenv(var, raising=False) monkeypatch.setenv("CONFIGS_DIR", str(tmp_path)) + monkeypatch.setenv("MOLECULE_MCP_DISABLE_HEARTBEAT", "1") yield @@ -139,3 +143,259 @@ def test_help_lists_canvas_tokens_tab_pointer(capsys): code, err = _run_main_capturing_exit(capsys) assert code == 2 assert "Tokens tab" in err or "canvas" in err.lower() + + +# ==================== Standalone register + heartbeat ==================== +# molecule-mcp must be a single-process standalone runtime: it registers +# the workspace at startup AND continuously heartbeats so the platform +# healthsweep doesn't flip status back to awaiting_agent. Without these, +# the operator sees "OFFLINE — Restart" in the canvas within ~60s of +# launching the agent, which was the bug that motivated this PR. + + +def test_register_called_at_startup(monkeypatch): + """When env is valid and heartbeat enabled, register fires once + before the MCP loop starts.""" + monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000") + monkeypatch.setenv("PLATFORM_URL", "https://test.moleculesai.app") + monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "tok") + monkeypatch.delenv("MOLECULE_MCP_DISABLE_HEARTBEAT", raising=False) + + register_calls: list[tuple[str, str, str]] = [] + + def fake_register(platform_url, workspace_id, token): + register_calls.append((platform_url, workspace_id, token)) + + def fake_start_thread(*_args, **_kwargs): + # Return a dummy thread-shaped object so the caller's reference + # is harmless. Real thread spawning is asserted separately. + class _Stub: + def join(self): pass + return _Stub() + + monkeypatch.setattr(mcp_cli, "_platform_register", fake_register) + monkeypatch.setattr(mcp_cli, "_start_heartbeat_thread", fake_start_thread) + + spy_called: dict[str, bool] = {"called": False} + + def fake_cli_main(): + spy_called["called"] = True + + import types + fake_module = types.ModuleType("a2a_mcp_server") + fake_module.cli_main = fake_cli_main + monkeypatch.setitem(sys.modules, "a2a_mcp_server", fake_module) + + mcp_cli.main() + + assert register_calls == [ + ("https://test.moleculesai.app", "00000000-0000-0000-0000-000000000000", "tok"), + ] + assert spy_called["called"], "MCP loop must run AFTER register" + + +def test_heartbeat_thread_started(monkeypatch): + """The heartbeat daemon thread must start before the MCP loop runs.""" + monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000") + monkeypatch.setenv("PLATFORM_URL", "https://test.moleculesai.app") + monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "tok") + monkeypatch.delenv("MOLECULE_MCP_DISABLE_HEARTBEAT", raising=False) + + monkeypatch.setattr(mcp_cli, "_platform_register", lambda *a, **k: None) + + thread_started: dict[str, bool] = {"started": False} + + def fake_start_thread(platform_url, workspace_id, token): + thread_started["started"] = True + thread_started["args"] = (platform_url, workspace_id, token) + class _Stub: + def join(self): pass + return _Stub() + + monkeypatch.setattr(mcp_cli, "_start_heartbeat_thread", fake_start_thread) + + import types + fake_module = types.ModuleType("a2a_mcp_server") + fake_module.cli_main = lambda: None + monkeypatch.setitem(sys.modules, "a2a_mcp_server", fake_module) + + mcp_cli.main() + + assert thread_started["started"], "heartbeat thread must be spawned" + assert thread_started["args"][1] == "00000000-0000-0000-0000-000000000000" + assert thread_started["args"][2] == "tok" + + +def test_heartbeat_disable_env_skips_both(monkeypatch): + """MOLECULE_MCP_DISABLE_HEARTBEAT=1 (the test fixture default + the + in-container escape hatch) must skip BOTH register and heartbeat, + so the in-container heartbeat loop in heartbeat.py doesn't compete + with this thread.""" + monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000") + monkeypatch.setenv("PLATFORM_URL", "https://test.moleculesai.app") + monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "tok") + # MOLECULE_MCP_DISABLE_HEARTBEAT=1 is set by the autouse fixture. + + register_called: dict[str, bool] = {"called": False} + thread_started: dict[str, bool] = {"started": False} + + monkeypatch.setattr( + mcp_cli, "_platform_register", + lambda *a, **k: register_called.update(called=True), + ) + monkeypatch.setattr( + mcp_cli, "_start_heartbeat_thread", + lambda *a, **k: thread_started.update(started=True), + ) + + import types + fake_module = types.ModuleType("a2a_mcp_server") + fake_module.cli_main = lambda: None + monkeypatch.setitem(sys.modules, "a2a_mcp_server", fake_module) + + mcp_cli.main() + + assert register_called["called"] is False, "disable env must skip register" + assert thread_started["started"] is False, "disable env must skip heartbeat thread" + + +def test_token_resolved_from_env_when_no_file(monkeypatch): + """Operator without a /configs volume — token comes from env var.""" + monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000") + monkeypatch.setenv("PLATFORM_URL", "https://test.moleculesai.app") + monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "env-token") + monkeypatch.delenv("MOLECULE_MCP_DISABLE_HEARTBEAT", raising=False) + + captured_token: dict[str, str] = {} + + def fake_register(platform_url, workspace_id, token): + captured_token["t"] = token + + monkeypatch.setattr(mcp_cli, "_platform_register", fake_register) + monkeypatch.setattr(mcp_cli, "_start_heartbeat_thread", lambda *a, **k: None) + + import types + fake_module = types.ModuleType("a2a_mcp_server") + fake_module.cli_main = lambda: None + monkeypatch.setitem(sys.modules, "a2a_mcp_server", fake_module) + + mcp_cli.main() + + assert captured_token["t"] == "env-token" + + +def test_token_resolved_from_file_when_no_env(monkeypatch, tmp_path): + """In-container parity: token comes from /configs/.auth_token when + env is unset. Mirrors platform_auth.get_token resolution order.""" + (tmp_path / ".auth_token").write_text("file-token") + monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000") + monkeypatch.setenv("PLATFORM_URL", "https://test.moleculesai.app") + monkeypatch.delenv("MOLECULE_WORKSPACE_TOKEN", raising=False) + monkeypatch.delenv("MOLECULE_MCP_DISABLE_HEARTBEAT", raising=False) + + captured_token: dict[str, str] = {} + + def fake_register(platform_url, workspace_id, token): + captured_token["t"] = token + + monkeypatch.setattr(mcp_cli, "_platform_register", fake_register) + monkeypatch.setattr(mcp_cli, "_start_heartbeat_thread", lambda *a, **k: None) + + import types + fake_module = types.ModuleType("a2a_mcp_server") + fake_module.cli_main = lambda: None + monkeypatch.setitem(sys.modules, "a2a_mcp_server", fake_module) + + mcp_cli.main() + + assert captured_token["t"] == "file-token" + + +def test_register_payload_shape(monkeypatch): + """The register POST body must use the field names the workspace- + server expects (id/url/agent_card/delivery_mode), and must include + the Origin header for the SaaS edge WAF.""" + captured: dict[str, object] = {} + + class FakeResp: + status_code = 200 + text = "" + + class FakeClient: + def __init__(self, **_kwargs): pass + def __enter__(self): return self + def __exit__(self, *_a): return False + def post(self, url, json=None, headers=None): + captured["url"] = url + captured["json"] = json + captured["headers"] = headers + return FakeResp() + + import types + fake_httpx = types.ModuleType("httpx") + fake_httpx.Client = FakeClient + monkeypatch.setitem(sys.modules, "httpx", fake_httpx) + + mcp_cli._platform_register( + "https://test.moleculesai.app", + "ws-abc", + "tok", + ) + + assert captured["url"] == "https://test.moleculesai.app/registry/register" + body = captured["json"] + assert body["id"] == "ws-abc" + assert body["delivery_mode"] == "poll" + assert body["url"] == "" + assert "agent_card" in body + headers = captured["headers"] + assert headers["Authorization"] == "Bearer tok" + assert headers["Origin"] == "https://test.moleculesai.app" + + +def test_heartbeat_loop_posts_to_correct_endpoint(monkeypatch): + """Heartbeat thread must POST to /registry/heartbeat with the + workspace_id + Origin/Authorization headers.""" + captured: dict[str, object] = {} + + class FakeResp: + status_code = 200 + text = "" + + class FakeClient: + def __init__(self, **_kwargs): pass + def __enter__(self): return self + def __exit__(self, *_a): return False + def post(self, url, json=None, headers=None): + captured["url"] = url + captured["json"] = json + captured["headers"] = headers + return FakeResp() + + import types + fake_httpx = types.ModuleType("httpx") + fake_httpx.Client = FakeClient + monkeypatch.setitem(sys.modules, "httpx", fake_httpx) + + # Patch sleep so the loop exits after one tick (raise to break out). + sleep_calls: list[float] = [] + + def fake_sleep(seconds): + sleep_calls.append(seconds) + raise SystemExit # break out of the infinite loop + + monkeypatch.setattr("time.sleep", fake_sleep) + + with pytest.raises(SystemExit): + mcp_cli._heartbeat_loop( + "https://test.moleculesai.app", + "ws-abc", + "tok", + interval=20.0, + ) + + assert captured["url"] == "https://test.moleculesai.app/registry/heartbeat" + assert captured["json"]["workspace_id"] == "ws-abc" + assert captured["headers"]["Authorization"] == "Bearer tok" + assert captured["headers"]["Origin"] == "https://test.moleculesai.app" + assert sleep_calls == [20.0], "heartbeat must sleep the configured interval"