diff --git a/workspace/a2a_client.py b/workspace/a2a_client.py index 7cd151bf..16eb4b9c 100644 --- a/workspace/a2a_client.py +++ b/workspace/a2a_client.py @@ -584,6 +584,24 @@ async def send_a2a_message(peer_id: str, message: str, source_workspace_id: str else: detail = "JSON-RPC error with no message" return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]" + elif data.get("status") == "queued" and data.get("delivery_mode") == "poll": + # Workspace-server's poll-mode short-circuit envelope + # (workspace-server/internal/handlers/a2a_proxy.go ~line 402). + # The peer is poll-mode and has no URL to dispatch to, so + # the server queued the message for the peer's next inbox + # poll instead of forwarding it. Delivery is acknowledged + # but pending consumption. + # + # Pre-fix this fell through to the "unexpected response + # shape" error path → callers logged false failures, then + # delegate_task retried, and the peer received duplicate + # delegations. Issue #2967. + method = data.get("method") or "message/send" + logger.info( + "send_a2a_message: queued for poll-mode peer (method=%s, target=%s)", + method, target_url, + ) + return f"queued for poll-mode peer (method={method})" return f"{_A2A_ERROR_PREFIX}unexpected response shape (no result, no error): {str(data)[:200]} [target={target_url}]" except _TRANSIENT_HTTP_ERRORS as e: last_exc = e diff --git a/workspace/tests/test_a2a_client.py b/workspace/tests/test_a2a_client.py index f667ed95..068fbffd 100644 --- a/workspace/tests/test_a2a_client.py +++ b/workspace/tests/test_a2a_client.py @@ -273,6 +273,87 @@ class TestSendA2AMessage: assert _TEST_PEER_ID in result assert "/workspaces/" in result and "/a2a" in result + async def test_poll_queued_envelope_returns_success_string(self): + """Issue #2967: workspace-server's poll-mode short-circuit returns + {status:"queued", delivery_mode:"poll", method:...} when the peer + has no URL to dispatch to. Pre-fix the bare send_a2a_message parser + only knew about JSON-RPC {result, error} keys, so this fell through + to the 'unexpected response shape' error path → callers retried, + peer got duplicate delegations. + + Pin: poll-queued envelope returns a clean success string that does + NOT start with _A2A_ERROR_PREFIX, so callers route it through the + normal-outcome path. Verified discriminating: assert_NOT_startswith + the error prefix would FAIL on the old code (which returned an + error-prefixed string) and PASSES on the new code. + """ + import a2a_client + + resp = _make_response(200, { + "status": "queued", + "delivery_mode": "poll", + "method": "message/send", + }) + mock_client = _make_mock_client(post_resp=resp) + + with patch("a2a_client.httpx.AsyncClient", return_value=mock_client): + result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task") + + # Discriminating: pre-fix returned a string that startswith + # _A2A_ERROR_PREFIX, so this assertion would have FAILED on the + # old code. New code returns a queued-success string. + assert not result.startswith(a2a_client._A2A_ERROR_PREFIX), ( + f"poll-queued envelope must not be tagged as A2A error; got: {result!r}" + ) + assert "queued" in result.lower() + assert "poll" in result.lower() + # The method is included so a structured-log scraper can route by + # protocol verb if needed. + assert "message/send" in result + + async def test_poll_queued_envelope_with_other_method(self): + """Same envelope but a different a2a_method (the future could add + message/sendStream or similar). Pin that the parser doesn't hardcode + message/send — whatever method the server echoed is preserved. + """ + import a2a_client + + resp = _make_response(200, { + "status": "queued", + "delivery_mode": "poll", + "method": "message/sendStream", + }) + mock_client = _make_mock_client(post_resp=resp) + + with patch("a2a_client.httpx.AsyncClient", return_value=mock_client): + result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task") + + assert not result.startswith(a2a_client._A2A_ERROR_PREFIX) + assert "message/sendStream" in result + + async def test_status_queued_without_poll_mode_still_falls_through(self): + """Defensive: only the {status:"queued", delivery_mode:"poll"} pair + triggers the queued-success branch. A response with status:"queued" + but a different delivery_mode (or none) is still 'unexpected' — + we don't want to silently swallow a future server bug that emits + a partial envelope. Pin both keys are required. + """ + import a2a_client + + resp = _make_response(200, { + "status": "queued", + # delivery_mode missing + "method": "message/send", + }) + mock_client = _make_mock_client(post_resp=resp) + + with patch("a2a_client.httpx.AsyncClient", return_value=mock_client): + result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task") + + # Falls through — must STILL be tagged as error. + assert result.startswith(a2a_client._A2A_ERROR_PREFIX) + assert "unexpected response shape" in result + async def test_exception_returns_error_prefix_and_message(self): """Network exception → returns _A2A_ERROR_PREFIX + exception text.""" import a2a_client