From 30fb507165f96b36f274f295ec3dc13f7eb7f274 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 11:26:55 -0700 Subject: [PATCH 1/3] =?UTF-8?q?feat(poll-upload):=20phase=205b=20=E2=80=94?= =?UTF-8?q?=20concurrent=20BatchFetcher=20+=20httpx=20client=20reuse?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Resolves the two remaining findings from the Phase 1-4 retrospective review (the Python-side counterparts to phase 5a): 1. Important — inbox_uploads.fetch_and_stage blocked the inbox poll loop synchronously per row. A user dragging 4 files into chat at once would stall the poller for 4× per-fetch latency before the chat message reached the agent. Add BatchFetcher: a thread-pool wrapper (default 4 workers) that submits fetches concurrently and exposes wait_all() as the barrier the inbox loop calls before processing the chat-message row that references the uploads. The drain barrier is the correctness invariant: rewrite_request_body must observe a populated URI cache when it walks the chat-message row's parts. _poll_once now drains the BatchFetcher inline before the first non-upload row, AND at end-of-batch (case: batch contains only upload rows; the corresponding chat message arrives in a later poll, but the future-poll-races-current-fetch race is closed). 2. Nit — fetch_and_stage created two httpx.Client instances per row (one for GET /content, one for POST /ack). Refactor so a single client serves both calls. When called from BatchFetcher, the batch-shared client serves every row's GET + ack — so the second fetch reuses the TCP+TLS handshake from the first. Comprehensive tests: - 13 new inbox_uploads tests: - fetch_and_stage with supplied client: zero httpx.Client constructions, GET+POST through the same client, caller's client not closed (lifecycle owned by caller). - fetch_and_stage without supplied client: exactly one httpx.Client constructed (was 2 pre-fix), closed on the way out. - BatchFetcher: 3 rows × 120ms = parallel completion < 250ms (vs. ~360ms serial), URI cache hot when wait_all returns, per-row failure isolation, single-client reuse across all submits, idempotent close, submit-after-close raises, owned-vs-supplied client lifecycle, no-op wait_all on empty batch, graceful httpx-missing degradation. - 3 new inbox tests: - poll_once drains uploads before processing the chat-message row (in-place mutation of row['request_body'] proves the URI was rewritten BEFORE message_from_activity returned). - poll_once with only upload rows still drains at end-of-batch. - poll_once with no upload rows never constructs a BatchFetcher (zero overhead on the no-upload happy path). 133 total inbox + inbox_uploads tests pass; 0 regressions. Closes the chat-upload poll-mode-perf gap end-to-end. Co-Authored-By: Claude Opus 4.7 (1M context) --- workspace/inbox.py | 45 +++- workspace/inbox_uploads.py | 249 +++++++++++++++++-- workspace/tests/test_inbox.py | 213 ++++++++++++++++ workspace/tests/test_inbox_uploads.py | 339 ++++++++++++++++++++++++++ 4 files changed, 824 insertions(+), 22 deletions(-) diff --git a/workspace/inbox.py b/workspace/inbox.py index 6c7ea895..5e2f02b1 100644 --- a/workspace/inbox.py +++ b/workspace/inbox.py @@ -553,10 +553,26 @@ def _poll_once( # Imported lazily at use-site so a runtime that never sees an # upload-receive row never imports the module. Cheap on the hot # path because Python caches the import. - from inbox_uploads import is_chat_upload_row, fetch_and_stage + from inbox_uploads import is_chat_upload_row, BatchFetcher new_count = 0 last_id: str | None = None + # ``batch_fetcher`` is lazy: a poll batch with no upload rows pays + # zero overhead. Once the first upload row appears we open one + # BatchFetcher and submit every subsequent upload row to its thread + # pool; before processing the FIRST non-upload row we drain the + # pool (wait_all) so the URI cache is hot when message rewriting + # runs. Without the barrier, the chat message that references the + # upload would arrive at the agent with the un-rewritten + # platform-pending: URI. + batch_fetcher: BatchFetcher | None = None + + def _drain_uploads(bf: BatchFetcher | None) -> None: + if bf is None: + return + bf.wait_all() + bf.close() + for row in rows: if not isinstance(row, dict): continue @@ -570,14 +586,21 @@ def _poll_once( # message_from_activity. We DO advance the cursor past # this row so a permanent network outage on /content # doesn't stall the cursor and block real chat traffic. - fetch_and_stage( - row, - platform_url=platform_url, - workspace_id=workspace_id, - headers=headers, - ) + if batch_fetcher is None: + batch_fetcher = BatchFetcher( + platform_url=platform_url, + workspace_id=workspace_id, + headers=headers, + ) + batch_fetcher.submit(row) last_id = str(row.get("id", "")) or last_id continue + # Non-upload row: drain any pending uploads first so the URI + # cache is populated before we run rewrite_request_body / + # message_from_activity on a row that may reference one. + if batch_fetcher is not None: + _drain_uploads(batch_fetcher) + batch_fetcher = None if _is_self_notify_row(row): # The workspace-server's `/notify` handler writes the agent's # own send_message_to_user POSTs to activity_logs with @@ -612,6 +635,14 @@ def _poll_once( last_id = message.activity_id new_count += 1 + # Drain any uploads still in flight if the batch ended with upload + # rows (no chat-message row to trigger the inline drain). Without + # this, a future poll that picks up the chat-message row first + # would race with the still-running fetches. + if batch_fetcher is not None: + _drain_uploads(batch_fetcher) + batch_fetcher = None + if last_id is not None: state.save_cursor(last_id, cursor_key) return new_count diff --git a/workspace/inbox_uploads.py b/workspace/inbox_uploads.py index 798f18de..913efdcd 100644 --- a/workspace/inbox_uploads.py +++ b/workspace/inbox_uploads.py @@ -37,6 +37,7 @@ read another tenant's bytes even if a token is misrouted. """ from __future__ import annotations +import concurrent.futures import logging import mimetypes import os @@ -68,6 +69,24 @@ MAX_FILE_BYTES = 25 * 1024 * 1024 # 10s default for /activity calls — both are user-perceived latency. DEFAULT_FETCH_TIMEOUT = 60.0 +# Concurrency cap for ``BatchFetcher``. Four workers is enough headroom +# for the realistic "user dragged 3-4 files into chat at once" case +# while bounding the platform's per-workspace fan-out. The cap matters +# because the platform's /content endpoint reads bytea from Postgres in +# a single round-trip per request — N workers = N concurrent DB reads +# of up to 25 MB each, so a higher cap could pressure platform memory +# without much UX win (network bandwidth is the bottleneck once the +# bytes are buffered). +DEFAULT_BATCH_FETCH_WORKERS = 4 + +# Upper bound on how long ``BatchFetcher.wait_all`` blocks the inbox +# poll loop before giving up on still-in-flight fetches. Aligned with +# DEFAULT_FETCH_TIMEOUT so a single hung fetch can't stall the loop +# longer than its own deadline. A timeout fires only if a worker thread +# is stuck past the underlying httpx timeout — pathological case; +# normal completion is bounded by per-fetch timeout × ceil(N/W). +DEFAULT_BATCH_WAIT_TIMEOUT = DEFAULT_FETCH_TIMEOUT + 5.0 + # Cap on the URI cache. A long-lived workspace handling thousands of # uploads shouldn't grow without bound; an LRU cap of 1024 keeps the # entries-needed-for-a-typical-conversation well within memory. @@ -275,6 +294,7 @@ def fetch_and_stage( workspace_id: str, headers: dict[str, str], timeout_secs: float = DEFAULT_FETCH_TIMEOUT, + client: Any = None, ) -> str | None: """Fetch the row's bytes, stage them under chat-uploads, and ack. @@ -289,6 +309,11 @@ def fetch_and_stage( On success, the URI cache is updated so a subsequent chat message referencing the same ``platform-pending:`` URI is rewritten before the agent sees it. + + Pass ``client`` to reuse a shared ``httpx.Client`` for both GET and + POST ack (saves one TLS handshake per row vs. constructing one + per-call). ``BatchFetcher`` does this across an entire poll batch so + N concurrent fetches share one connection pool. """ body = _request_body_dict(row) if body is None: @@ -317,25 +342,58 @@ def fetch_and_stage( if not isinstance(filename, str): filename = "file" - # Lazy httpx import: the standalone MCP path uses httpx; an in- - # container caller that imports this module by accident shouldn't - # explode at import time. - try: - import httpx # noqa: WPS433 - except ImportError: - logger.error("inbox_uploads: httpx not installed; cannot fetch %s", file_id) - return None + # Caller-supplied client: reuse for both GET + POST ack. Otherwise + # build a one-shot client and close it on the way out. Lazy httpx + # import keeps the standalone MCP path's optional dep optional. + own_client = client is None + if own_client: + try: + import httpx # noqa: WPS433 + except ImportError: + logger.error("inbox_uploads: httpx not installed; cannot fetch %s", file_id) + return None + client = httpx.Client(timeout=timeout_secs) + try: + return _fetch_and_stage_with_client( + client, + platform_url=platform_url, + workspace_id=workspace_id, + headers=headers, + file_id=file_id, + pending_uri=pending_uri, + filename=filename, + body=body, + ) + finally: + if own_client: + try: + client.close() + except Exception: # noqa: BLE001 — close should never crash the caller + pass + + +def _fetch_and_stage_with_client( + client: Any, + *, + platform_url: str, + workspace_id: str, + headers: dict[str, str], + file_id: str, + pending_uri: str, + filename: str, + body: dict[str, Any], +) -> str | None: + """Inner body of fetch_and_stage. Always uses the supplied client for + both GET and POST so the connection pool is shared across the call. + """ content_url = f"{platform_url}/workspaces/{workspace_id}/pending-uploads/{file_id}/content" ack_url = f"{platform_url}/workspaces/{workspace_id}/pending-uploads/{file_id}/ack" try: - with httpx.Client(timeout=timeout_secs) as client: - resp = client.get(content_url, headers=headers) + resp = client.get(content_url, headers=headers) except Exception as exc: # noqa: BLE001 - logger.warning( - "inbox_uploads: GET %s failed: %s", content_url, exc - ) + logger.warning("inbox_uploads: GET %s failed: %s", content_url, exc) return None if resp.status_code == 404: @@ -403,8 +461,7 @@ def fetch_and_stage( # back the on-disk file — the platform's sweep will clean up # eventually. try: - with httpx.Client(timeout=timeout_secs) as client: - ack_resp = client.post(ack_url, headers=headers) + ack_resp = client.post(ack_url, headers=headers) if ack_resp.status_code >= 400: logger.warning( "inbox_uploads: ack %s returned %d: %s", @@ -418,6 +475,168 @@ def fetch_and_stage( return local_uri +# --------------------------------------------------------------------------- +# BatchFetcher — concurrent fetch across a single poll batch +# --------------------------------------------------------------------------- + + +class BatchFetcher: + """Fetch + stage + ack a batch of upload-receive rows concurrently. + + Why this exists: the inbox poll loop used to call ``fetch_and_stage`` + serially per row. With N upload rows in a batch (a user dragging + multiple files into chat at once), the loop blocked for + ``N × per_fetch_latency`` before processing the chat message that + referenced them — a 4-file upload at 5s each = 20s of stall + before the agent saw the user's prompt. ``BatchFetcher`` runs the + fetches on a small thread pool (default 4 workers) so the stall is + bounded by ``ceil(N/W) × per_fetch_latency`` instead. + + Connection reuse: one ``httpx.Client`` is shared across every fetch + in the batch. httpx clients carry a connection pool, so a second + fetch to the same platform host reuses the TCP+TLS handshake from + the first — measurable win when fetches happen back-to-back. + + Correctness invariant the caller MUST preserve: the inbox loop is + expected to call ``wait_all()`` before processing the chat-message + activity row that REFERENCES one of these uploads. Without the + barrier, the URI cache is empty when ``rewrite_request_body`` runs + and the agent sees the un-rewritten ``platform-pending:`` URI. The + caller-side test ``test_poll_once_waits_for_uploads_before_messages`` + pins this end-to-end. + + Use as a context manager so the executor + client are torn down + even if the caller raises mid-batch. + """ + + def __init__( + self, + *, + platform_url: str, + workspace_id: str, + headers: dict[str, str], + timeout_secs: float = DEFAULT_FETCH_TIMEOUT, + max_workers: int = DEFAULT_BATCH_FETCH_WORKERS, + client: Any = None, + ): + self._platform_url = platform_url + self._workspace_id = workspace_id + self._headers = dict(headers) # copy so caller mutations don't leak in + self._timeout_secs = timeout_secs + + # Caller can inject a client (tests do this); production callers + # let us build one. Track ownership so we only close ours. + self._own_client = client is None + if self._own_client: + try: + import httpx # noqa: WPS433 + except ImportError: + # Match fetch_and_stage's behavior: log + degrade rather + # than raising at construction time. submit() will then + # return None for every row. + logger.error("inbox_uploads: httpx not installed; BatchFetcher inert") + self._client: Any = None + else: + self._client = httpx.Client(timeout=timeout_secs) + else: + self._client = client + + self._executor = concurrent.futures.ThreadPoolExecutor( + max_workers=max_workers, + thread_name_prefix="upload-fetch", + ) + self._futures: list[concurrent.futures.Future[Any]] = [] + self._closed = False + + def submit(self, row: dict[str, Any]) -> concurrent.futures.Future[Any] | None: + """Submit ``row`` for fetch + stage + ack. Non-blocking — the + worker thread runs ``fetch_and_stage`` with the shared client. + + Returns the Future so a caller that wants per-row outcome can + await it; ``None`` if the BatchFetcher is in a degraded state + (httpx missing). + """ + if self._closed: + raise RuntimeError("BatchFetcher: submit after close") + if self._client is None: + return None + fut = self._executor.submit( + fetch_and_stage, + row, + platform_url=self._platform_url, + workspace_id=self._workspace_id, + headers=self._headers, + timeout_secs=self._timeout_secs, + client=self._client, + ) + self._futures.append(fut) + return fut + + def wait_all(self, timeout: float | None = DEFAULT_BATCH_WAIT_TIMEOUT) -> None: + """Block until every submitted future completes (or times out). + + Per-future exceptions are logged + swallowed — ``fetch_and_stage`` + already converts every error path to ``return None``, so a real + exception propagating up to here is unexpected and we don't want + one bad fetch to abort the whole batch. + + Timeouts are also logged + swallowed; the caller will move on + and the un-acked rows will be retried by the next poll. + """ + if not self._futures: + return + try: + done, not_done = concurrent.futures.wait( + self._futures, + timeout=timeout, + return_when=concurrent.futures.ALL_COMPLETED, + ) + except Exception as exc: # noqa: BLE001 — concurrent.futures shouldn't raise here + logger.warning("inbox_uploads: BatchFetcher.wait_all crashed: %s", exc) + return + for fut in done: + exc = fut.exception() + if exc is not None: + logger.warning( + "inbox_uploads: BatchFetcher worker raised: %s", exc + ) + if not_done: + logger.warning( + "inbox_uploads: BatchFetcher.wait_all left %d in-flight after %ss timeout", + len(not_done), + timeout, + ) + + def close(self) -> None: + """Tear down the executor + (if owned) the httpx client. + + Idempotent. After close, ``submit`` raises and the BatchFetcher + cannot be reused — construct a fresh one for the next poll. + """ + if self._closed: + return + self._closed = True + # Drain remaining futures so worker threads aren't killed mid- + # request. wait=True is the safe default; for an inbox poller a + # 60s tail at shutdown is acceptable since uploads in flight are + # the only thing close() is called between. + try: + self._executor.shutdown(wait=True) + except Exception as exc: # noqa: BLE001 + logger.warning("inbox_uploads: executor shutdown error: %s", exc) + if self._own_client and self._client is not None: + try: + self._client.close() + except Exception as exc: # noqa: BLE001 + logger.warning("inbox_uploads: client close error: %s", exc) + + def __enter__(self) -> "BatchFetcher": + return self + + def __exit__(self, exc_type, exc, tb) -> None: + self.close() + + # --------------------------------------------------------------------------- # URI rewrite for incoming chat messages # --------------------------------------------------------------------------- diff --git a/workspace/tests/test_inbox.py b/workspace/tests/test_inbox.py index 162c32c2..d62b2a0a 100644 --- a/workspace/tests/test_inbox.py +++ b/workspace/tests/test_inbox.py @@ -577,6 +577,219 @@ def test_default_cursor_path_uses_configs_dir(monkeypatch, tmp_path: Path): assert inbox.default_cursor_path() == tmp_path / ".mcp_inbox_cursor" +# --------------------------------------------------------------------------- +# Phase 5b — BatchFetcher integration with the poll loop +# --------------------------------------------------------------------------- +# +# These tests pin the cross-module contract between inbox._poll_once and +# inbox_uploads.BatchFetcher: chat_upload_receive rows must be submitted +# to a single BatchFetcher AND drained (URI cache populated) before any +# subsequent message row is processed. Without the drain, the +# rewrite_request_body path inside message_from_activity surfaces the +# un-rewritten ``platform-pending:`` URI to the agent. + + +def _upload_row(act_id: str, file_id: str) -> dict: + return { + "id": act_id, + "source_id": None, + "method": "chat_upload_receive", + "summary": f"chat_upload_receive: {file_id}.pdf", + "request_body": { + "file_id": file_id, + "name": f"{file_id}.pdf", + "uri": f"platform-pending:ws-1/{file_id}", + "mimeType": "application/pdf", + "size": 3, + }, + "created_at": "2026-05-04T10:00:00Z", + } + + +def _message_row_referencing(act_id: str, file_id: str) -> dict: + return { + "id": act_id, + "source_id": None, + "method": "message/send", + "summary": None, + "request_body": { + "params": { + "message": { + "parts": [ + {"kind": "text", "text": "have a look"}, + { + "kind": "file", + "file": { + "uri": f"platform-pending:ws-1/{file_id}", + "name": f"{file_id}.pdf", + }, + }, + ] + } + } + }, + "created_at": "2026-05-04T10:00:01Z", + } + + +def _patch_httpx_routing(activity_rows: list[dict], upload_bytes: bytes = b"PDF"): + """Replace ``httpx.Client`` so: + + - GET /activity returns ``activity_rows`` + - GET /workspaces/.../content returns ``upload_bytes`` with content-type + - POST /ack returns 200 + + Returns the patch context manager; tests use ``with p:``. Each new + Client(...) gets a fresh MagicMock so the test can verify + constructor-count expectations without pinning singletons. + """ + def _client_factory(*args, **kwargs): + c = MagicMock() + c.__enter__ = MagicMock(return_value=c) + c.__exit__ = MagicMock(return_value=False) + + def _get(url, params=None, headers=None): + if "/activity" in url: + resp = MagicMock() + resp.status_code = 200 + resp.json.return_value = activity_rows + resp.text = "" + return resp + if "/pending-uploads/" in url and "/content" in url: + resp = MagicMock() + resp.status_code = 200 + resp.content = upload_bytes + resp.headers = {"content-type": "application/pdf"} + resp.text = "" + return resp + resp = MagicMock() + resp.status_code = 404 + resp.text = "" + return resp + + def _post(url, headers=None): + resp = MagicMock() + resp.status_code = 200 + resp.text = "" + return resp + + c.get = MagicMock(side_effect=_get) + c.post = MagicMock(side_effect=_post) + c.close = MagicMock() + return c + + return patch("httpx.Client", side_effect=_client_factory) + + +def test_poll_once_drains_uploads_before_processing_message_row(state: inbox.InboxState, tmp_path): + """The chat-message row's file.uri MUST be rewritten to the local + workspace: URI by the time it lands in the InboxState queue. This + requires BatchFetcher.wait_all() to run before message_from_activity + on the second row. + """ + import inbox_uploads + inbox_uploads.get_cache().clear() + # Sandbox the on-disk staging dir so the test can't pollute the + # workspace's real chat-uploads. + real_dir = inbox_uploads.CHAT_UPLOAD_DIR + inbox_uploads.CHAT_UPLOAD_DIR = str(tmp_path / "chat-uploads") + try: + rows = [ + _upload_row("act-1", "file-A"), + _message_row_referencing("act-2", "file-A"), + ] + state.save_cursor("act-old") + with _patch_httpx_routing(rows, upload_bytes=b"PDF-bytes"): + n = inbox._poll_once(state, "http://platform", "ws-1", {}) + finally: + inbox_uploads.CHAT_UPLOAD_DIR = real_dir + inbox_uploads.get_cache().clear() + + assert n == 1, "exactly one message row should be enqueued (the upload row is a side-effect, not a message)" + queued = state.peek(10) + assert len(queued) == 1 + # The contract this test exists to pin: the platform-pending: URI + # was rewritten to workspace: BEFORE the message landed in the + # state queue. message_from_activity mutates row['request_body'] + # in-place, so the rewritten URI is observable on the row dict + # we passed in. + rewritten_part = rows[1]["request_body"]["params"]["message"]["parts"][1] + assert rewritten_part["file"]["uri"].startswith("workspace:"), ( + f"upload barrier broken: file.uri = {rewritten_part['file']['uri']!r}; " + "rewrite_request_body ran before BatchFetcher.wait_all populated the cache" + ) + # Cursor advanced past BOTH rows — upload-receive (act-1) is + # acknowledged via the inbox cursor regardless of fetch outcome. + assert state.load_cursor() == "act-2" + + +def test_poll_once_with_only_upload_rows_drains_at_loop_end(state: inbox.InboxState, tmp_path): + """End-of-batch drain: a poll that contains ONLY upload rows (no + chat-message row to trigger the inline drain) must still drain the + BatchFetcher before _poll_once returns. Otherwise a future poll + that picks up the corresponding chat-message row would race with + in-flight fetches from the previous batch. + """ + import inbox_uploads + inbox_uploads.get_cache().clear() + real_dir = inbox_uploads.CHAT_UPLOAD_DIR + inbox_uploads.CHAT_UPLOAD_DIR = str(tmp_path / "chat-uploads") + try: + rows = [_upload_row("act-1", "file-A"), _upload_row("act-2", "file-B")] + state.save_cursor("act-old") + with _patch_httpx_routing(rows, upload_bytes=b"PDF"): + n = inbox._poll_once(state, "http://platform", "ws-1", {}) + # By the time _poll_once returned, the URI cache must be hot + # for both file_ids — proves the end-of-loop drain ran. + assert inbox_uploads.get_cache().get("platform-pending:ws-1/file-A") is not None + assert inbox_uploads.get_cache().get("platform-pending:ws-1/file-B") is not None + finally: + inbox_uploads.CHAT_UPLOAD_DIR = real_dir + inbox_uploads.get_cache().clear() + # Upload rows are NOT message rows; queue stays empty. + assert n == 0 + # Cursor advances past both upload rows. + assert state.load_cursor() == "act-2" + + +def test_poll_once_no_uploads_does_not_construct_batch_fetcher(state: inbox.InboxState): + """A batch with no upload-receive rows must not pay the BatchFetcher + construction cost — the executor + httpx client allocation is + deferred until the first upload row appears. + """ + import inbox_uploads + + constructed: list[Any] = [] + + def _patched_init(self, **kwargs): + constructed.append(kwargs) + # Don't actually run __init__; we never hit submit/wait_all. + self._closed = False + self._futures = [] + self._executor = MagicMock() + self._client = MagicMock() + self._own_client = False + + rows = [ + { + "id": "act-1", + "source_id": None, + "method": "message/send", + "summary": None, + "request_body": {"parts": [{"type": "text", "text": "hi"}]}, + "created_at": "2026-04-30T22:00:00Z", + }, + ] + state.save_cursor("act-old") + resp = _make_response(200, rows) + p, _ = _patch_httpx(resp) + with patch.object(inbox_uploads.BatchFetcher, "__init__", _patched_init), p: + n = inbox._poll_once(state, "http://platform", "ws-1", {}) + + assert n == 1 + assert constructed == [], "BatchFetcher must not be constructed when no upload rows are present" + + def test_default_cursor_path_falls_back_to_default(tmp_path, monkeypatch): """When CONFIGS_DIR is unset, the cursor path resolves through configs_dir.resolve() — /configs in-container, ~/.molecule-workspace diff --git a/workspace/tests/test_inbox_uploads.py b/workspace/tests/test_inbox_uploads.py index 515616e2..c13cea70 100644 --- a/workspace/tests/test_inbox_uploads.py +++ b/workspace/tests/test_inbox_uploads.py @@ -695,3 +695,342 @@ def test_rewrite_request_body_handles_non_list_parts(): def test_rewrite_request_body_handles_non_dict_file(): body = {"parts": [{"kind": "file", "file": "not a dict"}]} inbox_uploads.rewrite_request_body(body) # must not raise + + +# --------------------------------------------------------------------------- +# fetch_and_stage with shared client — Phase 5b client-reuse contract +# --------------------------------------------------------------------------- +# +# When a caller passes ``client=`` to fetch_and_stage, that client must be +# used for BOTH the GET /content and the POST /ack — no fresh +# ``httpx.Client(...)`` constructions should happen. The pre-Phase-5b +# implementation made one new client for GET and another for ack; the new +# shape lets BatchFetcher share one connection pool across an entire batch. + + +def test_fetch_and_stage_with_supplied_client_does_not_construct_new_client(monkeypatch): + row = _row(uri="platform-pending:ws-1/file-1") + get_resp = _make_resp(200, content=b"PDF", content_type="application/pdf") + ack_resp = _make_resp(200) + supplied = MagicMock() + supplied.get = MagicMock(return_value=get_resp) + supplied.post = MagicMock(return_value=ack_resp) + # Sentinel: any code path that constructs httpx.Client when one was + # already supplied is a regression — count constructions. + constructed: list[Any] = [] + + class _ShouldNotBeCalled: + def __init__(self, *a, **kw): + constructed.append((a, kw)) + + monkeypatch.setattr("httpx.Client", _ShouldNotBeCalled) + + local_uri = inbox_uploads.fetch_and_stage( + row, + platform_url="http://plat", + workspace_id="ws-1", + headers={"Authorization": "Bearer t"}, + client=supplied, + ) + assert local_uri is not None + assert constructed == [], "supplied client must be reused; no new Client should be constructed" + # GET + POST ack both went through the supplied client. + supplied.get.assert_called_once() + supplied.post.assert_called_once() + # Caller-owned client must NOT be closed by fetch_and_stage; the + # batch fetcher (or test) closes it once the whole batch is done. + supplied.close.assert_not_called() + + +def test_fetch_and_stage_without_supplied_client_constructs_and_closes_one(monkeypatch): + row = _row(uri="platform-pending:ws-1/file-1") + get_resp = _make_resp(200, content=b"PDF", content_type="application/pdf") + ack_resp = _make_resp(200) + built: list[MagicMock] = [] + + def _factory(*args, **kwargs): + c = MagicMock() + c.get = MagicMock(return_value=get_resp) + c.post = MagicMock(return_value=ack_resp) + built.append(c) + return c + + monkeypatch.setattr("httpx.Client", _factory) + + local_uri = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert local_uri is not None + # Pre-Phase-5b built TWO clients (one for GET, one for ack); now exactly one. + assert len(built) == 1, f"expected 1 httpx.Client construction, got {len(built)}" + # Same client must serve BOTH calls. + built[0].get.assert_called_once() + built[0].post.assert_called_once() + # Owned client must be closed by fetch_and_stage on the way out. + built[0].close.assert_called_once() + + +def test_fetch_and_stage_with_supplied_client_does_not_close_caller_client(): + # Even on failure the supplied client must not be closed — the + # BatchFetcher owns the lifecycle for the whole batch. + row = _row(uri="platform-pending:ws-1/file-1") + supplied = MagicMock() + supplied.get = MagicMock(side_effect=RuntimeError("network down")) + supplied.post = MagicMock() # should not be reached on GET failure + inbox_uploads.fetch_and_stage( + row, + platform_url="http://plat", + workspace_id="ws-1", + headers={}, + client=supplied, + ) + supplied.close.assert_not_called() + supplied.post.assert_not_called() + + +# --------------------------------------------------------------------------- +# BatchFetcher — concurrent fetch + URI cache barrier +# --------------------------------------------------------------------------- + + +def _row_with_id(act_id: str, file_id: str) -> dict: + """Helper: an upload-receive row with a distinct activity id + file id.""" + return { + "id": act_id, + "method": "chat_upload_receive", + "request_body": { + "file_id": file_id, + "name": f"{file_id}.pdf", + "uri": f"platform-pending:ws-1/{file_id}", + "mimeType": "application/pdf", + "size": 1, + }, + } + + +def _stub_client_for_batch(get_responses: dict[str, MagicMock]) -> MagicMock: + """Build one MagicMock client that returns per-file_id responses + based on the file_id segment of the URL. + """ + client = MagicMock() + + def _get(url: str, headers: dict[str, str] | None = None) -> MagicMock: + for fid, resp in get_responses.items(): + if f"/pending-uploads/{fid}/content" in url: + return resp + return _make_resp(404) + + def _post(url: str, headers: dict[str, str] | None = None) -> MagicMock: + return _make_resp(200) + + client.get = MagicMock(side_effect=_get) + client.post = MagicMock(side_effect=_post) + return client + + +def test_batch_fetcher_runs_submitted_rows_concurrently(): + # Three rows whose .get() blocks for ~120ms each. With 4 workers the + # batch should complete in ~120ms (parallel), not ~360ms (serial). + # The 250ms ceiling accommodates CI scheduler jitter while still + # discriminating concurrent (~120ms) from serial (~360ms). + import time + + barrier_start = [0.0] + + def _slow_get(url: str, headers: dict[str, str] | None = None) -> MagicMock: + time.sleep(0.12) + for fid in ("a", "b", "c"): + if f"/pending-uploads/{fid}/content" in url: + return _make_resp(200, content=b"X", content_type="text/plain") + return _make_resp(404) + + client = MagicMock() + client.get = MagicMock(side_effect=_slow_get) + client.post = MagicMock(return_value=_make_resp(200)) + + bf = inbox_uploads.BatchFetcher( + platform_url="http://plat", + workspace_id="ws-1", + headers={}, + client=client, + max_workers=4, + ) + barrier_start[0] = time.time() + for fid in ("a", "b", "c"): + bf.submit(_row_with_id(f"act-{fid}", fid)) + bf.wait_all() + elapsed = time.time() - barrier_start[0] + bf.close() + + assert elapsed < 0.25, ( + f"3 rows × 120ms with 4 workers should finish in <250ms; got {elapsed:.3f}s " + "(suggests serial execution — Phase 5b regression)" + ) + assert client.get.call_count == 3 + assert client.post.call_count == 3 + + +def test_batch_fetcher_wait_all_blocks_until_uri_cache_populated(): + """Pin the correctness invariant: when wait_all returns, the URI + cache is hot for every submitted row. Without this barrier the + inbox loop would process the chat-message row before its uploads + were staged, and rewrite_request_body would surface the un-rewritten + platform-pending: URI to the agent. + """ + import time + + def _slow_get(url: str, headers: dict[str, str] | None = None) -> MagicMock: + time.sleep(0.05) + return _make_resp(200, content=b"data", content_type="text/plain") + + client = MagicMock() + client.get = MagicMock(side_effect=_slow_get) + client.post = MagicMock(return_value=_make_resp(200)) + + inbox_uploads.get_cache().clear() + with inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={}, client=client + ) as bf: + bf.submit(_row_with_id("act-a", "a")) + bf.submit(_row_with_id("act-b", "b")) + bf.wait_all() + # Cache must be hot for BOTH rows by the time wait_all returns. + assert inbox_uploads.get_cache().get("platform-pending:ws-1/a") is not None + assert inbox_uploads.get_cache().get("platform-pending:ws-1/b") is not None + + +def test_batch_fetcher_isolates_per_row_failure(): + """One failing fetch must not abort siblings. Sibling rows complete, + URI cache populates for them; the bad row's cache entry stays absent. + """ + def _get(url: str, headers: dict[str, str] | None = None) -> MagicMock: + if "/pending-uploads/bad/content" in url: + return _make_resp(500, text="upstream broken") + return _make_resp(200, content=b"ok", content_type="text/plain") + + client = MagicMock() + client.get = MagicMock(side_effect=_get) + client.post = MagicMock(return_value=_make_resp(200)) + + inbox_uploads.get_cache().clear() + with inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={}, client=client + ) as bf: + bf.submit(_row_with_id("act-1", "good1")) + bf.submit(_row_with_id("act-2", "bad")) + bf.submit(_row_with_id("act-3", "good2")) + bf.wait_all() + + cache = inbox_uploads.get_cache() + assert cache.get("platform-pending:ws-1/good1") is not None + assert cache.get("platform-pending:ws-1/good2") is not None + assert cache.get("platform-pending:ws-1/bad") is None + + +def test_batch_fetcher_reuses_one_client_across_all_submits(): + """Every row in the batch must share the same client instance. This + is the connection-pool-reuse leg of the perf win: a second fetch + to the same host reuses the TCP+TLS handshake from the first. + """ + client = MagicMock() + client.get = MagicMock(return_value=_make_resp(200, content=b"x", content_type="text/plain")) + client.post = MagicMock(return_value=_make_resp(200)) + + with inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={}, client=client + ) as bf: + for fid in ("a", "b", "c"): + bf.submit(_row_with_id(f"act-{fid}", fid)) + bf.wait_all() + + # 3 GETs + 3 POST acks all on the same client — no per-row Client + # construction. + assert client.get.call_count == 3 + assert client.post.call_count == 3 + + +def test_batch_fetcher_close_idempotent(): + client = MagicMock() + bf = inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={}, client=client + ) + bf.close() + bf.close() # second call must not raise + + +def test_batch_fetcher_submit_after_close_raises(): + client = MagicMock() + bf = inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={}, client=client + ) + bf.close() + with pytest.raises(RuntimeError, match="submit after close"): + bf.submit(_row_with_id("act-x", "x")) + + +def test_batch_fetcher_owns_client_when_not_supplied(monkeypatch): + built: list[MagicMock] = [] + + def _factory(*args, **kwargs): + c = MagicMock() + c.get = MagicMock(return_value=_make_resp(200, content=b"x", content_type="text/plain")) + c.post = MagicMock(return_value=_make_resp(200)) + built.append(c) + return c + + monkeypatch.setattr("httpx.Client", _factory) + + bf = inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={} + ) + bf.submit(_row_with_id("act-a", "a")) + bf.wait_all() + bf.close() + + assert len(built) == 1, "expected one owned client per BatchFetcher" + built[0].close.assert_called_once() + + +def test_batch_fetcher_does_not_close_supplied_client(): + client = MagicMock() + client.get = MagicMock(return_value=_make_resp(200, content=b"x", content_type="text/plain")) + client.post = MagicMock(return_value=_make_resp(200)) + with inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={}, client=client + ) as bf: + bf.submit(_row_with_id("act-a", "a")) + bf.wait_all() + # Supplied client survives the BatchFetcher's close — caller's lifecycle. + client.close.assert_not_called() + + +def test_batch_fetcher_wait_all_no_op_on_empty_batch(): + client = MagicMock() + with inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={}, client=client + ) as bf: + bf.wait_all() # nothing submitted; must not block, must not raise + client.get.assert_not_called() + client.post.assert_not_called() + + +def test_batch_fetcher_httpx_missing_makes_submit_a_noop(monkeypatch): + # No client supplied + httpx import fails → BatchFetcher degrades + # gracefully: submit() returns None and the row is silently skipped. + import sys + + real_httpx = sys.modules.pop("httpx", None) + monkeypatch.setitem(sys.modules, "httpx", None) + try: + bf = inbox_uploads.BatchFetcher( + platform_url="http://plat", workspace_id="ws-1", headers={} + ) + result = bf.submit(_row_with_id("act-a", "a")) + bf.wait_all() + bf.close() + finally: + if real_httpx is not None: + sys.modules["httpx"] = real_httpx + else: + sys.modules.pop("httpx", None) + assert result is None From 5b5eacbb2946f475dae92aae6d4f57ee83c2a3b4 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 11:47:14 -0700 Subject: [PATCH 2/3] test(inbox): clean up daemon poller thread to prevent test cross-talk MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit test_start_poller_thread_is_daemon spawned a daemon thread with no stop mechanism — the leaked thread polled every 10ms with the test's patched httpx.Client mock STILL ACTIVE for ~50ms after the test scope. Later tests that re-patched httpx.Client + asserted call counts on fetch_and_stage / Client construction got their assertions inflated by the leaked thread's iterations. Symptoms: test_poll_once_skips_chat_upload_row_from_queue saw fetch_and_stage called twice instead of once on Python 3.11 CI; test_batch_fetcher_owns_client_when_not_supplied saw two Client constructions instead of one in the full local suite. Both surfaced only after Phase 5b's BatchFetcher refactor changed the timing window that allowed the leaked thread to fire mid-test. Fix: extend start_poller_thread with an optional stop_event kwarg (backward compatible — production callers pass None and rely on the daemon flag for process-exit cleanup). The test now signals + joins on stop_event before exiting scope, so the thread is gone before any later test patches httpx. Co-Authored-By: Claude Opus 4.7 (1M context) --- workspace/inbox.py | 8 +++++++- workspace/tests/test_inbox.py | 26 ++++++++++++++++++++++---- 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/workspace/inbox.py b/workspace/inbox.py index 5e2f02b1..6c131175 100644 --- a/workspace/inbox.py +++ b/workspace/inbox.py @@ -685,6 +685,7 @@ def start_poller_thread( platform_url: str, workspace_id: str, interval: float = POLL_INTERVAL_SECONDS, + stop_event: threading.Event | None = None, ) -> threading.Thread: """Spawn the poller as a daemon thread. Returns the Thread handle. @@ -696,13 +697,18 @@ def start_poller_thread( operator running ``ps -eL`` or eyeballing ``threading.enumerate()`` can tell which thread is which without reverse-engineering it from crash tracebacks. + + Pass ``stop_event`` to enable graceful shutdown — used by tests so + the daemon thread doesn't outlive the test that started it and race + with later tests' httpx patches. Production code passes None and + relies on the daemon flag for process-exit cleanup. """ name = "molecule-mcp-inbox-poller" if workspace_id: name = f"{name}-{workspace_id[:8]}" t = threading.Thread( target=_poll_loop, - args=(state, platform_url, workspace_id, interval), + args=(state, platform_url, workspace_id, interval, stop_event), name=name, daemon=True, ) diff --git a/workspace/tests/test_inbox.py b/workspace/tests/test_inbox.py index d62b2a0a..cbba9a3b 100644 --- a/workspace/tests/test_inbox.py +++ b/workspace/tests/test_inbox.py @@ -555,16 +555,34 @@ def test_poll_once_self_notify_does_not_fire_notification(state: inbox.InboxStat def test_start_poller_thread_is_daemon(state: inbox.InboxState): """Daemon flag is required so the poller dies with the parent process; a non-daemon poller would leak across `claude` restarts - and write to a stale workspace.""" + and write to a stale workspace. + + Stop_event is plumbed so the thread cleans up at the end of the + test instead of leaking into later tests. Without cleanup, the + daemon's ~10ms tick races with later tests that patch httpx.Client + — the leaked thread sees their patched response and runs an + unwanted iteration of _poll_once that double-counts mocked calls + (caught when test_batch_fetcher_owns_client_when_not_supplied + surfaced this on Python 3.11 CI but not 3.13 local). + """ resp = _make_response(200, []) p, _ = _patch_httpx(resp) + stop_event = threading.Event() with p, patch("platform_auth.auth_headers", return_value={}): # Use a very short interval so the loop body runs at least once # before we exit the test. - t = inbox.start_poller_thread(state, "http://platform", "ws-1", interval=0.01) + t = inbox.start_poller_thread( + state, "http://platform", "ws-1", interval=0.01, stop_event=stop_event + ) time.sleep(0.05) - assert t.daemon is True - assert t.is_alive() + assert t.daemon is True + assert t.is_alive() + # Signal shutdown + wait for the thread to actually exit before + # we leave the test scope. Without this join, the leaked thread + # races with later tests' httpx patches. + stop_event.set() + t.join(timeout=2.0) + assert not t.is_alive(), "poller thread did not exit on stop_event" # --------------------------------------------------------------------------- From 81e83c05b73bae8cce15979103a941dc51554864 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 11:56:54 -0700 Subject: [PATCH 3/3] fix(inbox): drop unused batch_fetcher = None after end-of-batch drain MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lint nit from review bot — _drain_uploads() runs and the function immediately advances to the cursor save + return, so the local re-assign is dead code. Co-Authored-By: Claude Opus 4.7 (1M context) --- workspace/inbox.py | 1 - 1 file changed, 1 deletion(-) diff --git a/workspace/inbox.py b/workspace/inbox.py index 6c131175..cff95c6d 100644 --- a/workspace/inbox.py +++ b/workspace/inbox.py @@ -641,7 +641,6 @@ def _poll_once( # would race with the still-running fetches. if batch_fetcher is not None: _drain_uploads(batch_fetcher) - batch_fetcher = None if last_id is not None: state.save_cursor(last_id, cursor_key)