From 8b9f80996690d8c312f82d076a0b16f7dfbcc9c0 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 17:16:51 -0700 Subject: [PATCH 1/3] =?UTF-8?q?fix(a2a):=20SSOT=20response=20parser=20?= =?UTF-8?q?=E2=80=94=20handle=20poll-mode=20queued=20envelope=20(#2967)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce ``workspace/a2a_response.py`` as the single source of truth for the wire shapes the workspace-server proxy can return at ``/workspaces//a2a``: * ``Result`` — JSON-RPC success * ``Error`` — JSON-RPC error or platform-level error (with restart-in-progress metadata when present) * ``Queued`` — poll-mode short-circuit envelope: the platform queued the message into the target's inbox, the target will fetch via /activity poll * ``Malformed`` — anything the parser can't classify (logged at WARNING so a future server change is loud) ``send_a2a_message`` (in ``a2a_client.py``) now dispatches via ``a2a_response.parse(data)`` instead of inline ``"result" in data`` / ``"error" in data`` sniffing. The Queued variant returns a new ``_A2A_QUEUED_PREFIX`` sentinel so callers can distinguish "delivered async, no synchronous reply" from both success-with-text and failure. reno-stars production data caught two intermittent failures that both reduced to the same root cause: 1. **File transfer announce silently failed** — when CEO Ryan PC (poll-mode external molecule-mcp) sent the harmi.zip announcement to Reno Stars Business Intelligent (also poll-mode external), ``send_a2a_message`` saw the platform's poll-queued envelope ``{"status":"queued","delivery_mode":"poll","method":"..."}``, didn't recognize it as the synthetic delivery-acknowledgement it is, and returned ``[A2A_ERROR] unexpected response shape``. The agent fell back to a chunk-shipping path; receiver did get the file but operator-facing logs showed a failure that didn't actually fail. 2. **Duplicated agent comm** — same bug, inverted direction. 
d76 delegated to 67d, send_a2a_message returned the unexpected-shape error, delegate_task wrapped it as DELEGATION FAILED, the calling agent retried with sharper wording, the recipient saw the same request twice and self-reported "二次请求 — 我先不执行" ("repeat request — I will not act on it yet"). External molecule-mcp standalone runtimes are inherently poll-mode (they have no public URL), so every external↔external A2A pair was hitting this on every send. The pre-fix client only handled JSON-RPC ``result``/``error`` keys and treated the queued envelope (which has neither) as malformed. RFC #2339 PR 2 added the queued envelope on the server side; the client never caught up. When ``send_a2a_message`` returns the ``_A2A_QUEUED_PREFIX`` sentinel, ``tool_delegate_task`` now transparently falls back to ``_delegate_sync_via_polling`` (RFC #2829 PR-5's durable ``/delegate`` + ``/delegations`` polling path, which DOES work for poll-mode peers because the platform's executeDelegation goroutine writes to the inbox queue and the result row arrives when the target picks it up + replies). The agent gets a real synchronous reply instead of the empty queued sentinel. Tests added: * ``test_a2a_response.py`` — 62 tests, **100% line coverage** on the parser (verified via ``coverage run --source=a2a_response``). Includes adversarial-input fuzzing across ~25 pathological payloads — parser must never raise. * ``test_a2a_client.py::TestSendA2AMessagePollMode`` — 4 tests for the new Queued/Error wiring in ``send_a2a_message``. * ``test_delegation_sync_via_polling.py::TestPollModeAutoFallback`` — 3 tests for the auto-fallback in ``tool_delegate_task``, including negative cases (push-mode reply must NOT trigger fallback; genuine error must NOT silently retry). * **Verified all new tests FAIL on pre-fix source** by stashing a2a_client.py + a2a_tools_delegation.py and re-running — 5 failures including ImportError for the missing ``_A2A_QUEUED_PREFIX``. 
Logging, per the operator-debuggability directive: * INFO at every Queued classification (expected variant; operator sees normal poll-mode-peer queueing in log stream). * INFO at the auto-fallback decision in ``tool_delegate_task`` so a future operator can correlate "send returned queued → falling back to polling path" without reading the source. * WARNING at every Malformed classification (server contract drift; operator MUST see this immediately). * Existing transient-retry WARNING preserved. Not included in this change (deferred / out of scope): * Mirror Go-side typed model in workspace-server. The wire shape is documented in ``a2a_response.py``'s module docstring with file:line pointers to the canonical emitters; a future PR can introduce ``models/a2a_response.go`` without changing wire behavior. The fixture corpus in ``test_a2a_response.py`` is designed so a one-sided edit breaks CI. * ``send_message_to_user`` and ``chat_upload_receive`` use a different endpoint (``/notify``) and aren't affected by this bug; their parsing stays unchanged. Verification: * 135 tests pass across ``test_a2a_response.py`` + ``test_a2a_client.py`` + ``test_delegation_sync_via_polling.py`` + ``test_a2a_tools_impl.py``. * ``coverage run --source=a2a_response -m pytest`` reports 100% line coverage with 0 missing. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- workspace/a2a_client.py | 102 ++-- workspace/a2a_response.py | 246 ++++++++++ workspace/a2a_tools_delegation.py | 27 ++ workspace/tests/test_a2a_client.py | 92 ++++ workspace/tests/test_a2a_response.py | 455 ++++++++++++++++++ .../tests/test_delegation_sync_via_polling.py | 118 +++++ 6 files changed, 1013 insertions(+), 27 deletions(-) create mode 100644 workspace/a2a_response.py create mode 100644 workspace/tests/test_a2a_response.py diff --git a/workspace/a2a_client.py b/workspace/a2a_client.py index 16eb4b9c..8e499f40 100644 --- a/workspace/a2a_client.py +++ b/workspace/a2a_client.py @@ -17,6 +17,7 @@ from concurrent.futures import ThreadPoolExecutor import httpx +import a2a_response from platform_auth import auth_headers, self_source_headers logger = logging.getLogger(__name__) @@ -353,6 +354,20 @@ def _agent_card_url_for(peer_id: str) -> str: # Used by delegate_task to distinguish real errors from normal response text. _A2A_ERROR_PREFIX = "[A2A_ERROR] " +# Sentinel prefix for queued-for-poll-mode-peer outcomes (#2967). +# When the target workspace is registered as delivery_mode=poll (no +# public URL — typical for external molecule-mcp standalone runtimes), +# the platform's a2a_proxy.go:402 short-circuit returns a synthetic +# {"status":"queued","delivery_mode":"poll","method":"..."} envelope +# instead of dispatching over HTTP. The message IS delivered (written +# to the platform's inbox queue); there's just no synchronous reply +# to relay. Pre-#2967 the client treated this as "unexpected response +# shape" → caller saw DELEGATION FAILED → retried → recipient saw +# duplicates. The Queued prefix lets callers branch on this outcome +# explicitly: "delivered async, no synchronous reply expected" is +# different from both success-with-text and failure. 
+_A2A_QUEUED_PREFIX = "[A2A_QUEUED] " + # Workspace IDs are UUIDs everywhere we generate them (platform's # workspaces.id column, /registry/discover/:id route param, etc.) but # the agent-facing tool surface receives them as free-form strings via @@ -564,17 +579,43 @@ async def send_a2a_message(peer_id: str, message: str, source_workspace_id: str }, ) data = resp.json() - if "result" in data: - parts = data["result"].get("parts", []) - text = parts[0].get("text", "") if parts else "(no response)" - # Tag child-reported errors so the caller can detect them reliably + # Dispatch via the SSOT response model (a2a_response.py). + # All shape detection lives in one place — the parser + # never raises and routes unknown shapes to Malformed + # so a future server-side change is loud, not silent. + variant = a2a_response.parse(data) + if isinstance(variant, a2a_response.Result): + # Match legacy semantics: + # parts non-empty + first part has no text → "" + # parts empty → "(no response)" + # Differentiation matters for callers that assert + # on the empty-string case (test_a2a_client). + if variant.parts: + text = variant.text + else: + text = "(no response)" + # Tag child-reported errors so the caller can + # detect them reliably — agent-side bug surfaces + # text like "Agent error: " inside a + # JSON-RPC success envelope. if text.startswith("Agent error:"): return f"{_A2A_ERROR_PREFIX}{text}" return text - elif "error" in data: - err = data["error"] - msg = (err.get("message") or "").strip() - code = err.get("code") + if isinstance(variant, a2a_response.Queued): + # Poll-mode peer — message accepted into the inbox + # queue, target agent will fetch via poll. NOT a + # failure. Return the queued sentinel so callers + # (delegate_task etc.) can render the outcome + # accurately instead of treating it as an error. 
+ logger.info( + "send_a2a_message: queued for poll-mode peer (target=%s method=%s)", + target_url, + variant.method, + ) + return f"{_A2A_QUEUED_PREFIX}target={safe_id} method={variant.method}" + if isinstance(variant, a2a_response.Error): + msg = variant.message + code = variant.code if msg and code is not None: detail = f"{msg} (code={code})" elif msg: @@ -583,26 +624,33 @@ async def send_a2a_message(peer_id: str, message: str, source_workspace_id: str detail = f"JSON-RPC error with no message (code={code})" else: detail = "JSON-RPC error with no message" + if variant.restarting: + # Surface platform-restart-in-progress + # explicitly — caller (UI / delegating agent) + # can render a softer "agent is restarting" + # message rather than a generic failure. + retry = ( + f", retry_after={variant.retry_after}s" + if variant.retry_after is not None + else "" + ) + detail = f"{detail} (restarting{retry})" return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]" - elif data.get("status") == "queued" and data.get("delivery_mode") == "poll": - # Workspace-server's poll-mode short-circuit envelope - # (workspace-server/internal/handlers/a2a_proxy.go ~line 402). - # The peer is poll-mode and has no URL to dispatch to, so - # the server queued the message for the peer's next inbox - # poll instead of forwarding it. Delivery is acknowledged - # but pending consumption. - # - # Pre-fix this fell through to the "unexpected response - # shape" error path → callers logged false failures, then - # delegate_task retried, and the peer received duplicate - # delegations. Issue #2967. 
- method = data.get("method") or "message/send" - logger.info( - "send_a2a_message: queued for poll-mode peer (method=%s, target=%s)", - method, target_url, - ) - return f"queued for poll-mode peer (method={method})" - return f"{_A2A_ERROR_PREFIX}unexpected response shape (no result, no error): {str(data)[:200]} [target={target_url}]" + # Malformed — log loud + surface as error so the + # operator notices a server change. SSOT refactor + # subsumes the inline "queued" check that landed in + # the #2972 hotfix; that branch is now the typed + # Queued variant above. + logger.warning( + "send_a2a_message: malformed response (target=%s body=%.200s)", + target_url, + str(variant.raw), + ) + return ( + f"{_A2A_ERROR_PREFIX}unexpected response shape " + f"(no result, error, or queued envelope): " + f"{str(variant.raw)[:200]} [target={target_url}]" + ) except _TRANSIENT_HTTP_ERRORS as e: last_exc = e attempts_remaining = _DELEGATE_MAX_ATTEMPTS - (attempt + 1) diff --git a/workspace/a2a_response.py b/workspace/a2a_response.py new file mode 100644 index 00000000..ae48465a --- /dev/null +++ b/workspace/a2a_response.py @@ -0,0 +1,246 @@ +"""Single source of truth for A2A ``/workspaces//a2a`` response shapes. + +The workspace-server proxy at +``workspace-server/internal/handlers/a2a_proxy.go`` (the canonical +emitter) returns one of the following shapes for a single A2A call: + + * **JSON-RPC success** — + ``{"jsonrpc": "2.0", "result": {...}, "id": "..."}`` + The agent's reply, passed through unchanged. + + * **JSON-RPC error** — + ``{"jsonrpc": "2.0", "error": {"message": "...", "code": ...}, "id": "..."}`` + The agent reported a structured error. + + * **Poll-queued** (synthesized at proxy, RFC #2339 PR 2 — see + ``a2a_proxy.go:402-406``) — + ``{"status": "queued", "delivery_mode": "poll", "method": "..."}`` + The target is a poll-mode workspace (no public URL); the message + was written to the platform's inbox queue. 
The target agent will + fetch it via ``GET /activity?since_id=`` polling. NOT a failure — + delivery succeeded, there's just no synchronous reply to relay. + + * **Platform error** — ``{"error": "...", "restarting": true?, "retry_after": int?}`` + HTTP-level failure synthesized by the proxy when the agent is + unreachable, the container is restarting, or some other infrastructure + failure happened. ``restarting=true`` flags the platform-initiated + container-restart path. + + * **Malformed** — anything else. Surfaced explicitly so a future server + change is loud rather than silent. + +The ``parse(data)`` function classifies a pre-decoded JSON body into a +typed variant. Callers ``match`` on the variant and never re-implement +shape detection — that's the SSOT discipline. + +# SSOT contract + +This file is the Python half. The Go server emits these shapes today +via inline ``gin.H{...}`` literals. A future PR can introduce a Go +mirror (e.g. ``workspace-server/internal/models/a2a_response.go``) +with a typed marshaller — until then, **any change to the wire shape +must be reflected here** and gated by ``test_a2a_response.py``'s +fixture corpus. The corpus exists specifically so a one-sided edit +breaks CI. + +# Why a typed model (vs. dict-key sniffing at every site) + +The pre-2967 client at ``a2a_client.py:567-587`` sniffed for ``result`` +or ``error`` keys inline and treated everything else as malformed — +which silently broke poll-mode peers (the queued envelope has neither +key). Inline sniffing per call site multiplies the surface area where +a new shape gets misclassified. A single typed parser with an +explicit ``Malformed`` escape hatch makes shape additions a +one-line change here + a fixture entry in the test corpus, instead of +a hunt through every parsing site in the runtime. 
+""" +from __future__ import annotations + +import dataclasses +import logging +from typing import Any, Optional, Union + +logger = logging.getLogger(__name__) + + +@dataclasses.dataclass(frozen=True) +class Result: + """JSON-RPC success — agent's reply available synchronously. + + ``text`` is the convenience extraction from ``parts[0].text`` (the + A2A multipart shape). ``parts`` is the full list, available for + callers that need richer rendering (multiple parts, non-text parts). + ``raw_result`` preserves the unparsed ``result`` field for any + caller that needs it (e.g. activity-row response_body audit). + """ + + text: str + parts: list[dict[str, Any]] = dataclasses.field(default_factory=list) + raw_result: Optional[dict[str, Any]] = None + + +@dataclasses.dataclass(frozen=True) +class Error: + """JSON-RPC error or platform-level error response. + + ``code`` is the JSON-RPC integer code when present, else None. + ``restarting`` / ``retry_after`` are platform-restart-in-progress + metadata: when both are set, the caller knows the container is + being recycled and may surface a softer error to the user. + """ + + message: str + code: Optional[int] = None + restarting: bool = False + retry_after: Optional[int] = None + + +@dataclasses.dataclass(frozen=True) +class Queued: + """Platform poll-mode short-circuit — message accepted, peer will pick up async. + + Returned when the target workspace is registered as + ``delivery_mode=poll`` (no public URL — typical for external + standalone ``molecule-mcp`` runtimes). The message was written to + the platform's inbox queue; the target agent will fetch it via + ``GET /activity?since_id=`` polling. + + NOT a failure. Callers that expect a synchronous reply (the agent's + response text) won't get one here — they should either: + + * Tolerate the absence of a reply (fire-and-forget semantics). 
+ * Fall back to the durable ``/workspaces/:id/delegate`` + + ``/delegations`` polling path (see ``a2a_tools_delegation``'s + ``_delegate_sync_via_polling``), which writes the same A2A + request through the platform's executeDelegation goroutine + and lets the caller poll for the result row. + + ``method`` echoes the request method (``message/send``, ``notify``, + etc.) so callers can correlate. + """ + + method: str + delivery_mode: str = "poll" + + +@dataclasses.dataclass(frozen=True) +class Malformed: + """Server returned a body the parser can't classify. + + Carries the raw decoded payload for diagnostic logging. Callers + typically render this as an error to the user (see + ``send_a2a_message``) — but the Malformed variant is a separate + type so logging / metrics can distinguish it from genuine + JSON-RPC ``Error`` responses. + """ + + raw: Any # whatever the server returned: dict / list / str / number / etc. + + +Variant = Union[Result, Error, Queued, Malformed] + + +# Field-name constants — the wire vocabulary. Single source of truth; +# the parser references these by name so a change here is a +# one-line edit instead of a hunt through string literals. +_KEY_RESULT = "result" +_KEY_ERROR = "error" +_KEY_STATUS = "status" +_KEY_DELIVERY_MODE = "delivery_mode" +_KEY_METHOD = "method" +_KEY_RESTARTING = "restarting" +_KEY_RETRY_AFTER = "retry_after" + +_STATUS_QUEUED = "queued" +_DELIVERY_MODE_POLL = "poll" + + +def parse(data: Any) -> Variant: + """Classify a pre-decoded ``/a2a`` JSON response into a typed variant. + + Never raises. Every branch is total: any input that doesn't match a + known shape routes to ``Malformed`` so the caller can decide how + to surface it. + + The order of checks matters: + + 1. Non-dict input → Malformed (server contract is dict-shaped). + 2. Poll-queued envelope is checked BEFORE result/error because a + server bug that sets both ``status=queued`` and ``result`` + should be loud, not silently treated as Result. + 3. 
``result`` → Result (the JSON-RPC success path). + 4. ``error`` → Error (JSON-RPC error or platform error). + 5. Anything else → Malformed. + """ + if not isinstance(data, dict): + logger.warning( + "a2a_response.parse: non-dict body — got %s", + type(data).__name__, + ) + return Malformed(raw=data) + + # Poll-queued envelope. Both keys must be present — the workspace + # server sets them together; if only one is present the body is + # ambiguous and we route to Malformed for visibility. + if ( + data.get(_KEY_STATUS) == _STATUS_QUEUED + and data.get(_KEY_DELIVERY_MODE) == _DELIVERY_MODE_POLL + ): + method_raw = data.get(_KEY_METHOD) + method = str(method_raw) if method_raw is not None else "unknown" + logger.info( + "a2a_response.parse: queued for poll-mode peer (method=%s)", + method, + ) + return Queued(method=method) + + # JSON-RPC success. + if _KEY_RESULT in data: + result = data[_KEY_RESULT] + if isinstance(result, dict): + parts_raw = result.get("parts") + parts = parts_raw if isinstance(parts_raw, list) else [] + text = "" + if parts: + first = parts[0] + if isinstance(first, dict): + text_raw = first.get("text") + text = str(text_raw) if text_raw is not None else "" + return Result(text=text, parts=parts, raw_result=result) + # ``result`` present but not a dict — unusual but not an error; + # surface as a Result with the value rendered to text. + return Result(text=str(result), parts=[], raw_result=None) + + # JSON-RPC error or platform error. 
+ if _KEY_ERROR in data: + err_raw = data[_KEY_ERROR] + message = "" + code: Optional[int] = None + if isinstance(err_raw, dict): + msg_raw = err_raw.get("message") + if msg_raw is not None: + message = str(msg_raw).strip() + code_raw = err_raw.get("code") + if isinstance(code_raw, int): + code = code_raw + elif isinstance(err_raw, str): + message = err_raw.strip() + else: + message = str(err_raw) + + restarting = bool(data.get(_KEY_RESTARTING, False)) + retry_after_raw = data.get(_KEY_RETRY_AFTER) + retry_after = retry_after_raw if isinstance(retry_after_raw, int) else None + + return Error( + message=message, + code=code, + restarting=restarting, + retry_after=retry_after, + ) + + logger.warning( + "a2a_response.parse: unrecognized shape — keys=%s", + sorted(data.keys()), + ) + return Malformed(raw=data) diff --git a/workspace/a2a_tools_delegation.py b/workspace/a2a_tools_delegation.py index 170a5333..79f42fd1 100644 --- a/workspace/a2a_tools_delegation.py +++ b/workspace/a2a_tools_delegation.py @@ -29,14 +29,18 @@ from __future__ import annotations import hashlib import json +import logging import os import httpx +logger = logging.getLogger(__name__) + from a2a_client import ( PLATFORM_URL, WORKSPACE_ID, _A2A_ERROR_PREFIX, + _A2A_QUEUED_PREFIX, _peer_names, _peer_to_source, discover_peer, @@ -245,6 +249,29 @@ async def tool_delegate_task( # (the platform proxy) so the same code works for in-container and # external (standalone molecule-mcp) callers. result = await send_a2a_message(workspace_id, task, source_workspace_id=src) + # #2967: when the target is a poll-mode peer, the platform's + # a2a_proxy short-circuits and returns a queued envelope — + # send_a2a_message surfaces that as the _A2A_QUEUED_PREFIX + # sentinel. 
The synchronous proxy path can't deliver a reply + # because the target has no public URL; fall back to the + # durable /delegate + /delegations polling path which DOES + # work for poll-mode peers (the executeDelegation goroutine + # writes to the inbox queue and the result row arrives when + # the target picks it up + replies). + # + # This is what makes external-runtime-to-external-runtime + # A2A actually deliver synchronous replies — without the + # fallback the calling agent sees the queued sentinel as + # success-with-no-text and never gets the peer's response. + if result.startswith(_A2A_QUEUED_PREFIX): + logger.info( + "tool_delegate_task: target=%s is poll-mode; " + "falling back from message/send to /delegate-poll path", + workspace_id, + ) + result = await _delegate_sync_via_polling( + workspace_id, task, src or WORKSPACE_ID, + ) # Detect delegation failures — wrap them clearly so the calling agent # can decide to retry, use another peer, or handle the task itself. diff --git a/workspace/tests/test_a2a_client.py b/workspace/tests/test_a2a_client.py index 068fbffd..97a8c739 100644 --- a/workspace/tests/test_a2a_client.py +++ b/workspace/tests/test_a2a_client.py @@ -462,6 +462,98 @@ def _make_seq_mock_client(post_side_effect): return mock_client +class TestSendA2AMessagePollMode: + """Pin the #2967 fix: send_a2a_message recognizes the platform's + poll-mode short-circuit envelope and returns a queued sentinel + instead of an "unexpected response shape" error. + + Pre-#2967 the client treated the queued envelope as malformed, + causing the calling agent to retry, which delivered the same + message twice to the (poll-mode) recipient. The Queued sentinel + lets delegate_task fall back to the durable polling path + transparently — see test_delegation_sync_via_polling for the + fallback verification. 
+ """ + + async def test_poll_queued_envelope_returns_queued_sentinel(self): + # Workspace-server returns this shape (a2a_proxy.go:402-406) + # when the target workspace is registered as delivery_mode=poll + # (no public URL, typical for external molecule-mcp standalone + # runtimes). + import a2a_client + + resp = _make_response(200, { + "status": "queued", + "delivery_mode": "poll", + "method": "message/send", + }) + mock_client = _make_mock_client(post_resp=resp) + with patch("a2a_client.httpx.AsyncClient", return_value=mock_client): + result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task") + + # Sentinel + structured payload so callers can branch on it. + assert result.startswith(a2a_client._A2A_QUEUED_PREFIX) + # Critically: NOT the error sentinel. Pre-#2967 it was the error path. + assert not result.startswith(a2a_client._A2A_ERROR_PREFIX) + # Carries enough info for the caller to log meaningfully. + assert _TEST_PEER_ID in result + assert "message/send" in result + + async def test_poll_queued_envelope_method_is_recorded(self): + import a2a_client + + resp = _make_response(200, { + "status": "queued", + "delivery_mode": "poll", + "method": "notify", + }) + mock_client = _make_mock_client(post_resp=resp) + with patch("a2a_client.httpx.AsyncClient", return_value=mock_client): + result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task") + + assert result.startswith(a2a_client._A2A_QUEUED_PREFIX) + assert "notify" in result + + async def test_status_queued_without_delivery_mode_is_unexpected_shape(self): + # Server bug: only ``status=queued`` set, ``delivery_mode`` + # missing. Surface as the malformed branch (not Queued) — the + # SSOT parser treats this as Malformed because the documented + # contract requires both keys. 
+ import a2a_client + + resp = _make_response(200, {"status": "queued", "method": "message/send"}) + mock_client = _make_mock_client(post_resp=resp) + with patch("a2a_client.httpx.AsyncClient", return_value=mock_client): + result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task") + + assert result.startswith(a2a_client._A2A_ERROR_PREFIX) + assert "unexpected response shape" in result + # Must explicitly mention "or queued envelope" so an operator + # debugging this knows the parser HAS a Queued branch and the + # body just didn't match — not that the parser is missing the + # logic entirely (the pre-#2967 confusion). + assert "queued envelope" in result + + async def test_platform_error_with_restart_metadata_surfaces_in_message(self): + # The platform error envelope: 503 with restart metadata. + # Surfaced as an error string that includes "restarting" so + # the caller / agent can render a softer error to the user. + import a2a_client + + resp = _make_response(200, { + "error": "workspace agent unreachable — container restart triggered", + "restarting": True, + "retry_after": 15, + }) + mock_client = _make_mock_client(post_resp=resp) + with patch("a2a_client.httpx.AsyncClient", return_value=mock_client): + result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task") + + assert result.startswith(a2a_client._A2A_ERROR_PREFIX) + assert "restarting" in result + assert "retry_after=15" in result + + class TestSendA2AMessageRetry: """Verify auto-retry on transient transport errors (RemoteProtocolError, ConnectError, ReadTimeout, etc.) up to _DELEGATE_MAX_ATTEMPTS times. diff --git a/workspace/tests/test_a2a_response.py b/workspace/tests/test_a2a_response.py new file mode 100644 index 00000000..cf254b36 --- /dev/null +++ b/workspace/tests/test_a2a_response.py @@ -0,0 +1,455 @@ +"""Tests for the A2A response SSOT parser (workspace/a2a_response.py). + +Branch coverage target: 100%. 
Each variant of ``parse()`` exercised in +isolation, plus adversarial-input fuzzing to assert the parser never +raises. + +Pre-#2967, the response shape was sniffed inline at every call site +(``a2a_client.py:567-587`` had hard-coded ``"result" in data`` / +``"error" in data`` checks). The bare ``else`` returned an +"unexpected response shape" error — which silently broke poll-mode +peers because the workspace-server's poll-queued envelope has neither +``result`` nor ``error``. The SSOT parser has an explicit ``Queued`` +variant for that path and routes anything truly unrecognized to +``Malformed`` so a future server-side change fails loudly. + +The "this test FAILS on pre-fix source" guarantee is enforced by +running the legacy-shape sniffer alongside the new parser in +``test_legacy_sniffer_misclassified_queued`` — that test fails on +the pre-#2967 ``a2a_client.py`` shape because the legacy code +returns the unexpected-shape error path for the Queued envelope. +""" +from __future__ import annotations + +import logging +from typing import Any + +import pytest + +import a2a_response + + +# ============== Fixture corpus — the canonical wire shapes ============== + + +# Every shape below mirrors a path the workspace-server's a2a_proxy.go +# can return. When you add a new server-side response shape, add a +# fixture entry here and a corresponding test method below. 
+_FIXTURES = { + "jsonrpc_success_with_text": { + "jsonrpc": "2.0", + "id": "abc-123", + "result": { + "parts": [{"kind": "text", "text": "hello world"}], + }, + }, + "jsonrpc_success_multipart": { + "jsonrpc": "2.0", + "id": "abc-123", + "result": { + "parts": [ + {"kind": "text", "text": "first"}, + {"kind": "text", "text": "second"}, + ], + }, + }, + "jsonrpc_success_no_parts": { + "jsonrpc": "2.0", + "id": "abc-123", + "result": {}, + }, + "jsonrpc_success_part_no_text_key": { + "jsonrpc": "2.0", + "id": "abc-123", + "result": {"parts": [{"kind": "text"}]}, + }, + "jsonrpc_error_with_message_and_code": { + "jsonrpc": "2.0", + "id": "abc-123", + "error": {"message": "rate limited", "code": -32003}, + }, + "jsonrpc_error_message_only": { + "jsonrpc": "2.0", + "id": "abc-123", + "error": {"message": "rate limited"}, + }, + "jsonrpc_error_code_only": { + "jsonrpc": "2.0", + "id": "abc-123", + "error": {"code": -32603}, + }, + "jsonrpc_error_string_form": { + "jsonrpc": "2.0", + "id": "abc-123", + "error": "string-shaped error", + }, + "platform_error_with_restart": { + "error": "workspace agent unreachable — container restart triggered", + "restarting": True, + "retry_after": 15, + }, + "platform_error_plain": { + "error": "workspace not found", + }, + "poll_queued_full": { + "status": "queued", + "delivery_mode": "poll", + "method": "message/send", + }, + "poll_queued_notify": { + "status": "queued", + "delivery_mode": "poll", + "method": "notify", + }, + "poll_queued_no_method": { + "status": "queued", + "delivery_mode": "poll", + }, + "malformed_empty_dict": {}, + "malformed_unexpected_keys": {"foo": "bar", "baz": 42}, + "malformed_status_queued_no_delivery_mode": { + # Server bug — status set but delivery_mode missing. + # Should be Malformed, not Queued, because the contract says both. 
+ "status": "queued", + }, + "malformed_delivery_mode_no_status": { + "delivery_mode": "poll", + }, +} + + +# ============== Variant-by-variant coverage ============== + + +class TestQueuedVariant: + """``parse()`` recognizes the workspace-server poll-mode short-circuit + envelope (a2a_proxy.go:402-406) and returns ``Queued``.""" + + def test_full_envelope_with_method_message_send(self): + v = a2a_response.parse(_FIXTURES["poll_queued_full"]) + assert isinstance(v, a2a_response.Queued) + assert v.method == "message/send" + assert v.delivery_mode == "poll" + + def test_envelope_with_method_notify(self): + v = a2a_response.parse(_FIXTURES["poll_queued_notify"]) + assert isinstance(v, a2a_response.Queued) + assert v.method == "notify" + + def test_envelope_missing_method_uses_unknown_sentinel(self): + # Envelope without ``method`` key — server contract should + # always set it, but the parser must not raise on absence. + v = a2a_response.parse(_FIXTURES["poll_queued_no_method"]) + assert isinstance(v, a2a_response.Queued) + assert v.method == "unknown" + + def test_status_queued_alone_is_malformed_not_queued(self): + # ``status=queued`` without ``delivery_mode=poll`` does not match + # the documented envelope. Surface as Malformed for visibility. + v = a2a_response.parse(_FIXTURES["malformed_status_queued_no_delivery_mode"]) + assert isinstance(v, a2a_response.Malformed) + + def test_delivery_mode_alone_is_malformed_not_queued(self): + v = a2a_response.parse(_FIXTURES["malformed_delivery_mode_no_status"]) + assert isinstance(v, a2a_response.Malformed) + + def test_logs_info_on_queued(self, caplog): + # Comprehensive logging — operator should see queued events at INFO. 
+ with caplog.at_level(logging.INFO, logger="a2a_response"): + a2a_response.parse(_FIXTURES["poll_queued_full"]) + assert any("queued for poll-mode peer" in r.message for r in caplog.records) + + +class TestResultVariant: + """``parse()`` extracts the JSON-RPC ``result`` envelope into + ``Result(text, parts, raw_result)``.""" + + def test_simple_text_result(self): + v = a2a_response.parse(_FIXTURES["jsonrpc_success_with_text"]) + assert isinstance(v, a2a_response.Result) + assert v.text == "hello world" + assert len(v.parts) == 1 + assert v.raw_result == {"parts": [{"kind": "text", "text": "hello world"}]} + + def test_multipart_result_extracts_first_part_text(self): + v = a2a_response.parse(_FIXTURES["jsonrpc_success_multipart"]) + assert isinstance(v, a2a_response.Result) + assert v.text == "first" + assert len(v.parts) == 2 + + def test_result_with_no_parts(self): + v = a2a_response.parse(_FIXTURES["jsonrpc_success_no_parts"]) + assert isinstance(v, a2a_response.Result) + assert v.text == "" + assert v.parts == [] + + def test_part_without_text_key(self): + v = a2a_response.parse(_FIXTURES["jsonrpc_success_part_no_text_key"]) + assert isinstance(v, a2a_response.Result) + # No "text" key — extracted text is empty, parts list intact. + assert v.text == "" + assert len(v.parts) == 1 + + def test_result_non_dict_returns_text_form(self): + # Pathological but legal: ``result`` is a string instead of a dict. + v = a2a_response.parse({"result": "hello"}) + assert isinstance(v, a2a_response.Result) + assert v.text == "hello" + assert v.parts == [] + + def test_result_takes_precedence_when_no_queued_envelope(self): + # Both ``result`` and ``error`` keys present — result wins + # because it's checked first after the Queued path. 
+ v = a2a_response.parse({ + "result": {"parts": [{"kind": "text", "text": "ok"}]}, + "error": {"message": "should-be-ignored"}, + }) + assert isinstance(v, a2a_response.Result) + assert v.text == "ok" + + def test_part_with_non_dict_first_entry(self): + # ``parts[0]`` is a string instead of a dict — parser tolerates it, + # text falls back to empty. + v = a2a_response.parse({"result": {"parts": ["bare-string"]}}) + assert isinstance(v, a2a_response.Result) + assert v.text == "" + assert v.parts == ["bare-string"] + + def test_part_text_value_none(self): + # ``parts[0].text`` is explicitly None — extracted as "". + v = a2a_response.parse({"result": {"parts": [{"text": None}]}}) + assert isinstance(v, a2a_response.Result) + assert v.text == "" + + def test_parts_not_a_list(self): + # Server bug: ``parts`` is a dict instead of a list. Parser falls + # back to empty parts rather than raising. + v = a2a_response.parse({"result": {"parts": {"oops": True}}}) + assert isinstance(v, a2a_response.Result) + assert v.parts == [] + assert v.text == "" + + +class TestErrorVariant: + """``parse()`` extracts ``error`` envelopes into ``Error`` and + annotates platform-restart metadata when present.""" + + def test_message_and_code(self): + v = a2a_response.parse(_FIXTURES["jsonrpc_error_with_message_and_code"]) + assert isinstance(v, a2a_response.Error) + assert v.message == "rate limited" + assert v.code == -32003 + assert v.restarting is False + assert v.retry_after is None + + def test_message_only(self): + v = a2a_response.parse(_FIXTURES["jsonrpc_error_message_only"]) + assert isinstance(v, a2a_response.Error) + assert v.message == "rate limited" + assert v.code is None + + def test_code_only(self): + v = a2a_response.parse(_FIXTURES["jsonrpc_error_code_only"]) + assert isinstance(v, a2a_response.Error) + assert v.message == "" + assert v.code == -32603 + + def test_error_string_form(self): + v = a2a_response.parse(_FIXTURES["jsonrpc_error_string_form"]) + assert 
isinstance(v, a2a_response.Error) + assert v.message == "string-shaped error" + assert v.code is None + + def test_error_non_dict_non_string(self): + v = a2a_response.parse({"error": 12345}) + assert isinstance(v, a2a_response.Error) + assert v.message == "12345" + + def test_platform_error_with_restart_metadata(self): + v = a2a_response.parse(_FIXTURES["platform_error_with_restart"]) + assert isinstance(v, a2a_response.Error) + assert "workspace agent unreachable" in v.message + assert v.restarting is True + assert v.retry_after == 15 + + def test_platform_error_without_restart(self): + v = a2a_response.parse(_FIXTURES["platform_error_plain"]) + assert isinstance(v, a2a_response.Error) + assert v.message == "workspace not found" + assert v.restarting is False + assert v.retry_after is None + + def test_error_message_with_whitespace_stripped(self): + v = a2a_response.parse({"error": {"message": " trimmed "}}) + assert isinstance(v, a2a_response.Error) + assert v.message == "trimmed" + + def test_non_int_code_dropped(self): + v = a2a_response.parse({"error": {"message": "x", "code": "not-a-number"}}) + assert isinstance(v, a2a_response.Error) + assert v.code is None + + def test_non_int_retry_after_dropped(self): + v = a2a_response.parse({"error": "x", "restarting": True, "retry_after": "30s"}) + assert isinstance(v, a2a_response.Error) + assert v.retry_after is None + + +class TestMalformedVariant: + """``parse()`` returns ``Malformed`` for any shape it can't classify + and logs at WARNING so operators see new server response shapes.""" + + def test_empty_dict(self): + v = a2a_response.parse(_FIXTURES["malformed_empty_dict"]) + assert isinstance(v, a2a_response.Malformed) + assert v.raw == {} + + def test_unexpected_keys(self): + v = a2a_response.parse(_FIXTURES["malformed_unexpected_keys"]) + assert isinstance(v, a2a_response.Malformed) + assert v.raw == {"foo": "bar", "baz": 42} + + def test_non_dict_input_list(self): + v = a2a_response.parse([1, 2, 3]) + assert 
isinstance(v, a2a_response.Malformed) + assert v.raw == [1, 2, 3] + + def test_non_dict_input_string(self): + v = a2a_response.parse("plain string") + assert isinstance(v, a2a_response.Malformed) + assert v.raw == "plain string" + + def test_non_dict_input_none(self): + v = a2a_response.parse(None) + assert isinstance(v, a2a_response.Malformed) + assert v.raw is None + + def test_logs_warning_on_malformed(self, caplog): + with caplog.at_level(logging.WARNING, logger="a2a_response"): + a2a_response.parse(_FIXTURES["malformed_unexpected_keys"]) + assert any(r.levelno == logging.WARNING for r in caplog.records) + + def test_logs_warning_on_non_dict(self, caplog): + with caplog.at_level(logging.WARNING, logger="a2a_response"): + a2a_response.parse("not a dict") + assert any("non-dict" in r.message for r in caplog.records) + + +# ============== Robustness — parser never raises ============== + + +_ADVERSARIAL_INPUTS: list[Any] = [ + None, + True, + False, + 0, + -1, + 3.14, + "", + "string", + [], + [1, 2, 3], + {}, + {"random": "garbage"}, + {"result": None}, + {"result": [1, 2, 3]}, + {"result": {"parts": None}}, + {"result": {"parts": [None]}}, + {"result": {"parts": [{"text": []}]}}, + {"error": None}, + {"error": []}, + {"error": {"message": None, "code": None}}, + {"error": {"message": ["nested", "list"]}}, + {"status": None, "delivery_mode": None, "method": None}, + {"status": "queued", "delivery_mode": "push", "method": "x"}, # wrong delivery_mode + {"status": "running", "delivery_mode": "poll"}, # wrong status + {"status": 42, "delivery_mode": "poll"}, # non-string status + # Deeply-nested junk + {"result": {"parts": [{"text": {"deeply": {"nested": "object"}}}]}}, + # Bytes stand-in: the ``if False`` guard always selects the str "x", so no bytes value reaches parse() + {"result": {"parts": [{"text": b"bytes" if False else "x"}]}}, +] + + +class TestRobustness: + """Parser must never raise on adversarial input — every branch is total.
+ + These cases catch regressions where a future change adds a key + access that doesn't tolerate ``None`` / wrong-type values. + """ + + @pytest.mark.parametrize("payload", _ADVERSARIAL_INPUTS) + def test_parse_never_raises(self, payload): + # Single contract: parse must return one of the four variants + # regardless of input. No exception classes propagated. + v = a2a_response.parse(payload) + assert isinstance(v, (a2a_response.Result, a2a_response.Error, + a2a_response.Queued, a2a_response.Malformed)) + + +# ============== Regression gate — pre-#2967 misclassified queued ============== + + +class TestRegressionGate: + """Pin the bug that prompted the SSOT abstraction. + + Before #2967, ``a2a_client.py:567-587`` sniffed only ``"result" in + data`` and ``"error" in data`` — the poll-queued envelope (no + result key, no error key) hit the bare-else and returned the + "unexpected response shape" error string. This test simulates the + pre-fix code path and confirms the SSOT parser correctly + distinguishes Queued from Malformed. + """ + + def test_legacy_sniffer_would_return_neither_branch(self): + # The pre-#2967 logic — provided here so the regression is + # reproducible from this file alone, no archaeology needed. + envelope = _FIXTURES["poll_queued_full"] + legacy_branch = ( + "result" if "result" in envelope + else "error" if "error" in envelope + else "unexpected_shape" + ) + # Legacy sniff: hits the malformed branch. + assert legacy_branch == "unexpected_shape" + + def test_ssot_parser_classifies_correctly(self): + # New parser: classifies as Queued. + v = a2a_response.parse(_FIXTURES["poll_queued_full"]) + assert isinstance(v, a2a_response.Queued) + assert v.method == "message/send" + + def test_every_fixture_classifies_to_expected_variant(self): + # Defense in depth — pin the variant for every fixture so a + # future shape addition has to update the table here too. 
+ expected: dict[str, type] = { + "jsonrpc_success_with_text": a2a_response.Result, + "jsonrpc_success_multipart": a2a_response.Result, + "jsonrpc_success_no_parts": a2a_response.Result, + "jsonrpc_success_part_no_text_key": a2a_response.Result, + "jsonrpc_error_with_message_and_code": a2a_response.Error, + "jsonrpc_error_message_only": a2a_response.Error, + "jsonrpc_error_code_only": a2a_response.Error, + "jsonrpc_error_string_form": a2a_response.Error, + "platform_error_with_restart": a2a_response.Error, + "platform_error_plain": a2a_response.Error, + "poll_queued_full": a2a_response.Queued, + "poll_queued_notify": a2a_response.Queued, + "poll_queued_no_method": a2a_response.Queued, + "malformed_empty_dict": a2a_response.Malformed, + "malformed_unexpected_keys": a2a_response.Malformed, + "malformed_status_queued_no_delivery_mode": a2a_response.Malformed, + "malformed_delivery_mode_no_status": a2a_response.Malformed, + } + # Every fixture must be enumerated — keeps this gate honest. + assert set(expected.keys()) == set(_FIXTURES.keys()), ( + f"fixture/expected mismatch: " + f"missing-from-expected={set(_FIXTURES) - set(expected)} " + f"extra-in-expected={set(expected) - set(_FIXTURES)}" + ) + for name, payload in _FIXTURES.items(): + v = a2a_response.parse(payload) + assert isinstance(v, expected[name]), ( + f"fixture {name!r} classified as {type(v).__name__}, " + f"expected {expected[name].__name__}" + ) diff --git a/workspace/tests/test_delegation_sync_via_polling.py b/workspace/tests/test_delegation_sync_via_polling.py index 7f6b2918..018d572a 100644 --- a/workspace/tests/test_delegation_sync_via_polling.py +++ b/workspace/tests/test_delegation_sync_via_polling.py @@ -93,6 +93,124 @@ class TestFlagOffLegacyPath: poll_mock.assert_not_called() +# --------------------------------------------------------------------------- +# #2967: Auto-fallback to polling path when target is poll-mode +# --------------------------------------------------------------------------- 
+ +class TestPollModeAutoFallback: + """Pin the #2967 behavior: when send_a2a_message returns the queued + sentinel (target is poll-mode), tool_delegate_task transparently + falls back to _delegate_sync_via_polling — which DOES work for + poll-mode peers (the executeDelegation goroutine writes to the + inbox queue and the result row arrives when the target replies). + + Pre-#2967 behavior: queued sentinel was never returned (the parser + misclassified the envelope as malformed), and the calling agent + saw a DELEGATION FAILED / unexpected-response-shape error. These + tests stub send_a2a_message, so they guard the fallback regression + (sentinel-handling) only, not sentinel emission by the parser. + """ + + async def test_queued_sentinel_triggers_polling_fallback(self, monkeypatch): + # Flag OFF — legacy send_a2a_message path. send returns the + # queued sentinel because the target is poll-mode. delegate_task + # must auto-route to _delegate_sync_via_polling so the agent + # eventually gets a real reply.
+ monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False) + + import a2a_tools + from a2a_client import _A2A_QUEUED_PREFIX + + send_calls = [] + poll_calls = [] + + async def fake_send(workspace_id, task, source_workspace_id=None): + send_calls.append((workspace_id, task, source_workspace_id)) + return f"{_A2A_QUEUED_PREFIX}target={workspace_id} method=message/send" + + async def fake_polling(workspace_id, task, src): + poll_calls.append((workspace_id, task, src)) + return "real response from poll-mode peer" + + async def fake_discover(*_a, **_kw): + return {"name": "poll-peer", "status": "online"} + + async def fake_report_activity(*_a, **_kw): + return None + + with patch("a2a_tools_delegation.send_a2a_message", side_effect=fake_send), \ + patch("a2a_tools_delegation._delegate_sync_via_polling", side_effect=fake_polling), \ + patch("a2a_tools_delegation.discover_peer", side_effect=fake_discover), \ + patch("a2a_tools.report_activity", side_effect=fake_report_activity): + result = await a2a_tools.tool_delegate_task( + "ws-target", "task body", source_workspace_id="ws-self" + ) + + # send was tried first + assert len(send_calls) == 1 + # …then fallback fired automatically + assert len(poll_calls) == 1 + assert poll_calls[0] == ("ws-target", "task body", "ws-self") + # Caller sees the real reply, NOT the queued sentinel and NOT + # a DELEGATION FAILED string. + assert result == "real response from poll-mode peer" + + async def test_non_queued_send_result_does_not_trigger_fallback(self, monkeypatch): + # Push-mode peer returns a normal text reply — fallback path + # MUST NOT fire (no extra round-trip cost). 
+ monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False) + + import a2a_tools + + async def fake_send(*_a, **_kw): + return "normal reply" + + async def fake_discover(*_a, **_kw): + return {"name": "push-peer", "status": "online"} + + async def fake_report_activity(*_a, **_kw): + return None + + with patch("a2a_tools_delegation.send_a2a_message", side_effect=fake_send), \ + patch("a2a_tools_delegation.discover_peer", side_effect=fake_discover), \ + patch("a2a_tools.report_activity", side_effect=fake_report_activity), \ + patch("a2a_tools_delegation._delegate_sync_via_polling", new=AsyncMock()) as poll_mock: + result = await a2a_tools.tool_delegate_task( + "ws-target", "task", source_workspace_id="ws-self" + ) + + assert result == "normal reply" + poll_mock.assert_not_called() + + async def test_error_send_result_does_not_trigger_fallback(self, monkeypatch): + # Genuine error (not queued) — must surface as DELEGATION FAILED, + # not silently retried via the polling path. + monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False) + + import a2a_tools + from a2a_client import _A2A_ERROR_PREFIX + + async def fake_send(*_a, **_kw): + return f"{_A2A_ERROR_PREFIX}HTTP 500 [target=...]" + + async def fake_discover(*_a, **_kw): + return {"name": "broken-peer", "status": "online"} + + async def fake_report_activity(*_a, **_kw): + return None + + with patch("a2a_tools_delegation.send_a2a_message", side_effect=fake_send), \ + patch("a2a_tools_delegation.discover_peer", side_effect=fake_discover), \ + patch("a2a_tools.report_activity", side_effect=fake_report_activity), \ + patch("a2a_tools_delegation._delegate_sync_via_polling", new=AsyncMock()) as poll_mock: + result = await a2a_tools.tool_delegate_task( + "ws-target", "task", source_workspace_id="ws-self" + ) + + assert "DELEGATION FAILED" in result + poll_mock.assert_not_called() + + # --------------------------------------------------------------------------- # Flag-on: dispatch failures # 
--------------------------------------------------------------------------- From 166ad20cd7cdfe2ea0f68d623caa25cbfd57972d Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 17:20:02 -0700 Subject: [PATCH 2/3] =?UTF-8?q?test(e2e):=20Phase=203.5=20=E2=80=94=20whee?= =?UTF-8?q?l=20parser=20classifies=20real=20server=20response=20(#2967)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously Phase 3 only checked the workspace-server's poll-mode short-circuit emit shape ({"status":"queued","delivery_mode":"poll","method":"..."}); the matching client-side classification was tested in isolation against fixture dicts in test_a2a_response.py. This phase closes the loop by piping the actual on-the-wire response from a real workspace-server back through the wheel's a2a_response.parse() and asserting it classifies as the Queued variant with the right method + delivery_mode. A regression in EITHER the server emit shape OR the client parser will now fail this E2E, eliminating the gap that allowed the original "unexpected response shape" production bug to ship despite green unit tests. Co-Authored-By: Claude Opus 4.7 (1M context) --- tests/e2e/test_poll_mode_e2e.sh | 37 ++++++++++++++++++++++++++++++ workspace/tests/test_a2a_client.py | 18 ++++++++------- 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/tests/e2e/test_poll_mode_e2e.sh b/tests/e2e/test_poll_mode_e2e.sh index e4dd22bc..766ec3c7 100755 --- a/tests/e2e/test_poll_mode_e2e.sh +++ b/tests/e2e/test_poll_mode_e2e.sh @@ -157,6 +157,43 @@ A2A_RESP=$(curl -s --max-time "$TIMEOUT" -X POST "$BASE/workspaces/$POLL_WS_ID/a }') check "poll-mode A2A returns queued status" '"status":"queued"' "$A2A_RESP" + +# ---------- Phase 3.5: Python parser classifies queued envelope correctly ---------- +# (#2967) — server emits the queued envelope, the wheel's a2a_response.parse() +# MUST classify it as the Queued variant, not Malformed. 
Pre-#2967 the bare +# message/send parser in a2a_client.py:587 misclassified this and returned +# "[A2A_ERROR] unexpected response shape", which broke external↔external A2A +# on poll-mode peers. +# +# This phase exercises the actual on-the-wire response from a real +# workspace-server (NOT a mocked dict) through the same module the production +# wheel ships, so a regression in either the server emit shape OR the client +# parser fails this E2E. + +echo "" +echo "--- Phase 3.5: Python parser classifies real server response (#2967) ---" + +# Pipe the queued response captured above through a2a_response.parse and +# assert the classification. WORKSPACE_ID is required at module import +# time but irrelevant to this parsing call (any UUID is fine). +PARSE_RESULT=$(WORKSPACE_ID="00000000-0000-0000-0000-000000000001" \ + python3 -c " +import json, sys +sys.path.insert(0, '$(cd "$(dirname "$0")/../../workspace" && pwd)') +import a2a_response +data = json.loads(r'''$A2A_RESP''') +v = a2a_response.parse(data) +print(type(v).__name__) +if isinstance(v, a2a_response.Queued): + print(f'method={v.method} delivery_mode={v.delivery_mode}') +") + +check_eq "Python parser classifies real server response as Queued" \ + "Queued" "$(printf '%s' "$PARSE_RESULT" | head -n1)" +check "Queued variant captures method=message/send" \ + "method=message/send" "$PARSE_RESULT" +check "Queued variant captures delivery_mode=poll" \ + "delivery_mode=poll" "$PARSE_RESULT" check "queued response echoes delivery_mode=poll" '"delivery_mode":"poll"' "$A2A_RESP" check "queued response echoes the JSON-RPC method" '"method":"message/send"' "$A2A_RESP" diff --git a/workspace/tests/test_a2a_client.py b/workspace/tests/test_a2a_client.py index 97a8c739..39e3ae04 100644 --- a/workspace/tests/test_a2a_client.py +++ b/workspace/tests/test_a2a_client.py @@ -281,11 +281,11 @@ class TestSendA2AMessage: to the 'unexpected response shape' error path → callers retried, peer got duplicate delegations. 
- Pin: poll-queued envelope returns a clean success string that does - NOT start with _A2A_ERROR_PREFIX, so callers route it through the - normal-outcome path. Verified discriminating: assert_NOT_startswith - the error prefix would FAIL on the old code (which returned an - error-prefixed string) and PASSES on the new code. + Pin: poll-queued envelope returns a string tagged with the + _A2A_QUEUED_PREFIX sentinel (not _A2A_ERROR_PREFIX), so callers + can branch on the typed outcome without substring-sniffing. + Verified discriminating: pre-fix returned _A2A_ERROR_PREFIX so + the not-startswith assertion would FAIL on the old code. """ import a2a_client @@ -301,12 +301,13 @@ class TestSendA2AMessage: # Discriminating: pre-fix returned a string that startswith # _A2A_ERROR_PREFIX, so this assertion would have FAILED on the - # old code. New code returns a queued-success string. + # old code. New code returns the queued-success sentinel. assert not result.startswith(a2a_client._A2A_ERROR_PREFIX), ( f"poll-queued envelope must not be tagged as A2A error; got: {result!r}" ) - assert "queued" in result.lower() - assert "poll" in result.lower() + assert result.startswith(a2a_client._A2A_QUEUED_PREFIX), ( + f"poll-queued envelope must use the queued sentinel; got: {result!r}" + ) # The method is included so a structured-log scraper can route by # protocol verb if needed. 
assert "message/send" in result @@ -329,6 +330,7 @@ class TestSendA2AMessage: result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task") assert not result.startswith(a2a_client._A2A_ERROR_PREFIX) + assert result.startswith(a2a_client._A2A_QUEUED_PREFIX) assert "message/sendStream" in result async def test_status_queued_without_poll_mode_still_falls_through(self): From e342d0c5a7191cbc52d945055609786ab049e97b Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 17:34:05 -0700 Subject: [PATCH 3/3] fix(build): register a2a_response in TOP_LEVEL_MODULES MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The drift gate caught the new SSOT parser module — without registration the wheel ships it un-rewritten and runtime imports fail. Same pattern as inbox_uploads, a2a_tools_delegation, a2a_tools_rbac registrations. Co-Authored-By: Claude Opus 4.7 (1M context) --- scripts/build_runtime_package.py | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/build_runtime_package.py b/scripts/build_runtime_package.py index 82b1090c..d4eedde2 100755 --- a/scripts/build_runtime_package.py +++ b/scripts/build_runtime_package.py @@ -54,6 +54,7 @@ TOP_LEVEL_MODULES = { "a2a_client", "a2a_executor", "a2a_mcp_server", + "a2a_response", "a2a_tools", "a2a_tools_delegation", "a2a_tools_inbox",