feat(poll-upload): phase 5b — concurrent BatchFetcher + httpx client reuse

Resolves the two remaining findings from the Phase 1-4 retrospective
review (the Python-side counterparts to phase 5a):

1. Important — inbox_uploads.fetch_and_stage blocked the inbox poll
   loop synchronously per row. A user dragging 4 files into chat at
   once would stall the poller for 4× per-fetch latency before the
   chat message reached the agent. Add BatchFetcher: a thread-pool
   wrapper (default 4 workers) that submits fetches concurrently and
   exposes wait_all() as the barrier the inbox loop calls before
   processing the chat-message row that references the uploads.

   The drain barrier is the correctness invariant: rewrite_request_body
   must observe a populated URI cache when it walks the chat-message
   row's parts. _poll_once now drains the BatchFetcher inline before
   the first non-upload row, AND again at end-of-batch. The second
   drain covers a batch that contains only upload rows: the chat
   message that references them arrives in a later poll, and draining
   before _poll_once returns closes the race between that future poll
   and the still-running fetches.
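
   In sketch form (condensed from the _poll_once diff below; the
   unrelated row handling is reduced to a comment):

       batch_fetcher = None
       for row in rows:
           if is_chat_upload_row(row):
               if batch_fetcher is None:
                   batch_fetcher = BatchFetcher(
                       platform_url=platform_url,
                       workspace_id=workspace_id,
                       headers=headers,
                   )
               batch_fetcher.submit(row)  # non-blocking fetch+stage+ack
               continue
           if batch_fetcher is not None:
               batch_fetcher.wait_all()  # barrier: URI cache hot before
               batch_fetcher.close()     # any message row is processed
               batch_fetcher = None
           # ... rewrite_request_body / message_from_activity on the row ...
       if batch_fetcher is not None:  # end-of-batch drain
           batch_fetcher.wait_all()
           batch_fetcher.close()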

2. Nit — fetch_and_stage created two httpx.Client instances per row
   (one for GET /content, one for POST /ack). Refactor so a single
   client serves both calls. When called from BatchFetcher, the
   batch-shared client serves every row's GET + ack — so the second
   fetch reuses the TCP+TLS handshake from the first.
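
   Call shape of the reuse seam (values are placeholders; client= is
   the parameter added in this commit):

       import httpx

       client = httpx.Client(timeout=60.0)  # one client for the batch
       try:
           for row in upload_rows:
               fetch_and_stage(
                   row,
                   platform_url=platform_url,
                   workspace_id=workspace_id,
                   headers=headers,
                   client=client,  # GET /content + POST /ack share the pool
               )
       finally:
           client.close()  # lifecycle owned by the caller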

Comprehensive tests:

- 13 new inbox_uploads tests:
  - fetch_and_stage with supplied client: zero httpx.Client
    constructions, GET+POST through the same client, caller's client
    not closed (lifecycle owned by caller).
  - fetch_and_stage without supplied client: exactly one
    httpx.Client constructed (was 2 pre-fix), closed on the way out.
  - BatchFetcher: 3 rows × 120ms = parallel completion < 250ms
    (vs. ~360ms serial), URI cache hot when wait_all returns,
    per-row failure isolation, single-client reuse across all
    submits, idempotent close, submit-after-close raises,
    owned-vs-supplied client lifecycle, no-op wait_all on empty
    batch, graceful httpx-missing degradation.

- 3 new inbox tests:
  - poll_once drains uploads before processing the chat-message row
    (in-place mutation of row['request_body'] proves the URI was
    rewritten BEFORE message_from_activity returned).
  - poll_once with only upload rows still drains at end-of-batch.
  - poll_once with no upload rows never constructs a BatchFetcher
    (zero overhead on the no-upload happy path).

133 total inbox + inbox_uploads tests pass; 0 regressions.

Closes the chat-upload poll-mode-perf gap end-to-end.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Hongming Wang 2026-05-05 11:26:55 -07:00
parent 27db090d3d
commit 30fb507165
4 changed files with 824 additions and 22 deletions


@@ -553,10 +553,26 @@ def _poll_once(
# Imported lazily at use-site so a runtime that never sees an
# upload-receive row never imports the module. Cheap on the hot
# path because Python caches the import.
-from inbox_uploads import is_chat_upload_row, fetch_and_stage
from inbox_uploads import is_chat_upload_row, BatchFetcher
new_count = 0
last_id: str | None = None
# ``batch_fetcher`` is lazy: a poll batch with no upload rows pays
# zero overhead. Once the first upload row appears we open one
# BatchFetcher and submit every subsequent upload row to its thread
# pool; before processing the FIRST non-upload row we drain the
# pool (wait_all) so the URI cache is hot when message rewriting
# runs. Without the barrier, the chat message that references the
# upload would arrive at the agent with the un-rewritten
# platform-pending: URI.
batch_fetcher: BatchFetcher | None = None
def _drain_uploads(bf: BatchFetcher | None) -> None:
if bf is None:
return
bf.wait_all()
bf.close()
for row in rows:
if not isinstance(row, dict):
continue
@@ -570,14 +586,21 @@ def _poll_once(
# message_from_activity. We DO advance the cursor past
# this row so a permanent network outage on /content
# doesn't stall the cursor and block real chat traffic.
-fetch_and_stage(
-row,
-platform_url=platform_url,
-workspace_id=workspace_id,
-headers=headers,
-)
if batch_fetcher is None:
batch_fetcher = BatchFetcher(
platform_url=platform_url,
workspace_id=workspace_id,
headers=headers,
)
batch_fetcher.submit(row)
last_id = str(row.get("id", "")) or last_id
continue
# Non-upload row: drain any pending uploads first so the URI
# cache is populated before we run rewrite_request_body /
# message_from_activity on a row that may reference one.
if batch_fetcher is not None:
_drain_uploads(batch_fetcher)
batch_fetcher = None
if _is_self_notify_row(row):
# The workspace-server's `/notify` handler writes the agent's
# own send_message_to_user POSTs to activity_logs with
@@ -612,6 +635,14 @@ def _poll_once(
last_id = message.activity_id
new_count += 1
# Drain any uploads still in flight if the batch ended with upload
# rows (no chat-message row to trigger the inline drain). Without
# this, a future poll that picks up the chat-message row first
# would race with the still-running fetches.
if batch_fetcher is not None:
_drain_uploads(batch_fetcher)
batch_fetcher = None
if last_id is not None:
state.save_cursor(last_id, cursor_key)
return new_count


@@ -37,6 +37,7 @@ read another tenant's bytes even if a token is misrouted.
"""
from __future__ import annotations
import concurrent.futures
import logging
import mimetypes
import os
@@ -68,6 +69,24 @@ MAX_FILE_BYTES = 25 * 1024 * 1024
# 10s default for /activity calls — both are user-perceived latency.
DEFAULT_FETCH_TIMEOUT = 60.0
# Concurrency cap for ``BatchFetcher``. Four workers is enough headroom
# for the realistic "user dragged 3-4 files into chat at once" case
# while bounding the platform's per-workspace fan-out. The cap matters
# because the platform's /content endpoint reads bytea from Postgres in
# a single round-trip per request — N workers = N concurrent DB reads
# of up to 25 MB each, so a higher cap could pressure platform memory
# without much UX win (network bandwidth is the bottleneck once the
# bytes are buffered).
DEFAULT_BATCH_FETCH_WORKERS = 4
# Upper bound on how long ``BatchFetcher.wait_all`` blocks the inbox
# poll loop before giving up on still-in-flight fetches. Aligned with
# DEFAULT_FETCH_TIMEOUT so a single hung fetch can't stall the loop
# longer than its own deadline. A timeout fires only if a worker thread
# is stuck past the underlying httpx timeout — pathological case;
# normal completion is bounded by per-fetch timeout × ceil(N/W).
DEFAULT_BATCH_WAIT_TIMEOUT = DEFAULT_FETCH_TIMEOUT + 5.0
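# Worked example of that bound (illustrative numbers, not measured): with
# N = 6 queued fetches, W = 4 workers and ~5 s per fetch, wait_all returns
# after roughly ceil(6/4) × 5 s = 10 s, versus ~30 s under the old serial
# per-row loop.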
# Cap on the URI cache. A long-lived workspace handling thousands of
# uploads shouldn't grow without bound; an LRU cap of 1024 keeps the
# entries-needed-for-a-typical-conversation well within memory.
@@ -275,6 +294,7 @@ def fetch_and_stage(
workspace_id: str,
headers: dict[str, str],
timeout_secs: float = DEFAULT_FETCH_TIMEOUT,
client: Any = None,
) -> str | None:
"""Fetch the row's bytes, stage them under chat-uploads, and ack.
@@ -289,6 +309,11 @@ def fetch_and_stage(
On success, the URI cache is updated so a subsequent chat message
referencing the same ``platform-pending:`` URI is rewritten before
the agent sees it.
Pass ``client`` to reuse a shared ``httpx.Client`` for both GET and
POST ack (saves one TLS handshake per row vs. constructing one
per-call). ``BatchFetcher`` does this across an entire poll batch so
N concurrent fetches share one connection pool.
"""
body = _request_body_dict(row)
if body is None:
@@ -317,25 +342,58 @@ def fetch_and_stage(
if not isinstance(filename, str):
filename = "file"
-# Lazy httpx import: the standalone MCP path uses httpx; an in-
-# container caller that imports this module by accident shouldn't
-# explode at import time.
-try:
-import httpx  # noqa: WPS433
-except ImportError:
-logger.error("inbox_uploads: httpx not installed; cannot fetch %s", file_id)
-return None
# Caller-supplied client: reuse for both GET + POST ack. Otherwise
# build a one-shot client and close it on the way out. Lazy httpx
# import keeps the standalone MCP path's optional dep optional.
own_client = client is None
if own_client:
try:
import httpx # noqa: WPS433
except ImportError:
logger.error("inbox_uploads: httpx not installed; cannot fetch %s", file_id)
return None
client = httpx.Client(timeout=timeout_secs)
try:
return _fetch_and_stage_with_client(
client,
platform_url=platform_url,
workspace_id=workspace_id,
headers=headers,
file_id=file_id,
pending_uri=pending_uri,
filename=filename,
body=body,
)
finally:
if own_client:
try:
client.close()
except Exception: # noqa: BLE001 — close should never crash the caller
pass
def _fetch_and_stage_with_client(
client: Any,
*,
platform_url: str,
workspace_id: str,
headers: dict[str, str],
file_id: str,
pending_uri: str,
filename: str,
body: dict[str, Any],
) -> str | None:
"""Inner body of fetch_and_stage. Always uses the supplied client for
both GET and POST so the connection pool is shared across both calls.
"""
content_url = f"{platform_url}/workspaces/{workspace_id}/pending-uploads/{file_id}/content"
ack_url = f"{platform_url}/workspaces/{workspace_id}/pending-uploads/{file_id}/ack"
try:
-with httpx.Client(timeout=timeout_secs) as client:
-resp = client.get(content_url, headers=headers)
resp = client.get(content_url, headers=headers)
except Exception as exc: # noqa: BLE001
-logger.warning(
-"inbox_uploads: GET %s failed: %s", content_url, exc
-)
logger.warning("inbox_uploads: GET %s failed: %s", content_url, exc)
return None
if resp.status_code == 404:
@@ -403,8 +461,7 @@ def fetch_and_stage(
# back the on-disk file — the platform's sweep will clean up
# eventually.
try:
-with httpx.Client(timeout=timeout_secs) as client:
-ack_resp = client.post(ack_url, headers=headers)
ack_resp = client.post(ack_url, headers=headers)
if ack_resp.status_code >= 400:
logger.warning(
"inbox_uploads: ack %s returned %d: %s",
@@ -418,6 +475,168 @@ def fetch_and_stage(
return local_uri
# ---------------------------------------------------------------------------
# BatchFetcher — concurrent fetch across a single poll batch
# ---------------------------------------------------------------------------
class BatchFetcher:
"""Fetch + stage + ack a batch of upload-receive rows concurrently.
Why this exists: the inbox poll loop used to call ``fetch_and_stage``
serially per row. With N upload rows in a batch (a user dragging
multiple files into chat at once), the loop blocked for
``N × per_fetch_latency`` before processing the chat message that
referenced them: a 4-file upload at 5s each = 20s of stall
before the agent saw the user's prompt. ``BatchFetcher`` runs the
fetches on a small thread pool (default 4 workers) so the stall is
bounded by ``ceil(N/W) × per_fetch_latency`` instead.
Connection reuse: one ``httpx.Client`` is shared across every fetch
in the batch. httpx clients carry a connection pool, so a second
fetch to the same platform host reuses the TCP+TLS handshake from
the first, a measurable win when fetches happen back-to-back.
Correctness invariant the caller MUST preserve: the inbox loop is
expected to call ``wait_all()`` before processing the chat-message
activity row that REFERENCES one of these uploads. Without the
barrier, the URI cache is empty when ``rewrite_request_body`` runs
and the agent sees the un-rewritten ``platform-pending:`` URI. The
caller-side test ``test_poll_once_waits_for_uploads_before_messages``
pins this end-to-end.
Use as a context manager so the executor + client are torn down
even if the caller raises mid-batch.
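Illustrative usage (argument values are placeholders, not part of
the class contract):

    with BatchFetcher(
        platform_url="http://platform",
        workspace_id="ws-1",
        headers={"Authorization": "Bearer <token>"},
    ) as bf:
        for row in upload_rows:
            bf.submit(row)
        bf.wait_all()  # barrier: the URI cache is hot past this point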
"""
def __init__(
self,
*,
platform_url: str,
workspace_id: str,
headers: dict[str, str],
timeout_secs: float = DEFAULT_FETCH_TIMEOUT,
max_workers: int = DEFAULT_BATCH_FETCH_WORKERS,
client: Any = None,
):
self._platform_url = platform_url
self._workspace_id = workspace_id
self._headers = dict(headers) # copy so caller mutations don't leak in
self._timeout_secs = timeout_secs
# Caller can inject a client (tests do this); production callers
# let us build one. Track ownership so we only close ours.
self._own_client = client is None
if self._own_client:
try:
import httpx # noqa: WPS433
except ImportError:
# Match fetch_and_stage's behavior: log + degrade rather
# than raising at construction time. submit() will then
# return None for every row.
logger.error("inbox_uploads: httpx not installed; BatchFetcher inert")
self._client: Any = None
else:
self._client = httpx.Client(timeout=timeout_secs)
else:
self._client = client
self._executor = concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers,
thread_name_prefix="upload-fetch",
)
self._futures: list[concurrent.futures.Future[Any]] = []
self._closed = False
def submit(self, row: dict[str, Any]) -> concurrent.futures.Future[Any] | None:
"""Submit ``row`` for fetch + stage + ack. Non-blocking — the
worker thread runs ``fetch_and_stage`` with the shared client.
Returns the Future so a caller that wants per-row outcome can
await it; ``None`` if the BatchFetcher is in a degraded state
(httpx missing).
"""
if self._closed:
raise RuntimeError("BatchFetcher: submit after close")
if self._client is None:
return None
fut = self._executor.submit(
fetch_and_stage,
row,
platform_url=self._platform_url,
workspace_id=self._workspace_id,
headers=self._headers,
timeout_secs=self._timeout_secs,
client=self._client,
)
self._futures.append(fut)
return fut
def wait_all(self, timeout: float | None = DEFAULT_BATCH_WAIT_TIMEOUT) -> None:
"""Block until every submitted future completes (or times out).
Per-future exceptions are logged + swallowed; ``fetch_and_stage``
already converts every error path to ``return None``, so a real
exception propagating up to here is unexpected and we don't want
one bad fetch to abort the whole batch.
Timeouts are also logged + swallowed; the caller will move on
and the un-acked rows will be retried by the next poll.
"""
if not self._futures:
return
try:
done, not_done = concurrent.futures.wait(
self._futures,
timeout=timeout,
return_when=concurrent.futures.ALL_COMPLETED,
)
except Exception as exc: # noqa: BLE001 — concurrent.futures shouldn't raise here
logger.warning("inbox_uploads: BatchFetcher.wait_all crashed: %s", exc)
return
for fut in done:
exc = fut.exception()
if exc is not None:
logger.warning(
"inbox_uploads: BatchFetcher worker raised: %s", exc
)
if not_done:
logger.warning(
"inbox_uploads: BatchFetcher.wait_all left %d in-flight after %ss timeout",
len(not_done),
timeout,
)
def close(self) -> None:
"""Tear down the executor + (if owned) the httpx client.
Idempotent. After close, ``submit`` raises and the BatchFetcher
cannot be reused; construct a fresh one for the next poll.
"""
if self._closed:
return
self._closed = True
# Drain remaining futures so worker threads aren't killed mid-
# request. wait=True is the safe default; for an inbox poller a
60s tail at shutdown is acceptable since in-flight uploads are
the only work close() has left to wait on.
try:
self._executor.shutdown(wait=True)
except Exception as exc: # noqa: BLE001
logger.warning("inbox_uploads: executor shutdown error: %s", exc)
if self._own_client and self._client is not None:
try:
self._client.close()
except Exception as exc: # noqa: BLE001
logger.warning("inbox_uploads: client close error: %s", exc)
def __enter__(self) -> "BatchFetcher":
return self
def __exit__(self, exc_type, exc, tb) -> None:
self.close()
# ---------------------------------------------------------------------------
# URI rewrite for incoming chat messages
# ---------------------------------------------------------------------------


@@ -577,6 +577,219 @@ def test_default_cursor_path_uses_configs_dir(monkeypatch, tmp_path: Path):
assert inbox.default_cursor_path() == tmp_path / ".mcp_inbox_cursor"
# ---------------------------------------------------------------------------
# Phase 5b — BatchFetcher integration with the poll loop
# ---------------------------------------------------------------------------
#
# These tests pin the cross-module contract between inbox._poll_once and
# inbox_uploads.BatchFetcher: chat_upload_receive rows must be submitted
# to a single BatchFetcher AND drained (URI cache populated) before any
# subsequent message row is processed. Without the drain, the
# rewrite_request_body path inside message_from_activity surfaces the
# un-rewritten ``platform-pending:`` URI to the agent.
def _upload_row(act_id: str, file_id: str) -> dict:
return {
"id": act_id,
"source_id": None,
"method": "chat_upload_receive",
"summary": f"chat_upload_receive: {file_id}.pdf",
"request_body": {
"file_id": file_id,
"name": f"{file_id}.pdf",
"uri": f"platform-pending:ws-1/{file_id}",
"mimeType": "application/pdf",
"size": 3,
},
"created_at": "2026-05-04T10:00:00Z",
}
def _message_row_referencing(act_id: str, file_id: str) -> dict:
return {
"id": act_id,
"source_id": None,
"method": "message/send",
"summary": None,
"request_body": {
"params": {
"message": {
"parts": [
{"kind": "text", "text": "have a look"},
{
"kind": "file",
"file": {
"uri": f"platform-pending:ws-1/{file_id}",
"name": f"{file_id}.pdf",
},
},
]
}
}
},
"created_at": "2026-05-04T10:00:01Z",
}
def _patch_httpx_routing(activity_rows: list[dict], upload_bytes: bytes = b"PDF"):
"""Replace ``httpx.Client`` so:
- GET /activity returns ``activity_rows``
- GET /workspaces/.../content returns ``upload_bytes`` with content-type
- POST /ack returns 200
Returns the patch context manager; tests use ``with p:``. Each new
Client(...) gets a fresh MagicMock so the test can verify
constructor-count expectations without pinning singletons.
"""
def _client_factory(*args, **kwargs):
c = MagicMock()
c.__enter__ = MagicMock(return_value=c)
c.__exit__ = MagicMock(return_value=False)
def _get(url, params=None, headers=None):
if "/activity" in url:
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = activity_rows
resp.text = ""
return resp
if "/pending-uploads/" in url and "/content" in url:
resp = MagicMock()
resp.status_code = 200
resp.content = upload_bytes
resp.headers = {"content-type": "application/pdf"}
resp.text = ""
return resp
resp = MagicMock()
resp.status_code = 404
resp.text = ""
return resp
def _post(url, headers=None):
resp = MagicMock()
resp.status_code = 200
resp.text = ""
return resp
c.get = MagicMock(side_effect=_get)
c.post = MagicMock(side_effect=_post)
c.close = MagicMock()
return c
return patch("httpx.Client", side_effect=_client_factory)
def test_poll_once_drains_uploads_before_processing_message_row(state: inbox.InboxState, tmp_path):
"""The chat-message row's file.uri MUST be rewritten to the local
workspace: URI by the time it lands in the InboxState queue. This
requires BatchFetcher.wait_all() to run before message_from_activity
on the second row.
"""
import inbox_uploads
inbox_uploads.get_cache().clear()
# Sandbox the on-disk staging dir so the test can't pollute the
# workspace's real chat-uploads.
real_dir = inbox_uploads.CHAT_UPLOAD_DIR
inbox_uploads.CHAT_UPLOAD_DIR = str(tmp_path / "chat-uploads")
try:
rows = [
_upload_row("act-1", "file-A"),
_message_row_referencing("act-2", "file-A"),
]
state.save_cursor("act-old")
with _patch_httpx_routing(rows, upload_bytes=b"PDF-bytes"):
n = inbox._poll_once(state, "http://platform", "ws-1", {})
finally:
inbox_uploads.CHAT_UPLOAD_DIR = real_dir
inbox_uploads.get_cache().clear()
assert n == 1, "exactly one message row should be enqueued (the upload row is a side-effect, not a message)"
queued = state.peek(10)
assert len(queued) == 1
# The contract this test exists to pin: the platform-pending: URI
# was rewritten to workspace: BEFORE the message landed in the
# state queue. message_from_activity mutates row['request_body']
# in-place, so the rewritten URI is observable on the row dict
# we passed in.
rewritten_part = rows[1]["request_body"]["params"]["message"]["parts"][1]
assert rewritten_part["file"]["uri"].startswith("workspace:"), (
f"upload barrier broken: file.uri = {rewritten_part['file']['uri']!r}; "
"rewrite_request_body ran before BatchFetcher.wait_all populated the cache"
)
# Cursor advanced past BOTH rows — upload-receive (act-1) is
# acknowledged via the inbox cursor regardless of fetch outcome.
assert state.load_cursor() == "act-2"
def test_poll_once_with_only_upload_rows_drains_at_loop_end(state: inbox.InboxState, tmp_path):
"""End-of-batch drain: a poll that contains ONLY upload rows (no
chat-message row to trigger the inline drain) must still drain the
BatchFetcher before _poll_once returns. Otherwise a future poll
that picks up the corresponding chat-message row would race with
in-flight fetches from the previous batch.
"""
import inbox_uploads
inbox_uploads.get_cache().clear()
real_dir = inbox_uploads.CHAT_UPLOAD_DIR
inbox_uploads.CHAT_UPLOAD_DIR = str(tmp_path / "chat-uploads")
try:
rows = [_upload_row("act-1", "file-A"), _upload_row("act-2", "file-B")]
state.save_cursor("act-old")
with _patch_httpx_routing(rows, upload_bytes=b"PDF"):
n = inbox._poll_once(state, "http://platform", "ws-1", {})
# By the time _poll_once returned, the URI cache must be hot
# for both file_ids — proves the end-of-loop drain ran.
assert inbox_uploads.get_cache().get("platform-pending:ws-1/file-A") is not None
assert inbox_uploads.get_cache().get("platform-pending:ws-1/file-B") is not None
finally:
inbox_uploads.CHAT_UPLOAD_DIR = real_dir
inbox_uploads.get_cache().clear()
# Upload rows are NOT message rows; queue stays empty.
assert n == 0
# Cursor advances past both upload rows.
assert state.load_cursor() == "act-2"
def test_poll_once_no_uploads_does_not_construct_batch_fetcher(state: inbox.InboxState):
"""A batch with no upload-receive rows must not pay the BatchFetcher
construction cost; the executor + httpx client allocation is
deferred until the first upload row appears.
"""
import inbox_uploads
constructed: list[Any] = []
def _patched_init(self, **kwargs):
constructed.append(kwargs)
# Don't actually run __init__; we never hit submit/wait_all.
self._closed = False
self._futures = []
self._executor = MagicMock()
self._client = MagicMock()
self._own_client = False
rows = [
{
"id": "act-1",
"source_id": None,
"method": "message/send",
"summary": None,
"request_body": {"parts": [{"type": "text", "text": "hi"}]},
"created_at": "2026-04-30T22:00:00Z",
},
]
state.save_cursor("act-old")
resp = _make_response(200, rows)
p, _ = _patch_httpx(resp)
with patch.object(inbox_uploads.BatchFetcher, "__init__", _patched_init), p:
n = inbox._poll_once(state, "http://platform", "ws-1", {})
assert n == 1
assert constructed == [], "BatchFetcher must not be constructed when no upload rows are present"
def test_default_cursor_path_falls_back_to_default(tmp_path, monkeypatch):
"""When CONFIGS_DIR is unset, the cursor path resolves through
configs_dir.resolve(): /configs in-container, ~/.molecule-workspace


@@ -695,3 +695,342 @@ def test_rewrite_request_body_handles_non_list_parts():
def test_rewrite_request_body_handles_non_dict_file():
body = {"parts": [{"kind": "file", "file": "not a dict"}]}
inbox_uploads.rewrite_request_body(body) # must not raise
# ---------------------------------------------------------------------------
# fetch_and_stage with shared client — Phase 5b client-reuse contract
# ---------------------------------------------------------------------------
#
# When a caller passes ``client=`` to fetch_and_stage, that client must be
# used for BOTH the GET /content and the POST /ack — no fresh
# ``httpx.Client(...)`` constructions should happen. The pre-Phase-5b
# implementation made one new client for GET and another for ack; the new
# shape lets BatchFetcher share one connection pool across an entire batch.
def test_fetch_and_stage_with_supplied_client_does_not_construct_new_client(monkeypatch):
row = _row(uri="platform-pending:ws-1/file-1")
get_resp = _make_resp(200, content=b"PDF", content_type="application/pdf")
ack_resp = _make_resp(200)
supplied = MagicMock()
supplied.get = MagicMock(return_value=get_resp)
supplied.post = MagicMock(return_value=ack_resp)
# Sentinel: any code path that constructs httpx.Client when one was
# already supplied is a regression — count constructions.
constructed: list[Any] = []
class _ShouldNotBeCalled:
def __init__(self, *a, **kw):
constructed.append((a, kw))
monkeypatch.setattr("httpx.Client", _ShouldNotBeCalled)
local_uri = inbox_uploads.fetch_and_stage(
row,
platform_url="http://plat",
workspace_id="ws-1",
headers={"Authorization": "Bearer t"},
client=supplied,
)
assert local_uri is not None
assert constructed == [], "supplied client must be reused; no new Client should be constructed"
# GET + POST ack both went through the supplied client.
supplied.get.assert_called_once()
supplied.post.assert_called_once()
# Caller-owned client must NOT be closed by fetch_and_stage; the
# batch fetcher (or test) closes it once the whole batch is done.
supplied.close.assert_not_called()
def test_fetch_and_stage_without_supplied_client_constructs_and_closes_one(monkeypatch):
row = _row(uri="platform-pending:ws-1/file-1")
get_resp = _make_resp(200, content=b"PDF", content_type="application/pdf")
ack_resp = _make_resp(200)
built: list[MagicMock] = []
def _factory(*args, **kwargs):
c = MagicMock()
c.get = MagicMock(return_value=get_resp)
c.post = MagicMock(return_value=ack_resp)
built.append(c)
return c
monkeypatch.setattr("httpx.Client", _factory)
local_uri = inbox_uploads.fetch_and_stage(
row, platform_url="http://plat", workspace_id="ws-1", headers={}
)
assert local_uri is not None
# Pre-Phase-5b built TWO clients (one for GET, one for ack); now exactly one.
assert len(built) == 1, f"expected 1 httpx.Client construction, got {len(built)}"
# Same client must serve BOTH calls.
built[0].get.assert_called_once()
built[0].post.assert_called_once()
# Owned client must be closed by fetch_and_stage on the way out.
built[0].close.assert_called_once()
def test_fetch_and_stage_with_supplied_client_does_not_close_caller_client():
# Even on failure the supplied client must not be closed — the
# BatchFetcher owns the lifecycle for the whole batch.
row = _row(uri="platform-pending:ws-1/file-1")
supplied = MagicMock()
supplied.get = MagicMock(side_effect=RuntimeError("network down"))
supplied.post = MagicMock() # should not be reached on GET failure
inbox_uploads.fetch_and_stage(
row,
platform_url="http://plat",
workspace_id="ws-1",
headers={},
client=supplied,
)
supplied.close.assert_not_called()
supplied.post.assert_not_called()
# ---------------------------------------------------------------------------
# BatchFetcher — concurrent fetch + URI cache barrier
# ---------------------------------------------------------------------------
def _row_with_id(act_id: str, file_id: str) -> dict:
"""Helper: an upload-receive row with a distinct activity id + file id."""
return {
"id": act_id,
"method": "chat_upload_receive",
"request_body": {
"file_id": file_id,
"name": f"{file_id}.pdf",
"uri": f"platform-pending:ws-1/{file_id}",
"mimeType": "application/pdf",
"size": 1,
},
}
def _stub_client_for_batch(get_responses: dict[str, MagicMock]) -> MagicMock:
"""Build one MagicMock client that returns per-file_id responses
based on the file_id segment of the URL.
"""
client = MagicMock()
def _get(url: str, headers: dict[str, str] | None = None) -> MagicMock:
for fid, resp in get_responses.items():
if f"/pending-uploads/{fid}/content" in url:
return resp
return _make_resp(404)
def _post(url: str, headers: dict[str, str] | None = None) -> MagicMock:
return _make_resp(200)
client.get = MagicMock(side_effect=_get)
client.post = MagicMock(side_effect=_post)
return client
def test_batch_fetcher_runs_submitted_rows_concurrently():
# Three rows whose .get() blocks for ~120ms each. With 4 workers the
# batch should complete in ~120ms (parallel), not ~360ms (serial).
# The 250ms ceiling accommodates CI scheduler jitter while still
# discriminating concurrent (~120ms) from serial (~360ms).
import time
barrier_start = [0.0]
def _slow_get(url: str, headers: dict[str, str] | None = None) -> MagicMock:
time.sleep(0.12)
for fid in ("a", "b", "c"):
if f"/pending-uploads/{fid}/content" in url:
return _make_resp(200, content=b"X", content_type="text/plain")
return _make_resp(404)
client = MagicMock()
client.get = MagicMock(side_effect=_slow_get)
client.post = MagicMock(return_value=_make_resp(200))
bf = inbox_uploads.BatchFetcher(
platform_url="http://plat",
workspace_id="ws-1",
headers={},
client=client,
max_workers=4,
)
barrier_start[0] = time.time()
for fid in ("a", "b", "c"):
bf.submit(_row_with_id(f"act-{fid}", fid))
bf.wait_all()
elapsed = time.time() - barrier_start[0]
bf.close()
assert elapsed < 0.25, (
f"3 rows × 120ms with 4 workers should finish in <250ms; got {elapsed:.3f}s "
"(suggests serial execution — Phase 5b regression)"
)
assert client.get.call_count == 3
assert client.post.call_count == 3
def test_batch_fetcher_wait_all_blocks_until_uri_cache_populated():
"""Pin the correctness invariant: when wait_all returns, the URI
cache is hot for every submitted row. Without this barrier the
inbox loop would process the chat-message row before its uploads
were staged, and rewrite_request_body would surface the un-rewritten
platform-pending: URI to the agent.
"""
import time
def _slow_get(url: str, headers: dict[str, str] | None = None) -> MagicMock:
time.sleep(0.05)
return _make_resp(200, content=b"data", content_type="text/plain")
client = MagicMock()
client.get = MagicMock(side_effect=_slow_get)
client.post = MagicMock(return_value=_make_resp(200))
inbox_uploads.get_cache().clear()
with inbox_uploads.BatchFetcher(
platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
) as bf:
bf.submit(_row_with_id("act-a", "a"))
bf.submit(_row_with_id("act-b", "b"))
bf.wait_all()
# Cache must be hot for BOTH rows by the time wait_all returns.
assert inbox_uploads.get_cache().get("platform-pending:ws-1/a") is not None
assert inbox_uploads.get_cache().get("platform-pending:ws-1/b") is not None
def test_batch_fetcher_isolates_per_row_failure():
"""One failing fetch must not abort siblings. Sibling rows complete,
URI cache populates for them; the bad row's cache entry stays absent.
"""
def _get(url: str, headers: dict[str, str] | None = None) -> MagicMock:
if "/pending-uploads/bad/content" in url:
return _make_resp(500, text="upstream broken")
return _make_resp(200, content=b"ok", content_type="text/plain")
client = MagicMock()
client.get = MagicMock(side_effect=_get)
client.post = MagicMock(return_value=_make_resp(200))
inbox_uploads.get_cache().clear()
with inbox_uploads.BatchFetcher(
platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
) as bf:
bf.submit(_row_with_id("act-1", "good1"))
bf.submit(_row_with_id("act-2", "bad"))
bf.submit(_row_with_id("act-3", "good2"))
bf.wait_all()
cache = inbox_uploads.get_cache()
assert cache.get("platform-pending:ws-1/good1") is not None
assert cache.get("platform-pending:ws-1/good2") is not None
assert cache.get("platform-pending:ws-1/bad") is None
def test_batch_fetcher_reuses_one_client_across_all_submits():
"""Every row in the batch must share the same client instance. This
is the connection-pool-reuse leg of the perf win: a second fetch
to the same host reuses the TCP+TLS handshake from the first.
"""
client = MagicMock()
client.get = MagicMock(return_value=_make_resp(200, content=b"x", content_type="text/plain"))
client.post = MagicMock(return_value=_make_resp(200))
with inbox_uploads.BatchFetcher(
platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
) as bf:
for fid in ("a", "b", "c"):
bf.submit(_row_with_id(f"act-{fid}", fid))
bf.wait_all()
# 3 GETs + 3 POST acks all on the same client — no per-row Client
# construction.
assert client.get.call_count == 3
assert client.post.call_count == 3
def test_batch_fetcher_close_idempotent():
client = MagicMock()
bf = inbox_uploads.BatchFetcher(
platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
)
bf.close()
bf.close() # second call must not raise
def test_batch_fetcher_submit_after_close_raises():
client = MagicMock()
bf = inbox_uploads.BatchFetcher(
platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
)
bf.close()
with pytest.raises(RuntimeError, match="submit after close"):
bf.submit(_row_with_id("act-x", "x"))
def test_batch_fetcher_owns_client_when_not_supplied(monkeypatch):
built: list[MagicMock] = []
def _factory(*args, **kwargs):
c = MagicMock()
c.get = MagicMock(return_value=_make_resp(200, content=b"x", content_type="text/plain"))
c.post = MagicMock(return_value=_make_resp(200))
built.append(c)
return c
monkeypatch.setattr("httpx.Client", _factory)
bf = inbox_uploads.BatchFetcher(
platform_url="http://plat", workspace_id="ws-1", headers={}
)
bf.submit(_row_with_id("act-a", "a"))
bf.wait_all()
bf.close()
assert len(built) == 1, "expected one owned client per BatchFetcher"
built[0].close.assert_called_once()
def test_batch_fetcher_does_not_close_supplied_client():
client = MagicMock()
client.get = MagicMock(return_value=_make_resp(200, content=b"x", content_type="text/plain"))
client.post = MagicMock(return_value=_make_resp(200))
with inbox_uploads.BatchFetcher(
platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
) as bf:
bf.submit(_row_with_id("act-a", "a"))
bf.wait_all()
# Supplied client survives the BatchFetcher's close — caller's lifecycle.
client.close.assert_not_called()
def test_batch_fetcher_wait_all_no_op_on_empty_batch():
client = MagicMock()
with inbox_uploads.BatchFetcher(
platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
) as bf:
bf.wait_all() # nothing submitted; must not block, must not raise
client.get.assert_not_called()
client.post.assert_not_called()
def test_batch_fetcher_httpx_missing_makes_submit_a_noop(monkeypatch):
# No client supplied + httpx import fails → BatchFetcher degrades
# gracefully: submit() returns None and the row is silently skipped.
import sys
real_httpx = sys.modules.pop("httpx", None)
monkeypatch.setitem(sys.modules, "httpx", None)
try:
bf = inbox_uploads.BatchFetcher(
platform_url="http://plat", workspace_id="ws-1", headers={}
)
result = bf.submit(_row_with_id("act-a", "a"))
bf.wait_all()
bf.close()
finally:
if real_httpx is not None:
sys.modules["httpx"] = real_httpx
else:
sys.modules.pop("httpx", None)
assert result is None