fix(tui-gateway): harden stdio transport against half-closed pipes + SIGTERM races (#17118)

* fix(tui-gateway): harden stdio transport against half-closed pipes + SIGTERM races

`tui_gateway` reports `tui_gateway_crash.log` traces where the main
thread sits in `sys.stdin` while a worker holds `_stdout_lock` mid-
flush, and SIGTERM then calls `sys.exit(0)` while the lock is still
held — the interpreter shutdown stalls behind the wedged write.

Two narrowly scoped hardenings:

**`tui_gateway/transport.py`**

* Move JSON serialisation outside the lock — long messages no longer
  block sibling writers while we serialise.
* Treat `BrokenPipeError`, `ValueError` ("I/O on closed file") and
  generic `OSError` from both `write` and `flush` as "peer is gone":
  return `False` instead of bubbling, matching what `write_json`'s
  callers in `entry.py` already expect.
* Split `flush` into its own try block so a stuck flush never strands
  a partial write or holds the lock indefinitely on its way out.
* Optional `HERMES_TUI_GATEWAY_NO_FLUSH=1` env knob to skip explicit
  `flush()` entirely on environments where a half-closed read pipe
  produces an indefinite kernel-level block.  Default unchanged.

**`tui_gateway/entry.py`**

* `_log_signal` now spawns a 1-second daemon timer that calls
  `os._exit(0)` if the orderly `sys.exit(0)` path is itself stuck
  behind a wedged worker.  Atexit handlers run inside the grace
  window when they can; the timer is the safety net so a deadlocked
  flush no longer strands the gateway process.

Tests:

* `test_write_json_closed_stream_returns_false` — ValueError path.
* `test_write_json_oserror_on_flush_returns_false` — OSError on flush
  must not strand the lock; the write portion still landed before the
  flush failure.
* `test_write_json_no_flush_env_skips_flush` — env knob bypass.

Validation: `scripts/run_tests.sh tests/tui_gateway/test_protocol.py`
(42/42 pass; one pre-existing failure on
`test_session_resume_returns_hydrated_messages` is unrelated to this
change — same `include_ancestors` mock kwarg issue tracked elsewhere).
`scripts/run_tests.sh tests/test_tui_gateway_server.py` 90/90 pass.

* review(copilot): tighten transport hardening comments + test cleanup

* review(copilot): narrow exception capture, configurable grace, simpler no-flush test

* fix(tui-gateway): narrow ValueError to closed-stream; surface UnicodeEncodeError

Copilot review on PR #17118: `UnicodeEncodeError` is a ValueError
subclass, so a non-UTF-8 stdout (mismatched PYTHONIOENCODING / locale)
would have been silently swallowed as 'peer gone' under
`except ValueError`.  That hides a real environment bug.

Now:
- UnicodeEncodeError → log with exc_info (warning) and drop the frame
- ValueError where str(e) contains 'closed file' → peer gone, return False
- Any other ValueError → log loudly, drop frame (defensive, but visible)

Same shape applied to flush.  Adds two regression tests.

* fix(tui-gateway): reserve write() False for peer-gone; re-raise programming errors

Round 2 Copilot review on PR #17118: `Transport.write()` returning
`False` is documented as 'peer is gone', and `entry.py` reacts by
calling `sys.exit(0)`.  But the implementation also returned False
for non-IO conditions (non-JSON-safe payloads, UnicodeEncodeError,
unrelated ValueErrors), so a programming error or local env bug would
present as a clean disconnect — exactly the diagnosis pain we wanted
to eliminate.

Now:
- `json.dumps` failure → re-raises (TypeError/ValueError surfaces in crash log)
- `BrokenPipeError` → False (peer gone)
- `ValueError('...closed file...')` → False (peer gone)
- `UnicodeEncodeError` and any other ValueError → re-raise
- `OSError` → False (existing IO-failure semantics, debug-logged)

Tests updated to assert the re-raise behaviour and added a
non-serializable-payload regression test.

* fix(tui-gateway): narrow OSError to peer-gone errnos; honest test naming

Round 3 Copilot review on PR #17118:

- Docstring claimed False = peer gone, but generic OSError on write/flush
  also returned False — meaning ENOSPC/EACCES/EIO would silently exit.
  Added `_PEER_GONE_ERRNOS = {EPIPE, ECONNRESET, EBADF, ESHUTDOWN, +WSA}`
  and narrowed the OSError handlers; non-peer-gone errnos re-raise.
  Docstring now lists OSError as peer-gone branch with the errno set.
- The `_DISABLE_FLUSH` test was named after the env var but actually
  patched the module constant. Renamed it to reflect the contract being
  tested (skips flush when constant is true) AND added a real
  end-to-end test that sets the env var, reloads transport.py, and
  asserts the constant flips. Cleanup reload restores defaults so
  parallel tests stay isolated.

Self-review (avoid round 4):
- Verified TeeTransport's secondary-swallow stays intentional.
- _log_signal grace path already covered by separate tests.
This commit is contained in:
brooklyn! 2026-04-28 15:54:06 -07:00 committed by GitHub
parent af6b1a3343
commit 1e326c686d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 283 additions and 8 deletions

View File

@ -83,6 +83,134 @@ def test_write_json_broken_pipe(server):
assert server.write_json({"x": 1}) is False assert server.write_json({"x": 1}) is False
def test_write_json_closed_stream_returns_false(server):
"""ValueError ('I/O on closed file') used to bubble up; treat as gone."""
class _Closed:
def write(self, _): raise ValueError("I/O operation on closed file")
def flush(self): raise ValueError("I/O operation on closed file")
server._real_stdout = _Closed()
assert server.write_json({"x": 1}) is False
def test_write_json_unicode_encode_error_re_raises(server):
"""A non-UTF-8 stdout encoding raises UnicodeEncodeError (a ValueError
subclass). It must NOT be swallowed as 'peer gone' that would let
`entry.py` exit cleanly via the False path and hide the real config
bug. We re-raise so the existing crash-log infrastructure records it."""
class _AsciiOnly:
def write(self, line):
line.encode("ascii") # raises UnicodeEncodeError on non-ascii
def flush(self): pass
server._real_stdout = _AsciiOnly()
with pytest.raises(UnicodeEncodeError):
server.write_json({"msg": "héllo"})
def test_write_json_unrelated_value_error_re_raises(server):
"""Only ValueError('...closed file...') means peer gone. Other
ValueErrors are programming errors and must surface."""
class _BadValue:
def write(self, _): raise ValueError("something else entirely")
def flush(self): pass
server._real_stdout = _BadValue()
with pytest.raises(ValueError, match="something else entirely"):
server.write_json({"x": 1})
def test_write_json_non_serializable_payload_re_raises(server):
"""Non-JSON-safe payloads are programming errors — they must NOT be
silently dropped via the False path (which would trigger a clean exit
in entry.py and mask the real bug)."""
import io
server._real_stdout = io.StringIO()
with pytest.raises(TypeError):
server.write_json({"obj": object()})
def test_write_json_peer_gone_oserror_on_flush_returns_false(server):
"""A flush that raises a peer-gone OSError (EPIPE) must not strand
the lock or crash; it returns False so the dispatcher exits cleanly."""
import errno
written = []
class _FlushPeerGone:
def write(self, line): written.append(line)
def flush(self): raise OSError(errno.EPIPE, "broken pipe")
server._real_stdout = _FlushPeerGone()
assert server.write_json({"x": 1}) is False
assert written and json.loads(written[0]) == {"x": 1}
def test_write_json_non_peer_gone_oserror_re_raises(server):
"""Host I/O failures (ENOSPC, EACCES, EIO …) are NOT peer-gone — they
must re-raise so the crash log records them instead of looking like
a clean disconnect via the False path."""
import errno
class _DiskFull:
def write(self, _): raise OSError(errno.ENOSPC, "no space left")
def flush(self): pass
server._real_stdout = _DiskFull()
with pytest.raises(OSError, match="no space"):
server.write_json({"x": 1})
def test_write_json_skips_flush_when_disable_flush_true(monkeypatch):
"""`StdioTransport` skips flush when `_DISABLE_FLUSH` is true.
Tests the runtime *behaviour* via direct module-attr patch. The env
var module constant wiring is covered by the dedicated env test
below; reloading server.py here would re-register atexit hooks and
recreate the worker pool.
"""
import importlib
transport_mod = importlib.import_module("tui_gateway.transport")
monkeypatch.setattr(transport_mod, "_DISABLE_FLUSH", True)
flushed = {"count": 0}
written = []
class _Stream:
def write(self, line): written.append(line)
def flush(self): flushed["count"] += 1
stream = _Stream()
transport = transport_mod.StdioTransport(lambda: stream, threading.Lock())
assert transport.write({"x": 1}) is True
assert flushed["count"] == 0
def test_disable_flush_env_var_actually_wires_to_module_constant(monkeypatch):
"""End-to-end: setting `HERMES_TUI_GATEWAY_NO_FLUSH=1` and importing
`tui_gateway.transport` fresh actually flips `_DISABLE_FLUSH` true.
Reloads only the transport module server.py is untouched so its
atexit hooks/worker pool stay intact."""
import importlib
monkeypatch.setenv("HERMES_TUI_GATEWAY_NO_FLUSH", "1")
transport_mod = importlib.reload(importlib.import_module("tui_gateway.transport"))
try:
assert transport_mod._DISABLE_FLUSH is True
finally:
# Restore the env-disabled state so other tests see the default.
monkeypatch.delenv("HERMES_TUI_GATEWAY_NO_FLUSH", raising=False)
importlib.reload(transport_mod)
# ── _emit ──────────────────────────────────────────────────────────── # ── _emit ────────────────────────────────────────────────────────────

View File

@ -29,6 +29,28 @@ def _install_sidecar_publisher() -> None:
) )
# How long to wait for orderly shutdown (atexit + finalisers) before
# falling back to ``os._exit(0)`` so a wedged worker mid-flush can't
# strand the process. 1s covers the gateway's own shutdown work
# (thread-pool drain + session finalize) on every machine we've
# tested; override via ``HERMES_TUI_GATEWAY_SHUTDOWN_GRACE_S`` if a
# slower environment needs more headroom (e.g. encrypted disks
# flushing checkpoints) and accept that a longer grace also means a
# longer wait when shutdown actually deadlocks.
_DEFAULT_SHUTDOWN_GRACE_S = 1.0
def _shutdown_grace_seconds() -> float:
raw = (os.environ.get("HERMES_TUI_GATEWAY_SHUTDOWN_GRACE_S") or "").strip()
if not raw:
return _DEFAULT_SHUTDOWN_GRACE_S
try:
value = float(raw)
except ValueError:
return _DEFAULT_SHUTDOWN_GRACE_S
return value if value > 0 else _DEFAULT_SHUTDOWN_GRACE_S
def _log_signal(signum: int, frame) -> None: def _log_signal(signum: int, frame) -> None:
"""Capture WHICH thread and WHERE a termination signal hit us. """Capture WHICH thread and WHERE a termination signal hit us.
@ -38,6 +60,15 @@ def _log_signal(signum: int, frame) -> None:
handler the gateway-exited banner in the TUI has no trace the handler the gateway-exited banner in the TUI has no trace the
crash log never sees a Python exception because the kernel reaps crash log never sees a Python exception because the kernel reaps
the process before the interpreter runs anything. the process before the interpreter runs anything.
Termination semantics: ``sys.exit(0)`` here used to race the worker
pool a thread holding ``_stdout_lock`` mid-flush would block the
interpreter shutdown indefinitely. We now log the stack, give the
process the configured shutdown grace
(``HERMES_TUI_GATEWAY_SHUTDOWN_GRACE_S``, default
``_DEFAULT_SHUTDOWN_GRACE_S``) to drain naturally on a background
thread, and fall back to ``os._exit(0)`` so a wedged write/flush
can never strand the process.
""" """
name = { name = {
signal.SIGPIPE: "SIGPIPE", signal.SIGPIPE: "SIGPIPE",
@ -62,7 +93,31 @@ def _log_signal(signum: int, frame) -> None:
except Exception: except Exception:
pass pass
print(f"[gateway-signal] {name}", file=sys.stderr, flush=True) print(f"[gateway-signal] {name}", file=sys.stderr, flush=True)
sys.exit(0)
import threading as _threading
def _hard_exit() -> None:
# If a worker thread is still mid-flush on a half-closed pipe,
# ``sys.exit(0)`` would wait forever for it to drop the GIL on
# interpreter shutdown. ``os._exit`` skips atexit handlers but
# breaks the deadlock. The crash log + stderr line above are
# the forensic trail.
os._exit(0)
timer = _threading.Timer(_shutdown_grace_seconds(), _hard_exit)
timer.daemon = True
timer.start()
try:
sys.exit(0)
except SystemExit:
# Re-raise so the main-thread interpreter unwinds and runs
# atexit + finalisers inside the grace window. Python signal
# handlers always run on the main thread, but a worker thread
# holding ``_stdout_lock`` mid-flush can keep that unwind
# waiting indefinitely; the daemon timer above is the safety
# net for that exact case.
raise
# SIGPIPE: ignore, don't exit. The old SIG_DFL killed the process # SIGPIPE: ignore, don't exit. The old SIG_DFL killed the process

View File

@ -23,10 +23,45 @@ the stream lazily through a callback.
from __future__ import annotations from __future__ import annotations
import contextvars import contextvars
import errno
import json import json
import logging
import os
import threading import threading
from typing import Any, Callable, Optional, Protocol, runtime_checkable from typing import Any, Callable, Optional, Protocol, runtime_checkable
# Errno values that mean "the peer is gone" rather than "the host has a
# real I/O problem". Anything outside this set re-raises so it surfaces
# in the crash log instead of looking like a clean disconnect.
_PEER_GONE_ERRNOS = frozenset({
errno.EPIPE, # write to closed pipe (POSIX)
errno.ECONNRESET, # peer reset the connection
errno.EBADF, # fd closed under us
errno.ESHUTDOWN, # transport endpoint shut down
getattr(errno, "WSAECONNRESET", -1), # win32 mapping (no-op on POSIX)
getattr(errno, "WSAESHUTDOWN", -1),
} - {-1})
logger = logging.getLogger(__name__)
# Optional knob: when true, StdioTransport does not call ``stream.flush``
# after writing. Use this on environments where a half-closed pipe (TUI
# Node parent quit while the gateway is still emitting events) makes
# flush block long enough to starve the rest of the worker pool.
#
# IMPORTANT: Python text stdout is fully buffered when attached to a
# pipe (the TUI case), so this knob ONLY makes sense when the gateway
# is launched with ``-u`` or ``PYTHONUNBUFFERED=1``. Without one of
# those, JSON-RPC frames will accumulate in the buffer and the TUI
# will hang waiting for ``gateway.ready``. Default stays off so the
# existing flush-after-write behaviour is unchanged.
_DISABLE_FLUSH = (os.environ.get("HERMES_TUI_GATEWAY_NO_FLUSH", "") or "").strip().lower() in {
"1",
"true",
"yes",
"on",
}
@runtime_checkable @runtime_checkable
class Transport(Protocol): class Transport(Protocol):
@ -77,15 +112,72 @@ class StdioTransport:
self._lock = lock self._lock = lock
def write(self, obj: dict) -> bool: def write(self, obj: dict) -> bool:
"""Return ``True`` on success, ``False`` ONLY when the peer is gone.
Returning ``False`` is the dispatcher's "broken stdout pipe" signal
``entry.py`` calls ``sys.exit(0)`` when ``write_json`` reports
``False``. So programming errors (non-JSON-safe payloads, encoding
misconfig, unexpected ValueErrors, host I/O bugs like ENOSPC) MUST
NOT return ``False``, otherwise a real bug looks like a clean
disconnect and is harder to diagnose. Those re-raise so the
existing crash-log infrastructure records the traceback.
Peer-gone branches:
* ``BrokenPipeError``
* ``ValueError("...closed file...")``
* ``OSError`` whose errno is in :data:`_PEER_GONE_ERRNOS`
(EPIPE / ECONNRESET / EBADF / ESHUTDOWN; plus WSA mappings
on Windows). Other OSError errnos (ENOSPC, EACCES, ...) are
real host problems and re-raise.
"""
# Serialization is OUTSIDE the lock so a large payload can't
# block other threads emitting their own frames. A non-JSON-safe
# payload is a programming error: re-raise so the crash log
# captures it instead of silently exiting via the False path.
line = json.dumps(obj, ensure_ascii=False) + "\n" line = json.dumps(obj, ensure_ascii=False) + "\n"
try:
with self._lock: with self._lock:
stream = self._stream_getter() stream = self._stream_getter()
try:
stream.write(line) stream.write(line)
stream.flush() except BrokenPipeError:
return True return False
except BrokenPipeError: except ValueError as e:
return False # ValueError("I/O operation on closed file") is the
# ONLY ValueError that means "peer gone". Anything
# else — including UnicodeEncodeError, which is a
# ValueError subclass for misconfigured locales —
# is a real bug; re-raise so it surfaces in the crash log.
if isinstance(e, UnicodeEncodeError) or "closed file" not in str(e):
raise
return False
except OSError as e:
if e.errno not in _PEER_GONE_ERRNOS:
raise
logger.debug("StdioTransport write peer gone: %s", e)
return False
# A flush that *raises* with a peer-gone errno means the
# dispatcher should exit cleanly. A flush that *hangs* on
# a half-closed pipe holds the lock until it returns — see
# ``_DISABLE_FLUSH`` for the "skip flush entirely" escape
# hatch.
if not _DISABLE_FLUSH:
try:
stream.flush()
except BrokenPipeError:
return False
except ValueError as e:
if isinstance(e, UnicodeEncodeError) or "closed file" not in str(e):
raise
return False
except OSError as e:
if e.errno not in _PEER_GONE_ERRNOS:
raise
logger.debug("StdioTransport flush peer gone: %s", e)
return False
return True
def close(self) -> None: def close(self) -> None:
return None return None