Introduce ``workspace/a2a_response.py`` as the single source of truth for
the wire shapes the workspace-server proxy can return at
``/workspaces/<id>/a2a``:
* ``Result`` — JSON-RPC success
* ``Error`` — JSON-RPC error or platform-level error (with
restart-in-progress metadata when present)
* ``Queued`` — poll-mode short-circuit envelope: the platform
queued the message into the target's inbox, the
target will fetch via /activity poll
* ``Malformed`` — anything the parser can't classify (logged at
WARNING so a future server change is loud)
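A minimal sketch of the contract these variants imply (hedged: the real module is the source of truth; the field names below are the ones ``send_a2a_message`` reads — ``parts``/``text``, ``method``, ``message``/``code``/``restarting``/``retry_after``, ``raw`` — and the extraction details beyond that are illustrative):

```python
# Hedged sketch of the parse() contract — illustration only.
from dataclasses import dataclass
from typing import Any


@dataclass
class Result:
    parts: list[Any]
    text: str
    raw: dict


@dataclass
class Error:
    message: str | None
    code: int | None
    restarting: bool
    retry_after: float | None
    raw: dict


@dataclass
class Queued:
    method: str | None
    raw: dict


@dataclass
class Malformed:
    raw: Any


def parse(data: Any) -> Result | Error | Queued | Malformed:
    """Classify a proxy response body. Never raises."""
    if not isinstance(data, dict):
        return Malformed(raw=data)
    if data.get("status") == "queued" and data.get("delivery_mode") == "poll":
        return Queued(method=data.get("method"), raw=data)
    if "result" in data:
        result = data["result"] if isinstance(data["result"], dict) else {}
        parts = result.get("parts") or []
        text = parts[0].get("text", "") if parts and isinstance(parts[0], dict) else ""
        return Result(parts=parts, text=text, raw=data)
    if "error" in data:
        err = data["error"] if isinstance(data["error"], dict) else {}
        return Error(
            message=err.get("message"),
            code=err.get("code"),
            restarting=bool(err.get("restarting")),
            retry_after=err.get("retry_after"),
            raw=data,
        )
    return Malformed(raw=data)
```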
``send_a2a_message`` (in ``a2a_client.py``) now dispatches via
``a2a_response.parse(data)`` instead of inline ``"result" in data`` /
``"error" in data`` sniffing. The Queued variant returns a new
``_A2A_QUEUED_PREFIX`` sentinel so callers can distinguish "delivered
async, no synchronous reply" from both success-with-text and failure.
Production traffic in reno-stars surfaced two intermittent failures
that both reduced to the same root cause:
1. **File-transfer announcement spuriously failed** — when CEO Ryan PC
   (a poll-mode external molecule-mcp) sent the harmi.zip
   announcement to Reno Stars Business Intelligent (also poll-mode
   external), ``send_a2a_message`` saw the platform's poll-queued
   envelope ``{"status":"queued","delivery_mode":"poll","method":"..."}``,
   didn't recognize it as the synthetic delivery acknowledgement
   it is, and returned ``[A2A_ERROR] unexpected response shape``.
   The agent fell back to a chunk-shipping path; the receiver did
   get the file, but operator-facing logs reported a failure where
   nothing had actually failed.
2. **Duplicated agent comms** — same bug, opposite direction. d76
   delegated to 67d, ``send_a2a_message`` returned the unexpected-shape
   error, ``delegate_task`` wrapped it as DELEGATION FAILED, the calling
   agent retried with sharper wording, and the recipient saw the same
   request twice — self-reporting "二次请求 — 我先不执行" ("duplicate
   request — holding off on executing").
External molecule-mcp standalone runtimes are inherently poll-mode
(they have no public URL), so every external↔external A2A pair was
hitting this on every send. The pre-fix client only handled JSON-RPC
``result``/``error`` keys and treated the queued envelope (which has
neither) as malformed. RFC #2339 PR 2 added the queued envelope on
the server side; the client never caught up.
When ``send_a2a_message`` returns the ``_A2A_QUEUED_PREFIX`` sentinel,
``tool_delegate_task`` now transparently falls back to
``_delegate_sync_via_polling`` (RFC #2829 PR-5's durable
``/delegate`` + ``/delegations`` polling path, which DOES work for
poll-mode peers because the platform's executeDelegation goroutine
writes to the inbox queue and the result row arrives when the target
picks it up + replies). The agent gets a real synchronous reply
instead of the empty queued sentinel.
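For reference, a minimal sketch of the fallback branch (hypothetical wiring — the real implementation and ``_delegate_sync_via_polling`` signature live in ``a2a_tools_delegation.py``; ``target_id`` / ``task_text`` are placeholders):

```python
# Hypothetical shape of the auto-fallback — illustration only.
reply = await send_a2a_message(target_id, task_text)
if reply.startswith(_A2A_QUEUED_PREFIX):
    # Delivered async to a poll-mode peer — no synchronous reply will
    # arrive on this channel, so switch to the durable polling path.
    logger.info("delegate_task: queued sentinel from send → falling back to sync-via-polling")
    reply = await _delegate_sync_via_polling(target_id, task_text)
```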
* ``test_a2a_response.py`` — 62 tests, **100% line coverage** on
the parser (verified via ``coverage run --source=a2a_response``).
Includes adversarial-input fuzzing across ~25 pathological
payloads — parser must never raise.
* ``test_a2a_client.py::TestSendA2AMessagePollMode`` — 4 tests for
the new Queued/Error wiring in ``send_a2a_message``.
* ``test_delegation_sync_via_polling.py::TestPollModeAutoFallback``
— 3 tests for the auto-fallback in ``tool_delegate_task``,
including negative cases (push-mode reply must NOT trigger
fallback; genuine error must NOT silently retry).
* **Verified all new tests FAIL on pre-fix source** by stashing
a2a_client.py + a2a_tools_delegation.py and re-running — 5
failures including ImportError for the missing
``_A2A_QUEUED_PREFIX``.
Per the operator-debuggability directive:
* INFO at every Queued classification (expected variant; operator
sees normal poll-mode-peer queueing in log stream).
* INFO at the auto-fallback decision in ``tool_delegate_task``
so a future operator can correlate "send returned queued →
falling back to polling path" without reading the source.
* WARNING at every Malformed classification (server contract
drift; operator MUST see this immediately).
* Existing transient-retry WARNING preserved.
* Mirror Go-side typed model in workspace-server. The wire shape
is documented in ``a2a_response.py``'s module docstring with
file:line pointers to the canonical emitters; a future PR can
introduce ``models/a2a_response.go`` without changing wire
behavior. The fixture corpus in ``test_a2a_response.py`` is
designed so a one-sided edit breaks CI.
* ``send_message_to_user`` and ``chat_upload_receive`` use a
different endpoint (``/notify``) and aren't affected by this
bug; their parsing stays unchanged.
* 135 tests pass across ``test_a2a_response.py`` +
``test_a2a_client.py`` + ``test_delegation_sync_via_polling.py``
+ ``test_a2a_tools_impl.py``.
* ``coverage run --source=a2a_response -m pytest`` reports 100%
line coverage with 0 missing.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
"""A2A protocol client — peer discovery, messaging, and workspace info.
|
||
|
||
Shared constants (WORKSPACE_ID, PLATFORM_URL) live here so that
|
||
a2a_tools and a2a_mcp_server can import them from a single place.
|
||
"""
|
||
|
||
import asyncio
|
||
import logging
|
||
import os
|
||
import random
|
||
import re
|
||
import threading
|
||
import time
|
||
import uuid
|
||
from collections import OrderedDict
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
|
||
import httpx
|
||
|
||
import a2a_response
|
||
from platform_auth import auth_headers, self_source_headers
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
_WORKSPACE_ID_raw = os.environ.get("WORKSPACE_ID")
|
||
if not _WORKSPACE_ID_raw:
|
||
raise RuntimeError("WORKSPACE_ID environment variable is required but not set")
|
||
WORKSPACE_ID = _WORKSPACE_ID_raw
|
||
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
|
||
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
|
||
else:
|
||
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080")
|
||
|
||
# Cache workspace ID → name mappings (populated by list_peers calls)
|
||
_peer_names: dict[str, str] = {}
|
||
|
||
# Cache: peer workspace_id → the source workspace_id whose registry
|
||
# returned that peer. Populated by ``a2a_tools.tool_list_peers`` whenever
|
||
# it queries a specific workspace's peers — so a later
|
||
# ``tool_delegate_task(target)`` can auto-route through the correct
|
||
# source workspace without the agent having to specify
|
||
# ``source_workspace_id`` explicitly.
|
||
#
|
||
# Single-workspace mode: dict stays empty, all delegations fall through
|
||
# to the module-level WORKSPACE_ID (existing behavior).
|
||
#
|
||
# Multi-workspace mode: as the agent calls list_peers, this map is
|
||
# populated with each peer's source. Subsequent delegate_task calls
|
||
# auto-route. If a peer is registered under multiple sources (rare —
|
||
# e.g. an org-wide capability) the LAST observed source wins; the agent
|
||
# can override by passing ``source_workspace_id`` explicitly.
|
||
_peer_to_source: dict[str, str] = {}
|
||
|
||
# Cache workspace ID → full peer record (id, name, role, status, url, ...).
|
||
# Populated by tool_list_peers and by the lazy registry lookup in
|
||
# enrich_peer_metadata. The notification-callback path (channel envelope
|
||
# enrichment) reads this cache on every inbound peer_agent push, so the
|
||
# read shape stays a dict-like ``__getitem__`` lookup; entries carry
|
||
# their fetched-at timestamp so TTL eviction is in-line with the
|
||
# lookup. ``None`` as the record is the negative-cache sentinel:
|
||
# registry failure is cached for one TTL window so we don't re-fire
|
||
# the 2s-bounded GET on every push from a flaky peer.
|
||
#
|
||
# OrderedDict + maxsize bound (#2482): pre-fix this was an unbounded
|
||
# ``dict``, so a workspace receiving from N distinct peers across its
|
||
# lifetime accumulated ~100 bytes/entry × N indefinitely. At 10K peers
|
||
# that's ~1 MB; at 100K (a chatty platform-wide router) ~10 MB; not
|
||
# crash-class but unbounded. The LRU bound caps memory + the TTL caps
|
||
# per-entry staleness — both gates are needed because a runaway poller
|
||
# touching N new peer_ids per push could grow within a single TTL
|
||
# window.
|
||
#
|
||
# All reads / writes go through ``_peer_metadata_get`` /
|
||
# ``_peer_metadata_set`` so the LRU move-to-end + size-trim invariants
|
||
# stay co-located. Direct mutation is allowed only in test fixtures
|
||
# (clearing for isolation); production code path uses the helpers.
|
||
_PEER_METADATA_MAXSIZE = 1024
|
||
_peer_metadata: "OrderedDict[str, tuple[float, dict | None]]" = OrderedDict()
|
||
_peer_metadata_lock = threading.Lock()
|
||
|
||
# How long an entry in ``_peer_metadata`` is treated as fresh. 5 minutes
|
||
# is the same window we use for delegation routing — long enough that a
|
||
# busy agent receiving repeated pushes from one peer doesn't hit the
|
||
# registry on every push, short enough that role/name renames propagate
|
||
# within a single agent session.
|
||
_PEER_METADATA_TTL_SECONDS = 300.0
|
||
|
||
|
||
def _peer_metadata_get(canon: str) -> tuple[float, dict | None] | None:
|
||
"""Read with LRU touch — moves the entry to the most-recently-used
|
||
position so steady-state pushes from a busy peer don't get evicted
|
||
by a cold-start burst from new peers. Returns the raw tuple shape
|
||
callers expect; TTL eviction stays at the call site.
|
||
"""
|
||
with _peer_metadata_lock:
|
||
entry = _peer_metadata.get(canon)
|
||
if entry is not None:
|
||
_peer_metadata.move_to_end(canon)
|
||
return entry
|
||
|
||
|
||
def _peer_metadata_set(canon: str, value: tuple[float, dict | None]) -> None:
|
||
"""Write + evict-if-over-maxsize. The eviction is in-process and
|
||
cheap (popitem(last=False) on an OrderedDict is O(1)). Holding the
|
||
lock across the trim keeps the size invariant stable under concurrent
|
||
writes from background enrichment workers.
|
||
"""
|
||
with _peer_metadata_lock:
|
||
_peer_metadata[canon] = value
|
||
_peer_metadata.move_to_end(canon)
|
||
# Trim the oldest entries until at-or-below maxsize. The bound
|
||
# is a soft cap — a single overrun (set called when at maxsize)
|
||
# evicts the LRU entry before returning, never letting size
|
||
# exceed maxsize.
|
||
while len(_peer_metadata) > _PEER_METADATA_MAXSIZE:
|
||
_peer_metadata.popitem(last=False)
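# Illustrative round-trip: an entry is fresh while
# (time.monotonic() - fetched_at) < _PEER_METADATA_TTL_SECONDS, and a
# (ts, None) value is the negative-cache sentinel ("fresh but empty"):
#   _peer_metadata_set(pid, (time.monotonic(), {"name": "bi"}))
#   fetched_at, record = _peer_metadata_get(pid)  # LRU-touches the entry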


# Background-fetch executor for enrich_peer_metadata_nonblocking (#2484).
# A small pool — peers are highly TTL-cached, so the steady-state load
# is "one fetch per peer per 5 minutes." Two workers handle the cold-
# start burst when an agent starts receiving pushes from a new peer for
# the first time without backing up the inbox poller. Daemon threads:
# the executor must NOT block process exit if the inbox shuts down.
_enrich_executor: ThreadPoolExecutor | None = None
_enrich_executor_lock = threading.Lock()

# In-flight peer IDs — guards against a single peer's repeated pushes
# scheduling N concurrent registry fetches before the first one fills
# the cache. Set membership is "a worker is currently fetching this
# peer; subsequent calls should NOT schedule another."
_enrich_in_flight: set[str] = set()
_enrich_in_flight_lock = threading.Lock()


def _get_enrich_executor() -> ThreadPoolExecutor:
    """Lazy-init the enrichment worker pool. Lazy because most test
    fixtures and short-lived CLI invocations don't need it; only the
    long-running molecule-mcp / inbox-poller path actually schedules
    background fetches.
    """
    global _enrich_executor
    if _enrich_executor is not None:
        return _enrich_executor
    with _enrich_executor_lock:
        if _enrich_executor is None:
            _enrich_executor = ThreadPoolExecutor(
                max_workers=2,
                thread_name_prefix="enrich-peer",
            )
    return _enrich_executor


def enrich_peer_metadata_nonblocking(
    peer_id: str,
    source_workspace_id: str | None = None,
) -> dict | None:
    """Cache-first variant of ``enrich_peer_metadata`` — returns
    immediately without blocking on a registry GET.

    Behavior:
    - Cache hit (fresh): return the cached record.
    - Cache miss or TTL expired: schedule a background fetch via the
      worker pool, return ``None`` (caller renders bare peer_id).
      The next push for this peer hits the warm cache and gets the
      full record.

    Why this exists (#2484): the inbox poller's notification callback
    in molecule-mcp called the synchronous ``enrich_peer_metadata`` on
    every push, blocking the poller for up to 2s × N uncached peers
    per batch. Push-delivery latency was gated on registry latency —
    the exact thing the negative-cache patch in PR #2471 was supposed
    to avoid amplifying. Moving the fetch off the poller thread means
    push delivery is bounded by the inbox poll interval, never by
    registry RTT.

    Trade-off: the FIRST push from a new peer arrives metadata-light
    (no name/role). The MCP host renders the bare peer_id. Subsequent
    pushes (within the 5-min TTL) hit the warm cache and get the full
    record. Acceptable because:
    - Channel-envelope enrichment is a UX nicety, not a correctness
      invariant.
    - The cold-cache window per peer is bounded to one push.
    - The TTL is long enough that an active conversation never
      re-enters the cold state.
    """
    canon = _validate_peer_id(peer_id)
    if canon is None:
        return None
    current = time.monotonic()
    cached = _peer_metadata_get(canon)
    if cached is not None:
        fetched_at, record = cached
        if current - fetched_at < _PEER_METADATA_TTL_SECONDS:
            return record
    # Schedule background fetch unless one is already in flight for this
    # peer. The synchronous version atomically reads-then-writes; the
    # async version splits that into "schedule fetch" + "fetch fills
    # cache later." The in-flight set keeps a flurry of pushes from
    # one peer (e.g., a chatty agent) from spawning N parallel GETs.
    with _enrich_in_flight_lock:
        if canon in _enrich_in_flight:
            return None
        _enrich_in_flight.add(canon)
    try:
        _get_enrich_executor().submit(
            _enrich_peer_metadata_worker, canon, source_workspace_id
        )
    except RuntimeError:
        # Executor was shut down (process exit path) — drop the request,
        # let the caller render bare peer_id.
        with _enrich_in_flight_lock:
            _enrich_in_flight.discard(canon)
    return None


def _enrich_peer_metadata_worker(
    canon: str, source_workspace_id: str | None
) -> None:
    """Background-thread body for ``enrich_peer_metadata_nonblocking``.
    Runs the same fetch logic as the synchronous helper but discards
    the return value — the cache write is the only output anyone
    needs. Always clears the in-flight marker so a future cache miss
    can retry.
    """
    try:
        enrich_peer_metadata(canon, source_workspace_id)
    except Exception as exc:  # noqa: BLE001
        # Background workers must not crash the executor — log and
        # move on. The negative-cache path inside enrich_peer_metadata
        # already records failures, so a re-attempt is rate-limited
        # by TTL.
        logger.debug("_enrich_peer_metadata_worker: %s failed: %s", canon, exc)
    finally:
        with _enrich_in_flight_lock:
            _enrich_in_flight.discard(canon)


def _wait_for_enrichment_inflight_for_testing(timeout: float = 2.0) -> None:
    """Block until all in-flight enrichment workers have completed.

    Test-only helper. Production code never has a reason to wait — the
    point of the nonblocking path is that callers don't care when the
    cache fills. Tests that want to assert "after the worker runs, the
    cache has the record" use this to synchronise without sleeping.

    Polls ``_enrich_in_flight`` rather than holding a Condition because
    the worker pool is already serializing through ``_enrich_in_flight_lock``;
    poll keeps the production hot path lock-free.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        with _enrich_in_flight_lock:
            if not _enrich_in_flight:
                return
        time.sleep(0.01)


def enrich_peer_metadata(
    peer_id: str,
    source_workspace_id: str | None = None,
    *,
    now: float | None = None,
) -> dict | None:
    """Return cached or freshly-fetched metadata for ``peer_id``.

    Sync helper — safe to call from the inbox poller's notification
    callback thread (which is not async). Hits the in-process cache
    first; on miss or TTL expiry, GETs ``/registry/discover/<peer_id>``
    synchronously with a tight timeout. Returns None on validation
    failure, network failure, or non-200 response so callers can
    degrade gracefully (the channel envelope falls back to the raw
    ``peer_id`` instead of crashing the push path).

    Negative caching: failure outcomes (4xx/5xx/non-JSON/network
    exception) are stored as ``(now, None)`` and treated as
    fresh-but-empty for the TTL window. Without this, a peer with a
    flaky/missing registry record would re-fire the 2s-bounded GET on
    EVERY push — turning the cache into a no-op for the exact failure
    scenarios it most needs to defend against.

    The fetched dict is stored as-is, so callers can read whatever
    fields the platform exposes (currently: ``id``, ``name``, ``role``,
    ``status``, ``url``). New fields surface automatically without a
    code change here.
    """
    canon = _validate_peer_id(peer_id)
    if canon is None:
        return None

    current = now if now is not None else time.monotonic()
    cached = _peer_metadata_get(canon)
    if cached is not None:
        fetched_at, record = cached
        if current - fetched_at < _PEER_METADATA_TTL_SECONDS:
            # Fresh entry — return whatever's there. ``None`` is the
            # negative-cache sentinel: caller treats absence of fields
            # the same as a registry miss, which is the desired UX.
            return record

    src = (source_workspace_id or "").strip() or WORKSPACE_ID
    url = f"{PLATFORM_URL}/registry/discover/{canon}"
    try:
        with httpx.Client(timeout=2.0) as client:
            resp = client.get(url, headers={"X-Workspace-ID": src, **auth_headers(src)})
    except Exception as exc:  # noqa: BLE001
        logger.debug("enrich_peer_metadata: GET %s failed: %s", url, exc)
        _peer_metadata_set(canon, (current, None))
        return None

    if resp.status_code != 200:
        logger.debug(
            "enrich_peer_metadata: %s returned HTTP %d", url, resp.status_code
        )
        _peer_metadata_set(canon, (current, None))
        return None

    try:
        data = resp.json()
    except Exception:  # noqa: BLE001
        _peer_metadata_set(canon, (current, None))
        return None
    if not isinstance(data, dict):
        _peer_metadata_set(canon, (current, None))
        return None

    _peer_metadata_set(canon, (current, data))
    if name := data.get("name"):
        _peer_names[canon] = name
    return data


def _agent_card_url_for(peer_id: str) -> str:
    """Construct the platform-side agent-card URL for ``peer_id``.

    Returns the empty string when ``peer_id`` is not a UUID — same
    trust-boundary rationale as ``discover_peer``: never interpolate
    path-traversal characters into a URL. An invalid id reflected back
    to the receiving agent as ``…/registry/discover/../../foo`` is a
    foothold we close at construction time.

    Uses the registry's discovery path so the agent receiving a push
    can hit a single endpoint to enumerate the sender's capabilities
    + role + URL. Same shape every workspace exposes regardless of
    runtime — claude-code, hermes, langchain wrappers all register
    through ``/registry/register`` and surface through ``/registry/discover``.
    """
    safe_id = _validate_peer_id(peer_id)
    if safe_id is None:
        return ""
    return f"{PLATFORM_URL}/registry/discover/{safe_id}"


# Sentinel prefix for errors originating from send_a2a_message / child agents.
# Used by delegate_task to distinguish real errors from normal response text.
_A2A_ERROR_PREFIX = "[A2A_ERROR] "

# Sentinel prefix for queued-for-poll-mode-peer outcomes (#2967).
# When the target workspace is registered as delivery_mode=poll (no
# public URL — typical for external molecule-mcp standalone runtimes),
# the platform's a2a_proxy.go:402 short-circuit returns a synthetic
# {"status":"queued","delivery_mode":"poll","method":"..."} envelope
# instead of dispatching over HTTP. The message IS delivered (written
# to the platform's inbox queue); there's just no synchronous reply
# to relay. Pre-#2967 the client treated this as "unexpected response
# shape" → caller saw DELEGATION FAILED → retried → recipient saw
# duplicates. The Queued prefix lets callers branch on this outcome
# explicitly: "delivered async, no synchronous reply expected" is
# different from both success-with-text and failure.
_A2A_QUEUED_PREFIX = "[A2A_QUEUED] "
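
# Callers branch on the sentinel prefixes (illustrative):
#   reply = await send_a2a_message(pid, text)
#   if reply.startswith(_A2A_QUEUED_PREFIX):   # delivered async, no sync reply
#       ...
#   elif reply.startswith(_A2A_ERROR_PREFIX):  # genuine failure
#       ...
#   else:                                      # synchronous reply text
#       ...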

# Workspace IDs are UUIDs everywhere we generate them (platform's
# workspaces.id column, /registry/discover/:id route param, etc.) but
# the agent-facing tool surface receives them as free-form strings via
# tool args. ``_validate_peer_id`` enforces UUID-shape at the
# trust boundary so we never interpolate `..` or `/` into a URL path,
# never silently coerce malformed input into a 404, and surface a
# clear error to the agent rather than letting an HTTP 4xx bubble up
# from the platform with a generic error message.
#
# Lenient on case + whitespace because real-world peer-id strings
# come from list_peers/discover_peer responses (canonical lowercase)
# or hand-typed agent input (mixed-case acceptable). Strict on
# everything else.
_UUID_RE = re.compile(
    r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$"
)


def _validate_peer_id(peer_id: str) -> str | None:
    """Return the canonicalised peer_id if valid, else None.

    Returning None instead of raising so callers in tool surfaces can
    convert to a friendly agent-facing string ("workspace_id is not a
    valid UUID") rather than crashing with a stack trace.
    """
    if not isinstance(peer_id, str):
        return None
    pid = peer_id.strip()
    if not _UUID_RE.match(pid):
        return None
    return pid.lower()
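
# Example canonicalisation (hypothetical IDs, for illustration):
#   _validate_peer_id(" 0E984725-C51C-4BF4-9960-E1C80E27ABA0 ")
#       -> "0e984725-c51c-4bf4-9960-e1c80e27aba0"
#   _validate_peer_id("../../etc/passwd") -> None  (never reaches a URL path)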


async def discover_peer(target_id: str, source_workspace_id: str | None = None) -> dict | None:
    """Discover a peer workspace's URL via the platform registry.

    Validates ``target_id`` is a UUID before constructing the URL — a
    malformed id can't reach the platform handler now, which both
    short-circuits an avoidable round-trip AND ensures we never
    interpolate path-traversal characters into the URL.

    ``source_workspace_id`` selects which registered workspace asks the
    question — both the X-Workspace-ID header AND the Authorization
    bearer token must come from the same workspace, otherwise the
    platform's TenantGuard rejects the request. Defaults to the
    module-level WORKSPACE_ID for back-compat with single-workspace
    callers.
    """
    safe_id = _validate_peer_id(target_id)
    if safe_id is None:
        return None
    src = (source_workspace_id or "").strip() or WORKSPACE_ID
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            resp = await client.get(
                f"{PLATFORM_URL}/registry/discover/{safe_id}",
                headers={"X-Workspace-ID": src, **auth_headers(src)},
            )
            if resp.status_code == 200:
                return resp.json()
            return None
        except Exception as e:
            logger.error(f"Discovery failed for {target_id}: {e}")
            return None


# httpx exception classes that indicate a transient transport-layer
# failure worth retrying — the request never produced an application
# response, so a fresh attempt has a real chance of succeeding. Any
# error not in this tuple is treated as deterministic (HTTP-status,
# JSON parse, runtime-returned JSON-RPC error, etc.) and surfaced to
# the caller on the first try.
#
# Why each one belongs here:
# - ConnectError / ConnectTimeout: peer's listening socket wasn't
#   ready (mid-restart, not yet bound). Fast failure, fast recovery.
# - RemoteProtocolError: peer closed the TCP connection without
#   writing a response — observed on 2026-04-27 when a peer's prior
#   in-flight Claude SDK session aborted and the new request's
#   connection was reset mid-handler.
# - ReadError / WriteError: TCP read/write socket error mid-flight,
#   typically a network blip on the Docker bridge or a peer worker
#   crash.
# - ReadTimeout: peer didn't write ANY response bytes within the
#   300s read budget. Distinct from "peer is slow but progressing"
#   (which httpx surfaces as a successful read with chunked bytes).
#   Retry budget caps the worst case — see _DELEGATE_TOTAL_BUDGET_S.
_TRANSIENT_HTTP_ERRORS: tuple[type[Exception], ...] = (
    httpx.ConnectError,
    httpx.ConnectTimeout,
    httpx.ReadError,
    httpx.WriteError,
    httpx.RemoteProtocolError,
    httpx.ReadTimeout,
)

# Retry budget. Up to 5 attempts (1 initial + 4 retries) with
# exponential backoff (1, 2, 4, 8 seconds), each backoff jittered ±25%
# to prevent synchronized retry storms across siblings if a peer flaps.
# _DELEGATE_TOTAL_BUDGET_S caps cumulative wall-clock so a string of
# ReadTimeouts can't make the caller wait 25 minutes — once the
# deadline elapses we stop retrying even if attempts remain. 600s = 10
# minutes is the agreed worst case the caller can tolerate before
# falling back to "peer unavailable" handling in tool_delegate_task.
_DELEGATE_MAX_ATTEMPTS = 5
_DELEGATE_BACKOFF_BASE_S = 1.0
_DELEGATE_BACKOFF_CAP_S = 16.0
_DELEGATE_TOTAL_BUDGET_S = 600.0


def _delegate_backoff_seconds(attempt_zero_indexed: int) -> float:
    """Return the (jittered) backoff delay before retrying after the
    given attempt index (0 = backoff before retry #1).

    Pure function so the schedule is unit-testable without monkey-
    patching asyncio.sleep. Jitter is symmetric ±25% on top of the
    capped exponential — enough to break sync across simultaneous
    callers without making the schedule unpredictable.
    """
    base = min(_DELEGATE_BACKOFF_BASE_S * (2 ** attempt_zero_indexed), _DELEGATE_BACKOFF_CAP_S)
    jitter = base * (0.5 * random.random() - 0.25)
    return max(0.0, base + jitter)
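
# Nominal schedule before jitter: attempt 0 → 1s, 1 → 2s, 2 → 4s, 3 → 8s
# (the 16s cap only binds if _DELEGATE_MAX_ATTEMPTS grows past 5);
# with ±25% jitter, e.g. attempt 2 sleeps somewhere in [3.0s, 5.0s].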


def _format_a2a_error(exc: BaseException, target_url: str) -> str:
    """Format an httpx exception as an [A2A_ERROR] string.

    Some httpx exceptions stringify to empty (RemoteProtocolError,
    ConnectionReset variants) — the canvas would then render
    "[A2A_ERROR] " with no detail and the operator has no signal to
    act on. Always include the exception class name and the target
    URL so the activity log + Agent Comms panel have actionable
    information without a trip through container logs.
    """
    msg = str(exc).strip()
    type_name = type(exc).__name__
    if not msg:
        detail = f"{type_name} (no message — likely connection reset or silent timeout)"
    elif msg.startswith(f"{type_name}:") or msg.startswith(f"{type_name} "):
        # Already prefixed with the type — don't double-prefix.
        # Prefix-anchored check (not substring) so a message that
        # happens to mention some OTHER class name mid-string
        # (e.g. "got OSError on read") doesn't suppress our own
        # type prefix and lose the diagnostic signal.
        detail = msg
    else:
        detail = f"{type_name}: {msg}"
    return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"


async def send_a2a_message(peer_id: str, message: str, source_workspace_id: str | None = None) -> str:
    """Send an A2A ``message/send`` to a peer workspace via the platform proxy.

    The target URL is constructed internally as
    ``${PLATFORM_URL}/workspaces/{peer_id}/a2a``. Going through the
    platform's A2A proxy is the only path that works for both
    in-container and external runtimes — see
    a2a_tools.tool_delegate_task for the rationale.

    ``source_workspace_id`` is the SENDING workspace — drives both the
    X-Workspace-ID source-tagging header and the bearer token. Defaults
    to the module-level WORKSPACE_ID for back-compat. Multi-workspace
    operators pass it explicitly so each registered workspace's peers
    are reached via their own auth chain.

    Auto-retries up to _DELEGATE_MAX_ATTEMPTS times on transient
    transport-layer errors (RemoteProtocolError, ConnectError,
    ReadTimeout, etc.) with exponential-backoff + jitter, capped by
    _DELEGATE_TOTAL_BUDGET_S. Application-level failures (HTTP 4xx,
    JSON-RPC error response, malformed JSON) are NOT retried — they
    indicate a deterministic problem retry won't fix.
    """
    safe_id = _validate_peer_id(peer_id)
    if safe_id is None:
        return f"{_A2A_ERROR_PREFIX}invalid peer_id (expected UUID): {peer_id!r}"
    src = (source_workspace_id or "").strip() or WORKSPACE_ID
    target_url = f"{PLATFORM_URL}/workspaces/{safe_id}/a2a"

    # Fix F (Cycle 5 / H2 — flagged 5 consecutive audits): timeout=None allowed
    # a hung upstream to block the agent indefinitely. Use a generous but bounded
    # timeout: 30s connect + 300s read (long enough for slow LLM responses).
    timeout_cfg = httpx.Timeout(connect=30.0, read=300.0, write=30.0, pool=30.0)
    deadline = time.monotonic() + _DELEGATE_TOTAL_BUDGET_S
    last_exc: BaseException | None = None

    for attempt in range(_DELEGATE_MAX_ATTEMPTS):
        async with httpx.AsyncClient(timeout=timeout_cfg) as client:
            try:
                # self_source_headers() includes X-Workspace-ID so the
                # platform's a2a_receive logger records source_id =
                # WORKSPACE_ID. Otherwise peer-A2A messages — including
                # the case where target_url resolves to this workspace's
                # own /a2a — get logged with source_id=NULL and surface
                # in the recipient's My Chat tab as user-typed input.
                resp = await client.post(
                    target_url,
                    headers=self_source_headers(src),
                    json={
                        "jsonrpc": "2.0",
                        "id": str(uuid.uuid4()),
                        "method": "message/send",
                        "params": {
                            "message": {
                                "role": "user",
                                "messageId": str(uuid.uuid4()),
                                "parts": [{"kind": "text", "text": message}],
                            }
                        },
                    },
                )
                data = resp.json()
                # Dispatch via the SSOT response model (a2a_response.py).
                # All shape detection lives in one place — the parser
                # never raises and routes unknown shapes to Malformed
                # so a future server-side change is loud, not silent.
                variant = a2a_response.parse(data)
                if isinstance(variant, a2a_response.Result):
                    # Match legacy semantics:
                    #   parts non-empty + first part has no text → ""
                    #   parts empty → "(no response)"
                    # Differentiation matters for callers that assert
                    # on the empty-string case (test_a2a_client).
                    if variant.parts:
                        text = variant.text
                    else:
                        text = "(no response)"
                    # Tag child-reported errors so the caller can
                    # detect them reliably — agent-side bug surfaces
                    # text like "Agent error: <traceback>" inside a
                    # JSON-RPC success envelope.
                    if text.startswith("Agent error:"):
                        return f"{_A2A_ERROR_PREFIX}{text}"
                    return text
                if isinstance(variant, a2a_response.Queued):
                    # Poll-mode peer — message accepted into the inbox
                    # queue, target agent will fetch via poll. NOT a
                    # failure. Return the queued sentinel so callers
                    # (delegate_task etc.) can render the outcome
                    # accurately instead of treating it as an error.
                    logger.info(
                        "send_a2a_message: queued for poll-mode peer (target=%s method=%s)",
                        target_url,
                        variant.method,
                    )
                    return f"{_A2A_QUEUED_PREFIX}target={safe_id} method={variant.method}"
                if isinstance(variant, a2a_response.Error):
                    msg = variant.message
                    code = variant.code
                    if msg and code is not None:
                        detail = f"{msg} (code={code})"
                    elif msg:
                        detail = msg
                    elif code is not None:
                        detail = f"JSON-RPC error with no message (code={code})"
                    else:
                        detail = "JSON-RPC error with no message"
                    if variant.restarting:
                        # Surface platform-restart-in-progress
                        # explicitly — caller (UI / delegating agent)
                        # can render a softer "agent is restarting"
                        # message rather than a generic failure.
                        retry = (
                            f", retry_after={variant.retry_after}s"
                            if variant.retry_after is not None
                            else ""
                        )
                        detail = f"{detail} (restarting{retry})"
                    return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"
                # Malformed — log loud + surface as error so the
                # operator notices a server change. SSOT refactor
                # subsumes the inline "queued" check that landed in
                # the #2972 hotfix; that branch is now the typed
                # Queued variant above.
                logger.warning(
                    "send_a2a_message: malformed response (target=%s body=%.200s)",
                    target_url,
                    str(variant.raw),
                )
                return (
                    f"{_A2A_ERROR_PREFIX}unexpected response shape "
                    f"(no result, error, or queued envelope): "
                    f"{str(variant.raw)[:200]} [target={target_url}]"
                )
            except _TRANSIENT_HTTP_ERRORS as e:
                last_exc = e
                attempts_remaining = _DELEGATE_MAX_ATTEMPTS - (attempt + 1)
                if attempts_remaining <= 0 or time.monotonic() >= deadline:
                    # Out of attempts OR out of total budget — surface
                    # the last error to the caller.
                    break
                delay = _delegate_backoff_seconds(attempt)
                # Don't sleep past the deadline — clamp.
                remaining = deadline - time.monotonic()
                if delay > remaining:
                    delay = max(0.0, remaining)
                logger.warning(
                    "send_a2a_message: transient %s on attempt %d/%d, retrying in %.1fs (target=%s)",
                    type(e).__name__,
                    attempt + 1,
                    _DELEGATE_MAX_ATTEMPTS,
                    delay,
                    target_url,
                )
                await asyncio.sleep(delay)
                continue
            except Exception as e:
                # Non-transient (HTTP-status, JSON parse, etc.) — don't retry.
                return _format_a2a_error(e, target_url)
    # Retries exhausted (or budget elapsed). last_exc must be set
    # because we only break out of the loop after assigning it.
    assert last_exc is not None  # noqa: S101
    return _format_a2a_error(last_exc, target_url)


async def get_peers_with_diagnostic(source_workspace_id: str | None = None) -> tuple[list[dict], str | None]:
    """Get this workspace's peers, returning (peers, diagnostic).

    diagnostic is None when the call succeeded (status 200, even if the list
    is empty). When peers is [] for a non-trivial reason (auth failure,
    workspace-id missing from registry, platform error, network error),
    diagnostic is a short human-readable string explaining what went wrong
    so callers can surface it instead of "may be isolated" — see #2397.

    ``source_workspace_id`` selects which registered workspace's peers to
    enumerate; defaults to the module-level WORKSPACE_ID for
    single-workspace back-compat. Multi-workspace operators iterate over
    each registered workspace separately so each set of peers is fetched
    with the correct auth.

    The legacy get_peers() shim below preserves the bare-list contract for
    non-tool callers.
    """
    src = (source_workspace_id or "").strip() or WORKSPACE_ID
    url = f"{PLATFORM_URL}/registry/{src}/peers"
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            resp = await client.get(
                url,
                headers={"X-Workspace-ID": src, **auth_headers(src)},
            )
        except Exception as e:
            return [], f"Cannot reach platform at {PLATFORM_URL}: {e}"

        if resp.status_code == 200:
            try:
                data = resp.json()
            except Exception as e:
                return [], f"Platform returned 200 but body was not JSON: {e}"
            if not isinstance(data, list):
                return [], f"Platform returned 200 but body was not a list: {type(data).__name__}"
            return data, None

        if resp.status_code in (401, 403):
            return [], (
                f"Authentication to platform failed (HTTP {resp.status_code}). "
                "The workspace bearer token may be invalid — restarting the workspace usually re-mints it."
            )
        if resp.status_code == 404:
            # Report the workspace we actually queried (src), not the
            # module-level default — they differ in multi-workspace mode.
            return [], (
                f"Workspace ID {src} is not registered with the platform (HTTP 404). "
                "Re-registration via the platform's /registry/register endpoint is needed."
            )
        if 500 <= resp.status_code < 600:
            return [], f"Platform error: HTTP {resp.status_code}."
        return [], f"Unexpected platform response: HTTP {resp.status_code}."


async def get_peers() -> list[dict]:
    """Get this workspace's peers from the platform registry.

    Bare-list shim over get_peers_with_diagnostic() — discards the diagnostic
    so callers that don't care about the failure reason (e.g. system-prompt
    bootstrap formatters) get the same shape they always had.
    """
    peers, _ = await get_peers_with_diagnostic()
    return peers


async def get_workspace_info(source_workspace_id: str | None = None) -> dict:
    """Get this workspace's info from the platform.

    ``source_workspace_id`` selects which registered workspace to
    introspect when the agent is registered into multiple workspaces
    (multi-workspace mode). Unset → defaults to the module-level
    WORKSPACE_ID — single-workspace operators see no behaviour change.

    Distinguishes three failure shapes so callers can handle them
    distinctly (#2429):
    - 410 Gone → workspace was deleted; re-onboard required
    - 404 / other → workspace never existed (or transient)
    - exception → network / auth failure
    """
    src = source_workspace_id or WORKSPACE_ID
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            resp = await client.get(
                f"{PLATFORM_URL}/workspaces/{src}",
                headers=auth_headers(src),
            )
            if resp.status_code == 200:
                return resp.json()
            if resp.status_code == 410:
                # #2429: platform returns 410 when status='removed'.
                # Surface "removed" + the actionable hint so callers
                # can prompt re-onboard instead of falling through to
                # "not found" — which made the 2026-04-30 incident
                # impossible to diagnose ("workspace not found" with
                # a workspace_id we KNEW we'd just registered).
                try:
                    body = resp.json()
                except Exception:
                    body = {}
                return {
                    "error": "removed",
                    "id": body.get("id", src),
                    "removed_at": body.get("removed_at"),
                    "hint": body.get(
                        "hint",
                        "Workspace was deleted on the platform. "
                        "Regenerate workspace + token from the canvas → Tokens tab.",
                    ),
                }
            return {"error": "not found"}
        except Exception as e:
            return {"error": str(e)}