From 30fb507165f96b36f274f295ec3dc13f7eb7f274 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 11:26:55 -0700 Subject: [PATCH] =?UTF-8?q?feat(poll-upload):=20phase=205b=20=E2=80=94=20c?= =?UTF-8?q?oncurrent=20BatchFetcher=20+=20httpx=20client=20reuse?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Resolves the two remaining findings from the Phase 1-4 retrospective review (the Python-side counterparts to phase 5a): 1. Important — inbox_uploads.fetch_and_stage blocked the inbox poll loop synchronously per row. A user dragging 4 files into chat at once would stall the poller for 4× per-fetch latency before the chat message reached the agent. Add BatchFetcher: a thread-pool wrapper (default 4 workers) that submits fetches concurrently and exposes wait_all() as the barrier the inbox loop calls before processing the chat-message row that references the uploads. The drain barrier is the correctness invariant: rewrite_request_body must observe a populated URI cache when it walks the chat-message row's parts. _poll_once now drains the BatchFetcher inline before the first non-upload row, AND at end-of-batch (case: batch contains only upload rows; the corresponding chat message arrives in a later poll, but the future-poll-races-current-fetch race is closed). 2. Nit — fetch_and_stage created two httpx.Client instances per row (one for GET /content, one for POST /ack). Refactor so a single client serves both calls. When called from BatchFetcher, the batch-shared client serves every row's GET + ack — so the second fetch reuses the TCP+TLS handshake from the first. Comprehensive tests: - 13 new inbox_uploads tests: - fetch_and_stage with supplied client: zero httpx.Client constructions, GET+POST through the same client, caller's client not closed (lifecycle owned by caller). - fetch_and_stage without supplied client: exactly one httpx.Client constructed (was 2 pre-fix), closed on the way out. 
- BatchFetcher: 3 rows × 120ms = parallel completion < 250ms (vs. ~360ms serial), URI cache hot when wait_all returns, per-row failure isolation, single-client reuse across all submits, idempotent close, submit-after-close raises, owned-vs-supplied client lifecycle, no-op wait_all on empty batch, graceful httpx-missing degradation. - 3 new inbox tests: - poll_once drains uploads before processing the chat-message row (in-place mutation of row['request_body'] proves the URI was rewritten BEFORE message_from_activity returned). - poll_once with only upload rows still drains at end-of-batch. - poll_once with no upload rows never constructs a BatchFetcher (zero overhead on the no-upload happy path). 133 total inbox + inbox_uploads tests pass; 0 regressions. Closes the chat-upload poll-mode-perf gap end-to-end. Co-Authored-By: Claude Opus 4.7 (1M context) --- workspace/inbox.py | 45 +++- workspace/inbox_uploads.py | 249 +++++++++++++++++-- workspace/tests/test_inbox.py | 213 ++++++++++++++++ workspace/tests/test_inbox_uploads.py | 339 ++++++++++++++++++++++++++ 4 files changed, 824 insertions(+), 22 deletions(-) diff --git a/workspace/inbox.py b/workspace/inbox.py index 6c7ea895..5e2f02b1 100644 --- a/workspace/inbox.py +++ b/workspace/inbox.py @@ -553,10 +553,26 @@ def _poll_once( # Imported lazily at use-site so a runtime that never sees an # upload-receive row never imports the module. Cheap on the hot # path because Python caches the import. - from inbox_uploads import is_chat_upload_row, fetch_and_stage + from inbox_uploads import is_chat_upload_row, BatchFetcher new_count = 0 last_id: str | None = None + # ``batch_fetcher`` is lazy: a poll batch with no upload rows pays + # zero overhead. Once the first upload row appears we open one + # BatchFetcher and submit every subsequent upload row to its thread + # pool; before processing the FIRST non-upload row we drain the + # pool (wait_all) so the URI cache is hot when message rewriting + # runs. 
Without the barrier, the chat message that references the + # upload would arrive at the agent with the un-rewritten + # platform-pending: URI. + batch_fetcher: BatchFetcher | None = None + + def _drain_uploads(bf: BatchFetcher | None) -> None: + if bf is None: + return + bf.wait_all() + bf.close() + for row in rows: if not isinstance(row, dict): continue @@ -570,14 +586,21 @@ def _poll_once( # message_from_activity. We DO advance the cursor past # this row so a permanent network outage on /content # doesn't stall the cursor and block real chat traffic. - fetch_and_stage( - row, - platform_url=platform_url, - workspace_id=workspace_id, - headers=headers, - ) + if batch_fetcher is None: + batch_fetcher = BatchFetcher( + platform_url=platform_url, + workspace_id=workspace_id, + headers=headers, + ) + batch_fetcher.submit(row) last_id = str(row.get("id", "")) or last_id continue + # Non-upload row: drain any pending uploads first so the URI + # cache is populated before we run rewrite_request_body / + # message_from_activity on a row that may reference one. + if batch_fetcher is not None: + _drain_uploads(batch_fetcher) + batch_fetcher = None if _is_self_notify_row(row): # The workspace-server's `/notify` handler writes the agent's # own send_message_to_user POSTs to activity_logs with @@ -612,6 +635,14 @@ def _poll_once( last_id = message.activity_id new_count += 1 + # Drain any uploads still in flight if the batch ended with upload + # rows (no chat-message row to trigger the inline drain). Without + # this, a future poll that picks up the chat-message row first + # would race with the still-running fetches. 
+ if batch_fetcher is not None: + _drain_uploads(batch_fetcher) + batch_fetcher = None + if last_id is not None: state.save_cursor(last_id, cursor_key) return new_count diff --git a/workspace/inbox_uploads.py b/workspace/inbox_uploads.py index 798f18de..913efdcd 100644 --- a/workspace/inbox_uploads.py +++ b/workspace/inbox_uploads.py @@ -37,6 +37,7 @@ read another tenant's bytes even if a token is misrouted. """ from __future__ import annotations +import concurrent.futures import logging import mimetypes import os @@ -68,6 +69,24 @@ MAX_FILE_BYTES = 25 * 1024 * 1024 # 10s default for /activity calls — both are user-perceived latency. DEFAULT_FETCH_TIMEOUT = 60.0 +# Concurrency cap for ``BatchFetcher``. Four workers is enough headroom +# for the realistic "user dragged 3-4 files into chat at once" case +# while bounding the platform's per-workspace fan-out. The cap matters +# because the platform's /content endpoint reads bytea from Postgres in +# a single round-trip per request — N workers = N concurrent DB reads +# of up to 25 MB each, so a higher cap could pressure platform memory +# without much UX win (network bandwidth is the bottleneck once the +# bytes are buffered). +DEFAULT_BATCH_FETCH_WORKERS = 4 + +# Upper bound on how long ``BatchFetcher.wait_all`` blocks the inbox +# poll loop before giving up on still-in-flight fetches. Aligned with +# DEFAULT_FETCH_TIMEOUT so a single hung fetch can't stall the loop +# longer than its own deadline. With at most W rows, a timeout fires +# only if a worker is stuck past the underlying httpx timeout; with more +# rows, later waves (ceil(N/W) × per-fetch timeout) can outlast it. +DEFAULT_BATCH_WAIT_TIMEOUT = DEFAULT_FETCH_TIMEOUT + 5.0 + # Cap on the URI cache. A long-lived workspace handling thousands of # uploads shouldn't grow without bound; an LRU cap of 1024 keeps the # entries-needed-for-a-typical-conversation well within memory. 
@@ -275,6 +294,7 @@ def fetch_and_stage( workspace_id: str, headers: dict[str, str], timeout_secs: float = DEFAULT_FETCH_TIMEOUT, + client: Any = None, ) -> str | None: """Fetch the row's bytes, stage them under chat-uploads, and ack. @@ -289,6 +309,11 @@ def fetch_and_stage( On success, the URI cache is updated so a subsequent chat message referencing the same ``platform-pending:`` URI is rewritten before the agent sees it. + + Pass ``client`` to reuse a shared ``httpx.Client`` for both GET and + POST ack (saves one TLS handshake per row vs. constructing one + per-call). ``BatchFetcher`` does this across an entire poll batch so + N concurrent fetches share one connection pool. """ body = _request_body_dict(row) if body is None: @@ -317,25 +342,58 @@ def fetch_and_stage( if not isinstance(filename, str): filename = "file" - # Lazy httpx import: the standalone MCP path uses httpx; an in- - # container caller that imports this module by accident shouldn't - # explode at import time. - try: - import httpx # noqa: WPS433 - except ImportError: - logger.error("inbox_uploads: httpx not installed; cannot fetch %s", file_id) - return None + # Caller-supplied client: reuse for both GET + POST ack. Otherwise + # build a one-shot client and close it on the way out. Lazy httpx + # import keeps the standalone MCP path's optional dep optional. 
+ own_client = client is None + if own_client: + try: + import httpx # noqa: WPS433 + except ImportError: + logger.error("inbox_uploads: httpx not installed; cannot fetch %s", file_id) + return None + client = httpx.Client(timeout=timeout_secs) + try: + return _fetch_and_stage_with_client( + client, + platform_url=platform_url, + workspace_id=workspace_id, + headers=headers, + file_id=file_id, + pending_uri=pending_uri, + filename=filename, + body=body, + ) + finally: + if own_client: + try: + client.close() + except Exception: # noqa: BLE001 — close should never crash the caller + pass + + +def _fetch_and_stage_with_client( + client: Any, + *, + platform_url: str, + workspace_id: str, + headers: dict[str, str], + file_id: str, + pending_uri: str, + filename: str, + body: dict[str, Any], +) -> str | None: + """Inner body of fetch_and_stage. Always uses the supplied client for + both GET and POST so the connection pool is shared across the call. + """ content_url = f"{platform_url}/workspaces/{workspace_id}/pending-uploads/{file_id}/content" ack_url = f"{platform_url}/workspaces/{workspace_id}/pending-uploads/{file_id}/ack" try: - with httpx.Client(timeout=timeout_secs) as client: - resp = client.get(content_url, headers=headers) + resp = client.get(content_url, headers=headers) except Exception as exc: # noqa: BLE001 - logger.warning( - "inbox_uploads: GET %s failed: %s", content_url, exc - ) + logger.warning("inbox_uploads: GET %s failed: %s", content_url, exc) return None if resp.status_code == 404: @@ -403,8 +461,7 @@ def fetch_and_stage( # back the on-disk file — the platform's sweep will clean up # eventually. 
try: - with httpx.Client(timeout=timeout_secs) as client: - ack_resp = client.post(ack_url, headers=headers) + ack_resp = client.post(ack_url, headers=headers) if ack_resp.status_code >= 400: logger.warning( "inbox_uploads: ack %s returned %d: %s", @@ -418,6 +475,168 @@ def fetch_and_stage( return local_uri +# --------------------------------------------------------------------------- +# BatchFetcher — concurrent fetch across a single poll batch +# --------------------------------------------------------------------------- + + +class BatchFetcher: + """Fetch + stage + ack a batch of upload-receive rows concurrently. + + Why this exists: the inbox poll loop used to call ``fetch_and_stage`` + serially per row. With N upload rows in a batch (a user dragging + multiple files into chat at once), the loop blocked for + ``N × per_fetch_latency`` before processing the chat message that + referenced them — a 4-file upload at 5s each = 20s of stall + before the agent saw the user's prompt. ``BatchFetcher`` runs the + fetches on a small thread pool (default 4 workers) so the stall is + bounded by ``ceil(N/W) × per_fetch_latency`` instead. + + Connection reuse: one ``httpx.Client`` is shared across every fetch + in the batch. httpx clients carry a connection pool, so a second + fetch to the same platform host reuses the TCP+TLS handshake from + the first — measurable win when fetches happen back-to-back. + + Correctness invariant the caller MUST preserve: the inbox loop is + expected to call ``wait_all()`` before processing the chat-message + activity row that REFERENCES one of these uploads. Without the + barrier, the URI cache is empty when ``rewrite_request_body`` runs + and the agent sees the un-rewritten ``platform-pending:`` URI. The + caller-side test ``test_poll_once_drains_uploads_before_processing_message_row`` + pins this end-to-end. + + Use as a context manager so the executor + client are torn down + even if the caller raises mid-batch. 
+ """ + + def __init__( + self, + *, + platform_url: str, + workspace_id: str, + headers: dict[str, str], + timeout_secs: float = DEFAULT_FETCH_TIMEOUT, + max_workers: int = DEFAULT_BATCH_FETCH_WORKERS, + client: Any = None, + ): + self._platform_url = platform_url + self._workspace_id = workspace_id + self._headers = dict(headers) # copy so caller mutations don't leak in + self._timeout_secs = timeout_secs + + # Caller can inject a client (tests do this); production callers + # let us build one. Track ownership so we only close ours. + self._own_client = client is None + if self._own_client: + try: + import httpx # noqa: WPS433 + except ImportError: + # Match fetch_and_stage's behavior: log + degrade rather + # than raising at construction time. submit() will then + # return None for every row. + logger.error("inbox_uploads: httpx not installed; BatchFetcher inert") + self._client: Any = None + else: + self._client = httpx.Client(timeout=timeout_secs) + else: + self._client = client + + self._executor = concurrent.futures.ThreadPoolExecutor( + max_workers=max_workers, + thread_name_prefix="upload-fetch", + ) + self._futures: list[concurrent.futures.Future[Any]] = [] + self._closed = False + + def submit(self, row: dict[str, Any]) -> concurrent.futures.Future[Any] | None: + """Submit ``row`` for fetch + stage + ack. Non-blocking — the + worker thread runs ``fetch_and_stage`` with the shared client. + + Returns the Future so a caller that wants per-row outcome can + await it; ``None`` if the BatchFetcher is in a degraded state + (httpx missing). 
+ """ + if self._closed: + raise RuntimeError("BatchFetcher: submit after close") + if self._client is None: + return None + fut = self._executor.submit( + fetch_and_stage, + row, + platform_url=self._platform_url, + workspace_id=self._workspace_id, + headers=self._headers, + timeout_secs=self._timeout_secs, + client=self._client, + ) + self._futures.append(fut) + return fut + + def wait_all(self, timeout: float | None = DEFAULT_BATCH_WAIT_TIMEOUT) -> None: + """Block until every submitted future completes (or times out). + + Per-future exceptions are logged + swallowed — ``fetch_and_stage`` + already converts every error path to ``return None``, so a real + exception propagating up to here is unexpected and we don't want + one bad fetch to abort the whole batch. + + Timeouts are also logged + swallowed; the caller will move on. + ``_poll_once`` still advances its cursor past upload rows, so do not rely on a later poll re-submitting them. + """ + if not self._futures: + return + try: + done, not_done = concurrent.futures.wait( + self._futures, + timeout=timeout, + return_when=concurrent.futures.ALL_COMPLETED, + ) + except Exception as exc: # noqa: BLE001 — concurrent.futures shouldn't raise here + logger.warning("inbox_uploads: BatchFetcher.wait_all crashed: %s", exc) + return + for fut in done: + exc = fut.exception() + if exc is not None: + logger.warning( + "inbox_uploads: BatchFetcher worker raised: %s", exc + ) + if not_done: + logger.warning( + "inbox_uploads: BatchFetcher.wait_all left %d in-flight after %ss timeout", + len(not_done), + timeout, + ) + + def close(self) -> None: + """Tear down the executor + (if owned) the httpx client. + + Idempotent. After close, ``submit`` raises and the BatchFetcher + cannot be reused — construct a fresh one for the next poll. + """ + if self._closed: + return + self._closed = True + # Drain remaining futures so worker threads aren't killed mid- + # request. 
wait=True is the safe default; for an inbox poller a + # 60s tail at shutdown is acceptable since uploads in flight are + # the only thing close() is called between. + try: + self._executor.shutdown(wait=True) + except Exception as exc: # noqa: BLE001 + logger.warning("inbox_uploads: executor shutdown error: %s", exc) + if self._own_client and self._client is not None: + try: + self._client.close() + except Exception as exc: # noqa: BLE001 + logger.warning("inbox_uploads: client close error: %s", exc) + + def __enter__(self) -> "BatchFetcher": + return self + + def __exit__(self, exc_type, exc, tb) -> None: + self.close() + + # --------------------------------------------------------------------------- # URI rewrite for incoming chat messages # --------------------------------------------------------------------------- diff --git a/workspace/tests/test_inbox.py b/workspace/tests/test_inbox.py index 162c32c2..d62b2a0a 100644 --- a/workspace/tests/test_inbox.py +++ b/workspace/tests/test_inbox.py @@ -577,6 +577,219 @@ def test_default_cursor_path_uses_configs_dir(monkeypatch, tmp_path: Path): assert inbox.default_cursor_path() == tmp_path / ".mcp_inbox_cursor" +# --------------------------------------------------------------------------- +# Phase 5b — BatchFetcher integration with the poll loop +# --------------------------------------------------------------------------- +# +# These tests pin the cross-module contract between inbox._poll_once and +# inbox_uploads.BatchFetcher: chat_upload_receive rows must be submitted +# to a single BatchFetcher AND drained (URI cache populated) before any +# subsequent message row is processed. Without the drain, the +# rewrite_request_body path inside message_from_activity surfaces the +# un-rewritten ``platform-pending:`` URI to the agent. 
+ + +def _upload_row(act_id: str, file_id: str) -> dict: + return { + "id": act_id, + "source_id": None, + "method": "chat_upload_receive", + "summary": f"chat_upload_receive: {file_id}.pdf", + "request_body": { + "file_id": file_id, + "name": f"{file_id}.pdf", + "uri": f"platform-pending:ws-1/{file_id}", + "mimeType": "application/pdf", + "size": 3, + }, + "created_at": "2026-05-04T10:00:00Z", + } + + +def _message_row_referencing(act_id: str, file_id: str) -> dict: + return { + "id": act_id, + "source_id": None, + "method": "message/send", + "summary": None, + "request_body": { + "params": { + "message": { + "parts": [ + {"kind": "text", "text": "have a look"}, + { + "kind": "file", + "file": { + "uri": f"platform-pending:ws-1/{file_id}", + "name": f"{file_id}.pdf", + }, + }, + ] + } + } + }, + "created_at": "2026-05-04T10:00:01Z", + } + + +def _patch_httpx_routing(activity_rows: list[dict], upload_bytes: bytes = b"PDF"): + """Replace ``httpx.Client`` so: + + - GET /activity returns ``activity_rows`` + - GET /workspaces/.../content returns ``upload_bytes`` with content-type + - POST /ack returns 200 + + Returns the patch context manager; tests use ``with p:``. Each new + Client(...) gets a fresh MagicMock so the test can verify + constructor-count expectations without pinning singletons. 
+ """ + def _client_factory(*args, **kwargs): + c = MagicMock() + c.__enter__ = MagicMock(return_value=c) + c.__exit__ = MagicMock(return_value=False) + + def _get(url, params=None, headers=None): + if "/activity" in url: + resp = MagicMock() + resp.status_code = 200 + resp.json.return_value = activity_rows + resp.text = "" + return resp + if "/pending-uploads/" in url and "/content" in url: + resp = MagicMock() + resp.status_code = 200 + resp.content = upload_bytes + resp.headers = {"content-type": "application/pdf"} + resp.text = "" + return resp + resp = MagicMock() + resp.status_code = 404 + resp.text = "" + return resp + + def _post(url, headers=None): + resp = MagicMock() + resp.status_code = 200 + resp.text = "" + return resp + + c.get = MagicMock(side_effect=_get) + c.post = MagicMock(side_effect=_post) + c.close = MagicMock() + return c + + return patch("httpx.Client", side_effect=_client_factory) + + +def test_poll_once_drains_uploads_before_processing_message_row(state: inbox.InboxState, tmp_path): + """The chat-message row's file.uri MUST be rewritten to the local + workspace: URI by the time it lands in the InboxState queue. This + requires BatchFetcher.wait_all() to run before message_from_activity + on the second row. + """ + import inbox_uploads + inbox_uploads.get_cache().clear() + # Sandbox the on-disk staging dir so the test can't pollute the + # workspace's real chat-uploads. 
+ real_dir = inbox_uploads.CHAT_UPLOAD_DIR + inbox_uploads.CHAT_UPLOAD_DIR = str(tmp_path / "chat-uploads") + try: + rows = [ + _upload_row("act-1", "file-A"), + _message_row_referencing("act-2", "file-A"), + ] + state.save_cursor("act-old") + with _patch_httpx_routing(rows, upload_bytes=b"PDF-bytes"): + n = inbox._poll_once(state, "http://platform", "ws-1", {}) + finally: + inbox_uploads.CHAT_UPLOAD_DIR = real_dir + inbox_uploads.get_cache().clear() + + assert n == 1, "exactly one message row should be enqueued (the upload row is a side-effect, not a message)" + queued = state.peek(10) + assert len(queued) == 1 + # The contract this test exists to pin: the platform-pending: URI + # was rewritten to workspace: BEFORE the message landed in the + # state queue. message_from_activity mutates row['request_body'] + # in-place, so the rewritten URI is observable on the row dict + # we passed in. + rewritten_part = rows[1]["request_body"]["params"]["message"]["parts"][1] + assert rewritten_part["file"]["uri"].startswith("workspace:"), ( + f"upload barrier broken: file.uri = {rewritten_part['file']['uri']!r}; " + "rewrite_request_body ran before BatchFetcher.wait_all populated the cache" + ) + # Cursor advanced past BOTH rows — upload-receive (act-1) is + # acknowledged via the inbox cursor regardless of fetch outcome. + assert state.load_cursor() == "act-2" + + +def test_poll_once_with_only_upload_rows_drains_at_loop_end(state: inbox.InboxState, tmp_path): + """End-of-batch drain: a poll that contains ONLY upload rows (no + chat-message row to trigger the inline drain) must still drain the + BatchFetcher before _poll_once returns. Otherwise a future poll + that picks up the corresponding chat-message row would race with + in-flight fetches from the previous batch. 
+ """ + import inbox_uploads + inbox_uploads.get_cache().clear() + real_dir = inbox_uploads.CHAT_UPLOAD_DIR + inbox_uploads.CHAT_UPLOAD_DIR = str(tmp_path / "chat-uploads") + try: + rows = [_upload_row("act-1", "file-A"), _upload_row("act-2", "file-B")] + state.save_cursor("act-old") + with _patch_httpx_routing(rows, upload_bytes=b"PDF"): + n = inbox._poll_once(state, "http://platform", "ws-1", {}) + # By the time _poll_once returned, the URI cache must be hot + # for both file_ids — proves the end-of-loop drain ran. + assert inbox_uploads.get_cache().get("platform-pending:ws-1/file-A") is not None + assert inbox_uploads.get_cache().get("platform-pending:ws-1/file-B") is not None + finally: + inbox_uploads.CHAT_UPLOAD_DIR = real_dir + inbox_uploads.get_cache().clear() + # Upload rows are NOT message rows; queue stays empty. + assert n == 0 + # Cursor advances past both upload rows. + assert state.load_cursor() == "act-2" + + +def test_poll_once_no_uploads_does_not_construct_batch_fetcher(state: inbox.InboxState): + """A batch with no upload-receive rows must not pay the BatchFetcher + construction cost — the executor + httpx client allocation is + deferred until the first upload row appears. + """ + import inbox_uploads + + constructed: list[Any] = [] + + def _patched_init(self, **kwargs): + constructed.append(kwargs) + # Don't actually run __init__; we never hit submit/wait_all. 
+ self._closed = False + self._futures = [] + self._executor = MagicMock() + self._client = MagicMock() + self._own_client = False + + rows = [ + { + "id": "act-1", + "source_id": None, + "method": "message/send", + "summary": None, + "request_body": {"parts": [{"type": "text", "text": "hi"}]}, + "created_at": "2026-04-30T22:00:00Z", + }, + ] + state.save_cursor("act-old") + resp = _make_response(200, rows) + p, _ = _patch_httpx(resp) + with patch.object(inbox_uploads.BatchFetcher, "__init__", _patched_init), p: + n = inbox._poll_once(state, "http://platform", "ws-1", {}) + + assert n == 1 + assert constructed == [], "BatchFetcher must not be constructed when no upload rows are present" + + def test_default_cursor_path_falls_back_to_default(tmp_path, monkeypatch): """When CONFIGS_DIR is unset, the cursor path resolves through configs_dir.resolve() — /configs in-container, ~/.molecule-workspace diff --git a/workspace/tests/test_inbox_uploads.py b/workspace/tests/test_inbox_uploads.py index 515616e2..c13cea70 100644 --- a/workspace/tests/test_inbox_uploads.py +++ b/workspace/tests/test_inbox_uploads.py @@ -695,3 +695,342 @@ def test_rewrite_request_body_handles_non_list_parts(): def test_rewrite_request_body_handles_non_dict_file(): body = {"parts": [{"kind": "file", "file": "not a dict"}]} inbox_uploads.rewrite_request_body(body) # must not raise + + +# --------------------------------------------------------------------------- +# fetch_and_stage with shared client — Phase 5b client-reuse contract +# --------------------------------------------------------------------------- +# +# When a caller passes ``client=`` to fetch_and_stage, that client must be +# used for BOTH the GET /content and the POST /ack — no fresh +# ``httpx.Client(...)`` constructions should happen. The pre-Phase-5b +# implementation made one new client for GET and another for ack; the new +# shape lets BatchFetcher share one connection pool across an entire batch. 
+ + +def test_fetch_and_stage_with_supplied_client_does_not_construct_new_client(monkeypatch): + row = _row(uri="platform-pending:ws-1/file-1") + get_resp = _make_resp(200, content=b"PDF", content_type="application/pdf") + ack_resp = _make_resp(200) + supplied = MagicMock() + supplied.get = MagicMock(return_value=get_resp) + supplied.post = MagicMock(return_value=ack_resp) + # Sentinel: any code path that constructs httpx.Client when one was + # already supplied is a regression — count constructions. + constructed: list[Any] = [] + + class _ShouldNotBeCalled: + def __init__(self, *a, **kw): + constructed.append((a, kw)) + + monkeypatch.setattr("httpx.Client", _ShouldNotBeCalled) + + local_uri = inbox_uploads.fetch_and_stage( + row, + platform_url="http://plat", + workspace_id="ws-1", + headers={"Authorization": "Bearer t"}, + client=supplied, + ) + assert local_uri is not None + assert constructed == [], "supplied client must be reused; no new Client should be constructed" + # GET + POST ack both went through the supplied client. + supplied.get.assert_called_once() + supplied.post.assert_called_once() + # Caller-owned client must NOT be closed by fetch_and_stage; the + # batch fetcher (or test) closes it once the whole batch is done. 
+ supplied.close.assert_not_called() + + +def test_fetch_and_stage_without_supplied_client_constructs_and_closes_one(monkeypatch): + row = _row(uri="platform-pending:ws-1/file-1") + get_resp = _make_resp(200, content=b"PDF", content_type="application/pdf") + ack_resp = _make_resp(200) + built: list[MagicMock] = [] + + def _factory(*args, **kwargs): + c = MagicMock() + c.get = MagicMock(return_value=get_resp) + c.post = MagicMock(return_value=ack_resp) + built.append(c) + return c + + monkeypatch.setattr("httpx.Client", _factory) + + local_uri = inbox_uploads.fetch_and_stage( + row, platform_url="http://plat", workspace_id="ws-1", headers={} + ) + assert local_uri is not None + # Pre-Phase-5b built TWO clients (one for GET, one for ack); now exactly one. + assert len(built) == 1, f"expected 1 httpx.Client construction, got {len(built)}" + # Same client must serve BOTH calls. + built[0].get.assert_called_once() + built[0].post.assert_called_once() + # Owned client must be closed by fetch_and_stage on the way out. + built[0].close.assert_called_once() + + +def test_fetch_and_stage_with_supplied_client_does_not_close_caller_client(): + # Even on failure the supplied client must not be closed — the + # BatchFetcher owns the lifecycle for the whole batch. 
+ row = _row(uri="platform-pending:ws-1/file-1") + supplied = MagicMock() + supplied.get = MagicMock(side_effect=RuntimeError("network down")) + supplied.post = MagicMock() # should not be reached on GET failure + inbox_uploads.fetch_and_stage( + row, + platform_url="http://plat", + workspace_id="ws-1", + headers={}, + client=supplied, + ) + supplied.close.assert_not_called() + supplied.post.assert_not_called() + + +# --------------------------------------------------------------------------- +# BatchFetcher — concurrent fetch + URI cache barrier +# --------------------------------------------------------------------------- + + +def _row_with_id(act_id: str, file_id: str) -> dict: + """Helper: an upload-receive row with a distinct activity id + file id.""" + return { + "id": act_id, + "method": "chat_upload_receive", + "request_body": { + "file_id": file_id, + "name": f"{file_id}.pdf", + "uri": f"platform-pending:ws-1/{file_id}", + "mimeType": "application/pdf", + "size": 1, + }, + } + + +def _stub_client_for_batch(get_responses: dict[str, MagicMock]) -> MagicMock: + """Build one MagicMock client that returns per-file_id responses + based on the file_id segment of the URL. + """ + client = MagicMock() + + def _get(url: str, headers: dict[str, str] | None = None) -> MagicMock: + for fid, resp in get_responses.items(): + if f"/pending-uploads/{fid}/content" in url: + return resp + return _make_resp(404) + + def _post(url: str, headers: dict[str, str] | None = None) -> MagicMock: + return _make_resp(200) + + client.get = MagicMock(side_effect=_get) + client.post = MagicMock(side_effect=_post) + return client + + +def test_batch_fetcher_runs_submitted_rows_concurrently(): + # Three rows whose .get() blocks for ~120ms each. With 4 workers the + # batch should complete in ~120ms (parallel), not ~360ms (serial). + # The 250ms ceiling accommodates CI scheduler jitter while still + # discriminating concurrent (~120ms) from serial (~360ms). 
+    import time
+
+    barrier_start = [0.0]
+
+    def _slow_get(url: str, headers: dict[str, str] | None = None) -> MagicMock:
+        time.sleep(0.12)
+        for fid in ("a", "b", "c"):
+            if f"/pending-uploads/{fid}/content" in url:
+                return _make_resp(200, content=b"X", content_type="text/plain")
+        return _make_resp(404)
+
+    client = MagicMock()
+    client.get = MagicMock(side_effect=_slow_get)
+    client.post = MagicMock(return_value=_make_resp(200))
+
+    bf = inbox_uploads.BatchFetcher(
+        platform_url="http://plat",
+        workspace_id="ws-1",
+        headers={},
+        client=client,
+        max_workers=4,
+    )
+    barrier_start[0] = time.time()
+    for fid in ("a", "b", "c"):
+        bf.submit(_row_with_id(f"act-{fid}", fid))
+    bf.wait_all()
+    elapsed = time.time() - barrier_start[0]
+    bf.close()
+
+    assert elapsed < 0.25, (
+        f"3 rows × 120ms with 4 workers should finish in <250ms; got {elapsed:.3f}s "
+        "(suggests serial execution — Phase 5b regression)"
+    )
+    assert client.get.call_count == 3
+    assert client.post.call_count == 3
+
+
+def test_batch_fetcher_wait_all_blocks_until_uri_cache_populated():
+    """Pin the correctness invariant: when wait_all returns, the URI
+    cache is hot for every submitted row. Without this barrier the
+    inbox loop would process the chat-message row before its uploads
+    were staged, and rewrite_request_body would surface the un-rewritten
+    platform-pending: URI to the agent.
+    """
+    import time
+
+    def _slow_get(url: str, headers: dict[str, str] | None = None) -> MagicMock:
+        # Delay each GET so a wait_all that returned early would be caught
+        # by the cache assertions below.
+        time.sleep(0.05)
+        return _make_resp(200, content=b"data", content_type="text/plain")
+
+    client = MagicMock()
+    client.get = MagicMock(side_effect=_slow_get)
+    client.post = MagicMock(return_value=_make_resp(200))
+
+    # Start from an empty cache so a hit can only come from this batch.
+    inbox_uploads.get_cache().clear()
+    with inbox_uploads.BatchFetcher(
+        platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
+    ) as bf:
+        bf.submit(_row_with_id("act-a", "a"))
+        bf.submit(_row_with_id("act-b", "b"))
+        bf.wait_all()
+        # Cache must be hot for BOTH rows by the time wait_all returns.
+        assert inbox_uploads.get_cache().get("platform-pending:ws-1/a") is not None
+        assert inbox_uploads.get_cache().get("platform-pending:ws-1/b") is not None
+
+
+def test_batch_fetcher_isolates_per_row_failure():
+    """One failing fetch must not abort siblings. Sibling rows complete,
+    URI cache populates for them; the bad row's cache entry stays absent.
+    """
+    def _get(url: str, headers: dict[str, str] | None = None) -> MagicMock:
+        # Only the "bad" file id gets a 500; every other GET succeeds.
+        if "/pending-uploads/bad/content" in url:
+            return _make_resp(500, text="upstream broken")
+        return _make_resp(200, content=b"ok", content_type="text/plain")
+
+    client = MagicMock()
+    client.get = MagicMock(side_effect=_get)
+    client.post = MagicMock(return_value=_make_resp(200))
+
+    inbox_uploads.get_cache().clear()
+    with inbox_uploads.BatchFetcher(
+        platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
+    ) as bf:
+        bf.submit(_row_with_id("act-1", "good1"))
+        bf.submit(_row_with_id("act-2", "bad"))
+        bf.submit(_row_with_id("act-3", "good2"))
+        bf.wait_all()
+
+    cache = inbox_uploads.get_cache()
+    assert cache.get("platform-pending:ws-1/good1") is not None
+    assert cache.get("platform-pending:ws-1/good2") is not None
+    assert cache.get("platform-pending:ws-1/bad") is None
+
+
+def test_batch_fetcher_reuses_one_client_across_all_submits():
+    """Every row in the batch must share the same client instance. This
+    is the connection-pool-reuse leg of the perf win: a second fetch
+    to the same host reuses the TCP+TLS handshake from the first.
+    """
+    client = MagicMock()
+    client.get = MagicMock(return_value=_make_resp(200, content=b"x", content_type="text/plain"))
+    client.post = MagicMock(return_value=_make_resp(200))
+
+    with inbox_uploads.BatchFetcher(
+        platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
+    ) as bf:
+        for fid in ("a", "b", "c"):
+            bf.submit(_row_with_id(f"act-{fid}", fid))
+        bf.wait_all()
+
+    # 3 GETs + 3 POST acks all on the same client — no per-row Client
+    # construction.
+    assert client.get.call_count == 3
+    assert client.post.call_count == 3
+
+
+def test_batch_fetcher_close_idempotent():
+    """Calling close() twice on the same BatchFetcher must not raise."""
+    client = MagicMock()
+    bf = inbox_uploads.BatchFetcher(
+        platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
+    )
+    bf.close()
+    bf.close()  # second call must not raise
+
+
+def test_batch_fetcher_submit_after_close_raises():
+    """submit() on a closed BatchFetcher raises RuntimeError whose message
+    matches "submit after close".
+    """
+    client = MagicMock()
+    bf = inbox_uploads.BatchFetcher(
+        platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
+    )
+    bf.close()
+    with pytest.raises(RuntimeError, match="submit after close"):
+        bf.submit(_row_with_id("act-x", "x"))
+
+
+def test_batch_fetcher_owns_client_when_not_supplied(monkeypatch):
+    """With no client argument, exactly one httpx.Client is constructed
+    for the BatchFetcher and its close() is called exactly once.
+    """
+    built: list[MagicMock] = []
+
+    def _factory(*args, **kwargs):
+        # Stand-in for httpx.Client that records every instance it builds.
+        c = MagicMock()
+        c.get = MagicMock(return_value=_make_resp(200, content=b"x", content_type="text/plain"))
+        c.post = MagicMock(return_value=_make_resp(200))
+        built.append(c)
+        return c
+
+    monkeypatch.setattr("httpx.Client", _factory)
+
+    bf = inbox_uploads.BatchFetcher(
+        platform_url="http://plat", workspace_id="ws-1", headers={}
+    )
+    bf.submit(_row_with_id("act-a", "a"))
+    bf.wait_all()
+    bf.close()
+
+    assert len(built) == 1, "expected one owned client per BatchFetcher"
+    built[0].close.assert_called_once()
+
+
+def test_batch_fetcher_does_not_close_supplied_client():
+    """A caller-supplied client must not be closed when the BatchFetcher
+    context exits.
+    """
+    client = MagicMock()
+    client.get = MagicMock(return_value=_make_resp(200, content=b"x", content_type="text/plain"))
+    client.post = MagicMock(return_value=_make_resp(200))
+    with inbox_uploads.BatchFetcher(
+        platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
+    ) as bf:
+        bf.submit(_row_with_id("act-a", "a"))
+        bf.wait_all()
+    # Supplied client survives the BatchFetcher's close — caller's lifecycle.
+    client.close.assert_not_called()
+
+
+def test_batch_fetcher_wait_all_no_op_on_empty_batch():
+    """wait_all() with nothing submitted returns without raising and
+    without issuing any GET or POST on the client.
+    """
+    client = MagicMock()
+    with inbox_uploads.BatchFetcher(
+        platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
+    ) as bf:
+        bf.wait_all()  # nothing submitted; must not block, must not raise
+    client.get.assert_not_called()
+    client.post.assert_not_called()
+
+
+def test_batch_fetcher_httpx_missing_makes_submit_a_noop(monkeypatch):
+    """No client supplied + httpx import fails → BatchFetcher degrades
+    gracefully: submit() returns None and the row is silently skipped.
+    """
+    import sys
+
+    # A None entry in sys.modules makes ``import httpx`` raise ImportError.
+    # Do NOT pop the real module first: monkeypatch would then record the
+    # key as absent and delete it again at teardown, evicting httpx from
+    # sys.modules for every later test. Letting setitem snapshot the live
+    # entry means teardown restores exactly what was there before.
+    monkeypatch.setitem(sys.modules, "httpx", None)
+
+    bf = inbox_uploads.BatchFetcher(
+        platform_url="http://plat", workspace_id="ws-1", headers={}
+    )
+    result = bf.submit(_row_with_id("act-a", "a"))
+    bf.wait_all()
+    bf.close()
+
+    assert result is None