molecule-core/workspace/a2a_client.py
Hongming Wang 8b9f809966 fix(a2a): SSOT response parser — handle poll-mode queued envelope (#2967)
Introduce ``workspace/a2a_response.py`` as the single source of truth for
the wire shapes the workspace-server proxy can return at
``/workspaces/<id>/a2a``:

  * ``Result``    — JSON-RPC success
  * ``Error``     — JSON-RPC error or platform-level error (with
                    restart-in-progress metadata when present)
  * ``Queued``    — poll-mode short-circuit envelope: the platform
                    queued the message into the target's inbox, the
                    target will fetch via /activity poll
  * ``Malformed`` — anything the parser can't classify (logged at
                    WARNING so a future server change is loud)

``send_a2a_message`` (in ``a2a_client.py``) now dispatches via
``a2a_response.parse(data)`` instead of inline ``"result" in data`` /
``"error" in data`` sniffing. The Queued variant returns a new
``_A2A_QUEUED_PREFIX`` sentinel so callers can distinguish "delivered
async, no synchronous reply" from both success-with-text and failure.
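
For reference, the classification order as a sketch. The real parser
lives in ``a2a_response.py`` (not shown here); the variant names and
the fields the client reads (``parts``/``text``, ``message``/``code``/
``restarting``/``retry_after``, ``method``, ``raw``) are from this
commit, while the dataclass layout and nesting details are
illustrative assumptions:

    # Illustrative sketch, not the shipped parser.
    from dataclasses import dataclass
    from typing import Any

    @dataclass
    class Result:
        parts: list
        text: str

    @dataclass
    class Error:
        message: str | None
        code: int | None
        restarting: bool = False
        retry_after: float | None = None

    @dataclass
    class Queued:
        method: str
        raw: dict

    @dataclass
    class Malformed:
        raw: Any

    def parse(data: Any) -> Result | Error | Queued | Malformed:
        """Classify a proxy response body; never raises."""
        if not isinstance(data, dict):
            return Malformed(raw=data)
        if "result" in data:
            result = data["result"] if isinstance(data["result"], dict) else {}
            parts = result.get("parts") if isinstance(result.get("parts"), list) else []
            first = parts[0] if parts and isinstance(parts[0], dict) else {}
            return Result(parts=parts, text=str(first.get("text", "")))
        if "error" in data:
            err = data["error"] if isinstance(data["error"], dict) else {}
            return Error(message=err.get("message"), code=err.get("code"),
                         restarting=bool(err.get("restarting")),
                         retry_after=err.get("retry_after"))
        if data.get("status") == "queued" and data.get("delivery_mode") == "poll":
            return Queued(method=str(data.get("method", "")), raw=data)
        return Malformed(raw=data)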

reno-stars production traffic surfaced two intermittent failures that
both reduced to the same root cause:

  1. **File transfer announce silently failed** — when CEO Ryan PC
     (poll-mode external molecule-mcp) sent the harmi.zip
     announcement to Reno Stars Business Intelligent (also poll-mode
     external), ``send_a2a_message`` saw the platform's poll-queued
     envelope ``{"status":"queued","delivery_mode":"poll","method":"..."}``,
     didn't recognize it as the synthetic delivery-acknowledgement
     it is, and returned ``[A2A_ERROR] unexpected response shape``.
     The agent fell back to a chunk-shipping path; the receiver did
     get the file, but operator-facing logs reported a failure that
     never actually happened.

  2. **Duplicated agent comm** — same bug, inverted direction. d76
     delegated to 67d, send_a2a_message returned the unexpected-shape
     error, delegate_task wrapped it as DELEGATION FAILED, the calling
     agent retried with sharper wording, and the recipient saw the same
     request twice and self-reported "duplicate request — holding off
     on execution for now".

External molecule-mcp standalone runtimes are inherently poll-mode
(they have no public URL), so every external↔external A2A pair was
hitting this on every send. The pre-fix client only handled JSON-RPC
``result``/``error`` keys and treated the queued envelope (which has
neither) as malformed. RFC #2339 PR 2 added the queued envelope on
the server side; the client never caught up.

When ``send_a2a_message`` returns the ``_A2A_QUEUED_PREFIX`` sentinel,
``tool_delegate_task`` now transparently falls back to
``_delegate_sync_via_polling`` (RFC #2829 PR-5's durable
``/delegate`` + ``/delegations`` polling path, which DOES work for
poll-mode peers because the platform's executeDelegation goroutine
writes to the inbox queue and the result row arrives when the target
picks it up + replies). The agent gets a real synchronous reply
instead of the empty queued sentinel.
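
The fallback wiring, roughly. ``tool_delegate_task`` and
``_delegate_sync_via_polling`` live in ``a2a_tools_delegation.py``
(not in this file); argument names and the surrounding control flow
here are illustrative:

    # Hedged sketch of the auto-fallback decision, not the shipped code.
    reply = await send_a2a_message(target_id, task_text)
    if reply.startswith(_A2A_QUEUED_PREFIX):
        # Delivered async to a poll-mode peer: no synchronous text to
        # relay, so switch to the durable /delegate + /delegations
        # polling path and wait for the real result row.
        logger.info("delegate_task: send returned queued; falling back to polling")
        return await _delegate_sync_via_polling(target_id, task_text)
    if reply.startswith(_A2A_ERROR_PREFIX):
        # Genuine error: surface it, never silently retry the send.
        return f"DELEGATION FAILED: {reply}"
    return reply  # push-mode peer replied synchronously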

Tests:

  * ``test_a2a_response.py`` — 62 tests, **100% line coverage** on
    the parser (verified via ``coverage run --source=a2a_response``).
    Includes adversarial-input fuzzing across ~25 pathological
    payloads — the parser must never raise; a minimal sketch of that
    property test follows this list.
  * ``test_a2a_client.py::TestSendA2AMessagePollMode`` — 4 tests for
    the new Queued/Error wiring in ``send_a2a_message``.
  * ``test_delegation_sync_via_polling.py::TestPollModeAutoFallback``
    — 3 tests for the auto-fallback in ``tool_delegate_task``,
    including negative cases (push-mode reply must NOT trigger
    fallback; genuine error must NOT silently retry).
  * **Verified all new tests FAIL on pre-fix source** by stashing
    a2a_client.py + a2a_tools_delegation.py and re-running — 5
    failures including ImportError for the missing
    ``_A2A_QUEUED_PREFIX``.
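
The never-raise property referenced above, sketched as a parametrized
test. The shipped corpus in ``test_a2a_response.py`` has ~25 cases;
the payload list here is illustrative only:

    # Hedged sketch, not the shipped fuzz corpus.
    import pytest
    import a2a_response

    PATHOLOGICAL = [
        None, 42, "", [], {}, {"result": None}, {"error": "boom"},
        {"status": "queued"},                           # no delivery_mode
        {"status": "queued", "delivery_mode": "push"},
        {"result": {"parts": [None]}}, {"error": {"code": "NaN"}},
    ]

    @pytest.mark.parametrize("payload", PATHOLOGICAL)
    def test_parse_never_raises(payload):
        variant = a2a_response.parse(payload)  # must classify, never raise
        assert isinstance(variant, (a2a_response.Result, a2a_response.Error,
                                    a2a_response.Queued, a2a_response.Malformed))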

Per the operator-debuggability directive:

  * INFO at every Queued classification (expected variant; operator
    sees normal poll-mode-peer queueing in log stream).
  * INFO at the auto-fallback decision in ``tool_delegate_task``
    so a future operator can correlate "send returned queued →
    falling back to polling path" without reading the source.
  * WARNING at every Malformed classification (server contract
    drift; operator MUST see this immediately).
  * Existing transient-retry WARNING preserved.

Out of scope / follow-ups:

  * Mirror Go-side typed model in workspace-server. The wire shape
    is documented in ``a2a_response.py``'s module docstring with
    file:line pointers to the canonical emitters; a future PR can
    introduce ``models/a2a_response.go`` without changing wire
    behavior. The fixture corpus in ``test_a2a_response.py`` is
    designed so a one-sided edit breaks CI.
  * ``send_message_to_user`` and ``chat_upload_receive`` use a
    different endpoint (``/notify``) and aren't affected by this
    bug; their parsing stays unchanged.

Verification:

  * 135 tests pass across ``test_a2a_response.py`` +
    ``test_a2a_client.py`` + ``test_delegation_sync_via_polling.py``
    + ``test_a2a_tools_impl.py``.
  * ``coverage run --source=a2a_response -m pytest`` reports 100%
    line coverage with 0 missing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 17:21:28 -07:00


"""A2A protocol client — peer discovery, messaging, and workspace info.
Shared constants (WORKSPACE_ID, PLATFORM_URL) live here so that
a2a_tools and a2a_mcp_server can import them from a single place.
"""
import asyncio
import logging
import os
import random
import re
import threading
import time
import uuid
from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor

import httpx

import a2a_response
from platform_auth import auth_headers, self_source_headers

logger = logging.getLogger(__name__)
_WORKSPACE_ID_raw = os.environ.get("WORKSPACE_ID")
if not _WORKSPACE_ID_raw:
    raise RuntimeError("WORKSPACE_ID environment variable is required but not set")
WORKSPACE_ID = _WORKSPACE_ID_raw

if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
    PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
else:
    PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080")

# Cache workspace ID → name mappings (populated by list_peers calls)
_peer_names: dict[str, str] = {}
# Cache: peer workspace_id → the source workspace_id whose registry
# returned that peer. Populated by ``a2a_tools.tool_list_peers`` whenever
# it queries a specific workspace's peers — so a later
# ``tool_delegate_task(target)`` can auto-route through the correct
# source workspace without the agent having to specify
# ``source_workspace_id`` explicitly.
#
# Single-workspace mode: dict stays empty, all delegations fall through
# to the module-level WORKSPACE_ID (existing behavior).
#
# Multi-workspace mode: as the agent calls list_peers, this map is
# populated with each peer's source. Subsequent delegate_task calls
# auto-route. If a peer is registered under multiple sources (rare —
# e.g. an org-wide capability) the LAST observed source wins; the agent
# can override by passing ``source_workspace_id`` explicitly.
_peer_to_source: dict[str, str] = {}
# Cache workspace ID → full peer record (id, name, role, status, url, ...).
# Populated by tool_list_peers and by the lazy registry lookup in
# enrich_peer_metadata. The notification-callback path (channel envelope
# enrichment) reads this cache on every inbound peer_agent push, so the
# read shape stays a dict-like ``__getitem__`` lookup; entries carry
# their fetched-at timestamp so TTL eviction is in-line with the
# lookup. ``None`` as the record is the negative-cache sentinel:
# registry failure is cached for one TTL window so we don't re-fire
# the 2s-bounded GET on every push from a flaky peer.
#
# OrderedDict + maxsize bound (#2482): pre-fix this was an unbounded
# ``dict``, so a workspace receiving from N distinct peers across its
# lifetime accumulated ~100 bytes/entry × N indefinitely. At 10K peers
# that's ~1 MB; at 100K (a chatty platform-wide router) ~10 MB; not
# crash-class but unbounded. The LRU bound caps memory + the TTL caps
# per-entry staleness — both gates are needed because a runaway poller
# touching N new peer_ids per push could grow within a single TTL
# window.
#
# All reads / writes go through ``_peer_metadata_get`` /
# ``_peer_metadata_set`` so the LRU move-to-end + size-trim invariants
# stay co-located. Direct mutation is allowed only in test fixtures
# (clearing for isolation); production code path uses the helpers.
_PEER_METADATA_MAXSIZE = 1024
_peer_metadata: "OrderedDict[str, tuple[float, dict | None]]" = OrderedDict()
_peer_metadata_lock = threading.Lock()
# How long an entry in ``_peer_metadata`` is treated as fresh. 5 minutes
# is the same window we use for delegation routing — long enough that a
# busy agent receiving repeated pushes from one peer doesn't hit the
# registry on every push, short enough that role/name renames propagate
# within a single agent session.
_PEER_METADATA_TTL_SECONDS = 300.0
def _peer_metadata_get(canon: str) -> tuple[float, dict | None] | None:
    """Read with LRU touch — moves the entry to the most-recently-used
    position so steady-state pushes from a busy peer don't get evicted
    by a cold-start burst from new peers. Returns the raw tuple shape
    callers expect; TTL eviction stays at the call site.
    """
    with _peer_metadata_lock:
        entry = _peer_metadata.get(canon)
        if entry is not None:
            _peer_metadata.move_to_end(canon)
        return entry


def _peer_metadata_set(canon: str, value: tuple[float, dict | None]) -> None:
    """Write + evict-if-over-maxsize. The eviction is in-process and
    cheap (popitem(last=False) on an OrderedDict is O(1)). Holding the
    lock across the trim keeps the size invariant stable under concurrent
    writes from background enrichment workers.
    """
    with _peer_metadata_lock:
        _peer_metadata[canon] = value
        _peer_metadata.move_to_end(canon)
        # Trim the oldest entries until at-or-below maxsize. The bound
        # is a soft cap — a single overrun (set called when at maxsize)
        # evicts the LRU entry before returning, never letting size
        # exceed maxsize.
        while len(_peer_metadata) > _PEER_METADATA_MAXSIZE:
            _peer_metadata.popitem(last=False)

# Background-fetch executor for enrich_peer_metadata_nonblocking (#2484).
# A small pool — peers are highly TTL-cached, so the steady-state load
# is "one fetch per peer per 5 minutes." Two workers handle the cold-
# start burst when an agent starts receiving pushes from a new peer for
# the first time without backing up the inbox poller. Daemon threads:
# the executor must NOT block process exit if the inbox shuts down.
_enrich_executor: ThreadPoolExecutor | None = None
_enrich_executor_lock = threading.Lock()
# In-flight peer IDs — guards against a single peer's repeated pushes
# scheduling N concurrent registry fetches before the first one fills
# the cache. Set membership is "a worker is currently fetching this
# peer; subsequent calls should NOT schedule another."
_enrich_in_flight: set[str] = set()
_enrich_in_flight_lock = threading.Lock()
def _get_enrich_executor() -> ThreadPoolExecutor:
    """Lazy-init the enrichment worker pool. Lazy because most test
    fixtures and short-lived CLI invocations don't need it; only the
    long-running molecule-mcp / inbox-poller path actually schedules
    background fetches.
    """
    global _enrich_executor
    if _enrich_executor is not None:
        return _enrich_executor
    with _enrich_executor_lock:
        if _enrich_executor is None:
            _enrich_executor = ThreadPoolExecutor(
                max_workers=2,
                thread_name_prefix="enrich-peer",
            )
    return _enrich_executor

def enrich_peer_metadata_nonblocking(
    peer_id: str,
    source_workspace_id: str | None = None,
) -> dict | None:
    """Cache-first variant of ``enrich_peer_metadata`` — returns
    immediately without blocking on a registry GET.

    Behavior:
      - Cache hit (fresh): return the cached record.
      - Cache miss or TTL expired: schedule a background fetch via the
        worker pool, return ``None`` (caller renders bare peer_id).
        The next push for this peer hits the warm cache and gets the
        full record.

    Why this exists (#2484): the inbox poller's notification callback
    in molecule-mcp called the synchronous ``enrich_peer_metadata`` on
    every push, blocking the poller for up to 2s × N uncached peers
    per batch. Push-delivery latency was gated on registry latency —
    the exact thing the negative-cache patch in PR #2471 was supposed
    to avoid amplifying. Moving the fetch off the poller thread means
    push delivery is bounded by the inbox poll interval, never by
    registry RTT.

    Trade-off: the FIRST push from a new peer arrives metadata-light
    (no name/role). The MCP host renders the bare peer_id. Subsequent
    pushes (within the 5-min TTL) hit the warm cache and get the full
    record. Acceptable because:
      - Channel-envelope enrichment is a UX nicety, not a correctness
        invariant.
      - The cold-cache window per peer is bounded to one push.
      - The TTL is long enough that an active conversation never
        re-enters the cold state.
    """
    canon = _validate_peer_id(peer_id)
    if canon is None:
        return None
    current = time.monotonic()
    cached = _peer_metadata_get(canon)
    if cached is not None:
        fetched_at, record = cached
        if current - fetched_at < _PEER_METADATA_TTL_SECONDS:
            return record
    # Schedule background fetch unless one is already in flight for this
    # peer. The synchronous version atomically reads-then-writes; the
    # async version splits that into "schedule fetch" + "fetch fills
    # cache later." The in-flight set keeps a flurry of pushes from
    # one peer (e.g., a chatty agent) from spawning N parallel GETs.
    with _enrich_in_flight_lock:
        if canon in _enrich_in_flight:
            return None
        _enrich_in_flight.add(canon)
    try:
        _get_enrich_executor().submit(
            _enrich_peer_metadata_worker, canon, source_workspace_id
        )
    except RuntimeError:
        # Executor was shut down (process exit path) — drop the request,
        # let the caller render bare peer_id.
        with _enrich_in_flight_lock:
            _enrich_in_flight.discard(canon)
    return None

def _enrich_peer_metadata_worker(
    canon: str, source_workspace_id: str | None
) -> None:
    """Background-thread body for ``enrich_peer_metadata_nonblocking``.

    Runs the same fetch logic as the synchronous helper but discards
    the return value — the cache write is the only output anyone
    needs. Always clears the in-flight marker so a future cache miss
    can retry.
    """
    try:
        enrich_peer_metadata(canon, source_workspace_id)
    except Exception as exc:  # noqa: BLE001
        # Background workers must not crash the executor — log and
        # move on. The negative-cache path inside enrich_peer_metadata
        # already records failures, so a re-attempt is rate-limited
        # by TTL.
        logger.debug("_enrich_peer_metadata_worker: %s failed: %s", canon, exc)
    finally:
        with _enrich_in_flight_lock:
            _enrich_in_flight.discard(canon)

def _wait_for_enrichment_inflight_for_testing(timeout: float = 2.0) -> None:
    """Block until all in-flight enrichment workers have completed.

    Test-only helper. Production code never has a reason to wait — the
    point of the nonblocking path is that callers don't care when the
    cache fills. Tests that want to assert "after the worker runs, the
    cache has the record" use this to synchronise without sleeping.
    Polls ``_enrich_in_flight`` rather than holding a Condition because
    the worker pool is already serializing through ``_enrich_in_flight_lock``;
    polling keeps the production hot path lock-free.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        with _enrich_in_flight_lock:
            if not _enrich_in_flight:
                return
        time.sleep(0.01)

def enrich_peer_metadata(
    peer_id: str,
    source_workspace_id: str | None = None,
    *,
    now: float | None = None,
) -> dict | None:
    """Return cached or freshly-fetched metadata for ``peer_id``.

    Sync helper — safe to call from the inbox poller's notification
    callback thread (which is not async). Hits the in-process cache
    first; on miss or TTL expiry, GETs ``/registry/discover/<peer_id>``
    synchronously with a tight timeout. Returns None on validation
    failure, network failure, or non-200 response so callers can
    degrade gracefully (the channel envelope falls back to the raw
    ``peer_id`` instead of crashing the push path).

    Negative caching: failure outcomes (4xx/5xx/non-JSON/network
    exception) are stored as ``(now, None)`` and treated as
    fresh-but-empty for the TTL window. Without this, a peer with a
    flaky/missing registry record would re-fire the 2s-bounded GET on
    EVERY push — turning the cache into a no-op for the exact failure
    scenarios it most needs to defend against.

    The fetched dict is stored as-is, so callers can read whatever
    fields the platform exposes (currently: ``id``, ``name``, ``role``,
    ``status``, ``url``). New fields surface automatically without a
    code change here.
    """
    canon = _validate_peer_id(peer_id)
    if canon is None:
        return None
    current = now if now is not None else time.monotonic()
    cached = _peer_metadata_get(canon)
    if cached is not None:
        fetched_at, record = cached
        if current - fetched_at < _PEER_METADATA_TTL_SECONDS:
            # Fresh entry — return whatever's there. ``None`` is the
            # negative-cache sentinel: caller treats absence of fields
            # the same as a registry miss, which is the desired UX.
            return record
    src = (source_workspace_id or "").strip() or WORKSPACE_ID
    url = f"{PLATFORM_URL}/registry/discover/{canon}"
    try:
        with httpx.Client(timeout=2.0) as client:
            resp = client.get(url, headers={"X-Workspace-ID": src, **auth_headers(src)})
    except Exception as exc:  # noqa: BLE001
        logger.debug("enrich_peer_metadata: GET %s failed: %s", url, exc)
        _peer_metadata_set(canon, (current, None))
        return None
    if resp.status_code != 200:
        logger.debug(
            "enrich_peer_metadata: %s returned HTTP %d", url, resp.status_code
        )
        _peer_metadata_set(canon, (current, None))
        return None
    try:
        data = resp.json()
    except Exception:  # noqa: BLE001
        _peer_metadata_set(canon, (current, None))
        return None
    if not isinstance(data, dict):
        _peer_metadata_set(canon, (current, None))
        return None
    _peer_metadata_set(canon, (current, data))
    if name := data.get("name"):
        _peer_names[canon] = name
    return data

def _agent_card_url_for(peer_id: str) -> str:
    """Construct the platform-side agent-card URL for ``peer_id``.

    Returns the empty string when ``peer_id`` is not a UUID — same
    trust-boundary rationale as ``discover_peer``: never interpolate
    path-traversal characters into a URL. An invalid id reflected back
    to the receiving agent as ``…/registry/discover/../../foo`` is a
    foothold we close at construction time.

    Uses the registry's discovery path so the agent receiving a push
    can hit a single endpoint to enumerate the sender's capabilities
    + role + URL. Same shape every workspace exposes regardless of
    runtime — claude-code, hermes, langchain wrappers all register
    through ``/registry/register`` and surface through ``/registry/discover``.
    """
    safe_id = _validate_peer_id(peer_id)
    if safe_id is None:
        return ""
    return f"{PLATFORM_URL}/registry/discover/{safe_id}"

# Sentinel prefix for errors originating from send_a2a_message / child agents.
# Used by delegate_task to distinguish real errors from normal response text.
_A2A_ERROR_PREFIX = "[A2A_ERROR] "
# Sentinel prefix for queued-for-poll-mode-peer outcomes (#2967).
# When the target workspace is registered as delivery_mode=poll (no
# public URL — typical for external molecule-mcp standalone runtimes),
# the platform's a2a_proxy.go:402 short-circuit returns a synthetic
# {"status":"queued","delivery_mode":"poll","method":"..."} envelope
# instead of dispatching over HTTP. The message IS delivered (written
# to the platform's inbox queue); there's just no synchronous reply
# to relay. Pre-#2967 the client treated this as "unexpected response
# shape" → caller saw DELEGATION FAILED → retried → recipient saw
# duplicates. The Queued prefix lets callers branch on this outcome
# explicitly: "delivered async, no synchronous reply expected" is
# different from both success-with-text and failure.
_A2A_QUEUED_PREFIX = "[A2A_QUEUED] "
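# Caller-side branching on the sentinels, illustratively (the real
# handling in delegate_task lives in a2a_tools_delegation.py):
#
#     reply = await send_a2a_message(peer_id, text)
#     if reply.startswith(_A2A_QUEUED_PREFIX):
#         ...  # delivered async: fall back to polling, don't resend
#     elif reply.startswith(_A2A_ERROR_PREFIX):
#         ...  # genuine failure: surface it, don't auto-retry
#     else:
#         ...  # synchronous reply text from a push-mode peer
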
# Workspace IDs are UUIDs everywhere we generate them (platform's
# workspaces.id column, /registry/discover/:id route param, etc.) but
# the agent-facing tool surface receives them as free-form strings via
# tool args. ``_validate_peer_id`` enforces UUID-shape at the
# trust boundary so we never interpolate `..` or `/` into a URL path,
# never silently coerce malformed input into a 404, and surface a
# clear error to the agent rather than letting an HTTP 4xx bubble up
# from the platform with a generic error message.
#
# Lenient on case + whitespace because real-world peer-id strings
# come from list_peers/discover_peer responses (canonical lowercase)
# or hand-typed agent input (mixed-case acceptable). Strict on
# everything else.
_UUID_RE = re.compile(
    r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$"
)
def _validate_peer_id(peer_id: str) -> str | None:
    """Return the canonicalised peer_id if valid, else None.

    Returning None instead of raising so callers in tool surfaces can
    convert to a friendly agent-facing string ("workspace_id is not a
    valid UUID") rather than crashing with a stack trace.
    """
    if not isinstance(peer_id, str):
        return None
    pid = peer_id.strip()
    if not _UUID_RE.match(pid):
        return None
    return pid.lower()

async def discover_peer(target_id: str, source_workspace_id: str | None = None) -> dict | None:
    """Discover a peer workspace's URL via the platform registry.

    Validates ``target_id`` is a UUID before constructing the URL — a
    malformed id can't reach the platform handler now, which both
    short-circuits an avoidable round-trip AND ensures we never
    interpolate path-traversal characters into the URL.

    ``source_workspace_id`` selects which registered workspace asks the
    question — both the X-Workspace-ID header AND the Authorization
    bearer token must come from the same workspace, otherwise the
    platform's TenantGuard rejects the request. Defaults to the
    module-level WORKSPACE_ID for back-compat with single-workspace
    callers.
    """
    safe_id = _validate_peer_id(target_id)
    if safe_id is None:
        return None
    src = (source_workspace_id or "").strip() or WORKSPACE_ID
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            resp = await client.get(
                f"{PLATFORM_URL}/registry/discover/{safe_id}",
                headers={"X-Workspace-ID": src, **auth_headers(src)},
            )
            if resp.status_code == 200:
                return resp.json()
            return None
        except Exception as e:
            logger.error("Discovery failed for %s: %s", target_id, e)
            return None

# httpx exception classes that indicate a transient transport-layer
# failure worth retrying — the request never produced an application
# response, so a fresh attempt has a real chance of succeeding. Any
# error not in this tuple is treated as deterministic (HTTP-status,
# JSON parse, runtime-returned JSON-RPC error, etc.) and surfaced to
# the caller on the first try.
#
# Why each one belongs here:
# - ConnectError / ConnectTimeout: peer's listening socket wasn't
# ready (mid-restart, not yet bound). Fast failure, fast recovery.
# - RemoteProtocolError: peer closed the TCP connection without
# writing a response — observed on 2026-04-27 when a peer's prior
# in-flight Claude SDK session aborted and the new request's
# connection was reset mid-handler.
# - ReadError / WriteError: TCP read/write socket error mid-flight,
# typically a network blip on the Docker bridge or a peer worker
# crash.
# - ReadTimeout: peer didn't write ANY response bytes within the
# 300s read budget. Distinct from "peer is slow but progressing"
# (which httpx surfaces as a successful read with chunked bytes).
# Retry budget caps the worst case — see _DELEGATE_TOTAL_BUDGET_S.
_TRANSIENT_HTTP_ERRORS: tuple[type[Exception], ...] = (
    httpx.ConnectError,
    httpx.ConnectTimeout,
    httpx.ReadError,
    httpx.WriteError,
    httpx.RemoteProtocolError,
    httpx.ReadTimeout,
)
# Retry budget. Up to 5 attempts (1 initial + 4 retries) with
# exponential backoff (1, 2, 4, 8 seconds), each backoff jittered ±25%
# to prevent synchronized retry storms across siblings if a peer flaps.
# _DELEGATE_TOTAL_BUDGET_S caps cumulative wall-clock so a string of
# ReadTimeouts can't make the caller wait 25 minutes — once the
# deadline elapses we stop retrying even if attempts remain. 600s = 10
# minutes is the agreed worst case the caller can tolerate before
# falling back to "peer unavailable" handling in tool_delegate_task.
_DELEGATE_MAX_ATTEMPTS = 5
_DELEGATE_BACKOFF_BASE_S = 1.0
_DELEGATE_BACKOFF_CAP_S = 16.0
_DELEGATE_TOTAL_BUDGET_S = 600.0
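# Worked schedule (pre-jitter): backoff after attempt 0 → 1s, 1 → 2s,
# 2 → 4s, 3 → 8s; the 16s cap is never reached within 5 attempts.
# With ±25% jitter the sleep after attempt 3 lands in [6.0, 10.0]s,
# and the worst-case cumulative sleep is (1+2+4+8) × 1.25 = 18.75s,
# well inside the 600s budget unless the 300s reads consume it first.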
def _delegate_backoff_seconds(attempt_zero_indexed: int) -> float:
    """Return the (jittered) backoff delay before retrying after the
    given attempt index (0 = backoff before retry #1).

    Pure function so the schedule is unit-testable without monkey-
    patching asyncio.sleep. Jitter is symmetric ±25% on top of the
    capped exponential — enough to break sync across simultaneous
    callers without making the schedule unpredictable.
    """
    base = min(_DELEGATE_BACKOFF_BASE_S * (2 ** attempt_zero_indexed), _DELEGATE_BACKOFF_CAP_S)
    jitter = base * (0.5 * random.random() - 0.25)
    return max(0.0, base + jitter)

def _format_a2a_error(exc: BaseException, target_url: str) -> str:
    """Format an httpx exception as an [A2A_ERROR] string.

    Some httpx exceptions stringify to empty (RemoteProtocolError,
    ConnectionReset variants) — the canvas would then render
    "[A2A_ERROR] " with no detail and the operator has no signal to
    act on. Always include the exception class name and the target
    URL so the activity log + Agent Comms panel have actionable
    information without a trip through container logs.
    """
    msg = str(exc).strip()
    type_name = type(exc).__name__
    if not msg:
        detail = f"{type_name} (no message — likely connection reset or silent timeout)"
    elif msg.startswith(f"{type_name}:") or msg.startswith(f"{type_name} "):
        # Already prefixed with the type — don't double-prefix.
        # Prefix-anchored check (not substring) so a message that
        # happens to mention some OTHER class name mid-string
        # (e.g. "got OSError on read") doesn't suppress our own
        # type prefix and lose the diagnostic signal.
        detail = msg
    else:
        detail = f"{type_name}: {msg}"
    return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"

async def send_a2a_message(peer_id: str, message: str, source_workspace_id: str | None = None) -> str:
    """Send an A2A ``message/send`` to a peer workspace via the platform proxy.

    The target URL is constructed internally as
    ``${PLATFORM_URL}/workspaces/{peer_id}/a2a``. Going through the
    platform's A2A proxy is the only path that works for both
    in-container and external runtimes — see
    a2a_tools.tool_delegate_task for the rationale.

    ``source_workspace_id`` is the SENDING workspace — drives both the
    X-Workspace-ID source-tagging header and the bearer token. Defaults
    to the module-level WORKSPACE_ID for back-compat. Multi-workspace
    operators pass it explicitly so each registered workspace's peers
    are reached via their own auth chain.

    Auto-retries up to _DELEGATE_MAX_ATTEMPTS times on transient
    transport-layer errors (RemoteProtocolError, ConnectError,
    ReadTimeout, etc.) with exponential backoff + jitter, capped by
    _DELEGATE_TOTAL_BUDGET_S. Application-level failures (HTTP 4xx,
    JSON-RPC error response, malformed JSON) are NOT retried — they
    indicate a deterministic problem retry won't fix.
    """
    safe_id = _validate_peer_id(peer_id)
    if safe_id is None:
        return f"{_A2A_ERROR_PREFIX}invalid peer_id (expected UUID): {peer_id!r}"
    src = (source_workspace_id or "").strip() or WORKSPACE_ID
    target_url = f"{PLATFORM_URL}/workspaces/{safe_id}/a2a"
    # Fix F (Cycle 5 / H2 — flagged 5 consecutive audits): timeout=None allowed
    # a hung upstream to block the agent indefinitely. Use a generous but bounded
    # timeout: 30s connect + 300s read (long enough for slow LLM responses).
    timeout_cfg = httpx.Timeout(connect=30.0, read=300.0, write=30.0, pool=30.0)
    deadline = time.monotonic() + _DELEGATE_TOTAL_BUDGET_S
    last_exc: BaseException | None = None
    for attempt in range(_DELEGATE_MAX_ATTEMPTS):
        async with httpx.AsyncClient(timeout=timeout_cfg) as client:
            try:
                # self_source_headers() includes X-Workspace-ID so the
                # platform's a2a_receive logger records source_id =
                # WORKSPACE_ID. Otherwise peer-A2A messages — including
                # the case where target_url resolves to this workspace's
                # own /a2a — get logged with source_id=NULL and surface
                # in the recipient's My Chat tab as user-typed input.
                resp = await client.post(
                    target_url,
                    headers=self_source_headers(src),
                    json={
                        "jsonrpc": "2.0",
                        "id": str(uuid.uuid4()),
                        "method": "message/send",
                        "params": {
                            "message": {
                                "role": "user",
                                "messageId": str(uuid.uuid4()),
                                "parts": [{"kind": "text", "text": message}],
                            }
                        },
                    },
                )
                data = resp.json()
                # Dispatch via the SSOT response model (a2a_response.py).
                # All shape detection lives in one place — the parser
                # never raises and routes unknown shapes to Malformed
                # so a future server-side change is loud, not silent.
                variant = a2a_response.parse(data)
                if isinstance(variant, a2a_response.Result):
                    # Match legacy semantics:
                    #   parts non-empty + first part has no text → ""
                    #   parts empty → "(no response)"
                    # Differentiation matters for callers that assert
                    # on the empty-string case (test_a2a_client).
                    if variant.parts:
                        text = variant.text
                    else:
                        text = "(no response)"
                    # Tag child-reported errors so the caller can
                    # detect them reliably — an agent-side bug surfaces
                    # text like "Agent error: <traceback>" inside a
                    # JSON-RPC success envelope.
                    if text.startswith("Agent error:"):
                        return f"{_A2A_ERROR_PREFIX}{text}"
                    return text
                if isinstance(variant, a2a_response.Queued):
                    # Poll-mode peer — message accepted into the inbox
                    # queue, target agent will fetch via poll. NOT a
                    # failure. Return the queued sentinel so callers
                    # (delegate_task etc.) can render the outcome
                    # accurately instead of treating it as an error.
                    logger.info(
                        "send_a2a_message: queued for poll-mode peer (target=%s method=%s)",
                        target_url,
                        variant.method,
                    )
                    return f"{_A2A_QUEUED_PREFIX}target={safe_id} method={variant.method}"
                if isinstance(variant, a2a_response.Error):
                    msg = variant.message
                    code = variant.code
                    if msg and code is not None:
                        detail = f"{msg} (code={code})"
                    elif msg:
                        detail = msg
                    elif code is not None:
                        detail = f"JSON-RPC error with no message (code={code})"
                    else:
                        detail = "JSON-RPC error with no message"
                    if variant.restarting:
                        # Surface platform-restart-in-progress
                        # explicitly — the caller (UI / delegating agent)
                        # can render a softer "agent is restarting"
                        # message rather than a generic failure.
                        retry = (
                            f", retry_after={variant.retry_after}s"
                            if variant.retry_after is not None
                            else ""
                        )
                        detail = f"{detail} (restarting{retry})"
                    return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"
                # Malformed — log loud + surface as error so the
                # operator notices a server change. The SSOT refactor
                # subsumes the inline "queued" check that landed in
                # the #2972 hotfix; that branch is now the typed
                # Queued variant above.
                logger.warning(
                    "send_a2a_message: malformed response (target=%s body=%.200s)",
                    target_url,
                    str(variant.raw),
                )
                return (
                    f"{_A2A_ERROR_PREFIX}unexpected response shape "
                    f"(no result, error, or queued envelope): "
                    f"{str(variant.raw)[:200]} [target={target_url}]"
                )
            except _TRANSIENT_HTTP_ERRORS as e:
                last_exc = e
                attempts_remaining = _DELEGATE_MAX_ATTEMPTS - (attempt + 1)
                if attempts_remaining <= 0 or time.monotonic() >= deadline:
                    # Out of attempts OR out of total budget — surface
                    # the last error to the caller.
                    break
                delay = _delegate_backoff_seconds(attempt)
                # Don't sleep past the deadline — clamp.
                remaining = deadline - time.monotonic()
                if delay > remaining:
                    delay = max(0.0, remaining)
                logger.warning(
                    "send_a2a_message: transient %s on attempt %d/%d, retrying in %.1fs (target=%s)",
                    type(e).__name__,
                    attempt + 1,
                    _DELEGATE_MAX_ATTEMPTS,
                    delay,
                    target_url,
                )
                await asyncio.sleep(delay)
                continue
            except Exception as e:
                # Non-transient (HTTP-status, JSON parse, etc.) — don't retry.
                return _format_a2a_error(e, target_url)
    # Retries exhausted (or budget elapsed). last_exc must be set
    # because we only break out of the loop after assigning it.
    assert last_exc is not None  # noqa: S101
    return _format_a2a_error(last_exc, target_url)

async def get_peers_with_diagnostic(source_workspace_id: str | None = None) -> tuple[list[dict], str | None]:
    """Get this workspace's peers, returning (peers, diagnostic).

    diagnostic is None when the call succeeded (status 200, even if the
    list is empty). When peers is [] for a non-trivial reason (auth
    failure, workspace-id missing from registry, platform error, network
    error), diagnostic is a short human-readable string explaining what
    went wrong so callers can surface it instead of "may be isolated" —
    see #2397.

    ``source_workspace_id`` selects which registered workspace's peers to
    enumerate; defaults to the module-level WORKSPACE_ID for
    single-workspace back-compat. Multi-workspace operators iterate over
    each registered workspace separately so each set of peers is fetched
    with the correct auth.

    The legacy get_peers() shim below preserves the bare-list contract
    for non-tool callers.
    """
    src = (source_workspace_id or "").strip() or WORKSPACE_ID
    url = f"{PLATFORM_URL}/registry/{src}/peers"
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            resp = await client.get(
                url,
                headers={"X-Workspace-ID": src, **auth_headers(src)},
            )
        except Exception as e:
            return [], f"Cannot reach platform at {PLATFORM_URL}: {e}"
    if resp.status_code == 200:
        try:
            data = resp.json()
        except Exception as e:
            return [], f"Platform returned 200 but body was not JSON: {e}"
        if not isinstance(data, list):
            return [], f"Platform returned 200 but body was not a list: {type(data).__name__}"
        return data, None
    if resp.status_code in (401, 403):
        return [], (
            f"Authentication to platform failed (HTTP {resp.status_code}). "
            "The workspace bearer token may be invalid — restarting the workspace usually re-mints it."
        )
    if resp.status_code == 404:
        return [], (
            f"Workspace ID {src} is not registered with the platform (HTTP 404). "
            "Re-registration via the platform's /registry/register endpoint is needed."
        )
    if 500 <= resp.status_code < 600:
        return [], f"Platform error: HTTP {resp.status_code}."
    return [], f"Unexpected platform response: HTTP {resp.status_code}."

async def get_peers() -> list[dict]:
    """Get this workspace's peers from the platform registry.

    Bare-list shim over get_peers_with_diagnostic() — discards the
    diagnostic so callers that don't care about the failure reason
    (e.g. system-prompt bootstrap formatters) get the same shape they
    always had.
    """
    peers, _ = await get_peers_with_diagnostic()
    return peers

async def get_workspace_info(source_workspace_id: str | None = None) -> dict:
    """Get this workspace's info from the platform.

    ``source_workspace_id`` selects which registered workspace to
    introspect when the agent is registered into multiple workspaces
    (multi-workspace mode). Unset → defaults to the module-level
    WORKSPACE_ID — single-workspace operators see no behaviour change.

    Distinguishes three failure shapes so callers can handle them
    distinctly (#2429):
      - 410 Gone → workspace was deleted; re-onboard required
      - 404 / other → workspace never existed (or transient)
      - exception → network / auth failure
    """
    src = source_workspace_id or WORKSPACE_ID
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            resp = await client.get(
                f"{PLATFORM_URL}/workspaces/{src}",
                headers=auth_headers(src),
            )
            if resp.status_code == 200:
                return resp.json()
            if resp.status_code == 410:
                # #2429: platform returns 410 when status='removed'.
                # Surface "removed" + the actionable hint so callers
                # can prompt re-onboard instead of falling through to
                # "not found" — which made the 2026-04-30 incident
                # impossible to diagnose ("workspace not found" with
                # a workspace_id we KNEW we'd just registered).
                try:
                    body = resp.json()
                except Exception:
                    body = {}
                return {
                    "error": "removed",
                    "id": body.get("id", src),
                    "removed_at": body.get("removed_at"),
                    "hint": body.get(
                        "hint",
                        "Workspace was deleted on the platform. "
                        "Regenerate workspace + token from the canvas → Tokens tab.",
                    ),
                }
            return {"error": "not found"}
        except Exception as e:
            return {"error": str(e)}