diff --git a/workspace/a2a_client.py b/workspace/a2a_client.py index 16eb4b9c..8e499f40 100644 --- a/workspace/a2a_client.py +++ b/workspace/a2a_client.py @@ -17,6 +17,7 @@ from concurrent.futures import ThreadPoolExecutor import httpx +import a2a_response from platform_auth import auth_headers, self_source_headers logger = logging.getLogger(__name__) @@ -353,6 +354,20 @@ def _agent_card_url_for(peer_id: str) -> str: # Used by delegate_task to distinguish real errors from normal response text. _A2A_ERROR_PREFIX = "[A2A_ERROR] " +# Sentinel prefix for queued-for-poll-mode-peer outcomes (#2967). +# When the target workspace is registered as delivery_mode=poll (no +# public URL — typical for external molecule-mcp standalone runtimes), +# the platform's a2a_proxy.go:402 short-circuit returns a synthetic +# {"status":"queued","delivery_mode":"poll","method":"..."} envelope +# instead of dispatching over HTTP. The message IS delivered (written +# to the platform's inbox queue); there's just no synchronous reply +# to relay. Pre-#2967 the client treated this as "unexpected response +# shape" → caller saw DELEGATION FAILED → retried → recipient saw +# duplicates. The Queued prefix lets callers branch on this outcome +# explicitly: "delivered async, no synchronous reply expected" is +# different from both success-with-text and failure. +_A2A_QUEUED_PREFIX = "[A2A_QUEUED] " + # Workspace IDs are UUIDs everywhere we generate them (platform's # workspaces.id column, /registry/discover/:id route param, etc.) but # the agent-facing tool surface receives them as free-form strings via @@ -564,17 +579,43 @@ async def send_a2a_message(peer_id: str, message: str, source_workspace_id: str }, ) data = resp.json() - if "result" in data: - parts = data["result"].get("parts", []) - text = parts[0].get("text", "") if parts else "(no response)" - # Tag child-reported errors so the caller can detect them reliably + # Dispatch via the SSOT response model (a2a_response.py). 
+ # All shape detection lives in one place — the parser + # never raises and routes unknown shapes to Malformed + # so a future server-side change is loud, not silent. + variant = a2a_response.parse(data) + if isinstance(variant, a2a_response.Result): + # Match legacy semantics: + # parts non-empty + first part has no text → "" + # parts empty → "(no response)" + # Differentiation matters for callers that assert + # on the empty-string case (test_a2a_client). + if variant.parts: + text = variant.text + else: + text = "(no response)" + # Tag child-reported errors so the caller can + # detect them reliably — agent-side bug surfaces + # text like "Agent error: " inside a + # JSON-RPC success envelope. if text.startswith("Agent error:"): return f"{_A2A_ERROR_PREFIX}{text}" return text - elif "error" in data: - err = data["error"] - msg = (err.get("message") or "").strip() - code = err.get("code") + if isinstance(variant, a2a_response.Queued): + # Poll-mode peer — message accepted into the inbox + # queue, target agent will fetch via poll. NOT a + # failure. Return the queued sentinel so callers + # (delegate_task etc.) can render the outcome + # accurately instead of treating it as an error. + logger.info( + "send_a2a_message: queued for poll-mode peer (target=%s method=%s)", + target_url, + variant.method, + ) + return f"{_A2A_QUEUED_PREFIX}target={safe_id} method={variant.method}" + if isinstance(variant, a2a_response.Error): + msg = variant.message + code = variant.code if msg and code is not None: detail = f"{msg} (code={code})" elif msg: @@ -583,26 +624,33 @@ async def send_a2a_message(peer_id: str, message: str, source_workspace_id: str detail = f"JSON-RPC error with no message (code={code})" else: detail = "JSON-RPC error with no message" + if variant.restarting: + # Surface platform-restart-in-progress + # explicitly — caller (UI / delegating agent) + # can render a softer "agent is restarting" + # message rather than a generic failure. 
+ retry = ( + f", retry_after={variant.retry_after}s" + if variant.retry_after is not None + else "" + ) + detail = f"{detail} (restarting{retry})" return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]" - elif data.get("status") == "queued" and data.get("delivery_mode") == "poll": - # Workspace-server's poll-mode short-circuit envelope - # (workspace-server/internal/handlers/a2a_proxy.go ~line 402). - # The peer is poll-mode and has no URL to dispatch to, so - # the server queued the message for the peer's next inbox - # poll instead of forwarding it. Delivery is acknowledged - # but pending consumption. - # - # Pre-fix this fell through to the "unexpected response - # shape" error path → callers logged false failures, then - # delegate_task retried, and the peer received duplicate - # delegations. Issue #2967. - method = data.get("method") or "message/send" - logger.info( - "send_a2a_message: queued for poll-mode peer (method=%s, target=%s)", - method, target_url, - ) - return f"queued for poll-mode peer (method={method})" - return f"{_A2A_ERROR_PREFIX}unexpected response shape (no result, no error): {str(data)[:200]} [target={target_url}]" + # Malformed — log loud + surface as error so the + # operator notices a server change. SSOT refactor + # subsumes the inline "queued" check that landed in + # the #2972 hotfix; that branch is now the typed + # Queued variant above. 
+ logger.warning( + "send_a2a_message: malformed response (target=%s body=%.200s)", + target_url, + str(variant.raw), + ) + return ( + f"{_A2A_ERROR_PREFIX}unexpected response shape " + f"(no result, error, or queued envelope): " + f"{str(variant.raw)[:200]} [target={target_url}]" + ) except _TRANSIENT_HTTP_ERRORS as e: last_exc = e attempts_remaining = _DELEGATE_MAX_ATTEMPTS - (attempt + 1) diff --git a/workspace/a2a_response.py b/workspace/a2a_response.py new file mode 100644 index 00000000..ae48465a --- /dev/null +++ b/workspace/a2a_response.py @@ -0,0 +1,246 @@ +"""Single source of truth for A2A ``/workspaces/:id/a2a`` response shapes. + +The workspace-server proxy at +``workspace-server/internal/handlers/a2a_proxy.go`` (the canonical +emitter) returns one of the following shapes for a single A2A call: + + * **JSON-RPC success** — + ``{"jsonrpc": "2.0", "result": {...}, "id": "..."}`` + The agent's reply, passed through unchanged. + + * **JSON-RPC error** — + ``{"jsonrpc": "2.0", "error": {"message": "...", "code": ...}, "id": "..."}`` + The agent reported a structured error. + + * **Poll-queued** (synthesized at proxy, RFC #2339 PR 2 — see + ``a2a_proxy.go:402-406``) — + ``{"status": "queued", "delivery_mode": "poll", "method": "..."}`` + The target is a poll-mode workspace (no public URL); the message + was written to the platform's inbox queue. The target agent will + fetch it via ``GET /activity?since_id=`` polling. NOT a failure — + delivery succeeded, there's just no synchronous reply to relay. + + * **Platform error** — ``{"error": "...", "restarting": true?, "retry_after": int?}`` + HTTP-level failure synthesized by the proxy when the agent is + unreachable, the container is restarting, or some other infrastructure + failure happened. ``restarting=true`` flags the platform-initiated + container-restart path. + + * **Malformed** — anything else. Surfaced explicitly so a future server + change is loud rather than silent. 
+ +The ``parse(data)`` function classifies a pre-decoded JSON body into a +typed variant. Callers ``match`` on the variant and never re-implement +shape detection — that's the SSOT discipline. + +# SSOT contract + +This file is the Python half. The Go server emits these shapes today +via inline ``gin.H{...}`` literals. A future PR can introduce a Go +mirror (e.g. ``workspace-server/internal/models/a2a_response.go``) +with a typed marshaller — until then, **any change to the wire shape +must be reflected here** and gated by ``test_a2a_response.py``'s +fixture corpus. The corpus exists specifically so a one-sided edit +breaks CI. + +# Why a typed model (vs. dict-key sniffing at every site) + +The pre-2967 client at ``a2a_client.py:567-587`` sniffed for ``result`` +or ``error`` keys inline and treated everything else as malformed — +which silently broke poll-mode peers (the queued envelope has neither +key). Inline sniffing per call site multiplies the surface area where +a new shape gets misclassified. A single typed parser with an +explicit ``Malformed`` escape hatch makes shape additions a +one-line change here + a fixture entry in the test corpus, instead of +a hunt through every parsing site in the runtime. +""" +from __future__ import annotations + +import dataclasses +import logging +from typing import Any, Optional, Union + +logger = logging.getLogger(__name__) + + +@dataclasses.dataclass(frozen=True) +class Result: + """JSON-RPC success — agent's reply available synchronously. + + ``text`` is the convenience extraction from ``parts[0].text`` (the + A2A multipart shape). ``parts`` is the full list, available for + callers that need richer rendering (multiple parts, non-text parts). + ``raw_result`` preserves the unparsed ``result`` field for any + caller that needs it (e.g. activity-row response_body audit). 
+ """ + + text: str + parts: list[dict[str, Any]] = dataclasses.field(default_factory=list) + raw_result: Optional[dict[str, Any]] = None + + +@dataclasses.dataclass(frozen=True) +class Error: + """JSON-RPC error or platform-level error response. + + ``code`` is the JSON-RPC integer code when present, else None. + ``restarting`` / ``retry_after`` are platform-restart-in-progress + metadata: when both are set, the caller knows the container is + being recycled and may surface a softer error to the user. + """ + + message: str + code: Optional[int] = None + restarting: bool = False + retry_after: Optional[int] = None + + +@dataclasses.dataclass(frozen=True) +class Queued: + """Platform poll-mode short-circuit — message accepted, peer will pick up async. + + Returned when the target workspace is registered as + ``delivery_mode=poll`` (no public URL — typical for external + standalone ``molecule-mcp`` runtimes). The message was written to + the platform's inbox queue; the target agent will fetch it via + ``GET /activity?since_id=`` polling. + + NOT a failure. Callers that expect a synchronous reply (the agent's + response text) won't get one here — they should either: + + * Tolerate the absence of a reply (fire-and-forget semantics). + * Fall back to the durable ``/workspaces/:id/delegate`` + + ``/delegations`` polling path (see ``a2a_tools_delegation``'s + ``_delegate_sync_via_polling``), which writes the same A2A + request through the platform's executeDelegation goroutine + and lets the caller poll for the result row. + + ``method`` echoes the request method (``message/send``, ``notify``, + etc.) so callers can correlate. + """ + + method: str + delivery_mode: str = "poll" + + +@dataclasses.dataclass(frozen=True) +class Malformed: + """Server returned a body the parser can't classify. + + Carries the raw decoded payload for diagnostic logging. 
Callers + typically render this as an error to the user (see + ``send_a2a_message``) — but the Malformed variant is a separate + type so logging / metrics can distinguish it from genuine + JSON-RPC ``Error`` responses. + """ + + raw: Any # whatever the server returned: dict / list / str / number / etc. + + +Variant = Union[Result, Error, Queued, Malformed] + + +# Field-name constants — the wire vocabulary. Single source of truth; +# the parser references these by name so a change here is a +# one-line edit instead of a hunt through string literals. +_KEY_RESULT = "result" +_KEY_ERROR = "error" +_KEY_STATUS = "status" +_KEY_DELIVERY_MODE = "delivery_mode" +_KEY_METHOD = "method" +_KEY_RESTARTING = "restarting" +_KEY_RETRY_AFTER = "retry_after" + +_STATUS_QUEUED = "queued" +_DELIVERY_MODE_POLL = "poll" + + +def parse(data: Any) -> Variant: + """Classify a pre-decoded ``/a2a`` JSON response into a typed variant. + + Never raises. Every branch is total: any input that doesn't match a + known shape routes to ``Malformed`` so the caller can decide how + to surface it. + + The order of checks matters: + + 1. Non-dict input → Malformed (server contract is dict-shaped). + 2. Poll-queued envelope is checked BEFORE result/error, so a body + that carries both the queued envelope and ``result`` classifies + as Queued (logged at INFO) — ``result`` is ignored, not returned. + 3. ``result`` → Result (the JSON-RPC success path). + 4. ``error`` → Error (JSON-RPC error or platform error). + 5. Anything else → Malformed. + """ + if not isinstance(data, dict): + logger.warning( + "a2a_response.parse: non-dict body — got %s", + type(data).__name__, + ) + return Malformed(raw=data) + + # Poll-queued envelope. Both keys must match — the workspace + # server sets them together; if only one matches we fall through to + # the result/error checks below, and to Malformed if neither applies. 
+ if ( + data.get(_KEY_STATUS) == _STATUS_QUEUED + and data.get(_KEY_DELIVERY_MODE) == _DELIVERY_MODE_POLL + ): + method_raw = data.get(_KEY_METHOD) + method = str(method_raw) if method_raw is not None else "unknown" + logger.info( + "a2a_response.parse: queued for poll-mode peer (method=%s)", + method, + ) + return Queued(method=method) + + # JSON-RPC success. + if _KEY_RESULT in data: + result = data[_KEY_RESULT] + if isinstance(result, dict): + parts_raw = result.get("parts") + parts = parts_raw if isinstance(parts_raw, list) else [] + text = "" + if parts: + first = parts[0] + if isinstance(first, dict): + text_raw = first.get("text") + text = str(text_raw) if text_raw is not None else "" + return Result(text=text, parts=parts, raw_result=result) + # ``result`` present but not a dict — unusual but not an error; + # surface as a Result with the value rendered to text. + return Result(text=str(result), parts=[], raw_result=None) + + # JSON-RPC error or platform error. + if _KEY_ERROR in data: + err_raw = data[_KEY_ERROR] + message = "" + code: Optional[int] = None + if isinstance(err_raw, dict): + msg_raw = err_raw.get("message") + if msg_raw is not None: + message = str(msg_raw).strip() + code_raw = err_raw.get("code") + if isinstance(code_raw, int): + code = code_raw + elif isinstance(err_raw, str): + message = err_raw.strip() + else: + message = str(err_raw) + + restarting = bool(data.get(_KEY_RESTARTING, False)) + retry_after_raw = data.get(_KEY_RETRY_AFTER) + retry_after = retry_after_raw if isinstance(retry_after_raw, int) else None + + return Error( + message=message, + code=code, + restarting=restarting, + retry_after=retry_after, + ) + + logger.warning( + "a2a_response.parse: unrecognized shape — keys=%s", + sorted(data.keys()), + ) + return Malformed(raw=data) diff --git a/workspace/a2a_tools_delegation.py b/workspace/a2a_tools_delegation.py index 170a5333..79f42fd1 100644 --- a/workspace/a2a_tools_delegation.py +++ b/workspace/a2a_tools_delegation.py 
@@ -29,14 +29,18 @@ from __future__ import annotations import hashlib import json +import logging import os import httpx +logger = logging.getLogger(__name__) + from a2a_client import ( PLATFORM_URL, WORKSPACE_ID, _A2A_ERROR_PREFIX, + _A2A_QUEUED_PREFIX, _peer_names, _peer_to_source, discover_peer, @@ -245,6 +249,29 @@ async def tool_delegate_task( # (the platform proxy) so the same code works for in-container and # external (standalone molecule-mcp) callers. result = await send_a2a_message(workspace_id, task, source_workspace_id=src) + # #2967: when the target is a poll-mode peer, the platform's + # a2a_proxy short-circuits and returns a queued envelope — + # send_a2a_message surfaces that as the _A2A_QUEUED_PREFIX + # sentinel. The synchronous proxy path can't deliver a reply + # because the target has no public URL; fall back to the + # durable /delegate + /delegations polling path which DOES + # work for poll-mode peers (the executeDelegation goroutine + # writes to the inbox queue and the result row arrives when + # the target picks it up + replies). + # + # This is what makes external-runtime-to-external-runtime + # A2A actually deliver synchronous replies — without the + # fallback the calling agent sees the queued sentinel as + # success-with-no-text and never gets the peer's response. + if result.startswith(_A2A_QUEUED_PREFIX): + logger.info( + "tool_delegate_task: target=%s is poll-mode; " + "falling back from message/send to /delegate-poll path", + workspace_id, + ) + result = await _delegate_sync_via_polling( + workspace_id, task, src or WORKSPACE_ID, + ) # Detect delegation failures — wrap them clearly so the calling agent # can decide to retry, use another peer, or handle the task itself. 
diff --git a/workspace/tests/test_a2a_client.py b/workspace/tests/test_a2a_client.py index 068fbffd..97a8c739 100644 --- a/workspace/tests/test_a2a_client.py +++ b/workspace/tests/test_a2a_client.py @@ -462,6 +462,98 @@ def _make_seq_mock_client(post_side_effect): return mock_client +class TestSendA2AMessagePollMode: + """Pin the #2967 fix: send_a2a_message recognizes the platform's + poll-mode short-circuit envelope and returns a queued sentinel + instead of an "unexpected response shape" error. + + Pre-#2967 the client treated the queued envelope as malformed, + causing the calling agent to retry, which delivered the same + message twice to the (poll-mode) recipient. The Queued sentinel + lets delegate_task fall back to the durable polling path + transparently — see test_delegation_sync_via_polling for the + fallback verification. + """ + + async def test_poll_queued_envelope_returns_queued_sentinel(self): + # Workspace-server returns this shape (a2a_proxy.go:402-406) + # when the target workspace is registered as delivery_mode=poll + # (no public URL, typical for external molecule-mcp standalone + # runtimes). + import a2a_client + + resp = _make_response(200, { + "status": "queued", + "delivery_mode": "poll", + "method": "message/send", + }) + mock_client = _make_mock_client(post_resp=resp) + with patch("a2a_client.httpx.AsyncClient", return_value=mock_client): + result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task") + + # Sentinel + structured payload so callers can branch on it. + assert result.startswith(a2a_client._A2A_QUEUED_PREFIX) + # Critically: NOT the error sentinel. Pre-#2967 it was the error path. + assert not result.startswith(a2a_client._A2A_ERROR_PREFIX) + # Carries enough info for the caller to log meaningfully. 
+ assert _TEST_PEER_ID in result + assert "message/send" in result + + async def test_poll_queued_envelope_method_is_recorded(self): + import a2a_client + + resp = _make_response(200, { + "status": "queued", + "delivery_mode": "poll", + "method": "notify", + }) + mock_client = _make_mock_client(post_resp=resp) + with patch("a2a_client.httpx.AsyncClient", return_value=mock_client): + result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task") + + assert result.startswith(a2a_client._A2A_QUEUED_PREFIX) + assert "notify" in result + + async def test_status_queued_without_delivery_mode_is_unexpected_shape(self): + # Server bug: only ``status=queued`` set, ``delivery_mode`` + # missing. Surface as the malformed branch (not Queued) — the + # SSOT parser treats this as Malformed because the documented + # contract requires both keys. + import a2a_client + + resp = _make_response(200, {"status": "queued", "method": "message/send"}) + mock_client = _make_mock_client(post_resp=resp) + with patch("a2a_client.httpx.AsyncClient", return_value=mock_client): + result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task") + + assert result.startswith(a2a_client._A2A_ERROR_PREFIX) + assert "unexpected response shape" in result + # Must explicitly mention "or queued envelope" so an operator + # debugging this knows the parser HAS a Queued branch and the + # body just didn't match — not that the parser is missing the + # logic entirely (the pre-#2967 confusion). + assert "queued envelope" in result + + async def test_platform_error_with_restart_metadata_surfaces_in_message(self): + # The platform error envelope: 503 with restart metadata. + # Surfaced as an error string that includes "restarting" so + # the caller / agent can render a softer error to the user. 
+ import a2a_client + + resp = _make_response(200, { + "error": "workspace agent unreachable — container restart triggered", + "restarting": True, + "retry_after": 15, + }) + mock_client = _make_mock_client(post_resp=resp) + with patch("a2a_client.httpx.AsyncClient", return_value=mock_client): + result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task") + + assert result.startswith(a2a_client._A2A_ERROR_PREFIX) + assert "restarting" in result + assert "retry_after=15" in result + + class TestSendA2AMessageRetry: """Verify auto-retry on transient transport errors (RemoteProtocolError, ConnectError, ReadTimeout, etc.) up to _DELEGATE_MAX_ATTEMPTS times. diff --git a/workspace/tests/test_a2a_response.py b/workspace/tests/test_a2a_response.py new file mode 100644 index 00000000..cf254b36 --- /dev/null +++ b/workspace/tests/test_a2a_response.py @@ -0,0 +1,455 @@ +"""Tests for the A2A response SSOT parser (workspace/a2a_response.py). + +Branch coverage target: 100%. Each variant of ``parse()`` exercised in +isolation, plus adversarial-input fuzzing to assert the parser never +raises. + +Pre-#2967, the response shape was sniffed inline at every call site +(``a2a_client.py:567-587`` had hard-coded ``"result" in data`` / +``"error" in data`` checks). The bare ``else`` returned an +"unexpected response shape" error — which silently broke poll-mode +peers because the workspace-server's poll-queued envelope has neither +``result`` nor ``error``. The SSOT parser has an explicit ``Queued`` +variant for that path and routes anything truly unrecognized to +``Malformed`` so a future server-side change fails loudly. + +The pre-#2967 regression is pinned by ``TestRegressionGate``: +``test_legacy_sniffer_would_return_neither_branch`` replays the legacy +key sniffing inline and shows it lands on the unexpected-shape path for +the Queued envelope; ``test_ssot_parser_classifies_correctly`` shows the +SSOT parser returns ``Queued`` for the same fixture. 
+""" +from __future__ import annotations + +import logging +from typing import Any + +import pytest + +import a2a_response + + +# ============== Fixture corpus — the canonical wire shapes ============== + + +# Every shape below mirrors a path the workspace-server's a2a_proxy.go +# can return. When you add a new server-side response shape, add a +# fixture entry here and a corresponding test method below. +_FIXTURES = { + "jsonrpc_success_with_text": { + "jsonrpc": "2.0", + "id": "abc-123", + "result": { + "parts": [{"kind": "text", "text": "hello world"}], + }, + }, + "jsonrpc_success_multipart": { + "jsonrpc": "2.0", + "id": "abc-123", + "result": { + "parts": [ + {"kind": "text", "text": "first"}, + {"kind": "text", "text": "second"}, + ], + }, + }, + "jsonrpc_success_no_parts": { + "jsonrpc": "2.0", + "id": "abc-123", + "result": {}, + }, + "jsonrpc_success_part_no_text_key": { + "jsonrpc": "2.0", + "id": "abc-123", + "result": {"parts": [{"kind": "text"}]}, + }, + "jsonrpc_error_with_message_and_code": { + "jsonrpc": "2.0", + "id": "abc-123", + "error": {"message": "rate limited", "code": -32003}, + }, + "jsonrpc_error_message_only": { + "jsonrpc": "2.0", + "id": "abc-123", + "error": {"message": "rate limited"}, + }, + "jsonrpc_error_code_only": { + "jsonrpc": "2.0", + "id": "abc-123", + "error": {"code": -32603}, + }, + "jsonrpc_error_string_form": { + "jsonrpc": "2.0", + "id": "abc-123", + "error": "string-shaped error", + }, + "platform_error_with_restart": { + "error": "workspace agent unreachable — container restart triggered", + "restarting": True, + "retry_after": 15, + }, + "platform_error_plain": { + "error": "workspace not found", + }, + "poll_queued_full": { + "status": "queued", + "delivery_mode": "poll", + "method": "message/send", + }, + "poll_queued_notify": { + "status": "queued", + "delivery_mode": "poll", + "method": "notify", + }, + "poll_queued_no_method": { + "status": "queued", + "delivery_mode": "poll", + }, + "malformed_empty_dict": {}, 
+ "malformed_unexpected_keys": {"foo": "bar", "baz": 42}, + "malformed_status_queued_no_delivery_mode": { + # Server bug — status set but delivery_mode missing. + # Should be Malformed, not Queued, because the contract says both. + "status": "queued", + }, + "malformed_delivery_mode_no_status": { + "delivery_mode": "poll", + }, +} + + +# ============== Variant-by-variant coverage ============== + + +class TestQueuedVariant: + """``parse()`` recognizes the workspace-server poll-mode short-circuit + envelope (a2a_proxy.go:402-406) and returns ``Queued``.""" + + def test_full_envelope_with_method_message_send(self): + v = a2a_response.parse(_FIXTURES["poll_queued_full"]) + assert isinstance(v, a2a_response.Queued) + assert v.method == "message/send" + assert v.delivery_mode == "poll" + + def test_envelope_with_method_notify(self): + v = a2a_response.parse(_FIXTURES["poll_queued_notify"]) + assert isinstance(v, a2a_response.Queued) + assert v.method == "notify" + + def test_envelope_missing_method_uses_unknown_sentinel(self): + # Envelope without ``method`` key — server contract should + # always set it, but the parser must not raise on absence. + v = a2a_response.parse(_FIXTURES["poll_queued_no_method"]) + assert isinstance(v, a2a_response.Queued) + assert v.method == "unknown" + + def test_status_queued_alone_is_malformed_not_queued(self): + # ``status=queued`` without ``delivery_mode=poll`` does not match + # the documented envelope. Surface as Malformed for visibility. + v = a2a_response.parse(_FIXTURES["malformed_status_queued_no_delivery_mode"]) + assert isinstance(v, a2a_response.Malformed) + + def test_delivery_mode_alone_is_malformed_not_queued(self): + v = a2a_response.parse(_FIXTURES["malformed_delivery_mode_no_status"]) + assert isinstance(v, a2a_response.Malformed) + + def test_logs_info_on_queued(self, caplog): + # Comprehensive logging — operator should see queued events at INFO. 
+ with caplog.at_level(logging.INFO, logger="a2a_response"): + a2a_response.parse(_FIXTURES["poll_queued_full"]) + assert any("queued for poll-mode peer" in r.message for r in caplog.records) + + +class TestResultVariant: + """``parse()`` extracts the JSON-RPC ``result`` envelope into + ``Result(text, parts, raw_result)``.""" + + def test_simple_text_result(self): + v = a2a_response.parse(_FIXTURES["jsonrpc_success_with_text"]) + assert isinstance(v, a2a_response.Result) + assert v.text == "hello world" + assert len(v.parts) == 1 + assert v.raw_result == {"parts": [{"kind": "text", "text": "hello world"}]} + + def test_multipart_result_extracts_first_part_text(self): + v = a2a_response.parse(_FIXTURES["jsonrpc_success_multipart"]) + assert isinstance(v, a2a_response.Result) + assert v.text == "first" + assert len(v.parts) == 2 + + def test_result_with_no_parts(self): + v = a2a_response.parse(_FIXTURES["jsonrpc_success_no_parts"]) + assert isinstance(v, a2a_response.Result) + assert v.text == "" + assert v.parts == [] + + def test_part_without_text_key(self): + v = a2a_response.parse(_FIXTURES["jsonrpc_success_part_no_text_key"]) + assert isinstance(v, a2a_response.Result) + # No "text" key — extracted text is empty, parts list intact. + assert v.text == "" + assert len(v.parts) == 1 + + def test_result_non_dict_returns_text_form(self): + # Pathological but legal: ``result`` is a string instead of a dict. + v = a2a_response.parse({"result": "hello"}) + assert isinstance(v, a2a_response.Result) + assert v.text == "hello" + assert v.parts == [] + + def test_result_takes_precedence_when_no_queued_envelope(self): + # Both ``result`` and ``error`` keys present — result wins + # because it's checked first after the Queued path. 
+ v = a2a_response.parse({ + "result": {"parts": [{"kind": "text", "text": "ok"}]}, + "error": {"message": "should-be-ignored"}, + }) + assert isinstance(v, a2a_response.Result) + assert v.text == "ok" + + def test_part_with_non_dict_first_entry(self): + # ``parts[0]`` is a string instead of a dict — parser tolerates it, + # text falls back to empty. + v = a2a_response.parse({"result": {"parts": ["bare-string"]}}) + assert isinstance(v, a2a_response.Result) + assert v.text == "" + assert v.parts == ["bare-string"] + + def test_part_text_value_none(self): + # ``parts[0].text`` is explicitly None — extracted as "". + v = a2a_response.parse({"result": {"parts": [{"text": None}]}}) + assert isinstance(v, a2a_response.Result) + assert v.text == "" + + def test_parts_not_a_list(self): + # Server bug: ``parts`` is a dict instead of a list. Parser falls + # back to empty parts rather than raising. + v = a2a_response.parse({"result": {"parts": {"oops": True}}}) + assert isinstance(v, a2a_response.Result) + assert v.parts == [] + assert v.text == "" + + +class TestErrorVariant: + """``parse()`` extracts ``error`` envelopes into ``Error`` and + annotates platform-restart metadata when present.""" + + def test_message_and_code(self): + v = a2a_response.parse(_FIXTURES["jsonrpc_error_with_message_and_code"]) + assert isinstance(v, a2a_response.Error) + assert v.message == "rate limited" + assert v.code == -32003 + assert v.restarting is False + assert v.retry_after is None + + def test_message_only(self): + v = a2a_response.parse(_FIXTURES["jsonrpc_error_message_only"]) + assert isinstance(v, a2a_response.Error) + assert v.message == "rate limited" + assert v.code is None + + def test_code_only(self): + v = a2a_response.parse(_FIXTURES["jsonrpc_error_code_only"]) + assert isinstance(v, a2a_response.Error) + assert v.message == "" + assert v.code == -32603 + + def test_error_string_form(self): + v = a2a_response.parse(_FIXTURES["jsonrpc_error_string_form"]) + assert 
isinstance(v, a2a_response.Error) + assert v.message == "string-shaped error" + assert v.code is None + + def test_error_non_dict_non_string(self): + v = a2a_response.parse({"error": 12345}) + assert isinstance(v, a2a_response.Error) + assert v.message == "12345" + + def test_platform_error_with_restart_metadata(self): + v = a2a_response.parse(_FIXTURES["platform_error_with_restart"]) + assert isinstance(v, a2a_response.Error) + assert "workspace agent unreachable" in v.message + assert v.restarting is True + assert v.retry_after == 15 + + def test_platform_error_without_restart(self): + v = a2a_response.parse(_FIXTURES["platform_error_plain"]) + assert isinstance(v, a2a_response.Error) + assert v.message == "workspace not found" + assert v.restarting is False + assert v.retry_after is None + + def test_error_message_with_whitespace_stripped(self): + v = a2a_response.parse({"error": {"message": " trimmed "}}) + assert isinstance(v, a2a_response.Error) + assert v.message == "trimmed" + + def test_non_int_code_dropped(self): + v = a2a_response.parse({"error": {"message": "x", "code": "not-a-number"}}) + assert isinstance(v, a2a_response.Error) + assert v.code is None + + def test_non_int_retry_after_dropped(self): + v = a2a_response.parse({"error": "x", "restarting": True, "retry_after": "30s"}) + assert isinstance(v, a2a_response.Error) + assert v.retry_after is None + + +class TestMalformedVariant: + """``parse()`` returns ``Malformed`` for any shape it can't classify + and logs at WARNING so operators see new server response shapes.""" + + def test_empty_dict(self): + v = a2a_response.parse(_FIXTURES["malformed_empty_dict"]) + assert isinstance(v, a2a_response.Malformed) + assert v.raw == {} + + def test_unexpected_keys(self): + v = a2a_response.parse(_FIXTURES["malformed_unexpected_keys"]) + assert isinstance(v, a2a_response.Malformed) + assert v.raw == {"foo": "bar", "baz": 42} + + def test_non_dict_input_list(self): + v = a2a_response.parse([1, 2, 3]) + assert 
isinstance(v, a2a_response.Malformed) + assert v.raw == [1, 2, 3] + + def test_non_dict_input_string(self): + v = a2a_response.parse("plain string") + assert isinstance(v, a2a_response.Malformed) + assert v.raw == "plain string" + + def test_non_dict_input_none(self): + v = a2a_response.parse(None) + assert isinstance(v, a2a_response.Malformed) + assert v.raw is None + + def test_logs_warning_on_malformed(self, caplog): + with caplog.at_level(logging.WARNING, logger="a2a_response"): + a2a_response.parse(_FIXTURES["malformed_unexpected_keys"]) + assert any(r.levelno == logging.WARNING for r in caplog.records) + + def test_logs_warning_on_non_dict(self, caplog): + with caplog.at_level(logging.WARNING, logger="a2a_response"): + a2a_response.parse("not a dict") + assert any("non-dict" in r.message for r in caplog.records) + + +# ============== Robustness — parser never raises ============== + + +_ADVERSARIAL_INPUTS: list[Any] = [ + None, + True, + False, + 0, + -1, + 3.14, + "", + "string", + [], + [1, 2, 3], + {}, + {"random": "garbage"}, + {"result": None}, + {"result": [1, 2, 3]}, + {"result": {"parts": None}}, + {"result": {"parts": [None]}}, + {"result": {"parts": [{"text": []}]}}, + {"error": None}, + {"error": []}, + {"error": {"message": None, "code": None}}, + {"error": {"message": ["nested", "list"]}}, + {"status": None, "delivery_mode": None, "method": None}, + {"status": "queued", "delivery_mode": "push", "method": "x"}, # wrong delivery_mode + {"status": "running", "delivery_mode": "poll"}, # wrong status + {"status": 42, "delivery_mode": "poll"}, # non-string status + # Deeply-nested junk + {"result": {"parts": [{"text": {"deeply": {"nested": "object"}}}]}}, + # Bytes (not really JSON-decodable but parser shouldn't raise) + {"result": {"parts": [{"text": b"bytes" if False else "x"}]}}, +] + + +class TestRobustness: + """Parser must never raise on adversarial input — every branch is total. 
+ + These cases catch regressions where a future change adds a key + access that doesn't tolerate ``None`` / wrong-type values. + """ + + @pytest.mark.parametrize("payload", _ADVERSARIAL_INPUTS) + def test_parse_never_raises(self, payload): + # Single contract: parse must return one of the four variants + # regardless of input. No exception classes propagated. + v = a2a_response.parse(payload) + assert isinstance(v, (a2a_response.Result, a2a_response.Error, + a2a_response.Queued, a2a_response.Malformed)) + + +# ============== Regression gate — pre-#2967 misclassified queued ============== + + +class TestRegressionGate: + """Pin the bug that prompted the SSOT abstraction. + + Before #2967, ``a2a_client.py:567-587`` sniffed only ``"result" in + data`` and ``"error" in data`` — the poll-queued envelope (no + result key, no error key) hit the bare-else and returned the + "unexpected response shape" error string. This test simulates the + pre-fix code path and confirms the SSOT parser correctly + distinguishes Queued from Malformed. + """ + + def test_legacy_sniffer_would_return_neither_branch(self): + # The pre-#2967 logic — provided here so the regression is + # reproducible from this file alone, no archaeology needed. + envelope = _FIXTURES["poll_queued_full"] + legacy_branch = ( + "result" if "result" in envelope + else "error" if "error" in envelope + else "unexpected_shape" + ) + # Legacy sniff: hits the malformed branch. + assert legacy_branch == "unexpected_shape" + + def test_ssot_parser_classifies_correctly(self): + # New parser: classifies as Queued. + v = a2a_response.parse(_FIXTURES["poll_queued_full"]) + assert isinstance(v, a2a_response.Queued) + assert v.method == "message/send" + + def test_every_fixture_classifies_to_expected_variant(self): + # Defense in depth — pin the variant for every fixture so a + # future shape addition has to update the table here too. 
+ expected: dict[str, type] = { + "jsonrpc_success_with_text": a2a_response.Result, + "jsonrpc_success_multipart": a2a_response.Result, + "jsonrpc_success_no_parts": a2a_response.Result, + "jsonrpc_success_part_no_text_key": a2a_response.Result, + "jsonrpc_error_with_message_and_code": a2a_response.Error, + "jsonrpc_error_message_only": a2a_response.Error, + "jsonrpc_error_code_only": a2a_response.Error, + "jsonrpc_error_string_form": a2a_response.Error, + "platform_error_with_restart": a2a_response.Error, + "platform_error_plain": a2a_response.Error, + "poll_queued_full": a2a_response.Queued, + "poll_queued_notify": a2a_response.Queued, + "poll_queued_no_method": a2a_response.Queued, + "malformed_empty_dict": a2a_response.Malformed, + "malformed_unexpected_keys": a2a_response.Malformed, + "malformed_status_queued_no_delivery_mode": a2a_response.Malformed, + "malformed_delivery_mode_no_status": a2a_response.Malformed, + } + # Every fixture must be enumerated — keeps this gate honest. + assert set(expected.keys()) == set(_FIXTURES.keys()), ( + f"fixture/expected mismatch: " + f"missing-from-expected={set(_FIXTURES) - set(expected)} " + f"extra-in-expected={set(expected) - set(_FIXTURES)}" + ) + for name, payload in _FIXTURES.items(): + v = a2a_response.parse(payload) + assert isinstance(v, expected[name]), ( + f"fixture {name!r} classified as {type(v).__name__}, " + f"expected {expected[name].__name__}" + ) diff --git a/workspace/tests/test_delegation_sync_via_polling.py b/workspace/tests/test_delegation_sync_via_polling.py index 7f6b2918..018d572a 100644 --- a/workspace/tests/test_delegation_sync_via_polling.py +++ b/workspace/tests/test_delegation_sync_via_polling.py @@ -93,6 +93,124 @@ class TestFlagOffLegacyPath: poll_mock.assert_not_called() +# --------------------------------------------------------------------------- +# #2967: Auto-fallback to polling path when target is poll-mode +# --------------------------------------------------------------------------- 

class TestPollModeAutoFallback:
    """Guard the #2967 poll-mode fallback in ``tool_delegate_task``.

    Every test clears ``DELEGATION_SYNC_VIA_INBOX`` and stubs
    ``send_a2a_message``, ``discover_peer``, ``report_activity`` and
    ``_delegate_sync_via_polling``. The outcomes pinned here are:

    * a send result starting with ``_A2A_QUEUED_PREFIX`` is re-routed
      through ``_delegate_sync_via_polling`` and the caller receives
      that helper's reply instead of the sentinel;
    * a plain-text send result is returned unchanged, with no polling;
    * a send result starting with ``_A2A_ERROR_PREFIX`` is reported as
      ``DELEGATION FAILED``, with no polling.
    """

    async def test_queued_sentinel_triggers_polling_fallback(self, monkeypatch):
        """Queued sentinel from the send step hands over to the polling helper."""
        # Flag cleared, so the send_a2a_message route is taken first.
        monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False)

        import a2a_tools
        from a2a_client import _A2A_QUEUED_PREFIX

        sent = []
        polled = []

        async def queued_send(workspace_id, task, source_workspace_id=None):
            # Record the call, then answer with the queued sentinel.
            sent.append((workspace_id, task, source_workspace_id))
            return f"{_A2A_QUEUED_PREFIX}target={workspace_id} method=message/send"

        async def polling_reply(workspace_id, task, src):
            # Record the call, then answer as the polling helper would.
            polled.append((workspace_id, task, src))
            return "real response from poll-mode peer"

        async def online_peer(*_a, **_kw):
            return {"name": "poll-peer", "status": "online"}

        async def ignore_activity(*_a, **_kw):
            return None

        # Patchers are built up front and entered left to right below.
        send_patch = patch(
            "a2a_tools_delegation.send_a2a_message", side_effect=queued_send
        )
        polling_patch = patch(
            "a2a_tools_delegation._delegate_sync_via_polling",
            side_effect=polling_reply,
        )
        discover_patch = patch(
            "a2a_tools_delegation.discover_peer", side_effect=online_peer
        )
        activity_patch = patch(
            "a2a_tools.report_activity", side_effect=ignore_activity
        )

        with send_patch, polling_patch, discover_patch, activity_patch:
            outcome = await a2a_tools.tool_delegate_task(
                "ws-target", "task body", source_workspace_id="ws-self"
            )

        # Exactly one send attempt came first…
        assert len(sent) == 1
        # …followed by exactly one polling call carrying the same request.
        assert polled == [("ws-target", "task body", "ws-self")]
        # The caller gets the polling reply, not the sentinel or a
        # DELEGATION FAILED string.
        assert outcome == "real response from poll-mode peer"

    async def test_non_queued_send_result_does_not_trigger_fallback(self, monkeypatch):
        """A normal text reply is returned directly; polling stays untouched."""
        monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False)

        import a2a_tools

        async def plain_send(*_a, **_kw):
            return "normal reply"

        async def online_peer(*_a, **_kw):
            return {"name": "push-peer", "status": "online"}

        async def ignore_activity(*_a, **_kw):
            return None

        send_patch = patch(
            "a2a_tools_delegation.send_a2a_message", side_effect=plain_send
        )
        discover_patch = patch(
            "a2a_tools_delegation.discover_peer", side_effect=online_peer
        )
        activity_patch = patch(
            "a2a_tools.report_activity", side_effect=ignore_activity
        )
        polling_patch = patch(
            "a2a_tools_delegation._delegate_sync_via_polling", new=AsyncMock()
        )

        with send_patch, discover_patch, activity_patch, polling_patch as poll_mock:
            outcome = await a2a_tools.tool_delegate_task(
                "ws-target", "task", source_workspace_id="ws-self"
            )

        assert outcome == "normal reply"
        # No extra round-trip through the polling helper.
        poll_mock.assert_not_called()

    async def test_error_send_result_does_not_trigger_fallback(self, monkeypatch):
        """An error-prefixed reply surfaces as DELEGATION FAILED, never polled."""
        monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False)

        import a2a_tools
        from a2a_client import _A2A_ERROR_PREFIX

        async def failing_send(*_a, **_kw):
            return f"{_A2A_ERROR_PREFIX}HTTP 500 [target=...]"

        async def online_peer(*_a, **_kw):
            return {"name": "broken-peer", "status": "online"}

        async def ignore_activity(*_a, **_kw):
            return None

        send_patch = patch(
            "a2a_tools_delegation.send_a2a_message", side_effect=failing_send
        )
        discover_patch = patch(
            "a2a_tools_delegation.discover_peer", side_effect=online_peer
        )
        activity_patch = patch(
            "a2a_tools.report_activity", side_effect=ignore_activity
        )
        polling_patch = patch(
            "a2a_tools_delegation._delegate_sync_via_polling", new=AsyncMock()
        )

        with send_patch, discover_patch, activity_patch, polling_patch as poll_mock:
            outcome = await a2a_tools.tool_delegate_task(
                "ws-target", "task", source_workspace_id="ws-self"
            )

        assert "DELEGATION FAILED" in outcome
        # A genuine failure must not be retried via the polling path.
        poll_mock.assert_not_called()


# ---------------------------------------------------------------------------
# Flag-on: dispatch failures
# ---------------------------------------------------------------------------