Merge pull request #2979 from Molecule-AI/fix/a2a-poll-mode-response-shape-2967

feat(a2a): SSOT typed-variant response parser + auto-fallback for poll-mode peers (#2967)
This commit is contained in:
Hongming Wang 2026-05-06 00:41:43 +00:00 committed by GitHub
commit 9dd29882e2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 1061 additions and 35 deletions

View File

@ -54,6 +54,7 @@ TOP_LEVEL_MODULES = {
"a2a_client",
"a2a_executor",
"a2a_mcp_server",
"a2a_response",
"a2a_tools",
"a2a_tools_delegation",
"a2a_tools_inbox",

View File

@ -157,6 +157,43 @@ A2A_RESP=$(curl -s --max-time "$TIMEOUT" -X POST "$BASE/workspaces/$POLL_WS_ID/a
}')
check "poll-mode A2A returns queued status" '"status":"queued"' "$A2A_RESP"
# ---------- Phase 3.5: Python parser classifies queued envelope correctly ----------
# (#2967) — server emits the queued envelope, the wheel's a2a_response.parse()
# MUST classify it as the Queued variant, not Malformed. Pre-#2967 the bare
# message/send parser in a2a_client.py:587 misclassified this and returned
# "[A2A_ERROR] unexpected response shape", which broke external↔external A2A
# on poll-mode peers.
#
# This phase exercises the actual on-the-wire response from a real
# workspace-server (NOT a mocked dict) through the same module the production
# wheel ships, so a regression in either the server emit shape OR the client
# parser fails this E2E.
echo ""
echo "--- Phase 3.5: Python parser classifies real server response (#2967) ---"
# Pipe the queued response captured above through a2a_response.parse and
# assert the classification. WORKSPACE_ID is required at module import
# time but irrelevant to this parsing call (any UUID is fine).
PARSE_RESULT=$(WORKSPACE_ID="00000000-0000-0000-0000-000000000001" \
python3 -c "
import json, sys
sys.path.insert(0, '$(cd "$(dirname "$0")/../../workspace" && pwd)')
import a2a_response
data = json.loads(r'''$A2A_RESP''')
v = a2a_response.parse(data)
print(type(v).__name__)
if isinstance(v, a2a_response.Queued):
print(f'method={v.method} delivery_mode={v.delivery_mode}')
")
check_eq "Python parser classifies real server response as Queued" \
"Queued" "$(printf '%s' "$PARSE_RESULT" | head -n1)"
check "Queued variant captures method=message/send" \
"method=message/send" "$PARSE_RESULT"
check "Queued variant captures delivery_mode=poll" \
"delivery_mode=poll" "$PARSE_RESULT"
check "queued response echoes delivery_mode=poll" '"delivery_mode":"poll"' "$A2A_RESP"
check "queued response echoes the JSON-RPC method" '"method":"message/send"' "$A2A_RESP"

View File

@ -17,6 +17,7 @@ from concurrent.futures import ThreadPoolExecutor
import httpx
import a2a_response
from platform_auth import auth_headers, self_source_headers
logger = logging.getLogger(__name__)
@ -353,6 +354,20 @@ def _agent_card_url_for(peer_id: str) -> str:
# Used by delegate_task to distinguish real errors from normal response text.
_A2A_ERROR_PREFIX = "[A2A_ERROR] "
# Sentinel prefix for queued-for-poll-mode-peer outcomes (#2967).
# When the target workspace is registered as delivery_mode=poll (no
# public URL — typical for external molecule-mcp standalone runtimes),
# the platform's a2a_proxy.go:402 short-circuit returns a synthetic
# {"status":"queued","delivery_mode":"poll","method":"..."} envelope
# instead of dispatching over HTTP. The message IS delivered (written
# to the platform's inbox queue); there's just no synchronous reply
# to relay. Pre-#2967 the client treated this as "unexpected response
# shape" → caller saw DELEGATION FAILED → retried → recipient saw
# duplicates. The Queued prefix lets callers branch on this outcome
# explicitly: "delivered async, no synchronous reply expected" is
# different from both success-with-text and failure.
_A2A_QUEUED_PREFIX = "[A2A_QUEUED] "
# Workspace IDs are UUIDs everywhere we generate them (platform's
# workspaces.id column, /registry/discover/:id route param, etc.) but
# the agent-facing tool surface receives them as free-form strings via
@ -564,17 +579,43 @@ async def send_a2a_message(peer_id: str, message: str, source_workspace_id: str
},
)
data = resp.json()
if "result" in data:
parts = data["result"].get("parts", [])
text = parts[0].get("text", "") if parts else "(no response)"
# Tag child-reported errors so the caller can detect them reliably
# Dispatch via the SSOT response model (a2a_response.py).
# All shape detection lives in one place — the parser
# never raises and routes unknown shapes to Malformed
# so a future server-side change is loud, not silent.
variant = a2a_response.parse(data)
if isinstance(variant, a2a_response.Result):
# Match legacy semantics:
# parts non-empty + first part has no text → ""
# parts empty → "(no response)"
# Differentiation matters for callers that assert
# on the empty-string case (test_a2a_client).
if variant.parts:
text = variant.text
else:
text = "(no response)"
# Tag child-reported errors so the caller can
# detect them reliably — agent-side bug surfaces
# text like "Agent error: <traceback>" inside a
# JSON-RPC success envelope.
if text.startswith("Agent error:"):
return f"{_A2A_ERROR_PREFIX}{text}"
return text
elif "error" in data:
err = data["error"]
msg = (err.get("message") or "").strip()
code = err.get("code")
if isinstance(variant, a2a_response.Queued):
# Poll-mode peer — message accepted into the inbox
# queue, target agent will fetch via poll. NOT a
# failure. Return the queued sentinel so callers
# (delegate_task etc.) can render the outcome
# accurately instead of treating it as an error.
logger.info(
"send_a2a_message: queued for poll-mode peer (target=%s method=%s)",
target_url,
variant.method,
)
return f"{_A2A_QUEUED_PREFIX}target={safe_id} method={variant.method}"
if isinstance(variant, a2a_response.Error):
msg = variant.message
code = variant.code
if msg and code is not None:
detail = f"{msg} (code={code})"
elif msg:
@ -583,26 +624,33 @@ async def send_a2a_message(peer_id: str, message: str, source_workspace_id: str
detail = f"JSON-RPC error with no message (code={code})"
else:
detail = "JSON-RPC error with no message"
return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"
elif data.get("status") == "queued" and data.get("delivery_mode") == "poll":
# Workspace-server's poll-mode short-circuit envelope
# (workspace-server/internal/handlers/a2a_proxy.go ~line 402).
# The peer is poll-mode and has no URL to dispatch to, so
# the server queued the message for the peer's next inbox
# poll instead of forwarding it. Delivery is acknowledged
# but pending consumption.
#
# Pre-fix this fell through to the "unexpected response
# shape" error path → callers logged false failures, then
# delegate_task retried, and the peer received duplicate
# delegations. Issue #2967.
method = data.get("method") or "message/send"
logger.info(
"send_a2a_message: queued for poll-mode peer (method=%s, target=%s)",
method, target_url,
if variant.restarting:
# Surface platform-restart-in-progress
# explicitly — caller (UI / delegating agent)
# can render a softer "agent is restarting"
# message rather than a generic failure.
retry = (
f", retry_after={variant.retry_after}s"
if variant.retry_after is not None
else ""
)
detail = f"{detail} (restarting{retry})"
return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"
# Malformed — log loud + surface as error so the
# operator notices a server change. SSOT refactor
# subsumes the inline "queued" check that landed in
# the #2972 hotfix; that branch is now the typed
# Queued variant above.
logger.warning(
"send_a2a_message: malformed response (target=%s body=%.200s)",
target_url,
str(variant.raw),
)
return (
f"{_A2A_ERROR_PREFIX}unexpected response shape "
f"(no result, error, or queued envelope): "
f"{str(variant.raw)[:200]} [target={target_url}]"
)
return f"queued for poll-mode peer (method={method})"
return f"{_A2A_ERROR_PREFIX}unexpected response shape (no result, no error): {str(data)[:200]} [target={target_url}]"
except _TRANSIENT_HTTP_ERRORS as e:
last_exc = e
attempts_remaining = _DELEGATE_MAX_ATTEMPTS - (attempt + 1)

246
workspace/a2a_response.py Normal file
View File

@ -0,0 +1,246 @@
"""Single source of truth for A2A ``/workspaces/<id>/a2a`` response shapes.
The workspace-server proxy at
``workspace-server/internal/handlers/a2a_proxy.go`` (the canonical
emitter) returns one of the following shapes for a single A2A call:
* **JSON-RPC success**
``{"jsonrpc": "2.0", "result": {...}, "id": "..."}``
The agent's reply, passed through unchanged.
* **JSON-RPC error**
``{"jsonrpc": "2.0", "error": {"message": "...", "code": ...}, "id": "..."}``
The agent reported a structured error.
* **Poll-queued** (synthesized at proxy, RFC #2339 PR 2 — see
``a2a_proxy.go:402-406``)
``{"status": "queued", "delivery_mode": "poll", "method": "..."}``
The target is a poll-mode workspace (no public URL); the message
was written to the platform's inbox queue. The target agent will
fetch it via ``GET /activity?since_id=`` polling. NOT a failure
delivery succeeded, there's just no synchronous reply to relay.
* **Platform error** ``{"error": "...", "restarting": true?, "retry_after": int?}``
HTTP-level failure synthesized by the proxy when the agent is
unreachable, the container is restarting, or some other infrastructure
failure happened. ``restarting=true`` flags the platform-initiated
container-restart path.
* **Malformed** anything else. Surfaced explicitly so a future server
change is loud rather than silent.
The ``parse(data)`` function classifies a pre-decoded JSON body into a
typed variant. Callers ``match`` on the variant and never re-implement
shape detection that's the SSOT discipline.
# SSOT contract
This file is the Python half. The Go server emits these shapes today
via inline ``gin.H{...}`` literals. A future PR can introduce a Go
mirror (e.g. ``workspace-server/internal/models/a2a_response.go``)
with a typed marshaller until then, **any change to the wire shape
must be reflected here** and gated by ``test_a2a_response.py``'s
fixture corpus. The corpus exists specifically so a one-sided edit
breaks CI.
# Why a typed model (vs. dict-key sniffing at every site)
The pre-2967 client at ``a2a_client.py:567-587`` sniffed for ``result``
or ``error`` keys inline and treated everything else as malformed
which silently broke poll-mode peers (the queued envelope has neither
key). Inline sniffing per call site multiplies the surface area where
a new shape gets misclassified. A single typed parser with an
explicit ``Malformed`` escape hatch makes shape additions a
one-line change here + a fixture entry in the test corpus, instead of
a hunt through every parsing site in the runtime.
"""
from __future__ import annotations
import dataclasses
import logging
from typing import Any, Optional, Union
logger = logging.getLogger(__name__)
@dataclasses.dataclass(frozen=True)
class Result:
"""JSON-RPC success — agent's reply available synchronously.
``text`` is the convenience extraction from ``parts[0].text`` (the
A2A multipart shape). ``parts`` is the full list, available for
callers that need richer rendering (multiple parts, non-text parts).
``raw_result`` preserves the unparsed ``result`` field for any
caller that needs it (e.g. activity-row response_body audit).
"""
text: str
parts: list[dict[str, Any]] = dataclasses.field(default_factory=list)
raw_result: Optional[dict[str, Any]] = None
@dataclasses.dataclass(frozen=True)
class Error:
"""JSON-RPC error or platform-level error response.
``code`` is the JSON-RPC integer code when present, else None.
``restarting`` / ``retry_after`` are platform-restart-in-progress
metadata: when both are set, the caller knows the container is
being recycled and may surface a softer error to the user.
"""
message: str
code: Optional[int] = None
restarting: bool = False
retry_after: Optional[int] = None
@dataclasses.dataclass(frozen=True)
class Queued:
"""Platform poll-mode short-circuit — message accepted, peer will pick up async.
Returned when the target workspace is registered as
``delivery_mode=poll`` (no public URL typical for external
standalone ``molecule-mcp`` runtimes). The message was written to
the platform's inbox queue; the target agent will fetch it via
``GET /activity?since_id=`` polling.
NOT a failure. Callers that expect a synchronous reply (the agent's
response text) won't get one here — they should either:
* Tolerate the absence of a reply (fire-and-forget semantics).
* Fall back to the durable ``/workspaces/:id/delegate`` +
``/delegations`` polling path (see ``a2a_tools_delegation``'s
``_delegate_sync_via_polling``), which writes the same A2A
request through the platform's executeDelegation goroutine
and lets the caller poll for the result row.
``method`` echoes the request method (``message/send``, ``notify``,
etc.) so callers can correlate.
"""
method: str
delivery_mode: str = "poll"
@dataclasses.dataclass(frozen=True)
class Malformed:
"""Server returned a body the parser can't classify.
Carries the raw decoded payload for diagnostic logging. Callers
typically render this as an error to the user (see
``send_a2a_message``) but the Malformed variant is a separate
type so logging / metrics can distinguish it from genuine
JSON-RPC ``Error`` responses.
"""
raw: Any # whatever the server returned: dict / list / str / number / etc.
Variant = Union[Result, Error, Queued, Malformed]
# Field-name constants — the wire vocabulary. Single source of truth;
# the parser references these by name so a change here is a
# one-line edit instead of a hunt through string literals.
_KEY_RESULT = "result"
_KEY_ERROR = "error"
_KEY_STATUS = "status"
_KEY_DELIVERY_MODE = "delivery_mode"
_KEY_METHOD = "method"
_KEY_RESTARTING = "restarting"
_KEY_RETRY_AFTER = "retry_after"
_STATUS_QUEUED = "queued"
_DELIVERY_MODE_POLL = "poll"
def parse(data: Any) -> Variant:
"""Classify a pre-decoded ``/a2a`` JSON response into a typed variant.
Never raises. Every branch is total: any input that doesn't match a
known shape routes to ``Malformed`` so the caller can decide how
to surface it.
The order of checks matters:
1. Non-dict input Malformed (server contract is dict-shaped).
2. Poll-queued envelope is checked BEFORE result/error because a
server bug that sets both ``status=queued`` and ``result``
should be loud, not silently treated as Result.
3. ``result`` Result (the JSON-RPC success path).
4. ``error`` Error (JSON-RPC error or platform error).
5. Anything else Malformed.
"""
if not isinstance(data, dict):
logger.warning(
"a2a_response.parse: non-dict body — got %s",
type(data).__name__,
)
return Malformed(raw=data)
# Poll-queued envelope. Both keys must be present — the workspace
# server sets them together; if only one is present the body is
# ambiguous and we route to Malformed for visibility.
if (
data.get(_KEY_STATUS) == _STATUS_QUEUED
and data.get(_KEY_DELIVERY_MODE) == _DELIVERY_MODE_POLL
):
method_raw = data.get(_KEY_METHOD)
method = str(method_raw) if method_raw is not None else "unknown"
logger.info(
"a2a_response.parse: queued for poll-mode peer (method=%s)",
method,
)
return Queued(method=method)
# JSON-RPC success.
if _KEY_RESULT in data:
result = data[_KEY_RESULT]
if isinstance(result, dict):
parts_raw = result.get("parts")
parts = parts_raw if isinstance(parts_raw, list) else []
text = ""
if parts:
first = parts[0]
if isinstance(first, dict):
text_raw = first.get("text")
text = str(text_raw) if text_raw is not None else ""
return Result(text=text, parts=parts, raw_result=result)
# ``result`` present but not a dict — unusual but not an error;
# surface as a Result with the value rendered to text.
return Result(text=str(result), parts=[], raw_result=None)
# JSON-RPC error or platform error.
if _KEY_ERROR in data:
err_raw = data[_KEY_ERROR]
message = ""
code: Optional[int] = None
if isinstance(err_raw, dict):
msg_raw = err_raw.get("message")
if msg_raw is not None:
message = str(msg_raw).strip()
code_raw = err_raw.get("code")
if isinstance(code_raw, int):
code = code_raw
elif isinstance(err_raw, str):
message = err_raw.strip()
else:
message = str(err_raw)
restarting = bool(data.get(_KEY_RESTARTING, False))
retry_after_raw = data.get(_KEY_RETRY_AFTER)
retry_after = retry_after_raw if isinstance(retry_after_raw, int) else None
return Error(
message=message,
code=code,
restarting=restarting,
retry_after=retry_after,
)
logger.warning(
"a2a_response.parse: unrecognized shape — keys=%s",
sorted(data.keys()),
)
return Malformed(raw=data)

View File

@ -29,14 +29,18 @@ from __future__ import annotations
import hashlib
import json
import logging
import os
import httpx
logger = logging.getLogger(__name__)
from a2a_client import (
PLATFORM_URL,
WORKSPACE_ID,
_A2A_ERROR_PREFIX,
_A2A_QUEUED_PREFIX,
_peer_names,
_peer_to_source,
discover_peer,
@ -245,6 +249,29 @@ async def tool_delegate_task(
# (the platform proxy) so the same code works for in-container and
# external (standalone molecule-mcp) callers.
result = await send_a2a_message(workspace_id, task, source_workspace_id=src)
# #2967: when the target is a poll-mode peer, the platform's
# a2a_proxy short-circuits and returns a queued envelope —
# send_a2a_message surfaces that as the _A2A_QUEUED_PREFIX
# sentinel. The synchronous proxy path can't deliver a reply
# because the target has no public URL; fall back to the
# durable /delegate + /delegations polling path which DOES
# work for poll-mode peers (the executeDelegation goroutine
# writes to the inbox queue and the result row arrives when
# the target picks it up + replies).
#
# This is what makes external-runtime-to-external-runtime
# A2A actually deliver synchronous replies — without the
# fallback the calling agent sees the queued sentinel as
# success-with-no-text and never gets the peer's response.
if result.startswith(_A2A_QUEUED_PREFIX):
logger.info(
"tool_delegate_task: target=%s is poll-mode; "
"falling back from message/send to /delegate-poll path",
workspace_id,
)
result = await _delegate_sync_via_polling(
workspace_id, task, src or WORKSPACE_ID,
)
# Detect delegation failures — wrap them clearly so the calling agent
# can decide to retry, use another peer, or handle the task itself.

View File

@ -281,11 +281,11 @@ class TestSendA2AMessage:
to the 'unexpected response shape' error path callers retried,
peer got duplicate delegations.
Pin: poll-queued envelope returns a clean success string that does
NOT start with _A2A_ERROR_PREFIX, so callers route it through the
normal-outcome path. Verified discriminating: assert_NOT_startswith
the error prefix would FAIL on the old code (which returned an
error-prefixed string) and PASSES on the new code.
Pin: poll-queued envelope returns a string tagged with the
_A2A_QUEUED_PREFIX sentinel (not _A2A_ERROR_PREFIX), so callers
can branch on the typed outcome without substring-sniffing.
Verified discriminating: pre-fix returned _A2A_ERROR_PREFIX so
the not-startswith assertion would FAIL on the old code.
"""
import a2a_client
@ -301,12 +301,13 @@ class TestSendA2AMessage:
# Discriminating: pre-fix returned a string that startswith
# _A2A_ERROR_PREFIX, so this assertion would have FAILED on the
# old code. New code returns a queued-success string.
# old code. New code returns the queued-success sentinel.
assert not result.startswith(a2a_client._A2A_ERROR_PREFIX), (
f"poll-queued envelope must not be tagged as A2A error; got: {result!r}"
)
assert "queued" in result.lower()
assert "poll" in result.lower()
assert result.startswith(a2a_client._A2A_QUEUED_PREFIX), (
f"poll-queued envelope must use the queued sentinel; got: {result!r}"
)
# The method is included so a structured-log scraper can route by
# protocol verb if needed.
assert "message/send" in result
@ -329,6 +330,7 @@ class TestSendA2AMessage:
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
assert not result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert result.startswith(a2a_client._A2A_QUEUED_PREFIX)
assert "message/sendStream" in result
async def test_status_queued_without_poll_mode_still_falls_through(self):
@ -462,6 +464,98 @@ def _make_seq_mock_client(post_side_effect):
return mock_client
class TestSendA2AMessagePollMode:
"""Pin the #2967 fix: send_a2a_message recognizes the platform's
poll-mode short-circuit envelope and returns a queued sentinel
instead of an "unexpected response shape" error.
Pre-#2967 the client treated the queued envelope as malformed,
causing the calling agent to retry, which delivered the same
message twice to the (poll-mode) recipient. The Queued sentinel
lets delegate_task fall back to the durable polling path
transparently see test_delegation_sync_via_polling for the
fallback verification.
"""
async def test_poll_queued_envelope_returns_queued_sentinel(self):
# Workspace-server returns this shape (a2a_proxy.go:402-406)
# when the target workspace is registered as delivery_mode=poll
# (no public URL, typical for external molecule-mcp standalone
# runtimes).
import a2a_client
resp = _make_response(200, {
"status": "queued",
"delivery_mode": "poll",
"method": "message/send",
})
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
# Sentinel + structured payload so callers can branch on it.
assert result.startswith(a2a_client._A2A_QUEUED_PREFIX)
# Critically: NOT the error sentinel. Pre-#2967 it was the error path.
assert not result.startswith(a2a_client._A2A_ERROR_PREFIX)
# Carries enough info for the caller to log meaningfully.
assert _TEST_PEER_ID in result
assert "message/send" in result
async def test_poll_queued_envelope_method_is_recorded(self):
import a2a_client
resp = _make_response(200, {
"status": "queued",
"delivery_mode": "poll",
"method": "notify",
})
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
assert result.startswith(a2a_client._A2A_QUEUED_PREFIX)
assert "notify" in result
async def test_status_queued_without_delivery_mode_is_unexpected_shape(self):
# Server bug: only ``status=queued`` set, ``delivery_mode``
# missing. Surface as the malformed branch (not Queued) — the
# SSOT parser treats this as Malformed because the documented
# contract requires both keys.
import a2a_client
resp = _make_response(200, {"status": "queued", "method": "message/send"})
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert "unexpected response shape" in result
# Must explicitly mention "or queued envelope" so an operator
# debugging this knows the parser HAS a Queued branch and the
# body just didn't match — not that the parser is missing the
# logic entirely (the pre-#2967 confusion).
assert "queued envelope" in result
async def test_platform_error_with_restart_metadata_surfaces_in_message(self):
# The platform error envelope: 503 with restart metadata.
# Surfaced as an error string that includes "restarting" so
# the caller / agent can render a softer error to the user.
import a2a_client
resp = _make_response(200, {
"error": "workspace agent unreachable — container restart triggered",
"restarting": True,
"retry_after": 15,
})
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert "restarting" in result
assert "retry_after=15" in result
class TestSendA2AMessageRetry:
"""Verify auto-retry on transient transport errors (RemoteProtocolError,
ConnectError, ReadTimeout, etc.) up to _DELEGATE_MAX_ATTEMPTS times.

View File

@ -0,0 +1,455 @@
"""Tests for the A2A response SSOT parser (workspace/a2a_response.py).
Branch coverage target: 100%. Each variant of ``parse()`` exercised in
isolation, plus adversarial-input fuzzing to assert the parser never
raises.
Pre-#2967, the response shape was sniffed inline at every call site
(``a2a_client.py:567-587`` had hard-coded ``"result" in data`` /
``"error" in data`` checks). The bare ``else`` returned an
"unexpected response shape" error which silently broke poll-mode
peers because the workspace-server's poll-queued envelope has neither
``result`` nor ``error``. The SSOT parser has an explicit ``Queued``
variant for that path and routes anything truly unrecognized to
``Malformed`` so a future server-side change fails loudly.
The "this test FAILS on pre-fix source" guarantee is enforced by
running the legacy-shape sniffer alongside the new parser in
``test_legacy_sniffer_misclassified_queued`` that test fails on
the pre-#2967 ``a2a_client.py`` shape because the legacy code
returns the unexpected-shape error path for the Queued envelope.
"""
from __future__ import annotations
import logging
from typing import Any
import pytest
import a2a_response
# ============== Fixture corpus — the canonical wire shapes ==============
# Every shape below mirrors a path the workspace-server's a2a_proxy.go
# can return. When you add a new server-side response shape, add a
# fixture entry here and a corresponding test method below.
_FIXTURES = {
"jsonrpc_success_with_text": {
"jsonrpc": "2.0",
"id": "abc-123",
"result": {
"parts": [{"kind": "text", "text": "hello world"}],
},
},
"jsonrpc_success_multipart": {
"jsonrpc": "2.0",
"id": "abc-123",
"result": {
"parts": [
{"kind": "text", "text": "first"},
{"kind": "text", "text": "second"},
],
},
},
"jsonrpc_success_no_parts": {
"jsonrpc": "2.0",
"id": "abc-123",
"result": {},
},
"jsonrpc_success_part_no_text_key": {
"jsonrpc": "2.0",
"id": "abc-123",
"result": {"parts": [{"kind": "text"}]},
},
"jsonrpc_error_with_message_and_code": {
"jsonrpc": "2.0",
"id": "abc-123",
"error": {"message": "rate limited", "code": -32003},
},
"jsonrpc_error_message_only": {
"jsonrpc": "2.0",
"id": "abc-123",
"error": {"message": "rate limited"},
},
"jsonrpc_error_code_only": {
"jsonrpc": "2.0",
"id": "abc-123",
"error": {"code": -32603},
},
"jsonrpc_error_string_form": {
"jsonrpc": "2.0",
"id": "abc-123",
"error": "string-shaped error",
},
"platform_error_with_restart": {
"error": "workspace agent unreachable — container restart triggered",
"restarting": True,
"retry_after": 15,
},
"platform_error_plain": {
"error": "workspace not found",
},
"poll_queued_full": {
"status": "queued",
"delivery_mode": "poll",
"method": "message/send",
},
"poll_queued_notify": {
"status": "queued",
"delivery_mode": "poll",
"method": "notify",
},
"poll_queued_no_method": {
"status": "queued",
"delivery_mode": "poll",
},
"malformed_empty_dict": {},
"malformed_unexpected_keys": {"foo": "bar", "baz": 42},
"malformed_status_queued_no_delivery_mode": {
# Server bug — status set but delivery_mode missing.
# Should be Malformed, not Queued, because the contract says both.
"status": "queued",
},
"malformed_delivery_mode_no_status": {
"delivery_mode": "poll",
},
}
# ============== Variant-by-variant coverage ==============
class TestQueuedVariant:
"""``parse()`` recognizes the workspace-server poll-mode short-circuit
envelope (a2a_proxy.go:402-406) and returns ``Queued``."""
def test_full_envelope_with_method_message_send(self):
v = a2a_response.parse(_FIXTURES["poll_queued_full"])
assert isinstance(v, a2a_response.Queued)
assert v.method == "message/send"
assert v.delivery_mode == "poll"
def test_envelope_with_method_notify(self):
v = a2a_response.parse(_FIXTURES["poll_queued_notify"])
assert isinstance(v, a2a_response.Queued)
assert v.method == "notify"
def test_envelope_missing_method_uses_unknown_sentinel(self):
# Envelope without ``method`` key — server contract should
# always set it, but the parser must not raise on absence.
v = a2a_response.parse(_FIXTURES["poll_queued_no_method"])
assert isinstance(v, a2a_response.Queued)
assert v.method == "unknown"
def test_status_queued_alone_is_malformed_not_queued(self):
# ``status=queued`` without ``delivery_mode=poll`` does not match
# the documented envelope. Surface as Malformed for visibility.
v = a2a_response.parse(_FIXTURES["malformed_status_queued_no_delivery_mode"])
assert isinstance(v, a2a_response.Malformed)
def test_delivery_mode_alone_is_malformed_not_queued(self):
v = a2a_response.parse(_FIXTURES["malformed_delivery_mode_no_status"])
assert isinstance(v, a2a_response.Malformed)
def test_logs_info_on_queued(self, caplog):
# Comprehensive logging — operator should see queued events at INFO.
with caplog.at_level(logging.INFO, logger="a2a_response"):
a2a_response.parse(_FIXTURES["poll_queued_full"])
assert any("queued for poll-mode peer" in r.message for r in caplog.records)
class TestResultVariant:
"""``parse()`` extracts the JSON-RPC ``result`` envelope into
``Result(text, parts, raw_result)``."""
def test_simple_text_result(self):
v = a2a_response.parse(_FIXTURES["jsonrpc_success_with_text"])
assert isinstance(v, a2a_response.Result)
assert v.text == "hello world"
assert len(v.parts) == 1
assert v.raw_result == {"parts": [{"kind": "text", "text": "hello world"}]}
def test_multipart_result_extracts_first_part_text(self):
v = a2a_response.parse(_FIXTURES["jsonrpc_success_multipart"])
assert isinstance(v, a2a_response.Result)
assert v.text == "first"
assert len(v.parts) == 2
def test_result_with_no_parts(self):
v = a2a_response.parse(_FIXTURES["jsonrpc_success_no_parts"])
assert isinstance(v, a2a_response.Result)
assert v.text == ""
assert v.parts == []
def test_part_without_text_key(self):
v = a2a_response.parse(_FIXTURES["jsonrpc_success_part_no_text_key"])
assert isinstance(v, a2a_response.Result)
# No "text" key — extracted text is empty, parts list intact.
assert v.text == ""
assert len(v.parts) == 1
def test_result_non_dict_returns_text_form(self):
# Pathological but legal: ``result`` is a string instead of a dict.
v = a2a_response.parse({"result": "hello"})
assert isinstance(v, a2a_response.Result)
assert v.text == "hello"
assert v.parts == []
def test_result_takes_precedence_when_no_queued_envelope(self):
# Both ``result`` and ``error`` keys present — result wins
# because it's checked first after the Queued path.
v = a2a_response.parse({
"result": {"parts": [{"kind": "text", "text": "ok"}]},
"error": {"message": "should-be-ignored"},
})
assert isinstance(v, a2a_response.Result)
assert v.text == "ok"
def test_part_with_non_dict_first_entry(self):
# ``parts[0]`` is a string instead of a dict — parser tolerates it,
# text falls back to empty.
v = a2a_response.parse({"result": {"parts": ["bare-string"]}})
assert isinstance(v, a2a_response.Result)
assert v.text == ""
assert v.parts == ["bare-string"]
def test_part_text_value_none(self):
# ``parts[0].text`` is explicitly None — extracted as "".
v = a2a_response.parse({"result": {"parts": [{"text": None}]}})
assert isinstance(v, a2a_response.Result)
assert v.text == ""
def test_parts_not_a_list(self):
# Server bug: ``parts`` is a dict instead of a list. Parser falls
# back to empty parts rather than raising.
v = a2a_response.parse({"result": {"parts": {"oops": True}}})
assert isinstance(v, a2a_response.Result)
assert v.parts == []
assert v.text == ""
class TestErrorVariant:
"""``parse()`` extracts ``error`` envelopes into ``Error`` and
annotates platform-restart metadata when present."""
def test_message_and_code(self):
v = a2a_response.parse(_FIXTURES["jsonrpc_error_with_message_and_code"])
assert isinstance(v, a2a_response.Error)
assert v.message == "rate limited"
assert v.code == -32003
assert v.restarting is False
assert v.retry_after is None
def test_message_only(self):
v = a2a_response.parse(_FIXTURES["jsonrpc_error_message_only"])
assert isinstance(v, a2a_response.Error)
assert v.message == "rate limited"
assert v.code is None
def test_code_only(self):
v = a2a_response.parse(_FIXTURES["jsonrpc_error_code_only"])
assert isinstance(v, a2a_response.Error)
assert v.message == ""
assert v.code == -32603
def test_error_string_form(self):
v = a2a_response.parse(_FIXTURES["jsonrpc_error_string_form"])
assert isinstance(v, a2a_response.Error)
assert v.message == "string-shaped error"
assert v.code is None
def test_error_non_dict_non_string(self):
v = a2a_response.parse({"error": 12345})
assert isinstance(v, a2a_response.Error)
assert v.message == "12345"
def test_platform_error_with_restart_metadata(self):
v = a2a_response.parse(_FIXTURES["platform_error_with_restart"])
assert isinstance(v, a2a_response.Error)
assert "workspace agent unreachable" in v.message
assert v.restarting is True
assert v.retry_after == 15
def test_platform_error_without_restart(self):
v = a2a_response.parse(_FIXTURES["platform_error_plain"])
assert isinstance(v, a2a_response.Error)
assert v.message == "workspace not found"
assert v.restarting is False
assert v.retry_after is None
def test_error_message_with_whitespace_stripped(self):
v = a2a_response.parse({"error": {"message": " trimmed "}})
assert isinstance(v, a2a_response.Error)
assert v.message == "trimmed"
def test_non_int_code_dropped(self):
v = a2a_response.parse({"error": {"message": "x", "code": "not-a-number"}})
assert isinstance(v, a2a_response.Error)
assert v.code is None
def test_non_int_retry_after_dropped(self):
v = a2a_response.parse({"error": "x", "restarting": True, "retry_after": "30s"})
assert isinstance(v, a2a_response.Error)
assert v.retry_after is None
class TestMalformedVariant:
"""``parse()`` returns ``Malformed`` for any shape it can't classify
and logs at WARNING so operators see new server response shapes."""
def test_empty_dict(self):
v = a2a_response.parse(_FIXTURES["malformed_empty_dict"])
assert isinstance(v, a2a_response.Malformed)
assert v.raw == {}
def test_unexpected_keys(self):
v = a2a_response.parse(_FIXTURES["malformed_unexpected_keys"])
assert isinstance(v, a2a_response.Malformed)
assert v.raw == {"foo": "bar", "baz": 42}
def test_non_dict_input_list(self):
v = a2a_response.parse([1, 2, 3])
assert isinstance(v, a2a_response.Malformed)
assert v.raw == [1, 2, 3]
def test_non_dict_input_string(self):
v = a2a_response.parse("plain string")
assert isinstance(v, a2a_response.Malformed)
assert v.raw == "plain string"
def test_non_dict_input_none(self):
v = a2a_response.parse(None)
assert isinstance(v, a2a_response.Malformed)
assert v.raw is None
def test_logs_warning_on_malformed(self, caplog):
with caplog.at_level(logging.WARNING, logger="a2a_response"):
a2a_response.parse(_FIXTURES["malformed_unexpected_keys"])
assert any(r.levelno == logging.WARNING for r in caplog.records)
def test_logs_warning_on_non_dict(self, caplog):
with caplog.at_level(logging.WARNING, logger="a2a_response"):
a2a_response.parse("not a dict")
assert any("non-dict" in r.message for r in caplog.records)
# ============== Robustness — parser never raises ==============
_ADVERSARIAL_INPUTS: list[Any] = [
None,
True,
False,
0,
-1,
3.14,
"",
"string",
[],
[1, 2, 3],
{},
{"random": "garbage"},
{"result": None},
{"result": [1, 2, 3]},
{"result": {"parts": None}},
{"result": {"parts": [None]}},
{"result": {"parts": [{"text": []}]}},
{"error": None},
{"error": []},
{"error": {"message": None, "code": None}},
{"error": {"message": ["nested", "list"]}},
{"status": None, "delivery_mode": None, "method": None},
{"status": "queued", "delivery_mode": "push", "method": "x"}, # wrong delivery_mode
{"status": "running", "delivery_mode": "poll"}, # wrong status
{"status": 42, "delivery_mode": "poll"}, # non-string status
# Deeply-nested junk
{"result": {"parts": [{"text": {"deeply": {"nested": "object"}}}]}},
# Bytes (not really JSON-decodable but parser shouldn't raise)
{"result": {"parts": [{"text": b"bytes" if False else "x"}]}},
]
class TestRobustness:
"""Parser must never raise on adversarial input — every branch is total.
These cases catch regressions where a future change adds a key
access that doesn't tolerate ``None`` / wrong-type values.
"""
@pytest.mark.parametrize("payload", _ADVERSARIAL_INPUTS)
def test_parse_never_raises(self, payload):
# Single contract: parse must return one of the four variants
# regardless of input. No exception classes propagated.
v = a2a_response.parse(payload)
assert isinstance(v, (a2a_response.Result, a2a_response.Error,
a2a_response.Queued, a2a_response.Malformed))
# ============== Regression gate — pre-#2967 misclassified queued ==============
class TestRegressionGate:
"""Pin the bug that prompted the SSOT abstraction.
Before #2967, ``a2a_client.py:567-587`` sniffed only ``"result" in
data`` and ``"error" in data`` the poll-queued envelope (no
result key, no error key) hit the bare-else and returned the
"unexpected response shape" error string. This test simulates the
pre-fix code path and confirms the SSOT parser correctly
distinguishes Queued from Malformed.
"""
def test_legacy_sniffer_would_return_neither_branch(self):
# The pre-#2967 logic — provided here so the regression is
# reproducible from this file alone, no archaeology needed.
envelope = _FIXTURES["poll_queued_full"]
legacy_branch = (
"result" if "result" in envelope
else "error" if "error" in envelope
else "unexpected_shape"
)
# Legacy sniff: hits the malformed branch.
assert legacy_branch == "unexpected_shape"
def test_ssot_parser_classifies_correctly(self):
# New parser: classifies as Queued.
v = a2a_response.parse(_FIXTURES["poll_queued_full"])
assert isinstance(v, a2a_response.Queued)
assert v.method == "message/send"
def test_every_fixture_classifies_to_expected_variant(self):
# Defense in depth — pin the variant for every fixture so a
# future shape addition has to update the table here too.
expected: dict[str, type] = {
"jsonrpc_success_with_text": a2a_response.Result,
"jsonrpc_success_multipart": a2a_response.Result,
"jsonrpc_success_no_parts": a2a_response.Result,
"jsonrpc_success_part_no_text_key": a2a_response.Result,
"jsonrpc_error_with_message_and_code": a2a_response.Error,
"jsonrpc_error_message_only": a2a_response.Error,
"jsonrpc_error_code_only": a2a_response.Error,
"jsonrpc_error_string_form": a2a_response.Error,
"platform_error_with_restart": a2a_response.Error,
"platform_error_plain": a2a_response.Error,
"poll_queued_full": a2a_response.Queued,
"poll_queued_notify": a2a_response.Queued,
"poll_queued_no_method": a2a_response.Queued,
"malformed_empty_dict": a2a_response.Malformed,
"malformed_unexpected_keys": a2a_response.Malformed,
"malformed_status_queued_no_delivery_mode": a2a_response.Malformed,
"malformed_delivery_mode_no_status": a2a_response.Malformed,
}
# Every fixture must be enumerated — keeps this gate honest.
assert set(expected.keys()) == set(_FIXTURES.keys()), (
f"fixture/expected mismatch: "
f"missing-from-expected={set(_FIXTURES) - set(expected)} "
f"extra-in-expected={set(expected) - set(_FIXTURES)}"
)
for name, payload in _FIXTURES.items():
v = a2a_response.parse(payload)
assert isinstance(v, expected[name]), (
f"fixture {name!r} classified as {type(v).__name__}, "
f"expected {expected[name].__name__}"
)

View File

@ -93,6 +93,124 @@ class TestFlagOffLegacyPath:
poll_mock.assert_not_called()
# ---------------------------------------------------------------------------
# #2967: Auto-fallback to polling path when target is poll-mode
# ---------------------------------------------------------------------------
class TestPollModeAutoFallback:
"""Pin the #2967 behavior: when send_a2a_message returns the queued
sentinel (target is poll-mode), tool_delegate_task transparently
falls back to _delegate_sync_via_polling which DOES work for
poll-mode peers (the executeDelegation goroutine writes to the
inbox queue and the result row arrives when the target replies).
Pre-#2967 behavior: queued sentinel was never returned (the parser
misclassified the envelope as malformed), and the calling agent
saw a DELEGATION FAILED / unexpected-response-shape error. This
test guards both against the parser regression (sentinel-emission)
and the fallback regression (sentinel-handling).
"""
async def test_queued_sentinel_triggers_polling_fallback(self, monkeypatch):
# Flag OFF — legacy send_a2a_message path. send returns the
# queued sentinel because the target is poll-mode. delegate_task
# must auto-route to _delegate_sync_via_polling so the agent
# eventually gets a real reply.
monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False)
import a2a_tools
from a2a_client import _A2A_QUEUED_PREFIX
send_calls = []
poll_calls = []
async def fake_send(workspace_id, task, source_workspace_id=None):
send_calls.append((workspace_id, task, source_workspace_id))
return f"{_A2A_QUEUED_PREFIX}target={workspace_id} method=message/send"
async def fake_polling(workspace_id, task, src):
poll_calls.append((workspace_id, task, src))
return "real response from poll-mode peer"
async def fake_discover(*_a, **_kw):
return {"name": "poll-peer", "status": "online"}
async def fake_report_activity(*_a, **_kw):
return None
with patch("a2a_tools_delegation.send_a2a_message", side_effect=fake_send), \
patch("a2a_tools_delegation._delegate_sync_via_polling", side_effect=fake_polling), \
patch("a2a_tools_delegation.discover_peer", side_effect=fake_discover), \
patch("a2a_tools.report_activity", side_effect=fake_report_activity):
result = await a2a_tools.tool_delegate_task(
"ws-target", "task body", source_workspace_id="ws-self"
)
# send was tried first
assert len(send_calls) == 1
# …then fallback fired automatically
assert len(poll_calls) == 1
assert poll_calls[0] == ("ws-target", "task body", "ws-self")
# Caller sees the real reply, NOT the queued sentinel and NOT
# a DELEGATION FAILED string.
assert result == "real response from poll-mode peer"
async def test_non_queued_send_result_does_not_trigger_fallback(self, monkeypatch):
# Push-mode peer returns a normal text reply — fallback path
# MUST NOT fire (no extra round-trip cost).
monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False)
import a2a_tools
async def fake_send(*_a, **_kw):
return "normal reply"
async def fake_discover(*_a, **_kw):
return {"name": "push-peer", "status": "online"}
async def fake_report_activity(*_a, **_kw):
return None
with patch("a2a_tools_delegation.send_a2a_message", side_effect=fake_send), \
patch("a2a_tools_delegation.discover_peer", side_effect=fake_discover), \
patch("a2a_tools.report_activity", side_effect=fake_report_activity), \
patch("a2a_tools_delegation._delegate_sync_via_polling", new=AsyncMock()) as poll_mock:
result = await a2a_tools.tool_delegate_task(
"ws-target", "task", source_workspace_id="ws-self"
)
assert result == "normal reply"
poll_mock.assert_not_called()
async def test_error_send_result_does_not_trigger_fallback(self, monkeypatch):
# Genuine error (not queued) — must surface as DELEGATION FAILED,
# not silently retried via the polling path.
monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False)
import a2a_tools
from a2a_client import _A2A_ERROR_PREFIX
async def fake_send(*_a, **_kw):
return f"{_A2A_ERROR_PREFIX}HTTP 500 [target=...]"
async def fake_discover(*_a, **_kw):
return {"name": "broken-peer", "status": "online"}
async def fake_report_activity(*_a, **_kw):
return None
with patch("a2a_tools_delegation.send_a2a_message", side_effect=fake_send), \
patch("a2a_tools_delegation.discover_peer", side_effect=fake_discover), \
patch("a2a_tools.report_activity", side_effect=fake_report_activity), \
patch("a2a_tools_delegation._delegate_sync_via_polling", new=AsyncMock()) as poll_mock:
result = await a2a_tools.tool_delegate_task(
"ws-target", "task", source_workspace_id="ws-self"
)
assert "DELEGATION FAILED" in result
poll_mock.assert_not_called()
# ---------------------------------------------------------------------------
# Flag-on: dispatch failures
# ---------------------------------------------------------------------------