From dcd7b717f8efd75b9c69a11038149c1d865806fe Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Tue, 28 Apr 2026 22:17:33 -0700 Subject: [PATCH] fix(gateway): linearize tool-progress bubbles with content messages (#17280) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After PR #7885 (97b0cd51e) added content-side segment breaks for natural mid-turn assistant messages, the tool-progress task in gateway/run.py was not updated to match. progress_msg_id and progress_lines persisted for the whole run, so after a tool batch produced bubble B1 followed by content bubble C1, the next tool.started kept editing the OLD bubble B1 above C1 — making the chat appear out of order on Telegram, Discord, and Slack. Add on_new_message callback to GatewayStreamConsumer, fired at the four sites where a fresh content bubble lands on the platform: - _send_or_edit first-send branch (NOT edits) - _send_commentary - _send_new_chunk (overflow split) - each successful chunk of _send_fallback_final Gateway supplies a lambda that enqueues ('__reset__',) into the progress_queue. send_progress_messages() handles the marker in both the main loop and the CancelledError drain path, clearing progress_msg_id, progress_lines, and the dedup state so the next tool.started opens a fresh bubble below the new content. Result: each tool batch appears in chronological order below the preceding content. When no content appears between tool batches, tools still group in one bubble (CLI-style compactness). Co-authored-by: teknium1 --- gateway/run.py | 37 ++++++ gateway/stream_consumer.py | 35 ++++++ tests/gateway/test_stream_consumer.py | 156 ++++++++++++++++++++++++++ 3 files changed, 228 insertions(+) diff --git a/gateway/run.py b/gateway/run.py index c759cb4d..6a143411 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -10221,6 +10221,20 @@ class GatewayRunner: if progress_lines: progress_lines[-1] = f"{base_msg} (×{count + 1})" msg = progress_lines[-1] if progress_lines else base_msg + elif isinstance(raw, tuple) and len(raw) >= 1 and raw[0] == "__reset__": + # Content bubble just landed on the platform — close off + # the current tool-progress bubble so the next tool + # starts a fresh bubble below the content. Without this, + # tool lines keep editing the ORIGINAL progress message + # above the new content, making the chat appear out of + # order. Mirrors GatewayStreamConsumer.on_segment_break + # on the content side. (Issue: tool + content + # linearization regression after PR #7885.) + progress_msg_id = None + progress_lines = [] + last_progress_msg[0] = None + repeat_count[0] = 0 + continue else: msg = raw progress_lines.append(msg) @@ -10290,6 +10304,24 @@ class GatewayRunner: _, base_msg, count = raw if progress_lines: progress_lines[-1] = f"{base_msg} (×{count + 1})" + elif isinstance(raw, tuple) and len(raw) >= 1 and raw[0] == "__reset__": + # Content-bubble marker during drain: close off + # the current progress bubble and start a fresh + # one for any tool lines that arrived after. + if can_edit and progress_lines and progress_msg_id: + _pending_text = "\n".join(progress_lines) + try: + await adapter.edit_message( + chat_id=source.chat_id, + message_id=progress_msg_id, + content=_pending_text, + ) + except Exception: + pass + progress_msg_id = None + progress_lines = [] + last_progress_msg[0] = None + repeat_count[0] = 0 else: progress_lines.append(raw) except Exception: @@ -10495,6 +10527,11 @@ class GatewayRunner: chat_id=source.chat_id, config=_consumer_cfg, metadata={"thread_id": _progress_thread_id} if _progress_thread_id else None, + on_new_message=( + (lambda: progress_queue.put(("__reset__",))) + if progress_queue is not None + else None + ), ) if _want_stream_deltas: def _stream_delta_cb(text: str) -> None: diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index 1adbdd3a..c0ab9071 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -91,11 +91,20 @@ class GatewayStreamConsumer: chat_id: str, config: Optional[StreamConsumerConfig] = None, metadata: Optional[dict] = None, + on_new_message: Optional[callable] = None, ): self.adapter = adapter self.chat_id = chat_id self.cfg = config or StreamConsumerConfig() self.metadata = metadata + # Fired whenever a fresh content bubble is created on the platform + # (first-send of a new message, commentary, overflow chunk, or + # fallback continuation). The gateway uses this to linearize the + # tool-progress bubble: when content resumes after a tool batch, + # the next tool.started should open a NEW progress bubble below + # the content, not edit the old bubble above it. + # Called with no arguments. Exceptions are swallowed. + self._on_new_message = on_new_message self._queue: queue.Queue = queue.Queue() self._accumulated = "" self._message_id: Optional[str] = None @@ -146,6 +155,16 @@ class GatewayStreamConsumer: if text: self._queue.put((_COMMENTARY, text)) + def _notify_new_message(self) -> None: + """Fire the on_new_message callback, swallowing any errors.""" + cb = self._on_new_message + if cb is None: + return + try: + cb() + except Exception: + logger.debug("on_new_message callback error", exc_info=True) + def _reset_segment_state(self, *, preserve_no_edit: bool = False) -> None: if preserve_no_edit and self._message_id == "__no_edit__": return @@ -529,6 +548,9 @@ class GatewayStreamConsumer: self._message_id = str(result.message_id) self._already_sent = True self._last_sent_text = text + # Fresh content bubble — close off any stale tool bubble + # above so the next tool starts a new bubble below. + self._notify_new_message() return str(result.message_id) else: self._edit_supported = False @@ -661,6 +683,9 @@ class GatewayStreamConsumer: sent_any_chunk = True last_successful_chunk = chunk last_message_id = result.message_id or last_message_id + # Each fallback chunk is a fresh platform message — notify + # so any stale tool-progress bubble gets closed off. + self._notify_new_message() self._message_id = last_message_id self._already_sent = True @@ -744,6 +769,11 @@ class GatewayStreamConsumer: # tool..."), not the final response. Setting already_sent would cause # the final response to be incorrectly suppressed when there are # multiple tool calls. See: https://github.com/NousResearch/hermes-agent/issues/10454 + if result.success: + # Commentary counts as fresh content — close off any + # stale tool bubble above it so the next tool starts a + # new bubble below. + self._notify_new_message() return result.success except Exception as e: logger.error("Commentary send error: %s", e) @@ -973,6 +1003,11 @@ class GatewayStreamConsumer: # every delta/tool boundary when platforms accept a # message but do not return an editable message id. self._message_id = "__no_edit__" + # Notify the gateway that a fresh content bubble was + # created so any accumulated tool-progress bubble above + # gets closed off — the next tool fires into a new + # bubble below, preserving chronological order. + self._notify_new_message() return True else: # Initial send failed — disable streaming for this session diff --git a/tests/gateway/test_stream_consumer.py b/tests/gateway/test_stream_consumer.py index 7ae587da..6878ddca 100644 --- a/tests/gateway/test_stream_consumer.py +++ b/tests/gateway/test_stream_consumer.py @@ -1337,3 +1337,159 @@ class TestCursorStrippingOnFallback: assert consumer._already_sent is True # _last_sent_text must NOT be updated when the edit failed assert consumer._last_sent_text == "Hello ▉" + + +# ── on_new_message callback (tool-progress linearization) ───────────── + + +class TestOnNewMessageCallback: + """The on_new_message callback fires whenever a fresh content bubble + lands on the platform. Gateway uses this to close off the current + tool-progress bubble so the next tool.started opens a new bubble + below the content — preserving chronological order in the chat. + + Before this callback existed (post PR #7885), content messages got + their own bubbles after segment breaks, but the tool-progress task + kept editing the ORIGINAL progress bubble above all new content. + Result: tool lines appeared stacked in the upper bubble while + content messages lined up below, making the timeline look scrambled. + """ + + @pytest.mark.asyncio + async def test_callback_fires_on_first_send(self): + """First-send of a new content bubble fires on_new_message.""" + adapter = MagicMock() + adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg_1")) + adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) + adapter.MAX_MESSAGE_LENGTH = 4096 + + events = [] + config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1) + consumer = GatewayStreamConsumer( + adapter, "chat", config, + on_new_message=lambda: events.append("reset"), + ) + + consumer.on_delta("Hello") + consumer.finish() + await consumer.run() + + assert events == ["reset"] + + @pytest.mark.asyncio + async def test_callback_fires_once_per_segment(self): + """A new first-send fires the callback again after segment break.""" + adapter = MagicMock() + msg_counter = iter(["msg_1", "msg_2", "msg_3"]) + adapter.send = AsyncMock( + side_effect=lambda **kw: SimpleNamespace(success=True, message_id=next(msg_counter)) + ) + adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) + adapter.MAX_MESSAGE_LENGTH = 4096 + + events = [] + config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1) + consumer = GatewayStreamConsumer( + adapter, "chat", config, + on_new_message=lambda: events.append("reset"), + ) + + consumer.on_delta("A") + consumer.on_delta(None) + consumer.on_delta("B") + consumer.on_delta(None) + consumer.on_delta("C") + consumer.finish() + await consumer.run() + + # Three content bubbles ⇒ three reset notifications + assert events == ["reset", "reset", "reset"] + + @pytest.mark.asyncio + async def test_callback_not_fired_on_edit(self): + """Subsequent edits of the same bubble do NOT fire the callback.""" + adapter = MagicMock() + adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg_1")) + adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) + adapter.MAX_MESSAGE_LENGTH = 4096 + + events = [] + config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1) + consumer = GatewayStreamConsumer( + adapter, "chat", config, + on_new_message=lambda: events.append("reset"), + ) + + consumer.on_delta("Hello") + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.05) + consumer.on_delta(" world") + await asyncio.sleep(0.05) + consumer.on_delta(" more") + await asyncio.sleep(0.05) + consumer.finish() + await task + + # Only one first-send happened; edits do not re-fire. + assert events == ["reset"] + + @pytest.mark.asyncio + async def test_callback_fires_on_commentary(self): + """Commentary messages are fresh bubbles too — fire the callback.""" + adapter = MagicMock() + adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg_1")) + adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) + adapter.MAX_MESSAGE_LENGTH = 4096 + + events = [] + config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1) + consumer = GatewayStreamConsumer( + adapter, "chat", config, + on_new_message=lambda: events.append("reset"), + ) + + consumer.on_commentary("I'll search for that first.") + consumer.finish() + await consumer.run() + + assert events == ["reset"] + + @pytest.mark.asyncio + async def test_callback_error_swallowed(self): + """Exceptions in the callback do not crash the consumer.""" + adapter = MagicMock() + adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg_1")) + adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) + adapter.MAX_MESSAGE_LENGTH = 4096 + + def raiser(): + raise RuntimeError("boom") + + config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1) + consumer = GatewayStreamConsumer( + adapter, "chat", config, + on_new_message=raiser, + ) + + consumer.on_delta("Hello") + consumer.finish() + await consumer.run() # must not raise + + assert consumer.already_sent is True + + @pytest.mark.asyncio + async def test_no_callback_when_none(self): + """Consumer works correctly when on_new_message is None (default).""" + adapter = MagicMock() + adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg_1")) + adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) + adapter.MAX_MESSAGE_LENGTH = 4096 + + config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1) + consumer = GatewayStreamConsumer(adapter, "chat", config) # no callback + + consumer.on_delta("Hello") + consumer.finish() + await consumer.run() + + assert consumer.already_sent is True