molecule-sdk-python/tests/test_inbound.py
Hongming Wang 87a4cfcc55 fix: address self-review findings — lint + comment + missing test
- Drop unused `import time` from inbound.py and `import call` from
  test_inbound.py (caught by ruff in CI; would have caught locally if I'd
  run it before pushing).
- Rewrite the misleading comment in PollDelivery.run_once: the cursor DOES
  advance past handler exceptions (poison-pill resilience). The previous
  comment claimed otherwise, which would have confused future readers.
- Drop `_parse_activity_row` from inbound.py's `__all__`. The leading
  underscore signals "private helper"; exposing it via `__all__`
  contradicted the convention. Tests still import it directly via the
  module path.
- Add `test_fetch_inbound_429_retries_via_get_with_retry` — the PR
  description claimed branch-coverage of the 429 path but no test
  exercised it. Closes the gap.
2026-04-30 13:09:06 -07:00

641 lines
23 KiB
Python

"""Tests for poll-mode inbound delivery (Phase 30.8c).
Covers:
* :func:`_parse_activity_row` source normalization and edge cases.
* :py:meth:`RemoteAgentClient.fetch_inbound` happy path, cursor, 410, shapes.
* :py:meth:`RemoteAgentClient.reply` smart-routing (canvas vs peer).
* :class:`PollDelivery` cursor advancement, async/sync handler dispatch,
error handling, 410 reset, cursor-file persistence, stop().
* :py:meth:`RemoteAgentClient.run_agent_loop` heartbeat + state + delivery
composition, default-delivery selection, terminal-status handling, sleep
cadence selection.
Mocking style matches ``tests/test_remote_agent.py``: a ``FakeResponse`` /
``MagicMock`` session, no third-party HTTP mock library.
"""
from __future__ import annotations
import asyncio
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock
import pytest
import requests
from molecule_agent import (
CursorLostError,
InboundMessage,
PollDelivery,
PushDelivery,
RemoteAgentClient,
WorkspaceState,
)
from molecule_agent.inbound import _parse_activity_row
# ---------------------------------------------------------------------------
# FakeResponse — same shape as the existing test_remote_agent helper
# ---------------------------------------------------------------------------
class FakeResponse:
def __init__(self, status_code: int = 200, json_body: Any = None, text: str = ""):
self.status_code = status_code
self._json = json_body
self.text = text
self.headers: dict[str, str] = {}
def json(self) -> Any:
return self._json
def raise_for_status(self) -> None:
if self.status_code >= 400:
raise requests.HTTPError(f"HTTP {self.status_code}")
@pytest.fixture
def tmp_token_dir(tmp_path: Path) -> Path:
    """Return a per-test token-cache path beneath pytest's ``tmp_path``.

    Only the path is computed here; the directory is not created.
    """
    return tmp_path.joinpath("molecule-token-cache")
@pytest.fixture
def client(tmp_token_dir: Path) -> RemoteAgentClient:
    """Build a ``RemoteAgentClient`` wired to a ``MagicMock`` HTTP session.

    A token is saved up front so ``_auth_headers`` has one to return and
    individual tests do not need to mock ``/registry/register``.
    """
    mock_session = MagicMock()
    agent = RemoteAgentClient(
        workspace_id="ws-abc-123",
        platform_url="http://platform.test",
        agent_card={"name": "test-agent"},
        token_dir=tmp_token_dir,
        session=mock_session,
    )
    agent.save_token("test-token-secret")
    return agent
# ---------------------------------------------------------------------------
# _parse_activity_row
# ---------------------------------------------------------------------------
def test_parse_activity_row_canvas_user_explicit():
    """An explicit ``canvas_user`` row keeps its id, source, source_id and text."""
    activity = {
        "id": "act-1",
        "type": "a2a_receive",
        "source_id": "user",
        "data": {"source": "canvas_user", "text": "hi"},
    }

    parsed = _parse_activity_row(activity)

    assert parsed is not None
    assert parsed.activity_id == "act-1"
    assert parsed.source == "canvas_user"
    assert parsed.source_id == "user"
    assert parsed.text == "hi"
def test_parse_activity_row_legacy_user_normalizes_to_canvas():
    """The legacy source value 'user' is reported as 'canvas_user'."""
    # Older platform versions used 'user' instead of 'canvas_user'.
    legacy_row = {"id": "act-2", "data": {"source": "user", "text": "hello"}}

    parsed = _parse_activity_row(legacy_row)

    assert parsed is not None
    assert parsed.source == "canvas_user"
def test_parse_activity_row_peer_agent_explicit():
    """An explicit ``peer_agent`` row keeps both its source and its source_id."""
    peer_row = {
        "id": "act-3",
        "source_id": "peer-ws-77",
        "data": {"source": "peer_agent", "text": "ping"},
    }

    parsed = _parse_activity_row(peer_row)

    assert parsed is not None
    assert (parsed.source, parsed.source_id) == ("peer_agent", "peer-ws-77")
def test_parse_activity_row_inferred_peer_from_source_id():
    """With no explicit source, a non-'user' source_id is treated as a peer agent."""
    # Guards against server-side variants that omit 'source' inside data.
    row_without_source = {
        "id": "act-4",
        "source_id": "peer-ws-88",
        "data": {"text": "ping"},
    }

    parsed = _parse_activity_row(row_without_source)

    assert parsed is not None
    assert parsed.source == "peer_agent"
def test_parse_activity_row_inferred_canvas_from_user_source_id():
    """With no explicit source, ``source_id == "user"`` is treated as the canvas user."""
    row_without_source = {"id": "act-5", "source_id": "user", "data": {"text": "hi"}}

    parsed = _parse_activity_row(row_without_source)

    assert parsed is not None
    assert parsed.source == "canvas_user"
def test_parse_activity_row_unknown_source_falls_through():
    """A row with neither source nor source_id parses with source 'unknown'."""
    # No source_id, no source -> unknown. Reply path will refuse to guess.
    anonymous_row = {"id": "act-6", "data": {"text": "??"}}

    parsed = _parse_activity_row(anonymous_row)

    assert parsed is not None
    assert parsed.source == "unknown"
def test_parse_activity_row_no_id_returns_none():
    """A row lacking an ``id`` key yields ``None`` rather than a message."""
    row_without_id = {"data": {"source": "canvas_user", "text": "no id"}}

    parsed = _parse_activity_row(row_without_id)

    assert parsed is None
def test_parse_activity_row_text_alt_key():
    """The message body is also read from a 'message' key in ``data``."""
    # Some server paths use 'message' instead of 'text'. Accept both.
    alt_row = {"id": "act-7", "data": {"source": "canvas_user", "message": "alt"}}

    parsed = _parse_activity_row(alt_row)

    assert parsed is not None
    assert parsed.text == "alt"
# ---------------------------------------------------------------------------
# fetch_inbound
# ---------------------------------------------------------------------------
def test_fetch_inbound_happy_path(client: RemoteAgentClient):
    """fetch_inbound parses each returned row and issues the expected GET."""
    canvas_row = {"id": "act-1", "data": {"source": "canvas_user", "text": "hi"}}
    peer_row = {
        "id": "act-2",
        "source_id": "peer-77",
        "data": {"source": "peer_agent", "text": "ping"},
    }
    client._session.get.return_value = FakeResponse(200, [canvas_row, peer_row])

    messages = client.fetch_inbound()

    assert len(messages) == 2
    assert messages[0].source == "canvas_user"
    assert messages[1].source == "peer_agent"

    # Verify the GET shape: URL, type filter, limit, and no cursor on first fetch.
    get_call = client._session.get.call_args
    assert get_call.args[0] == "http://platform.test/workspaces/ws-abc-123/activity"
    query = get_call.kwargs["params"]
    assert query["type"] == "a2a_receive"
    assert query["limit"] == "100"
    assert "since_id" not in query
def test_fetch_inbound_with_since_id_passes_cursor(client: RemoteAgentClient):
    """A supplied ``since_id`` is forwarded as a query parameter of the GET."""
    client._session.get.return_value = FakeResponse(200, [])

    client.fetch_inbound(since_id="act-prev")

    sent_params = client._session.get.call_args.kwargs["params"]
    assert sent_params["since_id"] == "act-prev"
def test_fetch_inbound_410_raises_cursor_lost(client: RemoteAgentClient):
    """An HTTP 410 response surfaces as ``CursorLostError``."""
    gone = FakeResponse(410, {"error": "cursor lost"})
    client._session.get.return_value = gone

    with pytest.raises(CursorLostError):
        client.fetch_inbound(since_id="act-stale")
def test_fetch_inbound_accepts_dict_items_wrapper(client: RemoteAgentClient):
    """Rows wrapped as ``{"items": [...]}`` are parsed like a bare list."""
    # If a future server version wraps in {"items": [...]}, we still parse.
    only_row = {"id": "act-1", "data": {"source": "canvas_user", "text": "hi"}}
    client._session.get.return_value = FakeResponse(200, {"items": [only_row]})

    messages = client.fetch_inbound()

    assert len(messages) == 1
    assert messages[0].activity_id == "act-1"
def test_fetch_inbound_skips_malformed_rows(client: RemoteAgentClient):
    """Non-dict rows and rows without an id are dropped; valid rows survive."""
    valid_row = {"id": "act-1", "data": {"source": "canvas_user", "text": "ok"}}
    not_a_mapping = "not a dict"
    row_missing_id = {"data": {"text": "no id"}}  # missing id -> skipped
    payload = [valid_row, not_a_mapping, row_missing_id]
    client._session.get.return_value = FakeResponse(200, payload)

    messages = client.fetch_inbound()

    assert len(messages) == 1
    assert messages[0].activity_id == "act-1"
def test_fetch_inbound_401_raises_http_error(client: RemoteAgentClient):
    """An HTTP 401 response propagates as ``requests.HTTPError``."""
    unauthorized = FakeResponse(401)
    client._session.get.return_value = unauthorized

    with pytest.raises(requests.HTTPError):
        client.fetch_inbound()
def test_fetch_inbound_empty_returns_empty(client: RemoteAgentClient):
    """An empty JSON list from the server yields an empty result list."""
    client._session.get.return_value = FakeResponse(200, [])

    messages = client.fetch_inbound()

    assert messages == []
def test_fetch_inbound_429_retries_via_get_with_retry(
    client: RemoteAgentClient, monkeypatch
):
    """A 429 followed by a 200 yields the 200's rows after exactly two GETs.

    The retry is expected to be handled by ``_get_with_retry``; this test only
    observes it from outside, through the mocked ``session.get`` call count.
    """
    # Neutralise time.sleep so any backoff between attempts costs nothing.
    monkeypatch.setattr("time.sleep", lambda _s: None)

    # The original GET and its retry both hit the same mocked session.get, so
    # the two responses are queued in order through side_effect.
    throttled = FakeResponse(429)
    throttled.headers = {"Retry-After": "0"}
    retry_rows = [
        {"id": "act-after-retry", "data": {"source": "canvas_user", "text": "ok"}}
    ]
    succeeded = FakeResponse(200, retry_rows)
    client._session.get.side_effect = [throttled, succeeded]

    messages = client.fetch_inbound(since_id="act-prev")

    assert len(messages) == 1
    assert messages[0].activity_id == "act-after-retry"
    # Two GETs total: one 429, one 200.
    assert client._session.get.call_count == 2
# ---------------------------------------------------------------------------
# reply()
# ---------------------------------------------------------------------------
def test_reply_canvas_user_hits_notify(client: RemoteAgentClient):
    """Replying to a canvas user POSTs the text to this workspace's /notify."""
    inbound = InboundMessage(
        activity_id="act-1", source="canvas_user", source_id="user", text="hi"
    )
    client._session.post.return_value = FakeResponse(200, {"status": "sent"})

    client.reply(inbound, "hello")

    post_call = client._session.post.call_args
    assert post_call.args[0] == "http://platform.test/workspaces/ws-abc-123/notify"
    sent = post_call.kwargs
    assert sent["json"] == {"message": "hello"}
    assert sent["headers"]["Authorization"] == "Bearer test-token-secret"
def test_reply_peer_agent_hits_a2a(client: RemoteAgentClient):
    """Replying to a peer agent POSTs a JSON-RPC message/send to the peer's /a2a."""
    inbound = InboundMessage(
        activity_id="act-2", source="peer_agent", source_id="peer-ws-77", text="ping"
    )
    client._session.post.return_value = FakeResponse(
        200, {"jsonrpc": "2.0", "result": {}}
    )

    client.reply(inbound, "pong")

    post_call = client._session.post.call_args
    assert post_call.args[0] == "http://platform.test/workspaces/peer-ws-77/a2a"

    payload = post_call.kwargs["json"]
    assert payload["jsonrpc"] == "2.0"
    assert payload["method"] == "message/send"
    assert payload["params"]["message"]["parts"][0]["text"] == "pong"

    # Both workspace-identifying headers carry the sender's own workspace id.
    sent_headers = post_call.kwargs["headers"]
    assert sent_headers["X-Source-Workspace-Id"] == "ws-abc-123"
    assert sent_headers["X-Workspace-ID"] == "ws-abc-123"
def test_reply_unknown_source_raises_value_error(client: RemoteAgentClient):
    """A message with source 'unknown' is rejected and nothing is POSTed."""
    unroutable = InboundMessage(
        activity_id="act-3", source="unknown", source_id="", text="?"
    )

    with pytest.raises(ValueError, match="cannot auto-route"):
        client.reply(unroutable, "won't send")

    client._session.post.assert_not_called()
def test_reply_empty_text_raises_value_error(client: RemoteAgentClient):
    """Empty and whitespace-only reply bodies are rejected without any POST."""
    inbound = InboundMessage(
        activity_id="act-4", source="canvas_user", source_id="user", text="hi"
    )

    for blank_body in ("", " "):
        with pytest.raises(ValueError, match="non-empty"):
            client.reply(inbound, blank_body)

    client._session.post.assert_not_called()
def test_reply_peer_agent_missing_source_id_raises(client: RemoteAgentClient):
    """A peer_agent message with an empty source_id is rejected before any POST.

    Without a source_id there is no peer workspace to address, so ``reply``
    must raise ``ValueError`` and must not touch the HTTP session.
    """
    msg = InboundMessage(activity_id="act-5", source="peer_agent", source_id="", text="?")

    with pytest.raises(ValueError, match="no source_id"):
        client.reply(msg, "won't send")

    # Same guard as the other rejection tests: raising is not enough, the
    # request must never have been sent.
    client._session.post.assert_not_called()
def test_reply_propagates_http_error(client: RemoteAgentClient):
    """A 500 from the reply POST propagates to the caller as ``HTTPError``."""
    inbound = InboundMessage(
        activity_id="act-6", source="canvas_user", source_id="user", text="hi"
    )
    server_error = FakeResponse(500)
    client._session.post.return_value = server_error

    with pytest.raises(requests.HTTPError):
        client.reply(inbound, "boom")
# ---------------------------------------------------------------------------
# PollDelivery
# ---------------------------------------------------------------------------
def test_poll_delivery_run_once_advances_cursor(client: RemoteAgentClient):
    """run_once dispatches rows in order and leaves the cursor on the last id."""
    batch = [
        {"id": "act-1", "data": {"source": "canvas_user", "text": "a"}},
        {"id": "act-2", "data": {"source": "canvas_user", "text": "b"}},
    ]
    client._session.get.return_value = FakeResponse(200, batch)
    poller = PollDelivery(client, interval=0.0)
    texts_seen: list[str] = []

    def record(msg: InboundMessage, _client: RemoteAgentClient):
        # Returning None means "no reply".
        texts_seen.append(msg.text)
        return None

    dispatched = poller.run_once(record)

    assert dispatched == 2
    assert texts_seen == ["a", "b"]
    assert poller.cursor == "act-2"
def test_poll_delivery_handler_exception_advances_and_continues(
    client: RemoteAgentClient, caplog
):
    """A raising handler does not block later messages or the cursor."""
    batch = [
        {"id": "act-1", "data": {"source": "canvas_user", "text": "poison"}},
        {"id": "act-2", "data": {"source": "canvas_user", "text": "next"}},
    ]
    client._session.get.return_value = FakeResponse(200, batch)
    poller = PollDelivery(client, interval=0.0)
    texts_seen: list[str] = []

    def flaky_handler(msg, _c):
        texts_seen.append(msg.text)
        if msg.text != "poison":
            return None
        raise RuntimeError("kaboom")

    dispatched = poller.run_once(flaky_handler)

    # Both messages should be dispatched even though the first raised.
    assert dispatched == 2
    assert texts_seen == ["poison", "next"]
    # Cursor advances past the failure so we don't get stuck on poison forever.
    assert poller.cursor == "act-2"
def test_poll_delivery_async_handler_awaited(client: RemoteAgentClient):
    """A coroutine handler is run to completion by run_once."""
    single_row = {"id": "act-1", "data": {"source": "canvas_user", "text": "ahoy"}}
    client._session.get.return_value = FakeResponse(200, [single_row])
    poller = PollDelivery(client, interval=0.0)
    texts_seen: list[str] = []

    async def coroutine_handler(msg, _c):
        # Yield once so the append only happens if the coroutine is really driven.
        await asyncio.sleep(0)
        texts_seen.append(msg.text)
        return None

    dispatched = poller.run_once(coroutine_handler)

    assert dispatched == 1
    assert texts_seen == ["ahoy"]
def test_poll_delivery_handler_returns_text_triggers_reply(client: RemoteAgentClient):
    """A string returned by the handler is sent back through the reply POST."""
    single_row = {"id": "act-1", "data": {"source": "canvas_user", "text": "hi"}}
    # The GET feeds fetch_inbound; the POST receives the reply.
    client._session.get.return_value = FakeResponse(200, [single_row])
    client._session.post.return_value = FakeResponse(200, {"status": "sent"})
    poller = PollDelivery(client, interval=0.0)

    def echo(msg, _c):
        return f"echo:{msg.text}"

    dispatched = poller.run_once(echo)

    assert dispatched == 1
    # /notify should have been called with the echo body.
    reply_call = client._session.post.call_args
    assert "/notify" in reply_call.args[0]
    assert reply_call.kwargs["json"] == {"message": "echo:hi"}
def test_poll_delivery_handler_returns_none_no_reply(client: RemoteAgentClient):
    """A handler that returns ``None`` causes no reply POST."""
    single_row = {"id": "act-1", "data": {"source": "canvas_user", "text": "hi"}}
    client._session.get.return_value = FakeResponse(200, [single_row])
    poller = PollDelivery(client, interval=0.0)

    def silent(_msg, _c):
        return None

    poller.run_once(silent)

    client._session.post.assert_not_called()
def test_poll_delivery_410_resets_cursor(client: RemoteAgentClient):
    """A 410 during polling dispatches nothing and clears the stored cursor."""
    poller = PollDelivery(client, interval=0.0)
    poller._cursor = "act-stale"  # seed a cursor for the server to reject
    client._session.get.return_value = FakeResponse(410, {"error": "gone"})

    dispatched = poller.run_once(lambda *_: None)

    # No messages dispatched, cursor reset to None.
    assert dispatched == 0
    assert poller.cursor is None
def test_poll_delivery_cursor_file_persistence(
    client: RemoteAgentClient, tmp_path: Path
):
    """The cursor is written to ``cursor_file`` and reloaded by a new instance."""
    cursor_path = tmp_path / "cursor"
    single_row = {"id": "act-XYZ", "data": {"source": "canvas_user", "text": "hi"}}
    client._session.get.return_value = FakeResponse(200, [single_row])

    first = PollDelivery(client, interval=0.0, cursor_file=cursor_path)
    assert first.cursor is None  # nothing on disk yet

    first.run_once(lambda *_: None)
    assert cursor_path.read_text() == "act-XYZ"

    # New delivery instance reads the cursor from disk.
    reloaded = PollDelivery(client, interval=0.0, cursor_file=cursor_path)
    assert reloaded.cursor == "act-XYZ"
def test_poll_delivery_stop_makes_run_once_noop(client: RemoteAgentClient):
    """After ``stop()``, run_once dispatches nothing and issues no GET."""
    poller = PollDelivery(client, interval=0.0)
    poller.stop()

    dispatched = poller.run_once(lambda *_: None)

    assert dispatched == 0
    # GET should not have been issued.
    client._session.get.assert_not_called()
# ---------------------------------------------------------------------------
# PushDelivery
# ---------------------------------------------------------------------------
def test_push_delivery_run_once_is_noop(client: RemoteAgentClient):
    """``PushDelivery.run_once`` reports zero dispatched messages."""
    server = MagicMock()
    pusher = PushDelivery(client, server)

    dispatched = pusher.run_once(lambda *_: None)

    assert dispatched == 0
def test_push_delivery_stop_calls_server_stop(client: RemoteAgentClient):
    """``PushDelivery.stop`` calls the wrapped server's ``stop`` exactly once."""
    server = MagicMock()
    pusher = PushDelivery(client, server)

    pusher.stop()

    server.stop.assert_called_once()
def test_push_delivery_stop_swallows_server_exception(
    client: RemoteAgentClient, caplog
):
    """``PushDelivery.stop`` does not propagate an error raised by ``server.stop``.

    The final assertion confirms the failing ``server.stop`` was actually
    invoked; without it the test would also pass if ``stop()`` did nothing.
    """
    fake_server = MagicMock()
    fake_server.stop.side_effect = RuntimeError("server down hard")
    delivery = PushDelivery(client, fake_server)

    # Should not raise.
    delivery.stop()

    # The mock records the call even though its side_effect raised.
    fake_server.stop.assert_called_once()
# ---------------------------------------------------------------------------
# run_agent_loop
# ---------------------------------------------------------------------------
def _stub_state(client: RemoteAgentClient, paused=False, deleted=False, status="online"):
    """Replace ``client.poll_state`` with a mock returning a fixed WorkspaceState.

    The state is built once from the client's own ``workspace_id`` plus the
    given ``status`` / ``paused`` / ``deleted`` flags, so every call to
    ``poll_state`` returns that same object.
    """
    canned_state = WorkspaceState(
        workspace_id=client.workspace_id,
        status=status,
        paused=paused,
        deleted=deleted,
    )
    client.poll_state = MagicMock(return_value=canned_state)  # type: ignore[method-assign]
def test_run_agent_loop_exits_on_paused(client: RemoteAgentClient, monkeypatch):
    """A paused workspace ends the loop with 'paused' and stops the delivery."""
    monkeypatch.setattr("time.sleep", lambda _s: None)
    client.heartbeat = MagicMock()  # type: ignore[method-assign]
    _stub_state(client, paused=True, status="paused")
    mock_delivery = MagicMock(interval=0.0)
    mock_delivery.run_once.return_value = 0

    outcome = client.run_agent_loop(lambda *_: None, delivery=mock_delivery)

    assert outcome == "paused"
    mock_delivery.stop.assert_called_once()
def test_run_agent_loop_exits_on_deleted(client: RemoteAgentClient, monkeypatch):
    """A deleted workspace ends the loop with the terminal value 'removed'."""
    monkeypatch.setattr("time.sleep", lambda _s: None)
    client.heartbeat = MagicMock()  # type: ignore[method-assign]
    _stub_state(client, deleted=True, status="removed")
    mock_delivery = MagicMock(interval=0.0)
    mock_delivery.run_once.return_value = 0

    outcome = client.run_agent_loop(lambda *_: None, delivery=mock_delivery)

    assert outcome == "removed"
def test_run_agent_loop_max_iterations(client: RemoteAgentClient, monkeypatch):
    """With the workspace online, the loop stops after ``max_iterations`` passes."""
    monkeypatch.setattr("time.sleep", lambda _s: None)
    client.heartbeat = MagicMock()  # type: ignore[method-assign]
    _stub_state(client)  # online forever
    mock_delivery = MagicMock(interval=0.0)
    mock_delivery.run_once.return_value = 0

    outcome = client.run_agent_loop(
        lambda *_: None, delivery=mock_delivery, max_iterations=3
    )

    assert outcome == "max_iterations"
    # One delivery pass and one heartbeat per iteration.
    assert mock_delivery.run_once.call_count == 3
    assert client.heartbeat.call_count == 3
def test_run_agent_loop_default_delivery_is_poll(client: RemoteAgentClient, monkeypatch):
    """With no ``delivery`` argument the loop polls via ``fetch_inbound``."""
    monkeypatch.setattr("time.sleep", lambda _s: None)
    client.heartbeat = MagicMock()  # type: ignore[method-assign]
    _stub_state(client, paused=True, status="paused")
    # An empty inbox is enough: we only need to observe that polling happened.
    inbox_probe = MagicMock(return_value=[])
    client.fetch_inbound = inbox_probe  # type: ignore[method-assign]

    outcome = client.run_agent_loop(lambda *_: None)

    assert outcome == "paused"
    client.fetch_inbound.assert_called()
def test_run_agent_loop_swallows_heartbeat_exception(
    client: RemoteAgentClient, monkeypatch
):
    """A raising heartbeat does not abort the loop; 'paused' is still detected.

    The heartbeat mock is also checked for having been called, so the test
    cannot pass merely because the loop skipped the heartbeat altogether.
    """
    monkeypatch.setattr("time.sleep", lambda _s: None)
    client.heartbeat = MagicMock(side_effect=RuntimeError("hb down"))  # type: ignore[method-assign]
    _stub_state(client, paused=True, status="paused")
    delivery = MagicMock()
    delivery.run_once.return_value = 0
    delivery.interval = 0.0

    terminal = client.run_agent_loop(lambda *_: None, delivery=delivery)

    # Heartbeat failure does NOT stop the loop — we still detect 'paused'.
    assert terminal == "paused"
    # The failing heartbeat must really have been attempted (the mock records
    # the call even though its side_effect raised).
    client.heartbeat.assert_called()
def test_run_agent_loop_swallows_delivery_exception(
    client: RemoteAgentClient, monkeypatch
):
    """A raising ``delivery.run_once`` does not abort the loop.

    ``run_once`` is also checked for having been called, so the test cannot
    pass merely because the loop exited before attempting delivery.
    """
    monkeypatch.setattr("time.sleep", lambda _s: None)
    client.heartbeat = MagicMock()  # type: ignore[method-assign]
    _stub_state(client, paused=True, status="paused")
    delivery = MagicMock()
    delivery.run_once.side_effect = RuntimeError("delivery exploded")
    delivery.interval = 0.0

    terminal = client.run_agent_loop(lambda *_: None, delivery=delivery)

    # Delivery failure is tolerated; the loop still exits cleanly on paused.
    assert terminal == "paused"
    # The failing delivery pass must really have been attempted (the mock
    # records the call even though its side_effect raised).
    delivery.run_once.assert_called()
def test_run_agent_loop_uses_min_of_intervals(client: RemoteAgentClient, monkeypatch):
    """The loop sleeps for the smaller of heartbeat_interval and delivery.interval."""
    recorded_sleeps: list[float] = []
    # Capture each requested sleep duration instead of actually sleeping.
    monkeypatch.setattr("time.sleep", recorded_sleeps.append)
    client.heartbeat_interval = 30.0
    client.heartbeat = MagicMock()  # type: ignore[method-assign]
    _stub_state(client)  # online; uses max_iterations to exit
    mock_delivery = MagicMock(interval=5.0)
    mock_delivery.run_once.return_value = 0

    client.run_agent_loop(lambda *_: None, delivery=mock_delivery, max_iterations=2)

    # Two iterations, each sleeping min(30.0, 5.0).
    assert recorded_sleeps == [5.0, 5.0]
def test_run_agent_loop_calls_task_supplier(client: RemoteAgentClient, monkeypatch):
    """Values returned by ``task_supplier`` are passed to heartbeat as kwargs."""
    monkeypatch.setattr("time.sleep", lambda _s: None)
    client.heartbeat = MagicMock()  # type: ignore[method-assign]
    _stub_state(client, paused=True, status="paused")
    mock_delivery = MagicMock(interval=0.0)
    mock_delivery.run_once.return_value = 0

    def report_tasks():
        return {"current_task": "doing-thing", "active_tasks": 2}

    client.run_agent_loop(
        lambda *_: None, delivery=mock_delivery, task_supplier=report_tasks
    )

    # Heartbeat receives the supplied report.
    heartbeat_kwargs = client.heartbeat.call_args.kwargs
    assert heartbeat_kwargs["current_task"] == "doing-thing"
    assert heartbeat_kwargs["active_tasks"] == 2
def test_run_agent_loop_swallows_task_supplier_exception(
    client: RemoteAgentClient, monkeypatch
):
    """A raising ``task_supplier`` is tolerated; heartbeat gets empty task fields."""
    monkeypatch.setattr("time.sleep", lambda _s: None)
    client.heartbeat = MagicMock()  # type: ignore[method-assign]
    _stub_state(client, paused=True, status="paused")
    mock_delivery = MagicMock(interval=0.0)
    mock_delivery.run_once.return_value = 0

    def broken_supplier():
        raise RuntimeError("supplier broken")

    outcome = client.run_agent_loop(
        lambda *_: None, delivery=mock_delivery, task_supplier=broken_supplier
    )

    assert outcome == "paused"
    # Heartbeat called with empty task fields (the default when supplier fails).
    heartbeat_kwargs = client.heartbeat.call_args.kwargs
    assert heartbeat_kwargs["current_task"] == ""
    assert heartbeat_kwargs["active_tasks"] == 0