molecule-ai-workspace-templ.../app_server.py
Hongming Wang 0f4ed28f62 feat: initial codex CLI workspace template
OpenAI Codex CLI (@openai/codex >=0.72) wrapped as a Molecule
workspace runtime, with native MCP-style push parity via persistent
codex app-server stdio JSON-RPC.

Each session holds one long-lived `codex app-server` child + one
thread; A2A messages become turn/start RPCs against the existing
thread. Per-thread serialization handles mid-turn arrivals (matches
OpenClaw's per-chat sequentializer).

Modules:
- app_server.py — async JSON-RPC over NDJSON stdio (286 LOC)
- executor.py — turn lifecycle, notification accumulation,
  error surfacing (270 LOC)
- adapter.py — thin BaseAdapter shell + preflight

Tests: 12/12 pass against Python NDJSON mock + fake AppServerProcess.
Validated end-to-end against real codex-cli 0.72.0:
- initialize handshake works
- thread/start works (returns thread.id, NOT thread.threadId as the
  generated JSON schema claims; executor accepts both shapes)

Scaffolded but not yet end-to-end verified against a real Molecule
workspace + peer A2A traffic — that lands separately.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 02:19:52 -07:00

324 lines
12 KiB
Python

"""Async JSON-RPC client for `codex app-server`.
Spawns `codex app-server` as a long-lived child process, frames messages
as NDJSON over stdio, correlates request/response by id, and dispatches
unsolicited server notifications to registered subscribers.
The executor uses this primitive to hold one thread per workspace
session and fire `turn/start` per A2A message — see executor.py for
the protocol-level usage.
Framing: `codex app-server` uses newline-delimited JSON, NOT
LSP-style content-length headers. Validated 2026-05-02 against
codex-cli 0.72.0:
$ echo '{"jsonrpc":"2.0","id":1,"method":"initialize",...}' | codex app-server
{"id":1,"result":{"userAgent":"codex_cli_rs/0.72.0 …"}}
Concurrency model: a single asyncio Task drains stdout line-by-line,
parses each JSON object, and routes it to either a pending request
future (matched by id) or the notification subscriber list. Writes are
serialized through an asyncio.Lock — concurrent request() calls are
safe but ordering is not guaranteed at the protocol level (the
app-server's request id is what matters, not write order).
Errors: any exception in the reader task fails ALL pending requests
with that exception, prevents new request()s from succeeding, and
surfaces the original cause as the ``__cause__`` of the ConnectionError
that those later request() calls raise. Designed so a mid-flight stdout
pipe break doesn't silently hang request() callers.
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
from collections.abc import Awaitable, Callable
from typing import Any
logger = logging.getLogger(__name__)

# Shape of a notification subscriber: it is handed the JSON-RPC method
# name and the notification's params object, and returns nothing.
NotificationCallback = Callable[[str, dict[str, Any]], None]

# Default ceiling (seconds) on how long a single request may wait for
# its response. Deliberately generous because a Codex turn doing heavy
# tool-use can take minutes, yet finite so a wedged app-server cannot
# hang a caller forever while debugging.
_DEFAULT_REQUEST_TIMEOUT = 600.0

# Seconds granted to the child for a graceful exit before it is
# SIGKILLed. The app-server normally shuts down in under a second, so
# this only matters when the child is hung.
_SHUTDOWN_TIMEOUT = 5.0
class AppServerError(RuntimeError):
    """JSON-RPC error response reported by the app-server.

    The exception message is the text passed as ``message``. The raw
    JSON-RPC error object is kept on ``.payload`` so callers can look at
    its ``code`` / ``data`` fields; when no payload (or an empty one) is
    supplied, ``.payload`` is an empty dict.
    """

    def __init__(self, message: str, payload: dict[str, Any] | None = None):
        super().__init__(message)
        # Normalise a missing/empty payload to a fresh dict so callers
        # can always do ``err.payload.get(...)``.
        self.payload = payload if payload else {}
class AppServerProcess:
    """Long-lived `codex app-server` child plus async JSON-RPC client.

    Typical lifecycle::

        proc = await AppServerProcess.start()
        await proc.initialize(client_info={...})
        unsub = proc.subscribe(on_notification)
        try:
            resp = await proc.request("thread/start", {...})
        finally:
            unsub()
            await proc.close()

    Not safe to share across asyncio loops.
    """

    # Per-line buffer limit (bytes) handed to asyncio for the child's
    # stdout/stderr pipes. asyncio's default is 64 KiB, and a line longer
    # than the limit makes StreamReader.readline() raise ValueError, which
    # would terminate the reader loop. NDJSON messages are single lines,
    # so the limit has to cover the largest message we expect.
    _STREAM_LIMIT = 16 * 1024 * 1024

    def __init__(
        self,
        process: asyncio.subprocess.Process,
        *,
        request_timeout: float = _DEFAULT_REQUEST_TIMEOUT,
    ):
        """Wrap an already-spawned child process.

        Does not start the reader tasks; use :meth:`start` for the
        normal spawn-and-read path.

        Args:
            process: child whose stdin/stdout/stderr are pipes.
            request_timeout: default per-request timeout in seconds.
        """
        self._proc = process
        self._request_timeout = request_timeout
        self._next_id = 1
        # request id -> future resolved by _dispatch() when the response arrives
        self._pending: dict[int, asyncio.Future[Any]] = {}
        self._subscribers: list[NotificationCallback] = []
        self._write_lock = asyncio.Lock()
        self._reader_task: asyncio.Task[None] | None = None
        self._stderr_task: asyncio.Task[None] | None = None
        self._closed = False
        # Set once the stdout reader has stopped (exception or EOF);
        # request() refuses to send while this is non-None.
        self._reader_exc: BaseException | None = None

    # ------------------------------------------------------------------
    # Construction
    # ------------------------------------------------------------------
    @classmethod
    async def start(
        cls,
        *,
        executable: str = "codex",
        args: tuple[str, ...] = ("app-server",),
        env: dict[str, str] | None = None,
        cwd: str | None = None,
        request_timeout: float | None = None,
        stream_limit: int | None = None,
    ) -> "AppServerProcess":
        """Spawn `codex app-server` as a stdio-piped child.

        Args:
            executable: program to run.
            args: arguments passed to ``executable``.
            env: extra environment variables layered over ``os.environ``.
            cwd: working directory for the child (``None`` = inherit).
            request_timeout: default per-request timeout in seconds;
                ``None`` keeps the constructor's default.
            stream_limit: maximum size in bytes of one stdout/stderr
                line; ``None`` uses ``_STREAM_LIMIT``.

        Returns:
            An instance whose stdout and stderr reader tasks are running.
        """
        proc_env = {**os.environ, **(env or {})}
        proc = await asyncio.create_subprocess_exec(
            executable,
            *args,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=proc_env,
            cwd=cwd,
            limit=cls._STREAM_LIMIT if stream_limit is None else stream_limit,
        )
        if request_timeout is None:
            instance = cls(proc)
        else:
            instance = cls(proc, request_timeout=request_timeout)
        instance._reader_task = asyncio.create_task(
            instance._read_loop(), name="codex-app-server-stdout"
        )
        instance._stderr_task = asyncio.create_task(
            instance._stderr_loop(), name="codex-app-server-stderr"
        )
        return instance

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    async def initialize(self, *, client_info: dict[str, Any]) -> dict[str, Any]:
        """Send the `initialize` handshake. Must be called before other RPCs."""
        return await self.request("initialize", {"clientInfo": client_info})

    async def request(
        self,
        method: str,
        params: dict[str, Any] | None = None,
        *,
        timeout: float | None = None,
    ) -> dict[str, Any]:
        """Send a JSON-RPC request and await its response.

        Args:
            method: JSON-RPC method name.
            params: optional params object; omitted from the message
                when ``None``.
            timeout: seconds to wait for the response; ``None`` uses the
                instance default.

        Returns:
            The ``result`` member of the matching response.

        Raises:
            AppServerError: app-server returned a JSON-RPC error.
            asyncio.TimeoutError: response not received in time.
            ConnectionError: child process exited or stdio broken.
        """
        if self._closed:
            raise ConnectionError("app-server is closed")
        if self._reader_exc is not None:
            raise ConnectionError(
                f"app-server reader failed: {self._reader_exc!r}"
            ) from self._reader_exc

        request_id = self._next_id
        self._next_id += 1
        future: asyncio.Future[Any] = asyncio.get_running_loop().create_future()
        # Register before writing so a fast response cannot race the lookup.
        self._pending[request_id] = future

        message: dict[str, Any] = {
            "jsonrpc": "2.0",
            "id": request_id,
            "method": method,
        }
        if params is not None:
            message["params"] = params

        effective_timeout = timeout if timeout is not None else self._request_timeout
        try:
            await self._write_message(message)
            return await asyncio.wait_for(future, timeout=effective_timeout)
        finally:
            # Always unregister: on timeout/cancel a late response is
            # then dropped by _dispatch() instead of leaking the entry.
            self._pending.pop(request_id, None)

    def subscribe(self, callback: NotificationCallback) -> Callable[[], None]:
        """Register a callback for unsolicited server notifications.

        The callback receives `(method, params)` for every
        `JSONRPCNotification` (a JSON-RPC message with no `id`).
        Returns an unsubscribe callable; calling it more than once is
        harmless.

        Subscribers are called synchronously from the reader loop —
        keep them fast. Push slow work onto an asyncio.Queue if you
        need to do anything substantial.
        """
        self._subscribers.append(callback)

        def unsubscribe() -> None:
            try:
                self._subscribers.remove(callback)
            except ValueError:
                # Already removed.
                pass

        return unsubscribe

    async def close(self) -> int | None:
        """Close stdio, wait for child exit, return its exit code.

        Idempotent. Safe to call from finally blocks. Any request still
        in flight fails with ``ConnectionError``.

        Returns:
            The child's exit code, or ``None`` if it could not be
            obtained (or, on a repeated call, if the child has not
            exited yet).
        """
        if self._closed:
            return self._proc.returncode
        self._closed = True

        # Close stdin first to signal graceful shutdown — codex
        # app-server exits cleanly on EOF.
        stdin = self._proc.stdin
        if stdin is not None and not stdin.is_closing():
            try:
                stdin.close()
                await stdin.wait_closed()
            except Exception:
                # Best effort: the pipe may already be broken because the
                # child exited. Shutdown continues regardless.
                logger.debug(
                    "ignoring error while closing app-server stdin", exc_info=True
                )

        # Cancel reader tasks; they should exit on stdout EOF anyway.
        tasks = [t for t in (self._reader_task, self._stderr_task) if t is not None]
        for task in tasks:
            if not task.done():
                task.cancel()

        # Fail any pending requests so awaiters don't hang.
        self._fail_pending(ConnectionError("app-server closed"))

        # Reap the tasks so their cancellation/exception is retrieved here
        # rather than reported by asyncio when they are garbage-collected.
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)

        try:
            return await asyncio.wait_for(self._proc.wait(), timeout=_SHUTDOWN_TIMEOUT)
        except asyncio.TimeoutError:
            logger.warning("codex app-server did not exit cleanly; sending SIGKILL")
            try:
                self._proc.kill()
            except ProcessLookupError:
                # Child exited between the timeout and the kill.
                pass
            try:
                return await self._proc.wait()
            except Exception:
                return None

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------
    def _fail_pending(self, exc: BaseException) -> None:
        """Fail every in-flight request with ``exc`` and forget them."""
        for fut in self._pending.values():
            if not fut.done():
                fut.set_exception(exc)
        self._pending.clear()

    async def _write_message(self, message: dict[str, Any]) -> None:
        """Serialize ``message`` as one NDJSON line and write it to stdin.

        Raises:
            ConnectionError: stdin is missing or already closing.
        """
        stdin = self._proc.stdin
        if stdin is None or stdin.is_closing():
            raise ConnectionError("app-server stdin closed")
        line = json.dumps(message, separators=(",", ":")) + "\n"
        # The lock keeps a write and its drain together so concurrent
        # request() calls do not interleave.
        async with self._write_lock:
            stdin.write(line.encode("utf-8"))
            await stdin.drain()

    async def _read_loop(self) -> None:
        """Drain stdout line-by-line, route messages by id.

        Blank and non-JSON lines are skipped. When the loop stops for any
        reason other than cancellation — an exception or stdout EOF — the
        cause is stored in ``_reader_exc`` and all pending requests are
        failed so callers don't hang on a dead pipe.
        """
        stdout = self._proc.stdout
        assert stdout is not None
        try:
            async for raw in stdout:
                line = raw.decode("utf-8", errors="replace").strip()
                if not line:
                    continue
                try:
                    msg = json.loads(line)
                except json.JSONDecodeError:
                    logger.warning("non-JSON line from app-server: %r", line[:200])
                    continue
                self._dispatch(msg)
        except asyncio.CancelledError:
            raise
        except BaseException as exc:
            self._reader_exc = exc
            self._fail_pending(exc)
            raise

        # Reaching here means stdout hit EOF: no further response can
        # arrive, so treat it as a broken connection instead of leaving
        # in-flight requests to run into their timeout.
        eof = ConnectionError("app-server stdout closed (process exited)")
        self._reader_exc = eof
        if not self._closed:
            logger.warning(
                "codex app-server closed stdout unexpectedly (returncode=%r)",
                self._proc.returncode,
            )
        self._fail_pending(eof)

    async def _stderr_loop(self) -> None:
        """Forward app-server stderr to our logger at DEBUG."""
        stderr = self._proc.stderr
        assert stderr is not None
        try:
            async for raw in stderr:
                line = raw.decode("utf-8", errors="replace").rstrip()
                if line:
                    logger.debug("codex app-server stderr: %s", line)
        except asyncio.CancelledError:
            raise
        except Exception:
            logger.exception("codex app-server stderr reader crashed")

    def _dispatch(self, msg: Any) -> None:
        """Route a parsed JSON-RPC message to its destination.

        Responses resolve (or fail) the matching pending future;
        notifications go to every subscriber. Anything else is logged
        and dropped — a malformed message must never raise here, because
        an exception would terminate the reader loop.
        """
        # json.loads() can yield any JSON value (null, number, array, ...);
        # only objects are JSON-RPC messages.
        if not isinstance(msg, dict):
            logger.warning("unrecognized message from app-server: %r", msg)
            return

        # Response (has id, has result or error)
        if "id" in msg and ("result" in msg or "error" in msg):
            request_id = msg["id"]
            try:
                future = self._pending.get(request_id)
            except TypeError:
                # Unhashable id (e.g. a JSON array) can never match a request.
                future = None
            if future is None or future.done():
                # Late response or duplicate — log and drop. Not fatal.
                logger.debug("dropping response for unknown id %r", request_id)
                return
            if "error" in msg:
                err = msg["error"] or {}
                if not isinstance(err, dict):
                    # Tolerate a bare string/number in the error slot.
                    err = {"message": str(err)}
                future.set_exception(
                    AppServerError(err.get("message", "unknown error"), err)
                )
            else:
                future.set_result(msg.get("result"))
            return

        # Notification (has method, no id)
        if "method" in msg and "id" not in msg:
            method = msg["method"]
            params = msg.get("params") or {}
            # Iterate over a copy: a callback may unsubscribe itself.
            for cb in list(self._subscribers):
                try:
                    cb(method, params)
                except Exception:
                    logger.exception("notification subscriber raised on %r", method)
            return

        # NOTE(review): a message carrying both `id` and `method` (a
        # server-initiated request) lands here and is never answered —
        # confirm whether the app-server can send these.
        logger.warning("unrecognized message from app-server: %r", msg)