From 829ab66462e7f5540924f88607495846de274e85 Mon Sep 17 00:00:00 2001
From: Hongming Wang
Date: Mon, 4 May 2026 08:06:00 -0700
Subject: [PATCH] mcp: support multi-workspace external-agent registration (PR-1)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

External MCP agents (e.g. Claude Code installed on a company PC) can now
register against MULTIPLE workspaces from a single process — the agent
participates as a peer in workspace A (company) AND workspace B (personal)
simultaneously, with one merged inbox tagged so replies route to the
correct tenant.

Use case (verbatim from operator): "I have this computer AI thats in
company's PC, he is going to be put in company's workspace, but personally,
I want to register it to my own workspace as well, so that I can talk to it
and asking him to do work."

## What changed

**Wire format** — new env var:

    MOLECULE_WORKSPACES='[
      {"id":"","token":""},
      {"id":"","token":""}
    ]'

When set, mcp_cli iterates the array and spawns one (register + heartbeat +
inbox poller) trio per workspace. Single-workspace mode (WORKSPACE_ID +
MOLECULE_WORKSPACE_TOKEN) is unchanged — every existing operator's setup
keeps working bit-for-bit.

**Per-workspace token registry** (platform_auth.py):
register_workspace_token(wsid, tok) — populated by mcp_cli once per
workspace before any thread spawns; thread-safe registration + lock-free
reads on the hot path. auth_headers(workspace_id=...) routes to the
per-workspace token; auth_headers() with no arg uses the legacy resolution
path unchanged (back-compat).

**Per-workspace inbox cursors** (inbox.py): InboxState now supports
cursor_paths={wsid: Path,...}. Each poller advances its own cursor — one
workspace's slow poll can't stall another, and a 410 only resets the
affected workspace's cursor. Single-workspace constructor
(cursor_path=Path(...)) still works exactly as before via __post_init__
promotion to the empty-string key. Cursor filenames disambiguated by
workspace_id[:8] when multi-workspace; single-workspace keeps the legacy
filename so upgrade doesn't invalidate on-disk state.

**Arrival workspace tagging** (inbox.py): InboxMessage.arrival_workspace_id
— tells the agent which OF ITS workspaces the inbound message arrived on.
Set by the poller from the cursor key. to_dict() omits the field when empty
so single-workspace consumers see no shape change.

**Reply routing** (a2a_tools.py + a2a_mcp_server.py + registry.py):
send_message_to_user(workspace_id=...) — optional override that selects
which workspace's /notify endpoint to POST to (and which token
authenticates). Multi-workspace agents pass the inbound message's
arrival_workspace_id; single-workspace agents omit it and route to the only
registered workspace via the legacy URL.

## Out of scope (future PRs)

- PR-2: cross-workspace delegation auto-routing — when an agent receives a
  request from personal-ws "delegate to ops-bot" and ops-bot lives in
  company-ws, the agent should auto-pick its company-ws identity for the
  outbound delegate_task. Today the agent must pass via_workspace explicitly
  (or fall through to primary workspace).
- PR-3: memory namespacing — commit_memory() still writes to the primary
  workspace's memory regardless of inbound context. Will revisit when the
  new memory system (PR #2733 just landed) settles.
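## Example: multi-workspace configuration (illustrative)

Roughly how the new env var lands in a Claude Code MCP config, per the
mcpServers.molecule.env ergonomics noted in _resolve_workspaces' docstring.
Workspace ids, tokens, and the platform URL below are placeholders, and the
exact command depends on how the molecule-mcp wheel is installed on the
operator's machine:

    {
      "mcpServers": {
        "molecule": {
          "command": "molecule-mcp",
          "env": {
            "PLATFORM_URL": "https://platform.example.com",
            "MOLECULE_WORKSPACES": "[{\"id\":\"ws-company-1234\",\"token\":\"company-token\"},{\"id\":\"ws-personal-5678\",\"token\":\"personal-token\"}]"
          }
        }
      }
    }

When MOLECULE_WORKSPACES is set, WORKSPACE_ID and MOLECULE_WORKSPACE_TOKEN
are ignored; the JSON array is the source of truth.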
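## Reply routing at a glance (illustrative)

A minimal agent-side sketch of the arrival_workspace_id -> workspace_id
handoff. It assumes the molecule-mcp environment is already configured
(a2a_tools expects PLATFORM_URL / WORKSPACE_ID at import time) and that the
inbound dict has the shape of InboxMessage.to_dict(); the handler and all
values are made up for illustration:

    import asyncio

    from a2a_tools import tool_send_message_to_user

    async def handle_inbound(msg: dict) -> None:
        # Present only when the message arrived via the multi-workspace
        # path; single-workspace dicts omit the key entirely.
        reply_ws = msg.get("arrival_workspace_id")
        # workspace_id=None or "" falls back to the primary WORKSPACE_ID,
        # so the same handler works unchanged for single-workspace agents.
        await tool_send_message_to_user(
            "on it, will report back in this canvas",
            workspace_id=reply_ws,
        )

    asyncio.run(handle_inbound({
        "activity_id": "act-123",
        "text": "please run the nightly build",
        "peer_id": "",
        "method": "message/send",
        "created_at": "2026-05-04T15:00:00Z",
        "arrival_workspace_id": "ws-personal-5678",
    }))

Delivery still requires a token registered for the target workspace;
mcp_cli calls register_workspace_token for every MOLECULE_WORKSPACES entry
before the heartbeat/poller threads spawn.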
## Tests workspace/tests/test_mcp_cli_multi_workspace.py — 24 new tests: * MOLECULE_WORKSPACES JSON parsing (valid + 6 error shapes) * Token registry register / lookup / rotation / clear * auth_headers routing by workspace_id with legacy fallback * Per-workspace cursor save/load/reset isolation * arrival_workspace_id present-when-set, omitted-when-empty * default_cursor_path namespacing All 110 pre-existing tests in test_mcp_cli.py / test_inbox.py / test_platform_auth.py still pass — back-compat is mechanical. Refs: project memory entry "External agent multi-workspace registration", design questions answered 2026-05-04 by user (JSON env var; explicit memory writes deferred to PR-3). Co-Authored-By: Claude Opus 4.7 (1M context) --- workspace/a2a_mcp_server.py | 1 + workspace/a2a_tools.py | 46 ++- workspace/inbox.py | 188 +++++++--- workspace/mcp_cli.py | 211 +++++++++-- workspace/platform_auth.py | 73 +++- workspace/platform_tools/registry.py | 11 + .../tests/test_mcp_cli_multi_workspace.py | 335 ++++++++++++++++++ 7 files changed, 769 insertions(+), 96 deletions(-) create mode 100644 workspace/tests/test_mcp_cli_multi_workspace.py diff --git a/workspace/a2a_mcp_server.py b/workspace/a2a_mcp_server.py index 7db512e5..0c979a18 100644 --- a/workspace/a2a_mcp_server.py +++ b/workspace/a2a_mcp_server.py @@ -113,6 +113,7 @@ async def handle_tool_call(name: str, arguments: dict) -> str: return await tool_send_message_to_user( arguments.get("message", ""), attachments=attachments, + workspace_id=arguments.get("workspace_id") or None, ) elif name == "list_peers": return await tool_list_peers() diff --git a/workspace/a2a_tools.py b/workspace/a2a_tools.py index a6ffed7e..e5ce78ec 100644 --- a/workspace/a2a_tools.py +++ b/workspace/a2a_tools.py @@ -102,12 +102,18 @@ def _is_root_workspace() -> bool: return _get_workspace_tier() == 0 -def _auth_headers_for_heartbeat() -> dict[str, str]: +def _auth_headers_for_heartbeat(workspace_id: str | None = None) -> dict[str, str]: """Return Phase 30.1 auth headers; tolerate platform_auth being absent - in older installs (e.g. during rolling upgrade).""" + in older installs (e.g. during rolling upgrade). + + ``workspace_id`` selects the per-workspace token from the multi- + workspace registry when set (PR-1: external agent registered in + multiple workspaces). With no arg the legacy single-token path is + unchanged. + """ try: from platform_auth import auth_headers - return auth_headers() + return auth_headers(workspace_id) if workspace_id else auth_headers() except Exception: return {} @@ -313,7 +319,11 @@ async def tool_check_task_status(workspace_id: str, task_id: str) -> str: return f"Error checking delegations: {e}" -async def _upload_chat_files(client: httpx.AsyncClient, paths: list[str]) -> tuple[list[dict], str | None]: +async def _upload_chat_files( + client: httpx.AsyncClient, + paths: list[str], + workspace_id: str | None = None, +) -> tuple[list[dict], str | None]: """Upload local file paths through /workspaces//chat/uploads. 
The platform stages each upload under /workspace/.molecule/chat-uploads @@ -353,11 +363,12 @@ async def _upload_chat_files(client: httpx.AsyncClient, paths: list[str]) -> tup if not mime_type: mime_type = "application/octet-stream" files_payload.append(("files", (os.path.basename(p), data, mime_type))) + target_workspace_id = (workspace_id or "").strip() or WORKSPACE_ID try: resp = await client.post( - f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/chat/uploads", + f"{PLATFORM_URL}/workspaces/{target_workspace_id}/chat/uploads", files=files_payload, - headers=_auth_headers_for_heartbeat(), + headers=_auth_headers_for_heartbeat(target_workspace_id), ) except Exception as e: return [], f"Error uploading attachments: {e}" @@ -373,7 +384,11 @@ async def _upload_chat_files(client: httpx.AsyncClient, paths: list[str]) -> tup return uploaded, None -async def tool_send_message_to_user(message: str, attachments: list[str] | None = None) -> str: +async def tool_send_message_to_user( + message: str, + attachments: list[str] | None = None, + workspace_id: str | None = None, +) -> str: """Send a message directly to the user's canvas chat via WebSocket. Args: @@ -388,21 +403,32 @@ async def tool_send_message_to_user(message: str, attachments: list[str] | None Examples: attachments=["/tmp/build-output.zip"] attachments=["/workspace/report.pdf", "/workspace/data.csv"] + workspace_id: Optional. When the agent is registered in MULTIPLE + workspaces (external multi-workspace MCP path), this + selects which workspace's chat to deliver the message to — + should match the ``arrival_workspace_id`` of the inbound + message you're replying to so the user sees the reply in + the same canvas they typed in. Single-workspace agents + omit this; the message routes to the only registered + workspace. """ if not message: return "Error: message is required" + target_workspace_id = (workspace_id or "").strip() or WORKSPACE_ID try: async with httpx.AsyncClient(timeout=60.0) as client: - uploaded, upload_err = await _upload_chat_files(client, attachments or []) + uploaded, upload_err = await _upload_chat_files( + client, attachments or [], workspace_id=target_workspace_id, + ) if upload_err: return upload_err payload: dict = {"message": message} if uploaded: payload["attachments"] = uploaded resp = await client.post( - f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/notify", + f"{PLATFORM_URL}/workspaces/{target_workspace_id}/notify", json=payload, - headers=_auth_headers_for_heartbeat(), + headers=_auth_headers_for_heartbeat(target_workspace_id), ) if resp.status_code == 200: if uploaded: diff --git a/workspace/inbox.py b/workspace/inbox.py index b0718f82..94417243 100644 --- a/workspace/inbox.py +++ b/workspace/inbox.py @@ -93,8 +93,16 @@ class InboxMessage: method: str # JSON-RPC method ("message/send", "tasks/send", etc.) created_at: str # RFC3339 timestamp from the activity row + # Which OF MY workspaces did this message arrive on. Only meaningful + # for the multi-workspace external agent (one process registered + # against multiple workspaces). Empty string = single-workspace + # path / pre-multi-workspace caller — back-compat with consumers + # that don't set it. Tools like send_message_to_user use this to + # know which workspace's identity to reply with. 
+ arrival_workspace_id: str = "" + def to_dict(self) -> dict[str, Any]: - return { + d = { "activity_id": self.activity_id, "text": self.text, "peer_id": self.peer_id, @@ -102,49 +110,85 @@ class InboxMessage: "method": self.method, "created_at": self.created_at, } + # Only surface arrival_workspace_id when it's set, so single- + # workspace consumers don't see a new key in their existing + # output. + if self.arrival_workspace_id: + d["arrival_workspace_id"] = self.arrival_workspace_id + return d @dataclass class InboxState: """Thread-safe queue of pending inbound messages. - Producer: the poller thread, calling ``record(message)``. - Consumers: the MCP tool handlers, calling ``peek``, ``pop``, - or ``wait``. Synchronization is via a single ``threading.Lock`` - (cheap — every operation is O(n) over a small deque) plus an - ``Event`` that wakes ``wait`` callers when a new message lands. + Producer: the poller thread(s), calling ``record(message)``. Consumers: + the MCP tool handlers, calling ``peek``, ``pop``, or ``wait``. + Synchronization is via a single ``threading.Lock`` (cheap — every + operation is O(n) over a small deque) plus an ``Event`` that wakes + ``wait`` callers when a new message lands. + + Cursors are per-workspace. Single-workspace operators construct with + ``InboxState(cursor_path=...)`` (back-compat — the path becomes the + cursor file for the empty-string workspace_id key). Multi-workspace + operators construct with ``InboxState(cursor_paths={wsid: path,...})`` + so each poller advances its own cursor independently — one + workspace's slow poll can't stall another's, and a 410 on one cursor + only resets that one. """ - cursor_path: Path - """File path that persists ``activity_logs.id`` of the most - recently observed row, so a restart doesn't replay backlog.""" + cursor_path: Path | None = None + """Single-workspace cursor file. Sets ``cursor_paths[""]`` if + ``cursor_paths`` not also supplied. Kept on the dataclass for + back-compat — existing callers pass ``cursor_path=`` positionally.""" + + cursor_paths: dict[str, Path] = field(default_factory=dict) + """Per-workspace cursor files keyed by workspace_id. Multi-workspace + pollers each own their own row here.""" _queue: deque[InboxMessage] = field(default_factory=lambda: deque(maxlen=MAX_QUEUED_MESSAGES)) _lock: threading.Lock = field(default_factory=threading.Lock) _arrival: threading.Event = field(default_factory=threading.Event) - _cursor: str | None = None - _cursor_loaded: bool = False + _cursors: dict[str, str | None] = field(default_factory=dict) + _cursors_loaded: dict[str, bool] = field(default_factory=dict) - def load_cursor(self) -> str | None: + def __post_init__(self) -> None: + # Back-compat: single-workspace constructor passes + # cursor_path=Path(...). Promote it into the dict under the + # empty-string key so the lookup APIs are uniform. + if self.cursor_path is not None and "" not in self.cursor_paths: + self.cursor_paths[""] = self.cursor_path + + def _path_for(self, workspace_id: str) -> Path | None: + """Resolve the cursor path for a workspace_id key, or None.""" + return self.cursor_paths.get(workspace_id or "") + + def load_cursor(self, workspace_id: str = "") -> str | None: """Read the persisted cursor from disk. Cached after first call. Missing/unreadable file → None (poller will fall back to the initial-backlog window). We never raise: a corrupt cursor is less bad than the inbox refusing to start. 
- """ - with self._lock: - if self._cursor_loaded: - return self._cursor - try: - if self.cursor_path.is_file(): - self._cursor = self.cursor_path.read_text().strip() or None - except OSError as exc: - logger.warning("inbox: failed to read cursor %s: %s", self.cursor_path, exc) - self._cursor = None - self._cursor_loaded = True - return self._cursor - def save_cursor(self, activity_id: str) -> None: + ``workspace_id=""`` is the single-workspace path, untouched. + """ + path = self._path_for(workspace_id) + with self._lock: + if self._cursors_loaded.get(workspace_id): + return self._cursors.get(workspace_id) + cursor: str | None = None + if path is not None: + try: + if path.is_file(): + cursor = path.read_text().strip() or None + except OSError as exc: + logger.warning("inbox: failed to read cursor %s: %s", path, exc) + cursor = None + self._cursors[workspace_id] = cursor + self._cursors_loaded[workspace_id] = True + return cursor + + def save_cursor(self, activity_id: str, workspace_id: str = "") -> None: """Persist the cursor. Best-effort — log + continue on failure. Loss of the cursor on a write failure means an extra page of @@ -152,27 +196,33 @@ class InboxState: would mask a permission misconfiguration on the operator's configs dir; warn loudly so they can fix it. """ + path = self._path_for(workspace_id) with self._lock: - self._cursor = activity_id - self._cursor_loaded = True + self._cursors[workspace_id] = activity_id + self._cursors_loaded[workspace_id] = True + if path is None: + return try: - self.cursor_path.parent.mkdir(parents=True, exist_ok=True) - tmp = self.cursor_path.with_suffix(self.cursor_path.suffix + ".tmp") + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_suffix(path.suffix + ".tmp") tmp.write_text(activity_id) - tmp.replace(self.cursor_path) + tmp.replace(path) except OSError as exc: - logger.warning("inbox: failed to persist cursor to %s: %s", self.cursor_path, exc) + logger.warning("inbox: failed to persist cursor to %s: %s", path, exc) - def reset_cursor(self) -> None: + def reset_cursor(self, workspace_id: str = "") -> None: """Forget the cursor. Used after a 410 from the activity API.""" + path = self._path_for(workspace_id) with self._lock: - self._cursor = None - self._cursor_loaded = True + self._cursors[workspace_id] = None + self._cursors_loaded[workspace_id] = True + if path is None: + return try: - if self.cursor_path.is_file(): - self.cursor_path.unlink() + if path.is_file(): + path.unlink() except OSError as exc: - logger.warning("inbox: failed to delete cursor %s: %s", self.cursor_path, exc) + logger.warning("inbox: failed to delete cursor %s: %s", path, exc) def record(self, message: InboxMessage) -> None: """Append a message, wake any waiter, and fire the notification @@ -418,12 +468,25 @@ def _poll_once( Idempotent and stateless apart from the InboxState passed in — safe to call from tests with a stub state + a real httpx mock. + + ``workspace_id`` doubles as the cursor key on InboxState — pollers + for distinct workspaces get distinct cursors and don't trample each + other. For the single-workspace path the cursor key is the empty + string (per InboxState.__post_init__'s back-compat promotion of + ``cursor_path``). """ import httpx url = f"{platform_url}/workspaces/{workspace_id}/activity" + # Dual cursor key resolution: in single-workspace mode the cursor + # was historically stored under the "" key (back-compat). In + # multi-workspace mode each poller's cursor lives under its own + # workspace_id. 
Try the workspace-specific key first; if absent on + # this state, fall back to the legacy empty-string slot so existing + # InboxState-with-cursor_path-only constructors keep working. + cursor_key = workspace_id if workspace_id in state.cursor_paths else "" params: dict[str, str] = {"type": "a2a_receive"} - cursor = state.load_cursor() + cursor = state.load_cursor(cursor_key) if cursor: params["since_id"] = cursor else: @@ -444,7 +507,7 @@ def _poll_once( cursor, INITIAL_BACKLOG_SECONDS, ) - state.reset_cursor() + state.reset_cursor(cursor_key) return 0 if resp.status_code >= 400: @@ -499,12 +562,17 @@ def _poll_once( message = message_from_activity(row) if not message.activity_id: continue + # Tag the message with the workspace it arrived on so the agent + # (and tools like send_message_to_user) can route the reply to + # the right tenant. Empty-string in single-workspace mode keeps + # to_dict()'s output shape unchanged for back-compat consumers. + message.arrival_workspace_id = workspace_id if cursor_key else "" state.record(message) last_id = message.activity_id new_count += 1 if last_id is not None: - state.save_cursor(last_id) + state.save_cursor(last_id, cursor_key) return new_count @@ -517,15 +585,21 @@ def _poll_loop( ) -> None: """Daemon-thread body: poll forever until stop_event fires. - auth_headers() is rebuilt every iteration so a token rotation via - env var or .auth_token file is picked up without a restart. Cheap - (a dict + an env read). + auth_headers(workspace_id) is rebuilt every iteration so a token + rotation via env var, .auth_token file, or per-workspace registry + is picked up without a restart. Cheap (a dict + an env read). + + Multi-workspace pollers pass the workspace_id so the per-workspace + bearer token is selected from platform_auth's registry; single- + workspace pollers fall through to the legacy resolution path + (workspace_id arg is still passed but the registry lookup misses + and auth_headers falls back to the cached/file/env token). """ from platform_auth import auth_headers while True: try: - _poll_once(state, platform_url, workspace_id, auth_headers()) + _poll_once(state, platform_url, workspace_id, auth_headers(workspace_id)) except Exception as exc: # noqa: BLE001 logger.warning("inbox poller: iteration crashed: %s", exc) if stop_event is not None and stop_event.wait(interval): @@ -545,22 +619,42 @@ def start_poller_thread( daemon=True so the poller dies with the main process — same rationale as mcp_cli's heartbeat thread (no leaks, no stale workspace writes after the operator hits Ctrl-C). + + Thread name embeds the workspace_id (truncated) so a multi-workspace + operator running ``ps -eL`` or eyeballing ``threading.enumerate()`` + can tell which thread is which without reverse-engineering it from + crash tracebacks. """ + name = "molecule-mcp-inbox-poller" + if workspace_id: + name = f"{name}-{workspace_id[:8]}" t = threading.Thread( target=_poll_loop, args=(state, platform_url, workspace_id, interval), - name="molecule-mcp-inbox-poller", + name=name, daemon=True, ) t.start() return t -def default_cursor_path() -> Path: +def default_cursor_path(workspace_id: str = "") -> Path: """Standard cursor location: ``/.mcp_inbox_cursor``. Resolved via configs_dir so the cursor lives next to .auth_token + .platform_inbound_secret regardless of whether the runtime is in-container (/configs) or external (~/.molecule-workspace). 
+ + Multi-workspace operators pass ``workspace_id`` to get a unique + cursor file per workspace (``.mcp_inbox_cursor_``) so + pollers don't trample each other's cursors. Single-workspace + operators omit the arg and keep the legacy filename — back-compat + with existing on-disk cursors. """ - return configs_dir.resolve() / ".mcp_inbox_cursor" + base = configs_dir.resolve() / ".mcp_inbox_cursor" + if workspace_id: + # 8-char prefix is enough to disambiguate two workspaces in the + # same operator's setup (UUID v4 first 32 bits ≈ 4 billion of + # entropy) without hash-bombing the filename. + return base.with_name(f".mcp_inbox_cursor_{workspace_id[:8]}") + return base diff --git a/workspace/mcp_cli.py b/workspace/mcp_cli.py index 1acb247a..ccae2d4a 100644 --- a/workspace/mcp_cli.py +++ b/workspace/mcp_cli.py @@ -34,6 +34,7 @@ own heartbeat loop in ``heartbeat.py`` so we don't double-heartbeat. """ from __future__ import annotations +import json import logging import os import sys @@ -345,6 +346,90 @@ def _start_heartbeat_thread( return t +def _resolve_workspaces() -> tuple[list[tuple[str, str]], list[str]]: + """Return the list of ``(workspace_id, token)`` pairs to register. + + Resolution order: + + 1. ``MOLECULE_WORKSPACES`` env var — JSON array of + ``{"id": "...", "token": "..."}`` objects. Activates the + multi-workspace external-agent path (one process registered into + N workspaces). When set, ``WORKSPACE_ID`` / ``MOLECULE_WORKSPACE_TOKEN`` + are IGNORED — the JSON is the source of truth. + + 2. Single-workspace fallback — ``WORKSPACE_ID`` env var + token from + ``MOLECULE_WORKSPACE_TOKEN`` or ``${CONFIGS_DIR}/.auth_token``. + This is the pre-existing path; back-compat exact. + + Returns ``(workspaces, errors)``: + * ``workspaces``: list of ``(workspace_id, token)`` — non-empty + on the happy path. + * ``errors``: human-readable strings describing what's missing / + malformed. ``main()`` surfaces these with the same shape as + ``_print_missing_env_help`` so the operator's first run gives + actionable output. + + Why JSON env (not file): ergonomic for Claude Code MCP config (one + string in ``mcpServers.molecule.env`` instead of a sidecar file) + and for CI / launchers. A separate config-file path can be added + later without breaking this. + """ + raw = os.environ.get("MOLECULE_WORKSPACES", "").strip() + if raw: + try: + parsed = json.loads(raw) + except json.JSONDecodeError as exc: + return [], [ + f"MOLECULE_WORKSPACES is not valid JSON ({exc.msg} at pos " + f"{exc.pos}). Expected: '[{{\"id\":\"\",\"token\":" + f"\"\"}},{{...}}]'" + ] + if not isinstance(parsed, list) or not parsed: + return [], [ + "MOLECULE_WORKSPACES must be a non-empty JSON array of " + "{\"id\":\"...\",\"token\":\"...\"} objects" + ] + out: list[tuple[str, str]] = [] + seen: set[str] = set() + errors: list[str] = [] + for i, entry in enumerate(parsed): + if not isinstance(entry, dict): + errors.append( + f"MOLECULE_WORKSPACES[{i}] is not an object — got {type(entry).__name__}" + ) + continue + wsid = str(entry.get("id", "")).strip() + tok = str(entry.get("token", "")).strip() + if not wsid or not tok: + errors.append( + f"MOLECULE_WORKSPACES[{i}] missing 'id' or 'token'" + ) + continue + if wsid in seen: + errors.append( + f"MOLECULE_WORKSPACES[{i}] duplicate workspace id {wsid!r}" + ) + continue + seen.add(wsid) + out.append((wsid, tok)) + if errors: + return [], errors + return out, [] + + # Single-workspace back-compat path. 
+ wsid = os.environ.get("WORKSPACE_ID", "").strip() + if not wsid: + return [], ["WORKSPACE_ID (or MOLECULE_WORKSPACES) is required"] + tok = os.environ.get("MOLECULE_WORKSPACE_TOKEN", "").strip() + if not tok: + tok = _read_token_file() + if not tok: + return [], [ + "MOLECULE_WORKSPACE_TOKEN (or CONFIGS_DIR/.auth_token) is required" + ] + return [(wsid, tok)], [] + + def _print_missing_env_help(missing: list[str], have_token_file: bool) -> None: print("molecule-mcp: missing required environment.\n", file=sys.stderr) print("Set the following before running molecule-mcp:", file=sys.stderr) @@ -369,37 +454,52 @@ def main() -> None: Returns nothing — calls ``sys.exit`` on validation failure or on normal completion of the underlying MCP server loop. - """ - missing: list[str] = [] - if not os.environ.get("WORKSPACE_ID", "").strip(): - missing.append("WORKSPACE_ID") - if not os.environ.get("PLATFORM_URL", "").strip(): - missing.append("PLATFORM_URL") - # Token can come from env OR file — only flag when both are absent. - # Mirrors platform_auth.get_token's resolution order (file-first, - # env-fallback). configs_dir.resolve() handles in-container vs - # external-runtime fallback so we don't probe a non-existent - # /configs on a laptop and falsely report no-token-file. - has_token_file = (configs_dir.resolve() / ".auth_token").is_file() - has_token_env = bool(os.environ.get("MOLECULE_WORKSPACE_TOKEN", "").strip()) - if not has_token_file and not has_token_env: - missing.append("MOLECULE_WORKSPACE_TOKEN (or CONFIGS_DIR/.auth_token)") - if missing: - _print_missing_env_help(missing, have_token_file=has_token_file) + Two registration shapes: + * Single-workspace (legacy): ``WORKSPACE_ID`` + token env/file. + Unchanged behavior. + * Multi-workspace: ``MOLECULE_WORKSPACES`` JSON env var with N + ``{"id": ..., "token": ...}`` entries. One register + heartbeat + + inbox poller per entry; messages from any workspace land in + the same agent inbox tagged with ``arrival_workspace_id``. + """ + if not os.environ.get("PLATFORM_URL", "").strip(): + _print_missing_env_help( + ["PLATFORM_URL"], + have_token_file=(configs_dir.resolve() / ".auth_token").is_file(), + ) + sys.exit(2) + + workspaces, errors = _resolve_workspaces() + if errors or not workspaces: + # Reuse the missing-env help printer for legacy WORKSPACE_ID + + # token shape, which is what most first-run operators hit. For + # MOLECULE_WORKSPACES errors, print directly so the JSON-shape + # message isn't mangled into the WORKSPACE_ID-style help. + if os.environ.get("MOLECULE_WORKSPACES", "").strip(): + print("molecule-mcp: invalid MOLECULE_WORKSPACES:", file=sys.stderr) + for e in errors: + print(f" - {e}", file=sys.stderr) + else: + _print_missing_env_help( + errors or ["WORKSPACE_ID", "MOLECULE_WORKSPACE_TOKEN"], + have_token_file=(configs_dir.resolve() / ".auth_token").is_file(), + ) sys.exit(2) - # Resolve the effective token: env wins (operator override), then - # the on-disk file (in-container default). Mirrors - # platform_auth.get_token's resolution order so we don't - # double-implement. 
- token = ( - os.environ.get("MOLECULE_WORKSPACE_TOKEN", "").strip() - or _read_token_file() - ) - workspace_id = os.environ["WORKSPACE_ID"].strip() platform_url = os.environ["PLATFORM_URL"].strip().rstrip("/") + # In multi-workspace mode the FIRST entry is treated as the + # "primary" — it gets exported to a2a_client.py's module-level + # WORKSPACE_ID (which gates a RuntimeError at import time) and is + # used by tools that don't yet take an explicit workspace_id. PR-2 + # parameterizes those tools; for now this preserves existing + # outbound-tool behavior unchanged for single-workspace operators + # AND for the multi-workspace operator's first registered + # workspace. + primary_workspace_id, _primary_token = workspaces[0] + os.environ["WORKSPACE_ID"] = primary_workspace_id + # Configure logging so the operator sees register/heartbeat status # without needing to set up logging themselves. WARNING by default # keeps the steady-state quiet (only failures); MOLECULE_MCP_VERBOSE=1 @@ -411,6 +511,18 @@ def main() -> None: ) logging.basicConfig(level=log_level, format="[molecule-mcp] %(message)s") + # Populate the per-workspace token registry so heartbeat threads, + # the inbox poller, and (later) outbound tools resolve the right + # token for each workspace via ``platform_auth.auth_headers(wsid)``. + # Done BEFORE register/heartbeat thread spawn so a thread that + # races to fire its first request always sees its token. + try: + from platform_auth import register_workspace_token + for wsid, tok in workspaces: + register_workspace_token(wsid, tok) + except ImportError: + pass + # Standalone-mode register + heartbeat. Skipped via env var so an # in-container caller (which has its own heartbeat loop) can reuse # this entry point without double-heartbeating. The wheel's main @@ -418,21 +530,23 @@ def main() -> None: # MOLECULE_MCP_DISABLE_HEARTBEAT escape hatch exists for tests + # the rare embedded use-case. if not os.environ.get("MOLECULE_MCP_DISABLE_HEARTBEAT", "").strip(): - _platform_register(platform_url, workspace_id, token) - _start_heartbeat_thread(platform_url, workspace_id, token) + for wsid, tok in workspaces: + _platform_register(platform_url, wsid, tok) + _start_heartbeat_thread(platform_url, wsid, tok) # Inbox poller — the inbound side of the standalone path. Without # this thread, the universal MCP server is OUTBOUND-ONLY: an agent # can call delegate_task / send_message_to_user but never observe - # canvas-user or peer-agent messages. The poller fills an in-memory - # queue from the platform's /activity?type=a2a_receive endpoint; - # the agent reads via wait_for_message / inbox_peek / inbox_pop. + # canvas-user or peer-agent messages. One poller per workspace; all + # of them write to the SAME shared inbox state so the agent's + # inbox_peek/pop/wait tools see a merged view (each message tagged + # with arrival_workspace_id so the agent can route the reply). # # Same disable pattern as heartbeat: in-container callers (with # push delivery via canvas WebSocket) skip this to avoid duplicate # delivery; tests use the env to keep imports cheap. if not os.environ.get("MOLECULE_MCP_DISABLE_INBOX", "").strip(): - _start_inbox_poller(platform_url, workspace_id) + _start_inbox_pollers(platform_url, [w[0] for w in workspaces]) # Env is valid — safe to import the heavy module now. 
Importing # earlier would trigger a2a_client.py:22's module-level RuntimeError @@ -441,8 +555,8 @@ def main() -> None: cli_main() -def _start_inbox_poller(platform_url: str, workspace_id: str) -> None: - """Activate the inbox singleton + spawn the poller daemon thread. +def _start_inbox_pollers(platform_url: str, workspace_ids: list[str]) -> None: + """Activate the inbox singleton + spawn one poller daemon thread per workspace. Done lazily here (not at module import) because importing inbox pulls in platform_auth, which only resolves cleanly AFTER env @@ -450,7 +564,17 @@ def _start_inbox_poller(platform_url: str, workspace_id: str) -> None: so a stray double-call (e.g. test harness re-entering main) is harmless. - The poller thread is daemon=True — dies with the main process. + The poller threads are daemon=True — die with the main process. + + Single-workspace path: one poller, single cursor file at the legacy + location (``.mcp_inbox_cursor``). Cursor-key resolution falls back + to the empty string for back-compat with operators whose existing + on-disk cursor was written by the pre-multi-workspace code. + + Multi-workspace path: N pollers, each with its own cursor file + keyed by ``workspace_id[:8]``. Cursors live next to each other in + configs_dir so an operator inspecting state sees all of them + together. """ try: import inbox @@ -458,9 +582,22 @@ def _start_inbox_poller(platform_url: str, workspace_id: str) -> None: logger.warning("molecule-mcp: inbox module unavailable: %s", exc) return - state = inbox.InboxState(cursor_path=inbox.default_cursor_path()) + if len(workspace_ids) <= 1: + # Back-compat exact: single-workspace mode reuses the legacy + # cursor filename + cursor_path constructor arg, so an existing + # operator's on-disk state isn't invalidated by upgrade. + wsid = workspace_ids[0] + state = inbox.InboxState(cursor_path=inbox.default_cursor_path()) + inbox.activate(state) + inbox.start_poller_thread(state, platform_url, wsid) + return + + # Multi-workspace: per-workspace cursor file, one shared queue. + cursor_paths = {wsid: inbox.default_cursor_path(wsid) for wsid in workspace_ids} + state = inbox.InboxState(cursor_paths=cursor_paths) inbox.activate(state) - inbox.start_poller_thread(state, platform_url, workspace_id) + for wsid in workspace_ids: + inbox.start_poller_thread(state, platform_url, wsid) def _read_token_file() -> str: diff --git a/workspace/platform_auth.py b/workspace/platform_auth.py index e6b3d789..17157428 100644 --- a/workspace/platform_auth.py +++ b/workspace/platform_auth.py @@ -22,6 +22,7 @@ from __future__ import annotations import logging import os +import threading from pathlib import Path import configs_dir @@ -33,6 +34,20 @@ logger = logging.getLogger(__name__) # is wasteful. The file is the durable copy; this var is the hot path. _cached_token: str | None = None +# Per-workspace token registry — populated by mcp_cli when the operator +# runs a multi-workspace external agent (MOLECULE_WORKSPACES env var). +# Keyed by workspace_id, value is the bearer token issued by that +# workspace's tenant. Distinct from `_cached_token` (which is the +# single-workspace path's token); the two coexist so single-workspace +# back-compat is preserved exactly. +# +# Lock guards mutations from the registration phase (one writer per +# workspace, but the writers run in main(), not in heartbeat threads). +# Reads are lock-free for the hot path; the dict is finalized before +# any heartbeat / poller thread starts. 
+_WORKSPACE_TOKENS: dict[str, str] = {} +_WORKSPACE_TOKENS_LOCK = threading.Lock() + def _token_file() -> Path: """Path to the on-disk token file. Resolved via configs_dir so @@ -111,7 +126,43 @@ def save_token(token: str) -> None: _cached_token = token -def auth_headers() -> dict[str, str]: +def register_workspace_token(workspace_id: str, token: str) -> None: + """Register a per-workspace bearer token in the multi-workspace registry. + + Called by ``mcp_cli`` once per entry in the ``MOLECULE_WORKSPACES`` + env var so per-workspace heartbeat / poller threads can resolve their + own auth via ``auth_headers(workspace_id=...)`` without each thread + closing over a token literal. + + Idempotent: re-registering the same workspace_id with the same token + is a no-op; with a different token it overwrites and logs at INFO + (the legitimate case is operator token rotation between restarts). + """ + workspace_id = (workspace_id or "").strip() + token = (token or "").strip() + if not workspace_id or not token: + return + with _WORKSPACE_TOKENS_LOCK: + prior = _WORKSPACE_TOKENS.get(workspace_id) + if prior == token: + return + if prior is not None: + logger.info( + "platform_auth: workspace_id %s token rotated", workspace_id, + ) + _WORKSPACE_TOKENS[workspace_id] = token + + +def get_workspace_token(workspace_id: str) -> str | None: + """Return the per-workspace token from the registry, or None. + + Lookup is lock-free: writes happen in main() before threads start, + reads are stable thereafter. + """ + return _WORKSPACE_TOKENS.get((workspace_id or "").strip()) + + +def auth_headers(workspace_id: str | None = None) -> dict[str, str]: """Return a header dict to merge into httpx calls. Empty if no token is available yet — callers send the request as-is and the platform's heartbeat handler grandfathers pre-token workspaces through until @@ -126,12 +177,28 @@ def auth_headers() -> dict[str, str]: Discovered while smoke-testing the molecule-mcp external-runtime path against a live tenant — every tool call returned "not found" because the WAF was eating them. + + Token resolution order: + 1. ``workspace_id`` arg → per-workspace registry + (multi-workspace external agent — set by mcp_cli) + 2. Single-workspace cache + .auth_token file + env var + (pre-existing path; back-compat unchanged) + + Single-workspace operators see no behavior change: ``auth_headers()`` + with no arg routes through the legacy resolution path exactly as + before. Multi-workspace operators pass ``workspace_id`` so each + thread (heartbeat, poller, send_message_to_user) authenticates + against the correct workspace. """ headers: dict[str, str] = {} platform_url = os.environ.get("PLATFORM_URL", "").strip() if platform_url: headers["Origin"] = platform_url - tok = get_token() + tok: str | None = None + if workspace_id: + tok = get_workspace_token(workspace_id) + if tok is None: + tok = get_token() if tok: headers["Authorization"] = f"Bearer {tok}" return headers @@ -162,6 +229,8 @@ def clear_cache() -> None: files between cases.""" global _cached_token _cached_token = None + with _WORKSPACE_TOKENS_LOCK: + _WORKSPACE_TOKENS.clear() def refresh_cache() -> str | None: diff --git a/workspace/platform_tools/registry.py b/workspace/platform_tools/registry.py index 1c1de25b..6da1bb6c 100644 --- a/workspace/platform_tools/registry.py +++ b/workspace/platform_tools/registry.py @@ -295,6 +295,17 @@ _SEND_MESSAGE_TO_USER = ToolSpec( ), "items": {"type": "string"}, }, + "workspace_id": { + "type": "string", + "description": ( + "Optional. 
Set ONLY when this agent is registered in MULTIPLE " + "workspaces (external multi-workspace MCP path) — pass the " + "`arrival_workspace_id` of the inbound message you're replying " + "to so the user sees the reply in the same canvas they typed in. " + "Single-workspace agents omit this; the message routes to the " + "only registered workspace." + ), + }, }, "required": ["message"], }, diff --git a/workspace/tests/test_mcp_cli_multi_workspace.py b/workspace/tests/test_mcp_cli_multi_workspace.py new file mode 100644 index 00000000..fbef22df --- /dev/null +++ b/workspace/tests/test_mcp_cli_multi_workspace.py @@ -0,0 +1,335 @@ +"""Tests for mcp_cli's multi-workspace resolution + parallel +register/heartbeat/poller spawning. + +Single-workspace path is exhaustively covered in test_mcp_cli.py; this +file covers ONLY the new MOLECULE_WORKSPACES path so a regression that +breaks multi-workspace doesn't get hidden in a 1000-line test file. +""" +from __future__ import annotations + +import json +import os +import sys +from pathlib import Path +from unittest.mock import patch + +import pytest + +# Add workspace dir to path so `import mcp_cli` works regardless of pytest +# cwd. Mirrors the pattern in tests/conftest.py. +_THIS = Path(__file__).resolve() +sys.path.insert(0, str(_THIS.parent.parent)) + + +@pytest.fixture(autouse=True) +def _isolate_env(monkeypatch): + """Strip every env var the resolver looks at so each test starts clean. + + Tests set ONLY the vars they care about. Without this fixture an + unrelated test that exported MOLECULE_WORKSPACES would silently + influence the next test's outcome. + """ + for var in ( + "MOLECULE_WORKSPACES", + "WORKSPACE_ID", + "MOLECULE_WORKSPACE_TOKEN", + "PLATFORM_URL", + ): + monkeypatch.delenv(var, raising=False) + + +def _import_mcp_cli(): + # Late import so monkeypatch has scrubbed the env first. + import importlib + + import mcp_cli + + return importlib.reload(mcp_cli) + + +class TestResolveWorkspaces: + def test_multi_workspace_json_returns_pairs(self, monkeypatch): + monkeypatch.setenv( + "MOLECULE_WORKSPACES", + json.dumps([ + {"id": "ws-a", "token": "tok-a"}, + {"id": "ws-b", "token": "tok-b"}, + ]), + ) + mcp_cli = _import_mcp_cli() + out, errors = mcp_cli._resolve_workspaces() + assert errors == [] + assert out == [("ws-a", "tok-a"), ("ws-b", "tok-b")] + + def test_multi_workspace_ignores_legacy_env_vars(self, monkeypatch): + # When MOLECULE_WORKSPACES is set, WORKSPACE_ID + token env are + # ignored. This is the documented contract — JSON wins, no + # silent merging of two sources. 
+ monkeypatch.setenv("WORKSPACE_ID", "should-be-ignored") + monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "should-be-ignored") + monkeypatch.setenv( + "MOLECULE_WORKSPACES", + json.dumps([{"id": "ws-only", "token": "tok-only"}]), + ) + mcp_cli = _import_mcp_cli() + out, errors = mcp_cli._resolve_workspaces() + assert errors == [] + assert out == [("ws-only", "tok-only")] + + def test_invalid_json_returns_error(self, monkeypatch): + monkeypatch.setenv("MOLECULE_WORKSPACES", "{not valid json") + mcp_cli = _import_mcp_cli() + out, errors = mcp_cli._resolve_workspaces() + assert out == [] + assert any("not valid JSON" in e for e in errors) + + def test_non_array_returns_error(self, monkeypatch): + monkeypatch.setenv("MOLECULE_WORKSPACES", '{"id":"ws","token":"tok"}') + mcp_cli = _import_mcp_cli() + out, errors = mcp_cli._resolve_workspaces() + assert out == [] + assert any("non-empty JSON array" in e for e in errors) + + def test_empty_array_returns_error(self, monkeypatch): + monkeypatch.setenv("MOLECULE_WORKSPACES", "[]") + mcp_cli = _import_mcp_cli() + out, errors = mcp_cli._resolve_workspaces() + assert out == [] + assert any("non-empty JSON array" in e for e in errors) + + def test_missing_id_or_token_in_entry_returns_error(self, monkeypatch): + monkeypatch.setenv( + "MOLECULE_WORKSPACES", + json.dumps([{"id": "ws-a"}, {"token": "tok-only"}]), + ) + mcp_cli = _import_mcp_cli() + out, errors = mcp_cli._resolve_workspaces() + assert out == [] + assert len(errors) >= 2 + assert any("[0] missing 'id' or 'token'" in e for e in errors) + assert any("[1] missing 'id' or 'token'" in e for e in errors) + + def test_duplicate_workspace_id_returns_error(self, monkeypatch): + # Two registrations with the same workspace_id is almost + # certainly an operator typo — heartbeat threads would race + # against each other. Reject it loudly. + monkeypatch.setenv( + "MOLECULE_WORKSPACES", + json.dumps([ + {"id": "ws-a", "token": "tok-1"}, + {"id": "ws-a", "token": "tok-2"}, + ]), + ) + mcp_cli = _import_mcp_cli() + out, errors = mcp_cli._resolve_workspaces() + assert out == [] + assert any("duplicate workspace id" in e for e in errors) + + def test_legacy_single_workspace_via_env(self, monkeypatch): + monkeypatch.setenv("WORKSPACE_ID", "legacy-ws") + monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "legacy-tok") + mcp_cli = _import_mcp_cli() + out, errors = mcp_cli._resolve_workspaces() + assert errors == [] + assert out == [("legacy-ws", "legacy-tok")] + + def test_legacy_no_workspace_id_returns_error(self, monkeypatch): + monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "tok") + mcp_cli = _import_mcp_cli() + out, errors = mcp_cli._resolve_workspaces() + assert out == [] + assert any("WORKSPACE_ID" in e for e in errors) + + def test_legacy_no_token_returns_error(self, monkeypatch, tmp_path): + # Force configs_dir.resolve() to a clean dir so the .auth_token + # fallback finds nothing. + monkeypatch.setenv("CONFIGS_DIR", str(tmp_path)) + monkeypatch.setenv("WORKSPACE_ID", "ws") + mcp_cli = _import_mcp_cli() + out, errors = mcp_cli._resolve_workspaces() + assert out == [] + assert any("MOLECULE_WORKSPACE_TOKEN" in e for e in errors) + + +class TestPlatformAuthRegistry: + """The token registry is what wires per-workspace heartbeats / + pollers / send_message_to_user to the right tenant. If this dies, + all multi-workspace traffic 401s — guard tightly. + """ + + def setup_method(self): + # Each test runs against a clean registry — clear_cache also + # wipes the multi-workspace dict (see platform_auth changes). 
+ import platform_auth + + platform_auth.clear_cache() + + def test_register_and_lookup(self): + import platform_auth + + platform_auth.register_workspace_token("ws-a", "tok-a") + platform_auth.register_workspace_token("ws-b", "tok-b") + assert platform_auth.get_workspace_token("ws-a") == "tok-a" + assert platform_auth.get_workspace_token("ws-b") == "tok-b" + assert platform_auth.get_workspace_token("ws-c") is None + + def test_auth_headers_routes_by_workspace(self, monkeypatch): + import platform_auth + + monkeypatch.setenv("PLATFORM_URL", "https://example.test") + platform_auth.register_workspace_token("ws-a", "tok-a") + platform_auth.register_workspace_token("ws-b", "tok-b") + + a = platform_auth.auth_headers("ws-a") + b = platform_auth.auth_headers("ws-b") + assert a["Authorization"] == "Bearer tok-a" + assert b["Authorization"] == "Bearer tok-b" + assert a["Origin"] == "https://example.test" + + def test_auth_headers_with_no_arg_uses_legacy_path(self, monkeypatch): + import platform_auth + + monkeypatch.setenv("PLATFORM_URL", "https://example.test") + monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "legacy-tok") + # Multi-workspace registry populated, but auth_headers() with + # no arg ignores it and uses the legacy resolution path. This + # is the back-compat invariant for single-workspace tools that + # haven't been updated yet to thread workspace_id through. + platform_auth.register_workspace_token("ws-a", "tok-a") + + h = platform_auth.auth_headers() + assert h["Authorization"] == "Bearer legacy-tok" + + def test_auth_headers_with_unknown_workspace_falls_back_to_legacy( + self, monkeypatch + ): + import platform_auth + + monkeypatch.setenv("PLATFORM_URL", "https://example.test") + monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "legacy-tok") + platform_auth.register_workspace_token("ws-a", "tok-a") + + # workspace_id arg points to a workspace NOT in the registry — + # auth_headers falls back to the legacy single-workspace token + # rather than 401-ing. Lets a single-workspace install accept + # workspace_id args without crashing. 
+ h = platform_auth.auth_headers("ws-unknown") + assert h["Authorization"] == "Bearer legacy-tok" + + def test_register_idempotent_same_token(self): + import platform_auth + + platform_auth.register_workspace_token("ws-a", "tok-a") + platform_auth.register_workspace_token("ws-a", "tok-a") + assert platform_auth.get_workspace_token("ws-a") == "tok-a" + + def test_register_token_rotation(self): + import platform_auth + + platform_auth.register_workspace_token("ws-a", "tok-old") + platform_auth.register_workspace_token("ws-a", "tok-new") + assert platform_auth.get_workspace_token("ws-a") == "tok-new" + + def test_clear_cache_wipes_registry(self): + import platform_auth + + platform_auth.register_workspace_token("ws-a", "tok-a") + platform_auth.clear_cache() + assert platform_auth.get_workspace_token("ws-a") is None + + +class TestInboxStateMultiWorkspace: + def test_per_workspace_cursor(self, tmp_path): + import inbox + + path_a = tmp_path / ".cursor_a" + path_b = tmp_path / ".cursor_b" + state = inbox.InboxState(cursor_paths={"ws-a": path_a, "ws-b": path_b}) + + state.save_cursor("activity-1", workspace_id="ws-a") + state.save_cursor("activity-2", workspace_id="ws-b") + + assert path_a.read_text() == "activity-1" + assert path_b.read_text() == "activity-2" + assert state.load_cursor("ws-a") == "activity-1" + assert state.load_cursor("ws-b") == "activity-2" + + def test_reset_only_targeted_workspace(self, tmp_path): + import inbox + + path_a = tmp_path / ".cursor_a" + path_b = tmp_path / ".cursor_b" + state = inbox.InboxState(cursor_paths={"ws-a": path_a, "ws-b": path_b}) + state.save_cursor("a-1", workspace_id="ws-a") + state.save_cursor("b-1", workspace_id="ws-b") + + state.reset_cursor(workspace_id="ws-a") + + assert not path_a.exists() + assert path_b.read_text() == "b-1" + assert state.load_cursor("ws-a") is None + assert state.load_cursor("ws-b") == "b-1" + + def test_back_compat_single_workspace_cursor_path(self, tmp_path): + # Single-workspace constructor (positional cursor_path=) still + # works exactly as before. Cursor key is the empty string. + import inbox + + path = tmp_path / ".legacy_cursor" + state = inbox.InboxState(cursor_path=path) + state.save_cursor("act-1") # no workspace_id arg + assert path.read_text() == "act-1" + assert state.load_cursor() == "act-1" + + def test_arrival_workspace_id_in_message_to_dict(self): + import inbox + + m = inbox.InboxMessage( + activity_id="a1", + text="hi", + peer_id="", + method="message/send", + created_at="2026-05-04T15:00:00Z", + arrival_workspace_id="ws-personal", + ) + d = m.to_dict() + assert d["arrival_workspace_id"] == "ws-personal" + + def test_arrival_workspace_id_omitted_when_empty(self): + # Single-workspace consumers shouldn't see the new key in their + # output — back-compat exact. + import inbox + + m = inbox.InboxMessage( + activity_id="a1", + text="hi", + peer_id="", + method="message/send", + created_at="2026-05-04T15:00:00Z", + ) + d = m.to_dict() + assert "arrival_workspace_id" not in d + + +class TestDefaultCursorPathPerWorkspace: + def test_with_workspace_id_returns_namespaced_path(self, monkeypatch, tmp_path): + # configs_dir.resolve() reads CONFIGS_DIR env; pin it so the + # test doesn't depend on the operator's home dir. + monkeypatch.setenv("CONFIGS_DIR", str(tmp_path)) + import inbox + + p_a = inbox.default_cursor_path("ws-aaaa11112222") + p_b = inbox.default_cursor_path("ws-bbbb33334444") + assert p_a != p_b + # Names should disambiguate by 8-char prefix. 
+ assert "ws-aaaa1" in p_a.name + assert "ws-bbbb3" in p_b.name + + def test_no_workspace_id_returns_legacy_filename(self, monkeypatch, tmp_path): + monkeypatch.setenv("CONFIGS_DIR", str(tmp_path)) + import inbox + + # Legacy single-workspace operators must keep their existing on-disk + # cursor — the filename is `.mcp_inbox_cursor` (no suffix). + p = inbox.default_cursor_path() + assert p.name == ".mcp_inbox_cursor"