molecule-core/workspace/a2a_client.py
Hongming Wang 1161b97faf feat(mcp): cross-workspace delegation routing (multi-ws PR-2)
PR-2 of the multi-workspace external-agent stack. PR-1 (#2739)
landed per-workspace auth + heartbeat + inbox. This PR threads
``source_workspace_id`` through the A2A client + tool surface so an
agent registered against multiple workspaces can list peers across
all of them and delegate from a specific source.

Changes
-------

* ``a2a_client``: ``discover_peer``, ``send_a2a_message``,
  ``get_peers_with_diagnostic``, and ``enrich_peer_metadata`` now
  accept ``source_workspace_id``. Routing uses it for both the
  X-Workspace-ID header and (transitively, via ``auth_headers(src)``)
  the bearer token. Defaults to module-level WORKSPACE_ID for
  back-compat.
* ``a2a_client._peer_to_source``: a new lock-free cache mapping each
  discovered peer back to the source workspace whose registry
  surfaced it. ``tool_list_peers`` populates the cache on every call;
  ``tool_delegate_task`` consults it for auto-routing.
* ``a2a_tools.tool_list_peers(source_workspace_id=None)``: when
  multiple workspaces are registered (MOLECULE_WORKSPACES) and no
  explicit source is passed, aggregates peers across every
  registered workspace and tags each entry with ``via: <src[:8]>``.
  Single-workspace mode is unchanged — no ``via:`` annotation, same
  output shape.
* ``a2a_tools.tool_delegate_task`` and ``tool_delegate_task_async``
  resolve source via ``source_workspace_id arg → _peer_to_source[target]
  → WORKSPACE_ID``. Agents almost never need to specify ``source_*``
  explicitly — call ``list_peers`` first and the cache handles the
  rest.
* ``tool_delegate_task_async`` idempotency key now includes the
  source workspace, so the same task delegated from two registered
  workspaces produces two distinct delegations (the right behavior
  — one per tenant audit trail).
* ``platform_auth.list_registered_workspaces()``: new helper for the
  tool layer to enumerate the multi-ws registry. Lock-free reads
  matched by the existing single-writer-per-workspace contract from
  PR-1.
* ``platform_auth.self_source_headers``: now passes ``workspace_id``
  through to ``auth_headers`` — without this, a multi-workspace POST
  source-tagged with ``X-Workspace-ID=ws_b`` was authenticating
  with ws_a's token (or no token if MOLECULE_WORKSPACE_TOKEN unset).
  Latent PR-1 bug exposed by the new tool surface.
* ``a2a_mcp_server`` tool dispatch passes ``source_workspace_id``
  from the tool call arguments.
* ``platform_tools.registry``: add ``source_workspace_id`` to the
  delegate_task, delegate_task_async, check_task_status, list_peers
  input schemas with copy explaining when to use it (rarely — the
  cache handles it).

Tests (15 new, all passing)
---------------------------

``test_a2a_multi_workspace.py``:
* TestDiscoverPeerSourceRouting (3): src arg drives header+token,
  fallback to module ws when omitted, invalid target short-circuits
  before any HTTP attempt.
* TestSendA2AMessageSourceRouting (1): X-Workspace-ID source header
  + Authorization bearer both come from the source arg via the
  patched self_source_headers chain.
* TestGetPeersSourceRouting (1): URL path AND headers use the
  source workspace id.
* TestToolListPeersAggregation (4): aggregates across multiple
  registered workspaces, tags origin, leaves single-workspace path
  unchanged, explicit src arg overrides aggregation, diagnostic
  joining when every workspace returns empty.
* TestToolDelegateTaskAutoRouting (3): cache-driven auto-route,
  explicit override beats cache, single-workspace fallback to
  module WORKSPACE_ID.
* TestListRegisteredWorkspaces (3): registry enumeration helper.

Plus ``tests/snapshots/a2a_instructions_mcp.txt`` regenerated to
absorb the new ``source_workspace_id`` schema entries.

Back-compat
-----------

Every change defaults ``source_workspace_id=None``; legacy
single-workspace operators (no MOLECULE_WORKSPACES) see identical
behavior — same URLs, same headers, same tool output. The 24
PR-1 tests + 125 existing A2A tests all still pass.

Out of scope (PR-3)
-------------------

Memory namespacing per registered workspace lands after the new
memory system v2 PR (#2740) settles in production.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 08:32:24 -07:00


"""A2A protocol client — peer discovery, messaging, and workspace info.

Shared constants (WORKSPACE_ID, PLATFORM_URL) live here so that
a2a_tools and a2a_mcp_server can import them from a single place.
"""
import asyncio
import logging
import os
import random
import re
import time
import uuid

import httpx

from platform_auth import auth_headers, self_source_headers

logger = logging.getLogger(__name__)
_WORKSPACE_ID_raw = os.environ.get("WORKSPACE_ID")
if not _WORKSPACE_ID_raw:
    raise RuntimeError("WORKSPACE_ID environment variable is required but not set")
WORKSPACE_ID = _WORKSPACE_ID_raw

if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
    PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
else:
    PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080")
# Cache workspace ID → name mappings (populated by list_peers calls)
_peer_names: dict[str, str] = {}
# Cache: peer workspace_id → the source workspace_id whose registry
# returned that peer. Populated by ``a2a_tools.tool_list_peers`` whenever
# it queries a specific workspace's peers — so a later
# ``tool_delegate_task(target)`` can auto-route through the correct
# source workspace without the agent having to specify
# ``source_workspace_id`` explicitly.
#
# Single-workspace mode: dict stays empty, all delegations fall through
# to the module-level WORKSPACE_ID (existing behavior).
#
# Multi-workspace mode: as the agent calls list_peers, this map is
# populated with each peer's source. Subsequent delegate_task calls
# auto-route. If a peer is registered under multiple sources (rare —
# e.g. an org-wide capability) the LAST observed source wins; the agent
# can override by passing ``source_workspace_id`` explicitly.
_peer_to_source: dict[str, str] = {}
# Cache workspace ID → full peer record (id, name, role, status, url, ...).
# Populated by tool_list_peers and by the lazy registry lookup in
# enrich_peer_metadata. The notification-callback path (channel envelope
# enrichment) reads this cache on every inbound peer_agent push, so a
# bare ``dict[str, tuple[float, dict | None]]`` is the fastest read
# shape; entries carry their fetched-at timestamp so TTL eviction is
# in-line with the lookup. ``None`` as the record is the negative-cache
# sentinel: registry failure is cached for one TTL window so we don't
# re-fire the 2s-bounded GET on every push from a flaky peer.
_peer_metadata: dict[str, tuple[float, dict | None]] = {}
# How long an entry in ``_peer_metadata`` is treated as fresh. 5 minutes
# is the same window we use for delegation routing — long enough that a
# busy agent receiving repeated pushes from one peer doesn't hit the
# registry on every push, short enough that role/name renames propagate
# within a single agent session.
_PEER_METADATA_TTL_SECONDS = 300.0
def enrich_peer_metadata(
    peer_id: str,
    source_workspace_id: str | None = None,
    *,
    now: float | None = None,
) -> dict | None:
    """Return cached or freshly-fetched metadata for ``peer_id``.

    Sync helper — safe to call from the inbox poller's notification
    callback thread (which is not async). Hits the in-process cache
    first; on miss or TTL expiry, GETs ``/registry/discover/<peer_id>``
    synchronously with a tight timeout. Returns None on validation
    failure, network failure, or non-200 response so callers can
    degrade gracefully (the channel envelope falls back to the raw
    ``peer_id`` instead of crashing the push path).

    Negative caching: failure outcomes (4xx/5xx/non-JSON/network
    exception) are stored as ``(now, None)`` and treated as
    fresh-but-empty for the TTL window. Without this, a peer with a
    flaky/missing registry record would re-fire the 2s-bounded GET on
    EVERY push — turning the cache into a no-op for the exact failure
    scenarios it most needs to defend against.

    The fetched dict is stored as-is, so callers can read whatever
    fields the platform exposes (currently: ``id``, ``name``, ``role``,
    ``status``, ``url``). New fields surface automatically without a
    code change here.
    """
    canon = _validate_peer_id(peer_id)
    if canon is None:
        return None
    current = now if now is not None else time.monotonic()
    cached = _peer_metadata.get(canon)
    if cached is not None:
        fetched_at, record = cached
        if current - fetched_at < _PEER_METADATA_TTL_SECONDS:
            # Fresh entry — return whatever's there. ``None`` is the
            # negative-cache sentinel: caller treats absence of fields
            # the same as a registry miss, which is the desired UX.
            return record
    src = (source_workspace_id or "").strip() or WORKSPACE_ID
    url = f"{PLATFORM_URL}/registry/discover/{canon}"
    try:
        with httpx.Client(timeout=2.0) as client:
            resp = client.get(url, headers={"X-Workspace-ID": src, **auth_headers(src)})
    except Exception as exc:  # noqa: BLE001
        logger.debug("enrich_peer_metadata: GET %s failed: %s", url, exc)
        _peer_metadata[canon] = (current, None)
        return None
    if resp.status_code != 200:
        logger.debug(
            "enrich_peer_metadata: %s returned HTTP %d", url, resp.status_code
        )
        _peer_metadata[canon] = (current, None)
        return None
    try:
        data = resp.json()
    except Exception:  # noqa: BLE001
        _peer_metadata[canon] = (current, None)
        return None
    if not isinstance(data, dict):
        _peer_metadata[canon] = (current, None)
        return None
    _peer_metadata[canon] = (current, data)
    if name := data.get("name"):
        _peer_names[canon] = name
    return data
def _agent_card_url_for(peer_id: str) -> str:
    """Construct the platform-side agent-card URL for ``peer_id``.

    Returns the empty string when ``peer_id`` is not a UUID — same
    trust-boundary rationale as ``discover_peer``: never interpolate
    path-traversal characters into a URL. An invalid id reflected back
    to the receiving agent as ``…/registry/discover/../../foo`` is a
    foothold we close at construction time.

    Uses the registry's discovery path so the agent receiving a push
    can hit a single endpoint to enumerate the sender's capabilities
    + role + URL. Same shape every workspace exposes regardless of
    runtime — claude-code, hermes, langchain wrappers all register
    through ``/registry/register`` and surface through ``/registry/discover``.
    """
    safe_id = _validate_peer_id(peer_id)
    if safe_id is None:
        return ""
    return f"{PLATFORM_URL}/registry/discover/{safe_id}"
# Sentinel prefix for errors originating from send_a2a_message / child agents.
# Used by delegate_task to distinguish real errors from normal response text.
_A2A_ERROR_PREFIX = "[A2A_ERROR] "
# Workspace IDs are UUIDs everywhere we generate them (platform's
# workspaces.id column, /registry/discover/:id route param, etc.) but
# the agent-facing tool surface receives them as free-form strings via
# tool args. ``_validate_peer_id`` enforces UUID-shape at the
# trust boundary so we never interpolate `..` or `/` into a URL path,
# never silently coerce malformed input into a 404, and surface a
# clear error to the agent rather than letting an HTTP 4xx bubble up
# from the platform with a generic error message.
#
# Lenient on case + whitespace because real-world peer-id strings
# come from list_peers/discover_peer responses (canonical lowercase)
# or hand-typed agent input (mixed-case acceptable). Strict on
# everything else.
_UUID_RE = re.compile(
    r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$"
)
def _validate_peer_id(peer_id: str) -> str | None:
    """Return the canonicalised peer_id if valid, else None.

    Returning None instead of raising so callers in tool surfaces can
    convert to a friendly agent-facing string ("workspace_id is not a
    valid UUID") rather than crashing with a stack trace.
    """
    if not isinstance(peer_id, str):
        return None
    pid = peer_id.strip()
    if not _UUID_RE.match(pid):
        return None
    return pid.lower()
async def discover_peer(target_id: str, source_workspace_id: str | None = None) -> dict | None:
    """Discover a peer workspace's URL via the platform registry.

    Validates ``target_id`` is a UUID before constructing the URL — a
    malformed id can't reach the platform handler now, which both
    short-circuits an avoidable round-trip AND ensures we never
    interpolate path-traversal characters into the URL.

    ``source_workspace_id`` selects which registered workspace asks the
    question — both the X-Workspace-ID header AND the Authorization
    bearer token must come from the same workspace, otherwise the
    platform's TenantGuard rejects the request. Defaults to the
    module-level WORKSPACE_ID for back-compat with single-workspace
    callers.
    """
    safe_id = _validate_peer_id(target_id)
    if safe_id is None:
        return None
    src = (source_workspace_id or "").strip() or WORKSPACE_ID
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            resp = await client.get(
                f"{PLATFORM_URL}/registry/discover/{safe_id}",
                headers={"X-Workspace-ID": src, **auth_headers(src)},
            )
            if resp.status_code == 200:
                return resp.json()
            return None
        except Exception as e:
            logger.error("Discovery failed for %s: %s", target_id, e)
            return None
# httpx exception classes that indicate a transient transport-layer
# failure worth retrying — the request never produced an application
# response, so a fresh attempt has a real chance of succeeding. Any
# error not in this tuple is treated as deterministic (HTTP-status,
# JSON parse, runtime-returned JSON-RPC error, etc.) and surfaced to
# the caller on the first try.
#
# Why each one belongs here:
# - ConnectError / ConnectTimeout: peer's listening socket wasn't
#   ready (mid-restart, not yet bound). Fast failure, fast recovery.
# - RemoteProtocolError: peer closed the TCP connection without
#   writing a response — observed on 2026-04-27 when a peer's prior
#   in-flight Claude SDK session aborted and the new request's
#   connection was reset mid-handler.
# - ReadError / WriteError: TCP read/write socket error mid-flight,
#   typically a network blip on the Docker bridge or a peer worker
#   crash.
# - ReadTimeout: peer didn't write ANY response bytes within the
#   300s read budget. Distinct from "peer is slow but progressing"
#   (which httpx surfaces as a successful read with chunked bytes).
#   Retry budget caps the worst case — see _DELEGATE_TOTAL_BUDGET_S.
_TRANSIENT_HTTP_ERRORS: tuple[type[Exception], ...] = (
    httpx.ConnectError,
    httpx.ConnectTimeout,
    httpx.ReadError,
    httpx.WriteError,
    httpx.RemoteProtocolError,
    httpx.ReadTimeout,
)
# Retry budget. Up to 5 attempts (1 initial + 4 retries) with
# exponential backoff (1, 2, 4, 8 seconds), each backoff jittered ±25%
# to prevent synchronized retry storms across siblings if a peer flaps.
# _DELEGATE_TOTAL_BUDGET_S caps cumulative wall-clock so a string of
# ReadTimeouts can't make the caller wait 25 minutes — once the
# deadline elapses we stop retrying even if attempts remain. 600s = 10
# minutes is the agreed worst case the caller can tolerate before
# falling back to "peer unavailable" handling in tool_delegate_task.
_DELEGATE_MAX_ATTEMPTS = 5
_DELEGATE_BACKOFF_BASE_S = 1.0
_DELEGATE_BACKOFF_CAP_S = 16.0
_DELEGATE_TOTAL_BUDGET_S = 600.0
def _delegate_backoff_seconds(attempt_zero_indexed: int) -> float:
    """Return the (jittered) backoff delay before retrying after the
    given attempt index (0 = backoff before retry #1).

    Pure function so the schedule is unit-testable without monkey-
    patching asyncio.sleep. Jitter is symmetric ±25% on top of the
    capped exponential — enough to break sync across simultaneous
    callers without making the schedule unpredictable.
    """
    base = min(_DELEGATE_BACKOFF_BASE_S * (2 ** attempt_zero_indexed), _DELEGATE_BACKOFF_CAP_S)
    jitter = base * (0.5 * random.random() - 0.25)
    return max(0.0, base + jitter)
def _format_a2a_error(exc: BaseException, target_url: str) -> str:
    """Format an httpx exception as an [A2A_ERROR] string.

    Some httpx exceptions stringify to empty (RemoteProtocolError,
    ConnectionReset variants) — the canvas would then render
    "[A2A_ERROR] " with no detail and the operator has no signal to
    act on. Always include the exception class name and the target
    URL so the activity log + Agent Comms panel have actionable
    information without a trip through container logs.
    """
    msg = str(exc).strip()
    type_name = type(exc).__name__
    if not msg:
        detail = f"{type_name} (no message — likely connection reset or silent timeout)"
    elif msg.startswith(f"{type_name}:") or msg.startswith(f"{type_name} "):
        # Already prefixed with the type — don't double-prefix.
        # Prefix-anchored check (not substring) so a message that
        # happens to mention some OTHER class name mid-string
        # (e.g. "got OSError on read") doesn't suppress our own
        # type prefix and lose the diagnostic signal.
        detail = msg
    else:
        detail = f"{type_name}: {msg}"
    return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"
async def send_a2a_message(peer_id: str, message: str, source_workspace_id: str | None = None) -> str:
    """Send an A2A ``message/send`` to a peer workspace via the platform proxy.

    The target URL is constructed internally as
    ``${PLATFORM_URL}/workspaces/{peer_id}/a2a``. Going through the
    platform's A2A proxy is the only path that works for both
    in-container and external runtimes — see
    a2a_tools.tool_delegate_task for the rationale.

    ``source_workspace_id`` is the SENDING workspace — drives both the
    X-Workspace-ID source-tagging header and the bearer token. Defaults
    to the module-level WORKSPACE_ID for back-compat. Multi-workspace
    operators pass it explicitly so each registered workspace's peers
    are reached via their own auth chain.

    Auto-retries up to _DELEGATE_MAX_ATTEMPTS times on transient
    transport-layer errors (RemoteProtocolError, ConnectError,
    ReadTimeout, etc.) with exponential-backoff + jitter, capped by
    _DELEGATE_TOTAL_BUDGET_S. Application-level failures (HTTP 4xx,
    JSON-RPC error response, malformed JSON) are NOT retried — they
    indicate a deterministic problem retry won't fix.
    """
    safe_id = _validate_peer_id(peer_id)
    if safe_id is None:
        return f"{_A2A_ERROR_PREFIX}invalid peer_id (expected UUID): {peer_id!r}"
    src = (source_workspace_id or "").strip() or WORKSPACE_ID
    target_url = f"{PLATFORM_URL}/workspaces/{safe_id}/a2a"
    # Fix F (Cycle 5 / H2 — flagged 5 consecutive audits): timeout=None allowed
    # a hung upstream to block the agent indefinitely. Use a generous but bounded
    # timeout: 30s connect + 300s read (long enough for slow LLM responses).
    timeout_cfg = httpx.Timeout(connect=30.0, read=300.0, write=30.0, pool=30.0)
    deadline = time.monotonic() + _DELEGATE_TOTAL_BUDGET_S
    last_exc: BaseException | None = None
    for attempt in range(_DELEGATE_MAX_ATTEMPTS):
        async with httpx.AsyncClient(timeout=timeout_cfg) as client:
            try:
                # self_source_headers() includes X-Workspace-ID so the
                # platform's a2a_receive logger records source_id =
                # WORKSPACE_ID. Otherwise peer-A2A messages — including
                # the case where target_url resolves to this workspace's
                # own /a2a — get logged with source_id=NULL and surface
                # in the recipient's My Chat tab as user-typed input.
                resp = await client.post(
                    target_url,
                    headers=self_source_headers(src),
                    json={
                        "jsonrpc": "2.0",
                        "id": str(uuid.uuid4()),
                        "method": "message/send",
                        "params": {
                            "message": {
                                "role": "user",
                                "messageId": str(uuid.uuid4()),
                                "parts": [{"kind": "text", "text": message}],
                            }
                        },
                    },
                )
                data = resp.json()
                if "result" in data:
                    parts = data["result"].get("parts", [])
                    text = parts[0].get("text", "") if parts else "(no response)"
                    # Tag child-reported errors so the caller can detect them reliably
                    if text.startswith("Agent error:"):
                        return f"{_A2A_ERROR_PREFIX}{text}"
                    return text
                elif "error" in data:
                    err = data["error"]
                    msg = (err.get("message") or "").strip()
                    code = err.get("code")
                    if msg and code is not None:
                        detail = f"{msg} (code={code})"
                    elif msg:
                        detail = msg
                    elif code is not None:
                        detail = f"JSON-RPC error with no message (code={code})"
                    else:
                        detail = "JSON-RPC error with no message"
                    return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"
                return f"{_A2A_ERROR_PREFIX}unexpected response shape (no result, no error): {str(data)[:200]} [target={target_url}]"
            except _TRANSIENT_HTTP_ERRORS as e:
                last_exc = e
                attempts_remaining = _DELEGATE_MAX_ATTEMPTS - (attempt + 1)
                if attempts_remaining <= 0 or time.monotonic() >= deadline:
                    # Out of attempts OR out of total budget — surface
                    # the last error to the caller.
                    break
                delay = _delegate_backoff_seconds(attempt)
                # Don't sleep past the deadline — clamp.
                remaining = deadline - time.monotonic()
                if delay > remaining:
                    delay = max(0.0, remaining)
                logger.warning(
                    "send_a2a_message: transient %s on attempt %d/%d, retrying in %.1fs (target=%s)",
                    type(e).__name__,
                    attempt + 1,
                    _DELEGATE_MAX_ATTEMPTS,
                    delay,
                    target_url,
                )
                await asyncio.sleep(delay)
                continue
            except Exception as e:
                # Non-transient (HTTP-status, JSON parse, etc.) — don't retry.
                return _format_a2a_error(e, target_url)
    # Retries exhausted (or budget elapsed). last_exc must be set
    # because we only break out of the loop after assigning it.
    assert last_exc is not None  # noqa: S101
    return _format_a2a_error(last_exc, target_url)
async def get_peers_with_diagnostic(source_workspace_id: str | None = None) -> tuple[list[dict], str | None]:
    """Get this workspace's peers, returning (peers, diagnostic).

    diagnostic is None when the call succeeded (status 200, even if the list
    is empty). When peers is [] for a non-trivial reason (auth failure,
    workspace-id missing from registry, platform error, network error),
    diagnostic is a short human-readable string explaining what went wrong
    so callers can surface it instead of "may be isolated" — see #2397.

    ``source_workspace_id`` selects which registered workspace's peers to
    enumerate; defaults to the module-level WORKSPACE_ID for
    single-workspace back-compat. Multi-workspace operators iterate over
    each registered workspace separately so each set of peers is fetched
    with the correct auth.

    The legacy get_peers() shim below preserves the bare-list contract for
    non-tool callers.
    """
    src = (source_workspace_id or "").strip() or WORKSPACE_ID
    url = f"{PLATFORM_URL}/registry/{src}/peers"
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            resp = await client.get(
                url,
                headers={"X-Workspace-ID": src, **auth_headers(src)},
            )
        except Exception as e:
            return [], f"Cannot reach platform at {PLATFORM_URL}: {e}"
    if resp.status_code == 200:
        try:
            data = resp.json()
        except Exception as e:
            return [], f"Platform returned 200 but body was not JSON: {e}"
        if not isinstance(data, list):
            return [], f"Platform returned 200 but body was not a list: {type(data).__name__}"
        return data, None
    if resp.status_code in (401, 403):
        return [], (
            f"Authentication to platform failed (HTTP {resp.status_code}). "
            "The workspace bearer token may be invalid — restarting the workspace usually re-mints it."
        )
    if resp.status_code == 404:
        # Report the SOURCE workspace that made the request — in
        # multi-workspace mode this may differ from the module-level
        # WORKSPACE_ID, and naming the wrong id here would mislead.
        return [], (
            f"Workspace ID {src} is not registered with the platform (HTTP 404). "
            "Re-registration via the platform's /registry/register endpoint is needed."
        )
    if 500 <= resp.status_code < 600:
        return [], f"Platform error: HTTP {resp.status_code}."
    return [], f"Unexpected platform response: HTTP {resp.status_code}."
async def get_peers() -> list[dict]:
    """Get this workspace's peers from the platform registry.

    Bare-list shim over get_peers_with_diagnostic() — discards the diagnostic
    so callers that don't care about the failure reason (e.g. system-prompt
    bootstrap formatters) get the same shape they always had.
    """
    peers, _ = await get_peers_with_diagnostic()
    return peers
async def get_workspace_info() -> dict:
    """Get this workspace's info from the platform.

    Distinguishes three failure shapes so callers can handle them
    distinctly (#2429):
    - 410 Gone → workspace was deleted; re-onboard required
    - 404 / other → workspace never existed (or transient)
    - exception → network / auth failure
    """
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            resp = await client.get(
                f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}",
                headers=auth_headers(),
            )
            if resp.status_code == 200:
                return resp.json()
            if resp.status_code == 410:
                # #2429: platform returns 410 when status='removed'.
                # Surface "removed" + the actionable hint so callers
                # can prompt re-onboard instead of falling through to
                # "not found" — which made the 2026-04-30 incident
                # impossible to diagnose ("workspace not found" with
                # a workspace_id we KNEW we'd just registered).
                try:
                    body = resp.json()
                except Exception:
                    body = {}
                return {
                    "error": "removed",
                    "id": body.get("id", WORKSPACE_ID),
                    "removed_at": body.get("removed_at"),
                    "hint": body.get(
                        "hint",
                        "Workspace was deleted on the platform. "
                        "Regenerate workspace + token from the canvas → Tokens tab.",
                    ),
                }
            return {"error": "not found"}
        except Exception as e:
            return {"error": str(e)}