# Codex CLI workspace adapter — app-server design
**Status:** Design draft — pre-implementation
**Owner:** Molecule AI (hongmingwang@moleculesai.app)
**Date:** 2026-05-02
**Codex version validated against:** `codex-cli 0.72.0`
**Related:** `docs/integrations/hermes-platform-plugins-upstream-pr.md`,
`molecule-ai-workspace-template-openclaw/packages/openclaw-channel-plugin/`

---
## Goal
Add a Molecule workspace template for the OpenAI Codex CLI runtime
(`@openai/codex` v0.72+). The template should give Codex agents the
same A2A inbox + mid-session push behavior the other supported
runtimes have:
- **claude-code:** MCP `notifications/claude/channel`
- **OpenClaw:** channel-plugin webhook into the gateway kernel
- **hermes:** `BasePlatformAdapter` (pending upstream PR; polling fallback today)
- **codex (this design):** persistent `codex app-server` stdio JSON-RPC
client; A2A messages become `turn/start` calls against a long-lived
thread
Today there is no codex template. The legacy fallback registry entry
at `workspace-server/internal/handlers/runtime_registry.go:83` exists
only to keep old workspaces from crashing — there is no live adapter,
no Dockerfile, nothing in `manifest.json`. This design covers the
fresh build.

---
## Architecture decision: app-server, not `codex exec`
`codex exec --json` is the obvious shape — one CLI subprocess per
A2A message, same anti-pattern OpenClaw used to have and that we are
replacing. It loses session continuity (no shared thread), pays
process-spawn cost on every turn, and gives no path to mid-turn
interruption.
`codex app-server` is a long-running JSON-RPC server over stdio that
holds thread state in memory. The v2 protocol (validated below) gives
us:
- `thread/start` → returns `threadId`
- `turn/start` → input array, threadId required → returns `turnId`
- `turn/interrupt` → cancel a running turn by `(threadId, turnId)`
- Server-pushed notifications: `agent_message_delta`, `turn/started`,
`turn/completed`, `reasoning_text_delta`,
`command_execution_output_delta`, `mcp_tool_call_progress`,
`error_notification`, etc.
A persistent app-server child plus a small async stdio reader gives us
session continuity AND mid-turn injection. Same dual-win shape we got
from migrating OpenClaw away from `openclaw agent`.
### Why not v1?
v1 of the protocol exposes `newConversation` + `sendUserMessage` /
`sendUserTurn` (one-shot per message, no streaming notifications). v2
introduces threads + turns + delta notifications. v2 is the
forward-looking surface; we build against v2 from the start.

---
## RPC sequence
### 1. Boot
```
adapter spawn ▶ codex app-server (stdio NDJSON)
        ◀ ready (process up)
adapter ▶ {"jsonrpc":"2.0","id":1,"method":"initialize",
           "params":{"clientInfo":{"name":"molecule-runtime","version":"…"}}}
adapter ◀ {"id":1,"result":{"userAgent":"codex_cli_rs/0.72.0 …"}}
```
Validated 2026-05-02 against the installed binary — NDJSON framing,
initialize works as shown.
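For reference, a minimal sketch of that spawn-and-initialize handshake, assuming an `asyncio` subprocess and the NDJSON framing shown above (in the real module this lives inside `AppServerProcess.start()`; the version string is a placeholder):
```python
# Sketch only: spawn codex app-server and run the initialize handshake.
# NDJSON framing: one JSON object per line, newline-terminated.
import asyncio
import json

async def spawn_and_initialize() -> asyncio.subprocess.Process:
    proc = await asyncio.create_subprocess_exec(
        "codex", "app-server",
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    request = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {"clientInfo": {"name": "molecule-runtime", "version": "0.0.0"}},
    }
    proc.stdin.write((json.dumps(request) + "\n").encode())
    await proc.stdin.drain()
    response = json.loads(await proc.stdout.readline())
    # Expect {"id": 1, "result": {"userAgent": "codex_cli_rs/0.72.0 …"}}
    assert response.get("id") == 1
    return proc
```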
### 2. Thread per workspace session
```
adapter ▶ thread/start
          params: {model, sandboxPolicy, approvalPolicy, cwd,
                   baseInstructions, developerInstructions, …}
adapter ◀ {result: {thread: {threadId: "th_…"}}}
```
`threadId` is cached on the adapter for the workspace's lifetime. On
adapter restart we use `thread/resume` against the persisted ID
(written to disk under `~/.codex/sessions/` by codex itself, but we
also keep our own pointer in workspace state for fast restore).
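A sketch of that resume-or-start decision; `load_workspace_state` / `save_workspace_state` are hypothetical stand-ins for the workspace-state API, and the exact `thread/resume` params shape still needs to be confirmed against the binary:
```python
# Sketch: resume the persisted thread if one exists, otherwise start fresh.
# load_workspace_state / save_workspace_state are hypothetical helpers here.
async def resume_or_start_thread(app_server, config) -> str:
    state = load_workspace_state()
    thread_id = state.get("codex_thread_id")
    if thread_id:
        try:
            await app_server.request("thread/resume", {"threadId": thread_id})
            return thread_id
        except Exception:
            pass  # stale/missing session under ~/.codex/sessions/; start over
    resp = await app_server.request("thread/start", {
        "model": config.model or None,
    })
    thread_id = resp["thread"]["threadId"]
    save_workspace_state({"codex_thread_id": thread_id})
    return thread_id
```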
### 3. A2A message → turn/start
For each inbound A2A message:
```
adapter ▶ turn/start
          params: {threadId, input: [{type:"text", text:"…"}], …}
adapter ◀ {result: {turn: {turnId: "tu_…"}}}
(server pushes notifications)
adapter ◀ turn/started
adapter ◀ agent_message_delta (text chunk)
adapter ◀ agent_message_delta (text chunk)
adapter ◀ turn/completed
```
The adapter accumulates `agent_message_delta` chunks into a buffer
keyed by `turnId`, emits them onto the A2A response queue (streamed if
the molecule-runtime contract supports streaming, otherwise assembled
into a single final message on `turn/completed`).
### 4. Mid-turn injection — the load-bearing case
**Default policy: per-thread serialization.** If a turn is already
running when a second A2A message arrives, queue the new message and
fire `turn/start` once the current `turn/completed` lands. This
matches OpenClaw's per-chat sequentializer behavior — the A2A peer
sees their messages handled in order, and we don't need
`turn/interrupt` for the common case.
**Opt-in policy: interrupt-and-rerun.** For workspaces that prefer
"latest message wins" semantics (rare; configurable), the adapter
fires `turn/interrupt` with `(threadId, currentTurnId)`, waits for
`turn/completed` (with cancelled status), then `turn/start` with the
combined context: previous user message + agent's partial response so
far + new message, so the agent has full context of what got
interrupted. Off by default.
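A sketch of the opt-in path, using the same `request` surface as the executor below; the notification plumbing that waits for the cancelled `turn/completed` is elided, and its exact payload still needs to be confirmed:
```python
# Sketch: "latest message wins" — interrupt the running turn, then restart
# with combined context so the agent knows what got cut off. Off by default.
async def interrupt_and_rerun(app_server, thread_id: str, current_turn_id: str,
                              prev_prompt: str, partial_response: str,
                              new_prompt: str) -> str:
    await app_server.request("turn/interrupt", {
        "threadId": thread_id,
        "turnId": current_turn_id,
    })
    # ... wait for the cancelled turn/completed notification here ...
    combined = (
        f"[previous message, interrupted]\n{prev_prompt}\n\n"
        f"[your partial response]\n{partial_response}\n\n"
        f"[new message]\n{new_prompt}"
    )
    resp = await app_server.request("turn/start", {
        "threadId": thread_id,
        "input": [{"type": "text", "text": combined}],
    })
    return resp["turn"]["turnId"]
```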
### 5. Shutdown
```
adapter ▶ {"method":"shutdown"} (if v2 exposes one; otherwise SIGTERM)
adapter ▶ close stdio
adapter ▶ wait(child, timeout=5s); on timeout SIGKILL
```
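A minimal sketch of that sequence, assuming an `asyncio` subprocess handle; the `shutdown` request is attempted only because v2 may expose one:
```python
# Sketch: graceful shutdown with a hard 5s deadline.
import asyncio

async def shutdown_app_server(app_server, proc: asyncio.subprocess.Process) -> None:
    try:
        await app_server.request("shutdown", {})  # if v2 exposes one
    except Exception:
        proc.terminate()                          # otherwise SIGTERM
    if proc.stdin:
        proc.stdin.close()                        # close stdio
    try:
        await asyncio.wait_for(proc.wait(), timeout=5)
    except asyncio.TimeoutError:
        proc.kill()                               # SIGKILL after the grace period
        await proc.wait()
```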
---
## File layout (new template repo)
```
molecule-ai-workspace-template-codex/
├── adapter.py # BaseAdapter shell, thin (~50 LOC)
├── executor.py # AppServerProxyExecutor — the RPC client (~300 LOC)
├── app_server.py # AppServerProcess — stdio child + NDJSON reader (~150 LOC)
├── config.yaml
├── Dockerfile # node:20 + npm i -g @openai/codex@0.72
├── start.sh # boots adapter; codex app-server is spawned per session by executor
├── requirements.txt
├── README.md
└── tests/
    ├── test_app_server.py   # mocks stdio; tests framing, request/notification dispatch
    └── test_executor.py     # mocks AppServerProcess; tests turn lifecycle, interrupt
```
Modeled on the hermes template (which is the closest existing shape:
adapter.py + executor.py separation; daemon proxy via local IPC). The
extra `app_server.py` exists because the JSON-RPC client + child
process management is non-trivial enough to warrant its own module
with its own tests.

---
## Executor skeleton
```python
# executor.py — A2A → codex app-server bridge
# AgentExecutor, AdapterConfig, AppServerProcess, RequestContext, EventQueue,
# extract_message_text, new_agent_text_message, MOLECULE_RUNTIME_VERSION and
# _TURN_TIMEOUT come from the adapter / molecule-runtime modules.
import asyncio


class CodexAppServerExecutor(AgentExecutor):
    """Holds one app-server child + thread, dispatches A2A turns as turn/start RPCs."""

    def __init__(self, config: AdapterConfig):
        self._config = config
        self._app_server: AppServerProcess | None = None
        self._thread_id: str | None = None
        self._current_turn_id: str | None = None
        self._turn_lock = asyncio.Lock()  # serialize per-thread by default

    async def _ensure_thread(self) -> str:
        if self._app_server is None:
            self._app_server = await AppServerProcess.start()
            await self._app_server.initialize(client_info={
                "name": "molecule-runtime",
                "version": MOLECULE_RUNTIME_VERSION,
            })
        if self._thread_id is None:
            resp = await self._app_server.request("thread/start", {
                "model": self._config.model or None,
                "developerInstructions": self._config.system_prompt or None,
                # other policy fields (sandbox, approval) — Molecule defaults
            })
            self._thread_id = resp["thread"]["threadId"]
        return self._thread_id

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        prompt = extract_message_text(context.message) or ""
        if not prompt.strip():
            await event_queue.enqueue_event(new_agent_text_message("(empty prompt)"))
            return

        async with self._turn_lock:  # per-thread serialization
            thread_id = await self._ensure_thread()

            # Subscribe to delta notifications BEFORE starting the turn so we
            # don't race the first agent_message_delta.
            buffer: list[str] = []
            done = asyncio.Event()
            error: Exception | None = None

            def on_notification(method: str, params: dict) -> None:
                nonlocal error
                if method == "agent_message_delta":
                    buffer.append(params.get("delta", ""))
                elif method == "turn/completed":
                    done.set()
                elif method == "error_notification":
                    error = RuntimeError(params.get("message", "unknown app-server error"))
                    done.set()

            unsub = self._app_server.subscribe(on_notification)
            try:
                resp = await self._app_server.request("turn/start", {
                    "threadId": thread_id,
                    "input": [{"type": "text", "text": prompt}],
                })
                self._current_turn_id = resp["turn"]["turnId"]
                await asyncio.wait_for(done.wait(), timeout=_TURN_TIMEOUT)
            finally:
                self._current_turn_id = None
                unsub()

            if error:
                await event_queue.enqueue_event(
                    new_agent_text_message(f"[codex error] {error}"))
                return
            await event_queue.enqueue_event(new_agent_text_message("".join(buffer)))

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        # When the molecule-runtime cancels a request, fire turn/interrupt
        # against the currently-running turn. Best-effort — racing
        # turn/completed is fine, app-server returns a noop in that case.
        if self._app_server and self._thread_id and self._current_turn_id:
            await self._app_server.request("turn/interrupt", {
                "threadId": self._thread_id,
                "turnId": self._current_turn_id,
            })
```
The `AppServerProcess` class encapsulates: stdio child management,
NDJSON line reader/writer, request-id correlation, notification
subscriber registry, and graceful shutdown. Standard async stdio
JSON-RPC client — nothing exotic.
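A sketch of that shape, assuming the NDJSON framing validated above and the `request` / `subscribe` / `initialize` surface the executor uses (the method and field names on this wrapper are ours, not codex's):
```python
# app_server.py sketch — stdio child + request-id correlation + notification fan-out.
import asyncio
import json
from typing import Any, Callable


class AppServerProcess:
    def __init__(self, proc: asyncio.subprocess.Process):
        self._proc = proc
        self._next_id = 0
        self._pending: dict[int, asyncio.Future] = {}
        self._subscribers: list[Callable[[str, dict], None]] = []
        self._reader_task = asyncio.create_task(self._read_loop())

    @classmethod
    async def start(cls) -> "AppServerProcess":
        proc = await asyncio.create_subprocess_exec(
            "codex", "app-server",
            stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
        return cls(proc)

    async def initialize(self, client_info: dict) -> Any:
        return await self.request("initialize", {"clientInfo": client_info})

    async def request(self, method: str, params: dict) -> Any:
        self._next_id += 1
        req_id = self._next_id
        fut: asyncio.Future = asyncio.get_running_loop().create_future()
        self._pending[req_id] = fut
        line = json.dumps({"jsonrpc": "2.0", "id": req_id,
                           "method": method, "params": params}) + "\n"
        self._proc.stdin.write(line.encode())
        await self._proc.stdin.drain()
        return await fut  # resolved (or failed) by the reader loop

    def subscribe(self, callback: Callable[[str, dict], None]) -> Callable[[], None]:
        self._subscribers.append(callback)
        return lambda: self._subscribers.remove(callback)

    async def _read_loop(self) -> None:
        while True:
            raw = await self._proc.stdout.readline()
            if not raw:
                break  # child exited / stdout closed
            msg = json.loads(raw)
            if "id" in msg and msg["id"] in self._pending:
                fut = self._pending.pop(msg["id"])
                if "error" in msg:
                    fut.set_exception(RuntimeError(str(msg["error"])))
                else:
                    fut.set_result(msg.get("result"))
            elif "method" in msg:  # server-pushed notification
                for cb in list(self._subscribers):
                    cb(msg["method"], msg.get("params", {}))
```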
---
## Open questions to resolve before implementation
1. **MoleculeRuntime streaming contract.** Does our A2A executor
contract support emitting incremental events (so the user sees
partial responses as the agent streams), or do we always assemble
on `turn/completed`? If streaming is supported, we want to forward
each `agent_message_delta` as an A2A event for parity with hermes
gateway streaming; a sketch of that variant follows this list.
(Cross-reference: hermes adapter currently
doesn't stream either — `executor.py:122` sets `stream=False`
so non-streaming is the safe v1 baseline.)
2. **Sandbox policy default.** Codex defaults to `read-only` for safety
in CLI mode; for workspace use we need write access to the
workspace tree. Pick a sensible default in `thread/start`,
probably `workspace-write` scoped to the workspace cwd.
3. **Approval policy default.** Codex's `--ask-for-approval` modes
(`untrusted`, `on-failure`, `never`). Workspace agents need
`never` (they can't prompt a human). Confirm this is exposed via
`approvalPolicy` in `thread/start`.
4. **Auth — login flow.** Codex supports `login api-key` (env
`OPENAI_API_KEY`) and `login chatgpt` (interactive OAuth). For
workspace use we mandate API key. Document this in the template's
README and surface it as a required env in config.yaml.
5. **MCP server passthrough.** Codex's own `mcp_servers` config lets
the agent call out to MCP servers as a CLIENT. Should the workspace
adapter automatically wire `~/.codex/config.toml` so the agent can
reach the molecule MCP server (chat_history, recall_memory,
delegate_task)? Almost certainly yes — but verify the env-var
substitution pattern works in TOML.
6. **Thread persistence across workspace restarts.** Codex stores
sessions on disk under `~/.codex/sessions/`. The adapter should
persist the threadId in workspace state so a restart resumes the
thread (`thread/resume`) rather than starting fresh. This matches
the existing molecule-runtime convention for session continuity.
7. **Token usage / cost reporting.** v2 emits
`ThreadTokenUsageUpdatedNotification`. Plumb this into our usage
tracking — same path the other runtimes use.
8. **MCP push notifications inbound.** Earlier research established
that codex's own MCP server mode does NOT support
`notifications/*` for push. So the path for unsolicited mid-session
A2A messages is NOT "codex's MCP client receives notifications from
our MCP server" — it's "molecule-runtime polls inbox via
`wait_for_message`, and on each polled message fires `turn/start`
on the existing thread." The "MCP native" framing here is satisfied
not by codex receiving MCP push, but by the persistent thread +
turn/start delivering the same UX (session continuity + queued or
interrupted handling of new messages mid-thread).
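For open question #1: if the A2A contract does support incremental events, the streaming variant replaces the buffer-then-assemble `on_notification` in the executor with per-delta forwarding. A sketch, assuming partial text events are acceptable on the event queue:
```python
# Sketch: streaming variant of on_notification inside execute() — forward each
# agent_message_delta as its own A2A event instead of buffering until
# turn/completed. Only viable if molecule-runtime accepts partial events.
def on_notification(method: str, params: dict) -> None:
    if method == "agent_message_delta":
        # Called from the reader loop inside the event loop, so scheduling
        # the async enqueue as a task is safe.
        asyncio.create_task(event_queue.enqueue_event(
            new_agent_text_message(params.get("delta", ""))))
    elif method == "turn/completed":
        done.set()
```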
---
## Why this design satisfies "MCP native push parity"
User goal: every runtime delivers A2A inbox messages with the same
quality of experience as claude-code's MCP `notifications/claude/channel`.
claude-code path: MCP server pushes notification → claude-code SDK
injects synthetic user turn into running session.
Codex path: molecule-runtime polls inbox (universal poll path) →
adapter fires `turn/start` on the existing app-server thread → codex
processes the message in-thread with full context. The "push" happens
at the molecule-runtime ↔ adapter boundary; the "native" part is that
codex's own session model handles it as an in-thread turn, not as a
fresh subprocess.
For mid-turn arrivals: the per-thread serialization (or opt-in
interrupt) gives us behavior equivalent to OpenClaw's per-chat
sequentializer. Equivalent UX to claude-code's mid-session
notification injection in practice — one is a kernel-level interrupt,
the other is a queue-then-dispatch, but the user-visible behavior
("the agent processes my message after the current turn finishes") is
identical.

---
## Sequencing
This is post-demo work. Order:
1. **Spec the executor lifecycle** — pin down the open questions
above (especially #1 streaming, #5 MCP passthrough, #6 thread
persistence) before any code lands.
2. **Implement `AppServerProcess`** with thorough unit tests against a
mock stdio. This is the riskiest module (concurrency around
request-id correlation + notification dispatch); land it first
with high coverage.
3. **Implement `CodexAppServerExecutor`** on top.
4. **Build the template repo skeleton** (Dockerfile, config.yaml,
start.sh, README) once the Python side runs locally.
5. **Add codex to `manifest.json`** and the runtime registry.
6. **End-to-end verify** per `feedback_close_on_user_visible_not_merge`
— boot a real workspace, send A2A messages, observe streamed
responses + thread continuity + queued mid-turn handling.
Estimated total: 3-5 engineering days for v1, plus E2E hardening.