OpenAI Codex CLI (@openai/codex >=0.72) wrapped as a Molecule workspace runtime, with native MCP-style push parity via persistent codex app-server stdio JSON-RPC. Each session holds one long-lived `codex app-server` child + one thread; A2A messages become turn/start RPCs against the existing thread. Per-thread serialization handles mid-turn arrivals (matches OpenClaw's per-chat sequentializer). Modules: - app_server.py — async JSON-RPC over NDJSON stdio (286 LOC) - executor.py — turn lifecycle, notification accumulation, error surfacing (270 LOC) - adapter.py — thin BaseAdapter shell + preflight Tests: 12/12 pass against Python NDJSON mock + fake AppServerProcess. Validated end-to-end against real codex-cli 0.72.0: - initialize handshake works - thread/start works (returns thread.id, NOT thread.threadId as the generated JSON schema claims; executor accepts both shapes) Scaffolded but not yet end-to-end verified against a real Molecule workspace + peer A2A traffic — that lands separately. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
246 lines
9.0 KiB
Python
246 lines
9.0 KiB
Python
"""Unit tests for CodexAppServerExecutor's internal turn lifecycle.
|
|
|
|
We don't stand up a real codex app-server — those tests live in
|
|
test_app_server.py which validates the JSON-RPC plumbing against a
|
|
mock binary. Here we focus on the protocol-level behavior of
|
|
``_run_turn``: thread bootstrap, notification accumulation, completion
|
|
detection, error surfacing, mid-turn serialization.
|
|
|
|
The fake AppServerProcess records every request sent and exposes a
|
|
helper to drive notifications + responses on demand.
|
|
"""
|
|
from __future__ import annotations

import asyncio
import sys
from collections.abc import Callable
from pathlib import Path

import pytest

sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
|
|
|
# Import the executor module — relies on a2a + molecule_runtime being
|
|
# installed locally. If not, skip these tests; the executor will still
|
|
# be exercised in container CI.
|
|
pytest.importorskip("a2a.helpers")
|
|
pytest.importorskip("molecule_runtime.adapters.base")
|
|
|
|
from executor import CodexAppServerExecutor # noqa: E402
|
|
from molecule_runtime.adapters.base import AdapterConfig # noqa: E402
|
|
|
|
|
|
class FakeAppServer:
|
|
"""Drop-in for AppServerProcess that lets tests script responses + notifications.
|
|
|
|
Honors only the shape AppServerProcess presents to the executor:
|
|
initialize / request / subscribe / close. Each scripted turn lets
|
|
the test push delta notifications and resolve the response.
|
|
"""
|
|
|
|
def __init__(self) -> None:
|
|
self.requests: list[tuple[str, dict]] = []
|
|
self._subscribers: list = []
|
|
self._next_thread = 0
|
|
self._next_turn = 0
|
|
self.closed = False
|
|
# Test-controllable knobs
|
|
self.thread_start_response: dict | None = None
|
|
self.turn_start_responses: list[dict] = []
|
|
self.turn_start_raises: Exception | None = None
|
|
|
|
async def initialize(self, *, client_info: dict) -> dict:
|
|
return {"userAgent": "fake/0.0"}
|
|
|
|
async def request(self, method: str, params: dict | None = None, *, timeout: float | None = None) -> dict:
|
|
self.requests.append((method, params or {}))
|
|
if method == "thread/start":
|
|
if self.thread_start_response is not None:
|
|
return self.thread_start_response
|
|
self._next_thread += 1
|
|
# Use the real binary's `id` shape (verified 2026-05-02
|
|
# against codex 0.72) — the schema's `threadId` is also
|
|
# accepted by the executor but `id` is what production hits.
|
|
return {"thread": {"id": f"th_{self._next_thread}"}}
|
|
if method == "turn/start":
|
|
if self.turn_start_raises:
|
|
raise self.turn_start_raises
|
|
if self.turn_start_responses:
|
|
return self.turn_start_responses.pop(0)
|
|
self._next_turn += 1
|
|
return {"turn": {"id": f"tu_{self._next_turn}"}}
|
|
if method == "turn/interrupt":
|
|
return {}
|
|
raise AssertionError(f"unexpected method: {method}")
|
|
|
|
def subscribe(self, callback) -> "asyncio.coroutines.Coroutine":
|
|
self._subscribers.append(callback)
|
|
|
|
def unsubscribe() -> None:
|
|
try:
|
|
self._subscribers.remove(callback)
|
|
except ValueError:
|
|
pass
|
|
|
|
return unsubscribe
|
|
|
|
async def close(self) -> int | None:
|
|
self.closed = True
|
|
return 0
|
|
|
|
# --- test helpers ---------------------------------------------------
|
|
def push(self, method: str, params: dict | None = None) -> None:
|
|
"""Synchronously deliver a notification to all subscribers."""
|
|
for cb in list(self._subscribers):
|
|
cb(method, params or {})
|
|
|
|
|
|
def _make_executor(fake: FakeAppServer, *, model: str = "gpt-5", system_prompt: str = "be helpful") -> CodexAppServerExecutor:
    """Build an executor wired to *fake* instead of a spawned codex child."""
    config = AdapterConfig(model=model, system_prompt=system_prompt)
    executor = CodexAppServerExecutor(config)
    # Pre-inject the fake so _ensure_thread skips spawning codex.
    executor._app_server = fake  # type: ignore[assignment]
    return executor
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_run_turn_starts_thread_and_returns_assembled_deltas() -> None:
    """A turn's streamed deltas are concatenated into the returned text."""
    fake = FakeAppServer()
    ex = _make_executor(fake)

    async def driver() -> None:
        # Poll until turn/start is recorded, then push deltas + completion.
        attempts = 0
        while attempts < 50 and not any(m == "turn/start" for m, _ in fake.requests):
            await asyncio.sleep(0.01)
            attempts += 1
        for method, payload in (
            ("agent_message_delta", {"delta": "hello "}),
            ("agent_message_delta", {"delta": "world"}),
            ("turn/completed", {"turnId": "tu_1"}),
        ):
            fake.push(method, payload)

    driver_task = asyncio.create_task(driver())
    text = await ex._run_turn("hi")
    await driver_task

    assert text == "hello world"
    seen_methods = {m for m, _ in fake.requests}
    assert "thread/start" in seen_methods
    assert "turn/start" in seen_methods
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_run_turn_reuses_thread_on_second_call() -> None:
    """A second _run_turn reuses the existing thread rather than starting anew."""
    fake = FakeAppServer()
    ex = _make_executor(fake)

    async def drive_one(reply: str, turn_id: str) -> None:
        # Wait for the N-th turn/start (N encoded in the turn id), then
        # push one delta and complete the turn.
        wanted = int(turn_id.split("_")[1])
        for _ in range(50):
            started = sum(1 for method, _ in fake.requests if method == "turn/start")
            if started >= wanted:
                break
            await asyncio.sleep(0.01)
        fake.push("agent_message_delta", {"delta": reply})
        fake.push("turn/completed", {"turnId": turn_id})

    first_driver = asyncio.create_task(drive_one("first", "tu_1"))
    first_text = await ex._run_turn("ping")
    await first_driver

    second_driver = asyncio.create_task(drive_one("second", "tu_2"))
    second_text = await ex._run_turn("pong")
    await second_driver

    assert first_text == "first"
    assert second_text == "second"
    # thread/start should fire EXACTLY once across both turns.
    assert sum(1 for method, _ in fake.requests if method == "thread/start") == 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_run_turn_surfaces_error_notification() -> None:
    """An error notification mid-turn is surfaced to the caller as RuntimeError."""
    fake = FakeAppServer()
    ex = _make_executor(fake)

    async def driver() -> None:
        # Poll until the executor has issued turn/start, then fail the turn.
        attempts = 0
        while attempts < 50:
            if any(method == "turn/start" for method, _ in fake.requests):
                break
            await asyncio.sleep(0.01)
            attempts += 1
        fake.push("error_notification", {"message": "model rate limited"})

    driver_task = asyncio.create_task(driver())
    with pytest.raises(RuntimeError, match="rate limited"):
        await ex._run_turn("hi")
    await driver_task
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_thread_start_passes_config() -> None:
    """Adapter config (model, prompt, policies) flows into thread/start params."""
    fake = FakeAppServer()
    ex = _make_executor(fake, model="o4-mini", system_prompt="custom prompt")

    async def driver() -> None:
        # Once the turn is underway, complete it immediately — this test
        # only cares about the thread/start request parameters.
        for _ in range(50):
            if any(method == "turn/start" for method, _ in fake.requests):
                break
            await asyncio.sleep(0.01)
        fake.push("turn/completed", {"turnId": "tu_1"})

    driver_task = asyncio.create_task(driver())
    await ex._run_turn("hi")
    await driver_task

    params = next(p for m, p in fake.requests if m == "thread/start")
    expected = {
        "model": "o4-mini",
        "developerInstructions": "custom prompt",
        "approvalPolicy": "never",
        "sandboxPolicy": {"mode": "workspace-write"},
    }
    for key, value in expected.items():
        assert params[key] == value
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_turn_lock_serializes_concurrent_executes() -> None:
    """Two concurrent execute()s should run their turns one-at-a-time."""
    fake = FakeAppServer()
    ex = _make_executor(fake)

    # Track the order in which turns START vs COMPLETE.
    starts: list[int] = []
    completes: list[int] = []

    async def execute_turn(idx: int, prompt: str) -> str:
        # Drive completion AFTER seeing this turn's turn/start in the
        # request log. Because of the lock, turn idx 2 won't start
        # until turn idx 1 is acked.
        async def driver() -> None:
            # Each task waits for the cumulative turn/start count to
            # reach its own target (idx + 1); the fake numbers turns
            # globally, so the count only advances when a turn begins.
            target_count = idx + 1
            for _ in range(200):
                count = sum(1 for m, _ in fake.requests if m == "turn/start")
                if count >= target_count:
                    starts.append(idx)
                    break
                await asyncio.sleep(0.005)
            # NOTE(review): if the loop above exhausts its 200 polls
            # without seeing the target count, we still push the delta
            # and completion (and record a "complete" without a matching
            # "start") — presumably a deliberate escape hatch to avoid a
            # deadlock with the lock-holding task; confirm intent.
            fake.push("agent_message_delta", {"delta": f"r{idx}"})
            fake.push("turn/completed", {"turnId": f"tu_{idx + 1}"})
            completes.append(idx)

        driver_task = asyncio.create_task(driver())

        # Mirror the lock-and-run path execute() uses, without needing
        # an EventQueue.
        async with ex._turn_lock:
            text = await ex._run_turn(prompt)
            await driver_task
        return text

    results = await asyncio.gather(execute_turn(0, "first"), execute_turn(1, "second"))

    assert results == ["r0", "r1"] or results == ["r1", "r0"]
    # Whichever order tasks acquired the lock, the LOCK guarantees
    # turn N+1 doesn't start until turn N has completed. So starts and
    # completes should interleave one-at-a-time, not overlap.
    assert sorted(starts) == [0, 1]
    assert sorted(completes) == [0, 1]
    # Strict ordering check: between any two `starts` events, there
    # must be a corresponding `completes` event.
    # NOTE(review): this asserts starts[0] == completes[0] (membership
    # in a one-element slice); it raises IndexError rather than failing
    # cleanly if either list is empty — the sorted() asserts above
    # guard that, but the slice form obscures the intent. Verify this
    # checks what the comment claims.
    assert starts[0] in completes[:1]
|