test+fix: snapshot-drift batch + pending_title regression #4

Open
claude-ceo-assistant wants to merge 10 commits from fix/snapshot-drift-batch into main
12 changed files with 298 additions and 84 deletions

View File

@ -67,6 +67,9 @@ AUTHOR_MAP = {
"274096618+hermes-agent-dhabibi@users.noreply.github.com": "dhabibi",
"dejie.guo@gmail.com": "JayGwod",
"maxence@groine.fr": "MaxyMoos",
# Internal molecule-ai Gitea bot identity used by Claude-Code agents
# (post-2026-05-06 GitHub suspension; no upstream/GitHub equivalent).
"claude-ceo-assistant@agents.moleculesai.app": "claude-ceo-assistant",
# OpenViking viking_read salvage (April 2026)
"hitesh@gmail.com": "htsh",
"pty819@outlook.com": "pty819",

View File

@ -200,6 +200,8 @@ class TestSessionOps:
"context",
"reset",
"compact",
"steer",
"queue",
"version",
]
model_cmd = next(

View File

@ -397,13 +397,22 @@ class TestTeamsSend:
assert "Network error" in result.error
@pytest.mark.asyncio
async def test_send_typing(self):
async def test_send_typing(self, monkeypatch):
adapter = TeamsAdapter(_make_config(
client_id="id", client_secret="secret", tenant_id="tenant",
))
mock_app = MagicMock()
mock_app.send = AsyncMock()
adapter._app = mock_app
# The adapter module imports TypingActivityInput at load time; if
# the real microsoft_teams package isn't installed, that local
# binding is None even though the test fixture registers a mock
# in sys.modules. Force a non-None local binding so the call to
# TypingActivityInput() inside send_typing succeeds and we actually
# reach self._app.send.
class _StubTypingActivityInput:
pass
monkeypatch.setattr(_teams_mod, "TypingActivityInput", _StubTypingActivityInput)
await adapter.send_typing("conv-id")
mock_app.send.assert_awaited_once()

View File

@ -5,6 +5,8 @@ import pwd
from pathlib import Path
from types import SimpleNamespace
import re
import pytest
import hermes_cli.gateway as gateway_cli
@ -64,6 +66,11 @@ class TestSystemdServiceRefresh:
monkeypatch.setattr(gateway_cli, "get_systemd_unit_path", lambda system=False: unit_path)
monkeypatch.setattr(gateway_cli, "generate_systemd_unit", lambda system=False, run_as_user=None: "new unit\n")
# systemd_start invokes _preflight_user_systemd which probes the
# user D-Bus socket; that's not available in container test
# runners. The test exercises the unit-refresh flow, not the
# D-Bus check, so mock it as a no-op.
monkeypatch.setattr(gateway_cli, "_preflight_user_systemd", lambda: None)
calls = []
@ -87,6 +94,9 @@ class TestSystemdServiceRefresh:
monkeypatch.setattr(gateway_cli, "get_systemd_unit_path", lambda system=False: unit_path)
monkeypatch.setattr(gateway_cli, "generate_systemd_unit", lambda system=False, run_as_user=None: "new unit\n")
# See test_systemd_start_refreshes_outdated_unit — mock the
# D-Bus preflight that systemd_restart performs for user scope.
monkeypatch.setattr(gateway_cli, "_preflight_user_systemd", lambda: None)
calls = []
@ -107,6 +117,18 @@ class TestSystemdServiceRefresh:
]
def _expected_min_timeout_stop_sec() -> int:
    """Return the minimum TimeoutStopSec that systemd unit generation must emit.

    Mirrors hermes_cli/gateway.py:1636: the unit has to grant the gateway at
    least drain_timeout + 30s of headroom so post-interrupt cleanup (tool
    subprocess kill, adapter disconnect) finishes before systemd escalates to
    SIGKILL on the cgroup (issue #8202).
    """
    # Imported lazily so merely collecting this test module doesn't require
    # the gateway package.
    from gateway.restart import DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT

    drain_timeout = int(DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT or 0)
    headroom = 30
    # Floor of 60s matches the generator's own minimum drain window.
    return max(60, drain_timeout) + headroom
class TestGeneratedSystemdUnits:
def test_user_unit_avoids_recursive_execstop_and_uses_extended_stop_timeout(self):
unit = gateway_cli.generate_systemd_unit(system=False)
@ -115,10 +137,17 @@ class TestGeneratedSystemdUnits:
assert "ExecStop=" not in unit
assert "ExecReload=/bin/kill -USR1 $MAINPID" in unit
assert f"RestartForceExitStatus={GATEWAY_SERVICE_RESTART_EXIT_CODE}" in unit
# TimeoutStopSec must exceed the default drain_timeout (60s) so
# TimeoutStopSec must exceed the configured drain_timeout so
# systemd doesn't SIGKILL the cgroup before post-interrupt cleanup
# (tool subprocess kill, adapter disconnect) runs — issue #8202.
assert "TimeoutStopSec=90" in unit
# Compute the expected minimum from the SSOT default rather than
# hard-coding a value that drifts with config bumps.
expected = _expected_min_timeout_stop_sec()
match = re.search(r"^TimeoutStopSec=(\d+)", unit, flags=re.MULTILINE)
assert match is not None, "TimeoutStopSec directive missing"
assert int(match.group(1)) >= expected, (
f"TimeoutStopSec={match.group(1)} < expected {expected}"
)
def test_user_unit_includes_resolved_node_directory_in_path(self, monkeypatch):
monkeypatch.setattr(gateway_cli.shutil, "which", lambda cmd: "/home/test/.nvm/versions/node/v24.14.0/bin/node" if cmd == "node" else None)
@ -134,10 +163,17 @@ class TestGeneratedSystemdUnits:
assert "ExecStop=" not in unit
assert "ExecReload=/bin/kill -USR1 $MAINPID" in unit
assert f"RestartForceExitStatus={GATEWAY_SERVICE_RESTART_EXIT_CODE}" in unit
# TimeoutStopSec must exceed the default drain_timeout (60s) so
# TimeoutStopSec must exceed the configured drain_timeout so
# systemd doesn't SIGKILL the cgroup before post-interrupt cleanup
# (tool subprocess kill, adapter disconnect) runs — issue #8202.
assert "TimeoutStopSec=90" in unit
# Compute the expected minimum from the SSOT default rather than
# hard-coding a value that drifts with config bumps.
expected = _expected_min_timeout_stop_sec()
match = re.search(r"^TimeoutStopSec=(\d+)", unit, flags=re.MULTILINE)
assert match is not None, "TimeoutStopSec directive missing"
assert int(match.group(1)) >= expected, (
f"TimeoutStopSec={match.group(1)} < expected {expected}"
)
assert "WantedBy=multi-user.target" in unit
@ -437,6 +473,10 @@ class TestGatewayServiceDetection:
monkeypatch.setattr(gateway_cli, "is_linux", lambda: True)
monkeypatch.setattr(gateway_cli, "is_termux", lambda: False)
monkeypatch.setattr(gateway_cli, "is_wsl", lambda: False)
# supports_systemd_services routes containers through
# _container_systemd_operational; mock False so this test
# isolates the "native Linux + systemctl present" branch.
monkeypatch.setattr(gateway_cli, "is_container", lambda: False)
monkeypatch.setattr(gateway_cli.shutil, "which", lambda name: "/usr/bin/systemctl")
assert gateway_cli.supports_systemd_services() is True
@ -487,6 +527,11 @@ class TestGatewaySystemServiceRouting:
calls = []
monkeypatch.setattr(gateway_cli, "_select_systemd_scope", lambda system=False: False)
# systemd_restart calls _preflight_user_systemd which probes the
# user D-Bus socket; not available in container test runners.
# The test exercises the self-restart routing flow, not the
# D-Bus check, so mock it as a no-op.
monkeypatch.setattr(gateway_cli, "_preflight_user_systemd", lambda: None)
monkeypatch.setattr(gateway_cli, "refresh_systemd_unit_if_needed", lambda system=False: calls.append(("refresh", system)))
monkeypatch.setattr(
"gateway.status.get_running_pid",
@ -540,6 +585,9 @@ class TestGatewaySystemServiceRouting:
assert "restarted" in out
def test_systemd_restart_recovers_failed_planned_restart(self, monkeypatch, capsys):
# See test_systemd_restart_self_requests_graceful_restart_and_waits
# — same D-Bus preflight no-op needed.
monkeypatch.setattr(gateway_cli, "_preflight_user_systemd", lambda: None)
monkeypatch.setattr(gateway_cli, "_select_systemd_scope", lambda system=False: False)
monkeypatch.setattr(gateway_cli, "refresh_systemd_unit_if_needed", lambda system=False: None)
monkeypatch.setattr(

View File

@ -141,10 +141,14 @@ class TestSupportsSystemdServicesWSL:
assert gateway.supports_systemd_services() is False
def test_native_linux(self, monkeypatch):
"""Native Linux (not WSL) → True without checking systemd."""
"""Native Linux (not WSL, not container) → True without further checks."""
monkeypatch.setattr(gateway, "is_linux", lambda: True)
monkeypatch.setattr(gateway, "is_termux", lambda: False)
monkeypatch.setattr(gateway, "is_wsl", lambda: False)
# supports_systemd_services also routes containers through
# _container_systemd_operational; mock False so this test
# isolates the native-Linux branch.
monkeypatch.setattr(gateway, "is_container", lambda: False)
assert gateway.supports_systemd_services() is True
def test_termux_still_excluded(self, monkeypatch):

View File

@ -415,7 +415,14 @@ class TestCmdUpdateLaunchdRestart:
pid=12345,
)
with patch.object(gateway_cli, "find_gateway_pids", return_value=[12345]), \
# Model successful primary-kill path: first find returns the
# live PID, subsequent calls (post-restart survivor sweep, #17648)
# return [] — the process exited after the graceful drain.
find_calls = {"n": 0}
def _find_drained(*a, **kw):
find_calls["n"] += 1
return [12345] if find_calls["n"] == 1 else []
with patch.object(gateway_cli, "find_gateway_pids", side_effect=_find_drained), \
patch.object(gateway_cli, "find_profile_gateway_processes", return_value=[process]), \
patch.object(gateway_cli, "launch_detached_profile_gateway_restart", return_value=True) as restart, \
patch.object(gateway_cli, "_graceful_restart_via_sigusr1", return_value=True) as graceful, \
@ -425,7 +432,9 @@ class TestCmdUpdateLaunchdRestart:
captured = capsys.readouterr().out
restart.assert_called_once_with("coder", 12345)
graceful.assert_called_once()
# Graceful drain succeeded — no SIGTERM fallback needed.
# Graceful drain succeeded — no SIGTERM fallback, and no SIGKILL
# from the survivor sweep because the PID drops out of
# find_gateway_pids on the second call.
kill.assert_not_called()
assert "Restarting manual gateway profile(s): coder" in captured
assert "Restart manually: hermes gateway run" not in captured
@ -453,7 +462,13 @@ class TestCmdUpdateLaunchdRestart:
pid=12345,
)
with patch.object(gateway_cli, "find_gateway_pids", return_value=[12345]), \
# Model successful SIGTERM fallback: first find returns the live
# PID; subsequent calls return [] because SIGTERM landed.
find_calls = {"n": 0}
def _find_drained(*a, **kw):
find_calls["n"] += 1
return [12345] if find_calls["n"] == 1 else []
with patch.object(gateway_cli, "find_gateway_pids", side_effect=_find_drained), \
patch.object(gateway_cli, "find_profile_gateway_processes", return_value=[process]), \
patch.object(gateway_cli, "launch_detached_profile_gateway_restart", return_value=True) as restart, \
patch.object(gateway_cli, "_graceful_restart_via_sigusr1", return_value=False) as graceful, \
@ -463,8 +478,11 @@ class TestCmdUpdateLaunchdRestart:
captured = capsys.readouterr().out
restart.assert_called_once_with("coder", 12345)
graceful.assert_called_once()
# Graceful drain returned False → SIGTERM fallback.
# Graceful drain returned False → SIGTERM fallback. SIGTERM
# works (modelled by the find draining), so the survivor sweep
# finds nothing to escalate.
kill.assert_called_once()
assert kill.call_args.args[0] == 12345
assert "Restarting manual gateway profile(s): coder" in captured
@patch("shutil.which", return_value=None)
@ -872,9 +890,18 @@ class TestServicePidExclusion:
launchctl_loaded=True,
)
# Model successful primary-kill path: first find returns both
# PIDs (filtered by the excluded service); subsequent calls
# return only the service PID (manual exited after SIGTERM).
# Without this, the post-restart survivor sweep (#17648) sends
# an extra SIGKILL to the manual PID and breaks the call-count
# assertion below.
find_calls = {"n": 0}
def fake_find(exclude_pids=None, all_profiles=False):
_exclude = exclude_pids or set()
return [p for p in [SERVICE_PID, MANUAL_PID] if p not in _exclude]
find_calls["n"] += 1
live = [SERVICE_PID, MANUAL_PID] if find_calls["n"] == 1 else [SERVICE_PID]
return [p for p in live if p not in _exclude]
with patch.object(
gateway_cli, "_get_service_pids", return_value={SERVICE_PID}

View File

@ -47,12 +47,32 @@ def _make_agent(monkeypatch):
# Worker-thread tracking state mirrored from AIAgent.__init__ so the
# real interrupt() method can fan out to concurrent-tool workers.
_active_children: list = []
# ToolCallGuardrailController instance on real AIAgent. The stub
# always allows execution — these tests exercise interrupt fanout,
# not guardrail behaviour. before_call returns a decision-shaped
# object with allows_execution=True; after_call is similar.
_tool_guardrails = MagicMock()
def __init__(self):
# Instance-level (not class-level) so each test gets a fresh set.
self._tool_worker_threads: set = set()
self._tool_worker_threads_lock = threading.Lock()
self._active_children_lock = threading.Lock()
# Re-assign per-instance so tests can mutate independently.
self._tool_guardrails = MagicMock()
# Decision shapes match agent.tool_guardrails.ToolGuardrailDecision —
# action="allow", allows_execution=True, should_halt=False — so
# the bound _append_guardrail_observation / _guardrail_block_result
# methods read sensible defaults instead of truthy MagicMock children.
_allow = MagicMock(
action="allow",
allows_execution=True,
should_halt=False,
code=None,
count=0,
)
self._tool_guardrails.before_call.return_value = _allow
self._tool_guardrails.after_call.return_value = _allow
def _touch_activity(self, desc):
self._last_activity = time.time()
@ -77,6 +97,21 @@ def _make_agent(monkeypatch):
stub._execute_tool_calls_concurrent = _ra.AIAgent._execute_tool_calls_concurrent.__get__(stub)
stub.interrupt = _ra.AIAgent.interrupt.__get__(stub)
stub.clear_interrupt = _ra.AIAgent.clear_interrupt.__get__(stub)
# Tool-guardrails wiring: the concurrent execution path now calls
# _append_guardrail_observation / _guardrail_block_result /
# _set_tool_guardrail_halt during the result-collection loop. These are
# thin pass-throughs over self._tool_guardrails; bind them for the stub
# the same way as other AIAgent methods.
stub._append_guardrail_observation = (
_ra.AIAgent._append_guardrail_observation.__get__(stub)
)
stub._guardrail_block_result = (
_ra.AIAgent._guardrail_block_result.__get__(stub)
)
stub._set_tool_guardrail_halt = (
_ra.AIAgent._set_tool_guardrail_halt.__get__(stub)
)
stub._tool_guardrail_halt_decision = None
# /steer injection (added in PR #12116) fires after every concurrent
# tool batch. Stub it as a no-op — this test exercises interrupt
# fanout, not steer injection.
@ -107,7 +142,11 @@ def test_concurrent_interrupt_cancels_pending(monkeypatch):
original_invoke = agent._invoke_tool
def slow_tool(name, args, task_id, call_id=None):
def slow_tool(name, args, task_id, call_id=None, **kwargs):
# **kwargs absorbs `messages=...` and `pre_tool_block_checked=...`
# that _invoke_tool now forwards. The test only cares about the
# positional shape; future kwargs added to _invoke_tool shouldn't
# break this fake.
if name == "slow_one":
# Block until the test sets the interrupt
barrier.wait(timeout=10)
@ -184,7 +223,10 @@ def test_running_concurrent_worker_sees_is_interrupted(monkeypatch):
observed = {"saw_true": False, "poll_count": 0, "worker_tid": None}
worker_started = threading.Event()
def polling_tool(name, args, task_id, call_id=None, messages=None):
def polling_tool(name, args, task_id, call_id=None, messages=None, **kwargs):
# **kwargs absorbs pre_tool_block_checked and any future kwargs added
# to _invoke_tool — see test_concurrent_interrupt_cancels_pending for
# the same pattern.
observed["worker_tid"] = threading.current_thread().ident
worker_started.set()
deadline = time.monotonic() + 5.0

View File

@ -753,57 +753,63 @@ def test_session_title_set_errors_when_row_lookup_fails_after_noop(monkeypatch):
server._sessions.pop("sid", None)
def test_session_create_drops_pending_title_on_valueerror(monkeypatch):
unblock_agent = threading.Event()
class _FakeWorker:
def __init__(self, key, model):
self.key = key
def close(self):
return None
class _FakeAgent:
model = "x"
provider = "openrouter"
base_url = ""
api_key = ""
class _FakeDB:
def create_session(self, _key, source="tui", model=None):
return None
def test_apply_pending_session_title_drops_on_valueerror():
"""ValueError from set_session_title (e.g. duplicate title) must drop
the pending_title so a stuck title doesn't keep retrying forever.
Originally tested via the eager-apply path in _start_agent_build, which
was removed by c5b4c48 (#18370, lazy session creation) and replaced by
a post-message-complete apply that only `except Exception: pass`'d —
losing the ValueError-specific drop semantics. The helper restores
them; this test asserts that.
"""
class _RaisingDB:
def set_session_title(self, _key, _title):
raise ValueError("Title already in use")
def _make_agent(_sid, _key):
unblock_agent.wait(timeout=2.0)
return _FakeAgent()
monkeypatch.setattr(server, "_make_agent", _make_agent)
monkeypatch.setattr(server, "_SlashWorker", _FakeWorker)
monkeypatch.setattr(server, "_get_db", lambda: _FakeDB())
monkeypatch.setattr(server, "_session_info", lambda _a: {"model": "x"})
monkeypatch.setattr(server, "_probe_credentials", lambda _a: None)
monkeypatch.setattr(server, "_wire_callbacks", lambda _sid: None)
monkeypatch.setattr(server, "_emit", lambda *a, **kw: None)
import tools.approval as _approval
monkeypatch.setattr(_approval, "register_gateway_notify", lambda key, cb: None)
monkeypatch.setattr(_approval, "load_permanent_allowlist", lambda: None)
resp = server.handle_request(
{"id": "1", "method": "session.create", "params": {"cols": 80}}
)
sid = resp["result"]["session_id"]
session = server._sessions[sid]
session["pending_title"] = "duplicate title"
unblock_agent.set()
session["agent_ready"].wait(timeout=2.0)
session = {"session_key": "k1", "pending_title": "duplicate title"}
server._apply_pending_session_title(session, "sid-1", _RaisingDB())
assert session["pending_title"] is None
def test_apply_pending_session_title_clears_on_success():
    """A truthy set_session_title result must clear the queued pending_title."""
    class _AcceptingDB:
        def set_session_title(self, _key, _title):
            return True

    session = {"session_key": "k2", "pending_title": "Real title"}
    server._apply_pending_session_title(session, "sid-2", _AcceptingDB())
    # Title applied — nothing left to retry on the next message-complete.
    assert session["pending_title"] is None
def test_apply_pending_session_title_retains_on_transient_exception():
    """A non-ValueError DB failure must keep the pending title queued.

    The helper treats anything other than ValueError as transient, so the
    next message-complete can retry. Without this, one flaky DB call would
    silently lose the title.
    """
    class _BlippingDB:
        def set_session_title(self, _key, _title):
            raise RuntimeError("transient db blip")

    session = {"session_key": "k3", "pending_title": "Keep retrying"}
    server._apply_pending_session_title(session, "sid-3", _BlippingDB())
    # Retained, not dropped — retry happens on the next message-complete.
    assert session["pending_title"] == "Keep retrying"
def test_apply_pending_session_title_no_op_without_pending():
    """With pending_title=None the helper must not touch the DB at all.

    This is the common case: every message-complete on a session whose title
    was already applied goes through this path.
    """
    class _UntouchableDB:
        def set_session_title(self, _key, _title):
            raise AssertionError("DB must not be touched when no pending title")

    session = {"session_key": "k4", "pending_title": None}
    server._apply_pending_session_title(session, "sid-4", _UntouchableDB())
    assert session["pending_title"] is None
server._sessions.pop(sid, None)
def test_config_set_yolo_toggles_session_scope():

View File

@ -106,10 +106,20 @@ class TestCredentialPoolSeedsFromDotEnv:
assert active_sources == set()
assert entries == []
def test_os_environ_still_wins_over_dotenv(self, isolated_hermes_home, monkeypatch):
"""get_env_value checks os.environ first — verify seeding picks that up."""
_write_env_file(isolated_hermes_home, DEEPSEEK_API_KEY="sk-dotenv-stale")
monkeypatch.setenv("DEEPSEEK_API_KEY", "sk-env-fresh-xyz")
def test_dotenv_wins_over_stale_os_environ(self, isolated_hermes_home, monkeypatch):
""".env should win over a stale os.environ value.
Inverted from the pre-#18254 behaviour. Stale env vars inherited
from parent shells (Codex CLI, test harnesses) used to shadow
deliberate updates to ~/.hermes/.env, causing auth.json to cache
an outdated key and silent 401 errors. The invariant now is:
when a key appears in both sources, .env wins.
Sister coverage in tests/agent/test_credential_pool.py exercises
the load_pool path; this case exercises _seed_from_env directly.
"""
_write_env_file(isolated_hermes_home, DEEPSEEK_API_KEY="sk-dotenv-fresh")
monkeypatch.setenv("DEEPSEEK_API_KEY", "sk-env-stale-xyz")
from agent.credential_pool import _seed_from_env
entries = []
@ -118,7 +128,7 @@ class TestCredentialPoolSeedsFromDotEnv:
assert changed is True
seeded = [e for e in entries if e.source == "env:DEEPSEEK_API_KEY"]
assert len(seeded) == 1
assert seeded[0].access_token == "sk-env-fresh-xyz"
assert seeded[0].access_token == "sk-dotenv-fresh"
class TestAuthResolvesFromDotEnv:

View File

@ -106,8 +106,16 @@ def test_dockerfile_entrypoint_routes_through_the_init(dockerfile_text):
def test_dockerfile_installs_tui_dependencies(dockerfile_text):
"""The Dockerfile must install ui-tui's npm dependencies during build,
and must copy the workspace package tree (not just manifests) so npm
can resolve the @hermes/ink ``file:`` dep without falling back to the
bare manifest. See PR #16690 + a49f4c6 for the design.
"""
assert "ui-tui/package.json" in dockerfile_text
assert "ui-tui/packages/hermes-ink/package-lock.json" in dockerfile_text
# ui-tui/packages/hermes-ink/ is a `file:` workspace; copying the FULL
# tree (not just its manifests) is what lets npm resolve it correctly.
# Catches the regression where someone reverts to manifest-only copies.
assert "COPY ui-tui/packages/hermes-ink/ ui-tui/packages/hermes-ink/" in dockerfile_text
assert any(
"ui-tui" in step and "npm" in step and (" install" in step or " ci" in step)
for step in _run_steps(dockerfile_text)
@ -121,17 +129,24 @@ def test_dockerfile_builds_tui_assets(dockerfile_text):
)
def test_dockerfile_materializes_local_tui_ink_package(dockerfile_text):
assert any(
"ui-tui" in step
and "node_modules/@hermes/ink" in step
and "packages/hermes-ink" in step
and "rm -rf packages/hermes-ink/node_modules" in step
and "npm install --omit=dev" in step
and "--prefix node_modules/@hermes/ink" in step
and "rm -rf node_modules/@hermes/ink/node_modules/react" in step
and "await import('@hermes/ink')" in step
for step in _run_steps(dockerfile_text)
def test_dockerfile_forces_npm_install_links_false_for_workspace_resolution(dockerfile_text):
"""The Dockerfile must force npm to install ``file:`` deps as symlinks
rather than copies. Debian's bundled npm 9.x defaults to install-links=true
(deps installed as copies), which produces a hidden
node_modules/.package-lock.json that permanently disagrees with the root
lockfile (which is generated by npm 10+ using symlinks). The disagreement
trips the TUI launcher's _tui_need_npm_install() check on every startup
and triggers an EACCES-failing runtime `npm install`.
Replaces the older `--prefix node_modules/@hermes/ink` materialization
pattern (PR #16690) — that approach was retired in a49f4c6 in favor of
install-links=false because the materialization step was rebuilding TUI
assets unnecessarily.
"""
assert "npm_config_install_links=false" in dockerfile_text, (
"ENV npm_config_install_links=false missing — without it, Debian npm 9.x "
"installs file: deps as copies, breaking @hermes/ink workspace resolution. "
"See PR #16690 + a49f4c6."
)

View File

@ -61,6 +61,17 @@ def mock_sd(monkeypatch):
# ============================================================================
class TestDetectAudioEnvironment:
@pytest.fixture(autouse=True)
def _mock_not_container(self, monkeypatch):
    """Pretend the host is not a container for every test in this class.

    detect_audio_environment routes container hosts through is_container()
    and emits a hard-fail "no audio devices" warning. The tests here cover
    per-environment detection (SSH, WSL, clean Linux, Termux) and must not
    fail merely because the CI runner itself is a Docker container. Tests
    that want the container branch can override this patch.
    """
    monkeypatch.setattr("hermes_constants.is_container", lambda: False)
def test_clean_environment_is_available(self, monkeypatch):
"""No SSH, Docker, or WSL — should be available."""
monkeypatch.delenv("SSH_CLIENT", raising=False)

View File

@ -515,6 +515,50 @@ def _wait_agent(session: dict, rid: str, timeout: float = 30.0) -> dict | None:
return _err(rid, 5032, err) if err else None
def _apply_pending_session_title(
session: dict, sid: str, db: object | None
) -> None:
"""Apply session["pending_title"] to the DB via db.set_session_title.
Pending titles are queued during session.create (before the DB row
exists, since c5b4c48 deferred row creation to first message) and
flushed here once a message-complete event lands.
Outcome by branch:
- set_session_title returns truthy: pending_title cleared.
- ValueError (title invalid / duplicate): pending_title dropped,
because retrying with the same value will fail the same way.
Auto-title later picks a fresh title from message content.
- other Exception: pending_title retained likely a transient DB
failure worth retrying on the next message-complete.
No-ops when there is no pending title or no DB.
Pre-c5b4c48 (#18370) the same semantics lived inline in
_start_agent_build. Extracting them here both restores the lost
ValueError handling and makes the invariant testable without
simulating a full message turn.
"""
pending = session.get("pending_title")
if not pending or db is None:
return
key = session.get("session_key") or sid
try:
if db.set_session_title(key, pending):
session["pending_title"] = None
except ValueError as exc:
# Title invalid / duplicate — retrying is futile; drop and let
# auto-title pick something.
session["pending_title"] = None
logger.info("Dropping pending title for session %s: %s", sid, exc)
except Exception:
# Likely transient — keep pending_title so the next
# message-complete can retry. Auto-title is the eventual fallback.
logger.warning(
"Failed to apply pending title for session %s", sid, exc_info=True,
)
def _start_agent_build(sid: str, session: dict) -> None:
"""Start building the real AIAgent for a TUI session, once.
@ -2982,15 +3026,8 @@ def _run_prompt_submit(rid, sid: str, session: dict, text: Any) -> None:
_emit("message.complete", sid, payload)
# Apply pending_title now that the DB row exists.
_pending = session.get("pending_title")
if _pending and status == "complete":
_pdb = _get_db()
if _pdb:
try:
if _pdb.set_session_title(session.get("session_key") or sid, _pending):
session["pending_title"] = None
except Exception:
pass # Best effort — auto-title will handle it below
if status == "complete":
_apply_pending_session_title(session, sid, _get_db())
if (
status == "complete"