6c1313d64e
Closes the prod-Reviewer/Researcher A2A wedge: after the first successful codex turn, subsequent JSON-RPC `message/send` requests returned 0 bytes in 60 s and never reached the codex CLI rollout file.

Empirical proof: 2026-05-18 22:25Z in-container probe on both workspaces. The synthetic POST timed out, the codex rollout size delta was 0, and the unique marker never appeared in the JSONL log. Confirmed not codex CLI (rollout shows healthy task_complete in 2–10 s), not auth (auth.json/CODEX_AUTH_JSON valid), not model (gpt-5.5 accepted).

Root cause: three composable failure modes in the stdio reader path:

1. `AppServerProcess._read_loop` returned cleanly when stdout reached EOF. `_reader_exc` stayed None, no pending request was failed. Any future registered after EOF waited the full request timeout (default 600 s).
2. No child-exit watcher. A crashed / killed child whose stdout was already drained left pending requests hanging on `state.completed` for the full turn budget.
3. `CodexAppServerExecutor._run_turn` blocked on `state.completed.wait()` with only a 600 s upper bound, so a wedged channel held the per-thread `_turn_lock` for 10 minutes per request, masking the dead channel as "still working".

Fix:

- `_read_loop` now treats stdout EOF as fatal: sets `_reader_exc`, fails every pending future via `_mark_dead`, identical to the exception path.
- New `_watch_child` task awaits `proc.wait()` and on completion marks the channel dead so any pending requests fail. Defends against the race where the reader is parked in `readuntil` but the OS reaps the child first.
- `request()` re-checks `_reader_exc` after registering the future, so a future added strictly after `_mark_dead` ran still gets failed (no observable window for the race).
- New `_TURN_INACTIVITY_TIMEOUT` (90 s) watchdog in `_run_turn`: reset on every notification, surfaces a clear "channel wedged" error when codex stops emitting. The 600 s total-turn budget is preserved for legitimately long tool-use turns.
- Bootstrap RPCs now bounded: initialize / thread/start each capped at 30 s so a child wedged during setup doesn't stall the first turn for 10 minutes.

Existing executor unit tests used stale codex 0.71-shape event names (`turn/completed`, bare `agent_message_delta`). They would in fact hang against the production code, which expects codex 0.72+ `codex/event/<type>` envelopes. Tests rewritten to use the production event shape via `FakeAppServer.push_delta` / `push_task_complete` helpers.

Three new app_server tests (`test_eof_fails_pending_requests`, `test_in_flight_request_fails_on_eof`, `test_child_crash_fails_pending_requests`) and four new executor tests (inactivity watchdog: silent + slow-but-alive, second-turn-on-dead-channel, thread-start-timeout-bounded) cover the regression.

Refs prod-team build foreground priority and codex template maintenance task. Open for non-author review, no auto-merge.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
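The inactivity watchdog described in the fix list can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in `_run_turn`: `wait_for_turn`, `ChannelWedged`, and the notification queue are hypothetical names, and only the 90 s and 600 s values come from the message above.

```python
import asyncio

TURN_INACTIVITY_TIMEOUT = 90.0  # seconds; value quoted in the message above
TURN_TOTAL_TIMEOUT = 600.0      # seconds; the preserved total-turn budget


class ChannelWedged(RuntimeError):
    """Hypothetical error type; the real executor's exception is not shown."""


async def wait_for_turn(
    notifications: asyncio.Queue,
    *,
    inactivity: float = TURN_INACTIVITY_TIMEOUT,
    total: float = TURN_TOTAL_TIMEOUT,
) -> list:
    """Collect notifications until a "task_complete" marker arrives.

    The inactivity timer restarts on every notification, while the
    total budget is fixed when the turn starts and never resets.
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + total
    seen = []
    while True:
        remaining = deadline - loop.time()
        if remaining <= 0:
            raise TimeoutError("turn exceeded its total budget")
        # Whichever limit is closer decides how long this wait may last.
        total_is_closer = remaining <= inactivity
        try:
            event = await asyncio.wait_for(
                notifications.get(), timeout=min(inactivity, remaining)
            )
        except asyncio.TimeoutError:
            if total_is_closer:
                raise TimeoutError("turn exceeded its total budget") from None
            raise ChannelWedged(
                f"channel wedged: no notification for {inactivity:g} s"
            ) from None
        seen.append(event)
        if event == "task_complete":
            return seen
```

A slow-but-alive producer keeps resetting the inactivity timer and completes normally; a silent one fails after `inactivity` seconds instead of holding the turn for the full budget.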
172 lines
5.2 KiB
Python
"""Tiny mock of `codex app-server` for unit tests.

Speaks NDJSON over stdio. Implements only the methods AppServerProcess
tests exercise: `initialize`, `echo`, `error`, `emit`. Everything else
returns a JSON-RPC method-not-found error.

This stands in for the real codex binary so tests don't depend on a
specific codex-cli version installed on the runner. Keep it dumb: any
behavior the executor relies on must be tested against the real
binary in an integration test, not here.

Two failure modes are exposed for testing the reader-lifecycle
hardening (added 2026-05-18 alongside the prod-Reviewer/Researcher
wedge fix):

- ``close_stdout_after`` request: the mock answers normally, then
  closes its stdout file descriptor without exiting. This reproduces
  the codex CLI behavior of closing the channel mid-conversation
  while the process itself stays alive, the case the reader-EOF
  detection path needs to catch.

- ``crash_after`` request: the mock answers normally, then calls
  ``os._exit(1)`` shortly after. Reproduces a child that segfaults /
  is OOM-killed mid-turn, the case the child-watcher path catches.
"""
from __future__ import annotations

import asyncio
import json
import os
import sys


async def _read_lines() -> "asyncio.StreamReader":
    loop = asyncio.get_running_loop()
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)
    await loop.connect_read_pipe(lambda: protocol, sys.stdin)
    return reader


def _write(obj: dict) -> None:
    sys.stdout.write(json.dumps(obj, separators=(",", ":")) + "\n")
    sys.stdout.flush()


async def _handle(msg: dict) -> None:
    method = msg.get("method")
    params = msg.get("params") or {}
    request_id = msg.get("id")

    # Notifications (no id) are ignored by the mock.
    if request_id is None:
        return

    if method == "initialize":
        _write({
            "jsonrpc": "2.0",
            "id": request_id,
            "result": {"userAgent": "mock_app_server/0.1"},
        })
        return

    if method == "echo":
        delay_ms = int(params.get("delay_ms", 0))
        if delay_ms > 0:
            await asyncio.sleep(delay_ms / 1000)
        _write({
            "jsonrpc": "2.0",
            "id": request_id,
            "result": {"text": params.get("text", "")},
        })
        return

    if method == "error":
        _write({
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {
                "code": int(params.get("code", -32000)),
                "message": str(params.get("message", "mock error")),
            },
        })
        return

    if method == "emit":
        # Fire `count` notifications named `method`, then ack.
        count = int(params.get("count", 0))
        notif_method = str(params.get("method", "tick"))
        for i in range(count):
            _write({
                "jsonrpc": "2.0",
                "method": notif_method,
                "params": {"i": i},
            })
        _write({
            "jsonrpc": "2.0",
            "id": request_id,
            "result": {"emitted": count},
        })
        return

    if method == "close_stdout_after":
        # Ack the request, then close stdout without exiting. The
        # reader sees EOF; the child stays alive (so wait4 does NOT
        # return). Exercises the EOF path of _read_loop.
        #
        # We close the underlying file descriptor (FD 1), not just the
        # Python wrapper: closing sys.stdout only closes the Python
        # buffer; the OS pipe needs ``os.close(1)`` to actually send
        # EOF to the parent's reader.
        _write({
            "jsonrpc": "2.0",
            "id": request_id,
            "result": {"closed": True},
        })
        sys.stdout.flush()
        try:
            os.close(1)
        except Exception:
            pass
        # Keep stdin draining so the process doesn't crash on next
        # read; we want EOF on stdout WITHOUT the watcher firing.
        try:
            while True:
                await asyncio.sleep(60)
        except Exception:
            return
        return

    if method == "crash_after":
        # Ack, then exit non-zero a moment later. Exercises the
        # _watch_child path: pending requests must fail when the
        # child reaps, regardless of whether the reader noticed
        # stdout EOF first.
        _write({
            "jsonrpc": "2.0",
            "id": request_id,
            "result": {"crashing": True},
        })
        sys.stdout.flush()
        await asyncio.sleep(0.05)
        os._exit(1)

    # Method not found.
    _write({
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {"code": -32601, "message": f"method not found: {method}"},
    })


async def main() -> None:
    reader = await _read_lines()
    while True:
        line = await reader.readline()
        if not line:
            break
        try:
            msg = json.loads(line.decode("utf-8"))
        except json.JSONDecodeError:
            continue
        # Schedule handling so `emit` doesn't block subsequent reads.
        asyncio.create_task(_handle(msg))


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
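
The EOF that `close_stdout_after` produces is the condition the parent-side reader has to treat as fatal. The sketch below illustrates that idea with an in-memory `asyncio.StreamReader` standing in for the child's stdout. It is not the real `AppServerProcess`: `MiniChannel`, `ChannelDead`, `read_loop`, and `register` are hypothetical names chosen for this example, and only `_mark_dead` echoes a name from the commit message.

```python
from __future__ import annotations

import asyncio
import json


class ChannelDead(RuntimeError):
    """Hypothetical error used to fail requests on a dead channel."""


class MiniChannel:
    """Hypothetical stand-in for a stdio JSON-RPC client; not AppServerProcess."""

    def __init__(self, reader: asyncio.StreamReader) -> None:
        self._reader = reader
        self._pending: dict[int, asyncio.Future] = {}
        self._dead: Exception | None = None

    def _mark_dead(self, exc: Exception) -> None:
        # Remember the first failure and fail everything still waiting.
        if self._dead is None:
            self._dead = exc
        for fut in self._pending.values():
            if not fut.done():
                fut.set_exception(self._dead)
        self._pending.clear()

    async def read_loop(self) -> None:
        while True:
            line = await self._reader.readline()
            if not line:
                # EOF is fatal: do not return quietly, fail pending requests.
                self._mark_dead(ChannelDead("stdout reached EOF"))
                return
            msg = json.loads(line)
            fut = self._pending.pop(msg.get("id"), None)
            if fut is not None and not fut.done():
                fut.set_result(msg.get("result"))

    def register(self, request_id: int) -> asyncio.Future:
        fut = asyncio.get_running_loop().create_future()
        self._pending[request_id] = fut
        # Re-check after registering so a late request fails immediately
        # instead of waiting out its timeout.
        if self._dead is not None:
            self._pending.pop(request_id, None)
            fut.set_exception(self._dead)
        return fut
```

In this sketch, a request that is in flight when EOF arrives and a request registered afterwards both raise `ChannelDead` as soon as they are awaited.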