feat(gateway): make hygiene hard message limit configurable (#17000)
The gateway session-hygiene pre-compression safety valve had a hardcoded 400-message threshold. On long-lived sessions with short turns this was either too high (users with aggressive compression preferences) or too low (users with very large context models who want to keep more history in-flight). Add compression.hygiene_hard_message_limit (default 400) so it can be tuned without forking the gateway. Reported by @OP (Apr 26 feedback bundle). ## Changes - hermes_cli/config.py: new DEFAULT_CONFIG key with 400 default - gateway/run.py: read compression.hygiene_hard_message_limit at hygiene-time, fall back to 400 if missing/invalid - tests/gateway/test_session_hygiene.py: two tests — override fires at the configured limit, default does not fire below 400 Co-authored-by: teknium1 <teknium@users.noreply.github.com>
This commit is contained in:
parent
06164a7b28
commit
72dea9f4f7
@ -4725,6 +4725,7 @@ class GatewayRunner:
|
||||
_hyg_model = "anthropic/claude-sonnet-4.6"
|
||||
_hyg_threshold_pct = 0.85
|
||||
_hyg_compression_enabled = True
|
||||
_hyg_hard_msg_limit = 400
|
||||
_hyg_config_context_length = None
|
||||
_hyg_provider = None
|
||||
_hyg_base_url = None
|
||||
@ -4763,6 +4764,14 @@ class GatewayRunner:
|
||||
_hyg_compression_enabled = str(
|
||||
_comp_cfg.get("enabled", True)
|
||||
).lower() in ("true", "1", "yes")
|
||||
_raw_hard_limit = _comp_cfg.get("hygiene_hard_message_limit")
|
||||
if _raw_hard_limit is not None:
|
||||
try:
|
||||
_parsed = int(_raw_hard_limit)
|
||||
if _parsed > 0:
|
||||
_hyg_hard_msg_limit = _parsed
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
|
||||
try:
|
||||
_hyg_model, _hyg_runtime = self._resolve_session_agent_runtime(
|
||||
@ -4844,8 +4853,10 @@ class GatewayRunner:
|
||||
# collection, which prevents compression, which causes more
|
||||
# disconnects. 400 messages is well above normal sessions
|
||||
# but catches runaway growth before it becomes unrecoverable.
|
||||
# Threshold is configurable via
|
||||
# compression.hygiene_hard_message_limit.
|
||||
# (#2153)
|
||||
_HARD_MSG_LIMIT = 400
|
||||
_HARD_MSG_LIMIT = _hyg_hard_msg_limit
|
||||
_needs_compress = (
|
||||
_approx_tokens >= _compress_token_threshold
|
||||
or _msg_count >= _HARD_MSG_LIMIT
|
||||
|
||||
@ -582,7 +582,7 @@ DEFAULT_CONFIG = {
|
||||
"threshold": 0.50, # compress when context usage exceeds this ratio
|
||||
"target_ratio": 0.20, # fraction of threshold to preserve as recent tail
|
||||
"protect_last_n": 20, # minimum recent messages to keep uncompressed
|
||||
|
||||
"hygiene_hard_message_limit": 400, # gateway session-hygiene force-compress threshold by message count
|
||||
},
|
||||
|
||||
# Anthropic prompt caching (Claude via OpenRouter or native Anthropic API).
|
||||
|
||||
@ -632,4 +632,220 @@ async def test_session_hygiene_informs_user_when_aux_model_fails_but_recovers(mo
|
||||
assert note["chat_id"] == "-1001"
|
||||
assert note["metadata"] == {"thread_id": "17585"}
|
||||
|
||||
FakeCompressAgentWithAuxRecovery.last_instance.close.assert_called_once()
|
||||
FakeCompressAgentWithAuxRecovery.last_instance.close.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_session_hygiene_honors_configurable_hard_message_limit(
|
||||
monkeypatch, tmp_path
|
||||
):
|
||||
"""compression.hygiene_hard_message_limit overrides the 400-message default.
|
||||
|
||||
Regression for user-reported fix: a gateway session with a small
|
||||
transcript (12 messages) should not hit hygiene compression by default,
|
||||
but WILL when the user lowers the hard-limit to 10. Verifies the new
|
||||
config key is actually read and applied at the force-compress gate.
|
||||
"""
|
||||
fake_dotenv = types.ModuleType("dotenv")
|
||||
fake_dotenv.load_dotenv = lambda *args, **kwargs: None
|
||||
monkeypatch.setitem(sys.modules, "dotenv", fake_dotenv)
|
||||
|
||||
class FakeCompressAgent:
|
||||
last_instance = None
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
self.model = kwargs.get("model")
|
||||
self.session_id = kwargs.get("session_id", "fake-session")
|
||||
self._print_fn = None
|
||||
self.shutdown_memory_provider = MagicMock()
|
||||
self.close = MagicMock()
|
||||
type(self).last_instance = self
|
||||
|
||||
def _compress_context(self, messages, *_args, **_kwargs):
|
||||
self.session_id = f"{self.session_id}_compressed"
|
||||
return ([{"role": "assistant", "content": "compressed"}], None)
|
||||
|
||||
fake_run_agent = types.ModuleType("run_agent")
|
||||
fake_run_agent.AIAgent = FakeCompressAgent
|
||||
monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent)
|
||||
|
||||
# Write config.yaml with lowered hard-limit
|
||||
cfg_path = tmp_path / "config.yaml"
|
||||
cfg_path.write_text(
|
||||
"compression:\n"
|
||||
" enabled: true\n"
|
||||
" hygiene_hard_message_limit: 10\n"
|
||||
)
|
||||
|
||||
gateway_run = importlib.import_module("gateway.run")
|
||||
GatewayRunner = gateway_run.GatewayRunner
|
||||
|
||||
adapter = HygieneCaptureAdapter()
|
||||
runner = object.__new__(GatewayRunner)
|
||||
runner.config = GatewayConfig(
|
||||
platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="fake-token")}
|
||||
)
|
||||
runner.adapters = {Platform.TELEGRAM: adapter}
|
||||
runner._voice_mode = {}
|
||||
runner.hooks = SimpleNamespace(emit=AsyncMock(), loaded_hooks=False)
|
||||
runner.session_store = MagicMock()
|
||||
runner.session_store.get_or_create_session.return_value = SessionEntry(
|
||||
session_key="agent:main:telegram:private:12345",
|
||||
session_id="sess-1",
|
||||
created_at=datetime.now(),
|
||||
updated_at=datetime.now(),
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_type="private",
|
||||
)
|
||||
# 12 messages: below 400 default → no compression without override,
|
||||
# but above the configured limit of 10 → should compress.
|
||||
runner.session_store.load_transcript.return_value = _make_history(12, content_size=40)
|
||||
runner.session_store.has_any_sessions.return_value = True
|
||||
runner.session_store.rewrite_transcript = MagicMock()
|
||||
runner.session_store.append_to_transcript = MagicMock()
|
||||
runner._running_agents = {}
|
||||
runner._pending_messages = {}
|
||||
runner._pending_approvals = {}
|
||||
runner._session_db = None
|
||||
runner._is_user_authorized = lambda _source: True
|
||||
runner._set_session_env = lambda _context: None
|
||||
runner._run_agent = AsyncMock(
|
||||
return_value={
|
||||
"final_response": "ok",
|
||||
"messages": [],
|
||||
"tools": [],
|
||||
"history_offset": 0,
|
||||
"last_prompt_tokens": 0,
|
||||
}
|
||||
)
|
||||
|
||||
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
|
||||
monkeypatch.setattr(
|
||||
gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "fake"}
|
||||
)
|
||||
# Pick a context length large enough that the token-based threshold
|
||||
# won't trigger for 12 short messages — hard-limit must be the ONLY
|
||||
# thing firing compression.
|
||||
monkeypatch.setattr(
|
||||
"agent.model_metadata.get_model_context_length",
|
||||
lambda *_args, **_kwargs: 1_000_000,
|
||||
)
|
||||
|
||||
event = MessageEvent(
|
||||
text="hello",
|
||||
source=SessionSource(
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_id="12345",
|
||||
chat_type="private",
|
||||
user_id="12345",
|
||||
),
|
||||
message_id="1",
|
||||
)
|
||||
|
||||
result = await runner._handle_message(event)
|
||||
|
||||
assert result == "ok"
|
||||
# The compression agent was instantiated → hard-limit fired on the
|
||||
# configured value (10), not the hardcoded 400 default.
|
||||
assert FakeCompressAgent.last_instance is not None, (
|
||||
"Expected hygiene compression to fire when message count (12) "
|
||||
"exceeds configured hygiene_hard_message_limit (10)"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_session_hygiene_default_hard_message_limit_does_not_fire_at_12_messages(
|
||||
monkeypatch, tmp_path
|
||||
):
|
||||
"""Sanity check for the companion test above: without config override,
|
||||
12 messages must NOT trigger the 400-message hard limit. If this test
|
||||
passes without changes, the override test's finding is meaningful."""
|
||||
fake_dotenv = types.ModuleType("dotenv")
|
||||
fake_dotenv.load_dotenv = lambda *args, **kwargs: None
|
||||
monkeypatch.setitem(sys.modules, "dotenv", fake_dotenv)
|
||||
|
||||
class FakeCompressAgent:
|
||||
last_instance = None
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
type(self).last_instance = self
|
||||
self.session_id = kwargs.get("session_id", "fake-session")
|
||||
self._print_fn = None
|
||||
self.shutdown_memory_provider = MagicMock()
|
||||
self.close = MagicMock()
|
||||
|
||||
def _compress_context(self, messages, *_args, **_kwargs):
|
||||
return ([{"role": "assistant", "content": "compressed"}], None)
|
||||
|
||||
fake_run_agent = types.ModuleType("run_agent")
|
||||
fake_run_agent.AIAgent = FakeCompressAgent
|
||||
monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent)
|
||||
|
||||
# No config.yaml — use defaults (hard_limit=400)
|
||||
gateway_run = importlib.import_module("gateway.run")
|
||||
GatewayRunner = gateway_run.GatewayRunner
|
||||
|
||||
adapter = HygieneCaptureAdapter()
|
||||
runner = object.__new__(GatewayRunner)
|
||||
runner.config = GatewayConfig(
|
||||
platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="fake-token")}
|
||||
)
|
||||
runner.adapters = {Platform.TELEGRAM: adapter}
|
||||
runner._voice_mode = {}
|
||||
runner.hooks = SimpleNamespace(emit=AsyncMock(), loaded_hooks=False)
|
||||
runner.session_store = MagicMock()
|
||||
runner.session_store.get_or_create_session.return_value = SessionEntry(
|
||||
session_key="agent:main:telegram:private:12345",
|
||||
session_id="sess-1",
|
||||
created_at=datetime.now(),
|
||||
updated_at=datetime.now(),
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_type="private",
|
||||
)
|
||||
runner.session_store.load_transcript.return_value = _make_history(12, content_size=40)
|
||||
runner.session_store.has_any_sessions.return_value = True
|
||||
runner.session_store.rewrite_transcript = MagicMock()
|
||||
runner.session_store.append_to_transcript = MagicMock()
|
||||
runner._running_agents = {}
|
||||
runner._pending_messages = {}
|
||||
runner._pending_approvals = {}
|
||||
runner._session_db = None
|
||||
runner._is_user_authorized = lambda _source: True
|
||||
runner._set_session_env = lambda _context: None
|
||||
runner._run_agent = AsyncMock(
|
||||
return_value={
|
||||
"final_response": "ok",
|
||||
"messages": [],
|
||||
"tools": [],
|
||||
"history_offset": 0,
|
||||
"last_prompt_tokens": 0,
|
||||
}
|
||||
)
|
||||
|
||||
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
|
||||
monkeypatch.setattr(
|
||||
gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "fake"}
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"agent.model_metadata.get_model_context_length",
|
||||
lambda *_args, **_kwargs: 1_000_000,
|
||||
)
|
||||
|
||||
event = MessageEvent(
|
||||
text="hello",
|
||||
source=SessionSource(
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_id="12345",
|
||||
chat_type="private",
|
||||
user_id="12345",
|
||||
),
|
||||
message_id="1",
|
||||
)
|
||||
|
||||
result = await runner._handle_message(event)
|
||||
|
||||
assert result == "ok"
|
||||
# No compression agent instantiated — 12 messages well under 400 default.
|
||||
assert FakeCompressAgent.last_instance is None, (
|
||||
"Compression should NOT fire at 12 messages with default hard_limit=400"
|
||||
)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user