molecule-core/workspace/tests/test_inbox.py
Hongming Wang 5b5eacbb29 test(inbox): clean up daemon poller thread to prevent test cross-talk
test_start_poller_thread_is_daemon spawned a daemon thread with no stop
mechanism — the leaked thread polled every 10ms with the test's patched
httpx.Client mock STILL ACTIVE for ~50ms after the test scope. Later
tests that re-patched httpx.Client + asserted call counts on
fetch_and_stage / Client construction got their assertions inflated by
the leaked thread's iterations.

Symptoms: test_poll_once_skips_chat_upload_row_from_queue saw
fetch_and_stage called twice instead of once on Python 3.11 CI;
test_batch_fetcher_owns_client_when_not_supplied saw two Client
constructions instead of one in the full local suite. Both surfaced
only after Phase 5b's BatchFetcher refactor changed the timing window
that allowed the leaked thread to fire mid-test.

Fix: extend start_poller_thread with an optional stop_event kwarg
(backward compatible — production callers pass None and rely on the
daemon flag for process-exit cleanup). The test now signals + joins
on stop_event before exiting scope, so the thread is gone before any
later test patches httpx.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 11:47:14 -07:00

1097 lines
40 KiB
Python

"""Tests for workspace/inbox.py — InboxState + activity API poller.
Covers the round-trip from a /activity row to an InboxMessage that the
agent observes via the three new MCP tools, plus the cursor-persistence
+ 410-recovery behavior that keeps the standalone molecule-mcp from
re-delivering already-handled messages after a restart.
"""
from __future__ import annotations
import threading
import time
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
import inbox
@pytest.fixture(autouse=True)
def _reset_singleton():
    """Autouse: clear the module-level ``inbox._STATE`` singleton before
    and after every test, so an ``activate()`` performed by one test can
    never be observed by the next."""
    inbox._STATE = None
    yield
    inbox._STATE = None
@pytest.fixture()
def state(tmp_path: Path) -> inbox.InboxState:
    """Fresh InboxState whose cursor file lives under pytest's tmp_path."""
    cursor_file = tmp_path / ".mcp_inbox_cursor"
    return inbox.InboxState(cursor_path=cursor_file)
# ---------------------------------------------------------------------------
# _extract_text — envelope shape coverage
# ---------------------------------------------------------------------------
def test_extract_text_jsonrpc_message_wrapper():
    """Full JSON-RPC envelope: text lives under params.message.parts."""
    envelope = {
        "jsonrpc": "2.0",
        "method": "message/send",
        "params": {"message": {"parts": [{"type": "text", "text": "hello"}]}},
    }
    extracted = inbox._extract_text(envelope, None)
    assert extracted == "hello"
def test_extract_text_a2a_v1_kind_field():
    """A2A SDK v1 discriminates parts with ``kind`` rather than ``type``.
    Hosted SaaS workspaces send the v1 shape today, so this mirrors what
    live canvas-user messages look like in activity_logs.request_body."""
    v1_envelope = {
        "params": {
            "message": {
                "role": "user",
                "parts": [{"kind": "text", "text": "hello from canvas"}],
            }
        }
    }
    extracted = inbox._extract_text(v1_envelope, None)
    assert extracted == "hello from canvas"
def test_extract_text_jsonrpc_params_parts():
    """Parts directly under params (no message wrapper) still extract."""
    envelope = {"params": {"parts": [{"type": "text", "text": "from peer"}]}}
    assert inbox._extract_text(envelope, None) == "from peer"
def test_extract_text_shorthand_parts():
    """A bare top-level parts list is the most compact accepted shape."""
    envelope = {"parts": [{"type": "text", "text": "shorthand"}]}
    assert inbox._extract_text(envelope, None) == "shorthand"
def test_extract_text_concatenates_multiple_parts():
    """Text parts concatenate in order; non-text parts are skipped."""
    parts = [
        {"type": "text", "text": "hello "},
        {"type": "text", "text": "world"},
        {"type": "image", "url": "https://example.invalid/x.png"},
    ]
    assert inbox._extract_text({"parts": parts}, None) == "hello world"
def test_extract_text_falls_back_to_summary():
    """Unusable bodies fall back to the activity row's summary column."""
    for body in (None, {"unrelated": True}):
        assert inbox._extract_text(body, "fallback") == "fallback"
def test_extract_text_returns_placeholder_when_nothing_usable():
    """No body and no summary — a fixed placeholder string comes back."""
    placeholder = inbox._extract_text(None, None)
    assert placeholder == "(empty A2A message)"
# ---------------------------------------------------------------------------
# message_from_activity
# ---------------------------------------------------------------------------
def test_message_from_activity_canvas_user():
    """A row with no source_id becomes a canvas_user message; the text
    comes from request_body, not from the (ignored) summary column."""
    row = {
        "id": "act-1",
        "source_id": None,
        "method": "message/send",
        "summary": "ignored",
        "request_body": {
            "params": {"message": {"parts": [{"type": "text", "text": "hi"}]}}
        },
        "created_at": "2026-04-30T22:00:00Z",
    }
    message = inbox.message_from_activity(row)
    assert message.activity_id == "act-1"
    assert message.text == "hi"
    assert message.peer_id == ""
    assert message.method == "message/send"
    assert message.to_dict()["kind"] == "canvas_user"
def test_message_from_activity_peer_agent():
    """A row carrying a source_id maps to a peer_agent message."""
    row = {
        "id": "act-2",
        "source_id": "ws-peer-uuid",
        "method": "tasks/send",
        "summary": "delegate",
        "request_body": {"parts": [{"type": "text", "text": "do task"}]},
        "created_at": "2026-04-30T22:01:00Z",
    }
    message = inbox.message_from_activity(row)
    assert message.peer_id == "ws-peer-uuid"
    assert message.to_dict()["kind"] == "peer_agent"
def test_message_from_activity_handles_string_request_body():
    """request_body may arrive as a JSON string rather than a dict; the
    converter must parse it before extracting text."""
    row = {
        "id": "act-3",
        "source_id": None,
        "method": "message/send",
        "summary": None,
        "request_body": '{"parts": [{"type": "text", "text": "json string"}]}',
        "created_at": "2026-04-30T22:02:00Z",
    }
    message = inbox.message_from_activity(row)
    assert message.text == "json string"
# ---------------------------------------------------------------------------
# InboxState — queue + wait/peek/pop semantics
# ---------------------------------------------------------------------------
def _msg(activity_id: str, text: str = "", peer_id: str = "") -> inbox.InboxMessage:
    """Build a minimal InboxMessage; ``text`` defaults to the activity id."""
    return inbox.InboxMessage(
        activity_id=activity_id,
        text=text if text else activity_id,
        peer_id=peer_id,
        method="message/send",
        created_at="2026-04-30T22:00:00Z",
    )
def test_record_then_peek(state: inbox.InboxState):
    """peek() returns recorded messages in arrival order."""
    for aid in ("a", "b"):
        state.record(_msg(aid))
    assert [m.activity_id for m in state.peek(limit=10)] == ["a", "b"]
def test_record_dedupes_by_activity_id(state: inbox.InboxState):
    """Recording the same activity_id twice keeps only the first copy."""
    state.record(_msg("a"))
    state.record(_msg("a"))  # duplicate id — must be dropped
    assert len(state.peek(10)) == 1
def test_pop_removes_specific_message(state: inbox.InboxState):
    """pop(id) returns the matching message and leaves the rest queued."""
    state.record(_msg("a"))
    state.record(_msg("b"))
    popped = state.pop("a")
    assert popped is not None and popped.activity_id == "a"
    assert [m.activity_id for m in state.peek(10)] == ["b"]
def test_pop_missing_id_returns_none(state: inbox.InboxState):
    """pop() of an unknown id is a no-op that returns None."""
    state.record(_msg("a"))
    # Bind before asserting so the call still executes under ``python -O``
    # (which strips bare assert statements).
    outcome = state.pop("does-not-exist")
    assert outcome is None
    # The originally-recorded message is untouched.
    assert len(state.peek(10)) == 1
def test_wait_returns_existing_head_immediately(state: inbox.InboxState):
    """wait() must return the head at once when the queue is non-empty."""
    state.record(_msg("a"))
    t0 = time.monotonic()
    head = state.wait(timeout_secs=5.0)
    took = time.monotonic() - t0
    assert head is not None and head.activity_id == "a"
    assert took < 0.5, f"wait should not block when queue non-empty (took {took:.2f}s)"
def test_wait_blocks_until_message_arrives(state: inbox.InboxState):
    """wait() wakes up when a producer thread records mid-wait."""
    def _produce_late():
        time.sleep(0.05)
        state.record(_msg("late"))
    threading.Thread(target=_produce_late, daemon=True).start()
    arrived = state.wait(timeout_secs=2.0)
    assert arrived is not None and arrived.activity_id == "late"
def test_wait_returns_none_on_timeout(state: inbox.InboxState):
    """An empty queue times out to None rather than raising."""
    assert state.wait(timeout_secs=0.05) is None
def test_wait_does_not_pop(state: inbox.InboxState):
    """wait() is non-destructive — the caller decides when to inbox_pop."""
    state.record(_msg("a"))
    for _ in range(2):
        state.wait(timeout_secs=1.0)
    assert len(state.peek(10)) == 1
# ---------------------------------------------------------------------------
# Cursor persistence
# ---------------------------------------------------------------------------
def test_load_cursor_returns_none_when_file_absent(state: inbox.InboxState):
    """No cursor file on disk — load_cursor() reports None."""
    assert state.load_cursor() is None
def test_save_then_load_cursor_round_trip(state: inbox.InboxState):
    """A saved cursor survives a forced re-read from disk."""
    state.save_cursor("act-cursor-1")
    # Drop the in-memory cache so load_cursor() must hit the file again.
    state._cursor_loaded = False
    state._cursor = None
    assert state.load_cursor() == "act-cursor-1"
def test_save_cursor_creates_parent_directory(tmp_path: Path):
    """save_cursor() creates any missing parent directories on the way."""
    cursor_file = tmp_path / "nested" / "configs" / ".mcp_inbox_cursor"
    inbox.InboxState(cursor_path=cursor_file).save_cursor("act-x")
    assert cursor_file.read_text() == "act-x"
def test_reset_cursor_deletes_file(state: inbox.InboxState):
    """reset_cursor() removes the on-disk file and forgets the value."""
    state.save_cursor("act-y")
    assert state.cursor_path.is_file()
    state.reset_cursor()
    assert state.cursor_path.is_file() is False
    assert state.load_cursor() is None
# ---------------------------------------------------------------------------
# Module singleton
# ---------------------------------------------------------------------------
def test_get_state_returns_none_before_activate():
    """Before activation there is no singleton to hand out."""
    assert inbox.get_state() is None
def test_activate_then_get_state(state: inbox.InboxState):
    """activate() installs exactly the instance handed to it."""
    inbox.activate(state)
    assert inbox.get_state() is state
def test_activate_idempotent(state: inbox.InboxState):
    """Activating the same state twice is a silent no-op."""
    for _ in range(2):
        inbox.activate(state)
    assert inbox.get_state() is state
# ---------------------------------------------------------------------------
# _poll_once — HTTP behavior
# ---------------------------------------------------------------------------
def _make_response(status_code: int, json_body: Any = None, text: str = "") -> MagicMock:
resp = MagicMock()
resp.status_code = status_code
if json_body is not None:
resp.json.return_value = json_body
else:
resp.json.side_effect = ValueError("no json")
resp.text = text
return resp
def _patch_httpx(returning: MagicMock):
"""Replace httpx.Client with a context-manager mock that returns
``returning`` from .get(). Captures the GET call args for assertion."""
client = MagicMock()
client.__enter__ = MagicMock(return_value=client)
client.__exit__ = MagicMock(return_value=False)
client.get = MagicMock(return_value=returning)
return patch("httpx.Client", return_value=client), client
def test_poll_once_fresh_start_uses_since_secs(state: inbox.InboxState):
    """With no saved cursor the poll window is time-based (since_secs),
    never id-based."""
    patcher, client = _patch_httpx(_make_response(200, []))
    with patcher:
        assert inbox._poll_once(state, "http://platform", "ws-1", {}) == 0
    _, kwargs = client.get.call_args
    params = kwargs["params"]
    assert params["type"] == "a2a_receive"
    assert "since_secs" in params
    assert "since_id" not in params
def test_poll_once_with_cursor_uses_since_id(state: inbox.InboxState):
    """A saved cursor switches the poll window to id-based paging."""
    state.save_cursor("act-existing")
    patcher, client = _patch_httpx(_make_response(200, []))
    with patcher:
        inbox._poll_once(state, "http://platform", "ws-1", {})
    _, kwargs = client.get.call_args
    params = kwargs["params"]
    assert params["since_id"] == "act-existing"
    assert "since_secs" not in params
def test_poll_once_410_resets_cursor(state: inbox.InboxState):
    """HTTP 410 means the server pruned our cursor — wipe it so the
    next poll falls back to the time-based window."""
    state.save_cursor("act-stale")
    patcher, _ = _patch_httpx(_make_response(410, text="cursor pruned"))
    with patcher:
        inbox._poll_once(state, "http://platform", "ws-1", {})
    assert state.load_cursor() is None
    assert not state.cursor_path.is_file()
def test_poll_once_records_messages_and_advances_cursor(state: inbox.InboxState):
    """Two message rows enqueue in order and the cursor lands on the
    last row's id."""
    state.save_cursor("act-old")
    def _row(aid, src, method, text, ts):
        return {
            "id": aid,
            "source_id": src,
            "method": method,
            "summary": None,
            "request_body": {"parts": [{"type": "text", "text": text}]},
            "created_at": ts,
        }
    rows = [
        _row("act-1", None, "message/send", "first", "2026-04-30T22:00:00Z"),
        _row("act-2", "ws-peer", "tasks/send", "second", "2026-04-30T22:00:01Z"),
    ]
    patcher, _ = _patch_httpx(_make_response(200, rows))
    with patcher:
        assert inbox._poll_once(state, "http://platform", "ws-1", {}) == 2
    assert [m.activity_id for m in state.peek(10)] == ["act-1", "act-2"]
    assert state.load_cursor() == "act-2"
def test_poll_once_500_does_not_raise(state: inbox.InboxState):
    """Server errors are swallowed: zero messages, cursor untouched."""
    patcher, _ = _patch_httpx(_make_response(500, text="boom"))
    with patcher:
        count = inbox._poll_once(state, "http://platform", "ws-1", {})
    assert count == 0
    assert state.load_cursor() is None
def test_poll_once_handles_non_list_payload(state: inbox.InboxState):
    """A dict payload (e.g. an error envelope) yields zero messages
    instead of crashing the poll loop."""
    patcher, _ = _patch_httpx(_make_response(200, {"error": "unexpected"}))
    with patcher:
        assert inbox._poll_once(state, "http://platform", "ws-1", {}) == 0
def test_poll_once_initial_backlog_reverses_to_chronological(state: inbox.InboxState):
    """With no cursor, /activity returns newest-first; the poller must
    reverse so messages enqueue chronologically and the saved cursor is
    the freshest row."""
    def _row(aid, text, ts):
        return {
            "id": aid,
            "source_id": None,
            "method": "message/send",
            "summary": None,
            "request_body": {"parts": [{"type": "text", "text": text}]},
            "created_at": ts,
        }
    newest_first = [
        _row("act-newest", "newest", "2026-04-30T22:00:02Z"),
        _row("act-oldest", "oldest", "2026-04-30T22:00:00Z"),
    ]
    patcher, _ = _patch_httpx(_make_response(200, newest_first))
    with patcher:
        inbox._poll_once(state, "http://platform", "ws-1", {})
    assert [m.activity_id for m in state.peek(10)] == ["act-oldest", "act-newest"]
    # Cursor points at the newest row so the next poll fetches only
    # strictly newer activity — forward chronological progression.
    assert state.load_cursor() == "act-newest"
# ---------------------------------------------------------------------------
# _is_self_notify_row + the echo-loop guard in _poll_once
# ---------------------------------------------------------------------------
#
# The workspace-server's `/notify` handler writes the agent's own
# send_message_to_user POSTs to activity_logs as activity_type=
# 'a2a_receive' with method='notify' and no source_id, so the canvas
# chat-history loader can restore those bubbles after a page reload.
# Without a guard, the poller picks them up and pushes them back as
# inbound — confirmed live 2026-05-01: the agent observed its own
# outbound as `← molecule: Agent message: ...`.
#
# These tests pin both the predicate (`_is_self_notify_row`) and the
# integrated behavior in `_poll_once` so a future refactor that drops
# either half breaks loudly. Long-term the upstream fix is renaming
# the activity_type at the workspace-server (#2469); this guard stays
# regardless because it only excludes rows we never want.
def test_is_self_notify_row_true_for_method_notify_no_peer():
    """method='notify' with no peer is the agent's own outbound echo."""
    for src in (None, ""):
        assert inbox._is_self_notify_row({"method": "notify", "source_id": src}) is True
    # An absent source_id key behaves like None via .get().
    assert inbox._is_self_notify_row({"method": "notify"}) is True
def test_is_self_notify_row_false_for_real_canvas_inbound():
    """Canvas-user inbound: method='message/send' (not notify), no
    source_id — never classified as a self-notify."""
    assert inbox._is_self_notify_row({"method": "message/send", "source_id": None}) is False
def test_is_self_notify_row_false_for_real_peer_inbound():
    """Peer-agent inbound: a send method plus the sender workspace
    UUID in source_id."""
    assert inbox._is_self_notify_row({"method": "tasks/send", "source_id": "ws-peer-uuid"}) is False
def test_is_self_notify_row_false_for_method_notify_with_peer():
    """Defensive: method='notify' WITH a real peer id counts as a real
    inbound — the guard only excludes peerless notify rows, so an
    upstream repurposing of the shape isn't silently dropped."""
    assert inbox._is_self_notify_row({"method": "notify", "source_id": "ws-peer-uuid"}) is False
def test_poll_once_skips_self_notify_rows(state: inbox.InboxState):
    """Regression pin for the 2026-05-01 echo-loop incident: a
    self-notify row in the activity payload must never reach the
    inbox queue."""
    rows = [
        {
            "id": "act-real",
            "source_id": None,
            "method": "message/send",
            "summary": None,
            "request_body": {"parts": [{"type": "text", "text": "real inbound"}]},
            "created_at": "2026-04-30T22:00:00Z",
        },
        {
            "id": "act-self-notify",
            "source_id": None,
            "method": "notify",
            "summary": "Agent message: Hi! What can I help you with today?",
            "request_body": None,
            "created_at": "2026-04-30T22:00:01Z",
        },
    ]
    patcher, _ = _patch_httpx(_make_response(200, rows))
    with patcher:
        count = inbox._poll_once(state, "http://platform", "ws-1", {})
    # Only the real inbound counts; the self-notify is silently dropped.
    assert count == 1
    assert [m.activity_id for m in state.peek(10)] == ["act-real"]
def test_poll_once_advances_cursor_past_self_notify(state: inbox.InboxState):
    """Even skipped self-notify rows must move the cursor forward —
    otherwise every poll re-fetches the same row until a real inbound
    arrives, wasting a request per tick and pinning the cursor
    backward."""
    state.save_cursor("act-old")
    rows = [
        {
            "id": "act-self-notify",
            "source_id": None,
            "method": "notify",
            "summary": "Agent message: hello",
            "request_body": None,
            "created_at": "2026-04-30T22:00:00Z",
        },
    ]
    patcher, _ = _patch_httpx(_make_response(200, rows))
    with patcher:
        count = inbox._poll_once(state, "http://platform", "ws-1", {})
    assert count == 0
    assert state.peek(10) == []
    # The cursor moved past the skipped row so we never re-poll it.
    assert state.load_cursor() == "act-self-notify"
def test_poll_once_self_notify_does_not_fire_notification(state: inbox.InboxState):
    """Push-capable hosts (channel push to Claude Code etc.) must not
    be notified of self-notify rows — otherwise they get the same
    echo loop the queue side avoids."""
    rows = [
        {
            "id": "act-self-notify",
            "source_id": None,
            "method": "notify",
            "summary": "Agent message: hello",
            "request_body": None,
            "created_at": "2026-04-30T22:00:00Z",
        },
    ]
    seen: list[dict] = []
    inbox.set_notification_callback(seen.append)
    try:
        patcher, _ = _patch_httpx(_make_response(200, rows))
        with patcher:
            inbox._poll_once(state, "http://platform", "ws-1", {})
    finally:
        inbox.set_notification_callback(None)
    assert seen == [], (
        "self-notify rows must not surface as MCP notifications — "
        "doing so re-creates the echo loop on push-capable hosts"
    )
def test_start_poller_thread_is_daemon(state: inbox.InboxState):
    """Daemon flag is required so the poller dies with the parent
    process; a non-daemon poller would leak across `claude` restarts
    and write to a stale workspace.

    stop_event is plumbed so the thread cleans up at the end of the
    test instead of leaking into later tests. Without cleanup, the
    daemon's ~10ms tick races with later tests that patch httpx.Client
    — the leaked thread sees their patched response and runs an
    unwanted iteration of _poll_once that double-counts mocked calls.
    NOTE(review): previously attributed to
    test_batch_fetcher_owns_client_when_not_supplied on Python 3.11 CI,
    while the commit message says that symptom appeared in the full
    local suite — confirm which environment it was.
    """
    resp = _make_response(200, [])
    p, _ = _patch_httpx(resp)
    stop_event = threading.Event()
    # platform_auth is also patched so the poller never needs real
    # credentials inside the loop body.
    with p, patch("platform_auth.auth_headers", return_value={}):
        # Use a very short interval so the loop body runs at least once
        # before we exit the test.
        t = inbox.start_poller_thread(
            state, "http://platform", "ws-1", interval=0.01, stop_event=stop_event
        )
        time.sleep(0.05)
        assert t.daemon is True
        assert t.is_alive()
        # Signal shutdown + wait for the thread to actually exit before
        # we leave the test scope. Without this join, the leaked thread
        # races with later tests' httpx patches.
        stop_event.set()
        t.join(timeout=2.0)
        assert not t.is_alive(), "poller thread did not exit on stop_event"
# ---------------------------------------------------------------------------
# default_cursor_path respects CONFIGS_DIR
# ---------------------------------------------------------------------------
def test_default_cursor_path_uses_configs_dir(monkeypatch, tmp_path: Path):
    """CONFIGS_DIR, when set, dictates where the cursor file lives."""
    monkeypatch.setenv("CONFIGS_DIR", str(tmp_path))
    expected = tmp_path / ".mcp_inbox_cursor"
    assert inbox.default_cursor_path() == expected
# ---------------------------------------------------------------------------
# Phase 5b — BatchFetcher integration with the poll loop
# ---------------------------------------------------------------------------
#
# These tests pin the cross-module contract between inbox._poll_once and
# inbox_uploads.BatchFetcher: chat_upload_receive rows must be submitted
# to a single BatchFetcher AND drained (URI cache populated) before any
# subsequent message row is processed. Without the drain, the
# rewrite_request_body path inside message_from_activity surfaces the
# un-rewritten ``platform-pending:`` URI to the agent.
def _upload_row(act_id: str, file_id: str) -> dict:
return {
"id": act_id,
"source_id": None,
"method": "chat_upload_receive",
"summary": f"chat_upload_receive: {file_id}.pdf",
"request_body": {
"file_id": file_id,
"name": f"{file_id}.pdf",
"uri": f"platform-pending:ws-1/{file_id}",
"mimeType": "application/pdf",
"size": 3,
},
"created_at": "2026-05-04T10:00:00Z",
}
def _message_row_referencing(act_id: str, file_id: str) -> dict:
return {
"id": act_id,
"source_id": None,
"method": "message/send",
"summary": None,
"request_body": {
"params": {
"message": {
"parts": [
{"kind": "text", "text": "have a look"},
{
"kind": "file",
"file": {
"uri": f"platform-pending:ws-1/{file_id}",
"name": f"{file_id}.pdf",
},
},
]
}
}
},
"created_at": "2026-05-04T10:00:01Z",
}
def _patch_httpx_routing(activity_rows: list[dict], upload_bytes: bytes = b"PDF"):
    """Patch ``httpx.Client`` with a routing fake:

    - GET  .../activity                     -> 200 + ``activity_rows``
    - GET  .../pending-uploads/.../content  -> 200 + ``upload_bytes`` (PDF)
    - POST (the ack endpoint)               -> 200
    - anything else                         -> 404

    Every ``Client(...)`` call yields a brand-new MagicMock so tests can
    verify constructor-count expectations without pinning singletons.
    Returns the patch context manager; use as ``with p:``.
    """
    def _route_get(url, params=None, headers=None):
        if "/activity" in url:
            resp = MagicMock()
            resp.status_code = 200
            resp.json.return_value = activity_rows
            resp.text = ""
            return resp
        if "/pending-uploads/" in url and "/content" in url:
            resp = MagicMock()
            resp.status_code = 200
            resp.content = upload_bytes
            resp.headers = {"content-type": "application/pdf"}
            resp.text = ""
            return resp
        # Unknown route — mimic a platform 404.
        resp = MagicMock()
        resp.status_code = 404
        resp.text = ""
        return resp
    def _route_post(url, headers=None):
        resp = MagicMock()
        resp.status_code = 200
        resp.text = ""
        return resp
    def _build_client(*args, **kwargs):
        fake = MagicMock()
        fake.__enter__ = MagicMock(return_value=fake)
        fake.__exit__ = MagicMock(return_value=False)
        fake.get = MagicMock(side_effect=_route_get)
        fake.post = MagicMock(side_effect=_route_post)
        fake.close = MagicMock()
        return fake
    return patch("httpx.Client", side_effect=_build_client)
def test_poll_once_drains_uploads_before_processing_message_row(state: inbox.InboxState, tmp_path):
    """The chat-message row's file.uri MUST be rewritten to the local
    workspace: URI by the time it lands in the InboxState queue. This
    requires BatchFetcher.wait_all() to run before message_from_activity
    on the second row.
    """
    import inbox_uploads
    # Start from a cold URI cache so a hit can only come from this poll.
    inbox_uploads.get_cache().clear()
    # Sandbox the on-disk staging dir so the test can't pollute the
    # workspace's real chat-uploads.
    real_dir = inbox_uploads.CHAT_UPLOAD_DIR
    inbox_uploads.CHAT_UPLOAD_DIR = str(tmp_path / "chat-uploads")
    try:
        # Upload row first (lower id), then the chat message that
        # references the same file — the classic ordering.
        rows = [
            _upload_row("act-1", "file-A"),
            _message_row_referencing("act-2", "file-A"),
        ]
        state.save_cursor("act-old")
        with _patch_httpx_routing(rows, upload_bytes=b"PDF-bytes"):
            n = inbox._poll_once(state, "http://platform", "ws-1", {})
    finally:
        # Always restore the module global + cache, even on failure.
        inbox_uploads.CHAT_UPLOAD_DIR = real_dir
        inbox_uploads.get_cache().clear()
    assert n == 1, "exactly one message row should be enqueued (the upload row is a side-effect, not a message)"
    queued = state.peek(10)
    assert len(queued) == 1
    # The contract this test exists to pin: the platform-pending: URI
    # was rewritten to workspace: BEFORE the message landed in the
    # state queue. message_from_activity mutates row['request_body']
    # in-place, so the rewritten URI is observable on the row dict
    # we passed in.
    rewritten_part = rows[1]["request_body"]["params"]["message"]["parts"][1]
    assert rewritten_part["file"]["uri"].startswith("workspace:"), (
        f"upload barrier broken: file.uri = {rewritten_part['file']['uri']!r}; "
        "rewrite_request_body ran before BatchFetcher.wait_all populated the cache"
    )
    # Cursor advanced past BOTH rows — upload-receive (act-1) is
    # acknowledged via the inbox cursor regardless of fetch outcome.
    assert state.load_cursor() == "act-2"
def test_poll_once_with_only_upload_rows_drains_at_loop_end(state: inbox.InboxState, tmp_path):
    """End-of-batch drain: a poll that contains ONLY upload rows (no
    chat-message row to trigger the inline drain) must still drain the
    BatchFetcher before _poll_once returns. Otherwise a future poll
    that picks up the corresponding chat-message row would race with
    in-flight fetches from the previous batch.
    """
    import inbox_uploads
    # Cold cache + sandboxed staging dir, restored in the finally below.
    inbox_uploads.get_cache().clear()
    real_dir = inbox_uploads.CHAT_UPLOAD_DIR
    inbox_uploads.CHAT_UPLOAD_DIR = str(tmp_path / "chat-uploads")
    try:
        rows = [_upload_row("act-1", "file-A"), _upload_row("act-2", "file-B")]
        state.save_cursor("act-old")
        with _patch_httpx_routing(rows, upload_bytes=b"PDF"):
            n = inbox._poll_once(state, "http://platform", "ws-1", {})
        # By the time _poll_once returned, the URI cache must be hot
        # for both file_ids — proves the end-of-loop drain ran.
        assert inbox_uploads.get_cache().get("platform-pending:ws-1/file-A") is not None
        assert inbox_uploads.get_cache().get("platform-pending:ws-1/file-B") is not None
    finally:
        inbox_uploads.CHAT_UPLOAD_DIR = real_dir
        inbox_uploads.get_cache().clear()
    # Upload rows are NOT message rows; queue stays empty.
    assert n == 0
    # Cursor advances past both upload rows.
    assert state.load_cursor() == "act-2"
def test_poll_once_no_uploads_does_not_construct_batch_fetcher(state: inbox.InboxState):
    """A batch with no upload-receive rows must not pay the BatchFetcher
    construction cost — the executor + httpx client allocation is
    deferred until the first upload row appears.
    """
    import inbox_uploads
    init_calls: list[Any] = []
    def _spy_init(self, **kwargs):
        # Record the construction but skip the real __init__; with no
        # upload rows we should never reach submit/wait_all anyway.
        init_calls.append(kwargs)
        self._closed = False
        self._futures = []
        self._executor = MagicMock()
        self._client = MagicMock()
        self._own_client = False
    message_only = [
        {
            "id": "act-1",
            "source_id": None,
            "method": "message/send",
            "summary": None,
            "request_body": {"parts": [{"type": "text", "text": "hi"}]},
            "created_at": "2026-04-30T22:00:00Z",
        },
    ]
    state.save_cursor("act-old")
    patcher, _ = _patch_httpx(_make_response(200, message_only))
    with patch.object(inbox_uploads.BatchFetcher, "__init__", _spy_init), patcher:
        count = inbox._poll_once(state, "http://platform", "ws-1", {})
    assert count == 1
    assert init_calls == [], "BatchFetcher must not be constructed when no upload rows are present"
def test_default_cursor_path_falls_back_to_default(tmp_path, monkeypatch):
    """With CONFIGS_DIR unset, the cursor path resolves through
    configs_dir.resolve() — /configs in-container, ~/.molecule-workspace
    on a non-container host. Issue #2458."""
    import os
    monkeypatch.delenv("CONFIGS_DIR", raising=False)
    fake_home = tmp_path / "home"
    fake_home.mkdir()
    monkeypatch.setenv("HOME", str(fake_home))
    resolved = inbox.default_cursor_path()
    # The expected location depends on whether this host looks like a
    # container (writable /configs) or not.
    in_container = Path("/configs").exists() and os.access("/configs", os.W_OK)
    if in_container:
        assert resolved == Path("/configs") / ".mcp_inbox_cursor"
    else:
        assert resolved == fake_home / ".molecule-workspace" / ".mcp_inbox_cursor"
# ---------------------------------------------------------------------------
# Notification callback bridge — push UX for notification-capable hosts
# ---------------------------------------------------------------------------
#
# `record()` is called from the poller daemon thread when a new activity
# row arrives. Notification-capable MCP hosts (Claude Code) want to be
# pushed a notification — the universal wheel registers a callback via
# `set_notification_callback()` that fires the MCP notification. Pollers
# (`wait_for_message`/`inbox_peek`) keep working unchanged.
@pytest.fixture(autouse=True)
def _reset_notification_callback():
    """Autouse: clear any registered notification callback before and
    after every test so notification state cannot leak across tests —
    same pattern as the ``_reset_singleton`` fixture above."""
    inbox.set_notification_callback(None)
    yield
    inbox.set_notification_callback(None)
def test_record_fires_notification_callback_with_message_dict(state: inbox.InboxState):
    """A registered callback receives the canonical to_dict() shape —
    the same shape inbox_peek returns to the agent — so callers can
    build MCP notification payloads without re-deriving fields."""
    events: list[dict] = []
    inbox.set_notification_callback(events.append)
    state.record(_msg("act-1", peer_id="ws-peer", text="hello"))
    assert len(events) == 1
    (payload,) = events
    assert payload["activity_id"] == "act-1"
    assert payload["text"] == "hello"
    assert payload["peer_id"] == "ws-peer"
    assert payload["kind"] == "peer_agent"  # derived by to_dict()
    assert payload["method"] == "message/send"
def test_record_dedupe_does_not_refire_callback(state: inbox.InboxState):
    """The activity_id dedupe path must short-circuit BEFORE invoking
    the callback — otherwise a notification-capable host would see
    duplicate push events on poller backlog overlap."""
    received: list[dict] = []
    inbox.set_notification_callback(received.append)
    state.record(_msg("act-1"))
    state.record(_msg("act-1"))  # dedupe — same id
    # Fix: the two implicitly-concatenated message strings previously
    # had no separator (rendering as "got 2would cause ...") and the
    # second carried a spurious f-prefix with no placeholders.
    assert len(received) == 1, (
        f"expected 1 callback (dedupe), got {len(received)} — "
        "would cause duplicate Claude conversation interrupts"
    )
def test_record_callback_exception_does_not_break_inbox(state: inbox.InboxState):
    """A raising callback (e.g. asyncio loop closed mid-shutdown, or a
    serialization error on an exotic message) must NOT keep the message
    out of the queue. Notification delivery is best-effort; inbox
    correctness is not negotiable."""
    def _exploding_callback(_payload):
        raise RuntimeError("simulated callback failure")
    inbox.set_notification_callback(_exploding_callback)
    # record() must neither raise nor drop the message.
    state.record(_msg("act-1"))
    remaining = state.peek(10)
    assert len(remaining) == 1
    assert remaining[0].activity_id == "act-1"
def test_record_no_callback_registered_is_no_op(state: inbox.InboxState):
    """With no callback registered (in-container path, or before
    activation) record() must proceed normally — no None-call crash."""
    # Deliberately no set_notification_callback() here; the autouse
    # fixture already cleared any previous registration.
    state.record(_msg("act-1"))
    assert len(state.peek(10)) == 1
def test_set_notification_callback_replaces_previous(state: inbox.InboxState):
    """Re-registering swaps the callback — only the latest one fires.
    Lets the universal wheel rebuild the bridge if its asyncio loop is
    replaced (e.g. graceful restart)."""
    old_sink: list[dict] = []
    new_sink: list[dict] = []
    inbox.set_notification_callback(old_sink.append)
    inbox.set_notification_callback(new_sink.append)
    state.record(_msg("act-1"))
    assert len(old_sink) == 0, "first callback should be unregistered"
    assert len(new_sink) == 1, "second callback should receive the event"
def test_set_notification_callback_none_clears(state: inbox.InboxState):
    """Passing None unregisters the callback — used by tests and the
    wheel's shutdown path."""
    sink: list[dict] = []
    inbox.set_notification_callback(sink.append)
    inbox.set_notification_callback(None)
    state.record(_msg("act-1"))
    assert sink == []
# ---------------------------------------------------------------------------
# Phase 2 — chat_upload_receive rows route to inbox_uploads.fetch_and_stage
# ---------------------------------------------------------------------------
def test_poll_once_skips_chat_upload_row_from_queue(state: inbox.InboxState, monkeypatch, tmp_path):
    """method='chat_upload_receive' rows are side-effects telling the
    workspace to fetch bytes — they must NOT enqueue as chat messages.
    Pins the contract so a refactor that flattens the row loop can't
    silently re-enqueue these as 'empty A2A message' rows."""
    import inbox_uploads
    monkeypatch.setattr(inbox_uploads, "CHAT_UPLOAD_DIR", str(tmp_path / "chat-uploads"))
    inbox_uploads.get_cache().clear()
    upload_only = [
        {
            "id": "act-1",
            "source_id": None,
            "method": "chat_upload_receive",
            "summary": "chat_upload_receive: foo.pdf",
            "request_body": {
                "file_id": "abc123",
                "name": "foo.pdf",
                "mimeType": "application/pdf",
                "size": 4,
                "uri": "platform-pending:ws-1/abc123",
            },
            "created_at": "2026-05-04T10:00:00Z",
        },
    ]
    patcher, _ = _patch_httpx(_make_response(200, upload_only))
    fetch_calls = []
    def _fake_fetch(row, **kwargs):
        fetch_calls.append((row.get("id"), kwargs["workspace_id"]))
        return "workspace:/local/foo.pdf"
    with patcher, patch.object(inbox_uploads, "fetch_and_stage", _fake_fetch):
        count = inbox._poll_once(state, "http://platform", "ws-1", {})
    # Never enqueued, but the cursor still advances past the row.
    assert count == 0
    assert state.peek(10) == []
    assert state.load_cursor() == "act-1"
    # fetch_and_stage received the row + workspace id exactly once.
    assert fetch_calls == [("act-1", "ws-1")]
def test_poll_once_chat_upload_row_then_chat_message_rewrites_uri(state: inbox.InboxState, monkeypatch, tmp_path):
    """The classic ordering: an upload-receive row first (lower id), then
    a chat message referencing the platform-pending: URI.  The message
    that reaches the inbox must carry the local workspace: URI before
    the agent ever sees it.
    """
    import inbox_uploads

    monkeypatch.setattr(inbox_uploads, "CHAT_UPLOAD_DIR", str(tmp_path / "chat-uploads"))
    cache = inbox_uploads.get_cache()
    cache.clear()
    # Simulate a completed fetch: the cache already maps the pending URI
    # to its staged local path.  (The real flow populates this inside
    # fetch_and_stage, which is patched below — the test exercises only
    # the rewrite contract.)
    local_uri = "workspace:/workspace/.molecule/chat-uploads/xx-foo.pdf"
    cache.set("platform-pending:ws-1/abc123", local_uri)

    upload_row = {
        "id": "act-1",
        "source_id": None,
        "method": "chat_upload_receive",
        "summary": "chat_upload_receive: foo.pdf",
        "request_body": {
            "file_id": "abc123",
            "name": "foo.pdf",
            "mimeType": "application/pdf",
            "size": 4,
            "uri": "platform-pending:ws-1/abc123",
        },
        "created_at": "2026-05-04T10:00:00Z",
    }
    file_part = {
        "kind": "file",
        "file": {
            "uri": "platform-pending:ws-1/abc123",
            "name": "foo.pdf",
        },
    }
    chat_row = {
        "id": "act-2",
        "source_id": None,
        "method": "message/send",
        "summary": None,
        "request_body": {
            "params": {
                "message": {
                    "parts": [
                        {"kind": "text", "text": "look at this"},
                        file_part,
                    ]
                }
            }
        },
        "created_at": "2026-05-04T10:00:01Z",
    }
    patcher, _ = _patch_httpx(_make_response(200, [upload_row, chat_row]))

    with patcher, patch.object(
        inbox_uploads, "fetch_and_stage", lambda row, **kwargs: local_uri
    ):
        enqueued = inbox._poll_once(state, "http://platform", "ws-1", {})

    # Only the chat message lands in the inbox.
    assert enqueued == 1
    queue = state.peek(10)
    assert len(queue) == 1
    assert queue[0].activity_id == "act-2"
    # message_from_activity → rewrite_request_body mutates the row's
    # request_body in place, so the shared file part now carries the
    # rewritten local URI.
    assert file_part["file"]["uri"] == local_uri
def test_poll_once_chat_upload_row_advances_cursor_even_on_fetch_failure(
    state: inbox.InboxState, monkeypatch, tmp_path
):
    """A permanent network failure on /content must NOT stall the cursor
    — otherwise one bad upload blocks all real chat traffic for the
    workspace. fetch_and_stage returns None on failure, but the row is
    still considered handled from the cursor's perspective."""
    import inbox_uploads
    monkeypatch.setattr(inbox_uploads, "CHAT_UPLOAD_DIR", str(tmp_path / "chat-uploads"))
    # Clear the module-level URI cache like the sibling chat-upload tests
    # do — the autouse fixture resets the inbox singleton only, so a
    # stale mapping left by an earlier test would otherwise leak in here.
    inbox_uploads.get_cache().clear()
    rows = [
        {
            "id": "act-broken",
            "source_id": None,
            "method": "chat_upload_receive",
            "summary": "chat_upload_receive: doomed.pdf",
            "request_body": {
                "file_id": "doom",
                "name": "doomed.pdf",
                "uri": "platform-pending:ws-1/doom",
            },
            "created_at": "2026-05-04T10:00:00Z",
        },
    ]
    resp = _make_response(200, rows)
    p, _ = _patch_httpx(resp)
    def fake_fetch(row, **kwargs):
        return None  # network failure
    with p, patch.object(inbox_uploads, "fetch_and_stage", fake_fetch):
        inbox._poll_once(state, "http://platform", "ws-1", {})
    # Nothing enqueued, but the cursor moved past the broken row.
    assert state.peek(10) == []
    assert state.load_cursor() == "act-broken"