fix(workspace): async test pollution in inbox_wrappers + push-mode queue delivery_mode #323
@ -97,13 +97,15 @@ class Error:
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class Queued:
|
||||
"""Platform poll-mode short-circuit — message accepted, peer will pick up async.
|
||||
"""Platform short-circuit — message accepted, peer will pick up async.
|
||||
|
||||
Returned when the target workspace is registered as
|
||||
``delivery_mode=poll`` (no public URL — typical for external
|
||||
standalone ``molecule-mcp`` runtimes). The message was written to
|
||||
the platform's inbox queue; the target agent will fetch it via
|
||||
``GET /activity?since_id=`` polling.
|
||||
standalone ``molecule-mcp`` runtimes) OR ``delivery_mode=push``
|
||||
(has a public URL but is at capacity). In both cases the message
|
||||
was written to the platform's inbox queue; the target agent will
|
||||
fetch it via ``GET /activity?since_id=`` polling (poll mode) or
|
||||
receive it via the platform's push channel (push mode).
|
||||
|
||||
NOT a failure. Callers that expect a synchronous reply (the agent's
|
||||
response text) won't get one here — they should either:
|
||||
@ -116,7 +118,8 @@ class Queued:
|
||||
and lets the caller poll for the result row.
|
||||
|
||||
``method`` echoes the request method (``message/send``, ``notify``,
|
||||
etc.) so callers can correlate.
|
||||
etc.) so callers can correlate. ``delivery_mode`` is ``"poll"`` for
|
||||
poll-mode workspaces and ``"push"`` for push-mode workspaces.
|
||||
"""
|
||||
|
||||
method: str
|
||||
@ -194,7 +197,7 @@ def parse(data: Any) -> Variant:
|
||||
method,
|
||||
data.get("queue_id", "?"),
|
||||
)
|
||||
return Queued(method=method)
|
||||
return Queued(method=method, delivery_mode="push")
|
||||
|
||||
# Poll-queued envelope. Both keys must be present — the workspace
|
||||
# server sets them together; if only one is present the body is
|
||||
|
||||
@ -105,6 +105,27 @@ _FIXTURES = {
|
||||
"status": "queued",
|
||||
"delivery_mode": "poll",
|
||||
},
|
||||
# Push-mode queue envelope — returned when a push-mode workspace (one with
|
||||
# a public URL) is at capacity. The platform queues the delegation request
|
||||
# and returns {"queued": true, "message": "...", "queue_id": "..."}.
|
||||
# Unlike the poll-mode envelope (status=queued + delivery_mode=poll),
|
||||
# this shape has no delivery_mode key — distinguishable by
|
||||
# data.get("queued") is True alone.
|
||||
"push_queued_full": {
|
||||
"queued": True,
|
||||
"method": "tasks/send",
|
||||
"message": "Queued for busy push-mode peer",
|
||||
"queue_id": "q-abc123",
|
||||
},
|
||||
"push_queued_default_method": {
|
||||
"queued": True,
|
||||
"message": "Workspace at capacity",
|
||||
"queue_id": "q-xyz789",
|
||||
},
|
||||
"push_queued_no_queue_id": {
|
||||
"queued": True,
|
||||
"method": "message/send",
|
||||
},
|
||||
"malformed_empty_dict": {},
|
||||
"malformed_unexpected_keys": {"foo": "bar", "baz": 42},
|
||||
"malformed_status_queued_no_delivery_mode": {
|
||||
@ -159,6 +180,44 @@ class TestQueuedVariant:
|
||||
a2a_response.parse(_FIXTURES["poll_queued_full"])
|
||||
assert any("queued for poll-mode peer" in r.message for r in caplog.records)
|
||||
|
||||
# ---- Push-mode queue envelope tests (PR #278) ----
|
||||
|
||||
def test_push_mode_queued_returns_queued_variant(self):
|
||||
# data.get("queued") is True triggers the push-mode branch.
|
||||
v = a2a_response.parse(_FIXTURES["push_queued_full"])
|
||||
assert isinstance(v, a2a_response.Queued)
|
||||
assert v.delivery_mode == "push"
|
||||
|
||||
def test_push_mode_queued_extracts_method(self):
|
||||
v = a2a_response.parse(_FIXTURES["push_queued_full"])
|
||||
assert v.method == "tasks/send"
|
||||
|
||||
def test_push_mode_queued_default_method(self):
|
||||
# No method key — should default to "message/send".
|
||||
v = a2a_response.parse(_FIXTURES["push_queued_default_method"])
|
||||
assert isinstance(v, a2a_response.Queued)
|
||||
assert v.method == "message/send"
|
||||
assert v.delivery_mode == "push"
|
||||
|
||||
def test_push_mode_queued_logs_info(self, caplog):
|
||||
with caplog.at_level(logging.INFO, logger="a2a_response"):
|
||||
a2a_response.parse(_FIXTURES["push_queued_full"])
|
||||
assert any("queued for busy push-mode peer" in r.message for r in caplog.records)
|
||||
assert any("queue_id=q-abc123" in r.message for r in caplog.records)
|
||||
|
||||
def test_push_mode_takes_precedence_over_poll_mode(self):
|
||||
# If a buggy server sends both shapes, push-mode should win
|
||||
# because the push-mode check runs before the poll-mode check.
|
||||
buggy = {
|
||||
"queued": True,
|
||||
"status": "queued",
|
||||
"delivery_mode": "poll",
|
||||
"method": "tasks/send",
|
||||
}
|
||||
v = a2a_response.parse(buggy)
|
||||
assert isinstance(v, a2a_response.Queued)
|
||||
assert v.delivery_mode == "push"
|
||||
|
||||
|
||||
class TestResultVariant:
|
||||
"""``parse()`` extracts the JSON-RPC ``result`` envelope into
|
||||
@ -436,6 +495,9 @@ class TestRegressionGate:
|
||||
"poll_queued_full": a2a_response.Queued,
|
||||
"poll_queued_notify": a2a_response.Queued,
|
||||
"poll_queued_no_method": a2a_response.Queued,
|
||||
"push_queued_full": a2a_response.Queued,
|
||||
"push_queued_default_method": a2a_response.Queued,
|
||||
"push_queued_no_queue_id": a2a_response.Queued,
|
||||
"malformed_empty_dict": a2a_response.Malformed,
|
||||
"malformed_unexpected_keys": a2a_response.Malformed,
|
||||
"malformed_status_queued_no_delivery_mode": a2a_response.Malformed,
|
||||
|
||||
@ -15,7 +15,6 @@ The wrappers are ~40 LOC of glue. The full delivery behavior
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
@ -29,24 +28,22 @@ def _require_workspace_id(monkeypatch):
|
||||
yield
|
||||
|
||||
|
||||
def _run(coro):
|
||||
return asyncio.get_event_loop().run_until_complete(coro)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# tool_inbox_peek
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestToolInboxPeek:
|
||||
def test_returns_not_enabled_when_state_none(self):
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_not_enabled_when_state_none(self):
|
||||
import a2a_tools
|
||||
|
||||
with patch("inbox.get_state", return_value=None):
|
||||
out = _run(a2a_tools.tool_inbox_peek())
|
||||
out = await a2a_tools.tool_inbox_peek()
|
||||
assert "not enabled" in out
|
||||
|
||||
def test_returns_json_array_of_messages(self):
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_json_array_of_messages(self):
|
||||
import a2a_tools
|
||||
|
||||
msg1 = MagicMock()
|
||||
@ -58,20 +55,21 @@ class TestToolInboxPeek:
|
||||
fake_state.peek.return_value = [msg1, msg2]
|
||||
|
||||
with patch("inbox.get_state", return_value=fake_state):
|
||||
out = _run(a2a_tools.tool_inbox_peek(limit=5))
|
||||
out = await a2a_tools.tool_inbox_peek(limit=5)
|
||||
# peek limit is forwarded
|
||||
fake_state.peek.assert_called_once_with(limit=5)
|
||||
parsed = json.loads(out)
|
||||
assert len(parsed) == 2
|
||||
assert parsed[0]["activity_id"] == "a1"
|
||||
|
||||
def test_non_int_limit_falls_back_to_10(self):
|
||||
@pytest.mark.asyncio
|
||||
async def test_non_int_limit_falls_back_to_10(self):
|
||||
import a2a_tools
|
||||
|
||||
fake_state = MagicMock()
|
||||
fake_state.peek.return_value = []
|
||||
with patch("inbox.get_state", return_value=fake_state):
|
||||
_run(a2a_tools.tool_inbox_peek(limit="garbage")) # type: ignore[arg-type]
|
||||
await a2a_tools.tool_inbox_peek(limit="garbage") # type: ignore[arg-type]
|
||||
fake_state.peek.assert_called_once_with(limit=10)
|
||||
|
||||
|
||||
@ -81,49 +79,54 @@ class TestToolInboxPeek:
|
||||
|
||||
|
||||
class TestToolInboxPop:
|
||||
def test_returns_not_enabled_when_state_none(self):
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_not_enabled_when_state_none(self):
|
||||
import a2a_tools
|
||||
|
||||
with patch("inbox.get_state", return_value=None):
|
||||
out = _run(a2a_tools.tool_inbox_pop("act-1"))
|
||||
out = await a2a_tools.tool_inbox_pop("act-1")
|
||||
assert "not enabled" in out
|
||||
|
||||
def test_rejects_empty_activity_id(self):
|
||||
@pytest.mark.asyncio
|
||||
async def test_rejects_empty_activity_id(self):
|
||||
import a2a_tools
|
||||
|
||||
fake_state = MagicMock()
|
||||
with patch("inbox.get_state", return_value=fake_state):
|
||||
out = _run(a2a_tools.tool_inbox_pop(""))
|
||||
out = await a2a_tools.tool_inbox_pop("")
|
||||
assert "activity_id is required" in out
|
||||
fake_state.pop.assert_not_called()
|
||||
|
||||
def test_rejects_non_str_activity_id(self):
|
||||
@pytest.mark.asyncio
|
||||
async def test_rejects_non_str_activity_id(self):
|
||||
import a2a_tools
|
||||
|
||||
fake_state = MagicMock()
|
||||
with patch("inbox.get_state", return_value=fake_state):
|
||||
out = _run(a2a_tools.tool_inbox_pop(123)) # type: ignore[arg-type]
|
||||
out = await a2a_tools.tool_inbox_pop(123) # type: ignore[arg-type]
|
||||
assert "activity_id is required" in out
|
||||
fake_state.pop.assert_not_called()
|
||||
|
||||
def test_returns_removed_true_when_popped(self):
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_removed_true_when_popped(self):
|
||||
import a2a_tools
|
||||
|
||||
fake_state = MagicMock()
|
||||
fake_state.pop.return_value = MagicMock() # truthy = something was removed
|
||||
with patch("inbox.get_state", return_value=fake_state):
|
||||
out = _run(a2a_tools.tool_inbox_pop("act-7"))
|
||||
out = await a2a_tools.tool_inbox_pop("act-7")
|
||||
parsed = json.loads(out)
|
||||
assert parsed == {"removed": True, "activity_id": "act-7"}
|
||||
fake_state.pop.assert_called_once_with("act-7")
|
||||
|
||||
def test_returns_removed_false_when_unknown(self):
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_removed_false_when_unknown(self):
|
||||
import a2a_tools
|
||||
|
||||
fake_state = MagicMock()
|
||||
fake_state.pop.return_value = None
|
||||
with patch("inbox.get_state", return_value=fake_state):
|
||||
out = _run(a2a_tools.tool_inbox_pop("act-missing"))
|
||||
out = await a2a_tools.tool_inbox_pop("act-missing")
|
||||
parsed = json.loads(out)
|
||||
assert parsed == {"removed": False, "activity_id": "act-missing"}
|
||||
|
||||
@ -134,25 +137,28 @@ class TestToolInboxPop:
|
||||
|
||||
|
||||
class TestToolWaitForMessage:
|
||||
def test_returns_not_enabled_when_state_none(self):
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_not_enabled_when_state_none(self):
|
||||
import a2a_tools
|
||||
|
||||
with patch("inbox.get_state", return_value=None):
|
||||
out = _run(a2a_tools.tool_wait_for_message(timeout_secs=1.0))
|
||||
out = await a2a_tools.tool_wait_for_message(timeout_secs=1.0)
|
||||
assert "not enabled" in out
|
||||
|
||||
def test_timeout_payload_when_no_message(self):
|
||||
@pytest.mark.asyncio
|
||||
async def test_timeout_payload_when_no_message(self):
|
||||
import a2a_tools
|
||||
|
||||
fake_state = MagicMock()
|
||||
fake_state.wait.return_value = None
|
||||
with patch("inbox.get_state", return_value=fake_state):
|
||||
out = _run(a2a_tools.tool_wait_for_message(timeout_secs=0.1))
|
||||
out = await a2a_tools.tool_wait_for_message(timeout_secs=0.1)
|
||||
parsed = json.loads(out)
|
||||
assert parsed["timeout"] is True
|
||||
assert parsed["timeout_secs"] == 0.1
|
||||
|
||||
def test_returns_message_when_delivered(self):
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_message_when_delivered(self):
|
||||
import a2a_tools
|
||||
|
||||
msg = MagicMock()
|
||||
@ -160,37 +166,40 @@ class TestToolWaitForMessage:
|
||||
fake_state = MagicMock()
|
||||
fake_state.wait.return_value = msg
|
||||
with patch("inbox.get_state", return_value=fake_state):
|
||||
out = _run(a2a_tools.tool_wait_for_message(timeout_secs=2.0))
|
||||
out = await a2a_tools.tool_wait_for_message(timeout_secs=2.0)
|
||||
parsed = json.loads(out)
|
||||
assert parsed["activity_id"] == "a-9"
|
||||
|
||||
def test_timeout_clamped_to_300(self):
|
||||
@pytest.mark.asyncio
|
||||
async def test_timeout_clamped_to_300(self):
|
||||
import a2a_tools
|
||||
|
||||
fake_state = MagicMock()
|
||||
fake_state.wait.return_value = None
|
||||
with patch("inbox.get_state", return_value=fake_state):
|
||||
_run(a2a_tools.tool_wait_for_message(timeout_secs=99999))
|
||||
await a2a_tools.tool_wait_for_message(timeout_secs=99999)
|
||||
# Whatever wait was called with, it must not exceed 300
|
||||
passed = fake_state.wait.call_args.args[0]
|
||||
assert passed == 300.0
|
||||
|
||||
def test_timeout_clamped_to_zero_floor(self):
|
||||
@pytest.mark.asyncio
|
||||
async def test_timeout_clamped_to_zero_floor(self):
|
||||
import a2a_tools
|
||||
|
||||
fake_state = MagicMock()
|
||||
fake_state.wait.return_value = None
|
||||
with patch("inbox.get_state", return_value=fake_state):
|
||||
_run(a2a_tools.tool_wait_for_message(timeout_secs=-5))
|
||||
await a2a_tools.tool_wait_for_message(timeout_secs=-5)
|
||||
passed = fake_state.wait.call_args.args[0]
|
||||
assert passed == 0.0
|
||||
|
||||
def test_non_numeric_timeout_falls_back_to_60(self):
|
||||
@pytest.mark.asyncio
|
||||
async def test_non_numeric_timeout_falls_back_to_60(self):
|
||||
import a2a_tools
|
||||
|
||||
fake_state = MagicMock()
|
||||
fake_state.wait.return_value = None
|
||||
with patch("inbox.get_state", return_value=fake_state):
|
||||
_run(a2a_tools.tool_wait_for_message(timeout_secs="garbage")) # type: ignore[arg-type]
|
||||
await a2a_tools.tool_wait_for_message(timeout_secs="garbage") # type: ignore[arg-type]
|
||||
passed = fake_state.wait.call_args.args[0]
|
||||
assert passed == 60.0
|
||||
|
||||
Loading…
Reference in New Issue
Block a user