molecule-ai-workspace-templ.../tests/test_app_server.py
Hongming Wang 0f4ed28f62 feat: initial codex CLI workspace template
OpenAI Codex CLI (@openai/codex >=0.72) wrapped as a Molecule
workspace runtime, with native MCP-style push parity via persistent
codex app-server stdio JSON-RPC.

Each session holds one long-lived `codex app-server` child + one
thread; A2A messages become turn/start RPCs against the existing
thread. Per-thread serialization handles mid-turn arrivals (matches
OpenClaw's per-chat sequentializer).

Modules:
- app_server.py — async JSON-RPC over NDJSON stdio (286 LOC)
- executor.py — turn lifecycle, notification accumulation,
  error surfacing (270 LOC)
- adapter.py — thin BaseAdapter shell + preflight

Tests: 12/12 pass against Python NDJSON mock + fake AppServerProcess.
Validated end-to-end against real codex-cli 0.72.0:
- initialize handshake works
- thread/start works (returns thread.id, NOT thread.threadId as the
  generated JSON schema claims; executor accepts both shapes)

Scaffolded but not yet end-to-end verified against a real Molecule
workspace + peer A2A traffic — that lands separately.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 02:19:52 -07:00

125 lines
4.9 KiB
Python

"""Unit tests for AppServerProcess against a fake stdio child.
We can't depend on a real `codex` binary in CI, so these tests stand up
a Python-implemented mock app-server that speaks NDJSON over stdio.
The mock is intentionally tiny — it only handles the request/response
+ notification semantics we exercise here, not the full v2 protocol.
"""
from __future__ import annotations
import asyncio
import json
import sys
from pathlib import Path
import pytest
# Make app_server.py importable from the test file without setup.py.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from app_server import AppServerError, AppServerProcess # noqa: E402
# Path to the in-tree mock app-server (a Python script that pretends
# to be `codex app-server`). Tests pass it via executable= override.
_MOCK = str(Path(__file__).resolve().parent / "mock_app_server.py")
@pytest.mark.asyncio
async def test_initialize_handshake() -> None:
proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
try:
result = await proc.initialize(client_info={"name": "test", "version": "0"})
assert result["userAgent"].startswith("mock_app_server/")
finally:
await proc.close()
@pytest.mark.asyncio
async def test_request_response_correlation() -> None:
"""Concurrent requests should not cross responses."""
proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
try:
await proc.initialize(client_info={"name": "test", "version": "0"})
# Mock supports `echo` which round-trips params.text after a
# configurable delay. Send three with different delays + texts;
# confirm each future resolves to its own input.
results = await asyncio.gather(
proc.request("echo", {"text": "alpha", "delay_ms": 30}),
proc.request("echo", {"text": "beta", "delay_ms": 5}),
proc.request("echo", {"text": "gamma", "delay_ms": 15}),
)
assert [r["text"] for r in results] == ["alpha", "beta", "gamma"]
finally:
await proc.close()
@pytest.mark.asyncio
async def test_error_response_raises_app_server_error() -> None:
proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
try:
await proc.initialize(client_info={"name": "test", "version": "0"})
with pytest.raises(AppServerError) as ei:
await proc.request("error", {"code": -32000, "message": "boom"})
assert "boom" in str(ei.value)
assert ei.value.payload.get("code") == -32000
finally:
await proc.close()
@pytest.mark.asyncio
async def test_notifications_dispatched_to_subscribers() -> None:
"""Subscribed callback should fire for every notification, in order."""
proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
received: list[tuple[str, dict]] = []
try:
await proc.initialize(client_info={"name": "test", "version": "0"})
proc.subscribe(lambda m, p: received.append((m, p)))
# Mock's `emit` request fires N notifications named `tick` then
# returns a final ack. We need to wait until all notifications
# arrive — the mock guarantees the response is sent AFTER its
# notifications, so awaiting the response is sufficient.
await proc.request("emit", {"count": 3, "method": "tick"})
# Give the reader loop one tick to process trailing notifications
# if any (defensive — mock orders them before the response).
await asyncio.sleep(0.05)
assert [m for m, _ in received] == ["tick", "tick", "tick"]
assert [p["i"] for _, p in received] == [0, 1, 2]
finally:
await proc.close()
@pytest.mark.asyncio
async def test_pending_requests_fail_on_close() -> None:
"""close() must release any awaiting request callers."""
proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
try:
await proc.initialize(client_info={"name": "test", "version": "0"})
# Fire a long-delay request and close the process before the
# response can arrive. The pending future should fail with
# ConnectionError so the caller doesn't hang.
slow = asyncio.create_task(proc.request("echo", {"text": "x", "delay_ms": 5000}))
await asyncio.sleep(0.05)
await proc.close()
with pytest.raises(ConnectionError):
await slow
finally:
# Idempotent
await proc.close()
@pytest.mark.asyncio
async def test_close_is_idempotent() -> None:
proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
await proc.close()
rc = await proc.close()
assert rc is not None # mock exits with 0
@pytest.mark.asyncio
async def test_request_after_close_raises() -> None:
proc = await AppServerProcess.start(executable=sys.executable, args=(_MOCK,))
await proc.close()
with pytest.raises(ConnectionError):
await proc.request("echo", {"text": "x"})