fix(gateway): write restart markers atomically and fix Windows lock collisions
This commit is contained in:
parent
447a2bba3a
commit
1ef9e88549
@ -239,7 +239,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
# Resolve Hermes home directory (respects HERMES_HOME override)
|
||||
from hermes_constants import get_hermes_home
|
||||
from utils import atomic_yaml_write, base_url_host_matches, is_truthy_value
|
||||
from utils import atomic_json_write, atomic_yaml_write, base_url_host_matches, is_truthy_value
|
||||
_hermes_home = get_hermes_home()
|
||||
|
||||
# Load environment variables from ~/.hermes/.env first.
|
||||
@ -2245,7 +2245,7 @@ class GatewayRunner:
|
||||
# (they might become active again next restart)
|
||||
|
||||
try:
|
||||
path.write_text(json.dumps(new_counts))
|
||||
atomic_json_write(path, new_counts, indent=None)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@ -2313,7 +2313,7 @@ class GatewayRunner:
|
||||
if session_key in counts:
|
||||
del counts[session_key]
|
||||
if counts:
|
||||
path.write_text(json.dumps(counts))
|
||||
atomic_json_write(path, counts, indent=None)
|
||||
else:
|
||||
path.unlink(missing_ok=True)
|
||||
except Exception:
|
||||
@ -6734,8 +6734,10 @@ class GatewayRunner:
|
||||
}
|
||||
if event.source.thread_id:
|
||||
notify_data["thread_id"] = event.source.thread_id
|
||||
(_hermes_home / ".restart_notify.json").write_text(
|
||||
json.dumps(notify_data)
|
||||
atomic_json_write(
|
||||
_hermes_home / ".restart_notify.json",
|
||||
notify_data,
|
||||
indent=None,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug("Failed to write restart notify file: %s", e)
|
||||
@ -6752,8 +6754,10 @@ class GatewayRunner:
|
||||
}
|
||||
if event.platform_update_id is not None:
|
||||
dedup_data["update_id"] = event.platform_update_id
|
||||
(_hermes_home / ".restart_last_processed.json").write_text(
|
||||
json.dumps(dedup_data)
|
||||
atomic_json_write(
|
||||
_hermes_home / ".restart_last_processed.json",
|
||||
dedup_data,
|
||||
indent=None,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug("Failed to write restart dedup marker: %s", e)
|
||||
|
||||
@ -21,6 +21,7 @@ from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from hermes_constants import get_hermes_home
|
||||
from typing import Any, Optional
|
||||
from utils import atomic_json_write
|
||||
|
||||
if sys.platform == "win32":
|
||||
import msvcrt
|
||||
@ -34,6 +35,10 @@ _IS_WINDOWS = sys.platform == "win32"
|
||||
_UNSET = object()
|
||||
_GATEWAY_LOCK_FILENAME = "gateway.lock"
|
||||
_gateway_lock_handle = None
|
||||
# Windows byte-range locks are mandatory for other readers. Lock a byte well
|
||||
# past the JSON payload so runtime status / PID readers can still read the file
|
||||
# while another process holds the mutual-exclusion lock.
|
||||
_WINDOWS_LOCK_OFFSET = 1024 * 1024
|
||||
|
||||
|
||||
def _get_pid_path() -> Path:
|
||||
@ -205,8 +210,7 @@ def _read_json_file(path: Path) -> Optional[dict[str, Any]]:
|
||||
|
||||
|
||||
def _write_json_file(path: Path, payload: dict[str, Any]) -> None:
    """Persist ``payload`` to ``path`` as compact JSON using a single atomic write.

    The parent directory is created first if it does not exist yet.  The
    payload is then handed to ``atomic_json_write`` with ``indent=None`` and
    ``separators=(",", ":")`` so the file holds compact, single-line JSON.

    No plain ``path.write_text(...)`` is performed beforehand: writing the
    target in place would expose readers to an empty or partial file, which
    is exactly what routing the write through ``atomic_json_write`` is meant
    to avoid.

    Args:
        path: Destination file for the JSON document.
        payload: JSON-serialisable mapping to store.

    NOTE(review): ``atomic_json_write`` is imported from ``utils``; it is
    presumed to write a temporary file and rename it over ``path`` -- confirm
    against its implementation.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    atomic_json_write(path, payload, indent=None, separators=(",", ":"))
|
||||
|
||||
|
||||
def _read_pid_record(pid_path: Optional[Path] = None) -> Optional[dict]:
|
||||
@ -286,7 +290,7 @@ def _try_acquire_file_lock(handle) -> bool:
|
||||
if handle.tell() == 0:
|
||||
handle.write("\n")
|
||||
handle.flush()
|
||||
handle.seek(0)
|
||||
handle.seek(_WINDOWS_LOCK_OFFSET)
|
||||
msvcrt.locking(handle.fileno(), msvcrt.LK_NBLCK, 1)
|
||||
else:
|
||||
fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
@ -298,7 +302,7 @@ def _try_acquire_file_lock(handle) -> bool:
|
||||
def _release_file_lock(handle) -> None:
|
||||
try:
|
||||
if _IS_WINDOWS:
|
||||
handle.seek(0)
|
||||
handle.seek(_WINDOWS_LOCK_OFFSET)
|
||||
msvcrt.locking(handle.fileno(), msvcrt.LK_UNLCK, 1)
|
||||
else:
|
||||
fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
|
||||
|
||||
@ -113,6 +113,36 @@ async def test_restart_command_preserves_thread_id(tmp_path, monkeypatch):
|
||||
assert data["thread_id"] == "topic_7"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_restart_command_uses_atomic_json_writes_for_marker_files(tmp_path, monkeypatch):
    """The /restart handler must write both marker files via ``atomic_json_write``.

    ``gateway_run.atomic_json_write`` is swapped for a recorder, so nothing is
    written to disk; the test checks which files the handler asked to write,
    in which order, and one key from each payload.
    """
    recorded = []

    def _record_write(path, payload, **kwargs):
        # Store only the file name so assertions do not depend on tmp_path.
        recorded.append((Path(path).name, payload, kwargs))

    monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
    monkeypatch.setattr(gateway_run, "atomic_json_write", _record_write)

    runner, _adapter = make_restart_runner()
    # Stub the actual restart request so only the marker writes are exercised.
    runner.request_restart = MagicMock(return_value=True)

    restart_event = MessageEvent(
        text="/restart",
        message_type=MessageType.TEXT,
        source=make_restart_source(chat_id="42"),
        message_id="m1",
    )

    await runner._handle_restart_command(restart_event)

    written_names = [entry[0] for entry in recorded]
    assert written_names == [".restart_notify.json", ".restart_last_processed.json"]

    notify_payload = recorded[0][1]
    dedup_payload = recorded[1][1]
    assert notify_payload["chat_id"] == "42"
    assert dedup_payload["platform"] == "telegram"
|
||||
|
||||
|
||||
# ── _send_restart_notification ───────────────────────────────────────────
|
||||
|
||||
|
||||
|
||||
@ -999,3 +999,65 @@ class TestStuckLoopEscalation:
|
||||
|
||||
assert store._entries[entry.session_key].resume_pending is False
|
||||
assert not counts_file.exists()
|
||||
|
||||
def test_increment_restart_failure_counts_uses_atomic_json_write(
    self, tmp_path, monkeypatch
):
    """Incrementing a restart-failure count must go through ``atomic_json_write``.

    With no counts file present beforehand, a single session key is expected
    to be written with a count of 1, using ``indent=None``.
    """
    from gateway.run import GatewayRunner

    origin = _make_source()
    store = _make_store(tmp_path)
    session_key = store.get_or_create_session(origin).session_key

    recorded = []

    def _record_write(path, payload, **kwargs):
        recorded.append((path, payload, kwargs))

    monkeypatch.setattr("gateway.run._hermes_home", tmp_path)
    monkeypatch.setattr("gateway.run.atomic_json_write", _record_write)

    # Bypass __init__: the method under test is called on a bare instance.
    runner = object.__new__(GatewayRunner)
    runner._increment_restart_failure_counts({session_key})

    expected_write = (
        tmp_path / ".restart_failure_counts",
        {session_key: 1},
        {"indent": None},
    )
    assert recorded == [expected_write]
|
||||
|
||||
def test_clear_restart_failure_count_uses_atomic_json_write_when_entries_remain(
    self, tmp_path, monkeypatch
):
    """Clearing one session's count must rewrite the remainder atomically.

    The counts file is seeded with two entries; after clearing one of them
    the remaining entry is expected to be passed to ``atomic_json_write``
    with ``indent=None``.
    """
    import json

    from gateway.run import GatewayRunner

    origin = _make_source()
    store = _make_store(tmp_path)
    session_key = store.get_or_create_session(origin).session_key
    other_key = "agent:main:telegram:dm:other"

    # Seed the on-disk state with the entry to clear plus one that must survive.
    seeded_counts = {session_key: 2, other_key: 1}
    counts_file = tmp_path / ".restart_failure_counts"
    counts_file.write_text(json.dumps(seeded_counts), encoding="utf-8")

    recorded = []

    def _record_write(path, payload, **kwargs):
        recorded.append((path, payload, kwargs))

    monkeypatch.setattr("gateway.run._hermes_home", tmp_path)
    monkeypatch.setattr("gateway.run.atomic_json_write", _record_write)

    # Bypass __init__: the method under test is called on a bare instance.
    runner = object.__new__(GatewayRunner)
    runner._clear_restart_failure_count(session_key)

    expected_write = (
        tmp_path / ".restart_failure_counts",
        {other_key: 1},
        {"indent": None},
    )
    assert recorded == [expected_write]
|
||||
|
||||
@ -2,6 +2,7 @@
|
||||
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
|
||||
from gateway import status
|
||||
@ -245,6 +246,27 @@ class TestGatewayPidState:
|
||||
|
||||
|
||||
class TestGatewayRuntimeStatus:
|
||||
def test_write_json_file_uses_atomic_json_write(self, tmp_path, monkeypatch):
    """``status._write_json_file`` must delegate to ``atomic_json_write`` once.

    The recorder replaces ``status.atomic_json_write``; the test pins the
    target path, the payload, and the compact-JSON keyword arguments.
    """
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))

    recorded = []

    def _record_write(path, payload, **kwargs):
        # Normalise to Path so str and Path arguments compare equal.
        recorded.append((Path(path), payload, kwargs))

    monkeypatch.setattr(status, "atomic_json_write", _record_write)

    target = tmp_path / "gateway_state.json"
    payload = {"gateway_state": "running"}
    status._write_json_file(target, payload)

    expected_kwargs = {"indent": None, "separators": (",", ":")}
    assert recorded == [(target, payload, expected_kwargs)]
|
||||
|
||||
def test_write_runtime_status_overwrites_stale_pid_on_restart(self, tmp_path, monkeypatch):
|
||||
"""Regression: setdefault() preserved stale PID from previous process (#1631)."""
|
||||
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
||||
@ -349,6 +371,35 @@ class TestTerminatePid:
|
||||
|
||||
|
||||
class TestScopedLocks:
|
||||
def test_windows_file_lock_uses_high_offset(self, tmp_path, monkeypatch):
    """Windows lock and unlock must both happen at ``_WINDOWS_LOCK_OFFSET``.

    ``msvcrt`` is replaced by a stand-in whose ``locking`` records the file
    position at call time, so the test can run on any platform.  The lock
    file itself must end up containing only the single newline.
    """
    lock_path = tmp_path / "gateway.lock"
    observed = []

    with open(lock_path, "a+", encoding="utf-8") as handle:
        fd = handle.fileno()

        def fake_locking(descriptor, mode, nbytes):
            # Capture where the handle is positioned when the lock call is made.
            observed.append((descriptor, mode, nbytes, handle.tell()))

        fake_msvcrt = SimpleNamespace(LK_NBLCK=1, LK_UNLCK=2, locking=fake_locking)
        monkeypatch.setattr(status, "_IS_WINDOWS", True)
        # raising=False: the status module has no msvcrt attribute off Windows.
        monkeypatch.setattr(status, "msvcrt", fake_msvcrt, raising=False)

        assert status._try_acquire_file_lock(handle) is True
        status._release_file_lock(handle)

    lock_offset = status._WINDOWS_LOCK_OFFSET
    assert observed == [
        (fd, 1, 1, lock_offset),
        (fd, 2, 1, lock_offset),
    ]
    assert lock_path.read_text(encoding="utf-8") == "\n"
|
||||
|
||||
def test_acquire_scoped_lock_rejects_live_other_process(self, tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks"))
|
||||
lock_path = tmp_path / "locks" / "telegram-bot-token-2bb80d537b1da3e3.lock"
|
||||
|
||||
Loading…
Reference in New Issue
Block a user