fix(gateway): snapshot callback generation after agent binds it, not before
_process_message_background snapshotted callback_generation from the
interrupt event at the TOP of the task — before the handler ran.
_hermes_run_generation is only set on the event by
GatewayRunner._bind_adapter_run_generation during
_handle_message_with_agent, which runs DURING the handler await. The
early snapshot always captured None, which then flowed into
pop_post_delivery_callback(..., generation=None) in the finally block.
In pop_post_delivery_callback, generation=None with a tuple-registered
entry (generation, callback) bypasses the ownership check — it pops and
fires the callback regardless of which run owns it. Result: a stale run
could fire a fresher run's post-delivery callback (e.g. a
background-review notification attributed to the wrong turn).
Fix: move the snapshot into the finally block, after the handler has
run and _hermes_run_generation has been bound to the current run.
Regression test added: simulates a stale handler at generation=1 and a
fresher callback registered at generation=2. Pre-fix: snapshot=None →
pop fires the generation=2 callback under generation=1's ownership
("newer" fires). Post-fix: snapshot=1 → pop skips the mismatched
entry, callback stays in the dict for the correct run to claim.
Verified: test FAILS on current main (captures "newer" in fired list),
PASSES with this fix.
Salvaged from PR #12565 (the callback-ownership portion only; the
/status totals portion was already fixed on main in 7abc9ce4d via #17158).
Co-authored-by: Oxidane-bot <1317078257maroon@gmail.com>
This commit is contained in:
parent
27ec74c68a
commit
8d7500d80d
@ -2521,7 +2521,6 @@ class BasePlatformAdapter(ABC):
|
||||
# Fall back to a new Event only if the entry was removed externally.
|
||||
interrupt_event = self._active_sessions.get(session_key) or asyncio.Event()
|
||||
self._active_sessions[session_key] = interrupt_event
|
||||
callback_generation = getattr(interrupt_event, "_hermes_run_generation", None)
|
||||
|
||||
# Start continuous typing indicator (refreshes every 2 seconds)
|
||||
_thread_metadata = {"thread_id": event.source.thread_id} if event.source.thread_id else None
|
||||
@ -2820,7 +2819,20 @@ class BasePlatformAdapter(ABC):
|
||||
finally:
|
||||
# Fire any one-shot post-delivery callback registered for this
|
||||
# session (e.g. deferred background-review notifications).
|
||||
_callback_generation = callback_generation
|
||||
#
|
||||
# Snapshot the callback generation HERE (after the agent has run),
|
||||
# not at the top of this task. _hermes_run_generation is set on
|
||||
# the interrupt event by GatewayRunner._bind_adapter_run_generation
|
||||
# during _handle_message_with_agent — which happens DURING the
|
||||
# self._message_handler(event) await above. Snapshotting earlier
|
||||
# always captured None, which bypassed the generation-ownership
|
||||
# check in pop_post_delivery_callback and let stale runs fire a
|
||||
# fresher run's callbacks.
|
||||
_callback_generation = getattr(
|
||||
interrupt_event,
|
||||
"_hermes_run_generation",
|
||||
None,
|
||||
)
|
||||
if hasattr(self, "pop_post_delivery_callback"):
|
||||
_post_cb = self.pop_post_delivery_callback(
|
||||
session_key,
|
||||
|
||||
@ -298,6 +298,7 @@ AUTHOR_MAP = {
|
||||
"154585401+LeonSGP43@users.noreply.github.com": "LeonSGP43",
|
||||
"12250313+Kailigithub@users.noreply.github.com": "Kailigithub",
|
||||
"mgparkprint@gmail.com": "vlwkaos",
|
||||
"1317078257maroon@gmail.com": "Oxidane-bot",
|
||||
"tranquil_flow@protonmail.com": "Tranquil-Flow",
|
||||
"LyleLengyel@gmail.com": "mcndjxlefnd",
|
||||
"wangshengyang2004@163.com": "Wangshengyang2004",
|
||||
@ -455,6 +456,7 @@ AUTHOR_MAP = {
|
||||
"taosiyuan163@153.com": "taosiyuan163",
|
||||
"tesseracttars@gmail.com": "tesseracttars-creator",
|
||||
"tianliangjay@gmail.com": "xingkongliang",
|
||||
"1317078257maroon@gmail.com": "Oxidane-bot",
|
||||
"tranquil_flow@protonmail.com": "Tranquil-Flow",
|
||||
"LyleLengyel@gmail.com": "mcndjxlefnd",
|
||||
"unayung@gmail.com": "Unayung",
|
||||
|
||||
@ -568,3 +568,68 @@ async def test_profile_command_reports_custom_root_profile(monkeypatch, tmp_path
|
||||
|
||||
assert "**Profile:** `coder`" in result
|
||||
assert f"**Home:** `{profile_home}`" in result
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_post_delivery_callback_generation_snapshot_happens_after_bind():
    """Guard the timing of the post-delivery callback generation snapshot.

    ``_process_message_background`` must read ``_hermes_run_generation`` from
    the interrupt event only after the message handler has run, because the
    attribute is bound to the event while the handler is executing
    (``GatewayRunner._bind_adapter_run_generation``, called during
    ``_handle_message_with_agent``).

    A snapshot taken at task start always saw ``None``; that skipped the
    generation-ownership check in ``pop_post_delivery_callback`` and allowed
    a stale run to fire a callback belonging to a fresher run.
    """
    import asyncio
    from gateway.platforms.base import BasePlatformAdapter

    source = _make_source()
    session_key = build_session_key(source)
    fired_labels = []

    class _StubAdapter(BasePlatformAdapter):
        """Smallest concrete adapter: every abstract hook is a no-op."""

        platform = Platform.TELEGRAM

        async def connect(self):
            pass

        async def disconnect(self):
            pass

        async def send(self, chat_id, content, **kwargs):
            pass

        async def get_chat_info(self, chat_id):
            return {}

    adapter = _StubAdapter(
        PlatformConfig(enabled=True, token="***"),
        Platform.TELEGRAM,
    )

    async def fake_handler(_event):
        # Stand-in for _bind_adapter_run_generation: tag the session's
        # interrupt event with the run generation in the middle of the run.
        active_event = adapter._active_sessions.get(session_key)
        active_event._hermes_run_generation = 1

        # The stale run (generation 1) registers its callback first ...
        adapter.register_post_delivery_callback(
            session_key, lambda: fired_labels.append("older"), generation=1
        )
        # ... then a fresher run registers for the same session key at
        # generation 2. The final assertions expect the stored entry to
        # carry generation 2.
        adapter.register_post_delivery_callback(
            session_key, lambda: fired_labels.append("newer"), generation=2
        )
        return None

    adapter.set_message_handler(fake_handler)
    event = MessageEvent(text="hello", source=source, message_id="m1")

    await adapter.handle_message(event)

    # Copy the task collection before awaiting it.
    pending = list(adapter._background_tasks)
    assert pending, "expected background task to be created"
    await asyncio.gather(*pending)

    # The run that finished is generation 1, so it must leave the
    # generation-2 callback alone. Before the fix the snapshot was None,
    # the ownership check was bypassed, and "newer" fired anyway.
    assert fired_labels == []
    registered = adapter._post_delivery_callbacks
    assert session_key in registered
    assert registered[session_key][0] == 2
|
||||
|
||||
Loading…
Reference in New Issue
Block a user