Merge pull request #2909 from Molecule-AI/feat/poll-mode-chat-upload-phase5b
feat(poll-upload): phase 5b — concurrent BatchFetcher + httpx client reuse
commit ae22a55675
@@ -553,10 +553,26 @@ def _poll_once(
     # Imported lazily at use-site so a runtime that never sees an
     # upload-receive row never imports the module. Cheap on the hot
     # path because Python caches the import.
-    from inbox_uploads import is_chat_upload_row, fetch_and_stage
+    from inbox_uploads import is_chat_upload_row, BatchFetcher

     new_count = 0
     last_id: str | None = None
+    # ``batch_fetcher`` is lazy: a poll batch with no upload rows pays
+    # zero overhead. Once the first upload row appears we open one
+    # BatchFetcher and submit every subsequent upload row to its thread
+    # pool; before processing the FIRST non-upload row we drain the
+    # pool (wait_all) so the URI cache is hot when message rewriting
+    # runs. Without the barrier, the chat message that references the
+    # upload would arrive at the agent with the un-rewritten
+    # platform-pending: URI.
+    batch_fetcher: BatchFetcher | None = None
+
+    def _drain_uploads(bf: BatchFetcher | None) -> None:
+        if bf is None:
+            return
+        bf.wait_all()
+        bf.close()
+
     for row in rows:
         if not isinstance(row, dict):
             continue
@@ -570,14 +586,21 @@ def _poll_once(
             # message_from_activity. We DO advance the cursor past
             # this row so a permanent network outage on /content
             # doesn't stall the cursor and block real chat traffic.
-            fetch_and_stage(
-                row,
-                platform_url=platform_url,
-                workspace_id=workspace_id,
-                headers=headers,
-            )
+            if batch_fetcher is None:
+                batch_fetcher = BatchFetcher(
+                    platform_url=platform_url,
+                    workspace_id=workspace_id,
+                    headers=headers,
+                )
+            batch_fetcher.submit(row)
             last_id = str(row.get("id", "")) or last_id
             continue
+        # Non-upload row: drain any pending uploads first so the URI
+        # cache is populated before we run rewrite_request_body /
+        # message_from_activity on a row that may reference one.
+        if batch_fetcher is not None:
+            _drain_uploads(batch_fetcher)
+            batch_fetcher = None
         if _is_self_notify_row(row):
             # The workspace-server's `/notify` handler writes the agent's
             # own send_message_to_user POSTs to activity_logs with
@@ -612,6 +635,13 @@ def _poll_once(
         last_id = message.activity_id
         new_count += 1

+    # Drain any uploads still in flight if the batch ended with upload
+    # rows (no chat-message row to trigger the inline drain). Without
+    # this, a future poll that picks up the chat-message row first
+    # would race with the still-running fetches.
+    if batch_fetcher is not None:
+        _drain_uploads(batch_fetcher)
+
     if last_id is not None:
         state.save_cursor(last_id, cursor_key)
     return new_count
@@ -654,6 +684,7 @@ def start_poller_thread(
     platform_url: str,
     workspace_id: str,
     interval: float = POLL_INTERVAL_SECONDS,
+    stop_event: threading.Event | None = None,
 ) -> threading.Thread:
     """Spawn the poller as a daemon thread. Returns the Thread handle.

@@ -665,13 +696,18 @@ def start_poller_thread(
     operator running ``ps -eL`` or eyeballing ``threading.enumerate()``
     can tell which thread is which without reverse-engineering it from
     crash tracebacks.
+
+    Pass ``stop_event`` to enable graceful shutdown — used by tests so
+    the daemon thread doesn't outlive the test that started it and race
+    with later tests' httpx patches. Production code passes None and
+    relies on the daemon flag for process-exit cleanup.
     """
     name = "molecule-mcp-inbox-poller"
     if workspace_id:
         name = f"{name}-{workspace_id[:8]}"
     t = threading.Thread(
         target=_poll_loop,
-        args=(state, platform_url, workspace_id, interval),
+        args=(state, platform_url, workspace_id, interval, stop_event),
         name=name,
         daemon=True,
     )
@@ -37,6 +37,7 @@ read another tenant's bytes even if a token is misrouted.
 """
 from __future__ import annotations

+import concurrent.futures
 import logging
 import mimetypes
 import os
@@ -68,6 +69,24 @@ MAX_FILE_BYTES = 25 * 1024 * 1024
 # 10s default for /activity calls — both are user-perceived latency.
 DEFAULT_FETCH_TIMEOUT = 60.0

+# Concurrency cap for ``BatchFetcher``. Four workers is enough headroom
+# for the realistic "user dragged 3-4 files into chat at once" case
+# while bounding the platform's per-workspace fan-out. The cap matters
+# because the platform's /content endpoint reads bytea from Postgres in
+# a single round-trip per request — N workers = N concurrent DB reads
+# of up to 25 MB each, so a higher cap could pressure platform memory
+# without much UX win (network bandwidth is the bottleneck once the
+# bytes are buffered).
+DEFAULT_BATCH_FETCH_WORKERS = 4
+
+# Upper bound on how long ``BatchFetcher.wait_all`` blocks the inbox
+# poll loop before giving up on still-in-flight fetches. Aligned with
+# DEFAULT_FETCH_TIMEOUT so a single hung fetch can't stall the loop
+# longer than its own deadline. A timeout fires only if a worker thread
+# is stuck past the underlying httpx timeout — pathological case;
+# normal completion is bounded by per-fetch timeout × ceil(N/W).
+DEFAULT_BATCH_WAIT_TIMEOUT = DEFAULT_FETCH_TIMEOUT + 5.0
+
 # Cap on the URI cache. A long-lived workspace handling thousands of
 # uploads shouldn't grow without bound; an LRU cap of 1024 keeps the
 # entries-needed-for-a-typical-conversation well within memory.
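A quick sanity check of the ``ceil(N/W)`` bound the comments above rely on (plain Python; the 4-file, 5-second figures are the illustrative numbers quoted later in the BatchFetcher docstring, not measurements from this PR):

import math

n_files, per_fetch_secs, workers = 4, 5.0, 4  # illustrative values only

serial_stall = n_files * per_fetch_secs                       # 20.0 s, the pre-Phase-5b behaviour
pooled_stall = math.ceil(n_files / workers) * per_fetch_secs  # 5.0 s, ceil(N/W) * per-fetch latency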
@@ -275,6 +294,7 @@ def fetch_and_stage(
     workspace_id: str,
     headers: dict[str, str],
     timeout_secs: float = DEFAULT_FETCH_TIMEOUT,
+    client: Any = None,
 ) -> str | None:
     """Fetch the row's bytes, stage them under chat-uploads, and ack.

@@ -289,6 +309,11 @@ def fetch_and_stage(
     On success, the URI cache is updated so a subsequent chat message
     referencing the same ``platform-pending:`` URI is rewritten before
     the agent sees it.
+
+    Pass ``client`` to reuse a shared ``httpx.Client`` for both GET and
+    POST ack (saves one TLS handshake per row vs. constructing one
+    per-call). ``BatchFetcher`` does this across an entire poll batch so
+    N concurrent fetches share one connection pool.
     """
     body = _request_body_dict(row)
     if body is None:
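The docstring addition above is the caller-facing half of the client-reuse contract. A minimal caller-side sketch, assuming a batch of already-polled activity rows; ``rows``, ``platform_url``, ``workspace_id`` and ``headers`` are placeholders, not values from this PR:

import httpx

from inbox_uploads import fetch_and_stage, is_chat_upload_row

# One client for the whole batch; fetch_and_stage leaves it open because
# ownership stays with the caller (see the own_client logic below).
with httpx.Client(timeout=60.0) as shared:
    for row in rows:
        if is_chat_upload_row(row):
            fetch_and_stage(
                row,
                platform_url=platform_url,
                workspace_id=workspace_id,
                headers=headers,
                client=shared,
            )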
@@ -317,25 +342,58 @@ def fetch_and_stage(
     if not isinstance(filename, str):
         filename = "file"

-    # Lazy httpx import: the standalone MCP path uses httpx; an in-
-    # container caller that imports this module by accident shouldn't
-    # explode at import time.
-    try:
-        import httpx  # noqa: WPS433
-    except ImportError:
-        logger.error("inbox_uploads: httpx not installed; cannot fetch %s", file_id)
-        return None
+    # Caller-supplied client: reuse for both GET + POST ack. Otherwise
+    # build a one-shot client and close it on the way out. Lazy httpx
+    # import keeps the standalone MCP path's optional dep optional.
+    own_client = client is None
+    if own_client:
+        try:
+            import httpx  # noqa: WPS433
+        except ImportError:
+            logger.error("inbox_uploads: httpx not installed; cannot fetch %s", file_id)
+            return None
+        client = httpx.Client(timeout=timeout_secs)
+
+    try:
+        return _fetch_and_stage_with_client(
+            client,
+            platform_url=platform_url,
+            workspace_id=workspace_id,
+            headers=headers,
+            file_id=file_id,
+            pending_uri=pending_uri,
+            filename=filename,
+            body=body,
+        )
+    finally:
+        if own_client:
+            try:
+                client.close()
+            except Exception:  # noqa: BLE001 — close should never crash the caller
+                pass
+
+
+def _fetch_and_stage_with_client(
+    client: Any,
+    *,
+    platform_url: str,
+    workspace_id: str,
+    headers: dict[str, str],
+    file_id: str,
+    pending_uri: str,
+    filename: str,
+    body: dict[str, Any],
+) -> str | None:
+    """Inner body of fetch_and_stage. Always uses the supplied client for
+    both GET and POST so the connection pool is shared across the call.
+    """
     content_url = f"{platform_url}/workspaces/{workspace_id}/pending-uploads/{file_id}/content"
     ack_url = f"{platform_url}/workspaces/{workspace_id}/pending-uploads/{file_id}/ack"

     try:
-        with httpx.Client(timeout=timeout_secs) as client:
-            resp = client.get(content_url, headers=headers)
+        resp = client.get(content_url, headers=headers)
     except Exception as exc:  # noqa: BLE001
-        logger.warning(
-            "inbox_uploads: GET %s failed: %s", content_url, exc
-        )
+        logger.warning("inbox_uploads: GET %s failed: %s", content_url, exc)
         return None

     if resp.status_code == 404:
@@ -403,8 +461,7 @@ def fetch_and_stage(
     # back the on-disk file — the platform's sweep will clean up
     # eventually.
     try:
-        with httpx.Client(timeout=timeout_secs) as client:
-            ack_resp = client.post(ack_url, headers=headers)
+        ack_resp = client.post(ack_url, headers=headers)
         if ack_resp.status_code >= 400:
             logger.warning(
                 "inbox_uploads: ack %s returned %d: %s",
@@ -418,6 +475,168 @@
     return local_uri


+# ---------------------------------------------------------------------------
+# BatchFetcher — concurrent fetch across a single poll batch
+# ---------------------------------------------------------------------------
+
+
+class BatchFetcher:
+    """Fetch + stage + ack a batch of upload-receive rows concurrently.
+
+    Why this exists: the inbox poll loop used to call ``fetch_and_stage``
+    serially per row. With N upload rows in a batch (a user dragging
+    multiple files into chat at once), the loop blocked for
+    ``N × per_fetch_latency`` before processing the chat message that
+    referenced them — a 4-file upload at 5s each = 20s of stall
+    before the agent saw the user's prompt. ``BatchFetcher`` runs the
+    fetches on a small thread pool (default 4 workers) so the stall is
+    bounded by ``ceil(N/W) × per_fetch_latency`` instead.
+
+    Connection reuse: one ``httpx.Client`` is shared across every fetch
+    in the batch. httpx clients carry a connection pool, so a second
+    fetch to the same platform host reuses the TCP+TLS handshake from
+    the first — measurable win when fetches happen back-to-back.
+
+    Correctness invariant the caller MUST preserve: the inbox loop is
+    expected to call ``wait_all()`` before processing the chat-message
+    activity row that REFERENCES one of these uploads. Without the
+    barrier, the URI cache is empty when ``rewrite_request_body`` runs
+    and the agent sees the un-rewritten ``platform-pending:`` URI. The
+    caller-side test ``test_poll_once_waits_for_uploads_before_messages``
+    pins this end-to-end.
+
+    Use as a context manager so the executor + client are torn down
+    even if the caller raises mid-batch.
+    """
+
+    def __init__(
+        self,
+        *,
+        platform_url: str,
+        workspace_id: str,
+        headers: dict[str, str],
+        timeout_secs: float = DEFAULT_FETCH_TIMEOUT,
+        max_workers: int = DEFAULT_BATCH_FETCH_WORKERS,
+        client: Any = None,
+    ):
+        self._platform_url = platform_url
+        self._workspace_id = workspace_id
+        self._headers = dict(headers)  # copy so caller mutations don't leak in
+        self._timeout_secs = timeout_secs
+
+        # Caller can inject a client (tests do this); production callers
+        # let us build one. Track ownership so we only close ours.
+        self._own_client = client is None
+        if self._own_client:
+            try:
+                import httpx  # noqa: WPS433
+            except ImportError:
+                # Match fetch_and_stage's behavior: log + degrade rather
+                # than raising at construction time. submit() will then
+                # return None for every row.
+                logger.error("inbox_uploads: httpx not installed; BatchFetcher inert")
+                self._client: Any = None
+            else:
+                self._client = httpx.Client(timeout=timeout_secs)
+        else:
+            self._client = client
+
+        self._executor = concurrent.futures.ThreadPoolExecutor(
+            max_workers=max_workers,
+            thread_name_prefix="upload-fetch",
+        )
+        self._futures: list[concurrent.futures.Future[Any]] = []
+        self._closed = False
+
+    def submit(self, row: dict[str, Any]) -> concurrent.futures.Future[Any] | None:
+        """Submit ``row`` for fetch + stage + ack. Non-blocking — the
+        worker thread runs ``fetch_and_stage`` with the shared client.
+
+        Returns the Future so a caller that wants per-row outcome can
+        await it; ``None`` if the BatchFetcher is in a degraded state
+        (httpx missing).
+        """
+        if self._closed:
+            raise RuntimeError("BatchFetcher: submit after close")
+        if self._client is None:
+            return None
+        fut = self._executor.submit(
+            fetch_and_stage,
+            row,
+            platform_url=self._platform_url,
+            workspace_id=self._workspace_id,
+            headers=self._headers,
+            timeout_secs=self._timeout_secs,
+            client=self._client,
+        )
+        self._futures.append(fut)
+        return fut
+
+    def wait_all(self, timeout: float | None = DEFAULT_BATCH_WAIT_TIMEOUT) -> None:
+        """Block until every submitted future completes (or times out).
+
+        Per-future exceptions are logged + swallowed — ``fetch_and_stage``
+        already converts every error path to ``return None``, so a real
+        exception propagating up to here is unexpected and we don't want
+        one bad fetch to abort the whole batch.
+
+        Timeouts are also logged + swallowed; the caller will move on
+        and the un-acked rows will be retried by the next poll.
+        """
+        if not self._futures:
+            return
+        try:
+            done, not_done = concurrent.futures.wait(
+                self._futures,
+                timeout=timeout,
+                return_when=concurrent.futures.ALL_COMPLETED,
+            )
+        except Exception as exc:  # noqa: BLE001 — concurrent.futures shouldn't raise here
+            logger.warning("inbox_uploads: BatchFetcher.wait_all crashed: %s", exc)
+            return
+        for fut in done:
+            exc = fut.exception()
+            if exc is not None:
+                logger.warning(
+                    "inbox_uploads: BatchFetcher worker raised: %s", exc
+                )
+        if not_done:
+            logger.warning(
+                "inbox_uploads: BatchFetcher.wait_all left %d in-flight after %ss timeout",
+                len(not_done),
+                timeout,
+            )
+
+    def close(self) -> None:
+        """Tear down the executor + (if owned) the httpx client.
+
+        Idempotent. After close, ``submit`` raises and the BatchFetcher
+        cannot be reused — construct a fresh one for the next poll.
+        """
+        if self._closed:
+            return
+        self._closed = True
+        # Drain remaining futures so worker threads aren't killed mid-
+        # request. wait=True is the safe default; for an inbox poller a
+        # 60s tail at shutdown is acceptable since uploads in flight are
+        # the only thing close() is called between.
+        try:
+            self._executor.shutdown(wait=True)
+        except Exception as exc:  # noqa: BLE001
+            logger.warning("inbox_uploads: executor shutdown error: %s", exc)
+        if self._own_client and self._client is not None:
+            try:
+                self._client.close()
+            except Exception as exc:  # noqa: BLE001
+                logger.warning("inbox_uploads: client close error: %s", exc)
+
+    def __enter__(self) -> "BatchFetcher":
+        return self
+
+    def __exit__(self, exc_type, exc, tb) -> None:
+        self.close()
+
+
 # ---------------------------------------------------------------------------
 # URI rewrite for incoming chat messages
 # ---------------------------------------------------------------------------
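Condensed usage sketch for the class added above, mirroring how ``_poll_once`` drives it; ``upload_rows`` and the keyword values are placeholders, not identifiers from this PR:

from inbox_uploads import BatchFetcher

with BatchFetcher(
    platform_url=platform_url,
    workspace_id=workspace_id,
    headers=headers,
) as bf:
    for row in upload_rows:   # the chat_upload_receive rows of one poll batch
        bf.submit(row)        # non-blocking; fetch_and_stage runs on the pool
    bf.wait_all()             # barrier: the URI cache is hot once this returns
# __exit__ calls close(): executor shut down, owned client closed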
@@ -555,16 +555,34 @@ def test_poll_once_self_notify_does_not_fire_notification(state: inbox.InboxState)
 def test_start_poller_thread_is_daemon(state: inbox.InboxState):
     """Daemon flag is required so the poller dies with the parent
     process; a non-daemon poller would leak across `claude` restarts
-    and write to a stale workspace."""
+    and write to a stale workspace.
+
+    Stop_event is plumbed so the thread cleans up at the end of the
+    test instead of leaking into later tests. Without cleanup, the
+    daemon's ~10ms tick races with later tests that patch httpx.Client
+    — the leaked thread sees their patched response and runs an
+    unwanted iteration of _poll_once that double-counts mocked calls
+    (caught when test_batch_fetcher_owns_client_when_not_supplied
+    surfaced this on Python 3.11 CI but not 3.13 local).
+    """
     resp = _make_response(200, [])
     p, _ = _patch_httpx(resp)
+    stop_event = threading.Event()
     with p, patch("platform_auth.auth_headers", return_value={}):
         # Use a very short interval so the loop body runs at least once
         # before we exit the test.
-        t = inbox.start_poller_thread(state, "http://platform", "ws-1", interval=0.01)
+        t = inbox.start_poller_thread(
+            state, "http://platform", "ws-1", interval=0.01, stop_event=stop_event
+        )
         time.sleep(0.05)
         assert t.daemon is True
         assert t.is_alive()
+        # Signal shutdown + wait for the thread to actually exit before
+        # we leave the test scope. Without this join, the leaked thread
+        # races with later tests' httpx patches.
+        stop_event.set()
+        t.join(timeout=2.0)
+        assert not t.is_alive(), "poller thread did not exit on stop_event"


 # ---------------------------------------------------------------------------
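The join above only terminates if ``stop_event`` actually reaches the loop body. ``_poll_loop`` itself is outside this diff; a plausible shape, assuming it simply checks the event once per tick (hypothetical sketch, not code from this PR):

import time

import platform_auth

def _poll_loop(state, platform_url, workspace_id, interval, stop_event=None):
    # Hypothetical: poll until the optional event is set; production
    # callers pass stop_event=None and rely on daemon=True for exit.
    while stop_event is None or not stop_event.is_set():
        _poll_once(state, platform_url, workspace_id, platform_auth.auth_headers())
        if stop_event is not None:
            if stop_event.wait(interval):
                break
        else:
            time.sleep(interval)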
@@ -577,6 +595,219 @@ def test_default_cursor_path_uses_configs_dir(monkeypatch, tmp_path: Path):
     assert inbox.default_cursor_path() == tmp_path / ".mcp_inbox_cursor"


+# ---------------------------------------------------------------------------
+# Phase 5b — BatchFetcher integration with the poll loop
+# ---------------------------------------------------------------------------
+#
+# These tests pin the cross-module contract between inbox._poll_once and
+# inbox_uploads.BatchFetcher: chat_upload_receive rows must be submitted
+# to a single BatchFetcher AND drained (URI cache populated) before any
+# subsequent message row is processed. Without the drain, the
+# rewrite_request_body path inside message_from_activity surfaces the
+# un-rewritten ``platform-pending:`` URI to the agent.
+
+
+def _upload_row(act_id: str, file_id: str) -> dict:
+    return {
+        "id": act_id,
+        "source_id": None,
+        "method": "chat_upload_receive",
+        "summary": f"chat_upload_receive: {file_id}.pdf",
+        "request_body": {
+            "file_id": file_id,
+            "name": f"{file_id}.pdf",
+            "uri": f"platform-pending:ws-1/{file_id}",
+            "mimeType": "application/pdf",
+            "size": 3,
+        },
+        "created_at": "2026-05-04T10:00:00Z",
+    }
+
+
+def _message_row_referencing(act_id: str, file_id: str) -> dict:
+    return {
+        "id": act_id,
+        "source_id": None,
+        "method": "message/send",
+        "summary": None,
+        "request_body": {
+            "params": {
+                "message": {
+                    "parts": [
+                        {"kind": "text", "text": "have a look"},
+                        {
+                            "kind": "file",
+                            "file": {
+                                "uri": f"platform-pending:ws-1/{file_id}",
+                                "name": f"{file_id}.pdf",
+                            },
+                        },
+                    ]
+                }
+            }
+        },
+        "created_at": "2026-05-04T10:00:01Z",
+    }
+
+
+def _patch_httpx_routing(activity_rows: list[dict], upload_bytes: bytes = b"PDF"):
+    """Replace ``httpx.Client`` so:
+
+    - GET /activity returns ``activity_rows``
+    - GET /workspaces/.../content returns ``upload_bytes`` with content-type
+    - POST /ack returns 200
+
+    Returns the patch context manager; tests use ``with p:``. Each new
+    Client(...) gets a fresh MagicMock so the test can verify
+    constructor-count expectations without pinning singletons.
+    """
+    def _client_factory(*args, **kwargs):
+        c = MagicMock()
+        c.__enter__ = MagicMock(return_value=c)
+        c.__exit__ = MagicMock(return_value=False)
+
+        def _get(url, params=None, headers=None):
+            if "/activity" in url:
+                resp = MagicMock()
+                resp.status_code = 200
+                resp.json.return_value = activity_rows
+                resp.text = ""
+                return resp
+            if "/pending-uploads/" in url and "/content" in url:
+                resp = MagicMock()
+                resp.status_code = 200
+                resp.content = upload_bytes
+                resp.headers = {"content-type": "application/pdf"}
+                resp.text = ""
+                return resp
+            resp = MagicMock()
+            resp.status_code = 404
+            resp.text = ""
+            return resp
+
+        def _post(url, headers=None):
+            resp = MagicMock()
+            resp.status_code = 200
+            resp.text = ""
+            return resp
+
+        c.get = MagicMock(side_effect=_get)
+        c.post = MagicMock(side_effect=_post)
+        c.close = MagicMock()
+        return c
+
+    return patch("httpx.Client", side_effect=_client_factory)
+
+
+def test_poll_once_drains_uploads_before_processing_message_row(state: inbox.InboxState, tmp_path):
+    """The chat-message row's file.uri MUST be rewritten to the local
+    workspace: URI by the time it lands in the InboxState queue. This
+    requires BatchFetcher.wait_all() to run before message_from_activity
+    on the second row.
+    """
+    import inbox_uploads
+    inbox_uploads.get_cache().clear()
+    # Sandbox the on-disk staging dir so the test can't pollute the
+    # workspace's real chat-uploads.
+    real_dir = inbox_uploads.CHAT_UPLOAD_DIR
+    inbox_uploads.CHAT_UPLOAD_DIR = str(tmp_path / "chat-uploads")
+    try:
+        rows = [
+            _upload_row("act-1", "file-A"),
+            _message_row_referencing("act-2", "file-A"),
+        ]
+        state.save_cursor("act-old")
+        with _patch_httpx_routing(rows, upload_bytes=b"PDF-bytes"):
+            n = inbox._poll_once(state, "http://platform", "ws-1", {})
+    finally:
+        inbox_uploads.CHAT_UPLOAD_DIR = real_dir
+        inbox_uploads.get_cache().clear()
+
+    assert n == 1, "exactly one message row should be enqueued (the upload row is a side-effect, not a message)"
+    queued = state.peek(10)
+    assert len(queued) == 1
+    # The contract this test exists to pin: the platform-pending: URI
+    # was rewritten to workspace: BEFORE the message landed in the
+    # state queue. message_from_activity mutates row['request_body']
+    # in-place, so the rewritten URI is observable on the row dict
+    # we passed in.
+    rewritten_part = rows[1]["request_body"]["params"]["message"]["parts"][1]
+    assert rewritten_part["file"]["uri"].startswith("workspace:"), (
+        f"upload barrier broken: file.uri = {rewritten_part['file']['uri']!r}; "
+        "rewrite_request_body ran before BatchFetcher.wait_all populated the cache"
+    )
+    # Cursor advanced past BOTH rows — upload-receive (act-1) is
+    # acknowledged via the inbox cursor regardless of fetch outcome.
+    assert state.load_cursor() == "act-2"
+
+
+def test_poll_once_with_only_upload_rows_drains_at_loop_end(state: inbox.InboxState, tmp_path):
+    """End-of-batch drain: a poll that contains ONLY upload rows (no
+    chat-message row to trigger the inline drain) must still drain the
+    BatchFetcher before _poll_once returns. Otherwise a future poll
+    that picks up the corresponding chat-message row would race with
+    in-flight fetches from the previous batch.
+    """
+    import inbox_uploads
+    inbox_uploads.get_cache().clear()
+    real_dir = inbox_uploads.CHAT_UPLOAD_DIR
+    inbox_uploads.CHAT_UPLOAD_DIR = str(tmp_path / "chat-uploads")
+    try:
+        rows = [_upload_row("act-1", "file-A"), _upload_row("act-2", "file-B")]
+        state.save_cursor("act-old")
+        with _patch_httpx_routing(rows, upload_bytes=b"PDF"):
+            n = inbox._poll_once(state, "http://platform", "ws-1", {})
+        # By the time _poll_once returned, the URI cache must be hot
+        # for both file_ids — proves the end-of-loop drain ran.
+        assert inbox_uploads.get_cache().get("platform-pending:ws-1/file-A") is not None
+        assert inbox_uploads.get_cache().get("platform-pending:ws-1/file-B") is not None
+    finally:
+        inbox_uploads.CHAT_UPLOAD_DIR = real_dir
+        inbox_uploads.get_cache().clear()
+    # Upload rows are NOT message rows; queue stays empty.
+    assert n == 0
+    # Cursor advances past both upload rows.
+    assert state.load_cursor() == "act-2"
+
+
+def test_poll_once_no_uploads_does_not_construct_batch_fetcher(state: inbox.InboxState):
+    """A batch with no upload-receive rows must not pay the BatchFetcher
+    construction cost — the executor + httpx client allocation is
+    deferred until the first upload row appears.
+    """
+    import inbox_uploads
+
+    constructed: list[Any] = []
+
+    def _patched_init(self, **kwargs):
+        constructed.append(kwargs)
+        # Don't actually run __init__; we never hit submit/wait_all.
+        self._closed = False
+        self._futures = []
+        self._executor = MagicMock()
+        self._client = MagicMock()
+        self._own_client = False
+
+    rows = [
+        {
+            "id": "act-1",
+            "source_id": None,
+            "method": "message/send",
+            "summary": None,
+            "request_body": {"parts": [{"type": "text", "text": "hi"}]},
+            "created_at": "2026-04-30T22:00:00Z",
+        },
+    ]
+    state.save_cursor("act-old")
+    resp = _make_response(200, rows)
+    p, _ = _patch_httpx(resp)
+    with patch.object(inbox_uploads.BatchFetcher, "__init__", _patched_init), p:
+        n = inbox._poll_once(state, "http://platform", "ws-1", {})
+
+    assert n == 1
+    assert constructed == [], "BatchFetcher must not be constructed when no upload rows are present"
+
+
 def test_default_cursor_path_falls_back_to_default(tmp_path, monkeypatch):
     """When CONFIGS_DIR is unset, the cursor path resolves through
     configs_dir.resolve() — /configs in-container, ~/.molecule-workspace
@@ -695,3 +695,342 @@ def test_rewrite_request_body_handles_non_list_parts():
 def test_rewrite_request_body_handles_non_dict_file():
     body = {"parts": [{"kind": "file", "file": "not a dict"}]}
     inbox_uploads.rewrite_request_body(body)  # must not raise
+
+
+# ---------------------------------------------------------------------------
+# fetch_and_stage with shared client — Phase 5b client-reuse contract
+# ---------------------------------------------------------------------------
+#
+# When a caller passes ``client=`` to fetch_and_stage, that client must be
+# used for BOTH the GET /content and the POST /ack — no fresh
+# ``httpx.Client(...)`` constructions should happen. The pre-Phase-5b
+# implementation made one new client for GET and another for ack; the new
+# shape lets BatchFetcher share one connection pool across an entire batch.
+
+
+def test_fetch_and_stage_with_supplied_client_does_not_construct_new_client(monkeypatch):
+    row = _row(uri="platform-pending:ws-1/file-1")
+    get_resp = _make_resp(200, content=b"PDF", content_type="application/pdf")
+    ack_resp = _make_resp(200)
+    supplied = MagicMock()
+    supplied.get = MagicMock(return_value=get_resp)
+    supplied.post = MagicMock(return_value=ack_resp)
+    # Sentinel: any code path that constructs httpx.Client when one was
+    # already supplied is a regression — count constructions.
+    constructed: list[Any] = []
+
+    class _ShouldNotBeCalled:
+        def __init__(self, *a, **kw):
+            constructed.append((a, kw))
+
+    monkeypatch.setattr("httpx.Client", _ShouldNotBeCalled)
+
+    local_uri = inbox_uploads.fetch_and_stage(
+        row,
+        platform_url="http://plat",
+        workspace_id="ws-1",
+        headers={"Authorization": "Bearer t"},
+        client=supplied,
+    )
+    assert local_uri is not None
+    assert constructed == [], "supplied client must be reused; no new Client should be constructed"
+    # GET + POST ack both went through the supplied client.
+    supplied.get.assert_called_once()
+    supplied.post.assert_called_once()
+    # Caller-owned client must NOT be closed by fetch_and_stage; the
+    # batch fetcher (or test) closes it once the whole batch is done.
+    supplied.close.assert_not_called()
+
+
+def test_fetch_and_stage_without_supplied_client_constructs_and_closes_one(monkeypatch):
+    row = _row(uri="platform-pending:ws-1/file-1")
+    get_resp = _make_resp(200, content=b"PDF", content_type="application/pdf")
+    ack_resp = _make_resp(200)
+    built: list[MagicMock] = []
+
+    def _factory(*args, **kwargs):
+        c = MagicMock()
+        c.get = MagicMock(return_value=get_resp)
+        c.post = MagicMock(return_value=ack_resp)
+        built.append(c)
+        return c
+
+    monkeypatch.setattr("httpx.Client", _factory)
+
+    local_uri = inbox_uploads.fetch_and_stage(
+        row, platform_url="http://plat", workspace_id="ws-1", headers={}
+    )
+    assert local_uri is not None
+    # Pre-Phase-5b built TWO clients (one for GET, one for ack); now exactly one.
+    assert len(built) == 1, f"expected 1 httpx.Client construction, got {len(built)}"
+    # Same client must serve BOTH calls.
+    built[0].get.assert_called_once()
+    built[0].post.assert_called_once()
+    # Owned client must be closed by fetch_and_stage on the way out.
+    built[0].close.assert_called_once()
+
+
+def test_fetch_and_stage_with_supplied_client_does_not_close_caller_client():
+    # Even on failure the supplied client must not be closed — the
+    # BatchFetcher owns the lifecycle for the whole batch.
+    row = _row(uri="platform-pending:ws-1/file-1")
+    supplied = MagicMock()
+    supplied.get = MagicMock(side_effect=RuntimeError("network down"))
+    supplied.post = MagicMock()  # should not be reached on GET failure
+    inbox_uploads.fetch_and_stage(
+        row,
+        platform_url="http://plat",
+        workspace_id="ws-1",
+        headers={},
+        client=supplied,
+    )
+    supplied.close.assert_not_called()
+    supplied.post.assert_not_called()
+
+
+# ---------------------------------------------------------------------------
+# BatchFetcher — concurrent fetch + URI cache barrier
+# ---------------------------------------------------------------------------
+
+
+def _row_with_id(act_id: str, file_id: str) -> dict:
+    """Helper: an upload-receive row with a distinct activity id + file id."""
+    return {
+        "id": act_id,
+        "method": "chat_upload_receive",
+        "request_body": {
+            "file_id": file_id,
+            "name": f"{file_id}.pdf",
+            "uri": f"platform-pending:ws-1/{file_id}",
+            "mimeType": "application/pdf",
+            "size": 1,
+        },
+    }
+
+
+def _stub_client_for_batch(get_responses: dict[str, MagicMock]) -> MagicMock:
+    """Build one MagicMock client that returns per-file_id responses
+    based on the file_id segment of the URL.
+    """
+    client = MagicMock()
+
+    def _get(url: str, headers: dict[str, str] | None = None) -> MagicMock:
+        for fid, resp in get_responses.items():
+            if f"/pending-uploads/{fid}/content" in url:
+                return resp
+        return _make_resp(404)
+
+    def _post(url: str, headers: dict[str, str] | None = None) -> MagicMock:
+        return _make_resp(200)
+
+    client.get = MagicMock(side_effect=_get)
+    client.post = MagicMock(side_effect=_post)
+    return client
+
+
+def test_batch_fetcher_runs_submitted_rows_concurrently():
+    # Three rows whose .get() blocks for ~120ms each. With 4 workers the
+    # batch should complete in ~120ms (parallel), not ~360ms (serial).
+    # The 250ms ceiling accommodates CI scheduler jitter while still
+    # discriminating concurrent (~120ms) from serial (~360ms).
+    import time
+
+    barrier_start = [0.0]
+
+    def _slow_get(url: str, headers: dict[str, str] | None = None) -> MagicMock:
+        time.sleep(0.12)
+        for fid in ("a", "b", "c"):
+            if f"/pending-uploads/{fid}/content" in url:
+                return _make_resp(200, content=b"X", content_type="text/plain")
+        return _make_resp(404)
+
+    client = MagicMock()
+    client.get = MagicMock(side_effect=_slow_get)
+    client.post = MagicMock(return_value=_make_resp(200))
+
+    bf = inbox_uploads.BatchFetcher(
+        platform_url="http://plat",
+        workspace_id="ws-1",
+        headers={},
+        client=client,
+        max_workers=4,
+    )
+    barrier_start[0] = time.time()
+    for fid in ("a", "b", "c"):
+        bf.submit(_row_with_id(f"act-{fid}", fid))
+    bf.wait_all()
+    elapsed = time.time() - barrier_start[0]
+    bf.close()
+
+    assert elapsed < 0.25, (
+        f"3 rows × 120ms with 4 workers should finish in <250ms; got {elapsed:.3f}s "
+        "(suggests serial execution — Phase 5b regression)"
+    )
+    assert client.get.call_count == 3
+    assert client.post.call_count == 3
+
+
+def test_batch_fetcher_wait_all_blocks_until_uri_cache_populated():
+    """Pin the correctness invariant: when wait_all returns, the URI
+    cache is hot for every submitted row. Without this barrier the
+    inbox loop would process the chat-message row before its uploads
+    were staged, and rewrite_request_body would surface the un-rewritten
+    platform-pending: URI to the agent.
+    """
+    import time
+
+    def _slow_get(url: str, headers: dict[str, str] | None = None) -> MagicMock:
+        time.sleep(0.05)
+        return _make_resp(200, content=b"data", content_type="text/plain")
+
+    client = MagicMock()
+    client.get = MagicMock(side_effect=_slow_get)
+    client.post = MagicMock(return_value=_make_resp(200))
+
+    inbox_uploads.get_cache().clear()
+    with inbox_uploads.BatchFetcher(
+        platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
+    ) as bf:
+        bf.submit(_row_with_id("act-a", "a"))
+        bf.submit(_row_with_id("act-b", "b"))
+        bf.wait_all()
+        # Cache must be hot for BOTH rows by the time wait_all returns.
+        assert inbox_uploads.get_cache().get("platform-pending:ws-1/a") is not None
+        assert inbox_uploads.get_cache().get("platform-pending:ws-1/b") is not None
+
+
+def test_batch_fetcher_isolates_per_row_failure():
+    """One failing fetch must not abort siblings. Sibling rows complete,
+    URI cache populates for them; the bad row's cache entry stays absent.
+    """
+    def _get(url: str, headers: dict[str, str] | None = None) -> MagicMock:
+        if "/pending-uploads/bad/content" in url:
+            return _make_resp(500, text="upstream broken")
+        return _make_resp(200, content=b"ok", content_type="text/plain")
+
+    client = MagicMock()
+    client.get = MagicMock(side_effect=_get)
+    client.post = MagicMock(return_value=_make_resp(200))
+
+    inbox_uploads.get_cache().clear()
+    with inbox_uploads.BatchFetcher(
+        platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
+    ) as bf:
+        bf.submit(_row_with_id("act-1", "good1"))
+        bf.submit(_row_with_id("act-2", "bad"))
+        bf.submit(_row_with_id("act-3", "good2"))
+        bf.wait_all()
+
+    cache = inbox_uploads.get_cache()
+    assert cache.get("platform-pending:ws-1/good1") is not None
+    assert cache.get("platform-pending:ws-1/good2") is not None
+    assert cache.get("platform-pending:ws-1/bad") is None
+
+
+def test_batch_fetcher_reuses_one_client_across_all_submits():
+    """Every row in the batch must share the same client instance. This
+    is the connection-pool-reuse leg of the perf win: a second fetch
+    to the same host reuses the TCP+TLS handshake from the first.
+    """
+    client = MagicMock()
+    client.get = MagicMock(return_value=_make_resp(200, content=b"x", content_type="text/plain"))
+    client.post = MagicMock(return_value=_make_resp(200))
+
+    with inbox_uploads.BatchFetcher(
+        platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
+    ) as bf:
+        for fid in ("a", "b", "c"):
+            bf.submit(_row_with_id(f"act-{fid}", fid))
+        bf.wait_all()
+
+    # 3 GETs + 3 POST acks all on the same client — no per-row Client
+    # construction.
+    assert client.get.call_count == 3
+    assert client.post.call_count == 3
+
+
+def test_batch_fetcher_close_idempotent():
+    client = MagicMock()
+    bf = inbox_uploads.BatchFetcher(
+        platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
+    )
+    bf.close()
+    bf.close()  # second call must not raise
+
+
+def test_batch_fetcher_submit_after_close_raises():
+    client = MagicMock()
+    bf = inbox_uploads.BatchFetcher(
+        platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
+    )
+    bf.close()
+    with pytest.raises(RuntimeError, match="submit after close"):
+        bf.submit(_row_with_id("act-x", "x"))
+
+
+def test_batch_fetcher_owns_client_when_not_supplied(monkeypatch):
+    built: list[MagicMock] = []
+
+    def _factory(*args, **kwargs):
+        c = MagicMock()
+        c.get = MagicMock(return_value=_make_resp(200, content=b"x", content_type="text/plain"))
+        c.post = MagicMock(return_value=_make_resp(200))
+        built.append(c)
+        return c
+
+    monkeypatch.setattr("httpx.Client", _factory)
+
+    bf = inbox_uploads.BatchFetcher(
+        platform_url="http://plat", workspace_id="ws-1", headers={}
+    )
+    bf.submit(_row_with_id("act-a", "a"))
+    bf.wait_all()
+    bf.close()
+
+    assert len(built) == 1, "expected one owned client per BatchFetcher"
+    built[0].close.assert_called_once()
+
+
+def test_batch_fetcher_does_not_close_supplied_client():
+    client = MagicMock()
+    client.get = MagicMock(return_value=_make_resp(200, content=b"x", content_type="text/plain"))
+    client.post = MagicMock(return_value=_make_resp(200))
+    with inbox_uploads.BatchFetcher(
+        platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
+    ) as bf:
+        bf.submit(_row_with_id("act-a", "a"))
+        bf.wait_all()
+    # Supplied client survives the BatchFetcher's close — caller's lifecycle.
+    client.close.assert_not_called()
+
+
+def test_batch_fetcher_wait_all_no_op_on_empty_batch():
+    client = MagicMock()
+    with inbox_uploads.BatchFetcher(
+        platform_url="http://plat", workspace_id="ws-1", headers={}, client=client
+    ) as bf:
+        bf.wait_all()  # nothing submitted; must not block, must not raise
+    client.get.assert_not_called()
+    client.post.assert_not_called()
+
+
+def test_batch_fetcher_httpx_missing_makes_submit_a_noop(monkeypatch):
+    # No client supplied + httpx import fails → BatchFetcher degrades
+    # gracefully: submit() returns None and the row is silently skipped.
+    import sys
+
+    real_httpx = sys.modules.pop("httpx", None)
+    monkeypatch.setitem(sys.modules, "httpx", None)
+    try:
+        bf = inbox_uploads.BatchFetcher(
+            platform_url="http://plat", workspace_id="ws-1", headers={}
+        )
+        result = bf.submit(_row_with_id("act-a", "a"))
+        bf.wait_all()
+        bf.close()
+    finally:
+        if real_httpx is not None:
+            sys.modules["httpx"] = real_httpx
+        else:
+            sys.modules.pop("httpx", None)
+    assert result is None