diff --git a/workspace/a2a_response.py b/workspace/a2a_response.py index 769715fe..07b65931 100644 --- a/workspace/a2a_response.py +++ b/workspace/a2a_response.py @@ -97,13 +97,15 @@ class Error: @dataclasses.dataclass(frozen=True) class Queued: - """Platform poll-mode short-circuit — message accepted, peer will pick up async. + """Platform short-circuit — message accepted, peer will pick up async. Returned when the target workspace is registered as ``delivery_mode=poll`` (no public URL — typical for external - standalone ``molecule-mcp`` runtimes). The message was written to - the platform's inbox queue; the target agent will fetch it via - ``GET /activity?since_id=`` polling. + standalone ``molecule-mcp`` runtimes) OR ``delivery_mode=push`` + (has a public URL but is at capacity). In both cases the message + was written to the platform's inbox queue; the target agent will + fetch it via ``GET /activity?since_id=`` polling (poll mode) or + receive it via the platform's push channel (push mode). NOT a failure. Callers that expect a synchronous reply (the agent's response text) won't get one here — they should either: @@ -116,7 +118,8 @@ class Queued: and lets the caller poll for the result row. ``method`` echoes the request method (``message/send``, ``notify``, - etc.) so callers can correlate. + etc.) so callers can correlate. ``delivery_mode`` is ``"poll"`` for + poll-mode workspaces and ``"push"`` for push-mode workspaces. """ method: str @@ -194,7 +197,7 @@ def parse(data: Any) -> Variant: method, data.get("queue_id", "?"), ) - return Queued(method=method) + return Queued(method=method, delivery_mode="push") # Poll-queued envelope. Both keys must be present — the workspace # server sets them together; if only one is present the body is diff --git a/workspace/tests/test_a2a_response.py b/workspace/tests/test_a2a_response.py index cf254b36..581d27c1 100644 --- a/workspace/tests/test_a2a_response.py +++ b/workspace/tests/test_a2a_response.py @@ -105,6 +105,27 @@ _FIXTURES = { "status": "queued", "delivery_mode": "poll", }, + # Push-mode queue envelope — returned when a push-mode workspace (one with + # a public URL) is at capacity. The platform queues the delegation request + # and returns {"queued": true, "message": "...", "queue_id": "..."}. + # Unlike the poll-mode envelope (status=queued + delivery_mode=poll), + # this shape has no delivery_mode key — distinguishable by + # data.get("queued") is True alone. + "push_queued_full": { + "queued": True, + "method": "tasks/send", + "message": "Queued for busy push-mode peer", + "queue_id": "q-abc123", + }, + "push_queued_default_method": { + "queued": True, + "message": "Workspace at capacity", + "queue_id": "q-xyz789", + }, + "push_queued_no_queue_id": { + "queued": True, + "method": "message/send", + }, "malformed_empty_dict": {}, "malformed_unexpected_keys": {"foo": "bar", "baz": 42}, "malformed_status_queued_no_delivery_mode": { @@ -159,6 +180,44 @@ class TestQueuedVariant: a2a_response.parse(_FIXTURES["poll_queued_full"]) assert any("queued for poll-mode peer" in r.message for r in caplog.records) + # ---- Push-mode queue envelope tests (PR #278) ---- + + def test_push_mode_queued_returns_queued_variant(self): + # data.get("queued") is True triggers the push-mode branch. + v = a2a_response.parse(_FIXTURES["push_queued_full"]) + assert isinstance(v, a2a_response.Queued) + assert v.delivery_mode == "push" + + def test_push_mode_queued_extracts_method(self): + v = a2a_response.parse(_FIXTURES["push_queued_full"]) + assert v.method == "tasks/send" + + def test_push_mode_queued_default_method(self): + # No method key — should default to "message/send". + v = a2a_response.parse(_FIXTURES["push_queued_default_method"]) + assert isinstance(v, a2a_response.Queued) + assert v.method == "message/send" + assert v.delivery_mode == "push" + + def test_push_mode_queued_logs_info(self, caplog): + with caplog.at_level(logging.INFO, logger="a2a_response"): + a2a_response.parse(_FIXTURES["push_queued_full"]) + assert any("queued for busy push-mode peer" in r.message for r in caplog.records) + assert any("queue_id=q-abc123" in r.message for r in caplog.records) + + def test_push_mode_takes_precedence_over_poll_mode(self): + # If a buggy server sends both shapes, push-mode should win + # because the push-mode check runs before the poll-mode check. + buggy = { + "queued": True, + "status": "queued", + "delivery_mode": "poll", + "method": "tasks/send", + } + v = a2a_response.parse(buggy) + assert isinstance(v, a2a_response.Queued) + assert v.delivery_mode == "push" + class TestResultVariant: """``parse()`` extracts the JSON-RPC ``result`` envelope into @@ -436,6 +495,9 @@ class TestRegressionGate: "poll_queued_full": a2a_response.Queued, "poll_queued_notify": a2a_response.Queued, "poll_queued_no_method": a2a_response.Queued, + "push_queued_full": a2a_response.Queued, + "push_queued_default_method": a2a_response.Queued, + "push_queued_no_queue_id": a2a_response.Queued, "malformed_empty_dict": a2a_response.Malformed, "malformed_unexpected_keys": a2a_response.Malformed, "malformed_status_queued_no_delivery_mode": a2a_response.Malformed, diff --git a/workspace/tests/test_a2a_tools_inbox_wrappers.py b/workspace/tests/test_a2a_tools_inbox_wrappers.py index adf5e8a9..64fdba85 100644 --- a/workspace/tests/test_a2a_tools_inbox_wrappers.py +++ b/workspace/tests/test_a2a_tools_inbox_wrappers.py @@ -15,7 +15,6 @@ The wrappers are ~40 LOC of glue. The full delivery behavior """ from __future__ import annotations -import asyncio import json from unittest.mock import MagicMock, patch @@ -29,24 +28,22 @@ def _require_workspace_id(monkeypatch): yield -def _run(coro): - return asyncio.get_event_loop().run_until_complete(coro) - - # --------------------------------------------------------------------------- # tool_inbox_peek # --------------------------------------------------------------------------- class TestToolInboxPeek: - def test_returns_not_enabled_when_state_none(self): + @pytest.mark.asyncio + async def test_returns_not_enabled_when_state_none(self): import a2a_tools with patch("inbox.get_state", return_value=None): - out = _run(a2a_tools.tool_inbox_peek()) + out = await a2a_tools.tool_inbox_peek() assert "not enabled" in out - def test_returns_json_array_of_messages(self): + @pytest.mark.asyncio + async def test_returns_json_array_of_messages(self): import a2a_tools msg1 = MagicMock() @@ -58,20 +55,21 @@ class TestToolInboxPeek: fake_state.peek.return_value = [msg1, msg2] with patch("inbox.get_state", return_value=fake_state): - out = _run(a2a_tools.tool_inbox_peek(limit=5)) + out = await a2a_tools.tool_inbox_peek(limit=5) # peek limit is forwarded fake_state.peek.assert_called_once_with(limit=5) parsed = json.loads(out) assert len(parsed) == 2 assert parsed[0]["activity_id"] == "a1" - def test_non_int_limit_falls_back_to_10(self): + @pytest.mark.asyncio + async def test_non_int_limit_falls_back_to_10(self): import a2a_tools fake_state = MagicMock() fake_state.peek.return_value = [] with patch("inbox.get_state", return_value=fake_state): - _run(a2a_tools.tool_inbox_peek(limit="garbage")) # type: ignore[arg-type] + await a2a_tools.tool_inbox_peek(limit="garbage") # type: ignore[arg-type] fake_state.peek.assert_called_once_with(limit=10) @@ -81,49 +79,54 @@ class TestToolInboxPeek: class TestToolInboxPop: - def test_returns_not_enabled_when_state_none(self): + @pytest.mark.asyncio + async def test_returns_not_enabled_when_state_none(self): import a2a_tools with patch("inbox.get_state", return_value=None): - out = _run(a2a_tools.tool_inbox_pop("act-1")) + out = await a2a_tools.tool_inbox_pop("act-1") assert "not enabled" in out - def test_rejects_empty_activity_id(self): + @pytest.mark.asyncio + async def test_rejects_empty_activity_id(self): import a2a_tools fake_state = MagicMock() with patch("inbox.get_state", return_value=fake_state): - out = _run(a2a_tools.tool_inbox_pop("")) + out = await a2a_tools.tool_inbox_pop("") assert "activity_id is required" in out fake_state.pop.assert_not_called() - def test_rejects_non_str_activity_id(self): + @pytest.mark.asyncio + async def test_rejects_non_str_activity_id(self): import a2a_tools fake_state = MagicMock() with patch("inbox.get_state", return_value=fake_state): - out = _run(a2a_tools.tool_inbox_pop(123)) # type: ignore[arg-type] + out = await a2a_tools.tool_inbox_pop(123) # type: ignore[arg-type] assert "activity_id is required" in out fake_state.pop.assert_not_called() - def test_returns_removed_true_when_popped(self): + @pytest.mark.asyncio + async def test_returns_removed_true_when_popped(self): import a2a_tools fake_state = MagicMock() fake_state.pop.return_value = MagicMock() # truthy = something was removed with patch("inbox.get_state", return_value=fake_state): - out = _run(a2a_tools.tool_inbox_pop("act-7")) + out = await a2a_tools.tool_inbox_pop("act-7") parsed = json.loads(out) assert parsed == {"removed": True, "activity_id": "act-7"} fake_state.pop.assert_called_once_with("act-7") - def test_returns_removed_false_when_unknown(self): + @pytest.mark.asyncio + async def test_returns_removed_false_when_unknown(self): import a2a_tools fake_state = MagicMock() fake_state.pop.return_value = None with patch("inbox.get_state", return_value=fake_state): - out = _run(a2a_tools.tool_inbox_pop("act-missing")) + out = await a2a_tools.tool_inbox_pop("act-missing") parsed = json.loads(out) assert parsed == {"removed": False, "activity_id": "act-missing"} @@ -134,25 +137,28 @@ class TestToolInboxPop: class TestToolWaitForMessage: - def test_returns_not_enabled_when_state_none(self): + @pytest.mark.asyncio + async def test_returns_not_enabled_when_state_none(self): import a2a_tools with patch("inbox.get_state", return_value=None): - out = _run(a2a_tools.tool_wait_for_message(timeout_secs=1.0)) + out = await a2a_tools.tool_wait_for_message(timeout_secs=1.0) assert "not enabled" in out - def test_timeout_payload_when_no_message(self): + @pytest.mark.asyncio + async def test_timeout_payload_when_no_message(self): import a2a_tools fake_state = MagicMock() fake_state.wait.return_value = None with patch("inbox.get_state", return_value=fake_state): - out = _run(a2a_tools.tool_wait_for_message(timeout_secs=0.1)) + out = await a2a_tools.tool_wait_for_message(timeout_secs=0.1) parsed = json.loads(out) assert parsed["timeout"] is True assert parsed["timeout_secs"] == 0.1 - def test_returns_message_when_delivered(self): + @pytest.mark.asyncio + async def test_returns_message_when_delivered(self): import a2a_tools msg = MagicMock() @@ -160,37 +166,40 @@ class TestToolWaitForMessage: fake_state = MagicMock() fake_state.wait.return_value = msg with patch("inbox.get_state", return_value=fake_state): - out = _run(a2a_tools.tool_wait_for_message(timeout_secs=2.0)) + out = await a2a_tools.tool_wait_for_message(timeout_secs=2.0) parsed = json.loads(out) assert parsed["activity_id"] == "a-9" - def test_timeout_clamped_to_300(self): + @pytest.mark.asyncio + async def test_timeout_clamped_to_300(self): import a2a_tools fake_state = MagicMock() fake_state.wait.return_value = None with patch("inbox.get_state", return_value=fake_state): - _run(a2a_tools.tool_wait_for_message(timeout_secs=99999)) + await a2a_tools.tool_wait_for_message(timeout_secs=99999) # Whatever wait was called with, it must not exceed 300 passed = fake_state.wait.call_args.args[0] assert passed == 300.0 - def test_timeout_clamped_to_zero_floor(self): + @pytest.mark.asyncio + async def test_timeout_clamped_to_zero_floor(self): import a2a_tools fake_state = MagicMock() fake_state.wait.return_value = None with patch("inbox.get_state", return_value=fake_state): - _run(a2a_tools.tool_wait_for_message(timeout_secs=-5)) + await a2a_tools.tool_wait_for_message(timeout_secs=-5) passed = fake_state.wait.call_args.args[0] assert passed == 0.0 - def test_non_numeric_timeout_falls_back_to_60(self): + @pytest.mark.asyncio + async def test_non_numeric_timeout_falls_back_to_60(self): import a2a_tools fake_state = MagicMock() fake_state.wait.return_value = None with patch("inbox.get_state", return_value=fake_state): - _run(a2a_tools.tool_wait_for_message(timeout_secs="garbage")) # type: ignore[arg-type] + await a2a_tools.tool_wait_for_message(timeout_secs="garbage") # type: ignore[arg-type] passed = fake_state.wait.call_args.args[0] assert passed == 60.0