Merge pull request #2972 from Molecule-AI/fix/a2a-poll-queued-envelope-2967

fix(a2a-client): recognize poll-mode 'queued' envelope (#2967)
This commit is contained in:
Hongming Wang 2026-05-06 00:05:27 +00:00 committed by GitHub
commit d3e115cb06
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 99 additions and 0 deletions

View File

@ -584,6 +584,24 @@ async def send_a2a_message(peer_id: str, message: str, source_workspace_id: str
else:
detail = "JSON-RPC error with no message"
return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"
elif data.get("status") == "queued" and data.get("delivery_mode") == "poll":
# Workspace-server's poll-mode short-circuit envelope
# (workspace-server/internal/handlers/a2a_proxy.go ~line 402).
# The peer is poll-mode and has no URL to dispatch to, so
# the server queued the message for the peer's next inbox
# poll instead of forwarding it. Delivery is acknowledged
# but pending consumption.
#
# Pre-fix this fell through to the "unexpected response
# shape" error path → callers logged false failures, then
# delegate_task retried, and the peer received duplicate
# delegations. Issue #2967.
method = data.get("method") or "message/send"
logger.info(
"send_a2a_message: queued for poll-mode peer (method=%s, target=%s)",
method, target_url,
)
return f"queued for poll-mode peer (method={method})"
return f"{_A2A_ERROR_PREFIX}unexpected response shape (no result, no error): {str(data)[:200]} [target={target_url}]"
except _TRANSIENT_HTTP_ERRORS as e:
last_exc = e

View File

@ -273,6 +273,87 @@ class TestSendA2AMessage:
assert _TEST_PEER_ID in result
assert "/workspaces/" in result and "/a2a" in result
async def test_poll_queued_envelope_returns_success_string(self):
"""Issue #2967: workspace-server's poll-mode short-circuit returns
{status:"queued", delivery_mode:"poll", method:...} when the peer
has no URL to dispatch to. Pre-fix the bare send_a2a_message parser
only knew about JSON-RPC {result, error} keys, so this fell through
to the 'unexpected response shape' error path callers retried,
peer got duplicate delegations.
Pin: poll-queued envelope returns a clean success string that does
NOT start with _A2A_ERROR_PREFIX, so callers route it through the
normal-outcome path. Verified discriminating: assert_NOT_startswith
the error prefix would FAIL on the old code (which returned an
error-prefixed string) and PASSES on the new code.
"""
import a2a_client
resp = _make_response(200, {
"status": "queued",
"delivery_mode": "poll",
"method": "message/send",
})
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
# Discriminating: pre-fix returned a string that startswith
# _A2A_ERROR_PREFIX, so this assertion would have FAILED on the
# old code. New code returns a queued-success string.
assert not result.startswith(a2a_client._A2A_ERROR_PREFIX), (
f"poll-queued envelope must not be tagged as A2A error; got: {result!r}"
)
assert "queued" in result.lower()
assert "poll" in result.lower()
# The method is included so a structured-log scraper can route by
# protocol verb if needed.
assert "message/send" in result
async def test_poll_queued_envelope_with_other_method(self):
"""Same envelope but a different a2a_method (the future could add
message/sendStream or similar). Pin that the parser doesn't hardcode
message/send whatever method the server echoed is preserved.
"""
import a2a_client
resp = _make_response(200, {
"status": "queued",
"delivery_mode": "poll",
"method": "message/sendStream",
})
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
assert not result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert "message/sendStream" in result
async def test_status_queued_without_poll_mode_still_falls_through(self):
"""Defensive: only the {status:"queued", delivery_mode:"poll"} pair
triggers the queued-success branch. A response with status:"queued"
but a different delivery_mode (or none) is still 'unexpected'
we don't want to silently swallow a future server bug that emits
a partial envelope. Pin both keys are required.
"""
import a2a_client
resp = _make_response(200, {
"status": "queued",
# delivery_mode missing
"method": "message/send",
})
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
# Falls through — must STILL be tagged as error.
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert "unexpected response shape" in result
async def test_exception_returns_error_prefix_and_message(self):
"""Network exception → returns _A2A_ERROR_PREFIX + exception text."""
import a2a_client