From 28ef75d25e69e92a4cbdf5aa2beaba63d2a437b0 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 04:33:06 -0700 Subject: [PATCH 1/5] refactor(workspace): split mcp_cli.py (626 LOC) into focused modules (RFC #2873 iter 3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Splits the standalone molecule-mcp wrapper into three single-concern modules per the OSS-shape refactor program: * mcp_heartbeat.py — register POST + heartbeat loop + auth-failure escalation + inbound-secret persistence * mcp_workspace_resolver.py — single + multi-workspace env validation + on-disk token-file read + operator-help printer * mcp_inbox_pollers.py — activate inbox singleton + spawn one daemon poller per workspace mcp_cli.py becomes a 193-LOC orchestrator: validates env, calls each module's helpers, hands off to a2a_mcp_server.cli_main. The console- script entry molecule-mcp = molecule_runtime.mcp_cli:main is preserved. Back-compat aliases (mcp_cli._build_agent_card, _heartbeat_loop, _resolve_workspaces, etc.) re-export the new modules' authoritative functions so existing tests + wheel_smoke.py + any downstream caller keeps working unchanged. A new test file pins each alias as the exact same callable (drift gate via `is`). Tests: * 62 existing test_mcp_cli.py + test_mcp_cli_multi_workspace.py pass against the split. * Two heartbeat-loop persist tests + the auth-escalation caplog setup updated to target mcp_heartbeat (the module where the loop body now lives) instead of mcp_cli (still works through aliases for direct calls, but Python's name resolution inside the loop body uses the new module's namespace). * test_mcp_cli_split.py adds 11 new tests: alias drift gate + inbox-poller single + multi-workspace branches + degraded inbox-import logging path (none of those existed before). Refs RFC #2873. --- workspace/mcp_cli.py | 499 ++------------------------ workspace/mcp_heartbeat.py | 325 +++++++++++++++++ workspace/mcp_inbox_pollers.py | 63 ++++ workspace/mcp_workspace_resolver.py | 146 ++++++++ workspace/tests/test_mcp_cli.py | 22 +- workspace/tests/test_mcp_cli_split.py | 231 ++++++++++++ 6 files changed, 812 insertions(+), 474 deletions(-) create mode 100644 workspace/mcp_heartbeat.py create mode 100644 workspace/mcp_inbox_pollers.py create mode 100644 workspace/mcp_workspace_resolver.py create mode 100644 workspace/tests/test_mcp_cli_split.py diff --git a/workspace/mcp_cli.py b/workspace/mcp_cli.py index feea0b83..e890a66d 100644 --- a/workspace/mcp_cli.py +++ b/workspace/mcp_cli.py @@ -31,422 +31,53 @@ dependency via ``a2a-sdk``. In-container usage (``python -m molecule_runtime.a2a_mcp_server`` or direct import) bypasses this wrapper — the workspace runtime has its own heartbeat loop in ``heartbeat.py`` so we don't double-heartbeat. + +Module layout (RFC #2873 iter 3 split): + * ``mcp_heartbeat`` — register POST + heartbeat loop + auth-failure + escalation + inbound-secret persistence. + * ``mcp_workspace_resolver`` — env validation, single + multi-workspace + resolution, operator-help printer, on-disk token-file read. + * ``mcp_inbox_pollers`` — activate the inbox singleton + spawn one + daemon poller per workspace. + +This file keeps just ``main()`` plus thin re-exports of the private +symbols so existing tests' imports (``mcp_cli._build_agent_card``, +``mcp_cli._heartbeat_loop``, etc.) keep working without churn. 
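+
+A sketch of the drift gate guarding these re-exports (the authoritative
+assertions live in ``tests/test_mcp_cli_split.py``; note ``is``, not
+``==``, so a behaviour-equal wrapper still counts as drift):
+
+    import mcp_cli, mcp_heartbeat
+    assert mcp_cli._heartbeat_loop is mcp_heartbeat.heartbeat_loop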
""" from __future__ import annotations -import json import logging import os import sys -import threading -import time -from pathlib import Path import configs_dir +import mcp_heartbeat +import mcp_inbox_pollers +import mcp_workspace_resolver logger = logging.getLogger(__name__) -# Heartbeat cadence. Must be tighter than healthsweep's stale window -# (currently 60-90s — see registry/healthsweep.go) by a comfortable -# margin so a single missed heartbeat doesn't flip awaiting_agent. -# 20s gives the operator's network 3 attempts within the budget; long -# enough that it doesn't spam, short enough to recover quickly after -# laptop sleep. -HEARTBEAT_INTERVAL_SECONDS = 20.0 +# Re-export public surface for back-compat with the pre-split callers +# and tests. The underscore-prefixed names mirror the names that +# existed in this module before the split — keeping them ensures +# `mcp_cli._build_agent_card`, `mcp_cli._heartbeat_loop`, etc. +# resolve identically to the new functions. +HEARTBEAT_INTERVAL_SECONDS = mcp_heartbeat.HEARTBEAT_INTERVAL_SECONDS +_HEARTBEAT_AUTH_LOUD_THRESHOLD = mcp_heartbeat.HEARTBEAT_AUTH_LOUD_THRESHOLD +_HEARTBEAT_AUTH_RELOG_INTERVAL = mcp_heartbeat.HEARTBEAT_AUTH_RELOG_INTERVAL -# After this many consecutive 401/403 heartbeats, escalate from -# WARNING to ERROR with re-onboard guidance. 3 ticks at 20s = ~1 minute -# of sustained auth failure — enough to rule out a transient platform -# blip but quick enough that an operator doesn't sit puzzled for 10 -# minutes wondering why their MCP tools 401. Same threshold used for -# repeat-logging at 20-tick (~7 min) intervals so a long-running -# session that missed the first ERROR still sees the message. -_HEARTBEAT_AUTH_LOUD_THRESHOLD = 3 -_HEARTBEAT_AUTH_RELOG_INTERVAL = 20 +_build_agent_card = mcp_heartbeat.build_agent_card +_platform_register = mcp_heartbeat.platform_register +_heartbeat_loop = mcp_heartbeat.heartbeat_loop +_log_heartbeat_auth_failure = mcp_heartbeat.log_heartbeat_auth_failure +_persist_inbound_secret_from_heartbeat = mcp_heartbeat.persist_inbound_secret_from_heartbeat +_start_heartbeat_thread = mcp_heartbeat.start_heartbeat_thread +_resolve_workspaces = mcp_workspace_resolver.resolve_workspaces +_print_missing_env_help = mcp_workspace_resolver.print_missing_env_help +_read_token_file = mcp_workspace_resolver.read_token_file -def _build_agent_card(workspace_id: str) -> dict: - """Build the ``agent_card`` payload sent to /registry/register. - - Three optional env vars override the defaults so an operator can - surface human-readable identity + capabilities to peers and the - canvas Skills tab without code changes: - - * ``MOLECULE_AGENT_NAME`` — display name (defaults to - ``molecule-mcp-{id[:8]}``). Surfaced in canvas workspace cards - and ``list_peers`` output. - * ``MOLECULE_AGENT_DESCRIPTION`` — one-liner about the agent's - purpose. Rendered in canvas Details + Skills tabs. - * ``MOLECULE_AGENT_SKILLS`` — comma-separated skill names - (e.g. ``research,code-review,memory-curation``). Each name is - expanded to a ``{"name": ...}`` skill object — the minimum - shape that satisfies both ``shared_runtime.summarize_peers`` - (uses ``s["name"]``) and the canvas SkillsTab.tsx schema - (id falls back to name when omitted). Empty / whitespace - entries are dropped. - - Defaults match the previous hardcoded behaviour exactly so this - is a strict superset — an operator who sets none of the env vars - sees no change. 
- """ - name = (os.environ.get("MOLECULE_AGENT_NAME") or "").strip() - if not name: - name = f"molecule-mcp-{workspace_id[:8]}" - - description = (os.environ.get("MOLECULE_AGENT_DESCRIPTION") or "").strip() - - skills_raw = (os.environ.get("MOLECULE_AGENT_SKILLS") or "").strip() - skills: list[dict] = [] - if skills_raw: - for s in skills_raw.split(","): - label = s.strip() - if label: - skills.append({"name": label}) - - card: dict = {"name": name, "skills": skills} - if description: - card["description"] = description - return card - - -def _platform_register(platform_url: str, workspace_id: str, token: str) -> None: - """One-shot register at startup; fails fast on auth errors. - - Lifts the workspace from ``awaiting_agent`` to ``online`` for - operators who never ran the curl-register snippet. Safe to call - repeatedly: the platform's register handler is an upsert that - just refreshes ``url``, ``agent_card``, and ``status``. - - Failure model (post-review): - - 401 / 403 → ``sys.exit(3)`` immediately. The operator's - token is wrong; silently looping in a broken state would - make this hard to diagnose because the MCP tools would 401 - on every call too. Hard-fail is the kindest option. - - Other 4xx/5xx → log a warning + continue. The heartbeat - thread will surface persistent failures; transient platform - blips shouldn't abort the MCP loop. - - Network / transport errors → log + continue. Same reasoning. - - Origin header is required by the SaaS edge WAF; without it - /registry/register currently still works (it's on the WAF - allowlist), but the heartbeat path needs Origin and we want one - consistent header set across both calls. - """ - try: - import httpx - except ImportError: - # httpx is a transitive dep via a2a-sdk; if missing, the MCP - # server won't import either. Let the caller's later import - # surface the real error. - return - - payload = { - "id": workspace_id, - "url": "", - "agent_card": _build_agent_card(workspace_id), - "delivery_mode": "poll", - } - headers = { - "Authorization": f"Bearer {token}", - "Origin": platform_url, - "Content-Type": "application/json", - } - try: - with httpx.Client(timeout=10.0) as client: - resp = client.post( - f"{platform_url}/registry/register", - json=payload, - headers=headers, - ) - if resp.status_code in (401, 403): - print( - f"molecule-mcp: register rejected with HTTP {resp.status_code} — " - f"the token in MOLECULE_WORKSPACE_TOKEN is invalid for workspace " - f"{workspace_id}. Regenerate from the canvas → Tokens tab.", - file=sys.stderr, - ) - sys.exit(3) - if resp.status_code >= 400: - logger.warning( - "molecule-mcp: register POST returned HTTP %d: %s", - resp.status_code, - (resp.text or "")[:200], - ) - else: - logger.info( - "molecule-mcp: registered workspace %s with platform", - workspace_id, - ) - except SystemExit: - raise - except Exception as exc: # noqa: BLE001 - logger.warning("molecule-mcp: register POST failed: %s", exc) - - -def _heartbeat_loop( - platform_url: str, - workspace_id: str, - token: str, - interval: float = HEARTBEAT_INTERVAL_SECONDS, -) -> None: - """Daemon thread body: POST /registry/heartbeat every ``interval``s. - - Failures are logged at WARNING and the loop continues. The thread - exits when the main process does (daemon=True). Each iteration - rebuilds the payload + headers — cheap and ensures token rotation - via env var (rare but possible) is picked up on the next tick. 
- """ - try: - import httpx - except ImportError: - return - - start_time = time.time() - consecutive_auth_failures = 0 - while True: - body = { - "workspace_id": workspace_id, - "error_rate": 0.0, - "sample_error": "", - "active_tasks": 0, - "uptime_seconds": int(time.time() - start_time), - } - headers = { - "Authorization": f"Bearer {token}", - "Origin": platform_url, - "Content-Type": "application/json", - } - try: - with httpx.Client(timeout=10.0) as client: - resp = client.post( - f"{platform_url}/registry/heartbeat", - json=body, - headers=headers, - ) - if resp.status_code in (401, 403): - consecutive_auth_failures += 1 - _log_heartbeat_auth_failure( - consecutive_auth_failures, workspace_id, resp.status_code, - ) - elif resp.status_code >= 400: - # Non-auth HTTP error — log, but DO NOT touch the - # auth-failure counter (5xx blips, 429, etc. are - # transient and unrelated to token validity). - logger.warning( - "molecule-mcp: heartbeat HTTP %d: %s", - resp.status_code, - (resp.text or "")[:200], - ) - else: - consecutive_auth_failures = 0 - _persist_inbound_secret_from_heartbeat(resp) - except Exception as exc: # noqa: BLE001 - logger.warning("molecule-mcp: heartbeat failed: %s", exc) - time.sleep(interval) - - -def _log_heartbeat_auth_failure(count: int, workspace_id: str, status_code: int) -> None: - """Escalate consecutive heartbeat 401/403s from quiet WARNING to - actionable ERROR. - - The operator's first sign of trouble shouldn't be "tools 401 with no - explanation" — that was the failure mode that motivated this code, - triggered by a workspace being deleted server-side and its tokens - revoked while the runtime kept heartbeating in silence. - - Cadence: - * count < threshold: WARNING per tick (transient — could be a - platform blip, don't shout yet) - * count == threshold: ERROR with re-onboard instructions - (the first signal the operator can't miss) - * count > threshold and (count - threshold) % relog == 0: re-log - ERROR (so a session that started after the first ERROR still - sees the message scrolling past in their logs) - """ - if count < _HEARTBEAT_AUTH_LOUD_THRESHOLD: - logger.warning( - "molecule-mcp: heartbeat HTTP %d (auth failure %d/%d) — " - "token may be revoked. Will retry; if persistent, regenerate " - "from canvas → Tokens.", - status_code, count, _HEARTBEAT_AUTH_LOUD_THRESHOLD, - ) - return - # At or past the threshold — this is the loud actionable error. - if count == _HEARTBEAT_AUTH_LOUD_THRESHOLD or ( - count - _HEARTBEAT_AUTH_LOUD_THRESHOLD - ) % _HEARTBEAT_AUTH_RELOG_INTERVAL == 0: - logger.error( - "molecule-mcp: %d consecutive heartbeat auth failures (HTTP %d) — " - "the token in MOLECULE_WORKSPACE_TOKEN has been REVOKED, likely " - "because workspace %s was deleted server-side. The MCP server is " - "still running but every platform call will fail. Regenerate the " - "workspace + token from the canvas (Tokens tab), update your MCP " - "config, and restart your runtime.", - count, status_code, workspace_id, - ) - - -def _persist_inbound_secret_from_heartbeat(resp: object) -> None: - """Persist ``platform_inbound_secret`` from a heartbeat response, if any. - - The platform's heartbeat handler returns the secret on every beat - (mirroring /registry/register) so a workspace that lazy-healed the - secret on the platform side — typical recovery path for a workspace - whose row had a NULL ``platform_inbound_secret`` after a partial - bootstrap — picks it up within one heartbeat tick instead of - requiring a runtime restart. 
- - Without this delivery path the chat-upload code path's "secret was - just minted, will pick up on next heartbeat" 503 message is a lie - and the workspace stays 401-forever until the operator restarts - the runtime. Caught 2026-04-30 on hongmingwang tenant. - - Failure is non-fatal: if the body isn't JSON, doesn't carry the - field, or the disk write fails, the next heartbeat retries. This - matches the cold-start register flow in main.py:319-323. - """ - try: - body = resp.json() - except Exception: # noqa: BLE001 - return - if not isinstance(body, dict): - return - secret = body.get("platform_inbound_secret") - if not secret: - return - try: - from platform_inbound_auth import save_inbound_secret - - save_inbound_secret(secret) - except Exception as exc: # noqa: BLE001 - logger.warning( - "molecule-mcp: persist inbound secret from heartbeat failed: %s", exc - ) - - -def _start_heartbeat_thread( - platform_url: str, - workspace_id: str, - token: str, -) -> threading.Thread: - """Start the heartbeat daemon thread. Returns the Thread handle. - - The MCP stdio loop runs in the foreground (asyncio); this thread - runs alongside it. ``daemon=True`` so when the operator hits - Ctrl-C / closes the runtime, the heartbeat dies with it instead - of leaking and writing to a stale workspace. - """ - t = threading.Thread( - target=_heartbeat_loop, - args=(platform_url, workspace_id, token), - name="molecule-mcp-heartbeat", - daemon=True, - ) - t.start() - return t - - -def _resolve_workspaces() -> tuple[list[tuple[str, str]], list[str]]: - """Return the list of ``(workspace_id, token)`` pairs to register. - - Resolution order: - - 1. ``MOLECULE_WORKSPACES`` env var — JSON array of - ``{"id": "...", "token": "..."}`` objects. Activates the - multi-workspace external-agent path (one process registered into - N workspaces). When set, ``WORKSPACE_ID`` / ``MOLECULE_WORKSPACE_TOKEN`` - are IGNORED — the JSON is the source of truth. - - 2. Single-workspace fallback — ``WORKSPACE_ID`` env var + token from - ``MOLECULE_WORKSPACE_TOKEN`` or ``${CONFIGS_DIR}/.auth_token``. - This is the pre-existing path; back-compat exact. - - Returns ``(workspaces, errors)``: - * ``workspaces``: list of ``(workspace_id, token)`` — non-empty - on the happy path. - * ``errors``: human-readable strings describing what's missing / - malformed. ``main()`` surfaces these with the same shape as - ``_print_missing_env_help`` so the operator's first run gives - actionable output. - - Why JSON env (not file): ergonomic for Claude Code MCP config (one - string in ``mcpServers.molecule.env`` instead of a sidecar file) - and for CI / launchers. A separate config-file path can be added - later without breaking this. - """ - raw = os.environ.get("MOLECULE_WORKSPACES", "").strip() - if raw: - try: - parsed = json.loads(raw) - except json.JSONDecodeError as exc: - return [], [ - f"MOLECULE_WORKSPACES is not valid JSON ({exc.msg} at pos " - f"{exc.pos}). 
Expected: '[{{\"id\":\"\",\"token\":" - f"\"\"}},{{...}}]'" - ] - if not isinstance(parsed, list) or not parsed: - return [], [ - "MOLECULE_WORKSPACES must be a non-empty JSON array of " - "{\"id\":\"...\",\"token\":\"...\"} objects" - ] - out: list[tuple[str, str]] = [] - seen: set[str] = set() - errors: list[str] = [] - for i, entry in enumerate(parsed): - if not isinstance(entry, dict): - errors.append( - f"MOLECULE_WORKSPACES[{i}] is not an object — got {type(entry).__name__}" - ) - continue - wsid = str(entry.get("id", "")).strip() - tok = str(entry.get("token", "")).strip() - if not wsid or not tok: - errors.append( - f"MOLECULE_WORKSPACES[{i}] missing 'id' or 'token'" - ) - continue - if wsid in seen: - errors.append( - f"MOLECULE_WORKSPACES[{i}] duplicate workspace id {wsid!r}" - ) - continue - seen.add(wsid) - out.append((wsid, tok)) - if errors: - return [], errors - return out, [] - - # Single-workspace back-compat path. - wsid = os.environ.get("WORKSPACE_ID", "").strip() - if not wsid: - return [], ["WORKSPACE_ID (or MOLECULE_WORKSPACES) is required"] - tok = os.environ.get("MOLECULE_WORKSPACE_TOKEN", "").strip() - if not tok: - tok = _read_token_file() - if not tok: - return [], [ - "MOLECULE_WORKSPACE_TOKEN (or CONFIGS_DIR/.auth_token) is required" - ] - return [(wsid, tok)], [] - - -def _print_missing_env_help(missing: list[str], have_token_file: bool) -> None: - print("molecule-mcp: missing required environment.\n", file=sys.stderr) - print("Set the following before running molecule-mcp:", file=sys.stderr) - print(" WORKSPACE_ID — your workspace UUID (from canvas)", file=sys.stderr) - print( - " PLATFORM_URL — base URL of your Molecule platform " - "(e.g. https://your-tenant.staging.moleculesai.app)", - file=sys.stderr, - ) - if not have_token_file: - print( - " MOLECULE_WORKSPACE_TOKEN — bearer token for this workspace " - "(canvas → Tokens tab)", - file=sys.stderr, - ) - print("", file=sys.stderr) - print(f"Currently missing: {', '.join(missing)}", file=sys.stderr) +_start_inbox_pollers = mcp_inbox_pollers.start_inbox_pollers def main() -> None: @@ -558,69 +189,5 @@ def main() -> None: cli_main() -def _start_inbox_pollers(platform_url: str, workspace_ids: list[str]) -> None: - """Activate the inbox singleton + spawn one poller daemon thread per workspace. - - Done lazily here (not at module import) because importing inbox - pulls in platform_auth, which only resolves cleanly AFTER env - validation succeeds. Activation is idempotent within a process, - so a stray double-call (e.g. test harness re-entering main) is - harmless. - - The poller threads are daemon=True — die with the main process. - - Single-workspace path: one poller, single cursor file at the legacy - location (``.mcp_inbox_cursor``). Cursor-key resolution falls back - to the empty string for back-compat with operators whose existing - on-disk cursor was written by the pre-multi-workspace code. - - Multi-workspace path: N pollers, each with its own cursor file - keyed by ``workspace_id[:8]``. Cursors live next to each other in - configs_dir so an operator inspecting state sees all of them - together. - """ - try: - import inbox - except ImportError as exc: - logger.warning("molecule-mcp: inbox module unavailable: %s", exc) - return - - if len(workspace_ids) <= 1: - # Back-compat exact: single-workspace mode reuses the legacy - # cursor filename + cursor_path constructor arg, so an existing - # operator's on-disk state isn't invalidated by upgrade. 
- wsid = workspace_ids[0] - state = inbox.InboxState(cursor_path=inbox.default_cursor_path()) - inbox.activate(state) - inbox.start_poller_thread(state, platform_url, wsid) - return - - # Multi-workspace: per-workspace cursor file, one shared queue. - cursor_paths = {wsid: inbox.default_cursor_path(wsid) for wsid in workspace_ids} - state = inbox.InboxState(cursor_paths=cursor_paths) - inbox.activate(state) - for wsid in workspace_ids: - inbox.start_poller_thread(state, platform_url, wsid) - - -def _read_token_file() -> str: - """Read the token from the resolved configs dir's ``.auth_token`` if - present. - - Mirrors platform_auth._token_file's location resolution but without - importing the heavy module here (that import triggers a2a_client's - WORKSPACE_ID guard which is fine after env validation, but cheaper - to inline a 4-line file read than pull in the whole stack just for - the path). - """ - path = configs_dir.resolve() / ".auth_token" - if not path.is_file(): - return "" - try: - return path.read_text().strip() - except OSError: - return "" - - if __name__ == "__main__": # pragma: no cover main() diff --git a/workspace/mcp_heartbeat.py b/workspace/mcp_heartbeat.py new file mode 100644 index 00000000..2d27aa29 --- /dev/null +++ b/workspace/mcp_heartbeat.py @@ -0,0 +1,325 @@ +"""Heartbeat + register thread for the standalone ``molecule-mcp`` wrapper. + +Extracted from ``mcp_cli.py`` (RFC #2873 iter 3) so the heartbeat / +register concern lives in its own module. The console-script entry +``mcp_cli:main`` still drives the spawn, but the loop body, auth-failure +escalation, and inbound-secret persistence now live here so they can be +read, tested, and replaced independently of the orchestrator. + +Public surface: + +* ``HEARTBEAT_INTERVAL_SECONDS`` — cadence constant. +* ``build_agent_card(workspace_id)`` — payload helper. +* ``platform_register(platform_url, workspace_id, token)`` — one-shot + POST /registry/register at startup. +* ``start_heartbeat_thread(platform_url, workspace_id, token)`` — spawn + the daemon thread. +""" +from __future__ import annotations + +import logging +import os +import sys +import threading +import time + +logger = logging.getLogger(__name__) + +# Heartbeat cadence. Must be tighter than healthsweep's stale window +# (currently 60-90s — see registry/healthsweep.go) by a comfortable +# margin so a single missed heartbeat doesn't flip awaiting_agent. +# 20s gives the operator's network 3 attempts within the budget; long +# enough that it doesn't spam, short enough to recover quickly after +# laptop sleep. +HEARTBEAT_INTERVAL_SECONDS = 20.0 + +# After this many consecutive 401/403 heartbeats, escalate from +# WARNING to ERROR with re-onboard guidance. 3 ticks at 20s = ~1 minute +# of sustained auth failure — enough to rule out a transient platform +# blip but quick enough that an operator doesn't sit puzzled for 10 +# minutes wondering why their MCP tools 401. Same threshold used for +# repeat-logging at 20-tick (~7 min) intervals so a long-running +# session that missed the first ERROR still sees the message. +HEARTBEAT_AUTH_LOUD_THRESHOLD = 3 +HEARTBEAT_AUTH_RELOG_INTERVAL = 20 + + +def build_agent_card(workspace_id: str) -> dict: + """Build the ``agent_card`` payload sent to /registry/register. + + Three optional env vars override the defaults so an operator can + surface human-readable identity + capabilities to peers and the + canvas Skills tab without code changes: + + * ``MOLECULE_AGENT_NAME`` — display name (defaults to + ``molecule-mcp-{id[:8]}``). 
Surfaced in canvas workspace cards + and ``list_peers`` output. + * ``MOLECULE_AGENT_DESCRIPTION`` — one-liner about the agent's + purpose. Rendered in canvas Details + Skills tabs. + * ``MOLECULE_AGENT_SKILLS`` — comma-separated skill names + (e.g. ``research,code-review,memory-curation``). Each name is + expanded to a ``{"name": ...}`` skill object — the minimum + shape that satisfies both ``shared_runtime.summarize_peers`` + (uses ``s["name"]``) and the canvas SkillsTab.tsx schema + (id falls back to name when omitted). Empty / whitespace + entries are dropped. + + Defaults match the previous hardcoded behaviour exactly so this + is a strict superset — an operator who sets none of the env vars + sees no change. + """ + name = (os.environ.get("MOLECULE_AGENT_NAME") or "").strip() + if not name: + name = f"molecule-mcp-{workspace_id[:8]}" + + description = (os.environ.get("MOLECULE_AGENT_DESCRIPTION") or "").strip() + + skills_raw = (os.environ.get("MOLECULE_AGENT_SKILLS") or "").strip() + skills: list[dict] = [] + if skills_raw: + for s in skills_raw.split(","): + label = s.strip() + if label: + skills.append({"name": label}) + + card: dict = {"name": name, "skills": skills} + if description: + card["description"] = description + return card + + +def platform_register(platform_url: str, workspace_id: str, token: str) -> None: + """One-shot register at startup; fails fast on auth errors. + + Lifts the workspace from ``awaiting_agent`` to ``online`` for + operators who never ran the curl-register snippet. Safe to call + repeatedly: the platform's register handler is an upsert that + just refreshes ``url``, ``agent_card``, and ``status``. + + Failure model (post-review): + - 401 / 403 → ``sys.exit(3)`` immediately. The operator's + token is wrong; silently looping in a broken state would + make this hard to diagnose because the MCP tools would 401 + on every call too. Hard-fail is the kindest option. + - Other 4xx/5xx → log a warning + continue. The heartbeat + thread will surface persistent failures; transient platform + blips shouldn't abort the MCP loop. + - Network / transport errors → log + continue. Same reasoning. + + Origin header is required by the SaaS edge WAF; without it + /registry/register currently still works (it's on the WAF + allowlist), but the heartbeat path needs Origin and we want one + consistent header set across both calls. + """ + try: + import httpx + except ImportError: + # httpx is a transitive dep via a2a-sdk; if missing, the MCP + # server won't import either. Let the caller's later import + # surface the real error. + return + + payload = { + "id": workspace_id, + "url": "", + "agent_card": build_agent_card(workspace_id), + "delivery_mode": "poll", + } + headers = { + "Authorization": f"Bearer {token}", + "Origin": platform_url, + "Content-Type": "application/json", + } + try: + with httpx.Client(timeout=10.0) as client: + resp = client.post( + f"{platform_url}/registry/register", + json=payload, + headers=headers, + ) + if resp.status_code in (401, 403): + print( + f"molecule-mcp: register rejected with HTTP {resp.status_code} — " + f"the token in MOLECULE_WORKSPACE_TOKEN is invalid for workspace " + f"{workspace_id}. 
Regenerate from the canvas → Tokens tab.", + file=sys.stderr, + ) + sys.exit(3) + if resp.status_code >= 400: + logger.warning( + "molecule-mcp: register POST returned HTTP %d: %s", + resp.status_code, + (resp.text or "")[:200], + ) + else: + logger.info( + "molecule-mcp: registered workspace %s with platform", + workspace_id, + ) + except SystemExit: + raise + except Exception as exc: # noqa: BLE001 + logger.warning("molecule-mcp: register POST failed: %s", exc) + + +def heartbeat_loop( + platform_url: str, + workspace_id: str, + token: str, + interval: float = HEARTBEAT_INTERVAL_SECONDS, +) -> None: + """Daemon thread body: POST /registry/heartbeat every ``interval``s. + + Failures are logged at WARNING and the loop continues. The thread + exits when the main process does (daemon=True). Each iteration + rebuilds the payload + headers — cheap and ensures token rotation + via env var (rare but possible) is picked up on the next tick. + """ + try: + import httpx + except ImportError: + return + + start_time = time.time() + consecutive_auth_failures = 0 + while True: + body = { + "workspace_id": workspace_id, + "error_rate": 0.0, + "sample_error": "", + "active_tasks": 0, + "uptime_seconds": int(time.time() - start_time), + } + headers = { + "Authorization": f"Bearer {token}", + "Origin": platform_url, + "Content-Type": "application/json", + } + try: + with httpx.Client(timeout=10.0) as client: + resp = client.post( + f"{platform_url}/registry/heartbeat", + json=body, + headers=headers, + ) + if resp.status_code in (401, 403): + consecutive_auth_failures += 1 + log_heartbeat_auth_failure( + consecutive_auth_failures, workspace_id, resp.status_code, + ) + elif resp.status_code >= 400: + # Non-auth HTTP error — log, but DO NOT touch the + # auth-failure counter (5xx blips, 429, etc. are + # transient and unrelated to token validity). + logger.warning( + "molecule-mcp: heartbeat HTTP %d: %s", + resp.status_code, + (resp.text or "")[:200], + ) + else: + consecutive_auth_failures = 0 + persist_inbound_secret_from_heartbeat(resp) + except Exception as exc: # noqa: BLE001 + logger.warning("molecule-mcp: heartbeat failed: %s", exc) + time.sleep(interval) + + +def log_heartbeat_auth_failure(count: int, workspace_id: str, status_code: int) -> None: + """Escalate consecutive heartbeat 401/403s from quiet WARNING to + actionable ERROR. + + The operator's first sign of trouble shouldn't be "tools 401 with no + explanation" — that was the failure mode that motivated this code, + triggered by a workspace being deleted server-side and its tokens + revoked while the runtime kept heartbeating in silence. + + Cadence: + * count < threshold: WARNING per tick (transient — could be a + platform blip, don't shout yet) + * count == threshold: ERROR with re-onboard instructions + (the first signal the operator can't miss) + * count > threshold and (count - threshold) % relog == 0: re-log + ERROR (so a session that started after the first ERROR still + sees the message scrolling past in their logs) + """ + if count < HEARTBEAT_AUTH_LOUD_THRESHOLD: + logger.warning( + "molecule-mcp: heartbeat HTTP %d (auth failure %d/%d) — " + "token may be revoked. Will retry; if persistent, regenerate " + "from canvas → Tokens.", + status_code, count, HEARTBEAT_AUTH_LOUD_THRESHOLD, + ) + return + # At or past the threshold — this is the loud actionable error. 
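+    # Worked cadence with threshold=3, relog=20: ERROR at count 3
+    # (~1 min at the 20s tick), then again at counts 23, 43, ... —
+    # a re-log every 20 ticks (~7 min).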
+ if count == HEARTBEAT_AUTH_LOUD_THRESHOLD or ( + count - HEARTBEAT_AUTH_LOUD_THRESHOLD + ) % HEARTBEAT_AUTH_RELOG_INTERVAL == 0: + logger.error( + "molecule-mcp: %d consecutive heartbeat auth failures (HTTP %d) — " + "the token in MOLECULE_WORKSPACE_TOKEN has been REVOKED, likely " + "because workspace %s was deleted server-side. The MCP server is " + "still running but every platform call will fail. Regenerate the " + "workspace + token from the canvas (Tokens tab), update your MCP " + "config, and restart your runtime.", + count, status_code, workspace_id, + ) + + +def persist_inbound_secret_from_heartbeat(resp: object) -> None: + """Persist ``platform_inbound_secret`` from a heartbeat response, if any. + + The platform's heartbeat handler returns the secret on every beat + (mirroring /registry/register) so a workspace that lazy-healed the + secret on the platform side — typical recovery path for a workspace + whose row had a NULL ``platform_inbound_secret`` after a partial + bootstrap — picks it up within one heartbeat tick instead of + requiring a runtime restart. + + Without this delivery path the chat-upload code path's "secret was + just minted, will pick up on next heartbeat" 503 message is a lie + and the workspace stays 401-forever until the operator restarts + the runtime. Caught 2026-04-30 on hongmingwang tenant. + + Failure is non-fatal: if the body isn't JSON, doesn't carry the + field, or the disk write fails, the next heartbeat retries. This + matches the cold-start register flow in main.py:319-323. + """ + try: + body = resp.json() + except Exception: # noqa: BLE001 + return + if not isinstance(body, dict): + return + secret = body.get("platform_inbound_secret") + if not secret: + return + try: + from platform_inbound_auth import save_inbound_secret + + save_inbound_secret(secret) + except Exception as exc: # noqa: BLE001 + logger.warning( + "molecule-mcp: persist inbound secret from heartbeat failed: %s", exc + ) + + +def start_heartbeat_thread( + platform_url: str, + workspace_id: str, + token: str, +) -> threading.Thread: + """Start the heartbeat daemon thread. Returns the Thread handle. + + The MCP stdio loop runs in the foreground (asyncio); this thread + runs alongside it. ``daemon=True`` so when the operator hits + Ctrl-C / closes the runtime, the heartbeat dies with it instead + of leaking and writing to a stale workspace. + """ + t = threading.Thread( + target=heartbeat_loop, + args=(platform_url, workspace_id, token), + name="molecule-mcp-heartbeat", + daemon=True, + ) + t.start() + return t diff --git a/workspace/mcp_inbox_pollers.py b/workspace/mcp_inbox_pollers.py new file mode 100644 index 00000000..659da5ed --- /dev/null +++ b/workspace/mcp_inbox_pollers.py @@ -0,0 +1,63 @@ +"""Inbox-poller spawn helpers for the standalone ``molecule-mcp`` wrapper. + +Extracted from ``mcp_cli.py`` (RFC #2873 iter 3). The poller is the +INBOUND side of the standalone path — without it, the universal MCP +server is outbound-only (can call ``delegate_task`` / +``send_message_to_user``, never observes canvas-user / peer-agent +messages). + +Public surface: + +* ``start_inbox_pollers(platform_url, workspace_ids)`` — activate the + inbox singleton and spawn one daemon poller per workspace. +""" +from __future__ import annotations + +import logging + +logger = logging.getLogger(__name__) + + +def start_inbox_pollers(platform_url: str, workspace_ids: list[str]) -> None: + """Activate the inbox singleton + spawn one poller daemon thread per workspace. 
+ + Done lazily here (not at module import) because importing inbox + pulls in platform_auth, which only resolves cleanly AFTER env + validation succeeds. Activation is idempotent within a process, + so a stray double-call (e.g. test harness re-entering main) is + harmless. + + The poller threads are daemon=True — die with the main process. + + Single-workspace path: one poller, single cursor file at the legacy + location (``.mcp_inbox_cursor``). Cursor-key resolution falls back + to the empty string for back-compat with operators whose existing + on-disk cursor was written by the pre-multi-workspace code. + + Multi-workspace path: N pollers, each with its own cursor file + keyed by ``workspace_id[:8]``. Cursors live next to each other in + configs_dir so an operator inspecting state sees all of them + together. + """ + try: + import inbox + except ImportError as exc: + logger.warning("molecule-mcp: inbox module unavailable: %s", exc) + return + + if len(workspace_ids) <= 1: + # Back-compat exact: single-workspace mode reuses the legacy + # cursor filename + cursor_path constructor arg, so an existing + # operator's on-disk state isn't invalidated by upgrade. + wsid = workspace_ids[0] + state = inbox.InboxState(cursor_path=inbox.default_cursor_path()) + inbox.activate(state) + inbox.start_poller_thread(state, platform_url, wsid) + return + + # Multi-workspace: per-workspace cursor file, one shared queue. + cursor_paths = {wsid: inbox.default_cursor_path(wsid) for wsid in workspace_ids} + state = inbox.InboxState(cursor_paths=cursor_paths) + inbox.activate(state) + for wsid in workspace_ids: + inbox.start_poller_thread(state, platform_url, wsid) diff --git a/workspace/mcp_workspace_resolver.py b/workspace/mcp_workspace_resolver.py new file mode 100644 index 00000000..a6fe3bff --- /dev/null +++ b/workspace/mcp_workspace_resolver.py @@ -0,0 +1,146 @@ +"""Env validation + workspace resolution for the standalone ``molecule-mcp``. + +Extracted from ``mcp_cli.py`` (RFC #2873 iter 3). Deals with the two +shapes ``molecule-mcp`` accepts: + + * Single-workspace legacy shape: ``WORKSPACE_ID`` + token from + ``MOLECULE_WORKSPACE_TOKEN`` or ``${CONFIGS_DIR}/.auth_token``. + * Multi-workspace JSON shape: ``MOLECULE_WORKSPACES`` env var carries a + JSON array of ``{"id": ..., "token": ...}`` entries. + +Public surface: + +* ``resolve_workspaces()`` → ``(workspaces, errors)``. +* ``read_token_file()`` → token text or ``""``. +* ``print_missing_env_help(missing, have_token_file)`` — operator-help + printer. +""" +from __future__ import annotations + +import json +import os +import sys + +import configs_dir + + +def resolve_workspaces() -> tuple[list[tuple[str, str]], list[str]]: + """Return the list of ``(workspace_id, token)`` pairs to register. + + Resolution order: + + 1. ``MOLECULE_WORKSPACES`` env var — JSON array of + ``{"id": "...", "token": "..."}`` objects. Activates the + multi-workspace external-agent path (one process registered into + N workspaces). When set, ``WORKSPACE_ID`` / ``MOLECULE_WORKSPACE_TOKEN`` + are IGNORED — the JSON is the source of truth. + + 2. Single-workspace fallback — ``WORKSPACE_ID`` env var + token from + ``MOLECULE_WORKSPACE_TOKEN`` or ``${CONFIGS_DIR}/.auth_token``. + This is the pre-existing path; back-compat exact. + + Returns ``(workspaces, errors)``: + * ``workspaces``: list of ``(workspace_id, token)`` — non-empty + on the happy path. + * ``errors``: human-readable strings describing what's missing / + malformed. 
``main()`` surfaces these with the same shape as + ``print_missing_env_help`` so the operator's first run gives + actionable output. + + Why JSON env (not file): ergonomic for Claude Code MCP config (one + string in ``mcpServers.molecule.env`` instead of a sidecar file) + and for CI / launchers. A separate config-file path can be added + later without breaking this. + """ + raw = os.environ.get("MOLECULE_WORKSPACES", "").strip() + if raw: + try: + parsed = json.loads(raw) + except json.JSONDecodeError as exc: + return [], [ + f"MOLECULE_WORKSPACES is not valid JSON ({exc.msg} at pos " + f"{exc.pos}). Expected: '[{{\"id\":\"\",\"token\":" + f"\"\"}},{{...}}]'" + ] + if not isinstance(parsed, list) or not parsed: + return [], [ + "MOLECULE_WORKSPACES must be a non-empty JSON array of " + "{\"id\":\"...\",\"token\":\"...\"} objects" + ] + out: list[tuple[str, str]] = [] + seen: set[str] = set() + errors: list[str] = [] + for i, entry in enumerate(parsed): + if not isinstance(entry, dict): + errors.append( + f"MOLECULE_WORKSPACES[{i}] is not an object — got {type(entry).__name__}" + ) + continue + wsid = str(entry.get("id", "")).strip() + tok = str(entry.get("token", "")).strip() + if not wsid or not tok: + errors.append( + f"MOLECULE_WORKSPACES[{i}] missing 'id' or 'token'" + ) + continue + if wsid in seen: + errors.append( + f"MOLECULE_WORKSPACES[{i}] duplicate workspace id {wsid!r}" + ) + continue + seen.add(wsid) + out.append((wsid, tok)) + if errors: + return [], errors + return out, [] + + # Single-workspace back-compat path. + wsid = os.environ.get("WORKSPACE_ID", "").strip() + if not wsid: + return [], ["WORKSPACE_ID (or MOLECULE_WORKSPACES) is required"] + tok = os.environ.get("MOLECULE_WORKSPACE_TOKEN", "").strip() + if not tok: + tok = read_token_file() + if not tok: + return [], [ + "MOLECULE_WORKSPACE_TOKEN (or CONFIGS_DIR/.auth_token) is required" + ] + return [(wsid, tok)], [] + + +def print_missing_env_help(missing: list[str], have_token_file: bool) -> None: + print("molecule-mcp: missing required environment.\n", file=sys.stderr) + print("Set the following before running molecule-mcp:", file=sys.stderr) + print(" WORKSPACE_ID — your workspace UUID (from canvas)", file=sys.stderr) + print( + " PLATFORM_URL — base URL of your Molecule platform " + "(e.g. https://your-tenant.staging.moleculesai.app)", + file=sys.stderr, + ) + if not have_token_file: + print( + " MOLECULE_WORKSPACE_TOKEN — bearer token for this workspace " + "(canvas → Tokens tab)", + file=sys.stderr, + ) + print("", file=sys.stderr) + print(f"Currently missing: {', '.join(missing)}", file=sys.stderr) + + +def read_token_file() -> str: + """Read the token from the resolved configs dir's ``.auth_token`` if + present. + + Mirrors platform_auth._token_file's location resolution but without + importing the heavy module here (that import triggers a2a_client's + WORKSPACE_ID guard which is fine after env validation, but cheaper + to inline a 4-line file read than pull in the whole stack just for + the path). 
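+
+    Illustrative behaviour (hedged — the token value is made up):
+
+    * ``.auth_token`` containing ``"tok-abc\n"`` → returns ``"tok-abc"``
+    * file missing, or the read raises ``OSError`` → returns ``""``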
+ """ + path = configs_dir.resolve() / ".auth_token" + if not path.is_file(): + return "" + try: + return path.read_text().strip() + except OSError: + return "" diff --git a/workspace/tests/test_mcp_cli.py b/workspace/tests/test_mcp_cli.py index 608d1e7c..a1061394 100644 --- a/workspace/tests/test_mcp_cli.py +++ b/workspace/tests/test_mcp_cli.py @@ -13,6 +13,7 @@ from pathlib import Path import pytest import mcp_cli +import mcp_heartbeat @pytest.fixture(autouse=True) @@ -739,8 +740,13 @@ def test_heartbeat_loop_calls_persist_on_success(monkeypatch): def fake_persist(resp): saw.append(resp) + # Patch on mcp_heartbeat — that's where heartbeat_loop's internal + # name resolution looks up persist_inbound_secret_from_heartbeat + # after the RFC #2873 iter 3 split. The mcp_cli._persist_…_from_heartbeat + # back-compat re-export still exists, but patching it here would not + # affect the loop body. monkeypatch.setattr( - mcp_cli, "_persist_inbound_secret_from_heartbeat", fake_persist + mcp_heartbeat, "persist_inbound_secret_from_heartbeat", fake_persist ) class FakeResp: @@ -786,8 +792,8 @@ def test_heartbeat_loop_skips_persist_on_4xx(monkeypatch): """Heartbeat 4xx error path must NOT invoke persist (no body to trust).""" saw: list[object] = [] monkeypatch.setattr( - mcp_cli, - "_persist_inbound_secret_from_heartbeat", + mcp_heartbeat, + "persist_inbound_secret_from_heartbeat", lambda r: saw.append(r), ) @@ -899,7 +905,7 @@ def test_heartbeat_single_401_logs_warning_not_error(monkeypatch, caplog): transient platform blip. Log at WARNING; don't shout.""" import logging - caplog.set_level(logging.WARNING, logger="mcp_cli") + caplog.set_level(logging.WARNING, logger="mcp_heartbeat") _multi_iter_runner(monkeypatch, [401]) @@ -923,7 +929,7 @@ def test_heartbeat_three_consecutive_401s_escalates_to_error(monkeypatch, caplog LOUD ERROR with re-onboard guidance — not buried at WARNING.""" import logging - caplog.set_level(logging.WARNING, logger="mcp_cli") + caplog.set_level(logging.WARNING, logger="mcp_heartbeat") _multi_iter_runner(monkeypatch, [401, 401, 401]) @@ -949,7 +955,7 @@ def test_heartbeat_403_treated_same_as_401(monkeypatch, caplog): not authorized for this workspace). Same escalation path.""" import logging - caplog.set_level(logging.WARNING, logger="mcp_cli") + caplog.set_level(logging.WARNING, logger="mcp_heartbeat") _multi_iter_runner(monkeypatch, [403, 403, 403]) @@ -963,7 +969,7 @@ def test_heartbeat_recovery_resets_consecutive_counter(monkeypatch, caplog): later should NOT immediately escalate.""" import logging - caplog.set_level(logging.WARNING, logger="mcp_cli") + caplog.set_level(logging.WARNING, logger="mcp_heartbeat") # Two 401s, then 200, then one 401. If counter resets correctly, # the final 401 is "1 consecutive" and should NOT escalate. @@ -982,7 +988,7 @@ def test_heartbeat_500_does_not_increment_auth_counter(monkeypatch, caplog): misleading the operator.""" import logging - caplog.set_level(logging.WARNING, logger="mcp_cli") + caplog.set_level(logging.WARNING, logger="mcp_heartbeat") _multi_iter_runner(monkeypatch, [500, 500, 500]) diff --git a/workspace/tests/test_mcp_cli_split.py b/workspace/tests/test_mcp_cli_split.py new file mode 100644 index 00000000..e8a39817 --- /dev/null +++ b/workspace/tests/test_mcp_cli_split.py @@ -0,0 +1,231 @@ +"""RFC #2873 iter 3 — drift gate + behavior tests for the post-split surface. 
+ +The bulk of the heartbeat / resolver behavior is exercised by +``test_mcp_cli.py`` and ``test_mcp_cli_multi_workspace.py`` through the +``mcp_cli._symbol`` back-compat aliases. This file pins: + + 1. The split is **behavior-neutral via aliasing** — every previously- + exposed ``mcp_cli._foo`` symbol is the SAME callable as the new + module's authoritative function. If a refactor accidentally drops + an alias or points it at a stale copy, this fails. + + 2. ``mcp_inbox_pollers.start_inbox_pollers`` works for both single- + workspace (legacy back-compat) and multi-workspace shapes. + ``mcp_cli`` had no direct test for this branch before the split. +""" +from __future__ import annotations + +import sys +import types + +import pytest + +import mcp_cli +import mcp_heartbeat +import mcp_inbox_pollers +import mcp_workspace_resolver + + +# ============== Drift gate: back-compat aliases point at the real fn ============== + +class TestBackCompatAliases: + """Pin that ``mcp_cli._foo is real_fn``. A test that re-implements + the alias would still pass — the ``is`` check guarantees we didn't + create a wrapper that drifts.""" + + def test_heartbeat_aliases(self): + assert mcp_cli._build_agent_card is mcp_heartbeat.build_agent_card + assert mcp_cli._platform_register is mcp_heartbeat.platform_register + assert mcp_cli._heartbeat_loop is mcp_heartbeat.heartbeat_loop + assert mcp_cli._log_heartbeat_auth_failure is mcp_heartbeat.log_heartbeat_auth_failure + assert ( + mcp_cli._persist_inbound_secret_from_heartbeat + is mcp_heartbeat.persist_inbound_secret_from_heartbeat + ) + assert mcp_cli._start_heartbeat_thread is mcp_heartbeat.start_heartbeat_thread + + def test_resolver_aliases(self): + assert mcp_cli._resolve_workspaces is mcp_workspace_resolver.resolve_workspaces + assert mcp_cli._print_missing_env_help is mcp_workspace_resolver.print_missing_env_help + assert mcp_cli._read_token_file is mcp_workspace_resolver.read_token_file + + def test_inbox_pollers_alias(self): + assert mcp_cli._start_inbox_pollers is mcp_inbox_pollers.start_inbox_pollers + + def test_constants_match(self): + assert ( + mcp_cli.HEARTBEAT_INTERVAL_SECONDS + == mcp_heartbeat.HEARTBEAT_INTERVAL_SECONDS + ) + assert ( + mcp_cli._HEARTBEAT_AUTH_LOUD_THRESHOLD + == mcp_heartbeat.HEARTBEAT_AUTH_LOUD_THRESHOLD + ) + assert ( + mcp_cli._HEARTBEAT_AUTH_RELOG_INTERVAL + == mcp_heartbeat.HEARTBEAT_AUTH_RELOG_INTERVAL + ) + + +# ============== mcp_inbox_pollers — both shapes + degraded import ============== + +class _FakeInboxState: + def __init__(self, **kwargs): + self.kwargs = kwargs + + +def _install_fake_inbox(monkeypatch): + """Inject a fake ``inbox`` module so we observe the spawn calls + without pulling in the real platform_auth dependency tree.""" + activations: list[_FakeInboxState] = [] + spawned: list[tuple[_FakeInboxState, str, str]] = [] + cursor_paths: list[str] = [] + + def default_cursor_path(wsid=None): + # Mirror the real signature: optional wsid → distinct path per id, + # absent → legacy single path. 
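+        # e.g. default_cursor_path() → "/tmp/.mcp_inbox_cursor" (legacy),
+        #      default_cursor_path("ws-aaaaaaaa") → "/tmp/.mcp_inbox_cursor.ws-aaaaa"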
+ path = f"/tmp/.mcp_inbox_cursor.{wsid[:8]}" if wsid else "/tmp/.mcp_inbox_cursor" + cursor_paths.append(path) + return path + + def activate(state): + activations.append(state) + + def start_poller_thread(state, platform_url, wsid): + spawned.append((state, platform_url, wsid)) + + fake = types.ModuleType("inbox") + fake.InboxState = _FakeInboxState + fake.activate = activate + fake.default_cursor_path = default_cursor_path + fake.start_poller_thread = start_poller_thread + monkeypatch.setitem(sys.modules, "inbox", fake) + return activations, spawned, cursor_paths + + +class TestStartInboxPollers: + def test_single_workspace_uses_legacy_cursor_path(self, monkeypatch): + """Back-compat exact: single-workspace mode reuses the legacy + cursor filename so an existing operator's on-disk state isn't + invalidated by upgrade.""" + activations, spawned, cursor_paths = _install_fake_inbox(monkeypatch) + + mcp_inbox_pollers.start_inbox_pollers( + "https://test.moleculesai.app", ["ws-only-one"] + ) + + assert len(activations) == 1, "exactly one inbox.activate call" + assert len(spawned) == 1, "exactly one poller thread spawned" + # Single-workspace path uses default_cursor_path() with no arg — + # the cursor_path captured here must be the legacy filename + # (no per-ws suffix). + assert cursor_paths == ["/tmp/.mcp_inbox_cursor"] + # State carries cursor_path, not cursor_paths + state = activations[0] + assert state.kwargs == {"cursor_path": "/tmp/.mcp_inbox_cursor"} + # Spawned poller is for the right workspace + assert spawned[0] == (state, "https://test.moleculesai.app", "ws-only-one") + + def test_multi_workspace_uses_per_workspace_cursor_paths(self, monkeypatch): + """Multi-workspace path: per-workspace cursor file, one shared + InboxState. N pollers, each pointed at the same state so the + agent's inbox_peek/pop sees a merged view.""" + activations, spawned, _ = _install_fake_inbox(monkeypatch) + + wsids = ["ws-aaaaaaaa", "ws-bbbbbbbb", "ws-cccccccc"] + mcp_inbox_pollers.start_inbox_pollers( + "https://test.moleculesai.app", wsids + ) + + # One state, one activate, three pollers + assert len(activations) == 1 + assert len(spawned) == 3 + state = activations[0] + # Multi-workspace state carries cursor_paths (mapping) + assert "cursor_paths" in state.kwargs + assert set(state.kwargs["cursor_paths"].keys()) == set(wsids) + # All pollers share the same state + for s, _url, _wsid in spawned: + assert s is state + # All workspace ids covered + assert sorted(t[2] for t in spawned) == sorted(wsids) + + def test_inbox_module_unavailable_logs_and_returns(self, monkeypatch, caplog): + """If ``import inbox`` fails (older install or stripped + runtime), spawn must NOT raise — log a warning and continue. + The MCP server can still serve outbound tools.""" + import logging + + # Force ImportError by injecting a module sentinel that raises. + class _Boom: + def __getattr__(self, _name): + raise ImportError("inbox stripped from this build") + + # Setting sys.modules["inbox"] to a broken object isn't enough — + # the import statement reads sys.modules first; if the entry is + # truthy, Python returns it. We need to force the import to raise. + # Easiest: pre-poison sys.modules so the `import inbox` line + # raises by setting the entry to None (Python special-cases None + # as "explicit ImportError"). + monkeypatch.setitem(sys.modules, "inbox", None) + + caplog.set_level(logging.WARNING, logger="mcp_inbox_pollers") + # Should not raise. 
+ mcp_inbox_pollers.start_inbox_pollers( + "https://test.moleculesai.app", ["ws-1"] + ) + warnings = [r for r in caplog.records if r.levelno == logging.WARNING] + assert any("inbox module unavailable" in r.message for r in warnings), ( + f"expected a 'inbox module unavailable' warning, got: " + f"{[r.message for r in warnings]}" + ) + + +# ============== mcp_heartbeat.build_agent_card — short direct tests ============== + +class TestBuildAgentCardDirect: + """Spot-check the new module's public surface; the full test matrix + lives in ``test_mcp_cli.py`` reaching through ``mcp_cli._build_agent_card``. + """ + + def test_default_card_shape(self, monkeypatch): + for v in ("MOLECULE_AGENT_NAME", "MOLECULE_AGENT_DESCRIPTION", "MOLECULE_AGENT_SKILLS"): + monkeypatch.delenv(v, raising=False) + card = mcp_heartbeat.build_agent_card("8dad3e29-c32a-4ec7-9ea7-94fe2d2d98ec") + assert card == {"name": "molecule-mcp-8dad3e29", "skills": []} + + def test_skills_csv_split_and_trim(self, monkeypatch): + monkeypatch.setenv("MOLECULE_AGENT_SKILLS", "research, , code-review,memory-curation, ") + card = mcp_heartbeat.build_agent_card("ws-1") + assert card["skills"] == [ + {"name": "research"}, + {"name": "code-review"}, + {"name": "memory-curation"}, + ] + + +# ============== mcp_workspace_resolver — short direct tests ============== + +class TestResolveWorkspacesDirect: + @pytest.fixture(autouse=True) + def _isolate(self, monkeypatch, tmp_path): + for v in ("WORKSPACE_ID", "MOLECULE_WORKSPACE_TOKEN", "MOLECULE_WORKSPACES"): + monkeypatch.delenv(v, raising=False) + monkeypatch.setenv("CONFIGS_DIR", str(tmp_path)) + yield + + def test_single_workspace_via_env(self, monkeypatch): + monkeypatch.setenv("WORKSPACE_ID", "ws-1") + monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "tok") + out, errors = mcp_workspace_resolver.resolve_workspaces() + assert out == [("ws-1", "tok")] + assert errors == [] + + def test_multi_workspace_via_json_env(self, monkeypatch): + monkeypatch.setenv( + "MOLECULE_WORKSPACES", + '[{"id":"ws-a","token":"a"},{"id":"ws-b","token":"b"}]', + ) + out, errors = mcp_workspace_resolver.resolve_workspaces() + assert out == [("ws-a", "a"), ("ws-b", "b")] + assert errors == [] From 0c461eb9f1d37b38b67229a5eb9300e942b86941 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 04:43:16 -0700 Subject: [PATCH 2/5] refactor(workspace): extract RBAC helpers from a2a_tools.py to a2a_tools_rbac.py (RFC #2873 iter 4a) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit First slice of the a2a_tools.py (991 LOC) split — single-concern module for the workspace's RBAC + auth-header layer: * _ROLE_PERMISSIONS canonical table * _get_workspace_tier * _check_memory_write_permission * _check_memory_read_permission * _is_root_workspace * _auth_headers_for_heartbeat a2a_tools.py shrinks from 991 → 915 LOC. Internal call sites (15 references) work unchanged because the bare names are re-imported at module-level — Python's local-then-module name resolution still finds them in a2a_tools's namespace, so existing tests' patch("a2a_tools._foo", …) keeps working. The RBAC layer can now evolve independently of the 18 tool handlers. Adding a new role or capability action touches one file, not the kitchen-sink module. Tests: * 77 existing test_a2a_tools_impl.py pass unchanged. 
* test_a2a_tools_rbac.py adds 28 focused tests: - 6 alias drift-gate tests (`_foo is rbac.foo`) - 4 get_workspace_tier env+config branches - 2 is_root_workspace tier branches - 6 check_memory_write_permission roles + override branches - 3 check_memory_read_permission scenarios - 3 auth_headers_for_heartbeat platform_auth branches - 4 ROLE_PERMISSIONS table invariants * Direct coverage for the helper module (was previously only exercised through 991-LOC tool-handler tests). Refs RFC #2873. --- workspace/a2a_tools.py | 102 ++------- workspace/a2a_tools_rbac.py | 138 ++++++++++++ workspace/tests/test_a2a_tools_rbac.py | 281 +++++++++++++++++++++++++ 3 files changed, 432 insertions(+), 89 deletions(-) create mode 100644 workspace/a2a_tools_rbac.py create mode 100644 workspace/tests/test_a2a_tools_rbac.py diff --git a/workspace/a2a_tools.py b/workspace/a2a_tools.py index 55a19758..f3faf619 100644 --- a/workspace/a2a_tools.py +++ b/workspace/a2a_tools.py @@ -28,96 +28,20 @@ from platform_auth import list_registered_workspaces # --------------------------------------------------------------------------- -# RBAC helpers (mirror builtin_tools/audit.py for a2a_tools isolation) +# RBAC + auth helpers — extracted to a2a_tools_rbac (RFC #2873 iter 4a). +# Re-exported here under the legacy underscore names so existing tests' +# patch("a2a_tools._check_memory_write_permission", …) and call sites +# inside this module that resolve bare names against the module-level +# namespace continue to work unchanged. # --------------------------------------------------------------------------- - -_ROLE_PERMISSIONS = { - "admin": {"delegate", "approve", "memory.read", "memory.write"}, - "operator": {"delegate", "approve", "memory.read", "memory.write"}, - "read-only": {"memory.read"}, - "no-delegation": {"approve", "memory.read", "memory.write"}, - "no-approval": {"delegate", "memory.read", "memory.write"}, - "memory-readonly": {"memory.read"}, -} - - -def _get_workspace_tier() -> int: - """Return the workspace tier from config (0 = root, 1+ = tenant).""" - try: - from config import load_config - - cfg = load_config() - return getattr(cfg, "tier", 1) - except Exception: - return int(os.environ.get("WORKSPACE_TIER", 1)) - - -def _check_memory_write_permission() -> bool: - """Return True if this workspace's RBAC roles grant memory.write.""" - try: - from config import load_config - - cfg = load_config() - roles = list(getattr(cfg, "rbac", None).roles or ["operator"]) - allowed = dict(getattr(cfg, "rbac", None).allowed_actions or {}) - except Exception: - # Fail closed: deny when config is unavailable - roles = ["operator"] - allowed = {} - - for role in roles: - if role == "admin": - return True - if role in allowed: - if "memory.write" in allowed[role]: - return True - elif role in _ROLE_PERMISSIONS and "memory.write" in _ROLE_PERMISSIONS[role]: - return True - return False - - -def _check_memory_read_permission() -> bool: - """Return True if this workspace's RBAC roles grant memory.read.""" - try: - from config import load_config - - cfg = load_config() - roles = list(getattr(cfg, "rbac", None).roles or ["operator"]) - allowed = dict(getattr(cfg, "rbac", None).allowed_actions or {}) - except Exception: - roles = ["operator"] - allowed = {} - - for role in roles: - if role == "admin": - return True - if role in allowed: - if "memory.read" in allowed[role]: - return True - elif role in _ROLE_PERMISSIONS and "memory.read" in _ROLE_PERMISSIONS[role]: - return True - return False - - -def _is_root_workspace() -> bool: - 
"""Return True if this workspace is tier 0 (root/root-org).""" - return _get_workspace_tier() == 0 - - -def _auth_headers_for_heartbeat(workspace_id: str | None = None) -> dict[str, str]: - """Return Phase 30.1 auth headers; tolerate platform_auth being absent - in older installs (e.g. during rolling upgrade). - - ``workspace_id`` selects the per-workspace token from the multi- - workspace registry when set (PR-1: external agent registered in - multiple workspaces). With no arg the legacy single-token path is - unchanged. - """ - try: - from platform_auth import auth_headers - return auth_headers(workspace_id) if workspace_id else auth_headers() - except Exception: - return {} +from a2a_tools_rbac import ( # noqa: E402 (import after the from-a2a_client block) + _auth_headers_for_heartbeat, + _check_memory_read_permission, + _check_memory_write_permission, + _get_workspace_tier, + _is_root_workspace, + _ROLE_PERMISSIONS, +) # Per-field caps on the heartbeat / activity payload. Borrowed from diff --git a/workspace/a2a_tools_rbac.py b/workspace/a2a_tools_rbac.py new file mode 100644 index 00000000..25bffd93 --- /dev/null +++ b/workspace/a2a_tools_rbac.py @@ -0,0 +1,138 @@ +"""RBAC + auth-header helpers shared by all a2a_tools tool handlers. + +Extracted from ``a2a_tools.py`` (RFC #2873 iter 4a). Centralises the +"what can this workspace do" + "how do I prove it on a platform call" +concerns into a single module so: + + * Future tools added under ``a2a_tools/`` see one obvious helper to + call instead of re-implementing the role/tier check. + * The role-permission table is in ONE place — adding a new role + or capability touches one file, not every tool that gates on it. + * Tests targeting these helpers don't have to import the whole + 991-LOC ``a2a_tools`` surface. + +Public surface: + +* ``ROLE_PERMISSIONS`` — canonical role → action set table. +* ``get_workspace_tier()`` — config-resolved tier (0 = root). +* ``check_memory_write_permission()`` — boolean. +* ``check_memory_read_permission()`` — boolean. +* ``is_root_workspace()`` — boolean (tier == 0). +* ``auth_headers_for_heartbeat(workspace_id=None)`` — auth-header dict + with the multi-workspace registry lookup; tolerates ``platform_auth`` + missing on older installs (returns ``{}``). + +Underscore-prefixed back-compat aliases (``_ROLE_PERMISSIONS``, +``_check_memory_write_permission``, etc.) match the names previously +exposed in ``a2a_tools`` so existing tests' +``patch("a2a_tools._foo", ...)`` continue to work via the re-exports +in ``a2a_tools.py``. +""" +from __future__ import annotations + +import os + + +# Mirror ``builtin_tools/audit.py`` for a2a_tools isolation. Listed as a +# module-level constant rather than computed lazily so the table is +# discoverable in static analysis + ``grep``. 
+ROLE_PERMISSIONS: dict[str, set[str]] = { + "admin": {"delegate", "approve", "memory.read", "memory.write"}, + "operator": {"delegate", "approve", "memory.read", "memory.write"}, + "read-only": {"memory.read"}, + "no-delegation": {"approve", "memory.read", "memory.write"}, + "no-approval": {"delegate", "memory.read", "memory.write"}, + "memory-readonly": {"memory.read"}, +} + + +def get_workspace_tier() -> int: + """Return the workspace tier from config (0 = root, 1+ = tenant).""" + try: + from config import load_config + + cfg = load_config() + return getattr(cfg, "tier", 1) + except Exception: + return int(os.environ.get("WORKSPACE_TIER", 1)) + + +def _resolve_role_state() -> tuple[list[str], dict]: + """Return (roles, allowed_actions) from config. + + Fail-closed: if config is unavailable, fall back to an "operator" + default with no per-role overrides. Operator has memory.read + + memory.write but not the elevated approve/delegate over GLOBAL + scope, so a config outage doesn't grant unexpected privileges. + """ + try: + from config import load_config + + cfg = load_config() + roles = list(getattr(cfg, "rbac", None).roles or ["operator"]) + allowed = dict(getattr(cfg, "rbac", None).allowed_actions or {}) + return roles, allowed + except Exception: + return ["operator"], {} + + +def check_memory_write_permission() -> bool: + """Return True if this workspace's RBAC roles grant memory.write.""" + roles, allowed = _resolve_role_state() + for role in roles: + if role == "admin": + return True + if role in allowed: + if "memory.write" in allowed[role]: + return True + elif role in ROLE_PERMISSIONS and "memory.write" in ROLE_PERMISSIONS[role]: + return True + return False + + +def check_memory_read_permission() -> bool: + """Return True if this workspace's RBAC roles grant memory.read.""" + roles, allowed = _resolve_role_state() + for role in roles: + if role == "admin": + return True + if role in allowed: + if "memory.read" in allowed[role]: + return True + elif role in ROLE_PERMISSIONS and "memory.read" in ROLE_PERMISSIONS[role]: + return True + return False + + +def is_root_workspace() -> bool: + """Return True if this workspace is tier 0 (root/root-org).""" + return get_workspace_tier() == 0 + + +def auth_headers_for_heartbeat(workspace_id: str | None = None) -> dict[str, str]: + """Return Phase 30.1 auth headers; tolerate platform_auth being absent + in older installs (e.g. during rolling upgrade). + + ``workspace_id`` selects the per-workspace token from the multi- + workspace registry when set (PR-1: external agent registered in + multiple workspaces). With no arg the legacy single-token path is + unchanged. + """ + try: + from platform_auth import auth_headers + return auth_headers(workspace_id) if workspace_id else auth_headers() + except Exception: + return {} + + +# ============== Back-compat aliases for the previous a2a_tools names ============== +# Tests + downstream call sites refer to the pre-extract names; aliasing +# keeps both forms valid. The new public names (no underscore prefix) +# are preferred for new code. 
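+# Plain assignment (not a wrapper def) is load-bearing here: the
+# drift gate in test_a2a_tools_rbac.py pins each alias with ``is``,
+# so a wrapped or re-implemented alias fails the suite.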
+ +_ROLE_PERMISSIONS = ROLE_PERMISSIONS +_get_workspace_tier = get_workspace_tier +_check_memory_write_permission = check_memory_write_permission +_check_memory_read_permission = check_memory_read_permission +_is_root_workspace = is_root_workspace +_auth_headers_for_heartbeat = auth_headers_for_heartbeat diff --git a/workspace/tests/test_a2a_tools_rbac.py b/workspace/tests/test_a2a_tools_rbac.py new file mode 100644 index 00000000..4cb0b38e --- /dev/null +++ b/workspace/tests/test_a2a_tools_rbac.py @@ -0,0 +1,281 @@ +"""Direct tests for ``a2a_tools_rbac`` (RFC #2873 iter 4a). + +The full behavior matrix is exercised through ``a2a_tools._foo`` aliases +in ``test_a2a_tools_impl.py``. This file pins: + + 1. **Drift gate** — ``a2a_tools._foo is a2a_tools_rbac.foo`` for every + extracted symbol. A refactor that wraps or re-implements an alias + fails this test. + 2. **Direct unit coverage** for each helper without going through the + a2a_tools surface, so regressions in the small RBAC layer surface + against THIS module's tests, not the 991-LOC tool-handler tests. +""" +from __future__ import annotations + +import os +import sys +from unittest.mock import patch + +import pytest + + +@pytest.fixture(autouse=True) +def _require_workspace_id(monkeypatch): + # a2a_client raises at import-time without WORKSPACE_ID. Setting it + # once per test isolates the env so an absent value in CI doesn't + # surface as an opaque RuntimeError from a2a_tools' import. + monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000") + monkeypatch.setenv("PLATFORM_URL", "http://test.invalid") + yield + + +# ============== Drift gate ============== + +class TestBackCompatAliases: + """Pin that every legacy underscore name in ``a2a_tools`` is the + EXACT same callable / object as the new public name in + ``a2a_tools_rbac``. 
Catches accidental re-implementation in either + direction.""" + + def test_role_permissions_is_same_object(self): + import a2a_tools + import a2a_tools_rbac + assert a2a_tools._ROLE_PERMISSIONS is a2a_tools_rbac.ROLE_PERMISSIONS + + def test_get_workspace_tier_alias(self): + import a2a_tools + import a2a_tools_rbac + assert a2a_tools._get_workspace_tier is a2a_tools_rbac.get_workspace_tier + + def test_check_memory_write_permission_alias(self): + import a2a_tools + import a2a_tools_rbac + assert ( + a2a_tools._check_memory_write_permission + is a2a_tools_rbac.check_memory_write_permission + ) + + def test_check_memory_read_permission_alias(self): + import a2a_tools + import a2a_tools_rbac + assert ( + a2a_tools._check_memory_read_permission + is a2a_tools_rbac.check_memory_read_permission + ) + + def test_is_root_workspace_alias(self): + import a2a_tools + import a2a_tools_rbac + assert a2a_tools._is_root_workspace is a2a_tools_rbac.is_root_workspace + + def test_auth_headers_alias(self): + import a2a_tools + import a2a_tools_rbac + assert ( + a2a_tools._auth_headers_for_heartbeat + is a2a_tools_rbac.auth_headers_for_heartbeat + ) + + +# ============== get_workspace_tier ============== + +class TestGetWorkspaceTier: + def test_uses_config_when_available(self): + """Happy path: load_config returns an object with .tier.""" + import a2a_tools_rbac + + class _Cfg: + tier = 0 + + with patch("config.load_config", return_value=_Cfg()): + assert a2a_tools_rbac.get_workspace_tier() == 0 + + def test_default_tier_when_config_lacks_attr(self): + import a2a_tools_rbac + + class _Cfg: + pass + + with patch("config.load_config", return_value=_Cfg()): + # getattr default = 1 + assert a2a_tools_rbac.get_workspace_tier() == 1 + + def test_falls_back_to_env_var(self, monkeypatch): + """When load_config raises, read WORKSPACE_TIER from env.""" + import a2a_tools_rbac + monkeypatch.setenv("WORKSPACE_TIER", "5") + with patch("config.load_config", side_effect=RuntimeError("config unavailable")): + assert a2a_tools_rbac.get_workspace_tier() == 5 + + def test_fallback_default_one_when_env_unset(self, monkeypatch): + import a2a_tools_rbac + monkeypatch.delenv("WORKSPACE_TIER", raising=False) + with patch("config.load_config", side_effect=RuntimeError("boom")): + assert a2a_tools_rbac.get_workspace_tier() == 1 + + +# ============== is_root_workspace ============== + +class TestIsRootWorkspace: + def test_tier_zero_is_root(self): + import a2a_tools_rbac + with patch.object(a2a_tools_rbac, "get_workspace_tier", return_value=0): + assert a2a_tools_rbac.is_root_workspace() is True + + def test_nonzero_tier_is_not_root(self): + import a2a_tools_rbac + for tier in (1, 2, 99): + with patch.object(a2a_tools_rbac, "get_workspace_tier", return_value=tier): + assert a2a_tools_rbac.is_root_workspace() is False, f"tier={tier}" + + +# ============== check_memory_write_permission ============== + +class _RBACCfg: + """Minimal config stub matching the load_config().rbac shape.""" + + def __init__(self, roles=None, allowed_actions=None): + class _RBAC: + pass + self.rbac = _RBAC() + self.rbac.roles = roles or ["operator"] + self.rbac.allowed_actions = allowed_actions or {} + + +class TestCheckMemoryWritePermission: + def test_admin_role_grants_write(self): + import a2a_tools_rbac + with patch("config.load_config", return_value=_RBACCfg(roles=["admin"])): + assert a2a_tools_rbac.check_memory_write_permission() is True + + def test_operator_role_grants_write(self): + """Operator is in the canonical ROLE_PERMISSIONS table with + 
memory.write — must work without per-role overrides.""" + import a2a_tools_rbac + with patch("config.load_config", return_value=_RBACCfg(roles=["operator"])): + assert a2a_tools_rbac.check_memory_write_permission() is True + + def test_read_only_role_denies_write(self): + import a2a_tools_rbac + with patch("config.load_config", return_value=_RBACCfg(roles=["read-only"])): + assert a2a_tools_rbac.check_memory_write_permission() is False + + def test_per_role_override_grants(self): + """Per-role override in allowed_actions wins over the canonical + table — operators can grant write to memory-readonly via config.""" + import a2a_tools_rbac + cfg = _RBACCfg( + roles=["memory-readonly"], + allowed_actions={"memory-readonly": {"memory.read", "memory.write"}}, + ) + with patch("config.load_config", return_value=cfg): + assert a2a_tools_rbac.check_memory_write_permission() is True + + def test_per_role_override_denies(self): + """Per-role override that drops write blocks an operator from + writing — the override is the authoritative source when present.""" + import a2a_tools_rbac + cfg = _RBACCfg( + roles=["operator"], + allowed_actions={"operator": {"memory.read"}}, + ) + with patch("config.load_config", return_value=cfg): + assert a2a_tools_rbac.check_memory_write_permission() is False + + def test_fail_closed_when_config_unavailable(self): + """Fail-closed contract: config outage falls back to ['operator'] + with no overrides — operator has memory.write in the canonical + table, so write IS granted in this fallback. The fail-closed + property is for ELEVATED ops (admin scope), not for the basic + write that operator has by default. This test pins the contract: + config errors do not silently grant admin.""" + import a2a_tools_rbac + with patch("config.load_config", side_effect=RuntimeError("boom")): + # operator has memory.write → True (preserved behavior) + assert a2a_tools_rbac.check_memory_write_permission() is True + + +# ============== check_memory_read_permission ============== + +class TestCheckMemoryReadPermission: + def test_admin_grants_read(self): + import a2a_tools_rbac + with patch("config.load_config", return_value=_RBACCfg(roles=["admin"])): + assert a2a_tools_rbac.check_memory_read_permission() is True + + def test_read_only_grants_read(self): + import a2a_tools_rbac + with patch("config.load_config", return_value=_RBACCfg(roles=["read-only"])): + assert a2a_tools_rbac.check_memory_read_permission() is True + + def test_unknown_role_denies(self): + """A role that's not in ROLE_PERMISSIONS and not in + allowed_actions overrides denies by default.""" + import a2a_tools_rbac + with patch("config.load_config", return_value=_RBACCfg(roles=["random-undefined-role"])): + assert a2a_tools_rbac.check_memory_read_permission() is False + + +# ============== auth_headers_for_heartbeat ============== + +class TestAuthHeadersForHeartbeat: + def test_no_workspace_id_uses_legacy_path(self): + """No-arg call routes to platform_auth.auth_headers() — the + legacy single-token path.""" + import a2a_tools_rbac + called: dict[str, object] = {} + + def fake_auth_headers(*args): + called["args"] = args + return {"Authorization": "Bearer legacy-token"} + + with patch("platform_auth.auth_headers", fake_auth_headers): + out = a2a_tools_rbac.auth_headers_for_heartbeat() + assert out == {"Authorization": "Bearer legacy-token"} + # Legacy path is auth_headers() with no arg + assert called["args"] == () + + def test_with_workspace_id_routes_per_workspace(self): + import a2a_tools_rbac + called: dict[str, 
object] = {} + + def fake_auth_headers(wsid): + called["wsid"] = wsid + return {"Authorization": f"Bearer tok-{wsid}"} + + with patch("platform_auth.auth_headers", fake_auth_headers): + out = a2a_tools_rbac.auth_headers_for_heartbeat("ws-abc") + assert out == {"Authorization": "Bearer tok-ws-abc"} + assert called["wsid"] == "ws-abc" + + def test_returns_empty_when_platform_auth_missing(self, monkeypatch): + """Older installs without platform_auth get {} so callers don't + crash — they'll just send unauthed and the platform 401 handler + surfaces the real error.""" + import a2a_tools_rbac + # Force ImportError by setting sys.modules entry to None + monkeypatch.setitem(sys.modules, "platform_auth", None) + out = a2a_tools_rbac.auth_headers_for_heartbeat("ws-1") + assert out == {} + + +# ============== ROLE_PERMISSIONS canonical table ============== + +class TestRolePermissionsTable: + def test_admin_has_all_actions(self): + import a2a_tools_rbac + assert a2a_tools_rbac.ROLE_PERMISSIONS["admin"] == { + "delegate", "approve", "memory.read", "memory.write", + } + + def test_read_only_has_only_memory_read(self): + import a2a_tools_rbac + assert a2a_tools_rbac.ROLE_PERMISSIONS["read-only"] == {"memory.read"} + + def test_no_delegation_is_missing_delegate(self): + import a2a_tools_rbac + assert "delegate" not in a2a_tools_rbac.ROLE_PERMISSIONS["no-delegation"] + + def test_no_approval_is_missing_approve(self): + import a2a_tools_rbac + assert "approve" not in a2a_tools_rbac.ROLE_PERMISSIONS["no-approval"] From 8388144098a06ac559c1500cedcaa2348919e3ce Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 05:00:29 -0700 Subject: [PATCH 3/5] fix(build): add iter-3 mcp_* modules to TOP_LEVEL_MODULES drift gate The iter-3 split created mcp_heartbeat / mcp_inbox_pollers / mcp_workspace_resolver but the wheel build's drift-gate check at scripts/build_runtime_package.py:TOP_LEVEL_MODULES wasn't updated. Without this fix the wheel ships those modules un-rewritten, so their imports of platform_auth / configs_dir / etc. break at runtime. Caught by the 'PR-built wheel + import smoke' check. Refs RFC #2873 iter 3. --- scripts/build_runtime_package.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/scripts/build_runtime_package.py b/scripts/build_runtime_package.py index f267e173..e4b4bd21 100755 --- a/scripts/build_runtime_package.py +++ b/scripts/build_runtime_package.py @@ -74,6 +74,9 @@ TOP_LEVEL_MODULES = { "internal_file_read", "main", "mcp_cli", + "mcp_heartbeat", + "mcp_inbox_pollers", + "mcp_workspace_resolver", "molecule_ai_status", "not_configured_handler", "platform_auth", From 17aec22f9b39a92e298426c0319d629905fd3c5e Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 05:00:47 -0700 Subject: [PATCH 4/5] fix(build): add a2a_tools_rbac to TOP_LEVEL_MODULES drift gate Iter 4a's new module needs to be in the rewrite list so the wheel ships its imports prefixed correctly. Caught by 'PR-built wheel + import smoke'. Refs RFC #2873 iter 4a. 
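
For context on why the gate exists: the wheel build rewrites bare
top-level imports into the packaged namespace for every name listed
in TOP_LEVEL_MODULES. Roughly this shape (illustrative only, not the
build script's actual output):

    # source tree:
    import a2a_tools_rbac
    # inside the built wheel (name listed in TOP_LEVEL_MODULES):
    from molecule_runtime import a2a_tools_rbac

A module missing from the list ships with its imports un-rewritten
and fails at import time in the container, the same failure mode the
smoke check caught for the iter-3 mcp_* modules.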
--- scripts/build_runtime_package.py | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/build_runtime_package.py b/scripts/build_runtime_package.py index f267e173..60963b96 100755 --- a/scripts/build_runtime_package.py +++ b/scripts/build_runtime_package.py @@ -55,6 +55,7 @@ TOP_LEVEL_MODULES = { "a2a_executor", "a2a_mcp_server", "a2a_tools", + "a2a_tools_rbac", "adapter_base", "agent", "agents_md", From 48d19452694788c1f8dbc4d06cfdb0a65eb9968d Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 08:32:56 -0700 Subject: [PATCH 5/5] test(org-import): tighten AST gate to discriminate workspaces vs lookalikes (#2872 Imp-1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous TestCreateWorkspaceTree_CallsLookupBeforeInsert used bytes.Index("INSERT INTO workspaces"), which prefix-matches INSERT INTO workspaces_audit, INSERT INTO workspace_secrets, and INSERT INTO workspace_channels. RFC #2872 cited this as a silent false-pass mode: a future refactor that adds an audit-table INSERT literal earlier in source than the real workspaces INSERT would make the gate point at the wrong target. Replaces the byte-search with a go/ast walk + a regex that requires `\s*\(` after `workspaces` — distinguishes the real target from prefix lookalikes. Adds three discriminating tests: - TestWorkspacesInsertRE_RejectsLookalikes — pins the regex against 9 sql shapes (real, raw-string-literal, audit-shadow, workspace_* prefixes, canvas_layouts, UPDATE/SELECT, comments). - TestGate_FailsWhenLookupAfterInsert — synthesizes Go source where the lookup is positioned AFTER the workspaces INSERT, asserts the helper returns lookupPos > insertPos (which the production gate flags via t.Errorf). Proves the gate isn't vestigial. - TestGate_IgnoresAuditTableShadow — synthesizes source with an audit-table INSERT BEFORE the lookup + real INSERT, asserts the tightened regex correctly walks past the shadow and finds the real INSERT. Also extracts findLookupAndWorkspacesInsertPos as a helper so the gate logic can be exercised against synthetic source, not only against the real org_import.go. Memory: feedback_assert_exact_not_substring.md (verify tightened test FAILS on old code) — TestGate_FailsWhenLookupAfterInsert is the failing-on-bug-shape proof. Closes the silent-false-pass mode of #2872 Important-1. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../handlers/org_import_idempotency_test.go | 255 +++++++++++++++++- 1 file changed, 245 insertions(+), 10 deletions(-) diff --git a/workspace-server/internal/handlers/org_import_idempotency_test.go b/workspace-server/internal/handlers/org_import_idempotency_test.go index 0d7498fb..cefc6e74 100644 --- a/workspace-server/internal/handlers/org_import_idempotency_test.go +++ b/workspace-server/internal/handlers/org_import_idempotency_test.go @@ -1,11 +1,15 @@ package handlers import ( - "bytes" "context" "errors" + "go/ast" + "go/parser" + "go/token" "os" "path/filepath" + "regexp" + "strconv" "strings" "testing" @@ -119,6 +123,60 @@ func TestLookupExistingChild_DBError_Propagates(t *testing.T) { } } +// workspacesInsertRE matches a SQL literal that begins (after optional +// leading whitespace) with `INSERT INTO workspaces` followed by `(` — +// requiring the open-paren rules out lookalikes like +// `INSERT INTO workspaces_audit`, `INSERT INTO workspace_secrets`, +// `INSERT INTO workspace_channels`, `INSERT INTO canvas_layouts`. 
The +// previous bytes.Index gate accepted `workspaces_audit` as a prefix +// match — see RFC #2872 Important-1 for the silent-false-pass shape. +var workspacesInsertRE = regexp.MustCompile(`(?s)^\s*INSERT\s+INTO\s+workspaces\s*\(`) + +// findLookupAndWorkspacesInsertPos walks the AST of `src` and returns +// the source positions of (a) the first call to `lookupExistingChild` +// and (b) the first CallExpr whose argument list contains a STRING +// BasicLit matching workspacesInsertRE. Either may be token.NoPos if +// not found. +// +// Extracted as a helper so the gate logic can be exercised against +// synthetic source — TestGate_FailsWhenLookupAfterInsert below proves +// the gate actually catches the bug shape, not just the happy path. +func findLookupAndWorkspacesInsertPos(t *testing.T, fname string, src []byte) (lookupPos, insertPos token.Pos, fset *token.FileSet) { + t.Helper() + fset = token.NewFileSet() + file, err := parser.ParseFile(fset, fname, src, parser.ParseComments) + if err != nil { + t.Fatalf("parse %s: %v", fname, err) + } + lookupPos, insertPos = token.NoPos, token.NoPos + ast.Inspect(file, func(n ast.Node) bool { + call, ok := n.(*ast.CallExpr) + if !ok { + return true + } + if sel, ok := call.Fun.(*ast.SelectorExpr); ok { + if sel.Sel.Name == "lookupExistingChild" && lookupPos == token.NoPos { + lookupPos = call.Pos() + } + } + for _, arg := range call.Args { + lit, ok := arg.(*ast.BasicLit) + if !ok || lit.Kind != token.STRING { + continue + } + raw := lit.Value + if unq, err := strconv.Unquote(raw); err == nil { + raw = unq + } + if workspacesInsertRE.MatchString(raw) && insertPos == token.NoPos { + insertPos = call.Pos() + } + } + return true + }) + return +} + // Source-level guard — pins that org_import.go calls // h.lookupExistingChild BEFORE its INSERT INTO workspaces. // @@ -126,6 +184,11 @@ func TestLookupExistingChild_DBError_Propagates(t *testing.T) { // (idempotency check before INSERT), not just function names. If a // future refactor reintroduces the un-checked INSERT (the original // bug shape that leaked 72 workspaces in 4 days), this test fails. +// +// AST-walk implementation closes the silent-false-pass mode that the +// previous bytes.Index gate had — see workspacesInsertRE comment for +// the failure mode (workspaces_audit / workspace_secrets / etc. +// shadowing the real target via prefix match). 
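+//
+// The ordering the gate enforces, sketched (illustrative; the
+// signature mirrors the synthetic stubs in the gate tests below,
+// not necessarily the real handler):
+//
+//	id, found, err := h.lookupExistingChild(ctx, name, parentID)
+//	if err != nil {
+//		return "", err
+//	}
+//	if found {
+//		return id, nil // idempotent re-import: reuse, don't INSERT
+//	}
+//	db.ExecContext(ctx, `INSERT INTO workspaces (id, name) VALUES ($1, $2)`, id, name)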
func TestCreateWorkspaceTree_CallsLookupBeforeInsert(t *testing.T) { wd, err := os.Getwd() if err != nil { @@ -135,17 +198,189 @@ func TestCreateWorkspaceTree_CallsLookupBeforeInsert(t *testing.T) { if err != nil { t.Fatalf("read org_import.go: %v", err) } + lookupPos, insertPos, fset := findLookupAndWorkspacesInsertPos(t, "org_import.go", src) - lookupAt := bytes.Index(src, []byte("h.lookupExistingChild(")) - insertAt := bytes.Index(src, []byte("INSERT INTO workspaces")) - - if lookupAt < 0 { - t.Fatalf("org_import.go missing call to h.lookupExistingChild — idempotency check removed?") + if lookupPos == token.NoPos { + t.Fatalf("AST: no call to lookupExistingChild in org_import.go — idempotency check removed?") } - if insertAt < 0 { - t.Fatalf("org_import.go missing INSERT INTO workspaces — schema change?") + if insertPos == token.NoPos { + t.Fatalf("AST: no SQL literal matching `^\\s*INSERT INTO workspaces\\s*\\(` in any CallExpr in org_import.go — schema change or rename?") } - if lookupAt > insertAt { - t.Errorf("h.lookupExistingChild must come BEFORE INSERT INTO workspaces in org_import.go (lookup@%d, insert@%d) — non-idempotent ordering would re-leak under repeat /org/import calls", lookupAt, insertAt) + if lookupPos > insertPos { + t.Errorf("lookupExistingChild call at %s must come BEFORE INSERT INTO workspaces at %s — non-idempotent ordering would re-leak under repeat /org/import calls", + fset.Position(lookupPos), fset.Position(insertPos)) + } +} + +// TestGate_FailsWhenLookupAfterInsert proves the gate actually catches +// the bug it's named after — running it against synthetic Go source +// where the lookup call is positioned AFTER the workspaces INSERT must +// produce lookupPos > insertPos, which the production gate flags as +// an ERROR. Without this test the gate could regress to "always pass" +// and we wouldn't notice until the bug shipped again. +// +// Per memory feedback_assert_exact_not_substring.md: verify a +// tightened test FAILS on old code before merging. +func TestGate_FailsWhenLookupAfterInsert(t *testing.T) { + const buggySrc = `package handlers + +import "context" + +type fakeDB struct{} + +func (fakeDB) ExecContext(ctx context.Context, sql string, args ...interface{}) {} + +type fakeOrgHandler struct{} + +func (h *fakeOrgHandler) lookupExistingChild(ctx context.Context, name string, parentID *string) (string, bool, error) { + return "", false, nil +} + +func buggyCreate(h *fakeOrgHandler, db fakeDB, ctx context.Context, name string, parentID *string) { + // Bug shape: INSERT runs FIRST, lookup runs AFTER. This is the + // non-idempotent ordering the gate exists to forbid. + db.ExecContext(ctx, ` + "`INSERT INTO workspaces (id, name) VALUES ($1, $2)`" + `, "x", name) + h.lookupExistingChild(ctx, name, parentID) +} +` + lookupPos, insertPos, _ := findLookupAndWorkspacesInsertPos(t, "buggy.go", []byte(buggySrc)) + if lookupPos == token.NoPos || insertPos == token.NoPos { + t.Fatalf("synthetic buggy source missing expected nodes (lookupPos=%v insertPos=%v) — helper logic regression", lookupPos, insertPos) + } + if lookupPos < insertPos { + t.Fatalf("synthetic bug shape (lookup AFTER insert) returned lookupPos=%d < insertPos=%d — gate would NOT fire on actual bug, regression!", lookupPos, insertPos) + } + // Implicit: lookupPos > insertPos here, which the production gate + // flags via t.Errorf. This proves the gate is live, not vestigial. 
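+	// Also pin the firing condition explicitly so the proof survives a
+	// reshuffle of the branches above (redundant today: two distinct
+	// CallExprs never share a position, so only the strict > case can
+	// reach this point):
+	if lookupPos <= insertPos {
+		t.Fatalf("synthetic bug shape must yield lookupPos > insertPos, got lookupPos=%d insertPos=%d",
+			lookupPos, insertPos)
+	}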
+}
+
+// TestGate_IgnoresAuditTableShadow proves the regex tightening
+// actually ignores `INSERT INTO workspaces_audit` literals — the
+// specific shape #2872 cited as the silent-false-pass failure mode
+// for the previous bytes.Index gate.
+func TestGate_IgnoresAuditTableShadow(t *testing.T) {
+	// Synthetic source with the audit-table INSERT appearing first in
+	// source (the match a bytes.Index prefix search would latch onto)
+	// and the lookup + real INSERT at later positions. With the
+	// tightened regex, the audit literal is ignored: insertPos points
+	// at the REAL INSERT, lookup precedes it, gate passes correctly.
+	const src = `package handlers
+
+import "context"
+
+type fakeDB struct{}
+
+func (fakeDB) ExecContext(ctx context.Context, sql string, args ...interface{}) {}
+
+type fakeOrgHandler struct{}
+
+func (h *fakeOrgHandler) lookupExistingChild(ctx context.Context, name string, parentID *string) (string, bool, error) {
+	return "", false, nil
+}
+
+func okCreateWithAudit(h *fakeOrgHandler, db fakeDB, ctx context.Context, name string, parentID *string) {
+	// Audit-table INSERT — should be IGNORED by the tightened regex.
+	db.ExecContext(ctx, ` + "`INSERT INTO workspaces_audit (id, action) VALUES ($1, $2)`" + `, "x", "create_attempt")
+	// Lookup BEFORE real INSERT — correct order.
+	h.lookupExistingChild(ctx, name, parentID)
+	// Real INSERT.
+	db.ExecContext(ctx, ` + "`INSERT INTO workspaces (id, name) VALUES ($1, $2)`" + `, "x", name)
+}
+`
+	lookupPos, insertPos, fset := findLookupAndWorkspacesInsertPos(t, "shadow.go", []byte(src))
+	if lookupPos == token.NoPos || insertPos == token.NoPos {
+		t.Fatalf("expected to find lookup + real INSERT, got lookupPos=%v insertPos=%v", lookupPos, insertPos)
+	}
+	// The audit-table INSERT sits at line ~16 of the synthetic source,
+	// the lookup at line ~19, the real INSERT at line ~21. If the regex
+	// regressed to prefix-match, insertPos would point at the audit
+	// literal, and the gate would falsely fail (lookup after "insert").
+	// With the tightened regex, insertPos correctly points at the real
+	// INSERT, and the gate passes.
+	insertLine := fset.Position(insertPos).Line
+	lookupLine := fset.Position(lookupPos).Line
+	if insertLine < lookupLine {
+		t.Errorf("regex regressed: audit shadow at line %d swallowed real INSERT (lookup at line %d). insertPos should point at the real INSERT (line ~21), not the audit literal.",
+			insertLine, lookupLine)
+	}
+	if lookupPos > insertPos {
+		t.Errorf("synthetic source has lookup at line %d before real INSERT at line %d, gate should pass (lookupPos < insertPos), got lookupPos=%d > insertPos=%d",
+			lookupLine, insertLine, lookupPos, insertPos)
+	}
+}
+
+// TestWorkspacesInsertRE_RejectsLookalikes pins the regex that
+// discriminates the real workspaces INSERT from prefix-matching
+// lookalikes. If this regex regresses to a substring match, the
+// AST gate above silently false-passes when a future refactor
+// shadows the real INSERT with a workspaces_audit / workspace_secrets
+// / canvas_layouts literal placed earlier in source.
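+//
+// The discriminating piece is the required `\s*\(` right after the
+// table name, e.g. (illustrative; the full matrix is the cases table
+// in the test body):
+//
+//	workspacesInsertRE.MatchString("INSERT INTO workspaces (id)")       // true
+//	workspacesInsertRE.MatchString("INSERT INTO workspaces_audit (id)") // false: '_' follows "workspaces"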
+func TestWorkspacesInsertRE_RejectsLookalikes(t *testing.T) { + cases := []struct { + sql string + want bool + comment string + }{ + {"INSERT INTO workspaces (id, name) VALUES ($1, $2)", true, "real target"}, + {"\n\t\tINSERT INTO workspaces (id, name)\n\t\tVALUES ($1, $2)", true, "real target with leading whitespace + newlines (raw string literal shape)"}, + {"INSERT INTO workspaces_audit (id) VALUES ($1)", false, "underscore-suffix lookalike (the #2872 specific failure mode)"}, + {"INSERT INTO workspace_secrets (key, value) VALUES ($1, $2)", false, "prefix without trailing 's' (workspace_*)"}, + {"INSERT INTO workspace_channels (id) VALUES ($1)", false, "another workspace_* prefix"}, + {"INSERT INTO canvas_layouts (workspace_id, x, y) VALUES ($1, $2, $3)", false, "unrelated table that contains 'workspace' in a column ref"}, + {"UPDATE workspaces SET status='running' WHERE id=$1", false, "UPDATE shouldn't match"}, + {"SELECT * FROM workspaces WHERE id=$1", false, "SELECT shouldn't match"}, + {"-- comment about INSERT INTO workspaces (\nSELECT 1", false, "comment shouldn't match"}, + } + for _, c := range cases { + got := workspacesInsertRE.MatchString(c.sql) + if got != c.want { + t.Errorf("workspacesInsertRE.MatchString(%q) = %v, want %v (%s)", c.sql, got, c.want, c.comment) + } + } +} + +// Confirm the regex actually matches the literal currently in +// org_import.go. Pins the shape so `gofmt` reflows or trivial edits +// to the SQL string don't silently disable the gate above. +func TestWorkspacesInsertRE_MatchesActualSourceLiteral(t *testing.T) { + wd, err := os.Getwd() + if err != nil { + t.Fatalf("getwd: %v", err) + } + src, err := os.ReadFile(filepath.Join(wd, "org_import.go")) + if err != nil { + t.Fatalf("read org_import.go: %v", err) + } + // Strip backtick strings, find any whose content matches. + // Walk the source via parser.ParseFile to avoid string-search + // drift if the literal is reflowed. + fset := token.NewFileSet() + file, err := parser.ParseFile(fset, filepath.Join(wd, "org_import.go"), src, parser.ParseComments) + if err != nil { + t.Fatalf("parse org_import.go: %v", err) + } + var matched bool + ast.Inspect(file, func(n ast.Node) bool { + lit, ok := n.(*ast.BasicLit) + if !ok || lit.Kind != token.STRING { + return true + } + raw := lit.Value + if unq, err := strconv.Unquote(raw); err == nil { + raw = unq + } + if workspacesInsertRE.MatchString(raw) { + matched = true + } + return true + }) + if !matched { + t.Fatalf("no SQL literal in org_import.go matches workspacesInsertRE — gate is dead. Either the INSERT was renamed (update the regex) or the file was restructured (review the gate logic).") + } + // strings.Contains keeps the test informative: if the regex + // stopped matching but the literal source still contains the + // magic phrase, that's a regex-side failure (test the fix above). + if !strings.Contains(string(src), "INSERT INTO workspaces") { + t.Fatalf("org_import.go has no `INSERT INTO workspaces` substring at all — schema change?") } }