fix(process): reconcile session.exited against real child exit in poll/wait (#17430)
When a background terminal process spawns a descendant daemon that inherits the stdout pipe (e.g. 'hermes update' triggering a gateway systemctl restart), the reader thread's stdout.read() never returns EOF and its finally: block never runs. session.exited stays False forever, so process(action='poll') returns 'running' indefinitely even though the direct child exited long ago. Issue #17327: Feishu user polled 74 times over 7 minutes before killing the gateway manually. Fix: add _reconcile_local_exit() that checks the direct Popen.poll() before trusting session.exited. If the direct child has exited, drain any immediately-readable bytes non-blocking and flip session.exited. Called from poll() and wait(). The stuck reader thread remains blocked but is a daemon thread and gets reaped with the process. Safe no-op for env/PTY sessions, already-exited sessions, and live children (returns None from Popen.poll()).
This commit is contained in:
parent
13683c0842
commit
20b759cd02
@ -103,6 +103,134 @@ class TestGetAndPoll:
|
||||
assert result["exit_code"] == 0
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Orphaned-pipe reconciliation (issue #17327)
|
||||
# =========================================================================
|
||||
|
||||
@pytest.mark.skipif(sys.platform == "win32", reason="POSIX-only: uses setsid/fcntl")
|
||||
class TestOrphanedPipeReconciliation:
|
||||
"""Regression tests for issue #17327.
|
||||
|
||||
`hermes update` in Feishu spawned a background subprocess that restarted
|
||||
the gateway; the direct child exited quickly but a descendant daemon
|
||||
held the stdout pipe open. `_reader_loop.finally` never ran, so
|
||||
`session.exited` stayed False and the agent polled 74 times over 7
|
||||
minutes, all returning `status: running`.
|
||||
|
||||
The fix is `_reconcile_local_exit()`: poll() and wait() now check the
|
||||
direct `Popen.poll()` before trusting `session.exited`.
|
||||
"""
|
||||
|
||||
def test_reconcile_flips_exited_when_direct_child_done(self, registry):
|
||||
"""Direct child exited but reader thread is blocked on orphaned pipe."""
|
||||
# Simulate the orphaned-pipe scenario: direct child exited, but a
|
||||
# descendant holds stdout open so the reader never sees EOF.
|
||||
# Approach: spawn `sh -c 'sleep 10 &'` with setsid — sh forks the
|
||||
# sleep into a new session group, exits immediately, but sleep
|
||||
# inherits the stdout pipe and keeps it open.
|
||||
proc = subprocess.Popen(
|
||||
["sh", "-c", "exec 1>&2; ( sleep 30 ) & disown; exit 0"],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
preexec_fn=os.setsid,
|
||||
)
|
||||
|
||||
s = _make_session(sid="proc_orphan_test")
|
||||
s.process = proc
|
||||
s.pid = proc.pid
|
||||
registry._running[s.id] = s
|
||||
|
||||
# Wait for the direct child to exit. We don't start a reader thread,
|
||||
# so session.exited stays False (mimicking the stuck-reader state).
|
||||
assert _wait_until(lambda: proc.poll() is not None, timeout=5.0), (
|
||||
"Direct child should exit quickly (sh exits, sleep descendant "
|
||||
"holds the pipe open)"
|
||||
)
|
||||
|
||||
# Before the fix: poll would return "running" forever.
|
||||
# After the fix: poll reconciles against proc.poll() and flips.
|
||||
assert s.exited is False # Precondition: reader hasn't updated it.
|
||||
result = registry.poll(s.id)
|
||||
assert result["status"] == "exited", (
|
||||
f"Expected reconciled 'exited' status; got {result!r}. "
|
||||
"This is issue #17327 — reader is blocked on orphaned pipe."
|
||||
)
|
||||
assert result["exit_code"] == 0
|
||||
assert s.exited is True
|
||||
assert s.id in registry._finished
|
||||
assert s.id not in registry._running
|
||||
|
||||
# Clean up the orphaned descendant.
|
||||
try:
|
||||
os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
|
||||
except (ProcessLookupError, PermissionError):
|
||||
pass
|
||||
|
||||
def test_reconcile_noop_when_child_still_running(self, registry):
|
||||
"""Reconcile must NOT flip exited when the direct child is alive."""
|
||||
proc = _spawn_python_sleep(5.0)
|
||||
s = _make_session(sid="proc_running_test")
|
||||
s.process = proc
|
||||
s.pid = proc.pid
|
||||
registry._running[s.id] = s
|
||||
|
||||
result = registry.poll(s.id)
|
||||
assert result["status"] == "running"
|
||||
assert s.exited is False
|
||||
|
||||
proc.kill()
|
||||
proc.wait()
|
||||
|
||||
def test_reconcile_noop_on_already_exited(self, registry):
|
||||
"""Reconcile is a no-op when session.exited is already True."""
|
||||
s = _make_session(sid="proc_already_exited", exited=True, exit_code=7)
|
||||
s.process = MagicMock()
|
||||
s.process.poll = MagicMock(return_value=0) # Would say exit 0
|
||||
registry._finished[s.id] = s
|
||||
|
||||
registry._reconcile_local_exit(s)
|
||||
# Must not overwrite the existing exit_code with proc.poll()'s 0.
|
||||
assert s.exit_code == 7
|
||||
|
||||
def test_reconcile_noop_on_no_process(self, registry):
|
||||
"""Reconcile is a no-op for sessions without a local Popen (env/PTY)."""
|
||||
s = _make_session(sid="proc_no_popen")
|
||||
assert getattr(s, "process", None) is None
|
||||
# Must not raise.
|
||||
registry._reconcile_local_exit(s)
|
||||
assert s.exited is False
|
||||
|
||||
def test_wait_returns_when_reader_blocked(self, registry):
|
||||
"""wait() must also reconcile — not just poll()."""
|
||||
proc = subprocess.Popen(
|
||||
["sh", "-c", "( sleep 30 ) & disown; exit 0"],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
preexec_fn=os.setsid,
|
||||
)
|
||||
|
||||
s = _make_session(sid="proc_wait_orphan")
|
||||
s.process = proc
|
||||
s.pid = proc.pid
|
||||
registry._running[s.id] = s
|
||||
|
||||
assert _wait_until(lambda: proc.poll() is not None, timeout=5.0)
|
||||
|
||||
start = time.monotonic()
|
||||
result = registry.wait(s.id, timeout=10)
|
||||
elapsed = time.monotonic() - start
|
||||
|
||||
assert result["status"] == "exited", result
|
||||
assert elapsed < 5.0, (
|
||||
f"wait() should return ~immediately via reconcile; took {elapsed:.1f}s"
|
||||
)
|
||||
|
||||
try:
|
||||
os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
|
||||
except (ProcessLookupError, PermissionError):
|
||||
pass
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Read log
|
||||
# =========================================================================
|
||||
|
||||
@ -800,6 +800,78 @@ class ProcessRegistry:
|
||||
session = self._running.get(session_id) or self._finished.get(session_id)
|
||||
return self._refresh_detached_session(session)
|
||||
|
||||
def _reconcile_local_exit(self, session: "ProcessSession") -> None:
|
||||
"""Reconcile session.exited against the real child process state.
|
||||
|
||||
The reader thread (`_reader_loop`) sets `session.exited = True` only
|
||||
in its `finally` block, which runs when `stdout.read()` returns EOF.
|
||||
If the direct `Popen` child has exited but a descendant process (e.g.
|
||||
a daemon spawned by `hermes update` restarting the gateway) is still
|
||||
holding the stdout pipe open, the reader blocks forever and poll()
|
||||
keeps returning "running" indefinitely (issue #17327 — 74 polls over
|
||||
7 minutes on Feishu).
|
||||
|
||||
This helper closes that window: when `session.exited` is still False
|
||||
but the direct child's `Popen.poll()` reports an exit code, drain any
|
||||
readable bytes non-blocking and flip `session.exited`. The orphaned
|
||||
reader thread remains stuck on its blocking `read()` but is a daemon
|
||||
thread and will be reaped with the process.
|
||||
|
||||
Safe no-op on sessions without a local `Popen` (env/PTY), already-
|
||||
exited sessions, and detached-recovered sessions.
|
||||
"""
|
||||
if session is None or session.exited:
|
||||
return
|
||||
proc = getattr(session, "process", None)
|
||||
if proc is None:
|
||||
return
|
||||
try:
|
||||
rc = proc.poll()
|
||||
except Exception:
|
||||
return
|
||||
if rc is None:
|
||||
return # Direct child still running — reader block is legitimate.
|
||||
|
||||
# Direct child exited. Try to drain any bytes the reader hasn't
|
||||
# consumed yet. This is best-effort: if the pipe is held open by a
|
||||
# descendant, the non-blocking read returns what's immediately
|
||||
# available and we stop.
|
||||
drained = ""
|
||||
stdout = getattr(proc, "stdout", None)
|
||||
if stdout is not None and not _IS_WINDOWS:
|
||||
try:
|
||||
import fcntl
|
||||
fd = stdout.fileno()
|
||||
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
|
||||
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
|
||||
try:
|
||||
chunk = stdout.read()
|
||||
if chunk:
|
||||
drained = chunk if isinstance(chunk, str) else chunk.decode("utf-8", errors="replace")
|
||||
except (BlockingIOError, OSError, ValueError):
|
||||
pass
|
||||
finally:
|
||||
try:
|
||||
fcntl.fcntl(fd, fcntl.F_SETFL, flags)
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.debug("Non-blocking drain failed for %s: %s", session.id, e)
|
||||
|
||||
with session._lock:
|
||||
if drained:
|
||||
session.output_buffer += drained
|
||||
if len(session.output_buffer) > session.max_output_chars:
|
||||
session.output_buffer = session.output_buffer[-session.max_output_chars:]
|
||||
session.exited = True
|
||||
session.exit_code = rc
|
||||
logger.info(
|
||||
"Reconciled session %s: direct child exited with code %s but reader "
|
||||
"was still blocked (orphaned pipe). Flipped to exited.",
|
||||
session.id, rc,
|
||||
)
|
||||
self._move_to_finished(session)
|
||||
|
||||
def poll(self, session_id: str) -> dict:
|
||||
"""Check status and get new output for a background process."""
|
||||
from tools.ansi_strip import strip_ansi
|
||||
@ -808,6 +880,10 @@ class ProcessRegistry:
|
||||
if session is None:
|
||||
return {"status": "not_found", "error": f"No process with ID {session_id}"}
|
||||
|
||||
# Reconcile against real child state before reading session.exited.
|
||||
# Guards against orphaned-pipe reader hangs (issue #17327).
|
||||
self._reconcile_local_exit(session)
|
||||
|
||||
with session._lock:
|
||||
output_preview = strip_ansi(session.output_buffer[-1000:]) if session.output_buffer else ""
|
||||
|
||||
@ -898,6 +974,10 @@ class ProcessRegistry:
|
||||
|
||||
while time.monotonic() < deadline:
|
||||
session = self._refresh_detached_session(session)
|
||||
# Reconcile against real child state — guards against orphaned-
|
||||
# pipe reader hangs where the reader is blocked but the direct
|
||||
# child has already exited (issue #17327).
|
||||
self._reconcile_local_exit(session)
|
||||
if session.exited:
|
||||
self._completion_consumed.add(session_id)
|
||||
result = {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user