Merge pull request #2979 from Molecule-AI/fix/a2a-poll-mode-response-shape-2967
feat(a2a): SSOT typed-variant response parser + auto-fallback for poll-mode peers (#2967)
This commit is contained in:
commit
9dd29882e2
@ -54,6 +54,7 @@ TOP_LEVEL_MODULES = {
|
|||||||
"a2a_client",
|
"a2a_client",
|
||||||
"a2a_executor",
|
"a2a_executor",
|
||||||
"a2a_mcp_server",
|
"a2a_mcp_server",
|
||||||
|
"a2a_response",
|
||||||
"a2a_tools",
|
"a2a_tools",
|
||||||
"a2a_tools_delegation",
|
"a2a_tools_delegation",
|
||||||
"a2a_tools_inbox",
|
"a2a_tools_inbox",
|
||||||
|
|||||||
@ -157,6 +157,43 @@ A2A_RESP=$(curl -s --max-time "$TIMEOUT" -X POST "$BASE/workspaces/$POLL_WS_ID/a
|
|||||||
}')
|
}')
|
||||||
|
|
||||||
check "poll-mode A2A returns queued status" '"status":"queued"' "$A2A_RESP"
|
check "poll-mode A2A returns queued status" '"status":"queued"' "$A2A_RESP"
|
||||||
|
|
||||||
|
# ---------- Phase 3.5: Python parser classifies queued envelope correctly ----------
|
||||||
|
# (#2967) — server emits the queued envelope, the wheel's a2a_response.parse()
|
||||||
|
# MUST classify it as the Queued variant, not Malformed. Pre-#2967 the bare
|
||||||
|
# message/send parser in a2a_client.py:587 misclassified this and returned
|
||||||
|
# "[A2A_ERROR] unexpected response shape", which broke external↔external A2A
|
||||||
|
# on poll-mode peers.
|
||||||
|
#
|
||||||
|
# This phase exercises the actual on-the-wire response from a real
|
||||||
|
# workspace-server (NOT a mocked dict) through the same module the production
|
||||||
|
# wheel ships, so a regression in either the server emit shape OR the client
|
||||||
|
# parser fails this E2E.
|
||||||
|
|
||||||
|
echo ""
|
||||||
|
echo "--- Phase 3.5: Python parser classifies real server response (#2967) ---"
|
||||||
|
|
||||||
|
# Pipe the queued response captured above through a2a_response.parse and
|
||||||
|
# assert the classification. WORKSPACE_ID is required at module import
|
||||||
|
# time but irrelevant to this parsing call (any UUID is fine).
|
||||||
|
PARSE_RESULT=$(WORKSPACE_ID="00000000-0000-0000-0000-000000000001" \
|
||||||
|
python3 -c "
|
||||||
|
import json, sys
|
||||||
|
sys.path.insert(0, '$(cd "$(dirname "$0")/../../workspace" && pwd)')
|
||||||
|
import a2a_response
|
||||||
|
data = json.loads(r'''$A2A_RESP''')
|
||||||
|
v = a2a_response.parse(data)
|
||||||
|
print(type(v).__name__)
|
||||||
|
if isinstance(v, a2a_response.Queued):
|
||||||
|
print(f'method={v.method} delivery_mode={v.delivery_mode}')
|
||||||
|
")
|
||||||
|
|
||||||
|
check_eq "Python parser classifies real server response as Queued" \
|
||||||
|
"Queued" "$(printf '%s' "$PARSE_RESULT" | head -n1)"
|
||||||
|
check "Queued variant captures method=message/send" \
|
||||||
|
"method=message/send" "$PARSE_RESULT"
|
||||||
|
check "Queued variant captures delivery_mode=poll" \
|
||||||
|
"delivery_mode=poll" "$PARSE_RESULT"
|
||||||
check "queued response echoes delivery_mode=poll" '"delivery_mode":"poll"' "$A2A_RESP"
|
check "queued response echoes delivery_mode=poll" '"delivery_mode":"poll"' "$A2A_RESP"
|
||||||
check "queued response echoes the JSON-RPC method" '"method":"message/send"' "$A2A_RESP"
|
check "queued response echoes the JSON-RPC method" '"method":"message/send"' "$A2A_RESP"
|
||||||
|
|
||||||
|
|||||||
@ -17,6 +17,7 @@ from concurrent.futures import ThreadPoolExecutor
|
|||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
|
|
||||||
|
import a2a_response
|
||||||
from platform_auth import auth_headers, self_source_headers
|
from platform_auth import auth_headers, self_source_headers
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@ -353,6 +354,20 @@ def _agent_card_url_for(peer_id: str) -> str:
|
|||||||
# Used by delegate_task to distinguish real errors from normal response text.
|
# Used by delegate_task to distinguish real errors from normal response text.
|
||||||
_A2A_ERROR_PREFIX = "[A2A_ERROR] "
|
_A2A_ERROR_PREFIX = "[A2A_ERROR] "
|
||||||
|
|
||||||
|
# Sentinel prefix for queued-for-poll-mode-peer outcomes (#2967).
|
||||||
|
# When the target workspace is registered as delivery_mode=poll (no
|
||||||
|
# public URL — typical for external molecule-mcp standalone runtimes),
|
||||||
|
# the platform's a2a_proxy.go:402 short-circuit returns a synthetic
|
||||||
|
# {"status":"queued","delivery_mode":"poll","method":"..."} envelope
|
||||||
|
# instead of dispatching over HTTP. The message IS delivered (written
|
||||||
|
# to the platform's inbox queue); there's just no synchronous reply
|
||||||
|
# to relay. Pre-#2967 the client treated this as "unexpected response
|
||||||
|
# shape" → caller saw DELEGATION FAILED → retried → recipient saw
|
||||||
|
# duplicates. The Queued prefix lets callers branch on this outcome
|
||||||
|
# explicitly: "delivered async, no synchronous reply expected" is
|
||||||
|
# different from both success-with-text and failure.
|
||||||
|
_A2A_QUEUED_PREFIX = "[A2A_QUEUED] "
|
||||||
|
|
||||||
# Workspace IDs are UUIDs everywhere we generate them (platform's
|
# Workspace IDs are UUIDs everywhere we generate them (platform's
|
||||||
# workspaces.id column, /registry/discover/:id route param, etc.) but
|
# workspaces.id column, /registry/discover/:id route param, etc.) but
|
||||||
# the agent-facing tool surface receives them as free-form strings via
|
# the agent-facing tool surface receives them as free-form strings via
|
||||||
@ -564,17 +579,43 @@ async def send_a2a_message(peer_id: str, message: str, source_workspace_id: str
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
if "result" in data:
|
# Dispatch via the SSOT response model (a2a_response.py).
|
||||||
parts = data["result"].get("parts", [])
|
# All shape detection lives in one place — the parser
|
||||||
text = parts[0].get("text", "") if parts else "(no response)"
|
# never raises and routes unknown shapes to Malformed
|
||||||
# Tag child-reported errors so the caller can detect them reliably
|
# so a future server-side change is loud, not silent.
|
||||||
|
variant = a2a_response.parse(data)
|
||||||
|
if isinstance(variant, a2a_response.Result):
|
||||||
|
# Match legacy semantics:
|
||||||
|
# parts non-empty + first part has no text → ""
|
||||||
|
# parts empty → "(no response)"
|
||||||
|
# Differentiation matters for callers that assert
|
||||||
|
# on the empty-string case (test_a2a_client).
|
||||||
|
if variant.parts:
|
||||||
|
text = variant.text
|
||||||
|
else:
|
||||||
|
text = "(no response)"
|
||||||
|
# Tag child-reported errors so the caller can
|
||||||
|
# detect them reliably — agent-side bug surfaces
|
||||||
|
# text like "Agent error: <traceback>" inside a
|
||||||
|
# JSON-RPC success envelope.
|
||||||
if text.startswith("Agent error:"):
|
if text.startswith("Agent error:"):
|
||||||
return f"{_A2A_ERROR_PREFIX}{text}"
|
return f"{_A2A_ERROR_PREFIX}{text}"
|
||||||
return text
|
return text
|
||||||
elif "error" in data:
|
if isinstance(variant, a2a_response.Queued):
|
||||||
err = data["error"]
|
# Poll-mode peer — message accepted into the inbox
|
||||||
msg = (err.get("message") or "").strip()
|
# queue, target agent will fetch via poll. NOT a
|
||||||
code = err.get("code")
|
# failure. Return the queued sentinel so callers
|
||||||
|
# (delegate_task etc.) can render the outcome
|
||||||
|
# accurately instead of treating it as an error.
|
||||||
|
logger.info(
|
||||||
|
"send_a2a_message: queued for poll-mode peer (target=%s method=%s)",
|
||||||
|
target_url,
|
||||||
|
variant.method,
|
||||||
|
)
|
||||||
|
return f"{_A2A_QUEUED_PREFIX}target={safe_id} method={variant.method}"
|
||||||
|
if isinstance(variant, a2a_response.Error):
|
||||||
|
msg = variant.message
|
||||||
|
code = variant.code
|
||||||
if msg and code is not None:
|
if msg and code is not None:
|
||||||
detail = f"{msg} (code={code})"
|
detail = f"{msg} (code={code})"
|
||||||
elif msg:
|
elif msg:
|
||||||
@ -583,26 +624,33 @@ async def send_a2a_message(peer_id: str, message: str, source_workspace_id: str
|
|||||||
detail = f"JSON-RPC error with no message (code={code})"
|
detail = f"JSON-RPC error with no message (code={code})"
|
||||||
else:
|
else:
|
||||||
detail = "JSON-RPC error with no message"
|
detail = "JSON-RPC error with no message"
|
||||||
|
if variant.restarting:
|
||||||
|
# Surface platform-restart-in-progress
|
||||||
|
# explicitly — caller (UI / delegating agent)
|
||||||
|
# can render a softer "agent is restarting"
|
||||||
|
# message rather than a generic failure.
|
||||||
|
retry = (
|
||||||
|
f", retry_after={variant.retry_after}s"
|
||||||
|
if variant.retry_after is not None
|
||||||
|
else ""
|
||||||
|
)
|
||||||
|
detail = f"{detail} (restarting{retry})"
|
||||||
return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"
|
return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"
|
||||||
elif data.get("status") == "queued" and data.get("delivery_mode") == "poll":
|
# Malformed — log loud + surface as error so the
|
||||||
# Workspace-server's poll-mode short-circuit envelope
|
# operator notices a server change. SSOT refactor
|
||||||
# (workspace-server/internal/handlers/a2a_proxy.go ~line 402).
|
# subsumes the inline "queued" check that landed in
|
||||||
# The peer is poll-mode and has no URL to dispatch to, so
|
# the #2972 hotfix; that branch is now the typed
|
||||||
# the server queued the message for the peer's next inbox
|
# Queued variant above.
|
||||||
# poll instead of forwarding it. Delivery is acknowledged
|
logger.warning(
|
||||||
# but pending consumption.
|
"send_a2a_message: malformed response (target=%s body=%.200s)",
|
||||||
#
|
target_url,
|
||||||
# Pre-fix this fell through to the "unexpected response
|
str(variant.raw),
|
||||||
# shape" error path → callers logged false failures, then
|
)
|
||||||
# delegate_task retried, and the peer received duplicate
|
return (
|
||||||
# delegations. Issue #2967.
|
f"{_A2A_ERROR_PREFIX}unexpected response shape "
|
||||||
method = data.get("method") or "message/send"
|
f"(no result, error, or queued envelope): "
|
||||||
logger.info(
|
f"{str(variant.raw)[:200]} [target={target_url}]"
|
||||||
"send_a2a_message: queued for poll-mode peer (method=%s, target=%s)",
|
)
|
||||||
method, target_url,
|
|
||||||
)
|
|
||||||
return f"queued for poll-mode peer (method={method})"
|
|
||||||
return f"{_A2A_ERROR_PREFIX}unexpected response shape (no result, no error): {str(data)[:200]} [target={target_url}]"
|
|
||||||
except _TRANSIENT_HTTP_ERRORS as e:
|
except _TRANSIENT_HTTP_ERRORS as e:
|
||||||
last_exc = e
|
last_exc = e
|
||||||
attempts_remaining = _DELEGATE_MAX_ATTEMPTS - (attempt + 1)
|
attempts_remaining = _DELEGATE_MAX_ATTEMPTS - (attempt + 1)
|
||||||
|
|||||||
246
workspace/a2a_response.py
Normal file
246
workspace/a2a_response.py
Normal file
@ -0,0 +1,246 @@
|
|||||||
|
"""Single source of truth for A2A ``/workspaces/<id>/a2a`` response shapes.
|
||||||
|
|
||||||
|
The workspace-server proxy at
|
||||||
|
``workspace-server/internal/handlers/a2a_proxy.go`` (the canonical
|
||||||
|
emitter) returns one of the following shapes for a single A2A call:
|
||||||
|
|
||||||
|
* **JSON-RPC success** —
|
||||||
|
``{"jsonrpc": "2.0", "result": {...}, "id": "..."}``
|
||||||
|
The agent's reply, passed through unchanged.
|
||||||
|
|
||||||
|
* **JSON-RPC error** —
|
||||||
|
``{"jsonrpc": "2.0", "error": {"message": "...", "code": ...}, "id": "..."}``
|
||||||
|
The agent reported a structured error.
|
||||||
|
|
||||||
|
* **Poll-queued** (synthesized at proxy, RFC #2339 PR 2 — see
|
||||||
|
``a2a_proxy.go:402-406``) —
|
||||||
|
``{"status": "queued", "delivery_mode": "poll", "method": "..."}``
|
||||||
|
The target is a poll-mode workspace (no public URL); the message
|
||||||
|
was written to the platform's inbox queue. The target agent will
|
||||||
|
fetch it via ``GET /activity?since_id=`` polling. NOT a failure —
|
||||||
|
delivery succeeded, there's just no synchronous reply to relay.
|
||||||
|
|
||||||
|
* **Platform error** — ``{"error": "...", "restarting": true?, "retry_after": int?}``
|
||||||
|
HTTP-level failure synthesized by the proxy when the agent is
|
||||||
|
unreachable, the container is restarting, or some other infrastructure
|
||||||
|
failure happened. ``restarting=true`` flags the platform-initiated
|
||||||
|
container-restart path.
|
||||||
|
|
||||||
|
* **Malformed** — anything else. Surfaced explicitly so a future server
|
||||||
|
change is loud rather than silent.
|
||||||
|
|
||||||
|
The ``parse(data)`` function classifies a pre-decoded JSON body into a
|
||||||
|
typed variant. Callers ``match`` on the variant and never re-implement
|
||||||
|
shape detection — that's the SSOT discipline.
|
||||||
|
|
||||||
|
# SSOT contract
|
||||||
|
|
||||||
|
This file is the Python half. The Go server emits these shapes today
|
||||||
|
via inline ``gin.H{...}`` literals. A future PR can introduce a Go
|
||||||
|
mirror (e.g. ``workspace-server/internal/models/a2a_response.go``)
|
||||||
|
with a typed marshaller — until then, **any change to the wire shape
|
||||||
|
must be reflected here** and gated by ``test_a2a_response.py``'s
|
||||||
|
fixture corpus. The corpus exists specifically so a one-sided edit
|
||||||
|
breaks CI.
|
||||||
|
|
||||||
|
# Why a typed model (vs. dict-key sniffing at every site)
|
||||||
|
|
||||||
|
The pre-2967 client at ``a2a_client.py:567-587`` sniffed for ``result``
|
||||||
|
or ``error`` keys inline and treated everything else as malformed —
|
||||||
|
which silently broke poll-mode peers (the queued envelope has neither
|
||||||
|
key). Inline sniffing per call site multiplies the surface area where
|
||||||
|
a new shape gets misclassified. A single typed parser with an
|
||||||
|
explicit ``Malformed`` escape hatch makes shape additions a
|
||||||
|
one-line change here + a fixture entry in the test corpus, instead of
|
||||||
|
a hunt through every parsing site in the runtime.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import dataclasses
|
||||||
|
import logging
|
||||||
|
from typing import Any, Optional, Union
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass(frozen=True)
|
||||||
|
class Result:
|
||||||
|
"""JSON-RPC success — agent's reply available synchronously.
|
||||||
|
|
||||||
|
``text`` is the convenience extraction from ``parts[0].text`` (the
|
||||||
|
A2A multipart shape). ``parts`` is the full list, available for
|
||||||
|
callers that need richer rendering (multiple parts, non-text parts).
|
||||||
|
``raw_result`` preserves the unparsed ``result`` field for any
|
||||||
|
caller that needs it (e.g. activity-row response_body audit).
|
||||||
|
"""
|
||||||
|
|
||||||
|
text: str
|
||||||
|
parts: list[dict[str, Any]] = dataclasses.field(default_factory=list)
|
||||||
|
raw_result: Optional[dict[str, Any]] = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass(frozen=True)
|
||||||
|
class Error:
|
||||||
|
"""JSON-RPC error or platform-level error response.
|
||||||
|
|
||||||
|
``code`` is the JSON-RPC integer code when present, else None.
|
||||||
|
``restarting`` / ``retry_after`` are platform-restart-in-progress
|
||||||
|
metadata: when both are set, the caller knows the container is
|
||||||
|
being recycled and may surface a softer error to the user.
|
||||||
|
"""
|
||||||
|
|
||||||
|
message: str
|
||||||
|
code: Optional[int] = None
|
||||||
|
restarting: bool = False
|
||||||
|
retry_after: Optional[int] = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass(frozen=True)
|
||||||
|
class Queued:
|
||||||
|
"""Platform poll-mode short-circuit — message accepted, peer will pick up async.
|
||||||
|
|
||||||
|
Returned when the target workspace is registered as
|
||||||
|
``delivery_mode=poll`` (no public URL — typical for external
|
||||||
|
standalone ``molecule-mcp`` runtimes). The message was written to
|
||||||
|
the platform's inbox queue; the target agent will fetch it via
|
||||||
|
``GET /activity?since_id=`` polling.
|
||||||
|
|
||||||
|
NOT a failure. Callers that expect a synchronous reply (the agent's
|
||||||
|
response text) won't get one here — they should either:
|
||||||
|
|
||||||
|
* Tolerate the absence of a reply (fire-and-forget semantics).
|
||||||
|
* Fall back to the durable ``/workspaces/:id/delegate`` +
|
||||||
|
``/delegations`` polling path (see ``a2a_tools_delegation``'s
|
||||||
|
``_delegate_sync_via_polling``), which writes the same A2A
|
||||||
|
request through the platform's executeDelegation goroutine
|
||||||
|
and lets the caller poll for the result row.
|
||||||
|
|
||||||
|
``method`` echoes the request method (``message/send``, ``notify``,
|
||||||
|
etc.) so callers can correlate.
|
||||||
|
"""
|
||||||
|
|
||||||
|
method: str
|
||||||
|
delivery_mode: str = "poll"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass(frozen=True)
|
||||||
|
class Malformed:
|
||||||
|
"""Server returned a body the parser can't classify.
|
||||||
|
|
||||||
|
Carries the raw decoded payload for diagnostic logging. Callers
|
||||||
|
typically render this as an error to the user (see
|
||||||
|
``send_a2a_message``) — but the Malformed variant is a separate
|
||||||
|
type so logging / metrics can distinguish it from genuine
|
||||||
|
JSON-RPC ``Error`` responses.
|
||||||
|
"""
|
||||||
|
|
||||||
|
raw: Any # whatever the server returned: dict / list / str / number / etc.
|
||||||
|
|
||||||
|
|
||||||
|
Variant = Union[Result, Error, Queued, Malformed]
|
||||||
|
|
||||||
|
|
||||||
|
# Field-name constants — the wire vocabulary. Single source of truth;
|
||||||
|
# the parser references these by name so a change here is a
|
||||||
|
# one-line edit instead of a hunt through string literals.
|
||||||
|
_KEY_RESULT = "result"
|
||||||
|
_KEY_ERROR = "error"
|
||||||
|
_KEY_STATUS = "status"
|
||||||
|
_KEY_DELIVERY_MODE = "delivery_mode"
|
||||||
|
_KEY_METHOD = "method"
|
||||||
|
_KEY_RESTARTING = "restarting"
|
||||||
|
_KEY_RETRY_AFTER = "retry_after"
|
||||||
|
|
||||||
|
_STATUS_QUEUED = "queued"
|
||||||
|
_DELIVERY_MODE_POLL = "poll"
|
||||||
|
|
||||||
|
|
||||||
|
def parse(data: Any) -> Variant:
|
||||||
|
"""Classify a pre-decoded ``/a2a`` JSON response into a typed variant.
|
||||||
|
|
||||||
|
Never raises. Every branch is total: any input that doesn't match a
|
||||||
|
known shape routes to ``Malformed`` so the caller can decide how
|
||||||
|
to surface it.
|
||||||
|
|
||||||
|
The order of checks matters:
|
||||||
|
|
||||||
|
1. Non-dict input → Malformed (server contract is dict-shaped).
|
||||||
|
2. Poll-queued envelope is checked BEFORE result/error because a
|
||||||
|
server bug that sets both ``status=queued`` and ``result``
|
||||||
|
should be loud, not silently treated as Result.
|
||||||
|
3. ``result`` → Result (the JSON-RPC success path).
|
||||||
|
4. ``error`` → Error (JSON-RPC error or platform error).
|
||||||
|
5. Anything else → Malformed.
|
||||||
|
"""
|
||||||
|
if not isinstance(data, dict):
|
||||||
|
logger.warning(
|
||||||
|
"a2a_response.parse: non-dict body — got %s",
|
||||||
|
type(data).__name__,
|
||||||
|
)
|
||||||
|
return Malformed(raw=data)
|
||||||
|
|
||||||
|
# Poll-queued envelope. Both keys must be present — the workspace
|
||||||
|
# server sets them together; if only one is present the body is
|
||||||
|
# ambiguous and we route to Malformed for visibility.
|
||||||
|
if (
|
||||||
|
data.get(_KEY_STATUS) == _STATUS_QUEUED
|
||||||
|
and data.get(_KEY_DELIVERY_MODE) == _DELIVERY_MODE_POLL
|
||||||
|
):
|
||||||
|
method_raw = data.get(_KEY_METHOD)
|
||||||
|
method = str(method_raw) if method_raw is not None else "unknown"
|
||||||
|
logger.info(
|
||||||
|
"a2a_response.parse: queued for poll-mode peer (method=%s)",
|
||||||
|
method,
|
||||||
|
)
|
||||||
|
return Queued(method=method)
|
||||||
|
|
||||||
|
# JSON-RPC success.
|
||||||
|
if _KEY_RESULT in data:
|
||||||
|
result = data[_KEY_RESULT]
|
||||||
|
if isinstance(result, dict):
|
||||||
|
parts_raw = result.get("parts")
|
||||||
|
parts = parts_raw if isinstance(parts_raw, list) else []
|
||||||
|
text = ""
|
||||||
|
if parts:
|
||||||
|
first = parts[0]
|
||||||
|
if isinstance(first, dict):
|
||||||
|
text_raw = first.get("text")
|
||||||
|
text = str(text_raw) if text_raw is not None else ""
|
||||||
|
return Result(text=text, parts=parts, raw_result=result)
|
||||||
|
# ``result`` present but not a dict — unusual but not an error;
|
||||||
|
# surface as a Result with the value rendered to text.
|
||||||
|
return Result(text=str(result), parts=[], raw_result=None)
|
||||||
|
|
||||||
|
# JSON-RPC error or platform error.
|
||||||
|
if _KEY_ERROR in data:
|
||||||
|
err_raw = data[_KEY_ERROR]
|
||||||
|
message = ""
|
||||||
|
code: Optional[int] = None
|
||||||
|
if isinstance(err_raw, dict):
|
||||||
|
msg_raw = err_raw.get("message")
|
||||||
|
if msg_raw is not None:
|
||||||
|
message = str(msg_raw).strip()
|
||||||
|
code_raw = err_raw.get("code")
|
||||||
|
if isinstance(code_raw, int):
|
||||||
|
code = code_raw
|
||||||
|
elif isinstance(err_raw, str):
|
||||||
|
message = err_raw.strip()
|
||||||
|
else:
|
||||||
|
message = str(err_raw)
|
||||||
|
|
||||||
|
restarting = bool(data.get(_KEY_RESTARTING, False))
|
||||||
|
retry_after_raw = data.get(_KEY_RETRY_AFTER)
|
||||||
|
retry_after = retry_after_raw if isinstance(retry_after_raw, int) else None
|
||||||
|
|
||||||
|
return Error(
|
||||||
|
message=message,
|
||||||
|
code=code,
|
||||||
|
restarting=restarting,
|
||||||
|
retry_after=retry_after,
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.warning(
|
||||||
|
"a2a_response.parse: unrecognized shape — keys=%s",
|
||||||
|
sorted(data.keys()),
|
||||||
|
)
|
||||||
|
return Malformed(raw=data)
|
||||||
@ -29,14 +29,18 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import hashlib
|
import hashlib
|
||||||
import json
|
import json
|
||||||
|
import logging
|
||||||
import os
|
import os
|
||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
from a2a_client import (
|
from a2a_client import (
|
||||||
PLATFORM_URL,
|
PLATFORM_URL,
|
||||||
WORKSPACE_ID,
|
WORKSPACE_ID,
|
||||||
_A2A_ERROR_PREFIX,
|
_A2A_ERROR_PREFIX,
|
||||||
|
_A2A_QUEUED_PREFIX,
|
||||||
_peer_names,
|
_peer_names,
|
||||||
_peer_to_source,
|
_peer_to_source,
|
||||||
discover_peer,
|
discover_peer,
|
||||||
@ -245,6 +249,29 @@ async def tool_delegate_task(
|
|||||||
# (the platform proxy) so the same code works for in-container and
|
# (the platform proxy) so the same code works for in-container and
|
||||||
# external (standalone molecule-mcp) callers.
|
# external (standalone molecule-mcp) callers.
|
||||||
result = await send_a2a_message(workspace_id, task, source_workspace_id=src)
|
result = await send_a2a_message(workspace_id, task, source_workspace_id=src)
|
||||||
|
# #2967: when the target is a poll-mode peer, the platform's
|
||||||
|
# a2a_proxy short-circuits and returns a queued envelope —
|
||||||
|
# send_a2a_message surfaces that as the _A2A_QUEUED_PREFIX
|
||||||
|
# sentinel. The synchronous proxy path can't deliver a reply
|
||||||
|
# because the target has no public URL; fall back to the
|
||||||
|
# durable /delegate + /delegations polling path which DOES
|
||||||
|
# work for poll-mode peers (the executeDelegation goroutine
|
||||||
|
# writes to the inbox queue and the result row arrives when
|
||||||
|
# the target picks it up + replies).
|
||||||
|
#
|
||||||
|
# This is what makes external-runtime-to-external-runtime
|
||||||
|
# A2A actually deliver synchronous replies — without the
|
||||||
|
# fallback the calling agent sees the queued sentinel as
|
||||||
|
# success-with-no-text and never gets the peer's response.
|
||||||
|
if result.startswith(_A2A_QUEUED_PREFIX):
|
||||||
|
logger.info(
|
||||||
|
"tool_delegate_task: target=%s is poll-mode; "
|
||||||
|
"falling back from message/send to /delegate-poll path",
|
||||||
|
workspace_id,
|
||||||
|
)
|
||||||
|
result = await _delegate_sync_via_polling(
|
||||||
|
workspace_id, task, src or WORKSPACE_ID,
|
||||||
|
)
|
||||||
|
|
||||||
# Detect delegation failures — wrap them clearly so the calling agent
|
# Detect delegation failures — wrap them clearly so the calling agent
|
||||||
# can decide to retry, use another peer, or handle the task itself.
|
# can decide to retry, use another peer, or handle the task itself.
|
||||||
|
|||||||
@ -281,11 +281,11 @@ class TestSendA2AMessage:
|
|||||||
to the 'unexpected response shape' error path → callers retried,
|
to the 'unexpected response shape' error path → callers retried,
|
||||||
peer got duplicate delegations.
|
peer got duplicate delegations.
|
||||||
|
|
||||||
Pin: poll-queued envelope returns a clean success string that does
|
Pin: poll-queued envelope returns a string tagged with the
|
||||||
NOT start with _A2A_ERROR_PREFIX, so callers route it through the
|
_A2A_QUEUED_PREFIX sentinel (not _A2A_ERROR_PREFIX), so callers
|
||||||
normal-outcome path. Verified discriminating: assert_NOT_startswith
|
can branch on the typed outcome without substring-sniffing.
|
||||||
the error prefix would FAIL on the old code (which returned an
|
Verified discriminating: pre-fix returned _A2A_ERROR_PREFIX so
|
||||||
error-prefixed string) and PASSES on the new code.
|
the not-startswith assertion would FAIL on the old code.
|
||||||
"""
|
"""
|
||||||
import a2a_client
|
import a2a_client
|
||||||
|
|
||||||
@ -301,12 +301,13 @@ class TestSendA2AMessage:
|
|||||||
|
|
||||||
# Discriminating: pre-fix returned a string that startswith
|
# Discriminating: pre-fix returned a string that startswith
|
||||||
# _A2A_ERROR_PREFIX, so this assertion would have FAILED on the
|
# _A2A_ERROR_PREFIX, so this assertion would have FAILED on the
|
||||||
# old code. New code returns a queued-success string.
|
# old code. New code returns the queued-success sentinel.
|
||||||
assert not result.startswith(a2a_client._A2A_ERROR_PREFIX), (
|
assert not result.startswith(a2a_client._A2A_ERROR_PREFIX), (
|
||||||
f"poll-queued envelope must not be tagged as A2A error; got: {result!r}"
|
f"poll-queued envelope must not be tagged as A2A error; got: {result!r}"
|
||||||
)
|
)
|
||||||
assert "queued" in result.lower()
|
assert result.startswith(a2a_client._A2A_QUEUED_PREFIX), (
|
||||||
assert "poll" in result.lower()
|
f"poll-queued envelope must use the queued sentinel; got: {result!r}"
|
||||||
|
)
|
||||||
# The method is included so a structured-log scraper can route by
|
# The method is included so a structured-log scraper can route by
|
||||||
# protocol verb if needed.
|
# protocol verb if needed.
|
||||||
assert "message/send" in result
|
assert "message/send" in result
|
||||||
@ -329,6 +330,7 @@ class TestSendA2AMessage:
|
|||||||
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
|
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
|
||||||
|
|
||||||
assert not result.startswith(a2a_client._A2A_ERROR_PREFIX)
|
assert not result.startswith(a2a_client._A2A_ERROR_PREFIX)
|
||||||
|
assert result.startswith(a2a_client._A2A_QUEUED_PREFIX)
|
||||||
assert "message/sendStream" in result
|
assert "message/sendStream" in result
|
||||||
|
|
||||||
async def test_status_queued_without_poll_mode_still_falls_through(self):
|
async def test_status_queued_without_poll_mode_still_falls_through(self):
|
||||||
@ -462,6 +464,98 @@ def _make_seq_mock_client(post_side_effect):
|
|||||||
return mock_client
|
return mock_client
|
||||||
|
|
||||||
|
|
||||||
|
class TestSendA2AMessagePollMode:
|
||||||
|
"""Pin the #2967 fix: send_a2a_message recognizes the platform's
|
||||||
|
poll-mode short-circuit envelope and returns a queued sentinel
|
||||||
|
instead of an "unexpected response shape" error.
|
||||||
|
|
||||||
|
Pre-#2967 the client treated the queued envelope as malformed,
|
||||||
|
causing the calling agent to retry, which delivered the same
|
||||||
|
message twice to the (poll-mode) recipient. The Queued sentinel
|
||||||
|
lets delegate_task fall back to the durable polling path
|
||||||
|
transparently — see test_delegation_sync_via_polling for the
|
||||||
|
fallback verification.
|
||||||
|
"""
|
||||||
|
|
||||||
|
async def test_poll_queued_envelope_returns_queued_sentinel(self):
|
||||||
|
# Workspace-server returns this shape (a2a_proxy.go:402-406)
|
||||||
|
# when the target workspace is registered as delivery_mode=poll
|
||||||
|
# (no public URL, typical for external molecule-mcp standalone
|
||||||
|
# runtimes).
|
||||||
|
import a2a_client
|
||||||
|
|
||||||
|
resp = _make_response(200, {
|
||||||
|
"status": "queued",
|
||||||
|
"delivery_mode": "poll",
|
||||||
|
"method": "message/send",
|
||||||
|
})
|
||||||
|
mock_client = _make_mock_client(post_resp=resp)
|
||||||
|
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
|
||||||
|
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
|
||||||
|
|
||||||
|
# Sentinel + structured payload so callers can branch on it.
|
||||||
|
assert result.startswith(a2a_client._A2A_QUEUED_PREFIX)
|
||||||
|
# Critically: NOT the error sentinel. Pre-#2967 it was the error path.
|
||||||
|
assert not result.startswith(a2a_client._A2A_ERROR_PREFIX)
|
||||||
|
# Carries enough info for the caller to log meaningfully.
|
||||||
|
assert _TEST_PEER_ID in result
|
||||||
|
assert "message/send" in result
|
||||||
|
|
||||||
|
async def test_poll_queued_envelope_method_is_recorded(self):
|
||||||
|
import a2a_client
|
||||||
|
|
||||||
|
resp = _make_response(200, {
|
||||||
|
"status": "queued",
|
||||||
|
"delivery_mode": "poll",
|
||||||
|
"method": "notify",
|
||||||
|
})
|
||||||
|
mock_client = _make_mock_client(post_resp=resp)
|
||||||
|
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
|
||||||
|
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
|
||||||
|
|
||||||
|
assert result.startswith(a2a_client._A2A_QUEUED_PREFIX)
|
||||||
|
assert "notify" in result
|
||||||
|
|
||||||
|
async def test_status_queued_without_delivery_mode_is_unexpected_shape(self):
|
||||||
|
# Server bug: only ``status=queued`` set, ``delivery_mode``
|
||||||
|
# missing. Surface as the malformed branch (not Queued) — the
|
||||||
|
# SSOT parser treats this as Malformed because the documented
|
||||||
|
# contract requires both keys.
|
||||||
|
import a2a_client
|
||||||
|
|
||||||
|
resp = _make_response(200, {"status": "queued", "method": "message/send"})
|
||||||
|
mock_client = _make_mock_client(post_resp=resp)
|
||||||
|
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
|
||||||
|
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
|
||||||
|
|
||||||
|
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
|
||||||
|
assert "unexpected response shape" in result
|
||||||
|
# Must explicitly mention "or queued envelope" so an operator
|
||||||
|
# debugging this knows the parser HAS a Queued branch and the
|
||||||
|
# body just didn't match — not that the parser is missing the
|
||||||
|
# logic entirely (the pre-#2967 confusion).
|
||||||
|
assert "queued envelope" in result
|
||||||
|
|
||||||
|
async def test_platform_error_with_restart_metadata_surfaces_in_message(self):
|
||||||
|
# The platform error envelope: 503 with restart metadata.
|
||||||
|
# Surfaced as an error string that includes "restarting" so
|
||||||
|
# the caller / agent can render a softer error to the user.
|
||||||
|
import a2a_client
|
||||||
|
|
||||||
|
resp = _make_response(200, {
|
||||||
|
"error": "workspace agent unreachable — container restart triggered",
|
||||||
|
"restarting": True,
|
||||||
|
"retry_after": 15,
|
||||||
|
})
|
||||||
|
mock_client = _make_mock_client(post_resp=resp)
|
||||||
|
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
|
||||||
|
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
|
||||||
|
|
||||||
|
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
|
||||||
|
assert "restarting" in result
|
||||||
|
assert "retry_after=15" in result
|
||||||
|
|
||||||
|
|
||||||
class TestSendA2AMessageRetry:
|
class TestSendA2AMessageRetry:
|
||||||
"""Verify auto-retry on transient transport errors (RemoteProtocolError,
|
"""Verify auto-retry on transient transport errors (RemoteProtocolError,
|
||||||
ConnectError, ReadTimeout, etc.) up to _DELEGATE_MAX_ATTEMPTS times.
|
ConnectError, ReadTimeout, etc.) up to _DELEGATE_MAX_ATTEMPTS times.
|
||||||
|
|||||||
455
workspace/tests/test_a2a_response.py
Normal file
455
workspace/tests/test_a2a_response.py
Normal file
@ -0,0 +1,455 @@
|
|||||||
|
"""Tests for the A2A response SSOT parser (workspace/a2a_response.py).
|
||||||
|
|
||||||
|
Branch coverage target: 100%. Each variant of ``parse()`` exercised in
|
||||||
|
isolation, plus adversarial-input fuzzing to assert the parser never
|
||||||
|
raises.
|
||||||
|
|
||||||
|
Pre-#2967, the response shape was sniffed inline at every call site
|
||||||
|
(``a2a_client.py:567-587`` had hard-coded ``"result" in data`` /
|
||||||
|
``"error" in data`` checks). The bare ``else`` returned an
|
||||||
|
"unexpected response shape" error — which silently broke poll-mode
|
||||||
|
peers because the workspace-server's poll-queued envelope has neither
|
||||||
|
``result`` nor ``error``. The SSOT parser has an explicit ``Queued``
|
||||||
|
variant for that path and routes anything truly unrecognized to
|
||||||
|
``Malformed`` so a future server-side change fails loudly.
|
||||||
|
|
||||||
|
The "this test FAILS on pre-fix source" guarantee is enforced by
|
||||||
|
running the legacy-shape sniffer alongside the new parser in
|
||||||
|
``test_legacy_sniffer_misclassified_queued`` — that test fails on
|
||||||
|
the pre-#2967 ``a2a_client.py`` shape because the legacy code
|
||||||
|
returns the unexpected-shape error path for the Queued envelope.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
import a2a_response
|
||||||
|
|
||||||
|
|
||||||
|
# ============== Fixture corpus — the canonical wire shapes ==============
|
||||||
|
|
||||||
|
|
||||||
|
# Every shape below mirrors a path the workspace-server's a2a_proxy.go
|
||||||
|
# can return. When you add a new server-side response shape, add a
|
||||||
|
# fixture entry here and a corresponding test method below.
|
||||||
|
_FIXTURES = {
|
||||||
|
"jsonrpc_success_with_text": {
|
||||||
|
"jsonrpc": "2.0",
|
||||||
|
"id": "abc-123",
|
||||||
|
"result": {
|
||||||
|
"parts": [{"kind": "text", "text": "hello world"}],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"jsonrpc_success_multipart": {
|
||||||
|
"jsonrpc": "2.0",
|
||||||
|
"id": "abc-123",
|
||||||
|
"result": {
|
||||||
|
"parts": [
|
||||||
|
{"kind": "text", "text": "first"},
|
||||||
|
{"kind": "text", "text": "second"},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"jsonrpc_success_no_parts": {
|
||||||
|
"jsonrpc": "2.0",
|
||||||
|
"id": "abc-123",
|
||||||
|
"result": {},
|
||||||
|
},
|
||||||
|
"jsonrpc_success_part_no_text_key": {
|
||||||
|
"jsonrpc": "2.0",
|
||||||
|
"id": "abc-123",
|
||||||
|
"result": {"parts": [{"kind": "text"}]},
|
||||||
|
},
|
||||||
|
"jsonrpc_error_with_message_and_code": {
|
||||||
|
"jsonrpc": "2.0",
|
||||||
|
"id": "abc-123",
|
||||||
|
"error": {"message": "rate limited", "code": -32003},
|
||||||
|
},
|
||||||
|
"jsonrpc_error_message_only": {
|
||||||
|
"jsonrpc": "2.0",
|
||||||
|
"id": "abc-123",
|
||||||
|
"error": {"message": "rate limited"},
|
||||||
|
},
|
||||||
|
"jsonrpc_error_code_only": {
|
||||||
|
"jsonrpc": "2.0",
|
||||||
|
"id": "abc-123",
|
||||||
|
"error": {"code": -32603},
|
||||||
|
},
|
||||||
|
"jsonrpc_error_string_form": {
|
||||||
|
"jsonrpc": "2.0",
|
||||||
|
"id": "abc-123",
|
||||||
|
"error": "string-shaped error",
|
||||||
|
},
|
||||||
|
"platform_error_with_restart": {
|
||||||
|
"error": "workspace agent unreachable — container restart triggered",
|
||||||
|
"restarting": True,
|
||||||
|
"retry_after": 15,
|
||||||
|
},
|
||||||
|
"platform_error_plain": {
|
||||||
|
"error": "workspace not found",
|
||||||
|
},
|
||||||
|
"poll_queued_full": {
|
||||||
|
"status": "queued",
|
||||||
|
"delivery_mode": "poll",
|
||||||
|
"method": "message/send",
|
||||||
|
},
|
||||||
|
"poll_queued_notify": {
|
||||||
|
"status": "queued",
|
||||||
|
"delivery_mode": "poll",
|
||||||
|
"method": "notify",
|
||||||
|
},
|
||||||
|
"poll_queued_no_method": {
|
||||||
|
"status": "queued",
|
||||||
|
"delivery_mode": "poll",
|
||||||
|
},
|
||||||
|
"malformed_empty_dict": {},
|
||||||
|
"malformed_unexpected_keys": {"foo": "bar", "baz": 42},
|
||||||
|
"malformed_status_queued_no_delivery_mode": {
|
||||||
|
# Server bug — status set but delivery_mode missing.
|
||||||
|
# Should be Malformed, not Queued, because the contract says both.
|
||||||
|
"status": "queued",
|
||||||
|
},
|
||||||
|
"malformed_delivery_mode_no_status": {
|
||||||
|
"delivery_mode": "poll",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ============== Variant-by-variant coverage ==============
|
||||||
|
|
||||||
|
|
||||||
|
class TestQueuedVariant:
|
||||||
|
"""``parse()`` recognizes the workspace-server poll-mode short-circuit
|
||||||
|
envelope (a2a_proxy.go:402-406) and returns ``Queued``."""
|
||||||
|
|
||||||
|
def test_full_envelope_with_method_message_send(self):
|
||||||
|
v = a2a_response.parse(_FIXTURES["poll_queued_full"])
|
||||||
|
assert isinstance(v, a2a_response.Queued)
|
||||||
|
assert v.method == "message/send"
|
||||||
|
assert v.delivery_mode == "poll"
|
||||||
|
|
||||||
|
def test_envelope_with_method_notify(self):
|
||||||
|
v = a2a_response.parse(_FIXTURES["poll_queued_notify"])
|
||||||
|
assert isinstance(v, a2a_response.Queued)
|
||||||
|
assert v.method == "notify"
|
||||||
|
|
||||||
|
def test_envelope_missing_method_uses_unknown_sentinel(self):
|
||||||
|
# Envelope without ``method`` key — server contract should
|
||||||
|
# always set it, but the parser must not raise on absence.
|
||||||
|
v = a2a_response.parse(_FIXTURES["poll_queued_no_method"])
|
||||||
|
assert isinstance(v, a2a_response.Queued)
|
||||||
|
assert v.method == "unknown"
|
||||||
|
|
||||||
|
def test_status_queued_alone_is_malformed_not_queued(self):
|
||||||
|
# ``status=queued`` without ``delivery_mode=poll`` does not match
|
||||||
|
# the documented envelope. Surface as Malformed for visibility.
|
||||||
|
v = a2a_response.parse(_FIXTURES["malformed_status_queued_no_delivery_mode"])
|
||||||
|
assert isinstance(v, a2a_response.Malformed)
|
||||||
|
|
||||||
|
def test_delivery_mode_alone_is_malformed_not_queued(self):
|
||||||
|
v = a2a_response.parse(_FIXTURES["malformed_delivery_mode_no_status"])
|
||||||
|
assert isinstance(v, a2a_response.Malformed)
|
||||||
|
|
||||||
|
def test_logs_info_on_queued(self, caplog):
|
||||||
|
# Comprehensive logging — operator should see queued events at INFO.
|
||||||
|
with caplog.at_level(logging.INFO, logger="a2a_response"):
|
||||||
|
a2a_response.parse(_FIXTURES["poll_queued_full"])
|
||||||
|
assert any("queued for poll-mode peer" in r.message for r in caplog.records)
|
||||||
|
|
||||||
|
|
||||||
|
class TestResultVariant:
|
||||||
|
"""``parse()`` extracts the JSON-RPC ``result`` envelope into
|
||||||
|
``Result(text, parts, raw_result)``."""
|
||||||
|
|
||||||
|
def test_simple_text_result(self):
|
||||||
|
v = a2a_response.parse(_FIXTURES["jsonrpc_success_with_text"])
|
||||||
|
assert isinstance(v, a2a_response.Result)
|
||||||
|
assert v.text == "hello world"
|
||||||
|
assert len(v.parts) == 1
|
||||||
|
assert v.raw_result == {"parts": [{"kind": "text", "text": "hello world"}]}
|
||||||
|
|
||||||
|
def test_multipart_result_extracts_first_part_text(self):
|
||||||
|
v = a2a_response.parse(_FIXTURES["jsonrpc_success_multipart"])
|
||||||
|
assert isinstance(v, a2a_response.Result)
|
||||||
|
assert v.text == "first"
|
||||||
|
assert len(v.parts) == 2
|
||||||
|
|
||||||
|
def test_result_with_no_parts(self):
|
||||||
|
v = a2a_response.parse(_FIXTURES["jsonrpc_success_no_parts"])
|
||||||
|
assert isinstance(v, a2a_response.Result)
|
||||||
|
assert v.text == ""
|
||||||
|
assert v.parts == []
|
||||||
|
|
||||||
|
def test_part_without_text_key(self):
|
||||||
|
v = a2a_response.parse(_FIXTURES["jsonrpc_success_part_no_text_key"])
|
||||||
|
assert isinstance(v, a2a_response.Result)
|
||||||
|
# No "text" key — extracted text is empty, parts list intact.
|
||||||
|
assert v.text == ""
|
||||||
|
assert len(v.parts) == 1
|
||||||
|
|
||||||
|
def test_result_non_dict_returns_text_form(self):
|
||||||
|
# Pathological but legal: ``result`` is a string instead of a dict.
|
||||||
|
v = a2a_response.parse({"result": "hello"})
|
||||||
|
assert isinstance(v, a2a_response.Result)
|
||||||
|
assert v.text == "hello"
|
||||||
|
assert v.parts == []
|
||||||
|
|
||||||
|
def test_result_takes_precedence_when_no_queued_envelope(self):
|
||||||
|
# Both ``result`` and ``error`` keys present — result wins
|
||||||
|
# because it's checked first after the Queued path.
|
||||||
|
v = a2a_response.parse({
|
||||||
|
"result": {"parts": [{"kind": "text", "text": "ok"}]},
|
||||||
|
"error": {"message": "should-be-ignored"},
|
||||||
|
})
|
||||||
|
assert isinstance(v, a2a_response.Result)
|
||||||
|
assert v.text == "ok"
|
||||||
|
|
||||||
|
def test_part_with_non_dict_first_entry(self):
|
||||||
|
# ``parts[0]`` is a string instead of a dict — parser tolerates it,
|
||||||
|
# text falls back to empty.
|
||||||
|
v = a2a_response.parse({"result": {"parts": ["bare-string"]}})
|
||||||
|
assert isinstance(v, a2a_response.Result)
|
||||||
|
assert v.text == ""
|
||||||
|
assert v.parts == ["bare-string"]
|
||||||
|
|
||||||
|
def test_part_text_value_none(self):
|
||||||
|
# ``parts[0].text`` is explicitly None — extracted as "".
|
||||||
|
v = a2a_response.parse({"result": {"parts": [{"text": None}]}})
|
||||||
|
assert isinstance(v, a2a_response.Result)
|
||||||
|
assert v.text == ""
|
||||||
|
|
||||||
|
def test_parts_not_a_list(self):
|
||||||
|
# Server bug: ``parts`` is a dict instead of a list. Parser falls
|
||||||
|
# back to empty parts rather than raising.
|
||||||
|
v = a2a_response.parse({"result": {"parts": {"oops": True}}})
|
||||||
|
assert isinstance(v, a2a_response.Result)
|
||||||
|
assert v.parts == []
|
||||||
|
assert v.text == ""
|
||||||
|
|
||||||
|
|
||||||
|
class TestErrorVariant:
|
||||||
|
"""``parse()`` extracts ``error`` envelopes into ``Error`` and
|
||||||
|
annotates platform-restart metadata when present."""
|
||||||
|
|
||||||
|
def test_message_and_code(self):
|
||||||
|
v = a2a_response.parse(_FIXTURES["jsonrpc_error_with_message_and_code"])
|
||||||
|
assert isinstance(v, a2a_response.Error)
|
||||||
|
assert v.message == "rate limited"
|
||||||
|
assert v.code == -32003
|
||||||
|
assert v.restarting is False
|
||||||
|
assert v.retry_after is None
|
||||||
|
|
||||||
|
def test_message_only(self):
|
||||||
|
v = a2a_response.parse(_FIXTURES["jsonrpc_error_message_only"])
|
||||||
|
assert isinstance(v, a2a_response.Error)
|
||||||
|
assert v.message == "rate limited"
|
||||||
|
assert v.code is None
|
||||||
|
|
||||||
|
def test_code_only(self):
|
||||||
|
v = a2a_response.parse(_FIXTURES["jsonrpc_error_code_only"])
|
||||||
|
assert isinstance(v, a2a_response.Error)
|
||||||
|
assert v.message == ""
|
||||||
|
assert v.code == -32603
|
||||||
|
|
||||||
|
def test_error_string_form(self):
|
||||||
|
v = a2a_response.parse(_FIXTURES["jsonrpc_error_string_form"])
|
||||||
|
assert isinstance(v, a2a_response.Error)
|
||||||
|
assert v.message == "string-shaped error"
|
||||||
|
assert v.code is None
|
||||||
|
|
||||||
|
def test_error_non_dict_non_string(self):
|
||||||
|
v = a2a_response.parse({"error": 12345})
|
||||||
|
assert isinstance(v, a2a_response.Error)
|
||||||
|
assert v.message == "12345"
|
||||||
|
|
||||||
|
def test_platform_error_with_restart_metadata(self):
|
||||||
|
v = a2a_response.parse(_FIXTURES["platform_error_with_restart"])
|
||||||
|
assert isinstance(v, a2a_response.Error)
|
||||||
|
assert "workspace agent unreachable" in v.message
|
||||||
|
assert v.restarting is True
|
||||||
|
assert v.retry_after == 15
|
||||||
|
|
||||||
|
def test_platform_error_without_restart(self):
|
||||||
|
v = a2a_response.parse(_FIXTURES["platform_error_plain"])
|
||||||
|
assert isinstance(v, a2a_response.Error)
|
||||||
|
assert v.message == "workspace not found"
|
||||||
|
assert v.restarting is False
|
||||||
|
assert v.retry_after is None
|
||||||
|
|
||||||
|
def test_error_message_with_whitespace_stripped(self):
|
||||||
|
v = a2a_response.parse({"error": {"message": " trimmed "}})
|
||||||
|
assert isinstance(v, a2a_response.Error)
|
||||||
|
assert v.message == "trimmed"
|
||||||
|
|
||||||
|
def test_non_int_code_dropped(self):
|
||||||
|
v = a2a_response.parse({"error": {"message": "x", "code": "not-a-number"}})
|
||||||
|
assert isinstance(v, a2a_response.Error)
|
||||||
|
assert v.code is None
|
||||||
|
|
||||||
|
def test_non_int_retry_after_dropped(self):
|
||||||
|
v = a2a_response.parse({"error": "x", "restarting": True, "retry_after": "30s"})
|
||||||
|
assert isinstance(v, a2a_response.Error)
|
||||||
|
assert v.retry_after is None
|
||||||
|
|
||||||
|
|
||||||
|
class TestMalformedVariant:
|
||||||
|
"""``parse()`` returns ``Malformed`` for any shape it can't classify
|
||||||
|
and logs at WARNING so operators see new server response shapes."""
|
||||||
|
|
||||||
|
def test_empty_dict(self):
|
||||||
|
v = a2a_response.parse(_FIXTURES["malformed_empty_dict"])
|
||||||
|
assert isinstance(v, a2a_response.Malformed)
|
||||||
|
assert v.raw == {}
|
||||||
|
|
||||||
|
def test_unexpected_keys(self):
|
||||||
|
v = a2a_response.parse(_FIXTURES["malformed_unexpected_keys"])
|
||||||
|
assert isinstance(v, a2a_response.Malformed)
|
||||||
|
assert v.raw == {"foo": "bar", "baz": 42}
|
||||||
|
|
||||||
|
def test_non_dict_input_list(self):
|
||||||
|
v = a2a_response.parse([1, 2, 3])
|
||||||
|
assert isinstance(v, a2a_response.Malformed)
|
||||||
|
assert v.raw == [1, 2, 3]
|
||||||
|
|
||||||
|
def test_non_dict_input_string(self):
|
||||||
|
v = a2a_response.parse("plain string")
|
||||||
|
assert isinstance(v, a2a_response.Malformed)
|
||||||
|
assert v.raw == "plain string"
|
||||||
|
|
||||||
|
def test_non_dict_input_none(self):
|
||||||
|
v = a2a_response.parse(None)
|
||||||
|
assert isinstance(v, a2a_response.Malformed)
|
||||||
|
assert v.raw is None
|
||||||
|
|
||||||
|
def test_logs_warning_on_malformed(self, caplog):
|
||||||
|
with caplog.at_level(logging.WARNING, logger="a2a_response"):
|
||||||
|
a2a_response.parse(_FIXTURES["malformed_unexpected_keys"])
|
||||||
|
assert any(r.levelno == logging.WARNING for r in caplog.records)
|
||||||
|
|
||||||
|
def test_logs_warning_on_non_dict(self, caplog):
|
||||||
|
with caplog.at_level(logging.WARNING, logger="a2a_response"):
|
||||||
|
a2a_response.parse("not a dict")
|
||||||
|
assert any("non-dict" in r.message for r in caplog.records)
|
||||||
|
|
||||||
|
|
||||||
|
# ============== Robustness — parser never raises ==============
|
||||||
|
|
||||||
|
|
||||||
|
_ADVERSARIAL_INPUTS: list[Any] = [
|
||||||
|
None,
|
||||||
|
True,
|
||||||
|
False,
|
||||||
|
0,
|
||||||
|
-1,
|
||||||
|
3.14,
|
||||||
|
"",
|
||||||
|
"string",
|
||||||
|
[],
|
||||||
|
[1, 2, 3],
|
||||||
|
{},
|
||||||
|
{"random": "garbage"},
|
||||||
|
{"result": None},
|
||||||
|
{"result": [1, 2, 3]},
|
||||||
|
{"result": {"parts": None}},
|
||||||
|
{"result": {"parts": [None]}},
|
||||||
|
{"result": {"parts": [{"text": []}]}},
|
||||||
|
{"error": None},
|
||||||
|
{"error": []},
|
||||||
|
{"error": {"message": None, "code": None}},
|
||||||
|
{"error": {"message": ["nested", "list"]}},
|
||||||
|
{"status": None, "delivery_mode": None, "method": None},
|
||||||
|
{"status": "queued", "delivery_mode": "push", "method": "x"}, # wrong delivery_mode
|
||||||
|
{"status": "running", "delivery_mode": "poll"}, # wrong status
|
||||||
|
{"status": 42, "delivery_mode": "poll"}, # non-string status
|
||||||
|
# Deeply-nested junk
|
||||||
|
{"result": {"parts": [{"text": {"deeply": {"nested": "object"}}}]}},
|
||||||
|
# Bytes (not really JSON-decodable but parser shouldn't raise)
|
||||||
|
{"result": {"parts": [{"text": b"bytes" if False else "x"}]}},
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class TestRobustness:
|
||||||
|
"""Parser must never raise on adversarial input — every branch is total.
|
||||||
|
|
||||||
|
These cases catch regressions where a future change adds a key
|
||||||
|
access that doesn't tolerate ``None`` / wrong-type values.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("payload", _ADVERSARIAL_INPUTS)
|
||||||
|
def test_parse_never_raises(self, payload):
|
||||||
|
# Single contract: parse must return one of the four variants
|
||||||
|
# regardless of input. No exception classes propagated.
|
||||||
|
v = a2a_response.parse(payload)
|
||||||
|
assert isinstance(v, (a2a_response.Result, a2a_response.Error,
|
||||||
|
a2a_response.Queued, a2a_response.Malformed))
|
||||||
|
|
||||||
|
|
||||||
|
# ============== Regression gate — pre-#2967 misclassified queued ==============
|
||||||
|
|
||||||
|
|
||||||
|
class TestRegressionGate:
|
||||||
|
"""Pin the bug that prompted the SSOT abstraction.
|
||||||
|
|
||||||
|
Before #2967, ``a2a_client.py:567-587`` sniffed only ``"result" in
|
||||||
|
data`` and ``"error" in data`` — the poll-queued envelope (no
|
||||||
|
result key, no error key) hit the bare-else and returned the
|
||||||
|
"unexpected response shape" error string. This test simulates the
|
||||||
|
pre-fix code path and confirms the SSOT parser correctly
|
||||||
|
distinguishes Queued from Malformed.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def test_legacy_sniffer_would_return_neither_branch(self):
|
||||||
|
# The pre-#2967 logic — provided here so the regression is
|
||||||
|
# reproducible from this file alone, no archaeology needed.
|
||||||
|
envelope = _FIXTURES["poll_queued_full"]
|
||||||
|
legacy_branch = (
|
||||||
|
"result" if "result" in envelope
|
||||||
|
else "error" if "error" in envelope
|
||||||
|
else "unexpected_shape"
|
||||||
|
)
|
||||||
|
# Legacy sniff: hits the malformed branch.
|
||||||
|
assert legacy_branch == "unexpected_shape"
|
||||||
|
|
||||||
|
def test_ssot_parser_classifies_correctly(self):
|
||||||
|
# New parser: classifies as Queued.
|
||||||
|
v = a2a_response.parse(_FIXTURES["poll_queued_full"])
|
||||||
|
assert isinstance(v, a2a_response.Queued)
|
||||||
|
assert v.method == "message/send"
|
||||||
|
|
||||||
|
def test_every_fixture_classifies_to_expected_variant(self):
|
||||||
|
# Defense in depth — pin the variant for every fixture so a
|
||||||
|
# future shape addition has to update the table here too.
|
||||||
|
expected: dict[str, type] = {
|
||||||
|
"jsonrpc_success_with_text": a2a_response.Result,
|
||||||
|
"jsonrpc_success_multipart": a2a_response.Result,
|
||||||
|
"jsonrpc_success_no_parts": a2a_response.Result,
|
||||||
|
"jsonrpc_success_part_no_text_key": a2a_response.Result,
|
||||||
|
"jsonrpc_error_with_message_and_code": a2a_response.Error,
|
||||||
|
"jsonrpc_error_message_only": a2a_response.Error,
|
||||||
|
"jsonrpc_error_code_only": a2a_response.Error,
|
||||||
|
"jsonrpc_error_string_form": a2a_response.Error,
|
||||||
|
"platform_error_with_restart": a2a_response.Error,
|
||||||
|
"platform_error_plain": a2a_response.Error,
|
||||||
|
"poll_queued_full": a2a_response.Queued,
|
||||||
|
"poll_queued_notify": a2a_response.Queued,
|
||||||
|
"poll_queued_no_method": a2a_response.Queued,
|
||||||
|
"malformed_empty_dict": a2a_response.Malformed,
|
||||||
|
"malformed_unexpected_keys": a2a_response.Malformed,
|
||||||
|
"malformed_status_queued_no_delivery_mode": a2a_response.Malformed,
|
||||||
|
"malformed_delivery_mode_no_status": a2a_response.Malformed,
|
||||||
|
}
|
||||||
|
# Every fixture must be enumerated — keeps this gate honest.
|
||||||
|
assert set(expected.keys()) == set(_FIXTURES.keys()), (
|
||||||
|
f"fixture/expected mismatch: "
|
||||||
|
f"missing-from-expected={set(_FIXTURES) - set(expected)} "
|
||||||
|
f"extra-in-expected={set(expected) - set(_FIXTURES)}"
|
||||||
|
)
|
||||||
|
for name, payload in _FIXTURES.items():
|
||||||
|
v = a2a_response.parse(payload)
|
||||||
|
assert isinstance(v, expected[name]), (
|
||||||
|
f"fixture {name!r} classified as {type(v).__name__}, "
|
||||||
|
f"expected {expected[name].__name__}"
|
||||||
|
)
|
||||||
@ -93,6 +93,124 @@ class TestFlagOffLegacyPath:
|
|||||||
poll_mock.assert_not_called()
|
poll_mock.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# #2967: Auto-fallback to polling path when target is poll-mode
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestPollModeAutoFallback:
|
||||||
|
"""Pin the #2967 behavior: when send_a2a_message returns the queued
|
||||||
|
sentinel (target is poll-mode), tool_delegate_task transparently
|
||||||
|
falls back to _delegate_sync_via_polling — which DOES work for
|
||||||
|
poll-mode peers (the executeDelegation goroutine writes to the
|
||||||
|
inbox queue and the result row arrives when the target replies).
|
||||||
|
|
||||||
|
Pre-#2967 behavior: queued sentinel was never returned (the parser
|
||||||
|
misclassified the envelope as malformed), and the calling agent
|
||||||
|
saw a DELEGATION FAILED / unexpected-response-shape error. This
|
||||||
|
test guards both against the parser regression (sentinel-emission)
|
||||||
|
and the fallback regression (sentinel-handling).
|
||||||
|
"""
|
||||||
|
|
||||||
|
async def test_queued_sentinel_triggers_polling_fallback(self, monkeypatch):
|
||||||
|
# Flag OFF — legacy send_a2a_message path. send returns the
|
||||||
|
# queued sentinel because the target is poll-mode. delegate_task
|
||||||
|
# must auto-route to _delegate_sync_via_polling so the agent
|
||||||
|
# eventually gets a real reply.
|
||||||
|
monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False)
|
||||||
|
|
||||||
|
import a2a_tools
|
||||||
|
from a2a_client import _A2A_QUEUED_PREFIX
|
||||||
|
|
||||||
|
send_calls = []
|
||||||
|
poll_calls = []
|
||||||
|
|
||||||
|
async def fake_send(workspace_id, task, source_workspace_id=None):
|
||||||
|
send_calls.append((workspace_id, task, source_workspace_id))
|
||||||
|
return f"{_A2A_QUEUED_PREFIX}target={workspace_id} method=message/send"
|
||||||
|
|
||||||
|
async def fake_polling(workspace_id, task, src):
|
||||||
|
poll_calls.append((workspace_id, task, src))
|
||||||
|
return "real response from poll-mode peer"
|
||||||
|
|
||||||
|
async def fake_discover(*_a, **_kw):
|
||||||
|
return {"name": "poll-peer", "status": "online"}
|
||||||
|
|
||||||
|
async def fake_report_activity(*_a, **_kw):
|
||||||
|
return None
|
||||||
|
|
||||||
|
with patch("a2a_tools_delegation.send_a2a_message", side_effect=fake_send), \
|
||||||
|
patch("a2a_tools_delegation._delegate_sync_via_polling", side_effect=fake_polling), \
|
||||||
|
patch("a2a_tools_delegation.discover_peer", side_effect=fake_discover), \
|
||||||
|
patch("a2a_tools.report_activity", side_effect=fake_report_activity):
|
||||||
|
result = await a2a_tools.tool_delegate_task(
|
||||||
|
"ws-target", "task body", source_workspace_id="ws-self"
|
||||||
|
)
|
||||||
|
|
||||||
|
# send was tried first
|
||||||
|
assert len(send_calls) == 1
|
||||||
|
# …then fallback fired automatically
|
||||||
|
assert len(poll_calls) == 1
|
||||||
|
assert poll_calls[0] == ("ws-target", "task body", "ws-self")
|
||||||
|
# Caller sees the real reply, NOT the queued sentinel and NOT
|
||||||
|
# a DELEGATION FAILED string.
|
||||||
|
assert result == "real response from poll-mode peer"
|
||||||
|
|
||||||
|
async def test_non_queued_send_result_does_not_trigger_fallback(self, monkeypatch):
|
||||||
|
# Push-mode peer returns a normal text reply — fallback path
|
||||||
|
# MUST NOT fire (no extra round-trip cost).
|
||||||
|
monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False)
|
||||||
|
|
||||||
|
import a2a_tools
|
||||||
|
|
||||||
|
async def fake_send(*_a, **_kw):
|
||||||
|
return "normal reply"
|
||||||
|
|
||||||
|
async def fake_discover(*_a, **_kw):
|
||||||
|
return {"name": "push-peer", "status": "online"}
|
||||||
|
|
||||||
|
async def fake_report_activity(*_a, **_kw):
|
||||||
|
return None
|
||||||
|
|
||||||
|
with patch("a2a_tools_delegation.send_a2a_message", side_effect=fake_send), \
|
||||||
|
patch("a2a_tools_delegation.discover_peer", side_effect=fake_discover), \
|
||||||
|
patch("a2a_tools.report_activity", side_effect=fake_report_activity), \
|
||||||
|
patch("a2a_tools_delegation._delegate_sync_via_polling", new=AsyncMock()) as poll_mock:
|
||||||
|
result = await a2a_tools.tool_delegate_task(
|
||||||
|
"ws-target", "task", source_workspace_id="ws-self"
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result == "normal reply"
|
||||||
|
poll_mock.assert_not_called()
|
||||||
|
|
||||||
|
async def test_error_send_result_does_not_trigger_fallback(self, monkeypatch):
|
||||||
|
# Genuine error (not queued) — must surface as DELEGATION FAILED,
|
||||||
|
# not silently retried via the polling path.
|
||||||
|
monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False)
|
||||||
|
|
||||||
|
import a2a_tools
|
||||||
|
from a2a_client import _A2A_ERROR_PREFIX
|
||||||
|
|
||||||
|
async def fake_send(*_a, **_kw):
|
||||||
|
return f"{_A2A_ERROR_PREFIX}HTTP 500 [target=...]"
|
||||||
|
|
||||||
|
async def fake_discover(*_a, **_kw):
|
||||||
|
return {"name": "broken-peer", "status": "online"}
|
||||||
|
|
||||||
|
async def fake_report_activity(*_a, **_kw):
|
||||||
|
return None
|
||||||
|
|
||||||
|
with patch("a2a_tools_delegation.send_a2a_message", side_effect=fake_send), \
|
||||||
|
patch("a2a_tools_delegation.discover_peer", side_effect=fake_discover), \
|
||||||
|
patch("a2a_tools.report_activity", side_effect=fake_report_activity), \
|
||||||
|
patch("a2a_tools_delegation._delegate_sync_via_polling", new=AsyncMock()) as poll_mock:
|
||||||
|
result = await a2a_tools.tool_delegate_task(
|
||||||
|
"ws-target", "task", source_workspace_id="ws-self"
|
||||||
|
)
|
||||||
|
|
||||||
|
assert "DELEGATION FAILED" in result
|
||||||
|
poll_mock.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Flag-on: dispatch failures
|
# Flag-on: dispatch failures
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user