molecule-ai-workspace-templ.../executor.py
Hongming Wang 0f4ed28f62 feat: initial codex CLI workspace template
OpenAI Codex CLI (@openai/codex >=0.72) wrapped as a Molecule
workspace runtime, with native MCP-style push parity via persistent
codex app-server stdio JSON-RPC.

Each session holds one long-lived `codex app-server` child + one
thread; A2A messages become turn/start RPCs against the existing
thread. Per-thread serialization handles mid-turn arrivals (matches
OpenClaw's per-chat sequentializer).

Modules:
- app_server.py — async JSON-RPC over NDJSON stdio (286 LOC)
- executor.py — turn lifecycle, notification accumulation,
  error surfacing (270 LOC)
- adapter.py — thin BaseAdapter shell + preflight

Tests: 12/12 pass against Python NDJSON mock + fake AppServerProcess.
Validated end-to-end against real codex-cli 0.72.0:
- initialize handshake works
- thread/start works (returns thread.id, NOT thread.threadId as the
  generated JSON schema claims; executor accepts both shapes)

Scaffolded but not yet end-to-end verified against a real Molecule
workspace + peer A2A traffic — that lands separately.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 02:19:52 -07:00

268 lines
11 KiB
Python

"""A2A → codex app-server bridge.
Holds one persistent `codex app-server` child + one thread per
workspace session, dispatches each A2A message as a `turn/start` RPC
against the existing thread.
Design rationale lives in
``docs/integrations/codex-app-server-adapter-design.md`` (in
molecule-core). The short version:
- Persistent child gives us session continuity (the agent's
conversation history, tool state, and config persist across A2A
turns) without serializing through disk.
- Per-thread serialization (``_turn_lock``) gives us safe, ordered
handling of mid-turn arrivals — A2A peers see their messages
processed in arrival order, matching OpenClaw's per-chat
sequentializer behavior.
- Notification-driven response assembly: the executor accumulates
``agent_message_delta`` chunks and emits the final assembled text
on ``turn/completed``. Forwarding deltas as they stream in is a future
upgrade, once the molecule-runtime contract supports incremental events.
The riskiest component of this stack is ``app_server.AppServerProcess``
(the raw JSON-RPC client), which has its own unit tests. This file
focuses on the protocol-level lifecycle: thread bootstrap, turn
dispatch, notification accumulation, and error surfacing.
"""
from __future__ import annotations
import asyncio
import logging
import os
from dataclasses import dataclass, field
from typing import Any
from a2a.helpers import new_text_message
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from molecule_runtime.adapters.base import AdapterConfig
from molecule_runtime.executor_helpers import extract_message_text
from app_server import AppServerError, AppServerProcess
logger = logging.getLogger(name=__name__)

# Upper bound, in seconds, on how long one codex turn may run. Turns can
# legitimately take minutes during heavy tool use (test runs, edits, web
# fetches); ten minutes is generous but still bounds a hung turn instead
# of waiting forever while debugging.
_TURN_TIMEOUT = 10 * 60.0
@dataclass
class _TurnState:
"""Mutable state accumulated during one turn lifecycle.
Owned by the running ``_run_turn`` invocation; the notification
subscriber appends to it under ``_turn_lock``.
"""
deltas: list[str] = field(default_factory=list)
completed: asyncio.Event = field(default_factory=asyncio.Event)
error: Exception | None = None
turn_id: str | None = None
class CodexAppServerExecutor(AgentExecutor):
    """A2A executor that proxies turns to a long-lived codex app-server.

    One executor owns at most one ``codex app-server`` child and one codex
    thread. Both are created lazily by ``_ensure_thread`` on the first turn
    and dropped by ``_reset_app_server`` (on connection loss or
    ``shutdown``). ``execute`` holds ``_turn_lock`` for the whole turn, so
    concurrent A2A messages are processed one at a time.
    """

    def __init__(self, config: AdapterConfig):
        """Store *config*; no child process is started until the first turn."""
        self._config = config
        self._app_server: AppServerProcess | None = None
        self._thread_id: str | None = None
        # Serialize turns per thread. Mid-turn A2A arrivals queue on this
        # lock and run after the current turn completes — same shape
        # OpenClaw's per-chat sequentializer uses.
        self._turn_lock = asyncio.Lock()
        # Tracked so cancel() (and the timeout path in _run_turn) can fire
        # turn/interrupt against the currently-running turn (best-effort).
        self._current_turn_id: str | None = None

    # ------------------------------------------------------------------
    # Bootstrap
    # ------------------------------------------------------------------

    async def _ensure_thread(self) -> str:
        """Lazily start the app-server child and the codex thread.

        Returns:
            The cached codex thread id, created on first use.

        Raises:
            RuntimeError: ``thread/start`` returned no usable id.
            Errors from ``AppServerProcess`` (start / initialize / request)
            propagate unchanged.
        """
        if self._app_server is None:
            # Codex picks up OPENAI_API_KEY from the environment. We pass
            # through everything; container start.sh is responsible for
            # ensuring the key is present.
            env = dict(os.environ)
            proc = await AppServerProcess.start(env=env)
            try:
                await proc.initialize(client_info={
                    "name": "molecule-runtime-codex",
                    "version": "0.1.0",
                })
            except BaseException:
                # Only cache a child that completed the handshake. Caching a
                # half-initialized one would make every later turn skip
                # initialize() and talk to it anyway.
                try:
                    await proc.close()
                except Exception:
                    logger.exception("error closing codex app-server")
                raise
            self._app_server = proc
            logger.info("codex app-server child initialized")

        if self._thread_id is None:
            params: dict[str, Any] = {}
            if self._config.model:
                params["model"] = self._config.model
            if self._config.system_prompt:
                params["developerInstructions"] = self._config.system_prompt
            # Workspace agents can't prompt a human, so approval policy
            # must be `never`. Sandbox `workspace-write` lets the agent
            # edit the workspace tree but not arbitrary disk.
            params["approvalPolicy"] = "never"
            params["sandboxPolicy"] = {"mode": "workspace-write"}
            resp = await self._app_server.request("thread/start", params)
            # Field name varies between the v2 JSON schema (threadId) and
            # the running binary 0.72.x (id). Accept either — verified
            # 2026-05-02 against codex-cli 0.72.0 which returns `id`.
            thread = resp.get("thread") or {}
            thread_id = thread.get("id") or thread.get("threadId")
            if not thread_id:
                # Validate before caching: a falsy-but-not-None id (e.g. "")
                # stored in _thread_id would be returned forever without
                # ever retrying thread/start.
                raise RuntimeError(
                    f"thread/start did not return an id; got keys: {list(thread.keys())}"
                )
            self._thread_id = thread_id
            logger.info("codex thread started: %s", self._thread_id)
        return self._thread_id

    # ------------------------------------------------------------------
    # AgentExecutor contract
    # ------------------------------------------------------------------

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        """Run one A2A message as a codex turn and enqueue the reply.

        Empty prompts short-circuit without touching the app-server. The
        failure modes handled below are reported to the peer as bracketed
        text replies instead of escaping as exceptions.
        """
        prompt = extract_message_text(context.message) or ""
        if not prompt.strip():
            await event_queue.enqueue_event(
                new_text_message("(empty prompt — nothing to do)")
            )
            return
        async with self._turn_lock:
            try:
                reply = await self._run_turn(prompt)
            except AppServerError as exc:
                logger.warning("codex app-server error: %s", exc)
                reply = f"[codex error] {exc}"
            except asyncio.TimeoutError:
                logger.warning("codex turn timed out after %.0fs", _TURN_TIMEOUT)
                reply = f"[codex turn timed out after {_TURN_TIMEOUT:.0f}s]"
            except ConnectionError as exc:
                logger.exception("codex app-server connection lost")
                # On connection loss, drop our cached app-server +
                # thread so the next turn starts fresh.
                await self._reset_app_server()
                reply = f"[codex unreachable] {exc!s}"
            except RuntimeError as exc:
                # Raised by _run_turn for error_notification and for
                # turn/start / thread/start responses without an id.
                # Without this branch the exception would escape execute()
                # and the peer would get no reply at all.
                logger.warning("codex turn failed: %s", exc)
                reply = f"[codex error] {exc}"
            await event_queue.enqueue_event(new_text_message(reply))

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        """Best-effort interrupt of the in-flight turn.

        Race-prone (the turn may have completed between our last
        poll and this call) but the app-server treats a stale
        interrupt as a no-op, so we don't need to lock around it.
        """
        turn_id = self._current_turn_id
        if turn_id is not None:
            await self._interrupt_turn(turn_id)

    async def shutdown(self) -> None:
        """Tear down the app-server child cleanly. Idempotent."""
        await self._reset_app_server()

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------

    async def _run_turn(self, prompt: str) -> str:
        """Fire ``turn/start``, accumulate deltas, return the assembled text.

        Split out of the AgentExecutor contract as a pure-data path so
        unit tests can drive it without standing up an A2A EventQueue.

        Raises:
            asyncio.TimeoutError: no completion within ``_TURN_TIMEOUT``;
                a best-effort ``turn/interrupt`` is sent before re-raising.
            RuntimeError: the app-server sent ``error_notification``, or
                ``turn/start`` returned no usable turn id.
        """
        thread_id = await self._ensure_thread()
        proc = self._app_server
        if proc is None:  # _ensure_thread always sets it; explicit for -O / type checkers
            raise RuntimeError("codex app-server is not running")
        state = _TurnState()
        loop = asyncio.get_running_loop()
        # turn/completed ids seen while state.turn_id was still None.
        # Depending on how the client dispatches the turn/start response
        # vs. notification callbacks, a fast turn's completion may be
        # delivered before we've recorded the turn id; dropping it would
        # leave the turn waiting until _TURN_TIMEOUT.
        early_completions: set[str] = set()

        def on_notification(method: str, params: dict[str, Any]) -> None:
            # Codex v2 protocol notifications. Only the message stream
            # + completion + error map onto our flow today; everything
            # else (reasoning, tool exec, token usage) is logged for
            # observability but not surfaced to the A2A response.
            # NOTE(review): these notification names were not part of the
            # end-to-end validation against codex-cli 0.72.0 — confirm.
            if method == "agent_message_delta":
                delta = params.get("delta") or params.get("text") or ""
                if delta:
                    state.deltas.append(delta)
            elif method in ("turn/completed", "turn.completed"):
                # Match either dotted or slashed form — schema is in
                # flux during the experimental phase. Also tolerate
                # both `turnId` (schema) and `id` (real binary) for
                # the params id field.
                tid = params.get("turnId") or params.get("id")
                if tid is None or tid == state.turn_id:
                    loop.call_soon_threadsafe(state.completed.set)
                elif state.turn_id is None:
                    early_completions.add(tid)
            elif method == "error_notification":
                state.error = RuntimeError(
                    params.get("message", "unknown app-server error")
                )
                loop.call_soon_threadsafe(state.completed.set)
            else:
                logger.debug("codex notification: %s %s", method, params)

        unsubscribe = proc.subscribe(on_notification)
        try:
            resp = await proc.request("turn/start", {
                "threadId": thread_id,
                "input": [{"type": "text", "text": prompt}],
            })
            # Mirror the same id/threadId tolerance we have for thread/start.
            turn = resp.get("turn") or {}
            turn_id = turn.get("id") or turn.get("turnId")
            if not turn_id:
                raise RuntimeError(
                    f"turn/start did not return an id; got keys: {list(turn.keys())}"
                )
            state.turn_id = turn_id
            self._current_turn_id = turn_id
            if turn_id in early_completions:
                # Completion already arrived; we're on the loop thread here.
                state.completed.set()
            try:
                await asyncio.wait_for(state.completed.wait(), timeout=_TURN_TIMEOUT)
            except asyncio.TimeoutError:
                # The server presumably keeps running an abandoned turn, and
                # deltas are not filtered by turn id, so its output would
                # bleed into the next turn. Ask it to stop (best-effort).
                await self._interrupt_turn(turn_id)
                raise
        finally:
            unsubscribe()
            self._current_turn_id = None
        if state.error:
            raise state.error
        return "".join(state.deltas)

    async def _interrupt_turn(self, turn_id: str) -> None:
        """Send a best-effort ``turn/interrupt`` for *turn_id*.

        No-op when there is no live app-server or thread. RPC failures
        (including timeouts and connection errors) are logged at debug
        level and swallowed, since the turn may simply have finished.
        """
        proc = self._app_server
        thread_id = self._thread_id
        if proc is None or thread_id is None:
            return
        try:
            await proc.request(
                "turn/interrupt",
                {"threadId": thread_id, "turnId": turn_id},
                timeout=5.0,
            )
        except (AppServerError, asyncio.TimeoutError, ConnectionError) as exc:
            logger.debug("turn/interrupt failed (expected if turn already done): %s", exc)

    async def _reset_app_server(self) -> None:
        """Tear down + clear cached child. Idempotent."""
        proc = self._app_server
        self._app_server = None
        self._thread_id = None
        self._current_turn_id = None
        if proc is not None:
            try:
                await proc.close()
            except Exception:
                logger.exception("error closing codex app-server")