feat: make molecule-mcp standalone (built-in register + heartbeat) + recover awaiting_agent on heartbeat
Two paired fixes that together let an external operator run a single
process (molecule-mcp) and see their workspace come up online in the
canvas — the bug surfaced live when status stuck at "awaiting_agent /
OFFLINE" despite an active MCP server.
Platform side (workspace-server/internal/handlers/registry.go):
Heartbeat handler already auto-recovers offline → online and
provisioning → online, but NOT awaiting_agent → online. Healthsweep
flips stale-heartbeat external workspaces TO awaiting_agent, and
with no recovery path the workspace stays "OFFLINE — Restart" in the
canvas forever. Add the symmetric branch: if currentStatus ==
"awaiting_agent" and a heartbeat arrives, flip to online + broadcast
WORKSPACE_ONLINE. Mirrors the existing offline/provisioning patterns
exactly. Test: TestHeartbeatHandler_AwaitingAgentToOnline asserts
the SQL UPDATE fires with the awaiting_agent guard clause.
Wheel side (workspace/mcp_cli.py):
molecule-mcp was outbound-only — operators had to run a separate
SDK process to register + heartbeat. Now mcp_cli.main():
1. Calls /registry/register at startup (idempotent upsert flips
status awaiting_agent → online via the existing register path).
2. Spawns a daemon thread that POSTs /registry/heartbeat every
20s. 20s is comfortably under the healthsweep stale window so
a single missed beat doesn't cause status churn.
3. Runs the MCP stdio loop in the foreground.
Both calls set Origin: ${PLATFORM_URL} so the SaaS edge WAF accepts
them. Threaded heartbeat (not asyncio) chosen because it doesn't
need to share an event loop with the MCP stdio server — daemon=True
cleanly dies when the operator's runtime exits.
MOLECULE_MCP_DISABLE_HEARTBEAT=1 escape hatch lets in-container
callers (which have heartbeat.py running already) reuse the entry
point without double-heartbeating. Default is enabled.
End-to-end verification (live, against
hongmingwang.moleculesai.app, workspace 8dad3e29-...):
pre-fix: status=awaiting_agent → canvas shows OFFLINE forever
post-fix: ran `molecule-mcp` for 5s standalone → canvas state:
status=online runtime=external agent=molecule-mcp-8dad3e29
Test coverage: 7 new mcp_cli tests (register-at-startup, heartbeat-
thread-spawned, disable-env-skips-both, env-and-file token resolution,
register payload shape, heartbeat endpoint + headers); 1 new platform
test (awaiting_agent → online recovery). Full workspace + handlers
suites green: 1355 Python, full Go handlers passing.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
716589742c
commit
427300f3a4
@ -720,6 +720,34 @@ func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.Heartbea
|
||||
})
|
||||
}
|
||||
|
||||
// Auto-recovery from awaiting_agent: external workspaces are flipped
|
||||
// to 'awaiting_agent' by registry/healthsweep when their heartbeat
|
||||
// goes stale (>staleAfter). When the operator's poller comes back —
|
||||
// for example when their laptop wakes from sleep — the heartbeat
|
||||
// resumes but does NOT re-register. Without this branch the
|
||||
// workspace would stay 'awaiting_agent' forever (visible as OFFLINE
|
||||
// in the canvas with a "Restart" CTA) even though the agent is
|
||||
// actively heartbeating.
|
||||
//
|
||||
// Discovered while smoke-testing the universal MCP path against a
|
||||
// freshly-registered external workspace: register set status=online
|
||||
// + sent one heartbeat → healthsweep then flipped back to
|
||||
// awaiting_agent because the smoke didn't loop. The molecule-mcp
|
||||
// console script's built-in heartbeat thread (PR #2413) drives
|
||||
// continuous heartbeats now, but without THIS branch those
|
||||
// heartbeats can't lift the workspace out of awaiting_agent on
|
||||
// their own.
|
||||
if currentStatus == "awaiting_agent" {
|
||||
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2 AND status = 'awaiting_agent'`, models.StatusOnline, payload.WorkspaceID); err != nil {
|
||||
log.Printf("Heartbeat: failed to recover %s from awaiting_agent: %v", payload.WorkspaceID, err)
|
||||
} else {
|
||||
log.Printf("Heartbeat: transitioned %s from awaiting_agent to online (heartbeat received)", payload.WorkspaceID)
|
||||
}
|
||||
h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_ONLINE", payload.WorkspaceID, map[string]interface{}{
|
||||
"recovered_from": currentStatus,
|
||||
})
|
||||
}
|
||||
|
||||
// #1870 Phase 1: drain one queued A2A request if the target reports
|
||||
// spare capacity. The heartbeat's active_tasks field reflects what the
|
||||
// workspace runtime is ACTUALLY running right now, independent of
|
||||
|
||||
@ -193,6 +193,58 @@ func TestHeartbeatHandler_ProvisioningToOnline(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== Heartbeat — awaiting_agent → online recovery ====================
|
||||
// External workspaces flip to 'awaiting_agent' via healthsweep when their
|
||||
// heartbeat goes stale. When the operator's poller comes back, heartbeat
|
||||
// must lift the workspace out of awaiting_agent the same way it does for
|
||||
// 'offline' and 'provisioning'. Without this branch, an external workspace
|
||||
// stays OFFLINE in the canvas forever despite active heartbeats.
|
||||
|
||||
func TestHeartbeatHandler_AwaitingAgentToOnline(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewRegistryHandler(broadcaster)
|
||||
|
||||
mock.ExpectQuery("SELECT COALESCE\\(current_task").
|
||||
WithArgs("ws-external").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
|
||||
|
||||
mock.ExpectExec("UPDATE workspaces SET").
|
||||
WithArgs("ws-external", 0.0, "", 0, 60, "").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
|
||||
WithArgs("ws-external").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("awaiting_agent"))
|
||||
|
||||
// The new branch — UPDATE ... WHERE status = 'awaiting_agent'
|
||||
mock.ExpectExec("UPDATE workspaces SET status =").
|
||||
WithArgs(models.StatusOnline, "ws-external").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// Broadcast WORKSPACE_ONLINE
|
||||
mock.ExpectExec("INSERT INTO structure_events").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
|
||||
body := `{"workspace_id":"ws-external","error_rate":0.0,"sample_error":"","active_tasks":0,"uptime_seconds":60}`
|
||||
c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Heartbeat(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected status 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHeartbeatHandler_BadJSON(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
|
||||
@ -9,16 +9,177 @@ to stderr so an operator running ``molecule-mcp`` for the first time
|
||||
gets the right pointer in the first 3 lines of output instead of a
|
||||
20-line traceback.
|
||||
|
||||
Existing in-container usage (``python -m molecule_runtime.a2a_mcp_server``
|
||||
or direct import) is unaffected — those paths bypass this wrapper. Only
|
||||
the external-runtime ``molecule-mcp`` console script routes through here.
|
||||
Standalone-runtime contract: this wrapper is responsible for keeping
|
||||
the workspace ALIVE on the platform side, not just exposing tools.
|
||||
Concretely it:
|
||||
1. Calls ``POST /registry/register`` once at startup (idempotent —
|
||||
the upsert flips status awaiting_agent → online for an external
|
||||
workspace whose token matches).
|
||||
2. Spawns a daemon heartbeat thread that POSTs to
|
||||
``POST /registry/heartbeat`` every 20s. Without continuous
|
||||
heartbeats the platform's healthsweep flips the workspace back
|
||||
to awaiting_agent (visible as OFFLINE in the canvas with a
|
||||
"Restart" CTA) within 60-90s.
|
||||
3. Runs the MCP stdio loop in the foreground.
|
||||
|
||||
Why threads + sync requests: the MCP stdio server is async. The
|
||||
heartbeat work is fire-and-forget HTTP. A daemon thread is the
|
||||
lowest-friction integration — no asyncio bridging, dies automatically
|
||||
when the main process exits, and ``requests`` is already a transitive
|
||||
dependency via ``a2a-sdk``.
|
||||
|
||||
In-container usage (``python -m molecule_runtime.a2a_mcp_server`` or
|
||||
direct import) bypasses this wrapper — the workspace runtime has its
|
||||
own heartbeat loop in ``heartbeat.py`` so we don't double-heartbeat.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Heartbeat cadence. Must be tighter than healthsweep's stale window
|
||||
# (currently 60-90s — see registry/healthsweep.go) by a comfortable
|
||||
# margin so a single missed heartbeat doesn't flip awaiting_agent.
|
||||
# 20s gives the operator's network 3 attempts within the budget; long
|
||||
# enough that it doesn't spam, short enough to recover quickly after
|
||||
# laptop sleep.
|
||||
HEARTBEAT_INTERVAL_SECONDS = 20.0
|
||||
|
||||
|
||||
def _platform_register(platform_url: str, workspace_id: str, token: str) -> None:
|
||||
"""Best-effort one-shot register at startup.
|
||||
|
||||
Lifts the workspace from ``awaiting_agent`` to ``online`` for
|
||||
operators who never ran the curl-register snippet. Safe to call
|
||||
repeatedly: the platform's register handler is an upsert that just
|
||||
refreshes ``url``, ``agent_card``, and ``status``. Skips silently
|
||||
on transport/HTTP errors so a misconfigured PLATFORM_URL doesn't
|
||||
abort the MCP loop — the heartbeat thread will keep retrying and
|
||||
surface the persistent failure that way.
|
||||
|
||||
Origin header is required by the SaaS edge WAF; without it
|
||||
/registry/register currently still works (it's on the WAF
|
||||
allowlist), but the heartbeat path needs Origin and we want one
|
||||
consistent header set across both calls.
|
||||
"""
|
||||
try:
|
||||
import httpx
|
||||
except ImportError:
|
||||
# httpx is a transitive dep via a2a-sdk; if missing, the MCP
|
||||
# server won't import either. Let the caller's later import
|
||||
# surface the real error.
|
||||
return
|
||||
|
||||
payload = {
|
||||
"id": workspace_id,
|
||||
"url": "",
|
||||
"agent_card": {"name": f"molecule-mcp-{workspace_id[:8]}", "skills": []},
|
||||
"delivery_mode": "poll",
|
||||
}
|
||||
headers = {
|
||||
"Authorization": f"Bearer {token}",
|
||||
"Origin": platform_url,
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
try:
|
||||
with httpx.Client(timeout=10.0) as client:
|
||||
resp = client.post(
|
||||
f"{platform_url}/registry/register",
|
||||
json=payload,
|
||||
headers=headers,
|
||||
)
|
||||
if resp.status_code >= 400:
|
||||
logger.warning(
|
||||
"molecule-mcp: register POST returned HTTP %d: %s",
|
||||
resp.status_code,
|
||||
(resp.text or "")[:200],
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
"molecule-mcp: registered workspace %s with platform",
|
||||
workspace_id,
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.warning("molecule-mcp: register POST failed: %s", exc)
|
||||
|
||||
|
||||
def _heartbeat_loop(
|
||||
platform_url: str,
|
||||
workspace_id: str,
|
||||
token: str,
|
||||
interval: float = HEARTBEAT_INTERVAL_SECONDS,
|
||||
) -> None:
|
||||
"""Daemon thread body: POST /registry/heartbeat every ``interval``s.
|
||||
|
||||
Failures are logged at WARNING and the loop continues. The thread
|
||||
exits when the main process does (daemon=True). Each iteration
|
||||
rebuilds the payload + headers — cheap and ensures token rotation
|
||||
via env var (rare but possible) is picked up on the next tick.
|
||||
"""
|
||||
try:
|
||||
import httpx
|
||||
except ImportError:
|
||||
return
|
||||
|
||||
start_time = time.time()
|
||||
while True:
|
||||
body = {
|
||||
"workspace_id": workspace_id,
|
||||
"error_rate": 0.0,
|
||||
"sample_error": "",
|
||||
"active_tasks": 0,
|
||||
"uptime_seconds": int(time.time() - start_time),
|
||||
}
|
||||
headers = {
|
||||
"Authorization": f"Bearer {token}",
|
||||
"Origin": platform_url,
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
try:
|
||||
with httpx.Client(timeout=10.0) as client:
|
||||
resp = client.post(
|
||||
f"{platform_url}/registry/heartbeat",
|
||||
json=body,
|
||||
headers=headers,
|
||||
)
|
||||
if resp.status_code >= 400:
|
||||
logger.warning(
|
||||
"molecule-mcp: heartbeat HTTP %d: %s",
|
||||
resp.status_code,
|
||||
(resp.text or "")[:200],
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.warning("molecule-mcp: heartbeat failed: %s", exc)
|
||||
time.sleep(interval)
|
||||
|
||||
|
||||
def _start_heartbeat_thread(
|
||||
platform_url: str,
|
||||
workspace_id: str,
|
||||
token: str,
|
||||
) -> threading.Thread:
|
||||
"""Start the heartbeat daemon thread. Returns the Thread handle.
|
||||
|
||||
The MCP stdio loop runs in the foreground (asyncio); this thread
|
||||
runs alongside it. ``daemon=True`` so when the operator hits
|
||||
Ctrl-C / closes the runtime, the heartbeat dies with it instead
|
||||
of leaking and writing to a stale workspace.
|
||||
"""
|
||||
t = threading.Thread(
|
||||
target=_heartbeat_loop,
|
||||
args=(platform_url, workspace_id, token),
|
||||
name="molecule-mcp-heartbeat",
|
||||
daemon=True,
|
||||
)
|
||||
t.start()
|
||||
return t
|
||||
|
||||
|
||||
def _print_missing_env_help(missing: list[str], have_token_file: bool) -> None:
|
||||
print("molecule-mcp: missing required environment.\n", file=sys.stderr)
|
||||
@ -63,6 +224,38 @@ def main() -> None:
|
||||
_print_missing_env_help(missing, have_token_file=has_token_file)
|
||||
sys.exit(2)
|
||||
|
||||
# Resolve the effective token: env wins (operator override), then
|
||||
# the on-disk file (in-container default). Mirrors
|
||||
# platform_auth.get_token's resolution order so we don't
|
||||
# double-implement.
|
||||
token = (
|
||||
os.environ.get("MOLECULE_WORKSPACE_TOKEN", "").strip()
|
||||
or _read_token_file()
|
||||
)
|
||||
workspace_id = os.environ["WORKSPACE_ID"].strip()
|
||||
platform_url = os.environ["PLATFORM_URL"].strip().rstrip("/")
|
||||
|
||||
# Configure logging so the operator sees register/heartbeat status
|
||||
# without needing to set up logging themselves. WARNING by default
|
||||
# keeps the steady-state quiet (only failures); MOLECULE_MCP_VERBOSE=1
|
||||
# surfaces register-success + per-tick heartbeat info for debugging.
|
||||
log_level = (
|
||||
logging.INFO
|
||||
if os.environ.get("MOLECULE_MCP_VERBOSE", "").strip()
|
||||
else logging.WARNING
|
||||
)
|
||||
logging.basicConfig(level=log_level, format="[molecule-mcp] %(message)s")
|
||||
|
||||
# Standalone-mode register + heartbeat. Skipped via env var so an
|
||||
# in-container caller (which has its own heartbeat loop) can reuse
|
||||
# this entry point without double-heartbeating. The wheel's main
|
||||
# console-script path always runs them; the
|
||||
# MOLECULE_MCP_DISABLE_HEARTBEAT escape hatch exists for tests +
|
||||
# the rare embedded use-case.
|
||||
if not os.environ.get("MOLECULE_MCP_DISABLE_HEARTBEAT", "").strip():
|
||||
_platform_register(platform_url, workspace_id, token)
|
||||
_start_heartbeat_thread(platform_url, workspace_id, token)
|
||||
|
||||
# Env is valid — safe to import the heavy module now. Importing
|
||||
# earlier would trigger a2a_client.py:22's module-level RuntimeError
|
||||
# before our friendly help reaches the user.
|
||||
@ -70,5 +263,23 @@ def main() -> None:
|
||||
cli_main()
|
||||
|
||||
|
||||
def _read_token_file() -> str:
|
||||
"""Read the token from ${CONFIGS_DIR}/.auth_token if present.
|
||||
|
||||
Mirrors platform_auth._token_file but without importing the heavy
|
||||
module here (that import triggers a2a_client's WORKSPACE_ID guard
|
||||
which is fine after env validation, but cheaper to inline a 4-line
|
||||
file read than pull in the whole stack just for the path).
|
||||
"""
|
||||
configs_dir = Path(os.environ.get("CONFIGS_DIR", "/configs"))
|
||||
path = configs_dir / ".auth_token"
|
||||
if not path.is_file():
|
||||
return ""
|
||||
try:
|
||||
return path.read_text().strip()
|
||||
except OSError:
|
||||
return ""
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
main()
|
||||
|
||||
@ -18,10 +18,14 @@ import mcp_cli
|
||||
@pytest.fixture(autouse=True)
|
||||
def _isolate(monkeypatch, tmp_path):
|
||||
"""Each test starts with no Molecule env vars set + a fresh
|
||||
CONFIGS_DIR pointing at an empty tmpdir."""
|
||||
CONFIGS_DIR pointing at an empty tmpdir. The heartbeat thread is
|
||||
disabled by default so happy-path tests don't spawn a background
|
||||
POST loop against a fake URL — individual tests opt back in via
|
||||
monkeypatch.delenv when they want to assert heartbeat behavior."""
|
||||
for var in ("WORKSPACE_ID", "PLATFORM_URL", "MOLECULE_WORKSPACE_TOKEN"):
|
||||
monkeypatch.delenv(var, raising=False)
|
||||
monkeypatch.setenv("CONFIGS_DIR", str(tmp_path))
|
||||
monkeypatch.setenv("MOLECULE_MCP_DISABLE_HEARTBEAT", "1")
|
||||
yield
|
||||
|
||||
|
||||
@ -139,3 +143,259 @@ def test_help_lists_canvas_tokens_tab_pointer(capsys):
|
||||
code, err = _run_main_capturing_exit(capsys)
|
||||
assert code == 2
|
||||
assert "Tokens tab" in err or "canvas" in err.lower()
|
||||
|
||||
|
||||
# ==================== Standalone register + heartbeat ====================
|
||||
# molecule-mcp must be a single-process standalone runtime: it registers
|
||||
# the workspace at startup AND continuously heartbeats so the platform
|
||||
# healthsweep doesn't flip status back to awaiting_agent. Without these,
|
||||
# the operator sees "OFFLINE — Restart" in the canvas within ~60s of
|
||||
# launching the agent, which was the bug that motivated this PR.
|
||||
|
||||
|
||||
def test_register_called_at_startup(monkeypatch):
|
||||
"""When env is valid and heartbeat enabled, register fires once
|
||||
before the MCP loop starts."""
|
||||
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
|
||||
monkeypatch.setenv("PLATFORM_URL", "https://test.moleculesai.app")
|
||||
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "tok")
|
||||
monkeypatch.delenv("MOLECULE_MCP_DISABLE_HEARTBEAT", raising=False)
|
||||
|
||||
register_calls: list[tuple[str, str, str]] = []
|
||||
|
||||
def fake_register(platform_url, workspace_id, token):
|
||||
register_calls.append((platform_url, workspace_id, token))
|
||||
|
||||
def fake_start_thread(*_args, **_kwargs):
|
||||
# Return a dummy thread-shaped object so the caller's reference
|
||||
# is harmless. Real thread spawning is asserted separately.
|
||||
class _Stub:
|
||||
def join(self): pass
|
||||
return _Stub()
|
||||
|
||||
monkeypatch.setattr(mcp_cli, "_platform_register", fake_register)
|
||||
monkeypatch.setattr(mcp_cli, "_start_heartbeat_thread", fake_start_thread)
|
||||
|
||||
spy_called: dict[str, bool] = {"called": False}
|
||||
|
||||
def fake_cli_main():
|
||||
spy_called["called"] = True
|
||||
|
||||
import types
|
||||
fake_module = types.ModuleType("a2a_mcp_server")
|
||||
fake_module.cli_main = fake_cli_main
|
||||
monkeypatch.setitem(sys.modules, "a2a_mcp_server", fake_module)
|
||||
|
||||
mcp_cli.main()
|
||||
|
||||
assert register_calls == [
|
||||
("https://test.moleculesai.app", "00000000-0000-0000-0000-000000000000", "tok"),
|
||||
]
|
||||
assert spy_called["called"], "MCP loop must run AFTER register"
|
||||
|
||||
|
||||
def test_heartbeat_thread_started(monkeypatch):
|
||||
"""The heartbeat daemon thread must start before the MCP loop runs."""
|
||||
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
|
||||
monkeypatch.setenv("PLATFORM_URL", "https://test.moleculesai.app")
|
||||
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "tok")
|
||||
monkeypatch.delenv("MOLECULE_MCP_DISABLE_HEARTBEAT", raising=False)
|
||||
|
||||
monkeypatch.setattr(mcp_cli, "_platform_register", lambda *a, **k: None)
|
||||
|
||||
thread_started: dict[str, bool] = {"started": False}
|
||||
|
||||
def fake_start_thread(platform_url, workspace_id, token):
|
||||
thread_started["started"] = True
|
||||
thread_started["args"] = (platform_url, workspace_id, token)
|
||||
class _Stub:
|
||||
def join(self): pass
|
||||
return _Stub()
|
||||
|
||||
monkeypatch.setattr(mcp_cli, "_start_heartbeat_thread", fake_start_thread)
|
||||
|
||||
import types
|
||||
fake_module = types.ModuleType("a2a_mcp_server")
|
||||
fake_module.cli_main = lambda: None
|
||||
monkeypatch.setitem(sys.modules, "a2a_mcp_server", fake_module)
|
||||
|
||||
mcp_cli.main()
|
||||
|
||||
assert thread_started["started"], "heartbeat thread must be spawned"
|
||||
assert thread_started["args"][1] == "00000000-0000-0000-0000-000000000000"
|
||||
assert thread_started["args"][2] == "tok"
|
||||
|
||||
|
||||
def test_heartbeat_disable_env_skips_both(monkeypatch):
|
||||
"""MOLECULE_MCP_DISABLE_HEARTBEAT=1 (the test fixture default + the
|
||||
in-container escape hatch) must skip BOTH register and heartbeat,
|
||||
so the in-container heartbeat loop in heartbeat.py doesn't compete
|
||||
with this thread."""
|
||||
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
|
||||
monkeypatch.setenv("PLATFORM_URL", "https://test.moleculesai.app")
|
||||
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "tok")
|
||||
# MOLECULE_MCP_DISABLE_HEARTBEAT=1 is set by the autouse fixture.
|
||||
|
||||
register_called: dict[str, bool] = {"called": False}
|
||||
thread_started: dict[str, bool] = {"started": False}
|
||||
|
||||
monkeypatch.setattr(
|
||||
mcp_cli, "_platform_register",
|
||||
lambda *a, **k: register_called.update(called=True),
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
mcp_cli, "_start_heartbeat_thread",
|
||||
lambda *a, **k: thread_started.update(started=True),
|
||||
)
|
||||
|
||||
import types
|
||||
fake_module = types.ModuleType("a2a_mcp_server")
|
||||
fake_module.cli_main = lambda: None
|
||||
monkeypatch.setitem(sys.modules, "a2a_mcp_server", fake_module)
|
||||
|
||||
mcp_cli.main()
|
||||
|
||||
assert register_called["called"] is False, "disable env must skip register"
|
||||
assert thread_started["started"] is False, "disable env must skip heartbeat thread"
|
||||
|
||||
|
||||
def test_token_resolved_from_env_when_no_file(monkeypatch):
|
||||
"""Operator without a /configs volume — token comes from env var."""
|
||||
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
|
||||
monkeypatch.setenv("PLATFORM_URL", "https://test.moleculesai.app")
|
||||
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "env-token")
|
||||
monkeypatch.delenv("MOLECULE_MCP_DISABLE_HEARTBEAT", raising=False)
|
||||
|
||||
captured_token: dict[str, str] = {}
|
||||
|
||||
def fake_register(platform_url, workspace_id, token):
|
||||
captured_token["t"] = token
|
||||
|
||||
monkeypatch.setattr(mcp_cli, "_platform_register", fake_register)
|
||||
monkeypatch.setattr(mcp_cli, "_start_heartbeat_thread", lambda *a, **k: None)
|
||||
|
||||
import types
|
||||
fake_module = types.ModuleType("a2a_mcp_server")
|
||||
fake_module.cli_main = lambda: None
|
||||
monkeypatch.setitem(sys.modules, "a2a_mcp_server", fake_module)
|
||||
|
||||
mcp_cli.main()
|
||||
|
||||
assert captured_token["t"] == "env-token"
|
||||
|
||||
|
||||
def test_token_resolved_from_file_when_no_env(monkeypatch, tmp_path):
|
||||
"""In-container parity: token comes from /configs/.auth_token when
|
||||
env is unset. Mirrors platform_auth.get_token resolution order."""
|
||||
(tmp_path / ".auth_token").write_text("file-token")
|
||||
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
|
||||
monkeypatch.setenv("PLATFORM_URL", "https://test.moleculesai.app")
|
||||
monkeypatch.delenv("MOLECULE_WORKSPACE_TOKEN", raising=False)
|
||||
monkeypatch.delenv("MOLECULE_MCP_DISABLE_HEARTBEAT", raising=False)
|
||||
|
||||
captured_token: dict[str, str] = {}
|
||||
|
||||
def fake_register(platform_url, workspace_id, token):
|
||||
captured_token["t"] = token
|
||||
|
||||
monkeypatch.setattr(mcp_cli, "_platform_register", fake_register)
|
||||
monkeypatch.setattr(mcp_cli, "_start_heartbeat_thread", lambda *a, **k: None)
|
||||
|
||||
import types
|
||||
fake_module = types.ModuleType("a2a_mcp_server")
|
||||
fake_module.cli_main = lambda: None
|
||||
monkeypatch.setitem(sys.modules, "a2a_mcp_server", fake_module)
|
||||
|
||||
mcp_cli.main()
|
||||
|
||||
assert captured_token["t"] == "file-token"
|
||||
|
||||
|
||||
def test_register_payload_shape(monkeypatch):
|
||||
"""The register POST body must use the field names the workspace-
|
||||
server expects (id/url/agent_card/delivery_mode), and must include
|
||||
the Origin header for the SaaS edge WAF."""
|
||||
captured: dict[str, object] = {}
|
||||
|
||||
class FakeResp:
|
||||
status_code = 200
|
||||
text = ""
|
||||
|
||||
class FakeClient:
|
||||
def __init__(self, **_kwargs): pass
|
||||
def __enter__(self): return self
|
||||
def __exit__(self, *_a): return False
|
||||
def post(self, url, json=None, headers=None):
|
||||
captured["url"] = url
|
||||
captured["json"] = json
|
||||
captured["headers"] = headers
|
||||
return FakeResp()
|
||||
|
||||
import types
|
||||
fake_httpx = types.ModuleType("httpx")
|
||||
fake_httpx.Client = FakeClient
|
||||
monkeypatch.setitem(sys.modules, "httpx", fake_httpx)
|
||||
|
||||
mcp_cli._platform_register(
|
||||
"https://test.moleculesai.app",
|
||||
"ws-abc",
|
||||
"tok",
|
||||
)
|
||||
|
||||
assert captured["url"] == "https://test.moleculesai.app/registry/register"
|
||||
body = captured["json"]
|
||||
assert body["id"] == "ws-abc"
|
||||
assert body["delivery_mode"] == "poll"
|
||||
assert body["url"] == ""
|
||||
assert "agent_card" in body
|
||||
headers = captured["headers"]
|
||||
assert headers["Authorization"] == "Bearer tok"
|
||||
assert headers["Origin"] == "https://test.moleculesai.app"
|
||||
|
||||
|
||||
def test_heartbeat_loop_posts_to_correct_endpoint(monkeypatch):
|
||||
"""Heartbeat thread must POST to /registry/heartbeat with the
|
||||
workspace_id + Origin/Authorization headers."""
|
||||
captured: dict[str, object] = {}
|
||||
|
||||
class FakeResp:
|
||||
status_code = 200
|
||||
text = ""
|
||||
|
||||
class FakeClient:
|
||||
def __init__(self, **_kwargs): pass
|
||||
def __enter__(self): return self
|
||||
def __exit__(self, *_a): return False
|
||||
def post(self, url, json=None, headers=None):
|
||||
captured["url"] = url
|
||||
captured["json"] = json
|
||||
captured["headers"] = headers
|
||||
return FakeResp()
|
||||
|
||||
import types
|
||||
fake_httpx = types.ModuleType("httpx")
|
||||
fake_httpx.Client = FakeClient
|
||||
monkeypatch.setitem(sys.modules, "httpx", fake_httpx)
|
||||
|
||||
# Patch sleep so the loop exits after one tick (raise to break out).
|
||||
sleep_calls: list[float] = []
|
||||
|
||||
def fake_sleep(seconds):
|
||||
sleep_calls.append(seconds)
|
||||
raise SystemExit # break out of the infinite loop
|
||||
|
||||
monkeypatch.setattr("time.sleep", fake_sleep)
|
||||
|
||||
with pytest.raises(SystemExit):
|
||||
mcp_cli._heartbeat_loop(
|
||||
"https://test.moleculesai.app",
|
||||
"ws-abc",
|
||||
"tok",
|
||||
interval=20.0,
|
||||
)
|
||||
|
||||
assert captured["url"] == "https://test.moleculesai.app/registry/heartbeat"
|
||||
assert captured["json"]["workspace_id"] == "ws-abc"
|
||||
assert captured["headers"]["Authorization"] == "Bearer tok"
|
||||
assert captured["headers"]["Origin"] == "https://test.moleculesai.app"
|
||||
assert sleep_calls == [20.0], "heartbeat must sleep the configured interval"
|
||||
|
||||
Loading…
Reference in New Issue
Block a user