molecule-ai-workspace-templ.../tests/mock_app_server.py
Hongming Wang 0f4ed28f62 feat: initial codex CLI workspace template
OpenAI Codex CLI (@openai/codex >=0.72) wrapped as a Molecule
workspace runtime, with native MCP-style push parity via persistent
codex app-server stdio JSON-RPC.

Each session holds one long-lived `codex app-server` child + one
thread; A2A messages become turn/start RPCs against the existing
thread. Per-thread serialization handles mid-turn arrivals (matches
OpenClaw's per-chat sequentializer).
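
A minimal sketch of the per-thread serialization described above, assuming an `asyncio.Lock` guards each thread. The commit does not show the executor's actual mechanism, so `ThreadTurnQueue`, `run_turn`, and `fake_turn` are hypothetical names used only for illustration:

```python
import asyncio
from typing import Awaitable, Callable, List


class ThreadTurnQueue:
    """Hypothetical helper: runs at most one turn at a time for one thread."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()

    async def run_turn(self, start_turn: Callable[[], Awaitable[str]]) -> str:
        # A message that arrives mid-turn waits here until the active turn ends.
        async with self._lock:
            return await start_turn()


async def demo() -> List[str]:
    queue = ThreadTurnQueue()
    log: List[str] = []

    async def fake_turn(name: str) -> str:
        # Stand-in for a turn/start RPC plus waiting for the turn to complete.
        log.append(f"start {name}")
        await asyncio.sleep(0.01)
        log.append(f"end {name}")
        return name

    # Both messages arrive at once; the lock forces them to run back to back.
    await asyncio.gather(
        queue.run_turn(lambda: fake_turn("a")),
        queue.run_turn(lambda: fake_turn("b")),
    )
    return log
```

Calling `asyncio.run(demo())` yields a log in which turn `a` finishes before turn `b` starts, which is the property the serializer is meant to guarantee.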

Modules:
- app_server.py — async JSON-RPC over NDJSON stdio (286 LOC)
- executor.py — turn lifecycle, notification accumulation,
  error surfacing (270 LOC)
- adapter.py — thin BaseAdapter shell + preflight

Tests: 12/12 pass against Python NDJSON mock + fake AppServerProcess.
Validated end-to-end against real codex-cli 0.72.0:
- initialize handshake works
- thread/start works (returns thread.id, NOT thread.threadId as the
  generated JSON schema claims; executor accepts both shapes)
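
Accepting both response shapes could look roughly like the sketch below. It assumes both fields sit under a `thread` object in the JSON-RPC result; `extract_thread_id` is a hypothetical name, not the executor's actual function:

```python
from typing import Any, Dict, Optional


def extract_thread_id(result: Dict[str, Any]) -> Optional[str]:
    """Return the thread id from a thread/start result, or None if absent.

    Prefers `thread.id` (what codex-cli 0.72.0 was observed to return) and
    falls back to `thread.threadId` (what the generated schema describes).
    """
    thread = result.get("thread")
    if not isinstance(thread, dict):
        return None
    for key in ("id", "threadId"):
        value = thread.get(key)
        if isinstance(value, str) and value:
            return value
    return None
```

Checking `id` first keeps the observed behavior on the fast path while still tolerating a server that follows the schema.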

Scaffolded but not yet end-to-end verified against a real Molecule
workspace + peer A2A traffic — that lands separately.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 02:19:52 -07:00

"""Tiny mock of `codex app-server` for unit tests.

Speaks NDJSON over stdio. Implements only the methods AppServerProcess
tests exercise: `initialize`, `echo`, `error`, `emit`. Everything else
returns a JSON-RPC method-not-found error.

This stands in for the real codex binary so tests don't depend on a
specific codex-cli version installed on the runner. Keep it dumb: any
behavior the executor relies on must be tested against the real
binary in an integration test, not here.
"""
from __future__ import annotations

import asyncio
import json
import sys


async def _read_lines() -> "asyncio.StreamReader":
    """Wrap stdin in an asyncio StreamReader so lines can be awaited."""
    loop = asyncio.get_running_loop()
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)
    await loop.connect_read_pipe(lambda: protocol, sys.stdin)
    return reader


def _write(obj: dict) -> None:
    """Emit one compact JSON object per line (NDJSON) and flush immediately."""
    sys.stdout.write(json.dumps(obj, separators=(",", ":")) + "\n")
    sys.stdout.flush()


async def _handle(msg: dict) -> None:
    """Dispatch a single JSON-RPC request and write its response."""
    method = msg.get("method")
    params = msg.get("params") or {}
    request_id = msg.get("id")

    # Notifications (no id) are ignored by the mock.
    if request_id is None:
        return

    if method == "initialize":
        _write({
            "jsonrpc": "2.0",
            "id": request_id,
            "result": {"userAgent": "mock_app_server/0.1"},
        })
        return

    if method == "echo":
        # Optional delay lets tests exercise out-of-order responses.
        delay_ms = int(params.get("delay_ms", 0))
        if delay_ms > 0:
            await asyncio.sleep(delay_ms / 1000)
        _write({
            "jsonrpc": "2.0",
            "id": request_id,
            "result": {"text": params.get("text", "")},
        })
        return

    if method == "error":
        _write({
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {
                "code": int(params.get("code", -32000)),
                "message": str(params.get("message", "mock error")),
            },
        })
        return

    if method == "emit":
        # Fire `count` notifications named `method`, then ack.
        count = int(params.get("count", 0))
        notif_method = str(params.get("method", "tick"))
        for i in range(count):
            _write({
                "jsonrpc": "2.0",
                "method": notif_method,
                "params": {"i": i},
            })
        _write({
            "jsonrpc": "2.0",
            "id": request_id,
            "result": {"emitted": count},
        })
        return

    # Method not found.
    _write({
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {"code": -32601, "message": f"method not found: {method}"},
    })


async def main() -> None:
    reader = await _read_lines()
    # Hold strong references to in-flight handlers: the event loop keeps only
    # weak references to tasks, so an unreferenced task can be garbage
    # collected before it finishes.
    pending: set[asyncio.Task] = set()
    while True:
        line = await reader.readline()
        if not line:
            break
        try:
            msg = json.loads(line.decode("utf-8"))
        except json.JSONDecodeError:
            continue
        if not isinstance(msg, dict):
            # Valid JSON that is not an object (e.g. a bare list) is skipped.
            continue
        # Schedule handling so `emit` doesn't block subsequent reads.
        task = asyncio.create_task(_handle(msg))
        pending.add(task)
        task.add_done_callback(pending.discard)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
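
Because `emit` writes its notifications before the final ack, a test that reads the mock's stdout has to separate notifications (no `id`) from responses (with `id`). The sketch below shows one way to do that on captured output. `encode_request` and `split_messages` are hypothetical helpers, not part of the repository, and the sample bytes are what the `emit` branch above would write for `count=2` with the default `tick` method:

```python
import json
from typing import Any, Dict, List, Tuple


def encode_request(request_id: int, method: str, params: Dict[str, Any]) -> bytes:
    """Frame one JSON-RPC request as a single NDJSON line."""
    msg = {"jsonrpc": "2.0", "id": request_id, "method": method, "params": params}
    return (json.dumps(msg, separators=(",", ":")) + "\n").encode("utf-8")


def split_messages(raw: bytes) -> Tuple[Dict[Any, dict], List[dict]]:
    """Split NDJSON output into responses keyed by id and ordered notifications."""
    responses: Dict[Any, dict] = {}
    notifications: List[dict] = []
    for line in raw.splitlines():
        if not line.strip():
            continue  # tolerate blank lines between messages
        msg = json.loads(line)
        if "id" in msg:
            responses[msg["id"]] = msg
        else:
            notifications.append(msg)
    return responses, notifications


# Output the mock would produce for an `emit` request with id 7 and count 2.
sample_output = (
    b'{"jsonrpc":"2.0","method":"tick","params":{"i":0}}\n'
    b'{"jsonrpc":"2.0","method":"tick","params":{"i":1}}\n'
    b'{"jsonrpc":"2.0","id":7,"result":{"emitted":2}}\n'
)
responses, notifications = split_messages(sample_output)
```

Keying responses by `id` also covers the delayed `echo` case, where a later request can be answered before an earlier one.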