fix(gateway): /queue is now a true FIFO — each invocation gets its own turn (#16175)

Repeated /queue commands now each produce a full agent turn, in order,
with no merging.  Previously the second /queue overwrote the first
because the handler wrote directly into the adapter's single-slot
_pending_messages dict.

- GatewayRunner grows a _queued_events overflow buffer (dict of list).
- /queue puts new items in the adapter's next-up slot when free,
  otherwise appends to the overflow.  After each run's drain consumes
  the slot, the next overflow item is promoted so the recursive run
  picks it up.
- /new and /reset clear the overflow.
- /status now reports queue depth when non-zero.
- Ack message shows the depth once it exceeds 1.

Helpers (_enqueue_fifo, _promote_queued_event, _queue_depth) use the
getattr default-fallback pattern so existing tests that build bare
GatewayRunner instances via object.__new__ keep working.
This commit is contained in:
Teknium 2026-04-26 11:55:09 -07:00 committed by GitHub
parent 5b2c59559a
commit 1dfcc2ffc3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 296 additions and 11 deletions

View File

@ -682,6 +682,16 @@ class GatewayRunner:
self._running_agents: Dict[str, Any] = {}
self._running_agents_ts: Dict[str, float] = {} # start timestamp per session
self._pending_messages: Dict[str, str] = {} # Queued messages during interrupt
# Overflow buffer for explicit /queue commands. The adapter-level
# _pending_messages dict is a single slot per session (designed for
# "next-turn" follow-ups where repeated sends collapse into one
# event). /queue has different semantics: each invocation must
# produce its own full agent turn, in FIFO order, with no merging.
# When the slot is occupied, additional /queue items land here and
# are promoted one-at-a-time after each run's drain. Cleared on
# /new and /reset. /model and other mid-session operations
# preserve the queue.
self._queued_events: Dict[str, List[MessageEvent]] = {}
self._busy_ack_ts: Dict[str, float] = {} # last busy-ack timestamp per session (debounce)
self._session_run_generation: Dict[str, int] = {}
@ -1204,6 +1214,76 @@ class GatewayRunner:
def _queue_during_drain_enabled(self) -> bool:
return self._restart_requested and self._busy_input_mode == "queue"
# -------- /queue FIFO helpers --------------------------------------
# /queue must produce one full agent turn per invocation, in FIFO
# order, with no merging. The adapter's _pending_messages dict is a
# single "next-up" slot (shared with photo-burst follow-ups), so we
# use it for the head of the queue and an overflow list for the
# tail. Enqueue puts new items in the slot when free, otherwise in
# the overflow. Promotion (called after each run's drain) moves the
# next overflow item into the slot so the following recursion picks
# it up. Clearing happens on /new and /reset via
# _handle_reset_command.
def _enqueue_fifo(self, session_key: str, queued_event: "MessageEvent", adapter: Any) -> None:
"""Append a /queue event to the FIFO chain for a session."""
if adapter is None:
return
pending_slot = getattr(adapter, "_pending_messages", None)
if pending_slot is None:
return
queued_events = getattr(self, "_queued_events", None)
if queued_events is None:
queued_events = {}
self._queued_events = queued_events
if session_key in pending_slot:
queued_events.setdefault(session_key, []).append(queued_event)
else:
pending_slot[session_key] = queued_event
def _promote_queued_event(
self,
session_key: str,
adapter: Any,
pending_event: Optional["MessageEvent"],
) -> Optional["MessageEvent"]:
"""Promote the next overflow item after the slot was drained.
Called at the drain site after _dequeue_pending_event consumed
(or failed to consume) the slot. If there's an overflow item:
- When pending_event is None (slot was empty), return the
overflow head as the new pending_event.
- When pending_event already exists (slot was populated by an
interrupt follow-up or similar), stage the overflow head in
the slot so the NEXT recursion picks it up.
Returns the (possibly updated) pending_event for drain to use.
"""
queued_events = getattr(self, "_queued_events", None)
if not queued_events:
return pending_event
overflow = queued_events.get(session_key)
if not overflow:
return pending_event
next_queued = overflow.pop(0)
if not overflow:
queued_events.pop(session_key, None)
if pending_event is None:
return next_queued
if adapter is not None and hasattr(adapter, "_pending_messages"):
adapter._pending_messages[session_key] = next_queued
else:
# No adapter — push back so we don't silently drop the item.
queued_events.setdefault(session_key, []).insert(0, next_queued)
return pending_event
def _queue_depth(self, session_key: str, *, adapter: Any = None) -> int:
"""Total pending /queue items for a session — slot + overflow."""
queued_events = getattr(self, "_queued_events", None) or {}
depth = len(queued_events.get(session_key, []))
if adapter is not None and session_key in getattr(adapter, "_pending_messages", {}):
depth += 1
return depth
def _update_runtime_status(self, gateway_state: Optional[str] = None, exit_reason: Optional[str] = None) -> None:
try:
from gateway.status import write_runtime_status
@ -3416,7 +3496,10 @@ class GatewayRunner:
# doesn't think an agent is still active.
return await self._handle_reset_command(event)
# /queue <prompt> — queue without interrupting
# /queue <prompt> — queue without interrupting.
# Semantics: each /queue invocation produces its own full agent
# turn, processed in FIFO order after the current run (and any
# earlier /queue items) finishes. Messages are NOT merged.
if event.get_command() in ("queue", "q"):
queued_text = event.get_command_args().strip()
if not queued_text:
@ -3430,8 +3513,11 @@ class GatewayRunner:
message_id=event.message_id,
channel_prompt=event.channel_prompt,
)
adapter._pending_messages[_quick_key] = queued_event
return "Queued for the next turn."
self._enqueue_fifo(_quick_key, queued_event, adapter)
depth = self._queue_depth(_quick_key, adapter=self.adapters.get(source.platform))
if depth <= 1:
return "Queued for the next turn."
return f"Queued for the next turn. ({depth} queued)"
# /steer <prompt> — inject mid-run after the next tool call.
# Unlike /queue (turn boundary), /steer lands BETWEEN tool-call
@ -5058,6 +5144,13 @@ class GatewayRunner:
self._cleanup_agent_resources(_old_agent)
self._evict_cached_agent(session_key)
# Discard any /queue overflow for this session — /new is a
# conversation-boundary operation, queued follow-ups from the
# previous conversation must not bleed into the new one.
_qe = getattr(self, "_queued_events", None)
if _qe is not None:
_qe.pop(session_key, None)
try:
from tools.env_passthrough import clear_env_passthrough
clear_env_passthrough()
@ -5165,6 +5258,10 @@ class GatewayRunner:
session_key = session_entry.session_key
is_running = session_key in self._running_agents
# Count pending /queue follow-ups (slot + overflow).
adapter = self.adapters.get(source.platform) if source else None
queue_depth = self._queue_depth(session_key, adapter=adapter)
title = None
if self._session_db:
try:
@ -5184,6 +5281,10 @@ class GatewayRunner:
f"**Last Activity:** {session_entry.updated_at.strftime('%Y-%m-%d %H:%M')}",
f"**Tokens:** {session_entry.total_tokens:,}",
f"**Agent Running:** {'Yes ⚡' if is_running else 'No'}",
])
if queue_depth:
lines.append(f"**Queued follow-ups:** {queue_depth}")
lines.extend([
"",
f"**Connected Platforms:** {', '.join(connected_platforms)}",
])
@ -10568,6 +10669,13 @@ class GatewayRunner:
pending = None
if result and adapter and session_key:
pending_event = _dequeue_pending_event(adapter, session_key)
# /queue overflow: after consuming the adapter's "next-up"
# slot, promote the next queued event into it so the
# recursive run's drain will see it. This keeps the slot
# occupied for the full FIFO chain, which (a) preserves
# order, and (b) causes any mid-chain /queue to correctly
# route to overflow rather than jumping the queue.
pending_event = self._promote_queued_event(session_key, adapter, pending_event)
if result.get("interrupted") and not pending_event and result.get("interrupt_message"):
interrupt_message = result.get("interrupt_message")
if _is_control_interrupt_message(interrupt_message):

View File

@ -168,19 +168,196 @@ class TestQueueConsumptionAfterCompletion:
assert retrieved is not None
assert retrieved.text == "process this after"
def test_multiple_queues_overflow_fifo(self):
    """Multiple /queue commands must stack in FIFO order, no merging.

    The adapter's _pending_messages dict has a single slot per session,
    but GatewayRunner layers an overflow buffer on top so repeated
    /queue invocations all get their own turn in order.
    """
    # NOTE: this span previously interleaved the removed
    # test_multiple_queues_last_one_wins with the new test (diff
    # residue); only the coherent new test is kept.
    from gateway.run import GatewayRunner

    runner = GatewayRunner.__new__(GatewayRunner)
    runner._queued_events = {}
    adapter = _StubAdapter()
    session_key = "telegram:user:123"
    events = [
        MessageEvent(
            text=text,
            message_type=MessageType.TEXT,
            source=MagicMock(chat_id="123", platform=Platform.TELEGRAM),
            message_id=f"q-{text}",
        )
        for text in ("first", "second", "third")
    ]
    for ev in events:
        runner._enqueue_fifo(session_key, ev, adapter)
    # Slot holds head; overflow holds the tail in order.
    assert adapter._pending_messages[session_key].text == "first"
    assert [e.text for e in runner._queued_events[session_key]] == ["second", "third"]
    assert runner._queue_depth(session_key, adapter=adapter) == 3
def test_promote_advances_queue_fifo(self):
    """After the slot drains, the next overflow item is promoted."""
    from gateway.run import GatewayRunner

    runner = GatewayRunner.__new__(GatewayRunner)
    runner._queued_events = {}
    adapter = _StubAdapter()
    session_key = "telegram:user:123"
    for label in ("A", "B", "C"):
        event = MessageEvent(
            text=label,
            message_type=MessageType.TEXT,
            source=MagicMock(),
            message_id=f"q-{label}",
        )
        runner._enqueue_fifo(session_key, event, adapter)

    def drain():
        # One simulated turn: consume the slot, then promote.
        consumed = _dequeue_pending_event(adapter, session_key)
        return runner._promote_queued_event(session_key, adapter, consumed)

    # Turn 1: "A" drains, "B" is promoted into the slot.
    pending_event = drain()
    assert pending_event is not None and pending_event.text == "A"
    assert adapter._pending_messages[session_key].text == "B"
    assert runner._queue_depth(session_key, adapter=adapter) == 2
    # Turn 2: "B" drains, "C" is promoted; overflow emptied.
    pending_event = drain()
    assert pending_event.text == "B"
    assert adapter._pending_messages[session_key].text == "C"
    assert session_key not in runner._queued_events
    # Turn 3: "C" drains; nothing left anywhere.
    pending_event = drain()
    assert pending_event.text == "C"
    assert session_key not in adapter._pending_messages
    assert runner._queue_depth(session_key, adapter=adapter) == 0
    # Turn 4: nothing pending.
    assert drain() is None
def test_promote_stages_overflow_when_slot_already_populated(self):
    """If the slot was re-populated (e.g. by an interrupt follow-up),
    promotion returns that event unchanged and stages the overflow
    head (Q2) in the slot for the next turn."""
    from gateway.run import GatewayRunner

    runner = GatewayRunner.__new__(GatewayRunner)
    runner._queued_events = {}
    adapter = _StubAdapter()
    session_key = "telegram:user:123"
    # First /queue lands in the slot; second goes to overflow.
    for label in ("Q1", "Q2"):
        runner._enqueue_fifo(
            session_key,
            MessageEvent(
                text=label,
                message_type=MessageType.TEXT,
                source=MagicMock(),
                message_id=f"q-{label}",
            ),
            adapter,
        )
    # Drain consumes Q1.
    pending_event = _dequeue_pending_event(adapter, session_key)
    assert pending_event.text == "Q1"
    # Someone else (interrupt path) re-populates the slot.
    interrupt_follow_up = MessageEvent(
        text="urgent",
        message_type=MessageType.TEXT,
        source=MagicMock(),
        message_id="m-urg",
    )
    adapter._pending_messages[session_key] = interrupt_follow_up
    # Promotion returns the drained pending_event untouched and puts
    # Q2 in the slot. NOTE: this overwrites the interrupt follow-up
    # that had been re-staged there — an accepted edge-case trade-off:
    # /queue items always run after the currently-staged pending_event
    # (which is what `returned` is), and the slot holds the
    # next-in-line item.
    returned = runner._promote_queued_event(session_key, adapter, interrupt_follow_up)
    assert returned is interrupt_follow_up
    assert adapter._pending_messages[session_key].text == "Q2"
def test_queue_depth_counts_slot_plus_overflow(self):
    """_queue_depth totals the adapter slot plus the overflow list."""
    from gateway.run import GatewayRunner

    runner = GatewayRunner.__new__(GatewayRunner)
    runner._queued_events = {}
    adapter = _StubAdapter()
    session_key = "telegram:user:depth"

    def enqueue(text, message_id):
        # Small helper so each depth checkpoint reads cleanly.
        runner._enqueue_fifo(
            session_key,
            MessageEvent(
                text=text,
                message_type=MessageType.TEXT,
                source=MagicMock(),
                message_id=message_id,
            ),
            adapter,
        )

    assert runner._queue_depth(session_key, adapter=adapter) == 0
    enqueue("one", "q1")
    assert runner._queue_depth(session_key, adapter=adapter) == 1
    enqueue("two", "q-two")
    enqueue("three", "q-three")
    assert runner._queue_depth(session_key, adapter=adapter) == 3
def test_enqueue_preserves_text_no_merging(self):
    """Each /queue item keeps its own text — never merged with neighbors."""
    from gateway.run import GatewayRunner

    runner = GatewayRunner.__new__(GatewayRunner)
    runner._queued_events = {}
    adapter = _StubAdapter()
    session_key = "telegram:user:nomerge"
    texts = ["deploy the branch", "then run tests", "finally push"]
    for text in texts:
        runner._enqueue_fifo(
            session_key,
            MessageEvent(
                text=text,
                message_type=MessageType.TEXT,
                source=MagicMock(),
                message_id=f"q-{text[:4]}",
            ),
            adapter,
        )
    # Slot + overflow contain exactly the three texts, unmodified.
    slot_text = adapter._pending_messages[session_key].text
    overflow_texts = [e.text for e in runner._queued_events[session_key]]
    assert [slot_text] + overflow_texts == texts