fix(gateway): linearize tool-progress bubbles with content messages (#17280)

After PR #7885 (97b0cd51e) added content-side segment breaks for
natural mid-turn assistant messages, the tool-progress task in
gateway/run.py was not updated to match. progress_msg_id and
progress_lines persisted for the whole run, so after a tool batch
produced bubble B1 followed by content bubble C1, the next tool.started
kept editing the OLD bubble B1 above C1 — making the chat appear out
of order on Telegram, Discord, and Slack.

Add on_new_message callback to GatewayStreamConsumer, fired at the
four sites where a fresh content bubble lands on the platform:
  - _send_or_edit first-send branch (NOT edits)
  - _send_commentary
  - _send_new_chunk (overflow split)
  - each successful chunk of _send_fallback_final

Gateway supplies a lambda that enqueues ('__reset__',) into the
progress_queue. send_progress_messages() handles the marker in both
the main loop and the CancelledError drain path, clearing
progress_msg_id, progress_lines, and the dedup state so the next
tool.started opens a fresh bubble below the new content.

Result: each tool batch appears in chronological order below the
preceding content. When no content appears between tool batches,
tools still group in one bubble (CLI-style compactness).

Co-authored-by: teknium1 <teknium@users.noreply.github.com>
This commit is contained in:
Teknium 2026-04-28 22:17:33 -07:00 committed by GitHub
parent ac855bba0e
commit dcd7b717f8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 228 additions and 0 deletions

View File

@ -10221,6 +10221,20 @@ class GatewayRunner:
if progress_lines:
progress_lines[-1] = f"{base_msg} (×{count + 1})"
msg = progress_lines[-1] if progress_lines else base_msg
elif isinstance(raw, tuple) and len(raw) >= 1 and raw[0] == "__reset__":
# Content bubble just landed on the platform — close off
# the current tool-progress bubble so the next tool
# starts a fresh bubble below the content. Without this,
# tool lines keep editing the ORIGINAL progress message
# above the new content, making the chat appear out of
# order. Mirrors GatewayStreamConsumer.on_segment_break
# on the content side. (Issue: tool + content
# linearization regression after PR #7885.)
progress_msg_id = None
progress_lines = []
last_progress_msg[0] = None
repeat_count[0] = 0
continue
else:
msg = raw
progress_lines.append(msg)
@ -10290,6 +10304,24 @@ class GatewayRunner:
_, base_msg, count = raw
if progress_lines:
progress_lines[-1] = f"{base_msg} (×{count + 1})"
elif isinstance(raw, tuple) and len(raw) >= 1 and raw[0] == "__reset__":
# Content-bubble marker during drain: close off
# the current progress bubble and start a fresh
# one for any tool lines that arrived after.
if can_edit and progress_lines and progress_msg_id:
_pending_text = "\n".join(progress_lines)
try:
await adapter.edit_message(
chat_id=source.chat_id,
message_id=progress_msg_id,
content=_pending_text,
)
except Exception:
pass
progress_msg_id = None
progress_lines = []
last_progress_msg[0] = None
repeat_count[0] = 0
else:
progress_lines.append(raw)
except Exception:
@ -10495,6 +10527,11 @@ class GatewayRunner:
chat_id=source.chat_id,
config=_consumer_cfg,
metadata={"thread_id": _progress_thread_id} if _progress_thread_id else None,
on_new_message=(
(lambda: progress_queue.put(("__reset__",)))
if progress_queue is not None
else None
),
)
if _want_stream_deltas:
def _stream_delta_cb(text: str) -> None:

View File

@ -91,11 +91,20 @@ class GatewayStreamConsumer:
chat_id: str,
config: Optional[StreamConsumerConfig] = None,
metadata: Optional[dict] = None,
on_new_message: Optional[callable] = None,
):
self.adapter = adapter
self.chat_id = chat_id
self.cfg = config or StreamConsumerConfig()
self.metadata = metadata
# Fired whenever a fresh content bubble is created on the platform
# (first-send of a new message, commentary, overflow chunk, or
# fallback continuation). The gateway uses this to linearize the
# tool-progress bubble: when content resumes after a tool batch,
# the next tool.started should open a NEW progress bubble below
# the content, not edit the old bubble above it.
# Called with no arguments. Exceptions are swallowed.
self._on_new_message = on_new_message
self._queue: queue.Queue = queue.Queue()
self._accumulated = ""
self._message_id: Optional[str] = None
@ -146,6 +155,16 @@ class GatewayStreamConsumer:
if text:
self._queue.put((_COMMENTARY, text))
def _notify_new_message(self) -> None:
"""Fire the on_new_message callback, swallowing any errors."""
cb = self._on_new_message
if cb is None:
return
try:
cb()
except Exception:
logger.debug("on_new_message callback error", exc_info=True)
def _reset_segment_state(self, *, preserve_no_edit: bool = False) -> None:
if preserve_no_edit and self._message_id == "__no_edit__":
return
@ -529,6 +548,9 @@ class GatewayStreamConsumer:
self._message_id = str(result.message_id)
self._already_sent = True
self._last_sent_text = text
# Fresh content bubble — close off any stale tool bubble
# above so the next tool starts a new bubble below.
self._notify_new_message()
return str(result.message_id)
else:
self._edit_supported = False
@ -661,6 +683,9 @@ class GatewayStreamConsumer:
sent_any_chunk = True
last_successful_chunk = chunk
last_message_id = result.message_id or last_message_id
# Each fallback chunk is a fresh platform message — notify
# so any stale tool-progress bubble gets closed off.
self._notify_new_message()
self._message_id = last_message_id
self._already_sent = True
@ -744,6 +769,11 @@ class GatewayStreamConsumer:
# tool..."), not the final response. Setting already_sent would cause
# the final response to be incorrectly suppressed when there are
# multiple tool calls. See: https://github.com/NousResearch/hermes-agent/issues/10454
if result.success:
# Commentary counts as fresh content — close off any
# stale tool bubble above it so the next tool starts a
# new bubble below.
self._notify_new_message()
return result.success
except Exception as e:
logger.error("Commentary send error: %s", e)
@ -973,6 +1003,11 @@ class GatewayStreamConsumer:
# every delta/tool boundary when platforms accept a
# message but do not return an editable message id.
self._message_id = "__no_edit__"
# Notify the gateway that a fresh content bubble was
# created so any accumulated tool-progress bubble above
# gets closed off — the next tool fires into a new
# bubble below, preserving chronological order.
self._notify_new_message()
return True
else:
# Initial send failed — disable streaming for this session

View File

@ -1337,3 +1337,159 @@ class TestCursorStrippingOnFallback:
assert consumer._already_sent is True
# _last_sent_text must NOT be updated when the edit failed
assert consumer._last_sent_text == "Hello ▉"
# ── on_new_message callback (tool-progress linearization) ─────────────
class TestOnNewMessageCallback:
"""The on_new_message callback fires whenever a fresh content bubble
lands on the platform. Gateway uses this to close off the current
tool-progress bubble so the next tool.started opens a new bubble
below the content preserving chronological order in the chat.
Before this callback existed (post PR #7885), content messages got
their own bubbles after segment breaks, but the tool-progress task
kept editing the ORIGINAL progress bubble above all new content.
Result: tool lines appeared stacked in the upper bubble while
content messages lined up below, making the timeline look scrambled.
"""
@pytest.mark.asyncio
async def test_callback_fires_on_first_send(self):
"""First-send of a new content bubble fires on_new_message."""
# Adapter stub: every send succeeds and returns a fixed message id,
# so the consumer takes the first-send (not the fallback) path.
adapter = MagicMock()
adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg_1"))
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
adapter.MAX_MESSAGE_LENGTH = 4096
events = []
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1)
consumer = GatewayStreamConsumer(
adapter, "chat", config,
on_new_message=lambda: events.append("reset"),
)
consumer.on_delta("Hello")
consumer.finish()
await consumer.run()
# Exactly one bubble was created, so exactly one notification.
assert events == ["reset"]
@pytest.mark.asyncio
async def test_callback_fires_once_per_segment(self):
"""A new first-send fires the callback again after segment break."""
adapter = MagicMock()
# Distinct ids per send prove each segment became its own bubble.
msg_counter = iter(["msg_1", "msg_2", "msg_3"])
adapter.send = AsyncMock(
side_effect=lambda **kw: SimpleNamespace(success=True, message_id=next(msg_counter))
)
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
adapter.MAX_MESSAGE_LENGTH = 4096
events = []
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1)
consumer = GatewayStreamConsumer(
adapter, "chat", config,
on_new_message=lambda: events.append("reset"),
)
# NOTE(review): on_delta(None) presumably acts as the segment-break
# sentinel (per PR #7885's content-side breaks) — confirm against
# the consumer's on_delta implementation.
consumer.on_delta("A")
consumer.on_delta(None)
consumer.on_delta("B")
consumer.on_delta(None)
consumer.on_delta("C")
consumer.finish()
await consumer.run()
# Three content bubbles ⇒ three reset notifications
assert events == ["reset", "reset", "reset"]
@pytest.mark.asyncio
async def test_callback_not_fired_on_edit(self):
"""Subsequent edits of the same bubble do NOT fire the callback."""
adapter = MagicMock()
adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg_1"))
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
adapter.MAX_MESSAGE_LENGTH = 4096
events = []
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1)
consumer = GatewayStreamConsumer(
adapter, "chat", config,
on_new_message=lambda: events.append("reset"),
)
consumer.on_delta("Hello")
# Run the consumer concurrently; the sleeps (5x edit_interval) give
# its loop time to flush each delta as an EDIT of msg_1 rather than
# batching them into the first send. Timing-sensitive by design.
task = asyncio.create_task(consumer.run())
await asyncio.sleep(0.05)
consumer.on_delta(" world")
await asyncio.sleep(0.05)
consumer.on_delta(" more")
await asyncio.sleep(0.05)
consumer.finish()
await task
# Only one first-send happened; edits do not re-fire.
assert events == ["reset"]
@pytest.mark.asyncio
async def test_callback_fires_on_commentary(self):
"""Commentary messages are fresh bubbles too — fire the callback."""
adapter = MagicMock()
adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg_1"))
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
adapter.MAX_MESSAGE_LENGTH = 4096
events = []
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1)
consumer = GatewayStreamConsumer(
adapter, "chat", config,
on_new_message=lambda: events.append("reset"),
)
consumer.on_commentary("I'll search for that first.")
consumer.finish()
await consumer.run()
assert events == ["reset"]
@pytest.mark.asyncio
async def test_callback_error_swallowed(self):
"""Exceptions in the callback do not crash the consumer."""
adapter = MagicMock()
adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg_1"))
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
adapter.MAX_MESSAGE_LENGTH = 4096
# Callback that always raises — _notify_new_message must swallow it.
def raiser():
raise RuntimeError("boom")
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1)
consumer = GatewayStreamConsumer(
adapter, "chat", config,
on_new_message=raiser,
)
consumer.on_delta("Hello")
consumer.finish()
await consumer.run() # must not raise
# NOTE(review): `already_sent` is presumably a public property over
# the `_already_sent` flag set in the first-send branch — confirm.
assert consumer.already_sent is True
@pytest.mark.asyncio
async def test_no_callback_when_none(self):
"""Consumer works correctly when on_new_message is None (default)."""
adapter = MagicMock()
adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg_1"))
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
adapter.MAX_MESSAGE_LENGTH = 4096
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1)
consumer = GatewayStreamConsumer(adapter, "chat", config) # no callback
consumer.on_delta("Hello")
consumer.finish()
await consumer.run()
assert consumer.already_sent is True