fix(gateway): isolate pending native image paths by session
This commit is contained in:
parent
5ed27c0f74
commit
bdb7edd89e
@ -884,6 +884,7 @@ class GatewayRunner:
|
||||
# /new and /reset. /model and other mid-session operations
|
||||
# preserve the queue.
|
||||
self._queued_events: Dict[str, List[MessageEvent]] = {}
|
||||
self._pending_native_image_paths_by_session: Dict[str, List[str]] = {}
|
||||
self._busy_ack_ts: Dict[str, float] = {} # last busy-ack timestamp per session (debounce)
|
||||
self._session_run_generation: Dict[str, int] = {}
|
||||
|
||||
@ -5079,22 +5080,30 @@ class GatewayRunner:
|
||||
preprocessing pipeline so sender attribution, image enrichment, STT,
|
||||
document notes, reply context, and @ references all behave the same.
|
||||
|
||||
Side effect: writes ``self._pending_native_image_paths`` to a list of
|
||||
local image paths when the active model supports native vision AND
|
||||
the user has images attached. The caller consumes and clears this
|
||||
attribute at the ``run_conversation`` site to build a multimodal user
|
||||
turn. When the list is empty, the ``_enrich_message_with_vision``
|
||||
text path has already run and images are represented in-text.
|
||||
Side effect: buffers per-session native image paths when the active
|
||||
model supports native vision AND the user has images attached. The
|
||||
caller consumes and clears that session-scoped buffer at the
|
||||
``run_conversation`` site to build a multimodal user turn. When the
|
||||
list is empty, the ``_enrich_message_with_vision`` text path has
|
||||
already run and images are represented in-text.
|
||||
"""
|
||||
history = history or []
|
||||
message_text = event.text or ""
|
||||
# Reset per-call buffer; set only when native routing is chosen.
|
||||
self._pending_native_image_paths = []
|
||||
_group_sessions_per_user = getattr(self.config, "group_sessions_per_user", True)
|
||||
_thread_sessions_per_user = getattr(self.config, "thread_sessions_per_user", False)
|
||||
session_key = build_session_key(
|
||||
source,
|
||||
group_sessions_per_user=_group_sessions_per_user,
|
||||
thread_sessions_per_user=_thread_sessions_per_user,
|
||||
)
|
||||
# Reset only this session's per-call buffer; other sessions may be
|
||||
# concurrently preparing multimodal turns on the same runner.
|
||||
self._consume_pending_native_image_paths(session_key)
|
||||
|
||||
_is_shared_multi_user = is_shared_multi_user_session(
|
||||
source,
|
||||
group_sessions_per_user=getattr(self.config, "group_sessions_per_user", True),
|
||||
thread_sessions_per_user=getattr(self.config, "thread_sessions_per_user", False),
|
||||
group_sessions_per_user=_group_sessions_per_user,
|
||||
thread_sessions_per_user=_thread_sessions_per_user,
|
||||
)
|
||||
if _is_shared_multi_user and source.user_name:
|
||||
message_text = f"[{source.user_name}] {message_text}"
|
||||
@ -5115,7 +5124,11 @@ class GatewayRunner:
|
||||
_img_mode = self._decide_image_input_mode()
|
||||
if _img_mode == "native":
|
||||
# Defer attachment to the run_conversation call site.
|
||||
self._pending_native_image_paths = list(image_paths)
|
||||
pending_native = getattr(self, "_pending_native_image_paths_by_session", None)
|
||||
if pending_native is None:
|
||||
pending_native = {}
|
||||
self._pending_native_image_paths_by_session = pending_native
|
||||
pending_native[session_key] = list(image_paths)
|
||||
logger.info(
|
||||
"Image routing: native (model supports vision). %d image(s) will be attached inline.",
|
||||
len(image_paths),
|
||||
@ -5254,6 +5267,12 @@ class GatewayRunner:
|
||||
|
||||
return message_text
|
||||
|
||||
def _consume_pending_native_image_paths(self, session_key: str) -> List[str]:
|
||||
pending_native = getattr(self, "_pending_native_image_paths_by_session", None)
|
||||
if not pending_native:
|
||||
return []
|
||||
return list(pending_native.pop(session_key, []) or [])
|
||||
|
||||
async def _handle_message_with_agent(self, event, source, _quick_key: str, run_generation: int):
|
||||
"""Inner handler that runs under the _running_agents sentinel guard."""
|
||||
_msg_start_time = time.time()
|
||||
@ -12136,8 +12155,7 @@ class GatewayRunner:
|
||||
# attachment, wrap the user turn as an OpenAI-style multimodal
|
||||
# content list. Consume-and-clear so subsequent turns on the same
|
||||
# runner instance don't re-attach stale images.
|
||||
_native_imgs = list(getattr(self, "_pending_native_image_paths", []) or [])
|
||||
self._pending_native_image_paths = []
|
||||
_native_imgs = self._consume_pending_native_image_paths(session_key)
|
||||
if _native_imgs:
|
||||
try:
|
||||
from agent.image_routing import build_native_content_parts
|
||||
|
||||
79
tests/gateway/test_native_image_buffer_isolation.py
Normal file
79
tests/gateway/test_native_image_buffer_isolation.py
Normal file
@ -0,0 +1,79 @@
|
||||
import pytest
|
||||
|
||||
from gateway.config import GatewayConfig, Platform, PlatformConfig
|
||||
from gateway.platforms.base import MessageEvent, MessageType
|
||||
from gateway.run import GatewayRunner
|
||||
from gateway.session import SessionSource, build_session_key
|
||||
|
||||
|
||||
def _make_runner() -> GatewayRunner:
|
||||
runner = GatewayRunner.__new__(GatewayRunner)
|
||||
runner.config = GatewayConfig(
|
||||
platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="fake")},
|
||||
)
|
||||
runner.adapters = {}
|
||||
runner._model = "openai/gpt-4.1-mini"
|
||||
runner._base_url = None
|
||||
runner._decide_image_input_mode = lambda: "native"
|
||||
return runner
|
||||
|
||||
|
||||
def _source(chat_id: str) -> SessionSource:
|
||||
return SessionSource(
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_id=chat_id,
|
||||
chat_type="private",
|
||||
user_name=f"user-{chat_id}",
|
||||
)
|
||||
|
||||
|
||||
def _image_event(source: SessionSource, path: str) -> MessageEvent:
|
||||
return MessageEvent(
|
||||
text="see image",
|
||||
message_type=MessageType.PHOTO,
|
||||
source=source,
|
||||
media_urls=[path],
|
||||
media_types=["image/png"],
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_native_image_buffer_isolated_per_session():
|
||||
runner = _make_runner()
|
||||
source_a = _source("chat-a")
|
||||
source_b = _source("chat-b")
|
||||
|
||||
await runner._prepare_inbound_message_text(
|
||||
event=_image_event(source_a, "/tmp/a.png"),
|
||||
source=source_a,
|
||||
history=[],
|
||||
)
|
||||
await runner._prepare_inbound_message_text(
|
||||
event=_image_event(source_b, "/tmp/b.png"),
|
||||
source=source_b,
|
||||
history=[],
|
||||
)
|
||||
|
||||
assert runner._consume_pending_native_image_paths(build_session_key(source_a)) == ["/tmp/a.png"]
|
||||
assert runner._consume_pending_native_image_paths(build_session_key(source_b)) == ["/tmp/b.png"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_native_image_buffer_not_cleared_by_other_sessions_without_images():
|
||||
runner = _make_runner()
|
||||
source_a = _source("chat-a")
|
||||
source_b = _source("chat-b")
|
||||
|
||||
await runner._prepare_inbound_message_text(
|
||||
event=_image_event(source_a, "/tmp/a.png"),
|
||||
source=source_a,
|
||||
history=[],
|
||||
)
|
||||
await runner._prepare_inbound_message_text(
|
||||
event=MessageEvent(text="plain text", source=source_b),
|
||||
source=source_b,
|
||||
history=[],
|
||||
)
|
||||
|
||||
assert runner._consume_pending_native_image_paths(build_session_key(source_a)) == ["/tmp/a.png"]
|
||||
assert runner._consume_pending_native_image_paths(build_session_key(source_b)) == []
|
||||
Loading…
Reference in New Issue
Block a user