molecule-ai-workspace-templ…/tests/test_app_server.py
core-devops 6c1313d64e
fix(executor): prevent stdout-reader deadlock after first turn
Closes the prod-Reviewer/Researcher A2A wedge: after the first
successful codex turn, subsequent JSON-RPC `message/send` requests
returned 0 bytes in 60 s and never reached the codex CLI rollout
file. Empirical proof: 2026-05-18 22:25Z in-container probe on both
workspaces — synthetic POST timed out, codex rollout size delta = 0,
unique marker never appeared in the JSONL log. Confirmed not codex
CLI (rollout shows healthy task_complete in 2–10 s), not auth
(auth.json/CODEX_AUTH_JSON valid), not model (gpt-5.5 accepted).

Root cause — three composable failure modes in the stdio reader path:

1. `AppServerProcess._read_loop` returned cleanly when stdout reached
   EOF. `_reader_exc` stayed None, no pending request was failed.
   Any future registered after EOF waited the full request timeout
   (default 600 s).

2. No child-exit watcher. A crashed / killed child whose stdout was
   already drained left pending requests hanging on `state.completed`
   for the full turn budget.

3. `CodexAppServerExecutor._run_turn` blocked on
   `state.completed.wait()` with only a 600 s upper bound, so a
   wedged channel held the per-thread `_turn_lock` for 10 minutes
   per request — masking the dead channel as "still working".
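
Failure mode 1 can be shown in miniature. The sketch below is not the
project's code: `Channel`, `mark_dead`, and `demo` are hypothetical
names, and response dispatch is left out. It assumes a reader loop
over an `asyncio.StreamReader` and shows EOF being treated as fatal,
so an in-flight request and any later request fail fast instead of
waiting out the timeout.

```python
from __future__ import annotations

import asyncio


class Channel:
    """Hypothetical stand-in for a stdio JSON-RPC channel."""

    def __init__(self, reader: asyncio.StreamReader) -> None:
        self._reader = reader
        self._pending: dict[int, asyncio.Future] = {}
        self._dead: Exception | None = None
        self._next_id = 0

    def mark_dead(self, exc: Exception) -> None:
        # Record the first failure, then fail every request still waiting.
        if self._dead is None:
            self._dead = exc
        for fut in self._pending.values():
            if not fut.done():
                fut.set_exception(self._dead)
        self._pending.clear()

    async def read_loop(self) -> None:
        while True:
            line = await self._reader.readline()
            if not line:
                # EOF is fatal. Returning silently here is the bug:
                # nothing would ever resolve the pending futures.
                self.mark_dead(ConnectionError("stdout EOF"))
                return
            # Response dispatch omitted in this sketch.

    async def request(self, timeout: float) -> None:
        if self._dead is not None:
            raise self._dead
        self._next_id += 1
        req_id = self._next_id
        fut = asyncio.get_running_loop().create_future()
        self._pending[req_id] = fut
        try:
            await asyncio.sleep(0)  # stands in for writing the request to stdin
            # Re-check after the await point: the channel may have died
            # while this request was being registered and written.
            if self._dead is not None and not fut.done():
                fut.set_exception(self._dead)
            await asyncio.wait_for(fut, timeout)
        finally:
            self._pending.pop(req_id, None)


async def demo() -> list[str]:
    reader = asyncio.StreamReader()
    chan = Channel(reader)
    reader_task = asyncio.create_task(chan.read_loop())
    in_flight = asyncio.create_task(chan.request(timeout=5.0))
    await asyncio.sleep(0)  # let the request register
    reader.feed_eof()       # simulate the child closing stdout
    results = []
    try:
        await in_flight
    except ConnectionError as exc:
        results.append(f"in-flight failed: {exc}")
    await reader_task
    try:
        await chan.request(timeout=5.0)
    except ConnectionError as exc:
        results.append(f"later failed: {exc}")
    return results
```

Running `asyncio.run(demo())` should report both requests failing
with the EOF error well inside the 5 s timeout.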

Fix:

- `_read_loop` now treats stdout EOF as fatal: sets `_reader_exc`,
  fails every pending future via `_mark_dead`, identical to the
  exception path.
- New `_watch_child` task awaits `proc.wait()` and on completion
  marks the channel dead so any pending requests fail. Defends
  against the race where the reader is parked in `readuntil` but
  the OS reaps the child first.
- `request()` re-checks `_reader_exc` after registering the future,
  so a future added strictly after `_mark_dead` ran still gets
  failed (no observable window for the race).
- New `_TURN_INACTIVITY_TIMEOUT` (90 s) watchdog in `_run_turn`:
  reset on every notification, surfaces a clear "channel wedged"
  error when codex stops emitting. The 600 s total-turn budget is
  preserved for legitimately long tool-use turns.
- Bootstrap RPCs now bounded: initialize / thread/start each capped
  at 30 s so a child wedged during setup doesn't stall the first
  turn for 10 minutes.
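
The inactivity watchdog can be sketched as follows. This is an
illustration under stated assumptions, not the executor's actual
code: `TurnState`, `touch`, and `wait_for_turn` are hypothetical
names, and the timeouts are shortened so the demo finishes quickly.
The idea is that every notification resets an idle clock, while a
separate total budget still bounds the whole turn.

```python
from __future__ import annotations

import asyncio
import time


class TurnState:
    """Hypothetical per-turn state: a completion event plus an idle clock."""

    def __init__(self) -> None:
        self.completed = asyncio.Event()
        self.last_activity = time.monotonic()

    def touch(self) -> None:
        # Called on every notification to reset the inactivity clock.
        self.last_activity = time.monotonic()


async def wait_for_turn(
    state: TurnState, inactivity_timeout: float, total_timeout: float
) -> None:
    deadline = time.monotonic() + total_timeout
    while True:
        now = time.monotonic()
        idle_left = state.last_activity + inactivity_timeout - now
        total_left = deadline - now
        if total_left <= 0:
            raise TimeoutError("turn exceeded its total budget")
        if idle_left <= 0:
            raise ConnectionError("channel wedged: no notifications received")
        try:
            await asyncio.wait_for(
                state.completed.wait(), min(idle_left, total_left)
            )
            return
        except asyncio.TimeoutError:
            # A limit may have been hit, or a notification may have reset
            # the idle clock in the meantime; loop and re-evaluate.
            continue


async def demo() -> tuple[str, str]:
    # Slow but alive: the turn outlasts the idle limit, yet keeps emitting.
    alive = TurnState()

    async def chatty() -> None:
        for _ in range(8):
            await asyncio.sleep(0.03)
            alive.touch()
        alive.completed.set()

    notifier = asyncio.create_task(chatty())
    await wait_for_turn(alive, inactivity_timeout=0.1, total_timeout=2.0)
    await notifier
    first = "completed"

    # Silent: no notifications at all, so the watchdog trips early.
    silent = TurnState()
    try:
        await wait_for_turn(silent, inactivity_timeout=0.05, total_timeout=2.0)
        second = "completed"
    except ConnectionError:
        second = "wedged"
    return first, second
```

Keeping the two limits separate is what lets a long tool-use turn
run to completion while a silent channel is reported quickly.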

Existing executor unit tests used stale codex 0.71-shape event
names (`turn/completed`, bare `agent_message_delta`), so they would
hang against the production code, which expects codex 0.72+
`codex/event/<type>` envelopes. Tests rewritten to use the
production event shape via `FakeAppServer.push_delta` /
`push_task_complete` helpers. Three new app_server tests
(`test_eof_fails_pending_requests`,
`test_in_flight_request_fails_on_eof`,
`test_child_crash_fails_pending_requests`) and four new executor
tests (inactivity watchdog: silent + slow-but-alive,
second-turn-on-dead-channel, thread-start-timeout-bounded) cover
the regression.

Refs prod-team build foreground priority and codex template
maintenance task — open for non-author review, no auto-merge.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 15:54:56 -07:00


"""Unit tests for AppServerProcess against a fake stdio child.

We can't depend on a real `codex` binary in CI, so these tests stand up
a Python-implemented mock app-server that speaks NDJSON over stdio.
The mock is intentionally tiny: it only handles the request/response
+ notification semantics we exercise here, not the full v2 protocol.
"""
from __future__ import annotations

import asyncio
import json
import sys
from pathlib import Path

import pytest

# Make app_server.py importable from the test file without setup.py.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

from app_server import AppServerError, AppServerProcess  # noqa: E402

# Path to the in-tree mock app-server (a Python script that pretends
# to be `codex app-server`). Tests pass it via executable= override.
_MOCK = str(Path(__file__).resolve().parent / "mock_app_server.py")


@pytest.mark.asyncio
async def test_initialize_handshake() -> None:
    proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
    try:
        result = await proc.initialize(client_info={"name": "test", "version": "0"})
        assert result["userAgent"].startswith("mock_app_server/")
    finally:
        await proc.close()


@pytest.mark.asyncio
async def test_request_response_correlation() -> None:
    """Concurrent requests should not cross responses."""
    proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
    try:
        await proc.initialize(client_info={"name": "test", "version": "0"})
        # Mock supports `echo` which round-trips params.text after a
        # configurable delay. Send three with different delays + texts;
        # confirm each future resolves to its own input.
        results = await asyncio.gather(
            proc.request("echo", {"text": "alpha", "delay_ms": 30}),
            proc.request("echo", {"text": "beta", "delay_ms": 5}),
            proc.request("echo", {"text": "gamma", "delay_ms": 15}),
        )
        assert [r["text"] for r in results] == ["alpha", "beta", "gamma"]
    finally:
        await proc.close()


@pytest.mark.asyncio
async def test_error_response_raises_app_server_error() -> None:
    proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
    try:
        await proc.initialize(client_info={"name": "test", "version": "0"})
        with pytest.raises(AppServerError) as ei:
            await proc.request("error", {"code": -32000, "message": "boom"})
        assert "boom" in str(ei.value)
        assert ei.value.payload.get("code") == -32000
    finally:
        await proc.close()


@pytest.mark.asyncio
async def test_notifications_dispatched_to_subscribers() -> None:
    """Subscribed callback should fire for every notification, in order."""
    proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
    received: list[tuple[str, dict]] = []
    try:
        await proc.initialize(client_info={"name": "test", "version": "0"})
        proc.subscribe(lambda m, p: received.append((m, p)))
        # Mock's `emit` request fires N notifications named `tick` then
        # returns a final ack. We need to wait until all notifications
        # arrive; the mock guarantees the response is sent AFTER its
        # notifications, so awaiting the response is sufficient.
        await proc.request("emit", {"count": 3, "method": "tick"})
        # Give the reader loop one tick to process trailing notifications
        # if any (defensive: the mock orders them before the response).
        await asyncio.sleep(0.05)
        assert [m for m, _ in received] == ["tick", "tick", "tick"]
        assert [p["i"] for _, p in received] == [0, 1, 2]
    finally:
        await proc.close()


@pytest.mark.asyncio
async def test_pending_requests_fail_on_close() -> None:
    """close() must release any awaiting request callers."""
    proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
    try:
        await proc.initialize(client_info={"name": "test", "version": "0"})
        # Fire a long-delay request and close the process before the
        # response can arrive. The pending future should fail with
        # ConnectionError so the caller doesn't hang.
        slow = asyncio.create_task(proc.request("echo", {"text": "x", "delay_ms": 5000}))
        await asyncio.sleep(0.05)
        await proc.close()
        with pytest.raises(ConnectionError):
            await slow
    finally:
        # Idempotent
        await proc.close()


@pytest.mark.asyncio
async def test_close_is_idempotent() -> None:
    proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
    await proc.close()
    rc = await proc.close()
    assert rc is not None  # mock exits with 0


@pytest.mark.asyncio
async def test_request_after_close_raises() -> None:
    proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
    await proc.close()
    with pytest.raises(ConnectionError):
        await proc.request("echo", {"text": "x"})


@pytest.mark.asyncio
async def test_eof_fails_pending_requests() -> None:
    """Stdout EOF on a still-alive child must fail every pending request.

    Regression for the 2026-05-18 prod-Reviewer/Researcher wedge: the
    codex CLI closed its stdout pipe mid-conversation while the
    process itself stayed alive (parked in epoll). Pre-fix
    AppServerProcess._read_loop returned cleanly on EOF without
    setting _reader_exc, so any subsequent request() blocked on a
    future that would never resolve until the 600 s request timeout.
    Post-fix EOF sets _reader_exc and fails every pending future
    immediately.
    """
    proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
    try:
        await proc.initialize(client_info={"name": "test", "version": "0"})
        # Ask the mock to close stdout, then verify a subsequent
        # request fails fast with ConnectionError (NOT a timeout).
        await proc.request("close_stdout_after", {})
        # Give the reader a moment to notice EOF.
        await asyncio.sleep(0.1)
        with pytest.raises(ConnectionError) as ei:
            # 5s is plenty for the mark-dead path to trip; pre-fix
            # this would wait the full default request timeout.
            await proc.request("echo", {"text": "after-eof"}, timeout=5.0)
        assert "EOF" in str(ei.value) or "stdout" in str(ei.value)
    finally:
        await proc.close()


@pytest.mark.asyncio
async def test_in_flight_request_fails_on_eof() -> None:
    """A future already-pending when EOF arrives must fail, not hang."""
    proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
    try:
        await proc.initialize(client_info={"name": "test", "version": "0"})
        # Issue a request whose response will never come because we'll
        # close the stdout pipe in the SAME mock invocation. The mock's
        # close_stdout_after acks first then closes, so the only way
        # to test mid-flight failure is to issue a separate slow
        # request alongside.
        slow = asyncio.create_task(
            proc.request("echo", {"text": "x", "delay_ms": 5000}, timeout=10.0)
        )
        # Yield long enough for the slow echo to be registered in
        # _pending and written to stdin.
        await asyncio.sleep(0.05)
        # Close stdout; slow's pending future must fail.
        await proc.request("close_stdout_after", {})
        with pytest.raises(ConnectionError):
            await slow
    finally:
        await proc.close()


@pytest.mark.asyncio
async def test_child_crash_fails_pending_requests() -> None:
    """Child process exit must fail pending requests via the watcher.

    Even if the reader missed EOF (parked in readuntil) the
    _watch_child task awaits proc.wait() and on completion fails any
    still-pending requests with ConnectionError. Covers OS-level
    crashes (SIGKILL, segfault) that the reader-EOF path might race.
    """
    proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
    try:
        await proc.initialize(client_info={"name": "test", "version": "0"})
        # Ask the mock to crash. The ack arrives before the exit; the
        # next request must fail fast.
        await proc.request("crash_after", {})
        # Give the child a moment to actually exit and the watcher to
        # mark the channel dead.
        await asyncio.sleep(0.3)
        with pytest.raises(ConnectionError):
            await proc.request("echo", {"text": "after-crash"}, timeout=5.0)
    finally:
        await proc.close()