fix(model_tools): cancel coroutine on timeout so worker thread exits + log full traceback
_run_async() bridges sync tool handlers to async code. When the handler
is invoked from inside a running event loop (gateway / nested async),
it spawns a worker thread and blocks on future.result(timeout=300).
Before this change, a coroutine that ran past 300s leaked its worker
thread:
- future.cancel() is a no-op on a running ThreadPoolExecutor future
  (cancel only works on not-yet-started work).
- pool.shutdown(wait=False, cancel_futures=True) let the caller
  proceed, but cancel_futures only drops work that has not started,
  so the worker kept running the coroutine until it returned on its
  own.
Every tool timeout leaked one thread. In long-lived gateway / RL
sessions this is cumulative.
The fix replaces bare asyncio.run() with a worker wrapper that
creates its own event loop. On timeout, _run_async schedules
task.cancel() on that loop via call_soon_threadsafe, then shuts the
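pool down with wait=False so the caller returns immediately. A
cooperative coroutine observes CancelledError at its next await and
the worker thread exits cleanly. A coroutine that ignores (or cannot
observe) cancellation still occupies its thread until it returns, but
it never blocks the caller.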
Also switches logger.error() to logger.exception() in the top-level
handle_function_call() except block so tool failures produce full
stack traces in errors.log instead of just the message.
Related: #17420 (contributor flagged the leak; the original fix used
pool.shutdown(wait=True) which would have converted the leak into a
hang — caller blocks forever on the same stuck coroutine). Credit
for identifying the leak goes to the contributor.
Co-authored-by: 0z! <162235745+0z1-ghb@users.noreply.github.com>
This commit is contained in:
parent
46437966cc
commit
b0435cc164
@ -107,17 +107,58 @@ def _run_async(coro):
|
||||
loop = None
|
||||
|
||||
if loop and loop.is_running():
|
||||
# Inside an async context (gateway, RL env) — run in a fresh thread.
|
||||
# Inside an async context (gateway, RL env) — run in a fresh thread
|
||||
# with its own event loop we own a reference to, so on timeout we
|
||||
# can cancel the task inside that loop (ThreadPoolExecutor.cancel()
|
||||
# only works on not-yet-started futures — it's a no-op on a running
|
||||
# worker, which previously leaked the thread on every 300 s timeout).
|
||||
import concurrent.futures
|
||||
|
||||
worker_loop: Optional[asyncio.AbstractEventLoop] = None
|
||||
loop_ready = threading.Event()
|
||||
|
||||
def _run_in_worker():
|
||||
nonlocal worker_loop
|
||||
worker_loop = asyncio.new_event_loop()
|
||||
loop_ready.set()
|
||||
try:
|
||||
asyncio.set_event_loop(worker_loop)
|
||||
return worker_loop.run_until_complete(coro)
|
||||
finally:
|
||||
try:
|
||||
# Cancel anything still pending (e.g. task cancelled
|
||||
# externally via call_soon_threadsafe on timeout).
|
||||
pending = asyncio.all_tasks(worker_loop)
|
||||
for t in pending:
|
||||
t.cancel()
|
||||
if pending:
|
||||
worker_loop.run_until_complete(
|
||||
asyncio.gather(*pending, return_exceptions=True)
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
worker_loop.close()
|
||||
|
||||
pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
|
||||
future = pool.submit(asyncio.run, coro)
|
||||
future = pool.submit(_run_in_worker)
|
||||
try:
|
||||
return future.result(timeout=300)
|
||||
except concurrent.futures.TimeoutError:
|
||||
future.cancel()
|
||||
# Cancel the coroutine inside its own loop so the worker thread
|
||||
# can wind down instead of running forever.
|
||||
if loop_ready.wait(timeout=1.0) and worker_loop is not None:
|
||||
try:
|
||||
for t in asyncio.all_tasks(worker_loop):
|
||||
worker_loop.call_soon_threadsafe(t.cancel)
|
||||
except RuntimeError:
|
||||
# Loop already closed — nothing to cancel.
|
||||
pass
|
||||
raise
|
||||
finally:
|
||||
pool.shutdown(wait=False, cancel_futures=True)
|
||||
# wait=False: don't block the caller on a stuck coroutine. We've
|
||||
# already requested cancellation above; the worker will exit
|
||||
# once the coroutine observes it (usually at the next await).
|
||||
pool.shutdown(wait=False)
|
||||
|
||||
# If we're on a worker thread (e.g., parallel tool execution in
|
||||
# delegate_task), use a per-thread persistent loop. This avoids
|
||||
@ -737,7 +778,7 @@ def handle_function_call(
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Error executing {function_name}: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
logger.exception(error_msg)
|
||||
return json.dumps({"error": error_msg}, ensure_ascii=False)
|
||||
|
||||
|
||||
|
||||
@ -60,6 +60,7 @@ AUTHOR_MAP = {
|
||||
"johnnncenaaa77@gmail.com": "johnncenae",
|
||||
"thomasjhon6666@gmail.com": "ThomassJonax",
|
||||
"focusflow.app.help@gmail.com": "yes999zc",
|
||||
"162235745+0z1-ghb@users.noreply.github.com": "0z1-ghb",
|
||||
"yes999zc@163.com": "yes999zc",
|
||||
"343873859@qq.com": "DrStrangerUJN",
|
||||
"uzmpsk.dilekakbas@gmail.com": "dlkakbs",
|
||||
|
||||
@ -199,20 +199,22 @@ class TestRunAsyncWithRunningLoop:
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_timeout_uses_nonblocking_executor_shutdown(self, monkeypatch):
|
||||
"""A timeout in the running-loop branch must not wait for the worker.
|
||||
"""A timeout in the running-loop branch must not block the caller.
|
||||
|
||||
ThreadPoolExecutor's context manager performs shutdown(wait=True).
|
||||
If _run_async relies on that path after future.result(timeout=...)
|
||||
times out, the timeout does not bound wall-clock time because the
|
||||
caller still waits for the stuck coroutine's thread to finish.
|
||||
If shutdown ever waits for a stuck worker, a tool coroutine that
|
||||
ignores (or can't observe) cancellation would hang the whole agent.
|
||||
Guard: the caller must raise TimeoutError and pool.shutdown must be
|
||||
called with wait=False. The worker's own event loop handles cleanup
|
||||
(cancellation is scheduled via call_soon_threadsafe before the
|
||||
caller returns).
|
||||
"""
|
||||
import concurrent.futures
|
||||
from model_tools import _run_async
|
||||
|
||||
events = {
|
||||
"cancelled": False,
|
||||
"result_timeout": None,
|
||||
"shutdown_calls": [],
|
||||
"submitted_fn": None,
|
||||
}
|
||||
|
||||
class TimeoutFuture:
|
||||
@ -221,7 +223,6 @@ class TestRunAsyncWithRunningLoop:
|
||||
raise concurrent.futures.TimeoutError()
|
||||
|
||||
def cancel(self):
|
||||
events["cancelled"] = True
|
||||
return True
|
||||
|
||||
class FakeExecutor:
|
||||
@ -236,8 +237,10 @@ class TestRunAsyncWithRunningLoop:
|
||||
return False
|
||||
|
||||
def submit(self, fn, *args, **kwargs):
|
||||
if args and hasattr(args[0], "close"):
|
||||
args[0].close()
|
||||
# Record which function got submitted -- should be the
|
||||
# in-function worker wrapper, not bare asyncio.run, so we
|
||||
# know _run_async is using a loop it owns and can cancel.
|
||||
events["submitted_fn"] = getattr(fn, "__name__", repr(fn))
|
||||
return TimeoutFuture()
|
||||
|
||||
def shutdown(self, wait=True, cancel_futures=False):
|
||||
@ -256,8 +259,82 @@ class TestRunAsyncWithRunningLoop:
|
||||
_run_async(_never_finishes())
|
||||
|
||||
assert events["result_timeout"] == 300
|
||||
assert events["cancelled"] is True
|
||||
assert events["shutdown_calls"] == [(False, True)]
|
||||
# The worker wrapper creates its own event loop so _run_async can
|
||||
# cancel the task on timeout — this must NOT be bare asyncio.run.
|
||||
assert events["submitted_fn"] != "run", (
|
||||
"_run_async submitted asyncio.run directly — it must submit a "
|
||||
"worker wrapper that owns the event loop so timeouts can cancel "
|
||||
"the task"
|
||||
)
|
||||
# Critical: shutdown must NOT wait. If wait=True, a stuck coroutine
|
||||
# would freeze the caller (converts a thread leak into a hang).
|
||||
assert events["shutdown_calls"], "shutdown was never called"
|
||||
for wait, _cancel in events["shutdown_calls"]:
|
||||
assert wait is False, (
|
||||
f"shutdown called with wait={wait} — a stuck tool coroutine "
|
||||
f"would hang the caller indefinitely"
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_timeout_cancels_coroutine_in_worker_loop(self, monkeypatch):
|
||||
"""On timeout, the worker's event loop must receive a cancel request
|
||||
so the coroutine stops and the thread exits — not leaked.
|
||||
|
||||
Before the fix, future.cancel() on a running ThreadPoolExecutor
|
||||
future is a no-op, so the worker thread kept running the coroutine
|
||||
to completion (leaking one thread per tool-timeout).
|
||||
"""
|
||||
from model_tools import _run_async
|
||||
|
||||
# Shrink the 300s internal timeout by patching future.result.
|
||||
# We do this surgically: let everything else run for real so the
|
||||
# worker loop actually exists and can observe cancellation.
|
||||
import concurrent.futures as _cf
|
||||
|
||||
real_pool_cls = _cf.ThreadPoolExecutor
|
||||
|
||||
class FastTimeoutPool(real_pool_cls):
|
||||
def __init__(self, *a, **kw):
|
||||
super().__init__(*a, **kw)
|
||||
|
||||
# Patch future.result to time out after 1s instead of 300s.
|
||||
real_result = _cf.Future.result
|
||||
|
||||
def fast_result(self, timeout=None):
|
||||
return real_result(self, timeout=1.0 if timeout == 300 else timeout)
|
||||
|
||||
monkeypatch.setattr(_cf.Future, "result", fast_result)
|
||||
|
||||
cancel_observed = threading.Event()
|
||||
|
||||
async def _slow_cancellable():
|
||||
try:
|
||||
await asyncio.sleep(60)
|
||||
except asyncio.CancelledError:
|
||||
cancel_observed.set()
|
||||
raise
|
||||
|
||||
import time as _time
|
||||
t0 = _time.time()
|
||||
with pytest.raises(_cf.TimeoutError):
|
||||
_run_async(_slow_cancellable())
|
||||
elapsed = _time.time() - t0
|
||||
|
||||
# Caller must return fast (no hang waiting for the coro).
|
||||
assert elapsed < 3.0, (
|
||||
f"_run_async blocked caller for {elapsed:.1f}s — should return "
|
||||
f"on timeout regardless of whether the coroutine has finished"
|
||||
)
|
||||
|
||||
# Worker thread must cancel the task (not leak).
|
||||
deadline = _time.time() + 5
|
||||
while not cancel_observed.is_set() and _time.time() < deadline:
|
||||
_time.sleep(0.05)
|
||||
assert cancel_observed.is_set(), (
|
||||
"Coroutine never received CancelledError — worker thread leaked "
|
||||
"(ThreadPoolExecutor.cancel() is a no-op on a running future; "
|
||||
"_run_async must cancel the task inside its worker loop)"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
Loading…
Reference in New Issue
Block a user