Merge pull request #2921 from Molecule-AI/fix/batch-fetcher-cancel-on-timeout
fix(inbox-uploads): cancel BatchFetcher futures on wait_all timeout
This commit is contained in:
commit
81cf0cbf98
@ -547,6 +547,9 @@ class BatchFetcher:
|
||||
)
|
||||
self._futures: list[concurrent.futures.Future[Any]] = []
|
||||
self._closed = False
|
||||
# Flipped to True by wait_all when the timeout fires; close()
|
||||
# reads this to decide between drain-and-wait vs cancel-queued.
|
||||
self._timed_out = False
|
||||
|
||||
def submit(self, row: dict[str, Any]) -> concurrent.futures.Future[Any] | None:
|
||||
"""Submit ``row`` for fetch + stage + ack. Non-blocking — the
|
||||
@ -580,8 +583,12 @@ class BatchFetcher:
|
||||
exception propagating up to here is unexpected and we don't want
|
||||
one bad fetch to abort the whole batch.
|
||||
|
||||
Timeouts are also logged + swallowed; the caller will move on
|
||||
and the un-acked rows will be retried by the next poll.
|
||||
Timeouts are also logged + swallowed AND recorded on the boolean
``self._timed_out`` flag — still-pending futures are cancelled on
the spot here — so ``close`` can skip its drain and bail out
|
||||
without paying their full latency. Without this hand-off,
|
||||
``close()``'s ``shutdown(wait=True)`` would block on the leaked
|
||||
workers and undo the user-facing timeout — the inbox poll loop
|
||||
would stall indefinitely on a hung /content fetch.
|
||||
"""
|
||||
if not self._futures:
|
||||
return
|
||||
@ -606,22 +613,45 @@ class BatchFetcher:
|
||||
len(not_done),
|
||||
timeout,
|
||||
)
|
||||
# Mark these futures so close() knows to cancel-not-wait. We
|
||||
# cancel queued-but-not-started ones immediately; futures
|
||||
# already running can't be cancelled (Python's threading
|
||||
# model), but close() will pass cancel_futures=True so any
|
||||
# remaining queued items don't run.
|
||||
for fut in not_done:
|
||||
fut.cancel()
|
||||
self._timed_out = True
|
||||
|
||||
def close(self) -> None:
    """Tear down the executor + (if owned) the httpx client.

    Idempotent. After close, ``submit`` raises and the BatchFetcher
    cannot be reused — construct a fresh one for the next poll.

    If ``wait_all`` reported a timeout, shutdown skips the
    ``wait=True`` drain and instead asks the executor to drop queued
    futures (``cancel_futures=True``). Currently-running workers
    can't be interrupted by Python's threading model, but the poll
    loop returns immediately rather than blocking on a hung fetch.
    """
    if self._closed:
        return
    self._closed = True
    # getattr guard: tolerate instances created before the
    # ``_timed_out`` attribute existed (e.g. reloaded/monkeypatched).
    timed_out = getattr(self, "_timed_out", False)
    try:
        # IMPORTANT: no unconditional shutdown(wait=True) before this
        # branch — draining first would block on the leaked workers and
        # undo the user-facing timeout (the whole point of the
        # cancel-on-timeout fix).
        if timed_out:
            # Bail-out path: drop queued tasks instead of draining.
            # cancel_futures landed in Python 3.9 — guarded for older
            # interpreters via a TypeError fallback. Running workers
            # can't be interrupted; they exit when their httpx call
            # eventually returns or the daemon thread dies.
            try:
                self._executor.shutdown(wait=False, cancel_futures=True)
            except TypeError:
                self._executor.shutdown(wait=False)
        else:
            # Healthy path: drain remaining futures so worker threads
            # aren't killed mid-request (e.g. an ack POST in flight).
            # For an inbox poller a short tail at shutdown is
            # acceptable since only in-flight uploads are pending.
            self._executor.shutdown(wait=True)
    except Exception as exc:  # noqa: BLE001
        logger.warning("inbox_uploads: executor shutdown error: %s", exc)
|
||||
if self._own_client and self._client is not None:
|
||||
|
||||
@ -1034,3 +1034,87 @@ def test_batch_fetcher_httpx_missing_makes_submit_a_noop(monkeypatch):
|
||||
else:
|
||||
sys.modules.pop("httpx", None)
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_batch_fetcher_close_after_timeout_does_not_block_on_running_workers():
    """The deadline contract: when wait_all times out, close() must NOT
    block waiting for the leaked worker threads. Otherwise the inbox
    poll loop stalls indefinitely on a hung /content fetch — undoing
    the user-facing timeout.

    Strategy: build a client whose .get() blocks on a threading.Event
    that the test never sets. Submit a row, wait_all with a tiny
    timeout, then time close(). If close() drained-and-waited it would
    block until we set the event (i.e., forever in this test).
    """
    import threading
    import time

    blocker = threading.Event()  # never set — workers stay running

    def _hang_get(url, headers=None):
        # Wait at most ~5s so a buggy implementation eventually unblocks
        # the test instead of timing out the whole pytest run, but
        # nothing legitimate should reach this fallback.
        blocker.wait(timeout=5.0)
        return _make_resp(200, content=b"x", content_type="text/plain")

    client = MagicMock()
    client.get = MagicMock(side_effect=_hang_get)
    client.post = MagicMock(return_value=_make_resp(200))

    bf = inbox_uploads.BatchFetcher(
        platform_url="http://plat",
        workspace_id="ws-1",
        headers={},
        client=client,
        max_workers=1,  # serialize so submitting 1 keeps the worker busy
    )
    bf.submit(_row_with_id("act-a", "a"))
    # Tiny timeout — wait_all must report the future as not_done.
    bf.wait_all(timeout=0.05)

    # time.monotonic() (not time.time()) so a wall-clock adjustment
    # mid-test can't skew the elapsed measurement.
    t0 = time.monotonic()
    try:
        bf.close()
    finally:
        elapsed = time.monotonic() - t0
        # Unblock the lingering worker even if close() raised, so it
        # doesn't pollute later tests.
        blocker.set()

    # Without the cancel-on-timeout fix, close() would block until
    # blocker.set() — i.e., the full ~5s. With the fix it returns
    # immediately because shutdown(wait=False) doesn't drain.
    assert elapsed < 1.0, (
        f"close() blocked for {elapsed:.2f}s after wait_all timeout — "
        "cancel-on-timeout regression: close() is draining instead of bailing"
    )
|
||||
|
||||
|
||||
def test_batch_fetcher_close_without_timeout_still_drains():
    """Negative leg of the timeout contract: when wait_all completes
    cleanly (no timeout), close() must KEEP its drain-and-wait
    behavior so a still-queued ack POST isn't dropped mid-write.
    """
    import time

    def _delayed_get(url, headers=None):
        time.sleep(0.05)
        return _make_resp(200, content=b"x", content_type="text/plain")

    fake_client = MagicMock()
    fake_client.get = MagicMock(side_effect=_delayed_get)
    fake_client.post = MagicMock(return_value=_make_resp(200))

    fetcher = inbox_uploads.BatchFetcher(
        platform_url="http://plat",
        workspace_id="ws-1",
        headers={},
        client=fake_client,
        max_workers=2,
    )
    for activity_id, suffix in (("act-a", "a"), ("act-b", "b")):
        fetcher.submit(_row_with_id(activity_id, suffix))
    fetcher.wait_all()  # generous default timeout — should not fire
    fetcher.close()

    # Drain-and-wait means every GET and its matching ACK POST ran to
    # completion before close() returned.
    assert fake_client.get.call_count == 2
    assert fake_client.post.call_count == 2
|
||||
|
||||
Loading…
Reference in New Issue
Block a user