molecule-ai-workspace-templ.../tests/test_executor.py
Hongming Wang 0f4ed28f62 feat: initial codex CLI workspace template
OpenAI Codex CLI (@openai/codex >=0.72) wrapped as a Molecule
workspace runtime, with native MCP-style push parity via persistent
codex app-server stdio JSON-RPC.

Each session holds one long-lived `codex app-server` child + one
thread; A2A messages become turn/start RPCs against the existing
thread. Per-thread serialization handles mid-turn arrivals (matches
OpenClaw's per-chat sequentializer).

Modules:
- app_server.py — async JSON-RPC over NDJSON stdio (286 LOC)
- executor.py — turn lifecycle, notification accumulation,
  error surfacing (270 LOC)
- adapter.py — thin BaseAdapter shell + preflight

Tests: 12/12 pass against Python NDJSON mock + fake AppServerProcess.
Validated end-to-end against real codex-cli 0.72.0:
- initialize handshake works
- thread/start works (returns thread.id, NOT thread.threadId as the
  generated JSON schema claims; executor accepts both shapes)

Scaffolded but not yet end-to-end verified against a real Molecule
workspace + peer A2A traffic — that lands separately.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 02:19:52 -07:00

246 lines
9.0 KiB
Python

"""Unit tests for CodexAppServerExecutor's internal turn lifecycle.
We don't stand up a real codex app-server — those tests live in
test_app_server.py which validates the JSON-RPC plumbing against a
mock binary. Here we focus on the protocol-level behavior of
``_run_turn``: thread bootstrap, notification accumulation, completion
detection, error surfacing, mid-turn serialization.
The fake AppServerProcess records every request sent and exposes a
helper to drive notifications + responses on demand.
"""
from __future__ import annotations

import asyncio
import sys
from collections.abc import Callable
from pathlib import Path

import pytest
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
# Import the executor module — relies on a2a + molecule_runtime being
# installed locally. If not, skip these tests; the executor will still
# be exercised in container CI.
pytest.importorskip("a2a.helpers")
pytest.importorskip("molecule_runtime.adapters.base")
from executor import CodexAppServerExecutor # noqa: E402
from molecule_runtime.adapters.base import AdapterConfig # noqa: E402
class FakeAppServer:
"""Drop-in for AppServerProcess that lets tests script responses + notifications.
Honors only the shape AppServerProcess presents to the executor:
initialize / request / subscribe / close. Each scripted turn lets
the test push delta notifications and resolve the response.
"""
def __init__(self) -> None:
self.requests: list[tuple[str, dict]] = []
self._subscribers: list = []
self._next_thread = 0
self._next_turn = 0
self.closed = False
# Test-controllable knobs
self.thread_start_response: dict | None = None
self.turn_start_responses: list[dict] = []
self.turn_start_raises: Exception | None = None
async def initialize(self, *, client_info: dict) -> dict:
return {"userAgent": "fake/0.0"}
async def request(self, method: str, params: dict | None = None, *, timeout: float | None = None) -> dict:
self.requests.append((method, params or {}))
if method == "thread/start":
if self.thread_start_response is not None:
return self.thread_start_response
self._next_thread += 1
# Use the real binary's `id` shape (verified 2026-05-02
# against codex 0.72) — the schema's `threadId` is also
# accepted by the executor but `id` is what production hits.
return {"thread": {"id": f"th_{self._next_thread}"}}
if method == "turn/start":
if self.turn_start_raises:
raise self.turn_start_raises
if self.turn_start_responses:
return self.turn_start_responses.pop(0)
self._next_turn += 1
return {"turn": {"id": f"tu_{self._next_turn}"}}
if method == "turn/interrupt":
return {}
raise AssertionError(f"unexpected method: {method}")
def subscribe(self, callback) -> "asyncio.coroutines.Coroutine":
self._subscribers.append(callback)
def unsubscribe() -> None:
try:
self._subscribers.remove(callback)
except ValueError:
pass
return unsubscribe
async def close(self) -> int | None:
self.closed = True
return 0
# --- test helpers ---------------------------------------------------
def push(self, method: str, params: dict | None = None) -> None:
"""Synchronously deliver a notification to all subscribers."""
for cb in list(self._subscribers):
cb(method, params or {})
def _make_executor(fake: FakeAppServer, *, model: str = "gpt-5", system_prompt: str = "be helpful") -> CodexAppServerExecutor:
    """Build an executor wired straight to *fake* instead of a spawned codex."""
    executor = CodexAppServerExecutor(AdapterConfig(model=model, system_prompt=system_prompt))
    # Pre-inject the fake so _ensure_thread skips spawning codex.
    executor._app_server = fake  # type: ignore[assignment]
    return executor
@pytest.mark.asyncio
async def test_run_turn_starts_thread_and_returns_assembled_deltas() -> None:
    fake = FakeAppServer()
    ex = _make_executor(fake)

    async def driver() -> None:
        # Poll until the executor has issued turn/start, then script
        # two deltas followed by the completion notification.
        for _ in range(50):
            if any(method == "turn/start" for method, _ in fake.requests):
                break
            await asyncio.sleep(0.01)
        for chunk in ("hello ", "world"):
            fake.push("agent_message_delta", {"delta": chunk})
        fake.push("turn/completed", {"turnId": "tu_1"})

    pusher = asyncio.create_task(driver())
    result = await ex._run_turn("hi")
    await pusher
    assert result == "hello world"
    seen = [method for method, _ in fake.requests]
    assert "thread/start" in seen
    assert "turn/start" in seen
@pytest.mark.asyncio
async def test_run_turn_reuses_thread_on_second_call() -> None:
    fake = FakeAppServer()
    ex = _make_executor(fake)

    async def drive_one(reply: str, turn_id: str) -> None:
        # Wait until the Nth turn/start has been recorded (N encoded in
        # the turn id), then script that turn's reply + completion.
        wanted = int(turn_id.split("_")[1])
        for _ in range(50):
            seen = sum(1 for method, _ in fake.requests if method == "turn/start")
            if seen >= wanted:
                break
            await asyncio.sleep(0.01)
        fake.push("agent_message_delta", {"delta": reply})
        fake.push("turn/completed", {"turnId": turn_id})

    first_driver = asyncio.create_task(drive_one("first", "tu_1"))
    first = await ex._run_turn("ping")
    await first_driver
    second_driver = asyncio.create_task(drive_one("second", "tu_2"))
    second = await ex._run_turn("pong")
    await second_driver
    assert first == "first"
    assert second == "second"
    # thread/start should fire EXACTLY once across both turns.
    assert sum(1 for method, _ in fake.requests if method == "thread/start") == 1
@pytest.mark.asyncio
async def test_run_turn_surfaces_error_notification() -> None:
    fake = FakeAppServer()
    ex = _make_executor(fake)

    async def driver() -> None:
        # Once the turn is underway, deliver a fatal error notification.
        for _ in range(50):
            if any(method == "turn/start" for method, _ in fake.requests):
                break
            await asyncio.sleep(0.01)
        fake.push("error_notification", {"message": "model rate limited"})

    pusher = asyncio.create_task(driver())
    with pytest.raises(RuntimeError, match="rate limited"):
        await ex._run_turn("hi")
    await pusher
@pytest.mark.asyncio
async def test_thread_start_passes_config() -> None:
    fake = FakeAppServer()
    ex = _make_executor(fake, model="o4-mini", system_prompt="custom prompt")

    async def driver() -> None:
        # Complete the turn as soon as turn/start shows up in the log.
        for _ in range(50):
            if any(method == "turn/start" for method, _ in fake.requests):
                break
            await asyncio.sleep(0.01)
        fake.push("turn/completed", {"turnId": "tu_1"})

    pusher = asyncio.create_task(driver())
    await ex._run_turn("hi")
    await pusher
    params = next(p for method, p in fake.requests if method == "thread/start")
    assert params["model"] == "o4-mini"
    assert params["developerInstructions"] == "custom prompt"
    assert params["approvalPolicy"] == "never"
    assert params["sandboxPolicy"] == {"mode": "workspace-write"}
@pytest.mark.asyncio
async def test_turn_lock_serializes_concurrent_executes() -> None:
    """Two concurrent execute()s should run their turns one-at-a-time."""
    fake = FakeAppServer()
    ex = _make_executor(fake)
    # Track the order in which turns START vs COMPLETE.
    starts: list[int] = []
    completes: list[int] = []

    async def execute_turn(idx: int, prompt: str) -> str:
        # Drive completion AFTER seeing this turn's turn/start in the
        # request log. Because of the lock, turn idx 2 won't start
        # until turn idx 1 is acked.
        async def driver() -> None:
            # This turn is the (idx+1)-th turn/start expected in the log.
            target_count = idx + 1
            for _ in range(200):
                count = sum(1 for m, _ in fake.requests if m == "turn/start")
                if count >= target_count:
                    starts.append(idx)
                    break
                await asyncio.sleep(0.005)
            # NOTE(review): if the 200 * 5ms poll budget is exhausted
            # without the break firing, we still push below — `completes`
            # would then record idx while `starts` does not, and the
            # sorted(starts) assertion catches it. Confirm this fail mode
            # is intended rather than a hard timeout error.
            fake.push("agent_message_delta", {"delta": f"r{idx}"})
            fake.push("turn/completed", {"turnId": f"tu_{idx + 1}"})
            completes.append(idx)

        driver_task = asyncio.create_task(driver())
        # Mirror the lock-and-run path execute() uses, without needing
        # an EventQueue.
        async with ex._turn_lock:
            text = await ex._run_turn(prompt)
        await driver_task
        return text

    results = await asyncio.gather(execute_turn(0, "first"), execute_turn(1, "second"))
    assert results == ["r0", "r1"] or results == ["r1", "r0"]
    # Whichever order tasks acquired the lock, the LOCK guarantees
    # turn N+1 doesn't start until turn N has completed. So starts and
    # completes should interleave one-at-a-time, not overlap.
    assert sorted(starts) == [0, 1]
    assert sorted(completes) == [0, 1]
    # Strict ordering check: between any two `starts` events, there
    # must be a corresponding `completes` event.
    # NOTE(review): as written this only verifies that the first-started
    # turn is also the first-completed (starts[0] == completes[0]); it is
    # weaker than the comment above claims. A full interleaving check
    # would compare the pairwise order of both lists — confirm intent.
    assert starts[0] in completes[:1]