The shutdown banner promised "send any message after restart to resume where you left off" but the code did the opposite: a drain-timeout restart skipped the .clean_shutdown marker, which made the next startup call suspend_recently_active(), which marked the session suspended, which made get_or_create_session() spawn a fresh session_id with a 'Session automatically reset. Use /resume...' notice — contradicting the banner. Introduce a resume_pending state on SessionEntry that is distinct from suspended. Drain-timeout shutdown flags active sessions resume_pending instead of letting startup-wide suspension destroy them. The next message on the same session_key preserves the session_id, reloads the transcript, and the agent receives a reason-aware restart-resume system note that subsumes the existing tool-tail auto-continue note (PR #9934). Terminal escalation still flows through the existing .restart_failure_counts stuck-loop counter (PR #7536, threshold 3) — no parallel counter on SessionEntry. suspended still wins over resume_pending in get_or_create_session() so genuinely stuck sessions converge to a clean slate. Spec: PR #11852 (BrennerSpear). Implementation follows the spec with the approved correction (reuse .restart_failure_counts rather than adding a resume_attempts field). Changes: - gateway/session.py: SessionEntry.resume_pending/resume_reason/ last_resume_marked_at + to_dict/from_dict; SessionStore .mark_resume_pending()/clear_resume_pending(); get_or_create_session() returns existing entry when resume_pending (suspended still wins); suspend_recently_active() skips resume_pending entries. - gateway/run.py: _stop_impl() drain-timeout branch marks active sessions resume_pending before _interrupt_running_agents(); _run_agent() injects reason-aware restart-resume system note that subsumes the tool-tail case; successful-turn cleanup also clears resume_pending next to _clear_restart_failure_count(); _notify_active_sessions_of_shutdown() softens the restart banner to 'I'll try to resume where you left off' (honest about stuck-loop escalation). - tests/gateway/test_restart_resume_pending.py: 29 new tests covering SessionEntry roundtrip, mark/clear helpers, get_or_create_session precedence (suspended > resume_pending), suspend_recently_active skip, drain-timeout mark reason (restart vs shutdown), system-note injection decision tree (including tool-tail subsumption), banner wording, and stuck-loop escalation override.
This commit is contained in:
parent
ad99e32371
commit
cb4addacab
@ -1539,7 +1539,7 @@ class GatewayRunner:
|
||||
action = "restarting" if self._restart_requested else "shutting down"
|
||||
hint = (
|
||||
"Your current task will be interrupted. "
|
||||
"Send any message after restart to resume where it left off."
|
||||
"Send any message after restart and I'll try to resume where you left off."
|
||||
if self._restart_requested
|
||||
else "Your current task will be interrupted."
|
||||
)
|
||||
@ -2373,6 +2373,27 @@ class GatewayRunner:
|
||||
timeout,
|
||||
self._running_agent_count(),
|
||||
)
|
||||
# Mark forcibly-interrupted sessions as resume_pending BEFORE
|
||||
# interrupting the agents. This preserves each session's
|
||||
# session_id + transcript so the next message on the same
|
||||
# session_key auto-resumes from the existing conversation
|
||||
# instead of getting routed through suspend_recently_active()
|
||||
# and converted into a fresh session. Terminal escalation
|
||||
# for genuinely stuck sessions still flows through the
|
||||
# existing ``.restart_failure_counts`` stuck-loop counter
|
||||
# (incremented below, threshold 3), which sets
|
||||
# ``suspended=True`` and overrides resume_pending.
|
||||
_resume_reason = (
|
||||
"restart_timeout" if self._restart_requested else "shutdown_timeout"
|
||||
)
|
||||
for _sk in list(active_agents.keys()):
|
||||
try:
|
||||
self.session_store.mark_resume_pending(_sk, _resume_reason)
|
||||
except Exception as _e:
|
||||
logger.debug(
|
||||
"mark_resume_pending failed for %s: %s",
|
||||
_sk[:20], _e,
|
||||
)
|
||||
self._interrupt_running_agents(
|
||||
"Gateway restarting" if self._restart_requested else "Gateway shutting down"
|
||||
)
|
||||
@ -4152,8 +4173,20 @@ class GatewayRunner:
|
||||
# Successful turn — clear any stuck-loop counter for this session.
|
||||
# This ensures the counter only accumulates across CONSECUTIVE
|
||||
# restarts where the session was active (never completed).
|
||||
#
|
||||
# Also clear the resume_pending flag (set by drain-timeout
|
||||
# shutdown) — the turn ran to completion, so recovery
|
||||
# succeeded and subsequent messages should no longer receive
|
||||
# the restart-interruption system note.
|
||||
if session_key:
|
||||
self._clear_restart_failure_count(session_key)
|
||||
try:
|
||||
self.session_store.clear_resume_pending(session_key)
|
||||
except Exception as _e:
|
||||
logger.debug(
|
||||
"clear_resume_pending failed for %s: %s",
|
||||
session_key[:20], _e,
|
||||
)
|
||||
|
||||
# Surface error details when the agent failed silently (final_response=None)
|
||||
if not response and agent_result.get("failed"):
|
||||
@ -9427,7 +9460,40 @@ class GatewayRunner:
|
||||
# restart, crash, SIGTERM). Prepend a system note so the model
|
||||
# finishes processing the pending tool results before addressing
|
||||
# the user's new message. (#4493)
|
||||
if agent_history and agent_history[-1].get("role") == "tool":
|
||||
#
|
||||
# Session-level resume_pending (set on drain-timeout shutdown)
|
||||
# escalates the wording — the transcript's last role may be
|
||||
# anything (tool, assistant with unfinished work, etc.), so we
|
||||
# give a stronger, reason-aware instruction that subsumes the
|
||||
# tool-tail case.
|
||||
_resume_entry = None
|
||||
if session_key:
|
||||
try:
|
||||
_resume_entry = self.session_store._entries.get(session_key)
|
||||
except Exception:
|
||||
_resume_entry = None
|
||||
_is_resume_pending = bool(
|
||||
_resume_entry is not None and getattr(_resume_entry, "resume_pending", False)
|
||||
)
|
||||
|
||||
if _is_resume_pending:
|
||||
_reason = getattr(_resume_entry, "resume_reason", None) or "restart_timeout"
|
||||
_reason_phrase = (
|
||||
"a gateway restart"
|
||||
if _reason == "restart_timeout"
|
||||
else "a gateway shutdown"
|
||||
if _reason == "shutdown_timeout"
|
||||
else "a gateway interruption"
|
||||
)
|
||||
message = (
|
||||
f"[System note: Your previous turn in this session was interrupted "
|
||||
f"by {_reason_phrase}. The conversation history below is intact. "
|
||||
f"If it contains unfinished tool result(s), process them first and "
|
||||
f"summarize what was accomplished, then address the user's new "
|
||||
f"message below.]\n\n"
|
||||
+ message
|
||||
)
|
||||
elif agent_history and agent_history[-1].get("role") == "tool":
|
||||
message = (
|
||||
"[System note: Your previous turn was interrupted before you could "
|
||||
"process the last tool result(s). The conversation history contains "
|
||||
|
||||
@ -377,7 +377,19 @@ class SessionEntry:
|
||||
# this session (create a new session_id) so the user starts fresh.
|
||||
# Set by /stop to break stuck-resume loops (#7536).
|
||||
suspended: bool = False
|
||||
|
||||
|
||||
# When True the session was interrupted by a gateway restart/shutdown
|
||||
# drain timeout, but recovery is still expected. Unlike ``suspended``,
|
||||
# ``resume_pending`` preserves the existing session_id on next access —
|
||||
# the user stays on the same transcript and the agent auto-continues
|
||||
# from where it left off. Cleared after the next successful turn.
|
||||
# Escalation to ``suspended`` is handled by the existing
|
||||
# ``.restart_failure_counts`` stuck-loop counter (#7536), not by a
|
||||
# parallel counter on this entry.
|
||||
resume_pending: bool = False
|
||||
resume_reason: Optional[str] = None # e.g. "restart_timeout"
|
||||
last_resume_marked_at: Optional[datetime] = None
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
result = {
|
||||
"session_key": self.session_key,
|
||||
@ -397,6 +409,13 @@ class SessionEntry:
|
||||
"cost_status": self.cost_status,
|
||||
"memory_flushed": self.memory_flushed,
|
||||
"suspended": self.suspended,
|
||||
"resume_pending": self.resume_pending,
|
||||
"resume_reason": self.resume_reason,
|
||||
"last_resume_marked_at": (
|
||||
self.last_resume_marked_at.isoformat()
|
||||
if self.last_resume_marked_at
|
||||
else None
|
||||
),
|
||||
}
|
||||
if self.origin:
|
||||
result["origin"] = self.origin.to_dict()
|
||||
@ -414,7 +433,15 @@ class SessionEntry:
|
||||
platform = Platform(data["platform"])
|
||||
except ValueError as e:
|
||||
logger.debug("Unknown platform value %r: %s", data["platform"], e)
|
||||
|
||||
|
||||
last_resume_marked_at = None
|
||||
_lrma = data.get("last_resume_marked_at")
|
||||
if _lrma:
|
||||
try:
|
||||
last_resume_marked_at = datetime.fromisoformat(_lrma)
|
||||
except (TypeError, ValueError):
|
||||
last_resume_marked_at = None
|
||||
|
||||
return cls(
|
||||
session_key=data["session_key"],
|
||||
session_id=data["session_id"],
|
||||
@ -434,6 +461,9 @@ class SessionEntry:
|
||||
cost_status=data.get("cost_status", "unknown"),
|
||||
memory_flushed=data.get("memory_flushed", False),
|
||||
suspended=data.get("suspended", False),
|
||||
resume_pending=data.get("resume_pending", False),
|
||||
resume_reason=data.get("resume_reason"),
|
||||
last_resume_marked_at=last_resume_marked_at,
|
||||
)
|
||||
|
||||
|
||||
@ -710,9 +740,23 @@ class SessionStore:
|
||||
entry = self._entries[session_key]
|
||||
|
||||
# Auto-reset sessions marked as suspended (e.g. after /stop
|
||||
# broke a stuck loop — #7536).
|
||||
# broke a stuck loop — #7536). ``suspended`` is the hard
|
||||
# forced-wipe signal and always wins over ``resume_pending``,
|
||||
# so repeated interrupted restarts that escalate via the
|
||||
# existing ``.restart_failure_counts`` stuck-loop counter
|
||||
# still converge to a clean slate.
|
||||
if entry.suspended:
|
||||
reset_reason = "suspended"
|
||||
elif entry.resume_pending:
|
||||
# Restart-interrupted session: preserve the session_id
|
||||
# and return the existing entry so the transcript
|
||||
# reloads intact. ``resume_pending`` is cleared after
|
||||
# the NEXT successful turn completes (not here), which
|
||||
# means a re-interrupted retry keeps trying — the
|
||||
# stuck-loop counter handles terminal escalation.
|
||||
entry.updated_at = now
|
||||
self._save()
|
||||
return entry
|
||||
else:
|
||||
reset_reason = self._should_reset(entry, source)
|
||||
if not reset_reason:
|
||||
@ -802,6 +846,55 @@ class SessionStore:
|
||||
return True
|
||||
return False
|
||||
|
||||
def mark_resume_pending(
|
||||
self,
|
||||
session_key: str,
|
||||
reason: str = "restart_timeout",
|
||||
) -> bool:
|
||||
"""Mark a session as resumable after a restart interruption.
|
||||
|
||||
Unlike ``suspend_session()``, this preserves the existing
|
||||
``session_id`` and the transcript. The next call to
|
||||
``get_or_create_session()`` for this key returns the same entry
|
||||
so the user auto-resumes on the same conversation lane.
|
||||
|
||||
Returns True if the session existed and was marked.
|
||||
"""
|
||||
with self._lock:
|
||||
self._ensure_loaded_locked()
|
||||
if session_key in self._entries:
|
||||
entry = self._entries[session_key]
|
||||
# Never override an explicit ``suspended`` — that is a hard
|
||||
# forced-wipe signal (from /stop or stuck-loop escalation).
|
||||
if entry.suspended:
|
||||
return False
|
||||
entry.resume_pending = True
|
||||
entry.resume_reason = reason
|
||||
entry.last_resume_marked_at = _now()
|
||||
self._save()
|
||||
return True
|
||||
return False
|
||||
|
||||
def clear_resume_pending(self, session_key: str) -> bool:
|
||||
"""Clear the resume-pending flag after a successful resumed turn.
|
||||
|
||||
Called from the gateway after ``run_conversation()`` returns a
|
||||
final response for a session that had ``resume_pending=True``,
|
||||
signalling that recovery succeeded.
|
||||
|
||||
Returns True if a flag was cleared.
|
||||
"""
|
||||
with self._lock:
|
||||
self._ensure_loaded_locked()
|
||||
entry = self._entries.get(session_key)
|
||||
if entry is None or not entry.resume_pending:
|
||||
return False
|
||||
entry.resume_pending = False
|
||||
entry.resume_reason = None
|
||||
entry.last_resume_marked_at = None
|
||||
self._save()
|
||||
return True
|
||||
|
||||
def prune_old_entries(self, max_age_days: int) -> int:
|
||||
"""Drop SessionEntry records older than max_age_days.
|
||||
|
||||
@ -861,6 +954,12 @@ class SessionStore:
|
||||
(#7536). Only suspends sessions updated within *max_age_seconds*
|
||||
to avoid resetting long-idle sessions that are harmless to resume.
|
||||
Returns the number of sessions that were suspended.
|
||||
|
||||
Entries flagged ``resume_pending=True`` are skipped — those were
|
||||
marked intentionally by the drain-timeout path as recoverable.
|
||||
Terminal escalation for genuinely stuck ``resume_pending`` sessions
|
||||
is handled by the existing ``.restart_failure_counts`` stuck-loop
|
||||
counter, which runs after this method on startup.
|
||||
"""
|
||||
from datetime import timedelta
|
||||
|
||||
@ -869,6 +968,8 @@ class SessionStore:
|
||||
with self._lock:
|
||||
self._ensure_loaded_locked()
|
||||
for entry in self._entries.values():
|
||||
if entry.resume_pending:
|
||||
continue
|
||||
if not entry.suspended and entry.updated_at >= cutoff:
|
||||
entry.suspended = True
|
||||
count += 1
|
||||
|
||||
610
tests/gateway/test_restart_resume_pending.py
Normal file
610
tests/gateway/test_restart_resume_pending.py
Normal file
@ -0,0 +1,610 @@
|
||||
"""Tests for the resume_pending session continuity path.
|
||||
|
||||
Covers the behaviour introduced to fix the ``Gateway shutting down ...
|
||||
task will be interrupted`` follow-up bug (spec: PR #11852, builds on
|
||||
PRs #9850, #9934, #7536):
|
||||
|
||||
1. When a gateway restart drain times out and agents are force-interrupted,
|
||||
the affected sessions are flagged ``resume_pending=True`` — not
|
||||
``suspended`` — so the next user message on the same session_key
|
||||
auto-resumes from the existing transcript instead of getting routed
|
||||
through ``suspend_recently_active()`` and converted into a fresh
|
||||
session.
|
||||
|
||||
2. ``suspended=True`` (from ``/stop`` or stuck-loop escalation) still
|
||||
wins over ``resume_pending`` — the forced-wipe path is preserved.
|
||||
|
||||
3. The restart-resume system note injected into the next user message is
|
||||
a superset of the existing tool-tail auto-continue note (from
|
||||
PR #9934), using session-entry metadata rather than just transcript
|
||||
shape so it fires even when the interrupted transcript does NOT end
|
||||
with a ``tool`` role.
|
||||
|
||||
4. The existing ``.restart_failure_counts`` stuck-loop counter from
|
||||
PR #7536 remains the single source of escalation — no parallel
|
||||
counter is added on ``SessionEntry``.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from datetime import datetime, timedelta
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.config import GatewayConfig, Platform, PlatformConfig
|
||||
from gateway.session import SessionEntry, SessionSource, SessionStore
|
||||
from tests.gateway.restart_test_helpers import (
|
||||
make_restart_runner,
|
||||
make_restart_source,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _make_source(platform=Platform.TELEGRAM, chat_id="123", user_id="u1"):
|
||||
return SessionSource(platform=platform, chat_id=chat_id, user_id=user_id)
|
||||
|
||||
|
||||
def _make_store(tmp_path):
|
||||
return SessionStore(sessions_dir=tmp_path, config=GatewayConfig())
|
||||
|
||||
|
||||
def _simulate_note_injection(
|
||||
agent_history: list,
|
||||
user_message: str,
|
||||
resume_entry: SessionEntry | None,
|
||||
) -> str:
|
||||
"""Mirror the note-injection logic in gateway/run.py _run_agent().
|
||||
|
||||
Matches the production code in the ``run_sync`` closure so we can
|
||||
test the decision tree without a full gateway runner.
|
||||
"""
|
||||
message = user_message
|
||||
is_resume_pending = bool(
|
||||
resume_entry is not None and getattr(resume_entry, "resume_pending", False)
|
||||
)
|
||||
|
||||
if is_resume_pending:
|
||||
reason = getattr(resume_entry, "resume_reason", None) or "restart_timeout"
|
||||
reason_phrase = (
|
||||
"a gateway restart"
|
||||
if reason == "restart_timeout"
|
||||
else "a gateway shutdown"
|
||||
if reason == "shutdown_timeout"
|
||||
else "a gateway interruption"
|
||||
)
|
||||
message = (
|
||||
f"[System note: Your previous turn in this session was interrupted "
|
||||
f"by {reason_phrase}. The conversation history below is intact. "
|
||||
f"If it contains unfinished tool result(s), process them first and "
|
||||
f"summarize what was accomplished, then address the user's new "
|
||||
f"message below.]\n\n"
|
||||
+ message
|
||||
)
|
||||
elif agent_history and agent_history[-1].get("role") == "tool":
|
||||
message = (
|
||||
"[System note: Your previous turn was interrupted before you could "
|
||||
"process the last tool result(s). The conversation history contains "
|
||||
"tool outputs you haven't responded to yet. Please finish processing "
|
||||
"those results and summarize what was accomplished, then address the "
|
||||
"user's new message below.]\n\n"
|
||||
+ message
|
||||
)
|
||||
return message
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SessionEntry field + serialization
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSessionEntryResumeFields:
|
||||
def test_defaults(self):
|
||||
now = datetime.now()
|
||||
entry = SessionEntry(
|
||||
session_key="agent:main:telegram:dm:1",
|
||||
session_id="sid",
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
)
|
||||
assert entry.resume_pending is False
|
||||
assert entry.resume_reason is None
|
||||
assert entry.last_resume_marked_at is None
|
||||
|
||||
def test_roundtrip_with_resume_fields(self):
|
||||
now = datetime(2026, 4, 18, 12, 0, 0)
|
||||
entry = SessionEntry(
|
||||
session_key="agent:main:telegram:dm:1",
|
||||
session_id="sid",
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
resume_pending=True,
|
||||
resume_reason="restart_timeout",
|
||||
last_resume_marked_at=now,
|
||||
)
|
||||
restored = SessionEntry.from_dict(entry.to_dict())
|
||||
assert restored.resume_pending is True
|
||||
assert restored.resume_reason == "restart_timeout"
|
||||
assert restored.last_resume_marked_at == now
|
||||
|
||||
def test_from_dict_legacy_without_resume_fields(self):
|
||||
"""Old sessions.json without the new fields deserialize cleanly."""
|
||||
now = datetime.now()
|
||||
legacy = {
|
||||
"session_key": "agent:main:telegram:dm:1",
|
||||
"session_id": "sid",
|
||||
"created_at": now.isoformat(),
|
||||
"updated_at": now.isoformat(),
|
||||
"chat_type": "dm",
|
||||
}
|
||||
restored = SessionEntry.from_dict(legacy)
|
||||
assert restored.resume_pending is False
|
||||
assert restored.resume_reason is None
|
||||
assert restored.last_resume_marked_at is None
|
||||
|
||||
def test_malformed_timestamp_is_tolerated(self):
|
||||
now = datetime.now()
|
||||
data = {
|
||||
"session_key": "k",
|
||||
"session_id": "sid",
|
||||
"created_at": now.isoformat(),
|
||||
"updated_at": now.isoformat(),
|
||||
"resume_pending": True,
|
||||
"resume_reason": "restart_timeout",
|
||||
"last_resume_marked_at": "not-a-timestamp",
|
||||
}
|
||||
restored = SessionEntry.from_dict(data)
|
||||
# resume_pending still honoured, only the broken timestamp drops
|
||||
assert restored.resume_pending is True
|
||||
assert restored.resume_reason == "restart_timeout"
|
||||
assert restored.last_resume_marked_at is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SessionStore.mark_resume_pending / clear_resume_pending
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestMarkResumePending:
|
||||
def test_marks_existing_session(self, tmp_path):
|
||||
store = _make_store(tmp_path)
|
||||
source = _make_source()
|
||||
entry = store.get_or_create_session(source)
|
||||
|
||||
assert store.mark_resume_pending(entry.session_key) is True
|
||||
refreshed = store._entries[entry.session_key]
|
||||
assert refreshed.resume_pending is True
|
||||
assert refreshed.resume_reason == "restart_timeout"
|
||||
assert refreshed.last_resume_marked_at is not None
|
||||
|
||||
def test_custom_reason_persists(self, tmp_path):
|
||||
store = _make_store(tmp_path)
|
||||
source = _make_source()
|
||||
entry = store.get_or_create_session(source)
|
||||
|
||||
store.mark_resume_pending(entry.session_key, reason="shutdown_timeout")
|
||||
assert store._entries[entry.session_key].resume_reason == "shutdown_timeout"
|
||||
|
||||
def test_returns_false_for_unknown_key(self, tmp_path):
|
||||
store = _make_store(tmp_path)
|
||||
assert store.mark_resume_pending("no-such-key") is False
|
||||
|
||||
def test_does_not_override_suspended(self, tmp_path):
|
||||
"""suspended wins — mark_resume_pending is a no-op on a suspended entry."""
|
||||
store = _make_store(tmp_path)
|
||||
source = _make_source()
|
||||
entry = store.get_or_create_session(source)
|
||||
store.suspend_session(entry.session_key)
|
||||
|
||||
assert store.mark_resume_pending(entry.session_key) is False
|
||||
e = store._entries[entry.session_key]
|
||||
assert e.suspended is True
|
||||
assert e.resume_pending is False
|
||||
|
||||
def test_survives_roundtrip_through_json(self, tmp_path):
|
||||
store = _make_store(tmp_path)
|
||||
source = _make_source()
|
||||
entry = store.get_or_create_session(source)
|
||||
store.mark_resume_pending(entry.session_key, reason="restart_timeout")
|
||||
|
||||
# Reload from disk
|
||||
store2 = _make_store(tmp_path)
|
||||
store2._ensure_loaded()
|
||||
reloaded = store2._entries[entry.session_key]
|
||||
assert reloaded.resume_pending is True
|
||||
assert reloaded.resume_reason == "restart_timeout"
|
||||
|
||||
|
||||
class TestClearResumePending:
|
||||
def test_clears_flag(self, tmp_path):
|
||||
store = _make_store(tmp_path)
|
||||
source = _make_source()
|
||||
entry = store.get_or_create_session(source)
|
||||
store.mark_resume_pending(entry.session_key)
|
||||
|
||||
assert store.clear_resume_pending(entry.session_key) is True
|
||||
e = store._entries[entry.session_key]
|
||||
assert e.resume_pending is False
|
||||
assert e.resume_reason is None
|
||||
assert e.last_resume_marked_at is None
|
||||
|
||||
def test_returns_false_when_not_pending(self, tmp_path):
|
||||
store = _make_store(tmp_path)
|
||||
source = _make_source()
|
||||
entry = store.get_or_create_session(source)
|
||||
# Not marked
|
||||
assert store.clear_resume_pending(entry.session_key) is False
|
||||
|
||||
def test_returns_false_for_unknown_key(self, tmp_path):
|
||||
store = _make_store(tmp_path)
|
||||
assert store.clear_resume_pending("no-such-key") is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SessionStore.get_or_create_session resume_pending behaviour
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGetOrCreateResumePending:
|
||||
def test_resume_pending_preserves_session_id(self, tmp_path):
|
||||
"""This is THE core behavioural fix — resume_pending ≠ new session."""
|
||||
store = _make_store(tmp_path)
|
||||
source = _make_source()
|
||||
first = store.get_or_create_session(source)
|
||||
original_sid = first.session_id
|
||||
store.mark_resume_pending(first.session_key)
|
||||
|
||||
second = store.get_or_create_session(source)
|
||||
assert second.session_id == original_sid
|
||||
assert second.was_auto_reset is False
|
||||
assert second.auto_reset_reason is None
|
||||
# Flag is NOT cleared on read — only on successful turn completion.
|
||||
assert second.resume_pending is True
|
||||
|
||||
def test_suspended_still_creates_new_session(self, tmp_path):
|
||||
"""Regression guard — suspended must still force a clean slate."""
|
||||
store = _make_store(tmp_path)
|
||||
source = _make_source()
|
||||
first = store.get_or_create_session(source)
|
||||
original_sid = first.session_id
|
||||
store.suspend_session(first.session_key)
|
||||
|
||||
second = store.get_or_create_session(source)
|
||||
assert second.session_id != original_sid
|
||||
assert second.was_auto_reset is True
|
||||
assert second.auto_reset_reason == "suspended"
|
||||
|
||||
def test_suspended_overrides_resume_pending(self, tmp_path):
|
||||
"""Terminal escalation: a session that somehow has BOTH flags must
|
||||
behave like ``suspended`` — forced wipe + auto_reset_reason."""
|
||||
store = _make_store(tmp_path)
|
||||
source = _make_source()
|
||||
first = store.get_or_create_session(source)
|
||||
original_sid = first.session_id
|
||||
|
||||
# Force the pathological state directly (normally mark_resume_pending
|
||||
# refuses to run when suspended=True, but a stuck-loop escalation
|
||||
# can set suspended=True AFTER resume_pending is set).
|
||||
with store._lock:
|
||||
e = store._entries[first.session_key]
|
||||
e.resume_pending = True
|
||||
e.resume_reason = "restart_timeout"
|
||||
e.suspended = True
|
||||
store._save()
|
||||
|
||||
second = store.get_or_create_session(source)
|
||||
assert second.session_id != original_sid
|
||||
assert second.was_auto_reset is True
|
||||
assert second.auto_reset_reason == "suspended"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SessionStore.suspend_recently_active skip behaviour
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSuspendRecentlyActiveSkipsResumePending:
|
||||
def test_resume_pending_entries_not_suspended(self, tmp_path):
|
||||
store = _make_store(tmp_path)
|
||||
source = _make_source()
|
||||
entry = store.get_or_create_session(source)
|
||||
store.mark_resume_pending(entry.session_key)
|
||||
|
||||
count = store.suspend_recently_active()
|
||||
assert count == 0
|
||||
e = store._entries[entry.session_key]
|
||||
assert e.suspended is False
|
||||
assert e.resume_pending is True
|
||||
|
||||
def test_non_resume_pending_still_suspended(self, tmp_path):
|
||||
"""Non-resume sessions still get the old crash-recovery suspension."""
|
||||
store = _make_store(tmp_path)
|
||||
source_a = _make_source(chat_id="a")
|
||||
source_b = _make_source(chat_id="b")
|
||||
entry_a = store.get_or_create_session(source_a)
|
||||
entry_b = store.get_or_create_session(source_b)
|
||||
store.mark_resume_pending(entry_a.session_key)
|
||||
|
||||
count = store.suspend_recently_active()
|
||||
assert count == 1
|
||||
assert store._entries[entry_a.session_key].suspended is False
|
||||
assert store._entries[entry_b.session_key].suspended is True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Restart-resume system-note injection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestResumePendingSystemNote:
|
||||
def _pending_entry(self, reason="restart_timeout") -> SessionEntry:
|
||||
now = datetime.now()
|
||||
return SessionEntry(
|
||||
session_key="agent:main:telegram:dm:1",
|
||||
session_id="sid",
|
||||
created_at=now,
|
||||
updated_at=now,
|
||||
resume_pending=True,
|
||||
resume_reason=reason,
|
||||
last_resume_marked_at=now,
|
||||
)
|
||||
|
||||
def test_resume_pending_restart_note_mentions_restart(self):
|
||||
entry = self._pending_entry(reason="restart_timeout")
|
||||
result = _simulate_note_injection(
|
||||
agent_history=[{"role": "assistant", "content": "in progress"}],
|
||||
user_message="what happened?",
|
||||
resume_entry=entry,
|
||||
)
|
||||
assert "[System note:" in result
|
||||
assert "gateway restart" in result
|
||||
assert "what happened?" in result
|
||||
|
||||
def test_resume_pending_shutdown_note_mentions_shutdown(self):
|
||||
entry = self._pending_entry(reason="shutdown_timeout")
|
||||
result = _simulate_note_injection(
|
||||
agent_history=[{"role": "assistant", "content": "in progress"}],
|
||||
user_message="ping",
|
||||
resume_entry=entry,
|
||||
)
|
||||
assert "gateway shutdown" in result
|
||||
|
||||
def test_resume_pending_fires_without_tool_tail(self):
|
||||
"""Key improvement over PR #9934: the restart-resume note fires
|
||||
even when the transcript's last role is NOT ``tool``."""
|
||||
entry = self._pending_entry()
|
||||
history = [
|
||||
{"role": "user", "content": "run a long thing"},
|
||||
{"role": "assistant", "content": "ok, starting..."},
|
||||
]
|
||||
result = _simulate_note_injection(history, "ping", resume_entry=entry)
|
||||
assert "[System note:" in result
|
||||
assert "gateway restart" in result
|
||||
|
||||
def test_resume_pending_subsumes_tool_tail_note(self):
|
||||
"""When BOTH conditions are true, the restart-resume note wins —
|
||||
no duplicate notes."""
|
||||
entry = self._pending_entry()
|
||||
history = [
|
||||
{"role": "assistant", "content": None, "tool_calls": [
|
||||
{"id": "c1", "function": {"name": "x", "arguments": "{}"}},
|
||||
]},
|
||||
{"role": "tool", "tool_call_id": "c1", "content": "result"},
|
||||
]
|
||||
result = _simulate_note_injection(history, "ping", resume_entry=entry)
|
||||
assert result.count("[System note:") == 1
|
||||
assert "gateway restart" in result
|
||||
# Old tool-tail wording absent
|
||||
assert "haven't responded to yet" not in result
|
||||
|
||||
def test_no_resume_pending_preserves_tool_tail_note(self):
|
||||
"""Regression: the old PR #9934 tool-tail behaviour is unchanged."""
|
||||
history = [
|
||||
{"role": "assistant", "content": None, "tool_calls": [
|
||||
{"id": "c1", "function": {"name": "x", "arguments": "{}"}},
|
||||
]},
|
||||
{"role": "tool", "tool_call_id": "c1", "content": "result"},
|
||||
]
|
||||
result = _simulate_note_injection(history, "ping", resume_entry=None)
|
||||
assert "[System note:" in result
|
||||
assert "tool result" in result
|
||||
|
||||
def test_no_note_when_nothing_to_resume(self):
|
||||
history = [
|
||||
{"role": "user", "content": "hello"},
|
||||
{"role": "assistant", "content": "hi"},
|
||||
]
|
||||
result = _simulate_note_injection(history, "ping", resume_entry=None)
|
||||
assert result == "ping"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Drain-timeout path marks sessions resume_pending
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_drain_timeout_marks_resume_pending():
|
||||
"""End-to-end: a drain timeout during gateway stop should flag every
|
||||
active session as resume_pending BEFORE the interrupt fires, so the
|
||||
next startup's suspend_recently_active() does not destroy them."""
|
||||
runner, adapter = make_restart_runner()
|
||||
adapter.disconnect = AsyncMock()
|
||||
runner._restart_drain_timeout = 0.05
|
||||
|
||||
running_agent = MagicMock()
|
||||
session_key_one = "agent:main:telegram:dm:A"
|
||||
session_key_two = "agent:main:telegram:dm:B"
|
||||
runner._running_agents = {
|
||||
session_key_one: running_agent,
|
||||
session_key_two: MagicMock(),
|
||||
}
|
||||
|
||||
# Plug a mock session_store that records marks.
|
||||
session_store = MagicMock()
|
||||
session_store.mark_resume_pending = MagicMock(return_value=True)
|
||||
runner.session_store = session_store
|
||||
|
||||
with patch("gateway.status.remove_pid_file"), patch(
|
||||
"gateway.status.write_runtime_status"
|
||||
):
|
||||
await runner.stop()
|
||||
|
||||
# Both active sessions were marked with the shutdown_timeout reason.
|
||||
calls = session_store.mark_resume_pending.call_args_list
|
||||
marked = {args[0][0] for args in calls}
|
||||
assert marked == {session_key_one, session_key_two}
|
||||
for args in calls:
|
||||
assert args[0][1] == "shutdown_timeout"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_drain_timeout_uses_restart_reason_when_restarting():
|
||||
runner, adapter = make_restart_runner()
|
||||
adapter.disconnect = AsyncMock()
|
||||
runner._restart_drain_timeout = 0.05
|
||||
runner._restart_requested = True
|
||||
|
||||
running_agent = MagicMock()
|
||||
runner._running_agents = {"agent:main:telegram:dm:A": running_agent}
|
||||
|
||||
session_store = MagicMock()
|
||||
session_store.mark_resume_pending = MagicMock(return_value=True)
|
||||
runner.session_store = session_store
|
||||
|
||||
with patch("gateway.status.remove_pid_file"), patch(
|
||||
"gateway.status.write_runtime_status"
|
||||
):
|
||||
await runner.stop(restart=True, detached_restart=False, service_restart=True)
|
||||
|
||||
calls = session_store.mark_resume_pending.call_args_list
|
||||
assert calls, "expected at least one mark_resume_pending call"
|
||||
for args in calls:
|
||||
assert args[0][1] == "restart_timeout"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clean_drain_does_not_mark_resume_pending():
|
||||
"""If the drain completes within timeout (no force-interrupt), no
|
||||
sessions should be flagged — the normal shutdown path is unchanged."""
|
||||
runner, adapter = make_restart_runner()
|
||||
adapter.disconnect = AsyncMock()
|
||||
|
||||
running_agent = MagicMock()
|
||||
runner._running_agents = {"agent:main:telegram:dm:A": running_agent}
|
||||
|
||||
# Finish the agent before the (generous) drain deadline
|
||||
async def finish_agent():
|
||||
await asyncio.sleep(0.05)
|
||||
runner._running_agents.clear()
|
||||
|
||||
asyncio.create_task(finish_agent())
|
||||
|
||||
session_store = MagicMock()
|
||||
session_store.mark_resume_pending = MagicMock(return_value=True)
|
||||
runner.session_store = session_store
|
||||
|
||||
with patch("gateway.status.remove_pid_file"), patch(
|
||||
"gateway.status.write_runtime_status"
|
||||
):
|
||||
await runner.stop()
|
||||
|
||||
session_store.mark_resume_pending.assert_not_called()
|
||||
running_agent.interrupt.assert_not_called()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Shutdown banner wording
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_restart_banner_uses_try_to_resume_wording():
|
||||
"""The notification sent before drain should hedge the resume promise
|
||||
— the session-continuity fix is best-effort (stuck-loop counter can
|
||||
still escalate to suspended)."""
|
||||
runner, adapter = make_restart_runner()
|
||||
runner._restart_requested = True
|
||||
runner._running_agents["agent:main:telegram:dm:999"] = MagicMock()
|
||||
|
||||
await runner._notify_active_sessions_of_shutdown()
|
||||
|
||||
assert len(adapter.sent) == 1
|
||||
msg = adapter.sent[0]
|
||||
assert "restarting" in msg
|
||||
assert "try to resume" in msg
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Stuck-loop escalation integration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestStuckLoopEscalation:
|
||||
"""The existing .restart_failure_counts counter (PR #7536) remains the
|
||||
single source of terminal escalation — no parallel counter on
|
||||
SessionEntry was added. After the configured threshold, the startup
|
||||
path flips suspended=True which overrides resume_pending."""
|
||||
|
||||
def test_escalation_via_stuck_loop_counter_overrides_resume_pending(
|
||||
self, tmp_path, monkeypatch
|
||||
):
|
||||
"""Simulate a session that keeps getting restart-interrupted and
|
||||
hits the stuck-loop threshold: next startup should force it to
|
||||
fresh-session despite resume_pending being set."""
|
||||
import json
|
||||
|
||||
from gateway.run import GatewayRunner
|
||||
|
||||
store = _make_store(tmp_path)
|
||||
source = _make_source()
|
||||
entry = store.get_or_create_session(source)
|
||||
store.mark_resume_pending(entry.session_key, reason="restart_timeout")
|
||||
|
||||
# Simulate counter already at threshold (3 consecutive interrupted
|
||||
# restarts). _suspend_stuck_loop_sessions will flip suspended=True.
|
||||
counts_file = tmp_path / ".restart_failure_counts"
|
||||
counts_file.write_text(json.dumps({entry.session_key: 3}))
|
||||
|
||||
monkeypatch.setattr("gateway.run._hermes_home", tmp_path)
|
||||
runner = object.__new__(GatewayRunner)
|
||||
runner.session_store = store
|
||||
|
||||
suspended_count = GatewayRunner._suspend_stuck_loop_sessions(runner)
|
||||
assert suspended_count == 1
|
||||
assert store._entries[entry.session_key].suspended is True
|
||||
# resume_pending is still set on the entry, but suspended wins in
|
||||
# get_or_create_session so the next message still gets a new sid.
|
||||
second = store.get_or_create_session(source)
|
||||
assert second.session_id != entry.session_id
|
||||
assert second.auto_reset_reason == "suspended"
|
||||
|
||||
def test_successful_turn_flow_clears_both_counter_and_resume_pending(
|
||||
self, tmp_path, monkeypatch
|
||||
):
|
||||
"""The gateway's post-turn cleanup should clear both signals so a
|
||||
future restart-interrupt starts with a fresh counter."""
|
||||
import json
|
||||
|
||||
from gateway.run import GatewayRunner
|
||||
|
||||
store = _make_store(tmp_path)
|
||||
source = _make_source()
|
||||
entry = store.get_or_create_session(source)
|
||||
store.mark_resume_pending(entry.session_key, reason="restart_timeout")
|
||||
|
||||
counts_file = tmp_path / ".restart_failure_counts"
|
||||
counts_file.write_text(json.dumps({entry.session_key: 2}))
|
||||
|
||||
monkeypatch.setattr("gateway.run._hermes_home", tmp_path)
|
||||
runner = object.__new__(GatewayRunner)
|
||||
runner.session_store = store
|
||||
|
||||
GatewayRunner._clear_restart_failure_count(runner, entry.session_key)
|
||||
store.clear_resume_pending(entry.session_key)
|
||||
|
||||
assert store._entries[entry.session_key].resume_pending is False
|
||||
assert not counts_file.exists()
|
||||
Loading…
Reference in New Issue
Block a user