Codex CLI workspace adapter — app-server design
Status: Design draft — pre-implementation
Owner: Molecule AI (hongmingwang@moleculesai.app)
Date: 2026-05-02
Codex version validated against: codex-cli 0.72.0
Related: docs/integrations/hermes-platform-plugins-upstream-pr.md,
molecule-ai-workspace-template-openclaw/packages/openclaw-channel-plugin/
Goal
Add a Molecule workspace template for the OpenAI Codex CLI runtime
(@openai/codex v0.72+). The template should give Codex agents the
same A2A inbox + mid-session push behavior the other supported
runtimes have:
- claude-code: MCP notifications/claude/channel
- OpenClaw: channel-plugin webhook into the gateway kernel
- hermes: BasePlatformAdapter (pending upstream PR; polling fallback today)
- codex (this design): persistent codex app-server stdio JSON-RPC client; A2A messages become turn/start calls against a long-lived thread
Today there is no codex template. The legacy fallback registry entry
at workspace-server/internal/handlers/runtime_registry.go:83 exists
only to keep old workspaces from crashing — there is no live adapter,
no Dockerfile, nothing in manifest.json. This design covers the
fresh build.
Architecture decision: app-server, not codex exec
codex exec --json is the obvious shape — one CLI subprocess per
A2A message, same anti-pattern OpenClaw used to have and that we are
replacing. It loses session continuity (no shared thread), pays
process-spawn cost on every turn, and gives no path to mid-turn
interruption.
codex app-server is a long-running JSON-RPC server over stdio that
holds thread state in memory. The v2 protocol (validated below) gives
us:
- thread/start → returns threadId
- turn/start → input array, threadId required → returns turnId
- turn/interrupt → cancel a running turn by (threadId, turnId)
- Server-pushed notifications: agent_message_delta, turn/started, turn/completed, reasoning_text_delta, command_execution_output_delta, mcp_tool_call_progress, error_notification, etc.
A persistent app-server child plus a small async stdio reader gives us
session continuity AND mid-turn injection. Same dual-win shape we got
from migrating OpenClaw away from openclaw agent.
Why not v1?
v1 of the protocol exposes newConversation + sendUserMessage /
sendUserTurn (one-shot per message, no streaming notifications). v2
introduces threads + turns + delta notifications. v2 is the
forward-looking surface; we build against v2 from the start.
RPC sequence
1. Boot
adapter spawn ▶ codex app-server (stdio NDJSON)
◀ ready (process up)
adapter ▶ {"jsonrpc":"2.0","id":1,"method":"initialize",
"params":{"clientInfo":{"name":"molecule-runtime","version":"…"}}}
adapter ◀ {"id":1,"result":{"userAgent":"codex_cli_rs/0.72.0 …"}}
Validated 2026-05-02 against the installed binary — NDJSON framing, initialize works as shown.
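A minimal sketch of this handshake, assuming an asyncio child and hand-rolled NDJSON framing (the real client lives in app_server.py); the version string is a placeholder and the first stdout line is assumed to be the initialize response:

# Sketch only: spawn codex app-server and perform the initialize handshake.
# Not the real app_server.py; "version" is a placeholder value.
import asyncio
import json

async def boot_app_server() -> asyncio.subprocess.Process:
    proc = await asyncio.create_subprocess_exec(
        "codex", "app-server",
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    request = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {"clientInfo": {"name": "molecule-runtime", "version": "0.0.0"}},
    }
    # NDJSON: one JSON object per line in each direction.
    proc.stdin.write((json.dumps(request) + "\n").encode())
    await proc.stdin.drain()
    # Assumes the first line back is the initialize response,
    # e.g. {"id": 1, "result": {"userAgent": "codex_cli_rs/0.72.0 ..."}}
    response = json.loads(await proc.stdout.readline())
    assert response.get("id") == 1, response
    return proc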
2. Thread per workspace session
adapter ▶ thread/start
params: {model, sandboxPolicy, approvalPolicy, cwd,
baseInstructions, developerInstructions, …}
adapter ◀ {result: {thread: {threadId: "th_…"}}}
threadId is cached on the adapter for the workspace's lifetime. On
adapter restart we use thread/resume against the persisted ID
(written to disk under ~/.codex/sessions/ by codex itself, but we
also keep our own pointer in workspace state for fast restore).
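A rough sketch of the start-or-resume decision; app_server.request() mirrors the AppServerProcess interface sketched later, workspace_state stands in for wherever we persist the pointer, and the thread/resume params and response shape are assumed to mirror thread/start:

# Sketch: start a fresh thread or resume the persisted one after an adapter restart.
# `app_server.request()` and `workspace_state` are assumed interfaces; thread/resume
# params and response shape are assumed to mirror thread/start.
async def start_or_resume_thread(app_server, workspace_state: dict, config) -> str:
    persisted = workspace_state.get("codex_thread_id")
    if persisted:
        resp = await app_server.request("thread/resume", {"threadId": persisted})
    else:
        resp = await app_server.request("thread/start", {
            "model": config.model or None,
            "developerInstructions": config.system_prompt or None,
        })
    thread_id = resp["thread"]["threadId"]
    workspace_state["codex_thread_id"] = thread_id  # our own fast-restore pointer
    return thread_id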
3. A2A message → turn/start
For each inbound A2A message:
adapter ▶ turn/start
params: {threadId, input: [{type:"text", text:"…"}], …}
adapter ◀ {result: {turn: {turnId: "tu_…"}}}
(server pushes notifications)
adapter ◀ turn/started
adapter ◀ agent_message_delta (text chunk)
adapter ◀ agent_message_delta (text chunk)
…
adapter ◀ turn/completed
The adapter accumulates agent_message_delta chunks into a buffer
keyed by turnId, emits them onto the A2A response queue (streamed if
the molecule-runtime contract supports streaming, otherwise assembled
into a single final message on turn/completed).
4. Mid-turn injection — the load-bearing case
Default policy: per-thread serialization. If a turn is already
running when a second A2A message arrives, queue the new message and
fire turn/start once the current turn/completed lands. This
matches OpenClaw's per-chat sequentializer behavior — the A2A peer
sees their messages handled in order, and we don't need
turn/interrupt for the common case.
Opt-in policy: interrupt-and-rerun. For workspaces that prefer
"latest message wins" semantics (rare; configurable), the adapter
fires turn/interrupt with (threadId, currentTurnId), waits for
turn/completed (with cancelled status), then turn/start with the
combined context: previous user message + agent's partial response so
far + new message, so the agent has full context of what got
interrupted. Off by default.
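A sketch of the opt-in path, reusing the request/notification helpers from the executor skeleton below; wait_for_turn_completed and the context-recombination format are illustrative, not final:

# Sketch of interrupt-and-rerun ("latest message wins", off by default).
# wait_for_turn_completed() is an assumed helper that blocks until the cancelled
# turn's turn/completed notification arrives.
async def interrupt_and_rerun(app_server, thread_id: str, current_turn_id: str,
                              prev_user_msg: str, partial_response: str,
                              new_msg: str) -> str:
    await app_server.request("turn/interrupt", {
        "threadId": thread_id,
        "turnId": current_turn_id,
    })
    await wait_for_turn_completed(app_server, current_turn_id)
    # Recombine context so the agent knows what got interrupted (format illustrative).
    combined = (
        f"[interrupted user message]\n{prev_user_msg}\n\n"
        f"[your partial response]\n{partial_response}\n\n"
        f"[new message]\n{new_msg}"
    )
    resp = await app_server.request("turn/start", {
        "threadId": thread_id,
        "input": [{"type": "text", "text": combined}],
    })
    return resp["turn"]["turnId"]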
5. Shutdown
adapter ▶ {"method":"shutdown"} (if v2 exposes one; otherwise SIGTERM)
adapter ▶ close stdio
adapter ▶ wait(child, timeout=5s); on timeout SIGKILL
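A sketch of that ladder for an asyncio child; whether v2 actually exposes a shutdown method is still unconfirmed, so it's sent best-effort before falling back to signals:

# Sketch: best-effort shutdown request (a v2 "shutdown" method is unconfirmed),
# then close stdio, wait 5s, and escalate SIGTERM to SIGKILL on timeout.
import asyncio
import contextlib
import json

async def shutdown_app_server(proc: asyncio.subprocess.Process) -> None:
    with contextlib.suppress(Exception):
        proc.stdin.write((json.dumps({"jsonrpc": "2.0", "id": 0,
                                      "method": "shutdown"}) + "\n").encode())
        await proc.stdin.drain()
    proc.stdin.close()
    try:
        await asyncio.wait_for(proc.wait(), timeout=5)
    except asyncio.TimeoutError:
        proc.terminate()  # SIGTERM
        try:
            await asyncio.wait_for(proc.wait(), timeout=5)
        except asyncio.TimeoutError:
            proc.kill()  # SIGKILL
            await proc.wait()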
File layout (new template repo)
molecule-ai-workspace-template-codex/
├── adapter.py # BaseAdapter shell, thin (~50 LOC)
├── executor.py # AppServerProxyExecutor — the RPC client (~300 LOC)
├── app_server.py # AppServerProcess — stdio child + NDJSON reader (~150 LOC)
├── config.yaml
├── Dockerfile # node:20 + npm i -g @openai/codex@0.72
├── start.sh # boots adapter; codex app-server is spawned per session by executor
├── requirements.txt
├── README.md
└── tests/
    ├── test_app_server.py # mocks stdio; tests framing, request/notification dispatch
    └── test_executor.py # mocks AppServerProcess; tests turn lifecycle, interrupt
Modeled on the hermes template (which is the closest existing shape:
adapter.py + executor.py separation; daemon proxy via local IPC). The
extra app_server.py exists because the JSON-RPC client + child
process management is non-trivial enough to warrant its own module
with its own tests.
Executor skeleton
# executor.py — A2A → codex app-server bridge
# Project-level imports (AgentExecutor, AdapterConfig, AppServerProcess,
# RequestContext, EventQueue, message helpers) omitted in this skeleton.
import asyncio

_TURN_TIMEOUT = 600  # seconds; placeholder until real turn lengths are measured


class CodexAppServerExecutor(AgentExecutor):
    """Holds one app-server child + thread, dispatches A2A turns as turn/start RPCs."""

    def __init__(self, config: AdapterConfig):
        self._config = config
        self._app_server: AppServerProcess | None = None
        self._thread_id: str | None = None
        self._current_turn_id: str | None = None
        self._turn_lock = asyncio.Lock()  # serialize per-thread by default

    async def _ensure_thread(self) -> str:
        if self._app_server is None:
            self._app_server = await AppServerProcess.start()
            await self._app_server.initialize(client_info={
                "name": "molecule-runtime",
                "version": MOLECULE_RUNTIME_VERSION,
            })
        if self._thread_id is None:
            resp = await self._app_server.request("thread/start", {
                "model": self._config.model or None,
                "developerInstructions": self._config.system_prompt or None,
                # other policy fields (sandbox, approval) — Molecule defaults
            })
            self._thread_id = resp["thread"]["threadId"]
        return self._thread_id

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        prompt = extract_message_text(context.message) or ""
        if not prompt.strip():
            await event_queue.enqueue_event(new_agent_text_message("(empty prompt)"))
            return

        async with self._turn_lock:  # per-thread serialization
            thread_id = await self._ensure_thread()

            # Subscribe to delta notifications BEFORE starting the turn so we
            # don't race the first agent_message_delta.
            buffer: list[str] = []
            done = asyncio.Event()
            error: Exception | None = None

            def on_notification(method: str, params: dict) -> None:
                nonlocal error
                if method == "agent_message_delta":
                    buffer.append(params.get("delta", ""))
                elif method == "turn/completed":
                    done.set()
                elif method == "error_notification":
                    error = RuntimeError(params.get("message", "unknown app-server error"))
                    done.set()

            unsub = self._app_server.subscribe(on_notification)
            try:
                resp = await self._app_server.request("turn/start", {
                    "threadId": thread_id,
                    "input": [{"type": "text", "text": prompt}],
                })
                self._current_turn_id = resp["turn"]["turnId"]  # lets cancel() target this turn
                await asyncio.wait_for(done.wait(), timeout=_TURN_TIMEOUT)
            finally:
                self._current_turn_id = None
                unsub()

            if error:
                await event_queue.enqueue_event(
                    new_agent_text_message(f"[codex error] {error}"))
                return

            await event_queue.enqueue_event(new_agent_text_message("".join(buffer)))

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        # When the molecule-runtime cancels a request, fire turn/interrupt
        # against the currently-running turn. Best-effort — racing
        # turn/completed is fine, app-server returns a noop in that case.
        if self._app_server and self._thread_id and self._current_turn_id:
            await self._app_server.request("turn/interrupt", {
                "threadId": self._thread_id,
                "turnId": self._current_turn_id,
            })
The AppServerProcess class encapsulates: stdio child management,
NDJSON line reader/writer, request-id correlation, notification
subscriber registry, and graceful shutdown. Standard async stdio
JSON-RPC client — nothing exotic.
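A condensed sketch of that core loop (request-id correlation plus notification fan-out), assuming the NDJSON framing validated above; the real module adds spawn/initialize, timeouts, stderr handling, and shutdown:

# Condensed sketch of the AppServerProcess core: correlate responses by request id,
# fan notifications out to subscribers. Spawn/initialize/shutdown omitted.
import asyncio
import itertools
import json
from typing import Callable

class AppServerProcessSketch:
    def __init__(self, proc: asyncio.subprocess.Process):
        self._proc = proc
        self._ids = itertools.count(1)
        self._pending: dict[int, asyncio.Future] = {}
        self._subscribers: list[Callable[[str, dict], None]] = []
        self._reader = asyncio.ensure_future(self._read_loop())

    async def request(self, method: str, params: dict) -> dict:
        req_id = next(self._ids)
        fut: asyncio.Future = asyncio.get_running_loop().create_future()
        self._pending[req_id] = fut
        line = json.dumps({"jsonrpc": "2.0", "id": req_id,
                           "method": method, "params": params}) + "\n"
        self._proc.stdin.write(line.encode())
        await self._proc.stdin.drain()
        return await fut  # resolved by _read_loop when the matching id comes back

    def subscribe(self, callback: Callable[[str, dict], None]) -> Callable[[], None]:
        self._subscribers.append(callback)
        return lambda: self._subscribers.remove(callback)

    async def _read_loop(self) -> None:
        while line := await self._proc.stdout.readline():
            msg = json.loads(line)
            if "id" in msg and msg["id"] in self._pending:
                self._pending.pop(msg["id"]).set_result(msg.get("result", {}))
            elif "method" in msg:  # server-pushed notification
                for cb in list(self._subscribers):
                    cb(msg["method"], msg.get("params", {}))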
Open questions to resolve before implementation
1. MoleculeRuntime streaming contract. Does our A2A executor contract support emitting incremental events (so the user sees partial responses as the agent streams), or do we always assemble on turn/completed? If streaming is supported, we want to forward each agent_message_delta as an A2A event for parity with hermes gateway streaming. (Cross-reference: the hermes adapter currently doesn't stream either — executor.py:122 sets stream=False — so non-streaming is the safe v1 baseline.)
2. Sandbox policy default. Codex defaults to read-only for safety in CLI mode; for workspace use we need write access to the workspace tree. Pick a sensible default in thread/start — probably workspace-write scoped to the workspace cwd.
3. Approval policy default. Codex's --ask-for-approval modes (untrusted, on-failure, never). Workspace agents need never (they can't prompt a human). Confirm this is exposed via approvalPolicy in thread/start.
4. Auth — login flow. Codex supports login api-key (env OPENAI_API_KEY) and login chatgpt (interactive OAuth). For workspace use we mandate API key. Document this in the template's README and surface it as a required env in config.yaml.
5. MCP server passthrough. Codex's own mcp_servers config lets the agent call out to MCP servers as a CLIENT. Should the workspace adapter automatically wire ~/.codex/config.toml so the agent can reach the molecule MCP server (chat_history, recall_memory, delegate_task)? Almost certainly yes — but verify the env-var substitution pattern works in TOML.
6. Thread persistence across workspace restarts. Codex stores sessions on disk under ~/.codex/sessions/. The adapter should persist the threadId in workspace state so a restart resumes the thread (thread/resume) rather than starting fresh. This matches the existing molecule-runtime convention for session continuity.
7. Token usage / cost reporting. v2 emits ThreadTokenUsageUpdatedNotification. Plumb this into our usage tracking — same path the other runtimes use.
8. MCP push notifications inbound. Earlier research established that codex's own MCP server mode does NOT support notifications/* for push. So the path for unsolicited mid-session A2A messages is NOT "codex's MCP client receives notifications from our MCP server" — it's "molecule-runtime polls the inbox via wait_for_message, and on each polled message fires turn/start on the existing thread." The "MCP native" framing here is satisfied not by codex receiving MCP push, but by the persistent thread + turn/start delivering the same UX (session continuity + queued or interrupted handling of new messages mid-thread).
Why this design satisfies "MCP native push parity"
User goal: every runtime delivers A2A inbox messages with the same
quality of experience as claude-code's MCP notifications/claude/channel.
claude-code path: MCP server pushes notification → claude-code SDK injects synthetic user turn into running session.
Codex path: molecule-runtime polls inbox (universal poll path) →
adapter fires turn/start on the existing app-server thread → codex
processes the message in-thread with full context. The "push" happens
at the molecule-runtime ↔ adapter boundary; the "native" part is that
codex's own session model handles it as an in-thread turn, not as a
fresh subprocess.
For mid-turn arrivals, the per-thread serialization (or opt-in interrupt) gives behavior equivalent to OpenClaw's per-chat sequentializer, and in practice equivalent UX to claude-code's mid-session notification injection — one is a kernel-level interrupt, the other is queue-then-dispatch, but the user-visible behavior ("the agent processes my message after the current turn finishes") is the same.
Sequencing
This is post-demo work. Order:
- Spec the executor lifecycle — pin down the open questions above (especially #1 streaming, #5 MCP passthrough, #6 thread persistence) before any code lands.
- Implement AppServerProcess with thorough unit tests against a mock stdio. This is the riskiest module (concurrency around request-id correlation + notification dispatch); land it first with high coverage.
- Implement CodexAppServerExecutor on top.
- Build the template repo skeleton (Dockerfile, config.yaml, start.sh, README) once the Python side runs locally.
- Add codex to manifest.json and the runtime registry.
- End-to-end verify per feedback_close_on_user_visible_not_merge — boot a real workspace, send A2A messages, observe streamed responses + thread continuity + queued mid-turn handling.
Estimated total: 3-5 engineering days for v1, plus E2E hardening.