OpenAI Codex CLI (@openai/codex >=0.72) wrapped as a Molecule workspace runtime, with native MCP-style push parity via persistent codex app-server stdio JSON-RPC. Each session holds one long-lived `codex app-server` child + one thread; A2A messages become turn/start RPCs against the existing thread. Per-thread serialization handles mid-turn arrivals (matches OpenClaw's per-chat sequentializer). Modules: - app_server.py — async JSON-RPC over NDJSON stdio (286 LOC) - executor.py — turn lifecycle, notification accumulation, error surfacing (270 LOC) - adapter.py — thin BaseAdapter shell + preflight Tests: 12/12 pass against Python NDJSON mock + fake AppServerProcess. Validated end-to-end against real codex-cli 0.72.0: - initialize handshake works - thread/start works (returns thread.id, NOT thread.threadId as the generated JSON schema claims; executor accepts both shapes) Scaffolded but not yet end-to-end verified against a real Molecule workspace + peer A2A traffic — that lands separately. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
268 lines
11 KiB
Python
268 lines
11 KiB
Python
"""A2A → codex app-server bridge.
|
|
|
|
Holds one persistent `codex app-server` child + one thread per
|
|
workspace session, dispatches each A2A message as a `turn/start` RPC
|
|
against the existing thread.
|
|
|
|
Design rationale lives in
|
|
``docs/integrations/codex-app-server-adapter-design.md`` (in
|
|
molecule-core). The short version:
|
|
|
|
- Persistent child gives us session continuity (the agent's
|
|
conversation history, tool state, and config persist across A2A
|
|
turns) without serializing through disk.
|
|
- Per-thread serialization (``_turn_lock``) gives us safe, ordered
|
|
handling of mid-turn arrivals — A2A peers see their messages
|
|
processed in arrival order, matching OpenClaw's per-chat
|
|
sequentializer behavior.
|
|
- Notification-driven response assembly: the executor accumulates
|
|
``agent_message_delta`` chunks and emits the final assembled text
|
|
on ``turn/completed``. Streaming forward is a future upgrade once
|
|
the molecule-runtime contract supports incremental events.
|
|
|
|
The riskiest module of this stack is ``app_server.AppServerProcess``
|
|
(the raw JSON-RPC client) — that has its own unit tests. This file
|
|
focuses on the protocol-level lifecycle: thread bootstrap, turn
|
|
dispatch, notification accumulation, error surface.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
|
|
from a2a.helpers import new_text_message
|
|
from a2a.server.agent_execution import AgentExecutor, RequestContext
|
|
from a2a.server.events import EventQueue
|
|
|
|
from molecule_runtime.adapters.base import AdapterConfig
|
|
from molecule_runtime.executor_helpers import extract_message_text
|
|
|
|
from app_server import AppServerError, AppServerProcess
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# Per-turn timeout. Codex turns can run minutes during heavy tool use
|
|
# (test runs, edits, web fetches). Tighter than infinite to bound
|
|
# debug-time hangs.
|
|
_TURN_TIMEOUT = 600.0
|
|
|
|
|
|
@dataclass
|
|
class _TurnState:
|
|
"""Mutable state accumulated during one turn lifecycle.
|
|
|
|
Owned by the running ``_run_turn`` invocation; the notification
|
|
subscriber appends to it under ``_turn_lock``.
|
|
"""
|
|
deltas: list[str] = field(default_factory=list)
|
|
completed: asyncio.Event = field(default_factory=asyncio.Event)
|
|
error: Exception | None = None
|
|
turn_id: str | None = None
|
|
|
|
|
|
class CodexAppServerExecutor(AgentExecutor):
|
|
"""A2A executor that proxies turns to a long-lived codex app-server."""
|
|
|
|
def __init__(self, config: AdapterConfig):
|
|
self._config = config
|
|
self._app_server: AppServerProcess | None = None
|
|
self._thread_id: str | None = None
|
|
# Serialize turns per thread. mid-turn A2A arrivals queue and
|
|
# run after the current turn completes — same shape OpenClaw's
|
|
# per-chat sequentializer uses.
|
|
self._turn_lock = asyncio.Lock()
|
|
# Tracked so cancel() can fire turn/interrupt against the
|
|
# currently-running turn (best-effort).
|
|
self._current_turn_id: str | None = None
|
|
|
|
# ------------------------------------------------------------------
|
|
# Bootstrap
|
|
# ------------------------------------------------------------------
|
|
async def _ensure_thread(self) -> str:
|
|
"""Lazy-init the app-server child + thread on first turn."""
|
|
if self._app_server is None:
|
|
env = {
|
|
# Codex picks up OPENAI_API_KEY from the environment.
|
|
# We pass through everything; container start.sh is
|
|
# responsible for ensuring the key is present.
|
|
**os.environ,
|
|
}
|
|
self._app_server = await AppServerProcess.start(env=env)
|
|
await self._app_server.initialize(client_info={
|
|
"name": "molecule-runtime-codex",
|
|
"version": "0.1.0",
|
|
})
|
|
logger.info("codex app-server child initialized")
|
|
|
|
if self._thread_id is None:
|
|
params: dict[str, Any] = {}
|
|
if self._config.model:
|
|
params["model"] = self._config.model
|
|
if self._config.system_prompt:
|
|
params["developerInstructions"] = self._config.system_prompt
|
|
# Workspace agents can't prompt a human, so approval policy
|
|
# must be `never`. Sandbox `workspace-write` lets the agent
|
|
# edit the workspace tree but not arbitrary disk.
|
|
params["approvalPolicy"] = "never"
|
|
params["sandboxPolicy"] = {"mode": "workspace-write"}
|
|
|
|
resp = await self._app_server.request("thread/start", params)
|
|
# Field name varies between the v2 JSON schema (threadId) and
|
|
# the running binary 0.72.x (id). Accept either — verified
|
|
# 2026-05-02 against codex-cli 0.72.0 which returns `id`.
|
|
thread = resp.get("thread") or {}
|
|
self._thread_id = thread.get("id") or thread.get("threadId")
|
|
if not self._thread_id:
|
|
raise RuntimeError(
|
|
f"thread/start did not return an id; got keys: {list(thread.keys())}"
|
|
)
|
|
logger.info("codex thread started: %s", self._thread_id)
|
|
|
|
return self._thread_id
|
|
|
|
# ------------------------------------------------------------------
|
|
# AgentExecutor contract
|
|
# ------------------------------------------------------------------
|
|
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
|
|
prompt = extract_message_text(context.message) or ""
|
|
if not prompt.strip():
|
|
await event_queue.enqueue_event(
|
|
new_text_message("(empty prompt — nothing to do)")
|
|
)
|
|
return
|
|
|
|
async with self._turn_lock:
|
|
try:
|
|
text = await self._run_turn(prompt)
|
|
except AppServerError as exc:
|
|
logger.warning("codex app-server error: %s", exc)
|
|
await event_queue.enqueue_event(
|
|
new_text_message(f"[codex error] {exc}")
|
|
)
|
|
return
|
|
except asyncio.TimeoutError:
|
|
logger.warning("codex turn timed out after %.0fs", _TURN_TIMEOUT)
|
|
await event_queue.enqueue_event(
|
|
new_text_message(
|
|
f"[codex turn timed out after {_TURN_TIMEOUT:.0f}s]"
|
|
)
|
|
)
|
|
return
|
|
except ConnectionError as exc:
|
|
logger.exception("codex app-server connection lost")
|
|
# On connection loss, drop our cached app-server +
|
|
# thread so the next turn starts fresh.
|
|
await self._reset_app_server()
|
|
await event_queue.enqueue_event(
|
|
new_text_message(f"[codex unreachable] {exc!s}")
|
|
)
|
|
return
|
|
|
|
await event_queue.enqueue_event(new_text_message(text))
|
|
|
|
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
|
|
"""Best-effort interrupt of the in-flight turn.
|
|
|
|
Race-prone (the turn may have completed between our last
|
|
poll and this call) but the app-server treats a stale
|
|
interrupt as a no-op, so we don't need to lock around it.
|
|
"""
|
|
if (
|
|
self._app_server is not None
|
|
and self._thread_id is not None
|
|
and self._current_turn_id is not None
|
|
):
|
|
try:
|
|
await self._app_server.request(
|
|
"turn/interrupt",
|
|
{"threadId": self._thread_id, "turnId": self._current_turn_id},
|
|
timeout=5.0,
|
|
)
|
|
except (AppServerError, asyncio.TimeoutError, ConnectionError) as exc:
|
|
logger.debug("turn/interrupt failed (expected if turn already done): %s", exc)
|
|
|
|
async def shutdown(self) -> None:
|
|
"""Tear down the app-server child cleanly. Idempotent."""
|
|
await self._reset_app_server()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Internals
|
|
# ------------------------------------------------------------------
|
|
async def _run_turn(self, prompt: str) -> str:
|
|
"""Fire turn/start, accumulate deltas, return assembled text.
|
|
|
|
Splits the AgentExecutor contract into a pure-data path so
|
|
unit tests can drive it without standing up an A2A
|
|
EventQueue.
|
|
"""
|
|
thread_id = await self._ensure_thread()
|
|
assert self._app_server is not None # set by _ensure_thread
|
|
|
|
state = _TurnState()
|
|
loop = asyncio.get_running_loop()
|
|
|
|
def on_notification(method: str, params: dict[str, Any]) -> None:
|
|
# Codex v2 protocol notifications. Only the message stream
|
|
# + completion + error map onto our flow today; everything
|
|
# else (reasoning, tool exec, token usage) is logged for
|
|
# observability but not surfaced to the A2A response.
|
|
if method == "agent_message_delta":
|
|
delta = params.get("delta") or params.get("text") or ""
|
|
if delta:
|
|
state.deltas.append(delta)
|
|
elif method in ("turn/completed", "turn.completed"):
|
|
# Match either dotted or slashed form — schema is in
|
|
# flux during the experimental phase. Also tolerate
|
|
# both `turnId` (schema) and `id` (real binary) for
|
|
# the params id field.
|
|
tid = params.get("turnId") or params.get("id")
|
|
if tid in (None, state.turn_id):
|
|
loop.call_soon_threadsafe(state.completed.set)
|
|
elif method == "error_notification":
|
|
state.error = RuntimeError(
|
|
params.get("message", "unknown app-server error")
|
|
)
|
|
loop.call_soon_threadsafe(state.completed.set)
|
|
else:
|
|
logger.debug("codex notification: %s %s", method, params)
|
|
|
|
unsubscribe = self._app_server.subscribe(on_notification)
|
|
try:
|
|
resp = await self._app_server.request("turn/start", {
|
|
"threadId": thread_id,
|
|
"input": [{"type": "text", "text": prompt}],
|
|
})
|
|
# Mirror the same id/threadId tolerance we have for thread/start.
|
|
turn = resp.get("turn") or {}
|
|
state.turn_id = turn.get("id") or turn.get("turnId")
|
|
if not state.turn_id:
|
|
raise RuntimeError(
|
|
f"turn/start did not return an id; got keys: {list(turn.keys())}"
|
|
)
|
|
self._current_turn_id = state.turn_id
|
|
|
|
await asyncio.wait_for(state.completed.wait(), timeout=_TURN_TIMEOUT)
|
|
finally:
|
|
unsubscribe()
|
|
self._current_turn_id = None
|
|
|
|
if state.error:
|
|
raise state.error
|
|
return "".join(state.deltas)
|
|
|
|
async def _reset_app_server(self) -> None:
|
|
"""Tear down + clear cached child. Idempotent."""
|
|
proc = self._app_server
|
|
self._app_server = None
|
|
self._thread_id = None
|
|
self._current_turn_id = None
|
|
if proc is not None:
|
|
try:
|
|
await proc.close()
|
|
except Exception:
|
|
logger.exception("error closing codex app-server")
|