diff --git a/gateway/run.py b/gateway/run.py index e573514e..1c761af7 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -4725,6 +4725,7 @@ class GatewayRunner: _hyg_model = "anthropic/claude-sonnet-4.6" _hyg_threshold_pct = 0.85 _hyg_compression_enabled = True + _hyg_hard_msg_limit = 400 _hyg_config_context_length = None _hyg_provider = None _hyg_base_url = None @@ -4763,6 +4764,14 @@ class GatewayRunner: _hyg_compression_enabled = str( _comp_cfg.get("enabled", True) ).lower() in ("true", "1", "yes") + _raw_hard_limit = _comp_cfg.get("hygiene_hard_message_limit") + if _raw_hard_limit is not None: + try: + _parsed = int(_raw_hard_limit) + if _parsed > 0: + _hyg_hard_msg_limit = _parsed + except (TypeError, ValueError): + pass try: _hyg_model, _hyg_runtime = self._resolve_session_agent_runtime( @@ -4844,8 +4853,10 @@ class GatewayRunner: # collection, which prevents compression, which causes more # disconnects. 400 messages is well above normal sessions # but catches runaway growth before it becomes unrecoverable. + # Threshold is configurable via + # compression.hygiene_hard_message_limit. # (#2153) - _HARD_MSG_LIMIT = 400 + _HARD_MSG_LIMIT = _hyg_hard_msg_limit _needs_compress = ( _approx_tokens >= _compress_token_threshold or _msg_count >= _HARD_MSG_LIMIT diff --git a/hermes_cli/config.py b/hermes_cli/config.py index 210cb12d..8cec3f72 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -582,7 +582,7 @@ DEFAULT_CONFIG = { "threshold": 0.50, # compress when context usage exceeds this ratio "target_ratio": 0.20, # fraction of threshold to preserve as recent tail "protect_last_n": 20, # minimum recent messages to keep uncompressed - + "hygiene_hard_message_limit": 400, # gateway session-hygiene force-compress threshold by message count }, # Anthropic prompt caching (Claude via OpenRouter or native Anthropic API). 
diff --git a/tests/gateway/test_session_hygiene.py b/tests/gateway/test_session_hygiene.py
index 0932b1bb..327dfc28 100644
--- a/tests/gateway/test_session_hygiene.py
+++ b/tests/gateway/test_session_hygiene.py
@@ -632,4 +632,166 @@ async def test_session_hygiene_informs_user_when_aux_model_fails_but_recovers(mo
     assert note["chat_id"] == "-1001"
     assert note["metadata"] == {"thread_id": "17585"}
 
-    FakeCompressAgentWithAuxRecovery.last_instance.close.assert_called_once()
\ No newline at end of file
+    FakeCompressAgentWithAuxRecovery.last_instance.close.assert_called_once()
+
+
+def _install_fake_compress_agent(monkeypatch):
+    """Stub ``dotenv`` and ``run_agent``; return the fake compression agent class.
+
+    The returned class records its most recent instance in ``last_instance``,
+    which stays ``None`` unless a compression agent is instantiated.
+    """
+    fake_dotenv = types.ModuleType("dotenv")
+    fake_dotenv.load_dotenv = lambda *args, **kwargs: None
+    monkeypatch.setitem(sys.modules, "dotenv", fake_dotenv)
+
+    class FakeCompressAgent:
+        last_instance = None
+
+        def __init__(self, **kwargs):
+            # Record first so that any instantiation at all is observable.
+            type(self).last_instance = self
+            self.model = kwargs.get("model")
+            self.session_id = kwargs.get("session_id", "fake-session")
+            self._print_fn = None
+            self.shutdown_memory_provider = MagicMock()
+            self.close = MagicMock()
+
+        def _compress_context(self, messages, *_args, **_kwargs):
+            self.session_id = f"{self.session_id}_compressed"
+            return ([{"role": "assistant", "content": "compressed"}], None)
+
+    fake_run_agent = types.ModuleType("run_agent")
+    fake_run_agent.AIAgent = FakeCompressAgent
+    monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent)
+    return FakeCompressAgent
+
+
+async def _run_hard_limit_scenario(monkeypatch, tmp_path, message_count):
+    """Send one message through a stubbed ``GatewayRunner``.
+
+    The stored transcript holds ``message_count`` short messages and the model
+    context length is pinned to 1_000_000 so the token-based threshold won't
+    trigger; the hard message limit must be the only thing that can fire
+    compression. ``gateway_run._hermes_home`` is pointed at ``tmp_path``, so a
+    ``config.yaml`` written there beforehand is the one the gateway sees.
+
+    Returns ``(result, fake_agent_cls)``: the value returned by
+    ``_handle_message`` and the fake agent class whose ``last_instance``
+    reveals whether compression ran.
+    """
+    fake_agent_cls = _install_fake_compress_agent(monkeypatch)
+
+    # Import only after the stubs are registered in sys.modules.
+    gateway_run = importlib.import_module("gateway.run")
+
+    runner = object.__new__(gateway_run.GatewayRunner)
+    runner.config = GatewayConfig(
+        platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="fake-token")}
+    )
+    runner.adapters = {Platform.TELEGRAM: HygieneCaptureAdapter()}
+    runner._voice_mode = {}
+    runner.hooks = SimpleNamespace(emit=AsyncMock(), loaded_hooks=False)
+    runner.session_store = MagicMock()
+    runner.session_store.get_or_create_session.return_value = SessionEntry(
+        session_key="agent:main:telegram:private:12345",
+        session_id="sess-1",
+        created_at=datetime.now(),
+        updated_at=datetime.now(),
+        platform=Platform.TELEGRAM,
+        chat_type="private",
+    )
+    runner.session_store.load_transcript.return_value = _make_history(
+        message_count, content_size=40
+    )
+    runner.session_store.has_any_sessions.return_value = True
+    runner.session_store.rewrite_transcript = MagicMock()
+    runner.session_store.append_to_transcript = MagicMock()
+    runner._running_agents = {}
+    runner._pending_messages = {}
+    runner._pending_approvals = {}
+    runner._session_db = None
+    runner._is_user_authorized = lambda _source: True
+    runner._set_session_env = lambda _context: None
+    runner._run_agent = AsyncMock(
+        return_value={
+            "final_response": "ok",
+            "messages": [],
+            "tools": [],
+            "history_offset": 0,
+            "last_prompt_tokens": 0,
+        }
+    )
+
+    monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
+    monkeypatch.setattr(
+        gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "fake"}
+    )
+    monkeypatch.setattr(
+        "agent.model_metadata.get_model_context_length",
+        lambda *_args, **_kwargs: 1_000_000,
+    )
+
+    event = MessageEvent(
+        text="hello",
+        source=SessionSource(
+            platform=Platform.TELEGRAM,
+            chat_id="12345",
+            chat_type="private",
+            user_id="12345",
+        ),
+        message_id="1",
+    )
+    result = await runner._handle_message(event)
+    return result, fake_agent_cls
+
+
+@pytest.mark.asyncio
+async def test_session_hygiene_honors_configurable_hard_message_limit(
+    monkeypatch, tmp_path
+):
+    """compression.hygiene_hard_message_limit overrides the 400-message default.
+
+    A 12-message transcript sits far below the default limit, but must be
+    compressed once the configured hard limit is lowered to 10. Verifies the
+    config key is actually read and applied at the force-compress gate.
+    """
+    (tmp_path / "config.yaml").write_text(
+        "compression:\n"
+        " enabled: true\n"
+        " hygiene_hard_message_limit: 10\n",
+        encoding="utf-8",
+    )
+
+    result, fake_agent_cls = await _run_hard_limit_scenario(
+        monkeypatch, tmp_path, message_count=12
+    )
+
+    assert result == "ok"
+    # The compression agent was instantiated → the hard limit fired on the
+    # configured value (10), not the 400 default.
+    assert fake_agent_cls.last_instance is not None, (
+        "Expected hygiene compression to fire when message count (12) "
+        "exceeds configured hygiene_hard_message_limit (10)"
+    )
+
+
+@pytest.mark.asyncio
+async def test_session_hygiene_default_hard_message_limit_does_not_fire_at_12_messages(
+    monkeypatch, tmp_path
+):
+    """Without a config override, 12 messages must not hit the 400 default.
+
+    Control for the override test above: it shows that compression there is
+    caused by the configured limit rather than by the transcript itself.
+    """
+    # No config.yaml is written, so the defaults (hard limit 400) apply.
+    result, fake_agent_cls = await _run_hard_limit_scenario(
+        monkeypatch, tmp_path, message_count=12
+    )
+
+    assert result == "ok"
+    # No compression agent instantiated — 12 messages is well under 400.
+    assert fake_agent_cls.last_instance is None, (
+        "Compression should NOT fire at 12 messages with default hard_limit=400"
+    )