fix(gateway): drain pending messages via fresh task, not recursion (#17758)
`_process_message_background` finished a turn, found a queued follow-up, and drained it via `await self._process_message_background(pending_event, session_key)`. Each chained follow-up added a frame to the call stack instead of starting fresh. Under sustained pending-queue activity (e.g. a user sending follow-ups faster than the agent finishes turns) the C stack would exhaust at ~2000 nested frames and SIGSEGV the process. Mirror the late-arrival drain pattern that already exists in the same function: spawn a new `asyncio.create_task(...)` for the pending event and return so the current frame can unwind. The new task takes ownership via `_session_tasks[session_key]`. The late-arrival drain in `finally` could now race with the in-band drain across the `await typing_task` / `await stop_typing` window, so add a guard: if `_session_tasks[session_key]` is no longer the current task, an in-band drain already spawned a follow-up task — re-queue the late-arrival event so that task picks it up after its current event, instead of spawning a second concurrent task for the same session_key. Regression test (`test_pending_drain_no_recursion.py`) chains 12 follow-ups and asserts the recorded `_process_message_background` stack depth stays bounded at handler entry. Pre-fix: depths grow linearly `[1,2,3,…,12]`. Post-fix: all depths are `1`. `test_duplicate_reply_suppression::test_stale_response_suppressed_when_interrupted` called `_process_message_background` directly and implicitly relied on the old recursive `await` semantic — updated to wait for the spawned drain task before checking the sent list. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
cb130bf776
commit
663ba9a58f
@ -2708,9 +2708,27 @@ class BasePlatformAdapter(ABC):
|
||||
if _active is not None:
|
||||
_active.clear()
|
||||
await _stop_typing_task()
|
||||
# Process pending message in new background task
|
||||
await self._process_message_background(pending_event, session_key)
|
||||
return # Already cleaned up
|
||||
# Spawn a fresh task for the pending message instead of
|
||||
# recursing. Issue #17758: `await
|
||||
# self._process_message_background(...)` here grew the
|
||||
# call stack one frame per chained follow-up, and under
|
||||
# sustained pending-queue activity the C stack would
|
||||
# exhaust at ~2000 frames and SIGSEGV the process.
|
||||
# Mirror the late-arrival drain pattern below: hand off
|
||||
# to a new task and return so this frame can unwind.
|
||||
drain_task = asyncio.create_task(
|
||||
self._process_message_background(pending_event, session_key)
|
||||
)
|
||||
# Hand ownership of the session to the drain task so
|
||||
# stale-lock detection keeps working while it runs.
|
||||
self._session_tasks[session_key] = drain_task
|
||||
try:
|
||||
self._background_tasks.add(drain_task)
|
||||
drain_task.add_done_callback(self._background_tasks.discard)
|
||||
except TypeError:
|
||||
# Tests stub create_task() with non-hashable sentinels; tolerate.
|
||||
pass
|
||||
return # Drain task owns the session now.
|
||||
|
||||
except asyncio.CancelledError:
|
||||
current_task = asyncio.current_task()
|
||||
@ -2772,25 +2790,41 @@ class BasePlatformAdapter(ABC):
|
||||
# dropped (user never gets a reply).
|
||||
late_pending = self._pending_messages.pop(session_key, None)
|
||||
if late_pending is not None:
|
||||
logger.debug(
|
||||
"[%s] Late-arrival pending message during cleanup — spawning drain task",
|
||||
self.name,
|
||||
)
|
||||
_active = self._active_sessions.get(session_key)
|
||||
if _active is not None:
|
||||
_active.clear()
|
||||
drain_task = asyncio.create_task(
|
||||
self._process_message_background(late_pending, session_key)
|
||||
)
|
||||
# Hand ownership of the session to the drain task so stale-lock
|
||||
# detection keeps working while it runs.
|
||||
self._session_tasks[session_key] = drain_task
|
||||
try:
|
||||
self._background_tasks.add(drain_task)
|
||||
drain_task.add_done_callback(self._background_tasks.discard)
|
||||
except TypeError:
|
||||
# Tests stub create_task() with non-hashable sentinels; tolerate.
|
||||
pass
|
||||
current_task = asyncio.current_task()
|
||||
existing_task = self._session_tasks.get(session_key)
|
||||
if (
|
||||
existing_task is not None
|
||||
and existing_task is not current_task
|
||||
):
|
||||
# The in-band drain (or an earlier late-arrival drain)
|
||||
# already spawned a follow-up task that owns this
|
||||
# session. Re-queue the late-arrival event so that
|
||||
# task picks it up — avoids spawning two concurrent
|
||||
# _process_message_background tasks for the same key
|
||||
# (#17758 follow-up: prevents the create_task path
|
||||
# from racing with itself across the in-band/finally
|
||||
# boundary).
|
||||
self._pending_messages[session_key] = late_pending
|
||||
else:
|
||||
logger.debug(
|
||||
"[%s] Late-arrival pending message during cleanup — spawning drain task",
|
||||
self.name,
|
||||
)
|
||||
_active = self._active_sessions.get(session_key)
|
||||
if _active is not None:
|
||||
_active.clear()
|
||||
drain_task = asyncio.create_task(
|
||||
self._process_message_background(late_pending, session_key)
|
||||
)
|
||||
# Hand ownership of the session to the drain task so stale-lock
|
||||
# detection keeps working while it runs.
|
||||
self._session_tasks[session_key] = drain_task
|
||||
try:
|
||||
self._background_tasks.add(drain_task)
|
||||
drain_task.add_done_callback(self._background_tasks.discard)
|
||||
except TypeError:
|
||||
# Tests stub create_task() with non-hashable sentinels; tolerate.
|
||||
pass
|
||||
# Leave _active_sessions[session_key] populated — the drain
|
||||
# task's own lifecycle will clean it up.
|
||||
else:
|
||||
|
||||
@ -108,6 +108,15 @@ class TestBaseInterruptSuppression:
|
||||
|
||||
await adapter._process_message_background(event_a, session_key)
|
||||
|
||||
# The in-band pending-drain now hands off to a fresh task instead
|
||||
# of recursing (#17758). Wait for that task to finish before
|
||||
# checking the sent list.
|
||||
for _ in range(200):
|
||||
if any(s["content"] == pending_response for s in adapter.sent):
|
||||
break
|
||||
await asyncio.sleep(0.01)
|
||||
await adapter.cancel_background_tasks()
|
||||
|
||||
# The stale response should NOT have been sent.
|
||||
stale_sends = [s for s in adapter.sent if s["content"] == stale_response]
|
||||
assert len(stale_sends) == 0, (
|
||||
|
||||
129
tests/gateway/test_pending_drain_no_recursion.py
Normal file
129
tests/gateway/test_pending_drain_no_recursion.py
Normal file
@ -0,0 +1,129 @@
|
||||
"""Regression test for #17758 — chained pending-message drains must not
|
||||
grow the call stack.
|
||||
|
||||
Before the fix, ``_process_message_background`` finished a turn, found a
|
||||
pending follow-up, and drained it via ``await
|
||||
self._process_message_background(pending_event, session_key)``. Each
|
||||
queued follow-up added a frame to the call stack instead of starting
|
||||
fresh, so under sustained pending-queue activity the C stack would
|
||||
exhaust at ~2000 nested frames and the process would crash with
|
||||
SIGSEGV.
|
||||
|
||||
After the fix, the in-band drain spawns a fresh task (mirroring the
|
||||
late-arrival drain pattern), so the stack stays bounded regardless of
|
||||
chain length.
|
||||
|
||||
We assert the invariant directly: count nested
|
||||
``_process_message_background`` frames at handler entry across a chain
|
||||
of N follow-ups. Recursion makes depth grow linearly (1, 2, 3, …, N);
|
||||
task spawning keeps it constant (1 every time).
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
|
||||
import pytest
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
from gateway.config import Platform, PlatformConfig
|
||||
from gateway.platforms.base import (
|
||||
BasePlatformAdapter,
|
||||
MessageEvent,
|
||||
MessageType,
|
||||
)
|
||||
from gateway.session import SessionSource, build_session_key
|
||||
|
||||
|
||||
class _StubAdapter(BasePlatformAdapter):
|
||||
async def connect(self):
|
||||
pass
|
||||
|
||||
async def disconnect(self):
|
||||
pass
|
||||
|
||||
async def send(self, chat_id, text, **kwargs):
|
||||
return None
|
||||
|
||||
async def get_chat_info(self, chat_id):
|
||||
return {}
|
||||
|
||||
|
||||
def _make_adapter():
|
||||
adapter = _StubAdapter(PlatformConfig(enabled=True, token="t"), Platform.TELEGRAM)
|
||||
adapter._send_with_retry = AsyncMock(return_value=None)
|
||||
return adapter
|
||||
|
||||
|
||||
def _make_event(text="hi", chat_id="42"):
|
||||
return MessageEvent(
|
||||
text=text,
|
||||
message_type=MessageType.TEXT,
|
||||
source=SessionSource(platform=Platform.TELEGRAM, chat_id=chat_id, chat_type="dm"),
|
||||
)
|
||||
|
||||
|
||||
def _sk(chat_id="42"):
|
||||
return build_session_key(
|
||||
SessionSource(platform=Platform.TELEGRAM, chat_id=chat_id, chat_type="dm")
|
||||
)
|
||||
|
||||
|
||||
def _count_pmb_frames() -> int:
|
||||
"""Walk the current call stack and count nested
|
||||
``_process_message_background`` frames. Used to detect recursive
|
||||
in-band drains."""
|
||||
f = sys._getframe()
|
||||
n = 0
|
||||
while f is not None:
|
||||
if f.f_code.co_name == "_process_message_background":
|
||||
n += 1
|
||||
f = f.f_back
|
||||
return n
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_in_band_drain_does_not_grow_stack():
|
||||
"""Issue #17758: chained pending-message drains must not recurse.
|
||||
|
||||
Queue a fresh pending message inside each handler invocation so the
|
||||
in-band drain block fires for every turn in the chain. After N
|
||||
turns, the recorded stack depth at handler entry must stay bounded.
|
||||
Pre-fix, depths would be 1, 2, 3, …, N; post-fix, depths are 1
|
||||
every time because each drain runs in its own task.
|
||||
"""
|
||||
N = 12
|
||||
adapter = _make_adapter()
|
||||
sk = _sk()
|
||||
|
||||
depths: list[int] = []
|
||||
next_index = [1]
|
||||
|
||||
async def handler(event):
|
||||
depths.append(_count_pmb_frames())
|
||||
if next_index[0] < N:
|
||||
adapter._pending_messages[sk] = _make_event(text=f"M{next_index[0]}")
|
||||
next_index[0] += 1
|
||||
return "ok"
|
||||
|
||||
adapter._message_handler = handler
|
||||
|
||||
await adapter.handle_message(_make_event(text="M0"))
|
||||
|
||||
# Drain the chain. Each turn schedules the next via the in-band
|
||||
# drain block, so we wait until N handler runs have completed and
|
||||
# the session has been released.
|
||||
for _ in range(400):
|
||||
if len(depths) >= N and sk not in adapter._active_sessions:
|
||||
break
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
await adapter.cancel_background_tasks()
|
||||
|
||||
assert len(depths) == N, (
|
||||
f"expected {N} handler runs in the chain, got {len(depths)}: depths={depths!r}"
|
||||
)
|
||||
max_depth = max(depths)
|
||||
assert max_depth <= 2, (
|
||||
f"in-band drain is recursing instead of spawning a fresh task — "
|
||||
f"stack depth grew with chain length: {depths!r}"
|
||||
)
|
||||
Loading…
Reference in New Issue
Block a user