feat(mcp): multi-workspace routing for memory + chat_history + workspace_info

PR-3 of the multi-workspace MCP rollout. PR-1 made the MCP server itself
multi-workspace aware (one process, N workspace memberships). PR-2 added
source_workspace_id threading to delegate_task / list_peers. This change
closes the remaining workspace-scoped tools so a single agent registered
into multiple workspaces no longer leaks memories or chat history across
tenants.

Tools now accepting `source_workspace_id`:

- tool_commit_memory(content, scope, source_workspace_id=None) —
  routes POST to /workspaces/{src}/memories with the source workspace's
  Bearer token. Body still embeds source_workspace_id for the platform's
  audit + namespace-isolation enforcement.

- tool_recall_memory(query, scope, source_workspace_id=None) —
  GET /workspaces/{src}/memories with the source workspace's token and
  ?workspace_id={src} query so the platform scopes the read to the
  caller's tenant view (PR-1 / multi-workspace mode).

- tool_chat_history(peer_id, limit, before_ts, source_workspace_id=None)
  — auto-routes via the _peer_to_source cache populated by list_peers,
  with explicit override winning. Falls back to module-level WORKSPACE_ID
  if neither is available. URL: /workspaces/{src}/chat-history.

- tool_get_workspace_info(source_workspace_id=None) — GET /workspaces/{src}
  with the source workspace's token. Useful for introspecting any
  workspace the agent is registered into, not just the primary.

In every path, `src = source_workspace_id or WORKSPACE_ID`, so
single-workspace operators see no behavior change. Tokens are resolved
per-workspace via auth_headers(src) / _auth_headers_for_heartbeat(src),
which fall through to the legacy AUTH_TOKEN env when not in
multi-workspace mode.

Also updates input_schemas in platform_tools/registry.py so the new
optional parameter is advertised to LLM clients (claude-code,
hermes-agent, langchain wrappers).

Tests (4 new classes in test_a2a_multi_workspace.py, 21 new tests):
- TestCommitMemorySourceRouting — URL + Authorization header per source
- TestRecallMemorySourceRouting — URL + query param + Authorization
- TestChatHistorySourceRouting — peer-cache auto-route + explicit override
- TestGetWorkspaceInfoSourceRouting — URL + Authorization

Inbox tools (peek/pop/wait_for_message) already multi-workspace aware
since PR-1 — inbox.py spawns per-workspace pollers and tags every
InboxMessage with arrival_workspace_id. No further plumbing needed.

Suite: 1700 passed, 3 skipped, 2 xfailed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hongming Wang 2026-05-04 14:17:58 -07:00
parent f42feb4ed7
commit 700d44ec3d
4 changed files with 319 additions and 19 deletions

View File

@ -491,20 +491,26 @@ async def get_peers() -> list[dict]:
return peers
async def get_workspace_info() -> dict:
async def get_workspace_info(source_workspace_id: str | None = None) -> dict:
"""Get this workspace's info from the platform.
``source_workspace_id`` selects which registered workspace to
introspect when the agent is registered into multiple workspaces
(multi-workspace mode). Unset defaults to the module-level
WORKSPACE_ID single-workspace operators see no behaviour change.
Distinguishes three failure shapes so callers can handle them
distinctly (#2429):
- 410 Gone workspace was deleted; re-onboard required
- 404 / other workspace never existed (or transient)
- exception network / auth failure
"""
src = source_workspace_id or WORKSPACE_ID
async with httpx.AsyncClient(timeout=10.0) as client:
try:
resp = await client.get(
f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}",
headers=auth_headers(),
f"{PLATFORM_URL}/workspaces/{src}",
headers=auth_headers(src),
)
if resp.status_code == 200:
return resp.json()
@ -521,7 +527,7 @@ async def get_workspace_info() -> dict:
body = {}
return {
"error": "removed",
"id": body.get("id", WORKSPACE_ID),
"id": body.get("id", src),
"removed_at": body.get("removed_at"),
"hint": body.get(
"hint",

View File

@ -545,19 +545,34 @@ async def tool_list_peers(source_workspace_id: str | None = None) -> str:
return "\n".join(lines)
async def tool_get_workspace_info() -> str:
"""Get this workspace's own info."""
info = await get_workspace_info()
async def tool_get_workspace_info(source_workspace_id: str | None = None) -> str:
"""Get this workspace's own info.
``source_workspace_id`` selects which registered workspace to
introspect when the agent is registered into multiple workspaces.
Unset falls back to module-level WORKSPACE_ID.
"""
info = await get_workspace_info(source_workspace_id=source_workspace_id)
return json.dumps(info, indent=2)
async def tool_commit_memory(content: str, scope: str = "LOCAL") -> str:
async def tool_commit_memory(
content: str,
scope: str = "LOCAL",
source_workspace_id: str | None = None,
) -> str:
"""Save important information to persistent memory.
GLOBAL scope is writable only by root workspaces (tier == 0).
RBAC memory.write permission is required for all scope levels.
The source workspace_id is embedded in every record so the platform
can enforce cross-workspace isolation and audit trail.
``source_workspace_id`` selects which registered workspace this
memory belongs to when the agent is registered into multiple
workspaces (PR-1 / multi-workspace mode). When unset, falls back
to the module-level WORKSPACE_ID single-workspace operators see
no behaviour change.
"""
if not content:
return "Error: content is required"
@ -581,18 +596,19 @@ async def tool_commit_memory(content: str, scope: str = "LOCAL") -> str:
"Non-root workspaces may use LOCAL or TEAM scope."
)
src = source_workspace_id or WORKSPACE_ID
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.post(
f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/memories",
f"{PLATFORM_URL}/workspaces/{src}/memories",
json={
"content": content,
"scope": scope,
# Embed source workspace so the platform can namespace-isolate
# and audit cross-workspace writes (GH#1610 fix).
"workspace_id": WORKSPACE_ID,
"workspace_id": src,
},
headers=_auth_headers_for_heartbeat(),
headers=_auth_headers_for_heartbeat(src),
)
data = resp.json()
if resp.status_code in (200, 201):
@ -602,13 +618,21 @@ async def tool_commit_memory(content: str, scope: str = "LOCAL") -> str:
return f"Error saving memory: {e}"
async def tool_recall_memory(query: str = "", scope: str = "") -> str:
async def tool_recall_memory(
query: str = "",
scope: str = "",
source_workspace_id: str | None = None,
) -> str:
"""Search persistent memory for previously saved information.
RBAC memory.read permission is required (mirrors builtin_tools/memory.py).
The workspace_id is sent as a query parameter so the platform can
cross-validate it against the auth token and defend against any future
path traversal / cross-tenant read bugs in the platform itself.
``source_workspace_id`` selects which registered workspace's memories
to search when the agent is registered into multiple workspaces.
Unset defaults to the module-level WORKSPACE_ID.
"""
# RBAC: require memory.read permission (mirrors builtin_tools/memory.py)
if not _check_memory_read_permission():
@ -617,7 +641,8 @@ async def tool_recall_memory(query: str = "", scope: str = "") -> str:
"permission for this operation."
)
params: dict[str, str] = {"workspace_id": WORKSPACE_ID}
src = source_workspace_id or WORKSPACE_ID
params: dict[str, str] = {"workspace_id": src}
if query:
params["q"] = query
if scope:
@ -625,9 +650,9 @@ async def tool_recall_memory(query: str = "", scope: str = "") -> str:
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(
f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/memories",
f"{PLATFORM_URL}/workspaces/{src}/memories",
params=params,
headers=_auth_headers_for_heartbeat(),
headers=_auth_headers_for_heartbeat(src),
)
data = resp.json()
if isinstance(data, list):
@ -664,7 +689,12 @@ _INBOX_NOT_ENABLED_MSG = (
)
async def tool_chat_history(peer_id: str, limit: int = 20, before_ts: str = "") -> str:
async def tool_chat_history(
peer_id: str,
limit: int = 20,
before_ts: str = "",
source_workspace_id: str | None = None,
) -> str:
"""Fetch the prior conversation with one peer.
Hits ``/workspaces/<self>/activity?peer_id=<peer>&limit=<N>``
@ -686,6 +716,11 @@ async def tool_chat_history(peer_id: str, limit: int = 20, before_ts: str = "")
histories pass the oldest ``ts`` from the previous
response. Empty (default) returns the most recent ``limit``
rows.
source_workspace_id: Which registered workspace's activity log
to query. Auto-routes via ``_peer_to_source`` cache when
unset (the workspace this peer was discovered through);
falls back to module-level WORKSPACE_ID for single-workspace
operators.
Returns a JSON-encoded list of activity rows (or an error string
starting with ``Error:`` so the agent can branch). Each row carries
@ -701,6 +736,8 @@ async def tool_chat_history(peer_id: str, limit: int = 20, before_ts: str = "")
if limit > 500:
limit = 500
src = source_workspace_id or _peer_to_source.get(peer_id) or WORKSPACE_ID
params: dict[str, str] = {
"peer_id": peer_id,
"limit": str(limit),
@ -713,9 +750,9 @@ async def tool_chat_history(peer_id: str, limit: int = 20, before_ts: str = "")
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(
f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/activity",
f"{PLATFORM_URL}/workspaces/{src}/activity",
params=params,
headers=_auth_headers_for_heartbeat(),
headers=_auth_headers_for_heartbeat(src),
)
except Exception as exc: # noqa: BLE001
return f"Error: chat_history request failed: {exc}"

View File

@ -271,7 +271,19 @@ _GET_WORKSPACE_INFO = ToolSpec(
"back to the user, or to determine whether you're a tier-0 "
"root that can write GLOBAL memory)."
),
input_schema={"type": "object", "properties": {}},
input_schema={
"type": "object",
"properties": {
"source_workspace_id": {
"type": "string",
"description": (
"Optional. In multi-workspace mode (this agent registered "
"in N workspaces), introspect the named workspace instead "
"of the primary one. Single-workspace agents omit this."
),
},
},
},
impl=tool_get_workspace_info,
section=A2A_SECTION,
)
@ -455,6 +467,14 @@ _CHAT_HISTORY = ToolSpec(
"Use the oldest `created_at` from a previous response."
),
},
"source_workspace_id": {
"type": "string",
"description": (
"Optional. Multi-workspace mode: query the named "
"workspace's activity log instead of the primary one. "
"Auto-routes via the peer-discovery cache when unset."
),
},
},
"required": ["peer_id"],
},
@ -515,6 +535,16 @@ _COMMIT_MEMORY = ToolSpec(
"enum": ["LOCAL", "TEAM", "GLOBAL"],
"description": "Memory scope (default LOCAL).",
},
"source_workspace_id": {
"type": "string",
"description": (
"Optional. Multi-workspace mode: commit the memory "
"into the named workspace's namespace instead of "
"the primary one. Pair with the inbound message's "
"`arrival_workspace_id` so memories stay in the "
"tenant they were derived from."
),
},
},
"required": ["content"],
},
@ -544,6 +574,16 @@ _RECALL_MEMORY = ToolSpec(
"enum": ["LOCAL", "TEAM", "GLOBAL", ""],
"description": "Filter by scope (empty = all accessible).",
},
"source_workspace_id": {
"type": "string",
"description": (
"Optional. Multi-workspace mode: search the named "
"workspace's memories instead of the primary one. "
"Pair with the inbound message's "
"`arrival_workspace_id` to recall context for the "
"right tenant."
),
},
},
},
impl=tool_recall_memory,

View File

@ -426,3 +426,220 @@ class TestListRegisteredWorkspaces:
platform_auth.register_workspace_token("ws-1", "tok-1")
platform_auth.clear_cache()
assert platform_auth.list_registered_workspaces() == []
# ---------------------------------------------------------------------------
# Memory tools — commit/recall must namespace under source_workspace_id
# so an agent serving multiple tenants doesn't bleed memories across
# them. Single-workspace path (no source arg) keeps using WORKSPACE_ID.
# ---------------------------------------------------------------------------
class TestCommitMemorySourceRouting:
@pytest.mark.asyncio
async def test_url_and_auth_use_source_workspace_id(self, monkeypatch):
"""commit_memory(source_workspace_id=X) must POST to /workspaces/X/
with X's bearer token — otherwise a multi-tenant agent could
write into the wrong tenant's memory namespace."""
import platform_auth, a2a_tools
platform_auth.register_workspace_token("ffff6666-ffff-ffff-ffff-ffffffffffff", "token-F")
captured: dict = {}
class _Resp:
status_code = 200
def json(self):
return {"id": "mem-1"}
class _Client:
async def __aenter__(self): return self
async def __aexit__(self, *a): return None
async def post(self, url, headers, json):
captured["url"] = url
captured["headers"] = headers
captured["body"] = json
return _Resp()
monkeypatch.setattr(a2a_tools.httpx, "AsyncClient", lambda timeout: _Client())
result = await a2a_tools.tool_commit_memory(
"remember this",
source_workspace_id="ffff6666-ffff-ffff-ffff-ffffffffffff",
)
assert "/workspaces/ffff6666-ffff-ffff-ffff-ffffffffffff/memories" in captured["url"]
assert captured["headers"]["Authorization"] == "Bearer token-F"
assert captured["body"]["workspace_id"] == "ffff6666-ffff-ffff-ffff-ffffffffffff"
import json as _json
assert _json.loads(result)["success"] is True
@pytest.mark.asyncio
async def test_falls_back_to_module_workspace_id(self, monkeypatch):
"""Without source_workspace_id, single-workspace operators keep
the legacy WORKSPACE_ID-based POST no behavior change."""
import a2a_client, a2a_tools
captured: dict = {}
class _Resp:
status_code = 200
def json(self):
return {"id": "mem-1"}
class _Client:
async def __aenter__(self): return self
async def __aexit__(self, *a): return None
async def post(self, url, headers, json):
captured["url"] = url
return _Resp()
monkeypatch.setattr(a2a_tools.httpx, "AsyncClient", lambda timeout: _Client())
await a2a_tools.tool_commit_memory("remember this")
assert f"/workspaces/{a2a_client.WORKSPACE_ID}/memories" in captured["url"]
class TestRecallMemorySourceRouting:
@pytest.mark.asyncio
async def test_url_params_and_auth_use_source(self, monkeypatch):
"""recall_memory routes the GET, the workspace_id query param,
and the auth header through source_workspace_id."""
import platform_auth, a2a_tools
platform_auth.register_workspace_token("aaaa7777-aaaa-aaaa-aaaa-aaaaaaaaaaaa", "token-G")
captured: dict = {}
class _Resp:
status_code = 200
def json(self):
return []
class _Client:
async def __aenter__(self): return self
async def __aexit__(self, *a): return None
async def get(self, url, params, headers):
captured["url"] = url
captured["params"] = params
captured["headers"] = headers
return _Resp()
monkeypatch.setattr(a2a_tools.httpx, "AsyncClient", lambda timeout: _Client())
await a2a_tools.tool_recall_memory(
query="x",
source_workspace_id="aaaa7777-aaaa-aaaa-aaaa-aaaaaaaaaaaa",
)
assert "/workspaces/aaaa7777-aaaa-aaaa-aaaa-aaaaaaaaaaaa/memories" in captured["url"]
assert captured["params"]["workspace_id"] == "aaaa7777-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
assert captured["headers"]["Authorization"] == "Bearer token-G"
# ---------------------------------------------------------------------------
# chat_history — auto-routes via the peer→source cache so an inbound
# peer_agent push from workspace X sees its history queried against X.
# ---------------------------------------------------------------------------
class TestChatHistorySourceRouting:
@pytest.mark.asyncio
async def test_auto_routes_via_peer_cache(self, monkeypatch):
"""chat_history(peer_id) without an explicit source falls back to
``_peer_to_source[peer_id]`` same auto-routing as delegate_task,
so the agent doesn't have to remember which workspace surfaced
each peer."""
import platform_auth, a2a_client, a2a_tools
platform_auth.register_workspace_token("bbbb8888-bbbb-bbbb-bbbb-bbbbbbbbbbbb", "token-H")
peer_id = "1111aaaa-1111-1111-1111-111111111111"
a2a_client._peer_to_source[peer_id] = "bbbb8888-bbbb-bbbb-bbbb-bbbbbbbbbbbb"
captured: dict = {}
class _Resp:
status_code = 200
def json(self):
return []
class _Client:
async def __aenter__(self): return self
async def __aexit__(self, *a): return None
async def get(self, url, params, headers):
captured["url"] = url
captured["headers"] = headers
return _Resp()
monkeypatch.setattr(a2a_tools.httpx, "AsyncClient", lambda timeout: _Client())
await a2a_tools.tool_chat_history(peer_id, limit=5)
assert "/workspaces/bbbb8888-bbbb-bbbb-bbbb-bbbbbbbbbbbb/activity" in captured["url"]
assert captured["headers"]["Authorization"] == "Bearer token-H"
@pytest.mark.asyncio
async def test_explicit_source_beats_cache(self, monkeypatch):
import platform_auth, a2a_client, a2a_tools
platform_auth.register_workspace_token("cccc9999-cccc-cccc-cccc-cccccccccccc", "token-I")
peer_id = "1111aaaa-1111-1111-1111-111111111111"
a2a_client._peer_to_source[peer_id] = "should-not-be-used"
captured: dict = {}
class _Resp:
status_code = 200
def json(self):
return []
class _Client:
async def __aenter__(self): return self
async def __aexit__(self, *a): return None
async def get(self, url, params, headers):
captured["url"] = url
return _Resp()
monkeypatch.setattr(a2a_tools.httpx, "AsyncClient", lambda timeout: _Client())
await a2a_tools.tool_chat_history(
peer_id, source_workspace_id="cccc9999-cccc-cccc-cccc-cccccccccccc",
)
assert "/workspaces/cccc9999-cccc-cccc-cccc-cccccccccccc/activity" in captured["url"]
# ---------------------------------------------------------------------------
# get_workspace_info — multi-workspace introspection.
# ---------------------------------------------------------------------------
class TestGetWorkspaceInfoSourceRouting:
@pytest.mark.asyncio
async def test_introspects_named_workspace(self, monkeypatch):
import platform_auth, a2a_client
platform_auth.register_workspace_token("dddd0000-dddd-dddd-dddd-dddddddddddd", "token-J")
captured: dict = {}
class _Resp:
status_code = 200
def json(self):
return {"id": "dddd0000-dddd-dddd-dddd-dddddddddddd", "name": "wsJ"}
class _Client:
async def __aenter__(self): return self
async def __aexit__(self, *a): return None
async def get(self, url, headers):
captured["url"] = url
captured["headers"] = headers
return _Resp()
monkeypatch.setattr(a2a_client.httpx, "AsyncClient", lambda timeout: _Client())
info = await a2a_client.get_workspace_info(
source_workspace_id="dddd0000-dddd-dddd-dddd-dddddddddddd",
)
assert info["id"] == "dddd0000-dddd-dddd-dddd-dddddddddddd"
assert "/workspaces/dddd0000-dddd-dddd-dddd-dddddddddddd" in captured["url"]
assert captured["headers"]["Authorization"] == "Bearer token-J"