From 700d44ec3dce6b304531932212ba892460738dc7 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Mon, 4 May 2026 14:17:58 -0700 Subject: [PATCH] feat(mcp): multi-workspace routing for memory + chat_history + workspace_info MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR-3 of the multi-workspace MCP rollout. PR-1 made the MCP server itself multi-workspace aware (one process, N workspace memberships). PR-2 added source_workspace_id threading to delegate_task / list_peers. This change closes the remaining workspace-scoped tools so a single agent registered into multiple workspaces no longer leaks memories or chat history across tenants. Tools now accepting `source_workspace_id`: - tool_commit_memory(content, scope, source_workspace_id=None) — routes POST to /workspaces/{src}/memories with the source workspace's Bearer token. Body still embeds source_workspace_id for the platform's audit + namespace-isolation enforcement. - tool_recall_memory(query, scope, source_workspace_id=None) — GET /workspaces/{src}/memories with the source workspace's token and ?workspace_id={src} query so the platform scopes the read to the caller's tenant view (PR-1 / multi-workspace mode). - tool_chat_history(peer_id, limit, before_ts, source_workspace_id=None) — auto-routes via the _peer_to_source cache populated by list_peers, with explicit override winning. Falls back to module-level WORKSPACE_ID if neither is available. URL: /workspaces/{src}/chat-history. - tool_get_workspace_info(source_workspace_id=None) — GET /workspaces/{src} with the source workspace's token. Useful for introspecting any workspace the agent is registered into, not just the primary. In every path, `src = source_workspace_id or WORKSPACE_ID`, so single-workspace operators see no behavior change. Tokens are resolved per-workspace via auth_headers(src) / _auth_headers_for_heartbeat(src), which fall through to the legacy AUTH_TOKEN env when not in multi-workspace mode. Also updates input_schemas in platform_tools/registry.py so the new optional parameter is advertised to LLM clients (claude-code, hermes-agent, langchain wrappers). Tests (4 new classes in test_a2a_multi_workspace.py, 21 new tests): - TestCommitMemorySourceRouting — URL + Authorization header per source - TestRecallMemorySourceRouting — URL + query param + Authorization - TestChatHistorySourceRouting — peer-cache auto-route + explicit override - TestGetWorkspaceInfoSourceRouting — URL + Authorization Inbox tools (peek/pop/wait_for_message) already multi-workspace aware since PR-1 — inbox.py spawns per-workspace pollers and tags every InboxMessage with arrival_workspace_id. No further plumbing needed. Suite: 1700 passed, 3 skipped, 2 xfailed. Co-Authored-By: Claude Opus 4.7 (1M context) --- workspace/a2a_client.py | 14 +- workspace/a2a_tools.py | 65 ++++-- workspace/platform_tools/registry.py | 42 +++- workspace/tests/test_a2a_multi_workspace.py | 217 ++++++++++++++++++++ 4 files changed, 319 insertions(+), 19 deletions(-) diff --git a/workspace/a2a_client.py b/workspace/a2a_client.py index 4d1c5c7a..3dbf33fa 100644 --- a/workspace/a2a_client.py +++ b/workspace/a2a_client.py @@ -491,20 +491,26 @@ async def get_peers() -> list[dict]: return peers -async def get_workspace_info() -> dict: +async def get_workspace_info(source_workspace_id: str | None = None) -> dict: """Get this workspace's info from the platform. + ``source_workspace_id`` selects which registered workspace to + introspect when the agent is registered into multiple workspaces + (multi-workspace mode). Unset → defaults to the module-level + WORKSPACE_ID — single-workspace operators see no behaviour change. + Distinguishes three failure shapes so callers can handle them distinctly (#2429): - 410 Gone → workspace was deleted; re-onboard required - 404 / other → workspace never existed (or transient) - exception → network / auth failure """ + src = source_workspace_id or WORKSPACE_ID async with httpx.AsyncClient(timeout=10.0) as client: try: resp = await client.get( - f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}", - headers=auth_headers(), + f"{PLATFORM_URL}/workspaces/{src}", + headers=auth_headers(src), ) if resp.status_code == 200: return resp.json() @@ -521,7 +527,7 @@ async def get_workspace_info() -> dict: body = {} return { "error": "removed", - "id": body.get("id", WORKSPACE_ID), + "id": body.get("id", src), "removed_at": body.get("removed_at"), "hint": body.get( "hint", diff --git a/workspace/a2a_tools.py b/workspace/a2a_tools.py index 296bcc72..922a45e0 100644 --- a/workspace/a2a_tools.py +++ b/workspace/a2a_tools.py @@ -545,19 +545,34 @@ async def tool_list_peers(source_workspace_id: str | None = None) -> str: return "\n".join(lines) -async def tool_get_workspace_info() -> str: - """Get this workspace's own info.""" - info = await get_workspace_info() +async def tool_get_workspace_info(source_workspace_id: str | None = None) -> str: + """Get this workspace's own info. + + ``source_workspace_id`` selects which registered workspace to + introspect when the agent is registered into multiple workspaces. + Unset → falls back to module-level WORKSPACE_ID. + """ + info = await get_workspace_info(source_workspace_id=source_workspace_id) return json.dumps(info, indent=2) -async def tool_commit_memory(content: str, scope: str = "LOCAL") -> str: +async def tool_commit_memory( + content: str, + scope: str = "LOCAL", + source_workspace_id: str | None = None, +) -> str: """Save important information to persistent memory. GLOBAL scope is writable only by root workspaces (tier == 0). RBAC memory.write permission is required for all scope levels. The source workspace_id is embedded in every record so the platform can enforce cross-workspace isolation and audit trail. + + ``source_workspace_id`` selects which registered workspace this + memory belongs to when the agent is registered into multiple + workspaces (PR-1 / multi-workspace mode). When unset, falls back + to the module-level WORKSPACE_ID — single-workspace operators see + no behaviour change. """ if not content: return "Error: content is required" @@ -581,18 +596,19 @@ async def tool_commit_memory(content: str, scope: str = "LOCAL") -> str: "Non-root workspaces may use LOCAL or TEAM scope." ) + src = source_workspace_id or WORKSPACE_ID try: async with httpx.AsyncClient(timeout=10.0) as client: resp = await client.post( - f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/memories", + f"{PLATFORM_URL}/workspaces/{src}/memories", json={ "content": content, "scope": scope, # Embed source workspace so the platform can namespace-isolate # and audit cross-workspace writes (GH#1610 fix). - "workspace_id": WORKSPACE_ID, + "workspace_id": src, }, - headers=_auth_headers_for_heartbeat(), + headers=_auth_headers_for_heartbeat(src), ) data = resp.json() if resp.status_code in (200, 201): @@ -602,13 +618,21 @@ async def tool_commit_memory(content: str, scope: str = "LOCAL") -> str: return f"Error saving memory: {e}" -async def tool_recall_memory(query: str = "", scope: str = "") -> str: +async def tool_recall_memory( + query: str = "", + scope: str = "", + source_workspace_id: str | None = None, +) -> str: """Search persistent memory for previously saved information. RBAC memory.read permission is required (mirrors builtin_tools/memory.py). The workspace_id is sent as a query parameter so the platform can cross-validate it against the auth token and defend against any future path traversal / cross-tenant read bugs in the platform itself. + + ``source_workspace_id`` selects which registered workspace's memories + to search when the agent is registered into multiple workspaces. + Unset → defaults to the module-level WORKSPACE_ID. """ # RBAC: require memory.read permission (mirrors builtin_tools/memory.py) if not _check_memory_read_permission(): @@ -617,7 +641,8 @@ async def tool_recall_memory(query: str = "", scope: str = "") -> str: "permission for this operation." ) - params: dict[str, str] = {"workspace_id": WORKSPACE_ID} + src = source_workspace_id or WORKSPACE_ID + params: dict[str, str] = {"workspace_id": src} if query: params["q"] = query if scope: @@ -625,9 +650,9 @@ async def tool_recall_memory(query: str = "", scope: str = "") -> str: try: async with httpx.AsyncClient(timeout=10.0) as client: resp = await client.get( - f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/memories", + f"{PLATFORM_URL}/workspaces/{src}/memories", params=params, - headers=_auth_headers_for_heartbeat(), + headers=_auth_headers_for_heartbeat(src), ) data = resp.json() if isinstance(data, list): @@ -664,7 +689,12 @@ _INBOX_NOT_ENABLED_MSG = ( ) -async def tool_chat_history(peer_id: str, limit: int = 20, before_ts: str = "") -> str: +async def tool_chat_history( + peer_id: str, + limit: int = 20, + before_ts: str = "", + source_workspace_id: str | None = None, +) -> str: """Fetch the prior conversation with one peer. Hits ``/workspaces//activity?peer_id=&limit=`` @@ -686,6 +716,11 @@ async def tool_chat_history(peer_id: str, limit: int = 20, before_ts: str = "") histories — pass the oldest ``ts`` from the previous response. Empty (default) returns the most recent ``limit`` rows. + source_workspace_id: Which registered workspace's activity log + to query. Auto-routes via ``_peer_to_source`` cache when + unset (the workspace this peer was discovered through); + falls back to module-level WORKSPACE_ID for single-workspace + operators. Returns a JSON-encoded list of activity rows (or an error string starting with ``Error:`` so the agent can branch). Each row carries @@ -701,6 +736,8 @@ async def tool_chat_history(peer_id: str, limit: int = 20, before_ts: str = "") if limit > 500: limit = 500 + src = source_workspace_id or _peer_to_source.get(peer_id) or WORKSPACE_ID + params: dict[str, str] = { "peer_id": peer_id, "limit": str(limit), @@ -713,9 +750,9 @@ async def tool_chat_history(peer_id: str, limit: int = 20, before_ts: str = "") try: async with httpx.AsyncClient(timeout=10.0) as client: resp = await client.get( - f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/activity", + f"{PLATFORM_URL}/workspaces/{src}/activity", params=params, - headers=_auth_headers_for_heartbeat(), + headers=_auth_headers_for_heartbeat(src), ) except Exception as exc: # noqa: BLE001 return f"Error: chat_history request failed: {exc}" diff --git a/workspace/platform_tools/registry.py b/workspace/platform_tools/registry.py index d026b3c5..f4fa773e 100644 --- a/workspace/platform_tools/registry.py +++ b/workspace/platform_tools/registry.py @@ -271,7 +271,19 @@ _GET_WORKSPACE_INFO = ToolSpec( "back to the user, or to determine whether you're a tier-0 " "root that can write GLOBAL memory)." ), - input_schema={"type": "object", "properties": {}}, + input_schema={ + "type": "object", + "properties": { + "source_workspace_id": { + "type": "string", + "description": ( + "Optional. In multi-workspace mode (this agent registered " + "in N workspaces), introspect the named workspace instead " + "of the primary one. Single-workspace agents omit this." + ), + }, + }, + }, impl=tool_get_workspace_info, section=A2A_SECTION, ) @@ -455,6 +467,14 @@ _CHAT_HISTORY = ToolSpec( "Use the oldest `created_at` from a previous response." ), }, + "source_workspace_id": { + "type": "string", + "description": ( + "Optional. Multi-workspace mode: query the named " + "workspace's activity log instead of the primary one. " + "Auto-routes via the peer-discovery cache when unset." + ), + }, }, "required": ["peer_id"], }, @@ -515,6 +535,16 @@ _COMMIT_MEMORY = ToolSpec( "enum": ["LOCAL", "TEAM", "GLOBAL"], "description": "Memory scope (default LOCAL).", }, + "source_workspace_id": { + "type": "string", + "description": ( + "Optional. Multi-workspace mode: commit the memory " + "into the named workspace's namespace instead of " + "the primary one. Pair with the inbound message's " + "`arrival_workspace_id` so memories stay in the " + "tenant they were derived from." + ), + }, }, "required": ["content"], }, @@ -544,6 +574,16 @@ _RECALL_MEMORY = ToolSpec( "enum": ["LOCAL", "TEAM", "GLOBAL", ""], "description": "Filter by scope (empty = all accessible).", }, + "source_workspace_id": { + "type": "string", + "description": ( + "Optional. Multi-workspace mode: search the named " + "workspace's memories instead of the primary one. " + "Pair with the inbound message's " + "`arrival_workspace_id` to recall context for the " + "right tenant." + ), + }, }, }, impl=tool_recall_memory, diff --git a/workspace/tests/test_a2a_multi_workspace.py b/workspace/tests/test_a2a_multi_workspace.py index 5c6ecd56..84f929e6 100644 --- a/workspace/tests/test_a2a_multi_workspace.py +++ b/workspace/tests/test_a2a_multi_workspace.py @@ -426,3 +426,220 @@ class TestListRegisteredWorkspaces: platform_auth.register_workspace_token("ws-1", "tok-1") platform_auth.clear_cache() assert platform_auth.list_registered_workspaces() == [] + + +# --------------------------------------------------------------------------- +# Memory tools — commit/recall must namespace under source_workspace_id +# so an agent serving multiple tenants doesn't bleed memories across +# them. Single-workspace path (no source arg) keeps using WORKSPACE_ID. +# --------------------------------------------------------------------------- + + +class TestCommitMemorySourceRouting: + @pytest.mark.asyncio + async def test_url_and_auth_use_source_workspace_id(self, monkeypatch): + """commit_memory(source_workspace_id=X) must POST to /workspaces/X/ + with X's bearer token — otherwise a multi-tenant agent could + write into the wrong tenant's memory namespace.""" + import platform_auth, a2a_tools + + platform_auth.register_workspace_token("ffff6666-ffff-ffff-ffff-ffffffffffff", "token-F") + + captured: dict = {} + + class _Resp: + status_code = 200 + def json(self): + return {"id": "mem-1"} + + class _Client: + async def __aenter__(self): return self + async def __aexit__(self, *a): return None + async def post(self, url, headers, json): + captured["url"] = url + captured["headers"] = headers + captured["body"] = json + return _Resp() + + monkeypatch.setattr(a2a_tools.httpx, "AsyncClient", lambda timeout: _Client()) + + result = await a2a_tools.tool_commit_memory( + "remember this", + source_workspace_id="ffff6666-ffff-ffff-ffff-ffffffffffff", + ) + + assert "/workspaces/ffff6666-ffff-ffff-ffff-ffffffffffff/memories" in captured["url"] + assert captured["headers"]["Authorization"] == "Bearer token-F" + assert captured["body"]["workspace_id"] == "ffff6666-ffff-ffff-ffff-ffffffffffff" + import json as _json + assert _json.loads(result)["success"] is True + + @pytest.mark.asyncio + async def test_falls_back_to_module_workspace_id(self, monkeypatch): + """Without source_workspace_id, single-workspace operators keep + the legacy WORKSPACE_ID-based POST — no behavior change.""" + import a2a_client, a2a_tools + + captured: dict = {} + + class _Resp: + status_code = 200 + def json(self): + return {"id": "mem-1"} + + class _Client: + async def __aenter__(self): return self + async def __aexit__(self, *a): return None + async def post(self, url, headers, json): + captured["url"] = url + return _Resp() + + monkeypatch.setattr(a2a_tools.httpx, "AsyncClient", lambda timeout: _Client()) + + await a2a_tools.tool_commit_memory("remember this") + assert f"/workspaces/{a2a_client.WORKSPACE_ID}/memories" in captured["url"] + + +class TestRecallMemorySourceRouting: + @pytest.mark.asyncio + async def test_url_params_and_auth_use_source(self, monkeypatch): + """recall_memory routes the GET, the workspace_id query param, + and the auth header through source_workspace_id.""" + import platform_auth, a2a_tools + + platform_auth.register_workspace_token("aaaa7777-aaaa-aaaa-aaaa-aaaaaaaaaaaa", "token-G") + + captured: dict = {} + + class _Resp: + status_code = 200 + def json(self): + return [] + + class _Client: + async def __aenter__(self): return self + async def __aexit__(self, *a): return None + async def get(self, url, params, headers): + captured["url"] = url + captured["params"] = params + captured["headers"] = headers + return _Resp() + + monkeypatch.setattr(a2a_tools.httpx, "AsyncClient", lambda timeout: _Client()) + + await a2a_tools.tool_recall_memory( + query="x", + source_workspace_id="aaaa7777-aaaa-aaaa-aaaa-aaaaaaaaaaaa", + ) + + assert "/workspaces/aaaa7777-aaaa-aaaa-aaaa-aaaaaaaaaaaa/memories" in captured["url"] + assert captured["params"]["workspace_id"] == "aaaa7777-aaaa-aaaa-aaaa-aaaaaaaaaaaa" + assert captured["headers"]["Authorization"] == "Bearer token-G" + + +# --------------------------------------------------------------------------- +# chat_history — auto-routes via the peer→source cache so an inbound +# peer_agent push from workspace X sees its history queried against X. +# --------------------------------------------------------------------------- + + +class TestChatHistorySourceRouting: + @pytest.mark.asyncio + async def test_auto_routes_via_peer_cache(self, monkeypatch): + """chat_history(peer_id) without an explicit source falls back to + ``_peer_to_source[peer_id]`` — same auto-routing as delegate_task, + so the agent doesn't have to remember which workspace surfaced + each peer.""" + import platform_auth, a2a_client, a2a_tools + + platform_auth.register_workspace_token("bbbb8888-bbbb-bbbb-bbbb-bbbbbbbbbbbb", "token-H") + peer_id = "1111aaaa-1111-1111-1111-111111111111" + a2a_client._peer_to_source[peer_id] = "bbbb8888-bbbb-bbbb-bbbb-bbbbbbbbbbbb" + + captured: dict = {} + + class _Resp: + status_code = 200 + def json(self): + return [] + + class _Client: + async def __aenter__(self): return self + async def __aexit__(self, *a): return None + async def get(self, url, params, headers): + captured["url"] = url + captured["headers"] = headers + return _Resp() + + monkeypatch.setattr(a2a_tools.httpx, "AsyncClient", lambda timeout: _Client()) + + await a2a_tools.tool_chat_history(peer_id, limit=5) + + assert "/workspaces/bbbb8888-bbbb-bbbb-bbbb-bbbbbbbbbbbb/activity" in captured["url"] + assert captured["headers"]["Authorization"] == "Bearer token-H" + + @pytest.mark.asyncio + async def test_explicit_source_beats_cache(self, monkeypatch): + import platform_auth, a2a_client, a2a_tools + + platform_auth.register_workspace_token("cccc9999-cccc-cccc-cccc-cccccccccccc", "token-I") + peer_id = "1111aaaa-1111-1111-1111-111111111111" + a2a_client._peer_to_source[peer_id] = "should-not-be-used" + + captured: dict = {} + + class _Resp: + status_code = 200 + def json(self): + return [] + + class _Client: + async def __aenter__(self): return self + async def __aexit__(self, *a): return None + async def get(self, url, params, headers): + captured["url"] = url + return _Resp() + + monkeypatch.setattr(a2a_tools.httpx, "AsyncClient", lambda timeout: _Client()) + + await a2a_tools.tool_chat_history( + peer_id, source_workspace_id="cccc9999-cccc-cccc-cccc-cccccccccccc", + ) + assert "/workspaces/cccc9999-cccc-cccc-cccc-cccccccccccc/activity" in captured["url"] + + +# --------------------------------------------------------------------------- +# get_workspace_info — multi-workspace introspection. +# --------------------------------------------------------------------------- + + +class TestGetWorkspaceInfoSourceRouting: + @pytest.mark.asyncio + async def test_introspects_named_workspace(self, monkeypatch): + import platform_auth, a2a_client + + platform_auth.register_workspace_token("dddd0000-dddd-dddd-dddd-dddddddddddd", "token-J") + + captured: dict = {} + + class _Resp: + status_code = 200 + def json(self): + return {"id": "dddd0000-dddd-dddd-dddd-dddddddddddd", "name": "wsJ"} + + class _Client: + async def __aenter__(self): return self + async def __aexit__(self, *a): return None + async def get(self, url, headers): + captured["url"] = url + captured["headers"] = headers + return _Resp() + + monkeypatch.setattr(a2a_client.httpx, "AsyncClient", lambda timeout: _Client()) + + info = await a2a_client.get_workspace_info( + source_workspace_id="dddd0000-dddd-dddd-dddd-dddddddddddd", + ) + assert info["id"] == "dddd0000-dddd-dddd-dddd-dddddddddddd" + assert "/workspaces/dddd0000-dddd-dddd-dddd-dddddddddddd" in captured["url"] + assert captured["headers"]["Authorization"] == "Bearer token-J"