fix: keep rapid telegram follow-ups from getting cut off

This commit is contained in:
Peter Berthelsen 2026-04-14 16:27:12 -04:00 committed by Teknium
parent 12b109b664
commit 9a9b8cd1e4
3 changed files with 152 additions and 14 deletions

View File

@ -734,25 +734,56 @@ def merge_pending_message_event(
pending_messages: Dict[str, MessageEvent],
session_key: str,
event: MessageEvent,
*,
merge_text: bool = False,
) -> None:
"""Store or merge a pending event for a session.
Photo bursts/albums often arrive as multiple near-simultaneous PHOTO
events. Merge those into the existing queued event so the next turn sees
the whole burst, while non-photo follow-ups still replace the pending
event normally.
the whole burst.
When ``merge_text`` is enabled, rapid follow-up TEXT events are appended
instead of replacing the pending turn. This is used for Telegram bursty
follow-ups so a multi-part user thought is not silently truncated to only
the last queued fragment.
"""
existing = pending_messages.get(session_key)
if (
existing
and getattr(existing, "message_type", None) == MessageType.PHOTO
and event.message_type == MessageType.PHOTO
):
existing.media_urls.extend(event.media_urls)
existing.media_types.extend(event.media_types)
if event.text:
existing.text = BasePlatformAdapter._merge_caption(existing.text, event.text)
return
if existing:
existing_is_photo = getattr(existing, "message_type", None) == MessageType.PHOTO
incoming_is_photo = event.message_type == MessageType.PHOTO
existing_has_media = bool(existing.media_urls)
incoming_has_media = bool(event.media_urls)
if existing_is_photo and incoming_is_photo:
existing.media_urls.extend(event.media_urls)
existing.media_types.extend(event.media_types)
if event.text:
existing.text = BasePlatformAdapter._merge_caption(existing.text, event.text)
return
if existing_has_media or incoming_has_media:
if incoming_has_media:
existing.media_urls.extend(event.media_urls)
existing.media_types.extend(event.media_types)
if event.text:
if existing.text:
existing.text = BasePlatformAdapter._merge_caption(existing.text, event.text)
else:
existing.text = event.text
if existing_is_photo or incoming_is_photo:
existing.message_type = MessageType.PHOTO
return
if (
merge_text
and getattr(existing, "message_type", None) == MessageType.TEXT
and event.message_type == MessageType.TEXT
):
if event.text:
existing.text = f"{existing.text}\n{event.text}" if existing.text else event.text
return
pending_messages[session_key] = event

View File

@ -2922,6 +2922,32 @@ class GatewayRunner:
merge_pending_message_event(adapter._pending_messages, _quick_key, event)
return None
_telegram_followup_grace = float(
os.getenv("HERMES_TELEGRAM_FOLLOWUP_GRACE_SECONDS", "3.0")
)
_started_at = self._running_agents_ts.get(_quick_key, 0)
if (
source.platform == Platform.TELEGRAM
and event.message_type == MessageType.TEXT
and _telegram_followup_grace > 0
and _started_at
and (time.time() - _started_at) <= _telegram_followup_grace
):
logger.debug(
"Telegram follow-up arrived %.2fs after run start for %s — queueing without interrupt",
time.time() - _started_at,
_quick_key[:20],
)
adapter = self.adapters.get(source.platform)
if adapter:
merge_pending_message_event(
adapter._pending_messages,
_quick_key,
event,
merge_text=True,
)
return None
running_agent = self._running_agents.get(_quick_key)
if running_agent is _AGENT_PENDING_SENTINEL:
# Agent is being set up but not ready yet.
@ -2935,7 +2961,12 @@ class GatewayRunner:
# agent starts.
adapter = self.adapters.get(source.platform)
if adapter:
adapter._pending_messages[_quick_key] = event
merge_pending_message_event(
adapter._pending_messages,
_quick_key,
event,
merge_text=True,
)
return None
if self._draining:
if self._queue_during_drain_enabled():

View File

@ -14,7 +14,7 @@ from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from gateway.config import GatewayConfig, Platform, PlatformConfig
from gateway.platforms.base import MessageEvent, MessageType
from gateway.platforms.base import MessageEvent, MessageType, merge_pending_message_event
from gateway.run import GatewayRunner, _AGENT_PENDING_SENTINEL
from gateway.session import SessionSource, build_session_key
@ -184,6 +184,80 @@ async def test_second_message_during_sentinel_queued_not_duplicate():
await task1
def test_merge_pending_message_event_merges_text_and_photo_followups():
pending = {}
source = SessionSource(
platform=Platform.TELEGRAM,
chat_id="12345",
chat_type="dm",
user_id="u1",
)
session_key = build_session_key(source)
text_event = MessageEvent(
text="first follow-up",
message_type=MessageType.TEXT,
source=source,
)
photo_event = MessageEvent(
text="see screenshot",
message_type=MessageType.PHOTO,
source=source,
media_urls=["/tmp/test.png"],
media_types=["image/png"],
)
merge_pending_message_event(pending, session_key, text_event, merge_text=True)
merge_pending_message_event(pending, session_key, photo_event, merge_text=True)
merged = pending[session_key]
assert merged.message_type == MessageType.PHOTO
assert merged.text == "first follow-up\n\nsee screenshot"
assert merged.media_urls == ["/tmp/test.png"]
assert merged.media_types == ["image/png"]
@pytest.mark.asyncio
async def test_recent_telegram_text_followup_is_queued_without_interrupt():
runner = _make_runner()
event = _make_event(text="follow-up")
session_key = build_session_key(event.source)
fake_agent = MagicMock()
fake_agent.get_activity_summary.return_value = {"seconds_since_activity": 0}
runner._running_agents[session_key] = fake_agent
import time as _time
runner._running_agents_ts[session_key] = _time.time()
result = await runner._handle_message(event)
assert result is None
fake_agent.interrupt.assert_not_called()
adapter = runner.adapters[Platform.TELEGRAM]
assert adapter._pending_messages[session_key].text == "follow-up"
@pytest.mark.asyncio
async def test_recent_telegram_followups_append_in_pending_queue():
runner = _make_runner()
first = _make_event(text="part one")
second = _make_event(text="part two")
session_key = build_session_key(first.source)
fake_agent = MagicMock()
fake_agent.get_activity_summary.return_value = {"seconds_since_activity": 0}
runner._running_agents[session_key] = fake_agent
import time as _time
runner._running_agents_ts[session_key] = _time.time()
await runner._handle_message(first)
await runner._handle_message(second)
fake_agent.interrupt.assert_not_called()
adapter = runner.adapters[Platform.TELEGRAM]
assert adapter._pending_messages[session_key].text == "part one\npart two"
# ------------------------------------------------------------------
# Test 5: Sentinel not placed for command messages
# ------------------------------------------------------------------
@ -273,6 +347,7 @@ async def test_stop_hard_kills_running_agent():
# Simulate a running (possibly hung) agent
fake_agent = MagicMock()
fake_agent.get_activity_summary.return_value = {"seconds_since_activity": 0}
runner._running_agents[session_key] = fake_agent
# Send /stop
@ -305,6 +380,7 @@ async def test_stop_clears_pending_messages():
)
fake_agent = MagicMock()
fake_agent.get_activity_summary.return_value = {"seconds_since_activity": 0}
runner._running_agents[session_key] = fake_agent
runner._pending_messages[session_key] = "some queued text"