From 39931acd9c21a3e0d450d8d20f9cbf265691e156 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 12:34:41 -0700 Subject: [PATCH] fix(inbox-uploads): cancel BatchFetcher futures on wait_all timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The deadline contract was incomplete: wait_all logged the timeout but close() then called executor.shutdown(wait=True), which blocked on the leaked workers — undoing the user-facing timeout. The inbox poll loop would stall indefinitely on a hung /content fetch instead of returning to chat-message processing. Fix: wait_all now flips self._timed_out and cancels queued (not-yet- started) futures; close() reads that flag and switches to shutdown(wait=False, cancel_futures=True) on the timeout path. Currently-running workers can't be interrupted by Python's threading model, but they're now detached daemons whose blocking httpx call no longer gates the next poll. Healthy path (no timeout) keeps the existing drain-and-wait so a still-queued ack POST isn't dropped mid-write. Two new tests pin both legs of the contract end-to-end: - close-after-timeout-doesn't-block: hung worker, wait_all(0.05s) fires the timeout, close() returns in <1s instead of waiting ~5s for the worker to come back. - close-without-timeout-still-drains: 2 slow workers, wait_all completes cleanly, close() drains both ack POSTs. Resolves the BatchFetcher timeout-cancellation finding from the post-merge five-axis review of Phase 5b. Co-Authored-By: Claude Opus 4.7 (1M context) --- workspace/inbox_uploads.py | 44 +++++++++++--- workspace/tests/test_inbox_uploads.py | 84 +++++++++++++++++++++++++++ 2 files changed, 121 insertions(+), 7 deletions(-) diff --git a/workspace/inbox_uploads.py b/workspace/inbox_uploads.py index 913efdcd..69fa53aa 100644 --- a/workspace/inbox_uploads.py +++ b/workspace/inbox_uploads.py @@ -547,6 +547,9 @@ class BatchFetcher: ) self._futures: list[concurrent.futures.Future[Any]] = [] self._closed = False + # Flipped to True by wait_all when the timeout fires; close() + # reads this to decide between drain-and-wait vs cancel-queued. + self._timed_out = False def submit(self, row: dict[str, Any]) -> concurrent.futures.Future[Any] | None: """Submit ``row`` for fetch + stage + ack. Non-blocking — the @@ -580,8 +583,12 @@ class BatchFetcher: exception propagating up to here is unexpected and we don't want one bad fetch to abort the whole batch. - Timeouts are also logged + swallowed; the caller will move on - and the un-acked rows will be retried by the next poll. + Timeouts are also logged + swallowed AND record the timed-out + futures on ``self._timed_out`` so ``close`` can cancel them + without paying their full latency. Without this hand-off, + ``close()``'s ``shutdown(wait=True)`` would block on the leaked + workers and undo the user-facing timeout — the inbox poll loop + would stall indefinitely on a hung /content fetch. """ if not self._futures: return @@ -606,22 +613,45 @@ class BatchFetcher: len(not_done), timeout, ) + # Mark these futures so close() knows to cancel-not-wait. We + # cancel queued-but-not-started ones immediately; futures + # already running can't be cancelled (Python's threading + # model), but close() will pass cancel_futures=True so any + # remaining queued items don't run. + for fut in not_done: + fut.cancel() + self._timed_out = True def close(self) -> None: """Tear down the executor + (if owned) the httpx client. Idempotent. After close, ``submit`` raises and the BatchFetcher cannot be reused — construct a fresh one for the next poll. + + If ``wait_all`` reported a timeout, shutdown skips the + ``wait=True`` drain and instead asks the executor to drop queued + futures (``cancel_futures=True``). Currently-running workers + can't be interrupted by Python's threading model, but the poll + loop returns immediately rather than blocking on a hung fetch. """ if self._closed: return self._closed = True - # Drain remaining futures so worker threads aren't killed mid- - # request. wait=True is the safe default; for an inbox poller a - # 60s tail at shutdown is acceptable since uploads in flight are - # the only thing close() is called between. + timed_out = getattr(self, "_timed_out", False) try: - self._executor.shutdown(wait=True) + if timed_out: + # cancel_futures landed in Python 3.9 — guarded for older + # interpreters via a TypeError fallback. Drop queued + # tasks; running ones will exit when their httpx call + # eventually returns or the daemon thread dies. + try: + self._executor.shutdown(wait=False, cancel_futures=True) + except TypeError: + self._executor.shutdown(wait=False) + else: + # Healthy path: wait for in-flight work so we don't + # interrupt a fetch mid-write. + self._executor.shutdown(wait=True) except Exception as exc: # noqa: BLE001 logger.warning("inbox_uploads: executor shutdown error: %s", exc) if self._own_client and self._client is not None: diff --git a/workspace/tests/test_inbox_uploads.py b/workspace/tests/test_inbox_uploads.py index c13cea70..37446760 100644 --- a/workspace/tests/test_inbox_uploads.py +++ b/workspace/tests/test_inbox_uploads.py @@ -1034,3 +1034,87 @@ def test_batch_fetcher_httpx_missing_makes_submit_a_noop(monkeypatch): else: sys.modules.pop("httpx", None) assert result is None + + +def test_batch_fetcher_close_after_timeout_does_not_block_on_running_workers(): + """The deadline contract: when wait_all times out, close() must NOT + block waiting for the leaked worker threads. Otherwise the inbox + poll loop stalls indefinitely on a hung /content fetch — undoing + the user-facing timeout. + + Strategy: build a client whose .get() blocks on a threading.Event + that the test never sets. Submit a row, wait_all with a tiny + timeout, then time close(). If close() drained-and-waited it would + block until we set the event (i.e., forever in this test). + """ + import threading + import time + + blocker = threading.Event() # never set — workers stay running + + def _hang_get(url, headers=None): + # Wait at most ~5s so a buggy implementation eventually unblocks + # the test instead of timing out the whole pytest run, but + # nothing legitimate should reach this fallback. + blocker.wait(timeout=5.0) + return _make_resp(200, content=b"x", content_type="text/plain") + + client = MagicMock() + client.get = MagicMock(side_effect=_hang_get) + client.post = MagicMock(return_value=_make_resp(200)) + + bf = inbox_uploads.BatchFetcher( + platform_url="http://plat", + workspace_id="ws-1", + headers={}, + client=client, + max_workers=1, # serialize so submitting 1 keeps the worker busy + ) + bf.submit(_row_with_id("act-a", "a")) + # Tiny timeout — wait_all must report the future as not_done. + bf.wait_all(timeout=0.05) + t0 = time.time() + bf.close() + elapsed = time.time() - t0 + # Unblock the lingering worker so it doesn't pollute later tests. + blocker.set() + + # Without the cancel-on-timeout fix, close() would block until + # blocker.set() — i.e., the full ~5s. With the fix it returns + # immediately because shutdown(wait=False) doesn't drain. + assert elapsed < 1.0, ( + f"close() blocked for {elapsed:.2f}s after wait_all timeout — " + "cancel-on-timeout regression: close() is draining instead of bailing" + ) + + +def test_batch_fetcher_close_without_timeout_still_drains(): + """Negative leg of the timeout contract: when wait_all completes + cleanly (no timeout), close() must KEEP its drain-and-wait + behavior so a still-queued ack POST isn't dropped mid-write. + """ + import time + + def _slow_get(url, headers=None): + time.sleep(0.05) + return _make_resp(200, content=b"x", content_type="text/plain") + + client = MagicMock() + client.get = MagicMock(side_effect=_slow_get) + client.post = MagicMock(return_value=_make_resp(200)) + + bf = inbox_uploads.BatchFetcher( + platform_url="http://plat", + workspace_id="ws-1", + headers={}, + client=client, + max_workers=2, + ) + bf.submit(_row_with_id("act-a", "a")) + bf.submit(_row_with_id("act-b", "b")) + bf.wait_all() # generous default timeout — should not fire + bf.close() + + # All 2 GETs + 2 ACK POSTs ran to completion via drain-and-wait. + assert client.get.call_count == 2 + assert client.post.call_count == 2