Adds the OpenAI Codex CLI as a Molecule workspace runtime and lands the
design docs that drove the runtime native-MCP push parity work across
claude-code, hermes, openclaw, and codex.

manifest.json:
- Adds a `codex` workspace_template entry pointing at the new
  Molecule-AI/molecule-ai-workspace-template-codex repo (initial commit
  landed there in parallel; 14 files / 1411 LOC). The workspace-server
  runtime registry already had `codex` in its fallback set — this entry
  makes it manifest-reachable in prod.

docs/integrations/:
- runtime-native-mcp-status.md — index across all four runtime streams
- codex-app-server-adapter-design.md — full design including the v2 RPC
  sequence, executor skeleton, and schema-vs-runtime drift findings
  (real codex 0.72 returns thread.id; the schema says thread.threadId)
- hermes-platform-plugins-upstream-pr.md — pre-submission draft of the
  hermes-agent upstream PR

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
# Codex CLI workspace adapter — app-server design

**Status:** Design draft — pre-implementation

**Owner:** Molecule AI (hongmingwang@moleculesai.app)

**Date:** 2026-05-02

**Codex version validated against:** `codex-cli 0.72.0`

**Related:** `docs/integrations/hermes-platform-plugins-upstream-pr.md`,
`molecule-ai-workspace-template-openclaw/packages/openclaw-channel-plugin/`

---

## Goal

Add a Molecule workspace template for the OpenAI Codex CLI runtime
(`@openai/codex` v0.72+). The template should give Codex agents the
same A2A inbox + mid-session push behavior the other supported
runtimes have:

- **claude-code:** MCP `notifications/claude/channel`
- **OpenClaw:** channel-plugin webhook into the gateway kernel
- **hermes:** `BasePlatformAdapter` (pending upstream PR; polling fallback today)
- **codex (this design):** persistent `codex app-server` stdio JSON-RPC
  client; A2A messages become `turn/start` calls against a long-lived
  thread

Today there is no codex template. The legacy fallback registry entry
at `workspace-server/internal/handlers/runtime_registry.go:83` exists
only to keep old workspaces from crashing — there is no live adapter,
no Dockerfile, nothing in `manifest.json`. This design covers the
fresh build.

---
|
|
|
|
## Architecture decision: app-server, not `codex exec`

`codex exec --json` is the obvious shape — one CLI subprocess per
A2A message, the same anti-pattern OpenClaw used to have and that we
are replacing. It loses session continuity (no shared thread), pays
process-spawn cost on every turn, and gives no path to mid-turn
interruption.

`codex app-server` is a long-running JSON-RPC server over stdio that
holds thread state in memory. The v2 protocol (validated below) gives
us:

- `thread/start` → returns `threadId`
- `turn/start` → input array, `threadId` required → returns `turnId`
- `turn/interrupt` → cancel a running turn by `(threadId, turnId)`
- Server-pushed notifications: `agent_message_delta`, `turn/started`,
  `turn/completed`, `reasoning_text_delta`,
  `command_execution_output_delta`, `mcp_tool_call_progress`,
  `error_notification`, etc.

A persistent app-server child plus a small async stdio reader gives us
session continuity AND mid-turn injection — the same dual-win shape we
got from migrating OpenClaw away from `openclaw agent`.

### Why not v1?

v1 of the protocol exposes `newConversation` plus `sendUserMessage` /
`sendUserTurn` (one-shot per message, no streaming notifications). v2
introduces threads, turns, and delta notifications. v2 is the
forward-looking surface; we build against v2 from the start.

---
|
|
|
|
## RPC sequence

### 1. Boot

```
adapter spawn ▶ codex app-server (stdio NDJSON)
        ◀ ready (process up)
adapter ▶ {"jsonrpc":"2.0","id":1,"method":"initialize",
           "params":{"clientInfo":{"name":"molecule-runtime","version":"…"}}}
adapter ◀ {"id":1,"result":{"userAgent":"codex_cli_rs/0.72.0 …"}}
```

Validated 2026-05-02 against the installed binary — NDJSON framing and
initialize both work as shown.

### 2. Thread per workspace session

```
adapter ▶ thread/start
          params: {model, sandboxPolicy, approvalPolicy, cwd,
                   baseInstructions, developerInstructions, …}
adapter ◀ {result: {thread: {threadId: "th_…"}}}
```

`threadId` is cached on the adapter for the workspace's lifetime. On
adapter restart we use `thread/resume` against the persisted ID
(written to disk under `~/.codex/sessions/` by codex itself, but we
also keep our own pointer in workspace state for fast restore).

### 3. A2A message → turn/start

For each inbound A2A message:

```
adapter ▶ turn/start
          params: {threadId, input: [{type:"text", text:"…"}], …}
adapter ◀ {result: {turn: {turnId: "tu_…"}}}

(server pushes notifications)
adapter ◀ turn/started
adapter ◀ agent_message_delta (text chunk)
adapter ◀ agent_message_delta (text chunk)
…
adapter ◀ turn/completed
```

The adapter accumulates `agent_message_delta` chunks into a buffer
keyed by `turnId` and emits them onto the A2A response queue —
streamed if the molecule-runtime contract supports streaming,
otherwise assembled into a single final message on `turn/completed`.

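The buffering described above is just a dict of chunk lists keyed by `turnId`. A minimal sketch (the class name is ours, not from the codex API):

```python
# Illustrative per-turn delta accumulator; class and method names are ours.
class TurnBuffer:
    def __init__(self) -> None:
        self._chunks: dict[str, list[str]] = {}

    def add_delta(self, turn_id: str, delta: str) -> None:
        """Called on each agent_message_delta notification."""
        self._chunks.setdefault(turn_id, []).append(delta)

    def finish(self, turn_id: str) -> str:
        """Called on turn/completed: join the chunks and drop the buffer."""
        return "".join(self._chunks.pop(turn_id, []))


buf = TurnBuffer()
buf.add_delta("tu_1", "Hel")
buf.add_delta("tu_1", "lo")
final = buf.finish("tu_1")
```

Keying by `turnId` rather than keeping one flat buffer keeps the accumulator correct even if a future version interleaves deltas from overlapping turns.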
### 4. Mid-turn injection — the load-bearing case

**Default policy: per-thread serialization.** If a turn is already
running when a second A2A message arrives, queue the new message and
fire `turn/start` once the current `turn/completed` lands. This
matches OpenClaw's per-chat sequentializer behavior — the A2A peer
sees their messages handled in order, and we don't need
`turn/interrupt` for the common case.

**Opt-in policy: interrupt-and-rerun.** For workspaces that prefer
"latest message wins" semantics (rare; configurable), the adapter
fires `turn/interrupt` with `(threadId, currentTurnId)`, waits for
`turn/completed` (with cancelled status), then fires `turn/start`
with the combined context — the previous user message, the agent's
partial response so far, and the new message — so the agent has full
context of what was interrupted. Off by default.

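The default policy needs no queue data structure of its own: an `asyncio.Lock` shared per thread already yields strict arrival-order handling, because waiters are woken in FIFO order. A small self-contained sketch of that behavior (message names and the sleep are stand-ins):

```python
# Illustrative per-thread serialization: concurrent A2A messages contend on
# one lock, so turn/start fires strictly in arrival order.
import asyncio


async def demo() -> list[str]:
    turn_lock = asyncio.Lock()
    order: list[str] = []

    async def handle(message: str) -> None:
        async with turn_lock:           # second message waits for turn/completed
            order.append(f"start:{message}")
            await asyncio.sleep(0.01)   # stand-in for a running turn
            order.append(f"done:{message}")

    # Two messages arriving while the first turn is still running.
    await asyncio.gather(handle("msg-1"), handle("msg-2"))
    return order


order = asyncio.run(demo())
```

The interleaving `start:msg-1, done:msg-1, start:msg-2, done:msg-2` is exactly the queue-then-dispatch behavior the default policy promises; the opt-in interrupt policy would instead cancel the in-flight turn before the second `start`.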
### 5. Shutdown

```
adapter ▶ {"method":"shutdown"}   (if v2 exposes one; otherwise SIGTERM)
adapter ▶ close stdio
adapter ▶ wait(child, timeout=5s); on timeout SIGKILL
```

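The escalation ladder above can be sketched with asyncio's subprocess API. A sleeping Python child stands in for app-server here, and `terminate()` stands in for a v2 shutdown RPC (which may or may not exist, per the note above):

```python
# Illustrative graceful-shutdown ladder: close stdio, SIGTERM, wait with a
# timeout, escalate to SIGKILL. The child is a stand-in, not real app-server.
import asyncio
import sys


async def shutdown(child: asyncio.subprocess.Process, timeout: float = 5.0) -> int:
    if child.stdin:
        child.stdin.close()        # close stdio; a well-behaved server exits on EOF
    child.terminate()              # SIGTERM (stand-in for an explicit shutdown RPC)
    try:
        await asyncio.wait_for(child.wait(), timeout=timeout)
    except asyncio.TimeoutError:
        child.kill()               # SIGKILL on timeout
        await child.wait()
    return child.returncode


async def demo() -> int:
    child = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import time; time.sleep(60)",
        stdin=asyncio.subprocess.PIPE)
    return await shutdown(child, timeout=2.0)


rc = asyncio.run(demo())
```

Either branch guarantees the child is reaped before the adapter itself exits, so no zombie app-server processes accumulate across workspace restarts.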
---
|
|
|
|
## File layout (new template repo)

```
molecule-ai-workspace-template-codex/
├── adapter.py       # BaseAdapter shell, thin (~50 LOC)
├── executor.py      # CodexAppServerExecutor — the RPC client (~300 LOC)
├── app_server.py    # AppServerProcess — stdio child + NDJSON reader (~150 LOC)
├── config.yaml
├── Dockerfile       # node:20 + npm i -g @openai/codex@0.72
├── start.sh         # boots adapter; codex app-server is spawned per session by executor
├── requirements.txt
├── README.md
└── tests/
    ├── test_app_server.py  # mocks stdio; tests framing, request/notification dispatch
    └── test_executor.py    # mocks AppServerProcess; tests turn lifecycle, interrupt
```

Modeled on the hermes template, which is the closest existing shape
(adapter.py + executor.py separation; daemon proxy via local IPC). The
extra `app_server.py` exists because the JSON-RPC client and child
process management are non-trivial enough to warrant their own module
with its own tests.

---
|
|
|
|
## Executor skeleton

```python
# executor.py — A2A → codex app-server bridge
#
# AgentExecutor, AdapterConfig, RequestContext, EventQueue,
# extract_message_text, new_agent_text_message, and
# MOLECULE_RUNTIME_VERSION are assumed imported from the
# molecule-runtime SDK / shared adapter libs.

import asyncio

_TURN_TIMEOUT = 600  # seconds; generous ceiling for a single agent turn


class CodexAppServerExecutor(AgentExecutor):
    """Holds one app-server child + thread, dispatches A2A turns as turn/start RPCs."""

    def __init__(self, config: AdapterConfig):
        self._config = config
        self._app_server: AppServerProcess | None = None
        self._thread_id: str | None = None
        self._current_turn_id: str | None = None
        self._turn_lock = asyncio.Lock()  # serialize per-thread by default

    async def _ensure_thread(self) -> str:
        if self._app_server is None:
            self._app_server = await AppServerProcess.start()
            await self._app_server.initialize(client_info={
                "name": "molecule-runtime",
                "version": MOLECULE_RUNTIME_VERSION,
            })
        if self._thread_id is None:
            resp = await self._app_server.request("thread/start", {
                "model": self._config.model or None,
                "developerInstructions": self._config.system_prompt or None,
                # other policy fields (sandbox, approval) — Molecule defaults
            })
            self._thread_id = resp["thread"]["threadId"]
        return self._thread_id

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        prompt = extract_message_text(context.message) or ""
        if not prompt.strip():
            await event_queue.enqueue_event(new_agent_text_message("(empty prompt)"))
            return

        async with self._turn_lock:  # per-thread serialization
            thread_id = await self._ensure_thread()

            # Subscribe to delta notifications BEFORE starting the turn so we
            # don't race the first agent_message_delta.
            buffer: list[str] = []
            done = asyncio.Event()
            error: Exception | None = None

            def on_notification(method: str, params: dict) -> None:
                nonlocal error
                if method == "agent_message_delta":
                    buffer.append(params.get("delta", ""))
                elif method == "turn/completed":
                    done.set()
                elif method == "error_notification":
                    error = RuntimeError(params.get("message", "unknown app-server error"))
                    done.set()

            unsub = self._app_server.subscribe(on_notification)
            try:
                resp = await self._app_server.request("turn/start", {
                    "threadId": thread_id,
                    "input": [{"type": "text", "text": prompt}],
                })
                self._current_turn_id = resp["turn"]["turnId"]
                await asyncio.wait_for(done.wait(), timeout=_TURN_TIMEOUT)
            finally:
                self._current_turn_id = None
                unsub()

            if error:
                await event_queue.enqueue_event(
                    new_agent_text_message(f"[codex error] {error}"))
                return
            await event_queue.enqueue_event(new_agent_text_message("".join(buffer)))

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        # When the molecule-runtime cancels a request, fire turn/interrupt
        # against the currently-running turn. Best-effort — racing
        # turn/completed is fine, app-server returns a noop in that case.
        if self._app_server and self._thread_id and self._current_turn_id:
            await self._app_server.request("turn/interrupt", {
                "threadId": self._thread_id,
                "turnId": self._current_turn_id,
            })
```

The `AppServerProcess` class encapsulates stdio child management, the
NDJSON line reader/writer, request-id correlation, the notification
subscriber registry, and graceful shutdown. Standard async stdio
JSON-RPC client — nothing exotic.

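The correlation core of that class can be sketched transport-free: pending futures keyed by request id, notifications fanned out to subscribers. The class and method names below are illustrative stand-ins for what `AppServerProcess` would contain; `feed_line()` is what the NDJSON reader task would call per line:

```python
# Illustrative request-id correlation + notification fan-out. The stdio
# child itself is omitted; only the dispatch logic is shown.
import asyncio
import json


class RpcDispatcher:
    def __init__(self) -> None:
        self._next_id = 0
        self._pending: dict[int, asyncio.Future] = {}
        self._subscribers: list = []

    def next_request(self, method: str, params: dict) -> tuple[bytes, asyncio.Future]:
        """Allocate an id; return the wire line and the future its response resolves."""
        self._next_id += 1
        fut = asyncio.get_running_loop().create_future()
        self._pending[self._next_id] = fut
        line = json.dumps({"jsonrpc": "2.0", "id": self._next_id,
                           "method": method, "params": params}).encode() + b"\n"
        return line, fut

    def subscribe(self, cb):
        self._subscribers.append(cb)
        return lambda: self._subscribers.remove(cb)  # unsubscribe handle

    def feed_line(self, raw: bytes) -> None:
        msg = json.loads(raw)
        if "id" in msg:                              # response → resolve pending future
            fut = self._pending.pop(msg["id"], None)
            if fut and not fut.done():
                fut.set_result(msg.get("result"))
        else:                                        # notification → fan out
            for cb in list(self._subscribers):
                cb(msg["method"], msg.get("params", {}))


async def demo():
    d = RpcDispatcher()
    seen = []
    d.subscribe(lambda method, params: seen.append(method))
    line, fut = d.next_request("thread/start", {})
    d.feed_line(b'{"jsonrpc":"2.0","method":"turn/started","params":{}}')
    d.feed_line(b'{"jsonrpc":"2.0","id":1,"result":{"thread":{"threadId":"th_1"}}}')
    return seen, await fut


seen, result = asyncio.run(demo())
```

This is the concurrency hot spot the sequencing section flags: responses and notifications arrive interleaved on one stream, and the dispatcher must route each without blocking the reader task.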
---
|
|
|
|
## Open questions to resolve before implementation

1. **MoleculeRuntime streaming contract.** Does our A2A executor
   contract support emitting incremental events (so the user sees
   partial responses as the agent streams), or do we always assemble
   on `turn/completed`? If streaming is supported, we want to forward
   each `agent_message_delta` as an A2A event for parity with hermes
   gateway streaming. (Cross-reference: the hermes adapter currently
   doesn't stream either — `executor.py:122` sets `stream=False` —
   so non-streaming is the safe v1 baseline.)

2. **Sandbox policy default.** Codex defaults to `read-only` for
   safety in CLI mode; for workspace use we need write access to the
   workspace tree. Pick a sensible default in `thread/start` —
   probably `workspace-write` scoped to the workspace cwd.

3. **Approval policy default.** Codex's `--ask-for-approval` modes are
   `untrusted`, `on-failure`, and `never`. Workspace agents need
   `never` (they can't prompt a human). Confirm this is exposed via
   `approvalPolicy` in `thread/start`.

4. **Auth — login flow.** Codex supports `login api-key` (env
   `OPENAI_API_KEY`) and `login chatgpt` (interactive OAuth). For
   workspace use we mandate the API key. Document this in the
   template's README and surface it as a required env in config.yaml.

5. **MCP server passthrough.** Codex's own `mcp_servers` config lets
   the agent call out to MCP servers as a CLIENT. Should the workspace
   adapter automatically wire `~/.codex/config.toml` so the agent can
   reach the molecule MCP server (chat_history, recall_memory,
   delegate_task)? Almost certainly yes — but verify the env-var
   substitution pattern works in TOML.

6. **Thread persistence across workspace restarts.** Codex stores
   sessions on disk under `~/.codex/sessions/`. The adapter should
   persist the threadId in workspace state so a restart resumes the
   thread (`thread/resume`) rather than starting fresh. This matches
   the existing molecule-runtime convention for session continuity.

7. **Token usage / cost reporting.** v2 emits
   `ThreadTokenUsageUpdatedNotification`. Plumb this into our usage
   tracking — the same path the other runtimes use.

8. **Inbound MCP push notifications.** Earlier research established
   that codex's own MCP server mode does NOT support
   `notifications/*` for push. So the path for unsolicited mid-session
   A2A messages is NOT "codex's MCP client receives notifications from
   our MCP server" — it's "molecule-runtime polls the inbox via
   `wait_for_message`, and on each polled message fires `turn/start`
   on the existing thread." The "MCP native" framing here is satisfied
   not by codex receiving MCP push, but by the persistent thread +
   turn/start delivering the same UX (session continuity + queued or
   interrupted handling of new messages mid-thread).

---
|
|
|
|
## Why this design satisfies "MCP native push parity"

User goal: every runtime delivers A2A inbox messages with the same
quality of experience as claude-code's MCP `notifications/claude/channel`.

claude-code path: the MCP server pushes a notification → the
claude-code SDK injects a synthetic user turn into the running
session.

Codex path: molecule-runtime polls the inbox (universal poll path) →
the adapter fires `turn/start` on the existing app-server thread →
codex processes the message in-thread with full context. The "push"
happens at the molecule-runtime ↔ adapter boundary; the "native" part
is that codex's own session model handles it as an in-thread turn, not
as a fresh subprocess.

For mid-turn arrivals: per-thread serialization (or the opt-in
interrupt) gives us behavior equivalent to OpenClaw's per-chat
sequentializer, and equivalent UX to claude-code's mid-session
notification injection in practice — one is a kernel-level interrupt,
the other is queue-then-dispatch, but the user-visible behavior
("the agent processes my message after the current turn finishes") is
identical.

---
|
|
|
|
## Sequencing

This is post-demo work. Order:

1. **Spec the executor lifecycle** — pin down the open questions
   above (especially #1 streaming, #5 MCP passthrough, #6 thread
   persistence) before any code lands.
2. **Implement `AppServerProcess`** with thorough unit tests against a
   mock stdio. This is the riskiest module (concurrency around
   request-id correlation + notification dispatch); land it first
   with high coverage.
3. **Implement `CodexAppServerExecutor`** on top.
4. **Build the template repo skeleton** (Dockerfile, config.yaml,
   start.sh, README) once the Python side runs locally.
5. **Add codex to `manifest.json`** and the runtime registry.
6. **End-to-end verify** per `feedback_close_on_user_visible_not_merge`
   — boot a real workspace, send A2A messages, observe streamed
   responses + thread continuity + queued mid-turn handling.

Estimated total: 3–5 engineering days for v1, plus E2E hardening.