molecule-ai-workspace-templ…/tests/mock_app_server.py
core-devops 6c1313d64e
fix(executor): prevent stdout-reader deadlock after first turn
Closes the prod-Reviewer/Researcher A2A wedge: after the first
successful codex turn, subsequent JSON-RPC `message/send` requests
returned 0 bytes in 60 s and never reached the codex CLI rollout
file. Evidence: a 2026-05-18 22:25Z in-container probe on both
workspaces showed the synthetic POST timing out, a codex rollout size
delta of 0, and the unique marker never appearing in the JSONL log.
Ruled out: the codex CLI (rollout shows a healthy task_complete in
2–10 s), auth (auth.json/CODEX_AUTH_JSON valid), and the model
(gpt-5.5 accepted).

Root cause — three composable failure modes in the stdio reader path:

1. `AppServerProcess._read_loop` returned cleanly when stdout reached
   EOF. `_reader_exc` stayed None, no pending request was failed.
   Any future registered after EOF waited the full request timeout
   (default 600 s).

2. No child-exit watcher. A crashed / killed child whose stdout was
   already drained left pending requests hanging on `state.completed`
   for the full turn budget.

3. `CodexAppServerExecutor._run_turn` blocked on
   `state.completed.wait()` with only a 600 s upper bound, so a
   wedged channel held the per-thread `_turn_lock` for 10 minutes
   per request — masking the dead channel as "still working".
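Failure mode 1 can be reproduced in a few lines. The sketch below uses a hypothetical minimal channel (`NaiveChannel`, `read_loop` and `request` are illustrative names, not the real `AppServerProcess` API) whose reader returns cleanly on EOF. A request registered afterwards has nothing that can resolve it, so it ends only when its timeout fires. The 0.2 s timeout stands in for the 600 s default.

```python
from __future__ import annotations

import asyncio


class NaiveChannel:
    """Hypothetical channel that mirrors failure mode 1: EOF is not fatal."""

    def __init__(self, reader: asyncio.StreamReader) -> None:
        self._reader = reader
        self._pending: dict[int, asyncio.Future] = {}
        self._next_id = 0

    async def read_loop(self) -> None:
        while True:
            line = await self._reader.readline()
            if not line:
                # BUG: a clean return records no error and never fails
                # the pending futures.
                return
            # Response dispatch to self._pending is omitted in this sketch.

    async def request(self, timeout: float) -> object:
        self._next_id += 1
        fut = asyncio.get_running_loop().create_future()
        self._pending[self._next_id] = fut
        # Once the reader has exited, only the timeout can end this wait.
        return await asyncio.wait_for(fut, timeout)


async def demo() -> str:
    reader = asyncio.StreamReader()
    reader.feed_eof()  # simulate the child closing its stdout
    chan = NaiveChannel(reader)
    await chan.read_loop()  # returns immediately and silently
    try:
        await chan.request(timeout=0.2)  # stands in for the 600 s default
    except asyncio.TimeoutError:
        return "timed out"
    return "answered"


if __name__ == "__main__":
    print(asyncio.run(demo()))
```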

Fix:

- `_read_loop` now treats stdout EOF as fatal: sets `_reader_exc`,
  fails every pending future via `_mark_dead`, identical to the
  exception path.
- New `_watch_child` task awaits `proc.wait()` and on completion
  marks the channel dead so any pending requests fail. Defends
  against the race where the reader is parked in `readuntil` but
  the OS reaps the child first.
- `request()` re-checks `_reader_exc` after registering the future,
  so a future added strictly after `_mark_dead` ran still gets
  failed (no observable window for the race).
- New `_TURN_INACTIVITY_TIMEOUT` (90 s) watchdog in `_run_turn`:
  reset on every notification, surfaces a clear "channel wedged"
  error when codex stops emitting. The 600 s total-turn budget is
  preserved for legitimately long tool-use turns.
- Bootstrap RPCs now bounded: initialize / thread/start each capped
  at 30 s so a child wedged during setup doesn't stall the first
  turn for 10 minutes.
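The first three fixes combine into one pattern. A single `_mark_dead`-style routine records the first cause and fails every pending future. Both the reader (on EOF or error) and a child watcher call it, and `request()` re-checks the recorded error after registering its future. The following is a minimal sketch under those assumptions. `Channel`, `ChannelDead` and the method names are hypothetical stand-ins, and JSON-RPC response dispatch is omitted.

```python
from __future__ import annotations

import asyncio
import sys


class ChannelDead(Exception):
    """Hypothetical error raised for requests on a dead channel."""


class Channel:
    """Illustrative sketch of the fix; not the real AppServerProcess."""

    def __init__(self, proc: asyncio.subprocess.Process) -> None:
        self._proc = proc
        self._pending: dict[int, asyncio.Future] = {}
        self._reader_exc: BaseException | None = None
        self._next_id = 0

    def _mark_dead(self, exc: BaseException) -> None:
        # Keep the first cause, then fail every outstanding future.
        if self._reader_exc is None:
            self._reader_exc = exc
        for fut in self._pending.values():
            if not fut.done():
                fut.set_exception(ChannelDead(str(self._reader_exc)))
        self._pending.clear()

    async def read_loop(self) -> None:
        assert self._proc.stdout is not None
        try:
            while True:
                line = await self._proc.stdout.readline()
                if not line:
                    raise ChannelDead("stdout EOF")  # EOF is fatal too
                # JSON-RPC response dispatch to self._pending is omitted here.
        except Exception as exc:
            self._mark_dead(exc)

    async def watch_child(self) -> None:
        # Fires on process exit even if the reader never observes EOF.
        code = await self._proc.wait()
        self._mark_dead(ChannelDead(f"child exited with code {code}"))

    async def request(self, timeout: float) -> object:
        self._next_id += 1
        fut = asyncio.get_running_loop().create_future()
        self._pending[self._next_id] = fut
        # Re-check after registering, so a request made after _mark_dead
        # ran fails immediately instead of waiting for its timeout.
        if self._reader_exc is not None:
            del self._pending[self._next_id]
            raise ChannelDead(str(self._reader_exc))
        return await asyncio.wait_for(fut, timeout)


async def _outcome(chan: Channel) -> str:
    try:
        await chan.request(timeout=5.0)
    except ChannelDead:
        return "failed"
    return "answered"


async def demo() -> tuple[str, str]:
    # A child that exits at once: stdout hits EOF and the process is reaped.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import os; os._exit(1)",
        stdout=asyncio.subprocess.PIPE,
    )
    chan = Channel(proc)
    tasks = [
        asyncio.create_task(chan.read_loop()),
        asyncio.create_task(chan.watch_child()),
    ]
    first = await _outcome(chan)   # in flight when the channel dies
    await asyncio.gather(*tasks)   # reader and watcher have both finished
    second = await _outcome(chan)  # issued on an already-dead channel
    return first, second


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Both requests fail well before their 5 s timeout. The first is failed by whichever of the reader or the watcher fires first, and the second is failed by the post-registration re-check.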

Existing executor unit tests used stale codex 0.71-shape event
names (`turn/completed`, bare `agent_message_delta`); they would
in fact hang against the production code, which expects codex 0.72+
`codex/event/<type>` envelopes. Tests rewritten to use the
production event shape via `FakeAppServer.push_delta` /
`push_task_complete` helpers. Three new app_server tests
(`test_eof_fails_pending_requests`,
`test_in_flight_request_fails_on_eof`,
`test_child_crash_fails_pending_requests`) and four new
executor tests (inactivity watchdog: silent + slow-but-alive,
second-turn-on-dead-channel, thread-start-timeout-bounded) cover
the regression.
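The watchdog in `_run_turn` separates two clocks: an inactivity deadline that is re-armed on every notification, and the overall turn budget. The following is a minimal sketch of that split. It assumes notifications arrive on an `asyncio.Queue`, with `None` standing in for task completion. `run_turn`, `ChannelWedged` and the queue model are hypothetical. The two scenarios mirror the silent and slow-but-alive watchdog tests listed above, with the 90 s / 600 s values scaled down.

```python
from __future__ import annotations

import asyncio


class ChannelWedged(Exception):
    """Hypothetical error: no notification arrived within the inactivity window."""


async def run_turn(events: asyncio.Queue, inactivity: float, total: float) -> int:
    """Count notifications until a ``None`` sentinel marks task completion."""

    async def consume() -> int:
        seen = 0
        while True:
            try:
                # Re-armed on every notification: each get() has a fresh window.
                event = await asyncio.wait_for(events.get(), inactivity)
            except asyncio.TimeoutError:
                raise ChannelWedged(f"no notification for {inactivity} s") from None
            if event is None:
                return seen
            seen += 1

    # The total-turn budget is enforced separately, so long but chatty
    # turns are not cut off by the inactivity window.
    return await asyncio.wait_for(consume(), total)


async def _feed(events: asyncio.Queue, gaps: list[float]) -> None:
    for gap in gaps:
        await asyncio.sleep(gap)
        await events.put("delta")
    await events.put(None)  # completion sentinel


async def scenario(gaps: list[float] | None) -> str:
    events: asyncio.Queue = asyncio.Queue()
    feeder = asyncio.create_task(_feed(events, gaps)) if gaps is not None else None
    try:
        seen = await run_turn(events, inactivity=0.3, total=2.0)
        return f"completed after {seen} events"
    except ChannelWedged:
        return "wedged"
    finally:
        if feeder is not None:
            await feeder


if __name__ == "__main__":
    print(asyncio.run(scenario(None)))         # silent channel
    print(asyncio.run(scenario([0.1] * 5)))    # slow but alive
```

The slow-but-alive turn runs for about 0.5 s, longer than the 0.3 s inactivity window, and still completes because each notification re-arms the deadline.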

Refs prod-team build foreground priority and codex template
maintenance task — open for non-author review, no auto-merge.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 15:54:56 -07:00


"""Tiny mock of `codex app-server` for unit tests.

Speaks NDJSON over stdio. Implements only the methods the AppServerProcess
tests exercise: `initialize`, `echo`, `error`, `emit`, plus the two
failure-mode methods described below. Everything else returns a
JSON-RPC method-not-found error.

This stands in for the real codex binary so tests don't depend on a
specific codex-cli version installed on the runner. Keep it dumb: any
behavior the executor relies on must be tested against the real
binary in an integration test, not here.

Two failure modes are exposed for testing the reader-lifecycle
hardening (added 2026-05-18 alongside the prod-Reviewer/Researcher
wedge fix):

- ``close_stdout_after`` request: the mock answers normally, then
  closes its stdout file descriptor without exiting. This reproduces
  the codex CLI behavior of closing the channel mid-conversation
  while the process itself stays alive, which is the case the
  reader-EOF detection path needs to catch.
- ``crash_after`` request: the mock answers normally, then calls
  ``os._exit(1)`` shortly after. This reproduces a child that
  segfaults or is OOM-killed mid-turn, which is the case the
  child-watcher path catches.
"""

from __future__ import annotations

import asyncio
import json
import os
import sys


async def _read_lines() -> asyncio.StreamReader:
    """Wrap stdin in an asyncio StreamReader."""
    loop = asyncio.get_running_loop()
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)
    await loop.connect_read_pipe(lambda: protocol, sys.stdin)
    return reader


def _write(obj: dict) -> None:
    """Emit one NDJSON line and flush so the parent sees it immediately."""
    sys.stdout.write(json.dumps(obj, separators=(",", ":")) + "\n")
    sys.stdout.flush()


async def _handle(msg: dict) -> None:
    method = msg.get("method")
    params = msg.get("params") or {}
    request_id = msg.get("id")

    # Notifications (no id) are ignored by the mock.
    if request_id is None:
        return

    if method == "initialize":
        _write({
            "jsonrpc": "2.0",
            "id": request_id,
            "result": {"userAgent": "mock_app_server/0.1"},
        })
        return

    if method == "echo":
        delay_ms = int(params.get("delay_ms", 0))
        if delay_ms > 0:
            await asyncio.sleep(delay_ms / 1000)
        _write({
            "jsonrpc": "2.0",
            "id": request_id,
            "result": {"text": params.get("text", "")},
        })
        return

    if method == "error":
        _write({
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {
                "code": int(params.get("code", -32000)),
                "message": str(params.get("message", "mock error")),
            },
        })
        return

    if method == "emit":
        # Fire `count` notifications named `method`, then ack.
        count = int(params.get("count", 0))
        notif_method = str(params.get("method", "tick"))
        for i in range(count):
            _write({
                "jsonrpc": "2.0",
                "method": notif_method,
                "params": {"i": i},
            })
        _write({
            "jsonrpc": "2.0",
            "id": request_id,
            "result": {"emitted": count},
        })
        return

    if method == "close_stdout_after":
        # Ack the request, then close stdout without exiting. The
        # reader sees EOF; the child stays alive (so wait4 does NOT
        # return). Exercises the EOF path of _read_loop.
        #
        # We close the underlying file descriptor (FD 1), not just the
        # Python wrapper: closing sys.stdout only closes the Python
        # buffer, while the OS pipe needs ``os.close(1)`` to actually
        # send EOF to the parent's reader.
        _write({
            "jsonrpc": "2.0",
            "id": request_id,
            "result": {"closed": True},
        })
        sys.stdout.flush()
        try:
            os.close(1)
        except OSError:
            pass
        # Park this handler. main() keeps reading stdin, so the process
        # stays alive (and the child watcher does not fire) until the
        # parent closes stdin: stdout EOF arrives WITHOUT a child exit.
        while True:
            await asyncio.sleep(60)

    if method == "crash_after":
        # Ack, then exit non-zero a moment later. Exercises the
        # _watch_child path: pending requests must fail when the
        # child is reaped, regardless of whether the reader noticed
        # stdout EOF first.
        _write({
            "jsonrpc": "2.0",
            "id": request_id,
            "result": {"crashing": True},
        })
        sys.stdout.flush()
        await asyncio.sleep(0.05)
        os._exit(1)

    # Method not found.
    _write({
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {"code": -32601, "message": f"method not found: {method}"},
    })
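

async def main() -> None:
    reader = await _read_lines()
    # The event loop keeps only weak references to tasks, so hold
    # strong ones until each handler finishes.
    background: set[asyncio.Task[None]] = set()
    while True:
        line = await reader.readline()
        if not line:
            break
        try:
            msg = json.loads(line.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError):
            continue
        # Schedule handling as a task so slow handlers (`echo` with
        # `delay_ms`, the parked `close_stdout_after`) don't block
        # subsequent reads.
        task = asyncio.create_task(_handle(msg))
        background.add(task)
        task.add_done_callback(background.discard)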


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
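The reason `close_stdout_after` uses `os.close(1)` can be checked in isolation. Once file descriptor 1 is closed, the parent's reader gets EOF even though the child is still running, which is the state the reader-EOF path must detect. This small sketch assumes a POSIX host where no other process holds the pipe's write end. The inline `CHILD` script is a hypothetical stand-in for the mock, not the mock itself.

```python
import asyncio
import sys

# Hypothetical stand-in for the mock: answer once, close FD 1 (not just the
# sys.stdout wrapper), then stay alive.
CHILD = (
    "import os, sys, time\n"
    "sys.stdout.write('ack\\n')\n"
    "sys.stdout.flush()\n"
    "os.close(1)\n"
    "time.sleep(30)\n"
)


async def probe() -> tuple:
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", CHILD, stdout=asyncio.subprocess.PIPE
    )
    assert proc.stdout is not None
    try:
        first = await asyncio.wait_for(proc.stdout.readline(), 10)
        eof = await asyncio.wait_for(proc.stdout.readline(), 10)
        # EOF has arrived, yet the child has not exited.
        alive = proc.returncode is None
    finally:
        proc.kill()
        await proc.wait()
    return first, eof, alive


if __name__ == "__main__":
    print(asyncio.run(probe()))
```

According to the mock's own comment, a variant that called only `sys.stdout.close()` would leave FD 1 open. In that case the second `readline()` would hit its timeout instead of returning `b""`.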