6c1313d64e
CI / Adapter unit tests (push) Successful in 39s
CI / Template validation (static) (push) Successful in 44s
CI / Adapter unit tests (pull_request) Successful in 32s
CI / Template validation (runtime) (push) Successful in 1m44s
CI / Template validation (static) (pull_request) Successful in 1m41s
CI / T4 tier-4 conformance (live) (push) Successful in 1m44s
CI / validate (push) Successful in 1s
CI / Template validation (runtime) (pull_request) Successful in 40s
CI / T4 tier-4 conformance (live) (pull_request) Successful in 1m21s
CI / validate (pull_request) Successful in 1s
Closes the prod-Reviewer/Researcher A2A wedge: after the first successful codex turn, subsequent JSON-RPC `message/send` requests returned 0 bytes within 60 s and never reached the codex CLI rollout file.

Empirical proof: a 2026-05-18 22:25Z in-container probe on both workspaces showed the synthetic POST timing out, a codex rollout size delta of 0, and the unique marker never appearing in the JSONL log. Ruled out: the codex CLI (the rollout shows a healthy task_complete in 2–10 s), auth (auth.json/CODEX_AUTH_JSON valid), and the model (gpt-5.5 accepted).

The root cause was three composable failure modes in the stdio reader path:

1. `AppServerProcess._read_loop` returned cleanly when stdout reached EOF. `_reader_exc` stayed None and no pending request was failed, so any future registered after EOF waited the full request timeout (default 600 s).
2. There was no child-exit watcher. A crashed or killed child whose stdout was already drained left pending requests hanging on `state.completed` for the full turn budget.
3. `CodexAppServerExecutor._run_turn` blocked on `state.completed.wait()` with only a 600 s upper bound, so a wedged channel held the per-thread `_turn_lock` for 10 minutes per request, masking the dead channel as "still working".

Fix:

- `_read_loop` now treats stdout EOF as fatal: it sets `_reader_exc` and fails every pending future via `_mark_dead`, identical to the exception path.
- A new `_watch_child` task awaits `proc.wait()` and, on completion, marks the channel dead so any pending requests fail. This defends against the race where the reader is parked in `readuntil` but the OS reaps the child first.
- `request()` re-checks `_reader_exc` after registering the future, so a future added strictly after `_mark_dead` ran still gets failed (no observable window for the race).
- A new `_TURN_INACTIVITY_TIMEOUT` (90 s) watchdog in `_run_turn` is reset on every notification and surfaces a clear "channel wedged" error when codex stops emitting. The 600 s total-turn budget is preserved for legitimately long tool-use turns.
- Bootstrap RPCs are now bounded: initialize and thread/start are each capped at 30 s, so a child wedged during setup doesn't stall the first turn for 10 minutes.

Existing executor unit tests used stale codex 0.71-shaped event names (`turn/completed`, bare `agent_message_delta`); they would in fact hang against the production code, which expects codex 0.72+ `codex/event/<type>` envelopes. The tests were rewritten to use the production event shape via the `FakeAppServer.push_delta` / `push_task_complete` helpers.

Three new app_server tests (`test_eof_fails_pending_requests`, `test_in_flight_request_fails_on_eof`, `test_child_crash_fails_pending_requests`) and four new executor tests (inactivity watchdog: silent + slow-but-alive, second-turn-on-dead-channel, thread-start-timeout-bounded) cover the regression.

Refs prod-team build foreground priority and codex template maintenance task. Open for non-author review, no auto-merge.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
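The inactivity watchdog described in the fix list separates two limits: an inactivity window that restarts whenever codex emits a notification, and the original total-turn budget. One way to express that with plain asyncio events is sketched here. It is illustrative only: `TurnState`, `on_notification`, `wait_for_turn`, and `TurnWedgedError` are hypothetical names, not the executor's API, and the defaults merely mirror the 90 s and 600 s figures from the commit message.

```python
import asyncio
from dataclasses import dataclass, field


class TurnWedgedError(RuntimeError):
    """Hypothetical error: no notification arrived within the inactivity window."""


@dataclass
class TurnState:
    """Hypothetical per-turn state; not the executor's real class."""

    completed: asyncio.Event = field(default_factory=asyncio.Event)
    activity: asyncio.Event = field(default_factory=asyncio.Event)

    def on_notification(self) -> None:
        # Any notification from the child is proof of life: re-arm the watchdog.
        self.activity.set()


async def wait_for_turn(
    state: TurnState,
    inactivity_timeout: float = 90.0,
    total_timeout: float = 600.0,
) -> None:
    """Wait for state.completed, failing fast when the channel goes silent."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + total_timeout
    while not state.completed.is_set():
        remaining = deadline - loop.time()
        if remaining <= 0:
            raise asyncio.TimeoutError("turn exceeded its total budget")
        # Whichever limit is closer decides how long this round may block.
        budget_bound = remaining <= inactivity_timeout
        state.activity.clear()
        waiters = {
            asyncio.create_task(state.completed.wait()),
            asyncio.create_task(state.activity.wait()),
        }
        try:
            done, _ = await asyncio.wait(
                waiters,
                timeout=min(inactivity_timeout, remaining),
                return_when=asyncio.FIRST_COMPLETED,
            )
        finally:
            # Never leak the helper tasks, even if this coroutine is cancelled.
            for task in waiters:
                task.cancel()
            await asyncio.gather(*waiters, return_exceptions=True)
        if not done:
            if budget_bound:
                raise asyncio.TimeoutError("turn exceeded its total budget")
            raise TurnWedgedError(
                f"no notification for {inactivity_timeout}s; channel wedged"
            )
```

With a short window, a `TurnState` that never sees a notification raises `TurnWedgedError` after roughly one window, while a producer that calls `on_notification()` more often than the window keeps a longer turn alive; the total budget still caps a child that emits notifications forever without completing.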
211 lines
8.4 KiB
Python
"""Unit tests for AppServerProcess against a fake stdio child.

We can't depend on a real `codex` binary in CI, so these tests stand up
a Python-implemented mock app-server that speaks NDJSON over stdio.
The mock is intentionally tiny: it only handles the request/response
+ notification semantics we exercise here, not the full v2 protocol.
"""
from __future__ import annotations

import asyncio
import json
import sys
from pathlib import Path

import pytest

# Make app_server.py importable from the test file without setup.py.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

from app_server import AppServerError, AppServerProcess  # noqa: E402

# Path to the in-tree mock app-server (a Python script that pretends
# to be `codex app-server`). Tests run it by overriding executable=
# with sys.executable and passing this path via args=.
_MOCK = str(Path(__file__).resolve().parent / "mock_app_server.py")


@pytest.mark.asyncio
async def test_initialize_handshake() -> None:
    proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
    try:
        result = await proc.initialize(client_info={"name": "test", "version": "0"})
        assert result["userAgent"].startswith("mock_app_server/")
    finally:
        await proc.close()


@pytest.mark.asyncio
async def test_request_response_correlation() -> None:
    """Concurrent requests should not cross responses."""
    proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
    try:
        await proc.initialize(client_info={"name": "test", "version": "0"})

        # Mock supports `echo` which round-trips params.text after a
        # configurable delay. Send three with different delays + texts;
        # confirm each future resolves to its own input.
        results = await asyncio.gather(
            proc.request("echo", {"text": "alpha", "delay_ms": 30}),
            proc.request("echo", {"text": "beta", "delay_ms": 5}),
            proc.request("echo", {"text": "gamma", "delay_ms": 15}),
        )
        assert [r["text"] for r in results] == ["alpha", "beta", "gamma"]
    finally:
        await proc.close()


@pytest.mark.asyncio
async def test_error_response_raises_app_server_error() -> None:
    proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
    try:
        await proc.initialize(client_info={"name": "test", "version": "0"})
        with pytest.raises(AppServerError) as ei:
            await proc.request("error", {"code": -32000, "message": "boom"})
        assert "boom" in str(ei.value)
        assert ei.value.payload.get("code") == -32000
    finally:
        await proc.close()


@pytest.mark.asyncio
async def test_notifications_dispatched_to_subscribers() -> None:
    """Subscribed callback should fire for every notification, in order."""
    proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
    received: list[tuple[str, dict]] = []
    try:
        await proc.initialize(client_info={"name": "test", "version": "0"})
        proc.subscribe(lambda m, p: received.append((m, p)))
        # Mock's `emit` request fires N notifications named `tick` then
        # returns a final ack. We need to wait until all notifications
        # arrive; the mock guarantees the response is sent AFTER its
        # notifications, so awaiting the response is sufficient.
        await proc.request("emit", {"count": 3, "method": "tick"})
        # Give the reader loop one tick to process trailing notifications
        # if any (defensive: the mock orders them before the response).
        await asyncio.sleep(0.05)
        assert [m for m, _ in received] == ["tick", "tick", "tick"]
        assert [p["i"] for _, p in received] == [0, 1, 2]
    finally:
        await proc.close()


@pytest.mark.asyncio
async def test_pending_requests_fail_on_close() -> None:
    """close() must release any awaiting request callers."""
    proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
    try:
        await proc.initialize(client_info={"name": "test", "version": "0"})
        # Fire a long-delay request and close the process before the
        # response can arrive. The pending future should fail with
        # ConnectionError so the caller doesn't hang.
        slow = asyncio.create_task(proc.request("echo", {"text": "x", "delay_ms": 5000}))
        await asyncio.sleep(0.05)
        await proc.close()
        with pytest.raises(ConnectionError):
            await slow
    finally:
        # Idempotent
        await proc.close()


@pytest.mark.asyncio
async def test_close_is_idempotent() -> None:
    proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
    await proc.close()
    rc = await proc.close()
    assert rc is not None  # mock exits with 0


@pytest.mark.asyncio
async def test_request_after_close_raises() -> None:
    proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
    await proc.close()
    with pytest.raises(ConnectionError):
        await proc.request("echo", {"text": "x"})


@pytest.mark.asyncio
async def test_eof_fails_pending_requests() -> None:
    """Stdout EOF on a still-alive child must fail every pending request.

    Regression for the 2026-05-18 prod-Reviewer/Researcher wedge: the
    codex CLI closed its stdout pipe mid-conversation while the
    process itself stayed alive (parked in epoll). Pre-fix,
    AppServerProcess._read_loop returned cleanly on EOF without
    setting _reader_exc, so any subsequent request() blocked on a future
    that would not resolve until the 600 s request timeout. Post-fix,
    EOF sets _reader_exc and fails every pending future immediately.
    """
    proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
    try:
        await proc.initialize(client_info={"name": "test", "version": "0"})

        # Ask the mock to close stdout, then verify a subsequent
        # request fails fast with ConnectionError (NOT a timeout).
        await proc.request("close_stdout_after", {})

        # Give the reader a moment to notice EOF.
        await asyncio.sleep(0.1)

        with pytest.raises(ConnectionError) as ei:
            # 5 s is plenty for the mark-dead path to trip; pre-fix,
            # this request would hang until its timeout instead of failing.
            await proc.request("echo", {"text": "after-eof"}, timeout=5.0)
        assert "EOF" in str(ei.value) or "stdout" in str(ei.value)
    finally:
        await proc.close()


@pytest.mark.asyncio
async def test_in_flight_request_fails_on_eof() -> None:
    """A future already pending when EOF arrives must fail, not hang."""
    proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
    try:
        await proc.initialize(client_info={"name": "test", "version": "0"})

        # Issue a slow request whose response will never arrive because
        # the mock closes its stdout while the request is still in flight.
        # close_stdout_after acks its own request first and only then
        # closes, so it cannot be the in-flight request itself; a separate
        # slow echo has to run alongside it.
        slow = asyncio.create_task(
            proc.request("echo", {"text": "x", "delay_ms": 5000}, timeout=10.0)
        )
        # Yield long enough for the slow echo to be registered in
        # _pending and written to stdin.
        await asyncio.sleep(0.05)

        # Close stdout; slow's pending future must fail.
        await proc.request("close_stdout_after", {})

        with pytest.raises(ConnectionError):
            await slow
    finally:
        await proc.close()


@pytest.mark.asyncio
async def test_child_crash_fails_pending_requests() -> None:
    """Child process exit must fail pending requests via the watcher.

    Even if the reader missed EOF (parked in readuntil), the
    _watch_child task awaits proc.wait() and on completion fails any
    still-pending requests with ConnectionError. Covers OS-level
    crashes (SIGKILL, segfault) that the reader-EOF path might race.
    """
    proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
    try:
        await proc.initialize(client_info={"name": "test", "version": "0"})

        # Ask the mock to crash. The ack arrives before the exit; the
        # next request must fail fast.
        await proc.request("crash_after", {})
        # Give the child a moment to actually exit and the watcher to
        # mark the channel dead.
        await asyncio.sleep(0.3)

        with pytest.raises(ConnectionError):
            await proc.request("echo", {"text": "after-crash"}, timeout=5.0)
    finally:
        await proc.close()
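The three EOF/crash regression tests pin down one invariant: once stdout hits EOF or the child exits, every pending and every later `request()` fails with `ConnectionError` instead of waiting out its timeout. A minimal, self-contained sketch of that invariant follows, driven by a hand-fed `asyncio.StreamReader` rather than a real subprocess. It is not the actual `AppServerProcess`: `MiniChannel`, its `reader_exc` attribute, the `child_exited` future (standing in for `proc.wait()`), and the `sent` list (standing in for writes to the child's stdin) are all hypothetical. Responses are matched to requests by the JSON-RPC `id` field.

```python
from __future__ import annotations

import asyncio
import itertools
import json
from typing import Any, Optional


class MiniChannel:
    """Hypothetical NDJSON request/response channel that fails fast once dead."""

    def __init__(self, stdout: asyncio.StreamReader, child_exited: asyncio.Future[int]) -> None:
        self._stdout = stdout
        self._ids = itertools.count(1)
        self._pending: dict[int, asyncio.Future[Any]] = {}
        self.reader_exc: Optional[BaseException] = None
        self.sent: list[dict] = []  # stands in for request lines written to stdin
        self._reader = asyncio.create_task(self._read_loop())
        self._watcher = asyncio.create_task(self._watch_child(child_exited))

    def _mark_dead(self, exc: BaseException) -> None:
        # The first cause wins; every pending future fails with it.
        if self.reader_exc is None:
            self.reader_exc = exc
        for fut in self._pending.values():
            if not fut.done():
                fut.set_exception(self.reader_exc)
        self._pending.clear()

    async def _read_loop(self) -> None:
        try:
            while True:
                line = await self._stdout.readline()
                if not line:  # EOF is fatal, not a clean return
                    self._mark_dead(ConnectionError("app-server stdout reached EOF"))
                    return
                msg = json.loads(line)
                fut = self._pending.pop(msg.get("id"), None)
                if fut is not None and not fut.done():
                    fut.set_result(msg.get("result"))
        except Exception as exc:
            self._mark_dead(ConnectionError(f"reader failed: {exc!r}"))

    async def _watch_child(self, child_exited: asyncio.Future[int]) -> None:
        # Covers the case where the reader is still parked in readline().
        rc = await child_exited
        self._mark_dead(ConnectionError(f"app-server exited with code {rc}"))

    async def request(self, method: str, params: dict, timeout: float = 600.0) -> Any:
        req_id = next(self._ids)
        fut = asyncio.get_running_loop().create_future()
        self._pending[req_id] = fut
        # Check only after registering: either _mark_dead already ran (caught
        # here) or it runs later and finds fut in _pending. No gap either way.
        if self.reader_exc is not None:
            del self._pending[req_id]
            raise ConnectionError(f"channel is dead: {self.reader_exc}") from self.reader_exc
        self.sent.append({"jsonrpc": "2.0", "id": req_id, "method": method, "params": params})
        try:
            return await asyncio.wait_for(fut, timeout)
        finally:
            self._pending.pop(req_id, None)
```

In this sketch, calling `feed_eof()` on the reader while a request is in flight fails that request with the EOF error, and any request issued afterwards fails immediately. Resolving `child_exited` while stdout is still open fails pending requests through the watcher instead, loosely mirroring `test_child_crash_fails_pending_requests`.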