diff --git a/workspace/inbox_uploads.py b/workspace/inbox_uploads.py
index 913efdcd..69fa53aa 100644
--- a/workspace/inbox_uploads.py
+++ b/workspace/inbox_uploads.py
@@ -547,6 +547,9 @@ class BatchFetcher:
             )
         self._futures: list[concurrent.futures.Future[Any]] = []
         self._closed = False
+        # Flipped to True by wait_all when the timeout fires; close()
+        # reads this to decide between drain-and-wait vs cancel-queued.
+        self._timed_out = False
 
     def submit(self, row: dict[str, Any]) -> concurrent.futures.Future[Any] | None:
         """Submit ``row`` for fetch + stage + ack. Non-blocking — the
@@ -580,8 +583,12 @@ class BatchFetcher:
         exception propagating up to here is unexpected and we don't
         want one bad fetch to abort the whole batch.
 
-        Timeouts are also logged + swallowed; the caller will move on
-        and the un-acked rows will be retried by the next poll.
+        Timeouts are also logged + swallowed AND flip the boolean
+        ``self._timed_out`` flag so ``close`` knows to cancel instead
+        of draining. Without this hand-off, ``close()``'s
+        ``shutdown(wait=True)`` would block on the leaked workers and
+        undo the user-facing timeout — the inbox poll loop would stall
+        indefinitely on a hung /content fetch.
         """
         if not self._futures:
             return
@@ -606,22 +613,45 @@ class BatchFetcher:
                 len(not_done),
                 timeout,
             )
+            # Mark these futures so close() knows to cancel-not-wait. We
+            # cancel queued-but-not-started ones immediately; futures
+            # already running can't be cancelled (Python's threading
+            # model), but close() will pass cancel_futures=True so any
+            # remaining queued items don't run.
+            for fut in not_done:
+                fut.cancel()
+            self._timed_out = True
 
     def close(self) -> None:
         """Tear down the executor + (if owned) the httpx client.
 
         Idempotent. After close, ``submit`` raises and the BatchFetcher
        cannot be reused — construct a fresh one for the next poll.
+
+        If ``wait_all`` reported a timeout, shutdown skips the
+        ``wait=True`` drain and instead asks the executor to drop queued
+        futures (``cancel_futures=True``). Currently-running workers
+        can't be interrupted by Python's threading model, but the poll
+        loop returns immediately rather than blocking on a hung fetch.
         """
         if self._closed:
             return
         self._closed = True
-        # Drain remaining futures so worker threads aren't killed mid-
-        # request. wait=True is the safe default; for an inbox poller a
-        # 60s tail at shutdown is acceptable since uploads in flight are
-        # the only thing close() is called between.
+        timed_out = getattr(self, "_timed_out", False)
         try:
-            self._executor.shutdown(wait=True)
+            if timed_out:
+                # cancel_futures landed in Python 3.9 — guarded for older
+                # interpreters via a TypeError fallback. Drop queued
+                # tasks; running ones will exit when their httpx call
+                # eventually returns or the daemon thread dies.
+                try:
+                    self._executor.shutdown(wait=False, cancel_futures=True)
+                except TypeError:
+                    self._executor.shutdown(wait=False)
+            else:
+                # Healthy path: wait for in-flight work so we don't
+                # interrupt a fetch mid-write.
+                self._executor.shutdown(wait=True)
         except Exception as exc:  # noqa: BLE001
             logger.warning("inbox_uploads: executor shutdown error: %s", exc)
         if self._own_client and self._client is not None:
diff --git a/workspace/tests/test_inbox_uploads.py b/workspace/tests/test_inbox_uploads.py
index c13cea70..37446760 100644
--- a/workspace/tests/test_inbox_uploads.py
+++ b/workspace/tests/test_inbox_uploads.py
@@ -1034,3 +1034,87 @@ def test_batch_fetcher_httpx_missing_makes_submit_a_noop(monkeypatch):
     else:
         sys.modules.pop("httpx", None)
     assert result is None
+
+
+def test_batch_fetcher_close_after_timeout_does_not_block_on_running_workers():
+    """The deadline contract: when wait_all times out, close() must NOT
+    block waiting for the leaked worker threads. Otherwise the inbox
+    poll loop stalls indefinitely on a hung /content fetch — undoing
+    the user-facing timeout.
+
+    Strategy: build a client whose .get() blocks on a threading.Event
+    that the test never sets. Submit a row, wait_all with a tiny
+    timeout, then time close(). If close() drained-and-waited it would
+    block until we set the event (i.e., forever in this test).
+    """
+    import threading
+    import time
+
+    blocker = threading.Event()  # never set — workers stay running
+
+    def _hang_get(url, headers=None):
+        # Wait at most ~5s so a buggy implementation eventually unblocks
+        # the test instead of timing out the whole pytest run, but
+        # nothing legitimate should reach this fallback.
+        blocker.wait(timeout=5.0)
+        return _make_resp(200, content=b"x", content_type="text/plain")
+
+    client = MagicMock()
+    client.get = MagicMock(side_effect=_hang_get)
+    client.post = MagicMock(return_value=_make_resp(200))
+
+    bf = inbox_uploads.BatchFetcher(
+        platform_url="http://plat",
+        workspace_id="ws-1",
+        headers={},
+        client=client,
+        max_workers=1,  # serialize so submitting 1 keeps the worker busy
+    )
+    bf.submit(_row_with_id("act-a", "a"))
+    # Tiny timeout — wait_all must report the future as not_done.
+    bf.wait_all(timeout=0.05)
+    t0 = time.time()
+    bf.close()
+    elapsed = time.time() - t0
+    # Unblock the lingering worker so it doesn't pollute later tests.
+    blocker.set()
+
+    # Without the cancel-on-timeout fix, close() would block until
+    # blocker.set() — i.e., the full ~5s. With the fix it returns
+    # immediately because shutdown(wait=False) doesn't drain.
+    assert elapsed < 1.0, (
+        f"close() blocked for {elapsed:.2f}s after wait_all timeout — "
+        "cancel-on-timeout regression: close() is draining instead of bailing"
+    )
+
+
+def test_batch_fetcher_close_without_timeout_still_drains():
+    """Negative leg of the timeout contract: when wait_all completes
+    cleanly (no timeout), close() must KEEP its drain-and-wait
+    behavior so a still-queued ack POST isn't dropped mid-write.
+    """
+    import time
+
+    def _slow_get(url, headers=None):
+        time.sleep(0.05)
+        return _make_resp(200, content=b"x", content_type="text/plain")
+
+    client = MagicMock()
+    client.get = MagicMock(side_effect=_slow_get)
+    client.post = MagicMock(return_value=_make_resp(200))
+
+    bf = inbox_uploads.BatchFetcher(
+        platform_url="http://plat",
+        workspace_id="ws-1",
+        headers={},
+        client=client,
+        max_workers=2,
+    )
+    bf.submit(_row_with_id("act-a", "a"))
+    bf.submit(_row_with_id("act-b", "b"))
+    bf.wait_all()  # generous default timeout — should not fire
+    bf.close()
+
+    # All 2 GETs + 2 ACK POSTs ran to completion via drain-and-wait.
+    assert client.get.call_count == 2
+    assert client.post.call_count == 2