From 146c0e7c60ce7542b60d41d3cd702d74b6a37217 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 16:58:48 -0700 Subject: [PATCH] fix(a2a-client): recognize poll-mode 'queued' envelope (#2967) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit workspace-server's a2a_proxy poll-mode short-circuit returns {status: "queued", delivery_mode: "poll", method: } when the peer has no URL to dispatch to (poll-mode peers, including every external molecule-mcp standalone runtime). The bare send_a2a_message parser only knew about JSON-RPC {result, error} keys, so this envelope fell through to the "unexpected response shape" error path. Two production symptoms on the reno-stars tenant traced to it: 1. File transfer logged as failed when it actually succeeded — operator-facing logs showed an A2A_ERROR but the receiving workspace did get the chunked file via the agent's fallback path. 2. delegate_task retried after the false failure → peer received duplicate delegations → conversation got confused, the second peer self-diagnosed in a notify ("⚠️ Peer 二次请求 — 我先不执行"). Add a third branch to the parser, BETWEEN the existing JSON-RPC {result, error} cases and the catch-all "unexpected" fallback. The queued envelope is delivery-acknowledged-but-pending-consumption — not an error — so it returns a clean success string the agent can render as a normal outcome. The success string includes "queued" and "poll" so an operator scanning logs sees the routing path without parsing JSON. Defensive: the new branch only fires when BOTH status="queued" AND delivery_mode="poll" are present. A partial envelope (one key missing) still falls through to the catch-all, so a future server bug that emits a malformed shape gets surfaced instead of silently swallowed. Tests: - test_poll_queued_envelope_returns_success_string — pins the canonical envelope returns a non-error string. Discriminating: verified to FAIL on old code (returned [A2A_ERROR] string), PASS on new. - test_poll_queued_envelope_with_other_method — pins the parser doesn't hardcode message/send. Discriminating: also FAILS on old code. - test_status_queued_without_poll_mode_still_falls_through — pins both keys are required (defensive against future server bugs). 12 existing tests in TestSendA2AMessage still pass — no regression. Scope: hotfix for the bare send_a2a_message path. The full SSOT typed-A2AResponse refactor (#158-#163, parents under #2967) covers the broader vocabulary alignment between Go server and Python client. This PR ends the production symptoms now without preempting that work. --- workspace/a2a_client.py | 18 +++++++ workspace/tests/test_a2a_client.py | 81 ++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+) diff --git a/workspace/a2a_client.py b/workspace/a2a_client.py index 7cd151bf..16eb4b9c 100644 --- a/workspace/a2a_client.py +++ b/workspace/a2a_client.py @@ -584,6 +584,24 @@ async def send_a2a_message(peer_id: str, message: str, source_workspace_id: str else: detail = "JSON-RPC error with no message" return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]" + elif data.get("status") == "queued" and data.get("delivery_mode") == "poll": + # Workspace-server's poll-mode short-circuit envelope + # (workspace-server/internal/handlers/a2a_proxy.go ~line 402). + # The peer is poll-mode and has no URL to dispatch to, so + # the server queued the message for the peer's next inbox + # poll instead of forwarding it. Delivery is acknowledged + # but pending consumption. + # + # Pre-fix this fell through to the "unexpected response + # shape" error path → callers logged false failures, then + # delegate_task retried, and the peer received duplicate + # delegations. Issue #2967. + method = data.get("method") or "message/send" + logger.info( + "send_a2a_message: queued for poll-mode peer (method=%s, target=%s)", + method, target_url, + ) + return f"queued for poll-mode peer (method={method})" return f"{_A2A_ERROR_PREFIX}unexpected response shape (no result, no error): {str(data)[:200]} [target={target_url}]" except _TRANSIENT_HTTP_ERRORS as e: last_exc = e diff --git a/workspace/tests/test_a2a_client.py b/workspace/tests/test_a2a_client.py index f667ed95..068fbffd 100644 --- a/workspace/tests/test_a2a_client.py +++ b/workspace/tests/test_a2a_client.py @@ -273,6 +273,87 @@ class TestSendA2AMessage: assert _TEST_PEER_ID in result assert "/workspaces/" in result and "/a2a" in result + async def test_poll_queued_envelope_returns_success_string(self): + """Issue #2967: workspace-server's poll-mode short-circuit returns + {status:"queued", delivery_mode:"poll", method:...} when the peer + has no URL to dispatch to. Pre-fix the bare send_a2a_message parser + only knew about JSON-RPC {result, error} keys, so this fell through + to the 'unexpected response shape' error path → callers retried, + peer got duplicate delegations. + + Pin: poll-queued envelope returns a clean success string that does + NOT start with _A2A_ERROR_PREFIX, so callers route it through the + normal-outcome path. Verified discriminating: assert_NOT_startswith + the error prefix would FAIL on the old code (which returned an + error-prefixed string) and PASSES on the new code. + """ + import a2a_client + + resp = _make_response(200, { + "status": "queued", + "delivery_mode": "poll", + "method": "message/send", + }) + mock_client = _make_mock_client(post_resp=resp) + + with patch("a2a_client.httpx.AsyncClient", return_value=mock_client): + result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task") + + # Discriminating: pre-fix returned a string that startswith + # _A2A_ERROR_PREFIX, so this assertion would have FAILED on the + # old code. New code returns a queued-success string. + assert not result.startswith(a2a_client._A2A_ERROR_PREFIX), ( + f"poll-queued envelope must not be tagged as A2A error; got: {result!r}" + ) + assert "queued" in result.lower() + assert "poll" in result.lower() + # The method is included so a structured-log scraper can route by + # protocol verb if needed. + assert "message/send" in result + + async def test_poll_queued_envelope_with_other_method(self): + """Same envelope but a different a2a_method (the future could add + message/sendStream or similar). Pin that the parser doesn't hardcode + message/send — whatever method the server echoed is preserved. + """ + import a2a_client + + resp = _make_response(200, { + "status": "queued", + "delivery_mode": "poll", + "method": "message/sendStream", + }) + mock_client = _make_mock_client(post_resp=resp) + + with patch("a2a_client.httpx.AsyncClient", return_value=mock_client): + result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task") + + assert not result.startswith(a2a_client._A2A_ERROR_PREFIX) + assert "message/sendStream" in result + + async def test_status_queued_without_poll_mode_still_falls_through(self): + """Defensive: only the {status:"queued", delivery_mode:"poll"} pair + triggers the queued-success branch. A response with status:"queued" + but a different delivery_mode (or none) is still 'unexpected' — + we don't want to silently swallow a future server bug that emits + a partial envelope. Pin both keys are required. + """ + import a2a_client + + resp = _make_response(200, { + "status": "queued", + # delivery_mode missing + "method": "message/send", + }) + mock_client = _make_mock_client(post_resp=resp) + + with patch("a2a_client.httpx.AsyncClient", return_value=mock_client): + result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task") + + # Falls through — must STILL be tagged as error. + assert result.startswith(a2a_client._A2A_ERROR_PREFIX) + assert "unexpected response shape" in result + async def test_exception_returns_error_prefix_and_message(self): """Network exception → returns _A2A_ERROR_PREFIX + exception text.""" import a2a_client