codex-channel-molecule/codex_channel_molecule/codex_runner.py
Hongming Wang 0f194d4507 v0.1.2: codex CLI subcommand shape + inbox poller activation + launchd PATH
Three production bugs caught by the codex agent live-testing the daemon
end-to-end against codex-cli 0.128 + a real LaunchAgent install:

1. Codex CLI 0.6+ moved `--resume` from a flag on `exec` to a
   `resume` subcommand. The daemon was sending
   `codex exec --skip-git-repo-check --resume <sid> <prompt>`, which
   parses on 0.5.x but fails on 0.6.x+. Fixed to:
     fresh:  codex exec --skip-git-repo-check <prompt>
     resume: codex exec resume --skip-git-repo-check <sid> <prompt>
   Verified on codex-cli 0.128 with a live binary; the 0.5.x behavior
   is preserved by the fake-codex test fixture, which now SystemExits
   if it sees the legacy `--resume` flag (regression gate).

2. wait_for_message() never returned anything because nothing in the
   daemon ever called molecule_runtime.inbox.activate() or started
   the poller thread. The wheel ships those primitives but expects
   the embedding runtime to wire them up — the workspace runtime
   does this in start.py, but a standalone daemon embedding the
   tools must do it itself. Added the activation + poller-thread
   start in _RealTools.__init__.

3. The codex binary is a `#!/usr/bin/env node` shim. Under launchd /
   stripped systemd units, the parent process PATH is `/usr/bin:/bin`
   and `env node` silently exits with status 127 (command not found).
   CodexRunner now prepends the
   directory of the resolved codex binary to the subprocess PATH at
   spawn time — Node lives next to codex under nvm / brew /
   pnpm-global, so this restores the discovery without operators
   having to thread PATH through their LaunchAgent / unit file.
   README updated with a note on the launchd/systemd interaction.
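
The argv change in item 1 can be sketched as a small pure function. This is an illustrative sketch only: `build_codex_argv` is a hypothetical helper name (the module builds the list inline in `CodexRunner.run`), and the argument order is taken from the two command lines quoted above.

```python
from typing import List, Optional


def build_codex_argv(
    codex_bin: str,
    message: str,
    session_id: Optional[str] = None,
) -> List[str]:
    """Return the argv for one turn (hypothetical helper, not in the module)."""
    if session_id:
        # Resume shape: `resume` is a subcommand of `exec`, followed by
        # the session id and then the prompt.
        return [
            codex_bin,
            "exec",
            "resume",
            "--skip-git-repo-check",
            session_id,
            message,
        ]
    # Fresh shape: no session id, prompt goes straight after the flag.
    return [codex_bin, "exec", "--skip-git-repo-check", message]


fresh = build_codex_argv("codex", "hello")
resumed = build_codex_argv("codex", "hello", session_id="sid-1")

# Regression gate in the spirit of the fake-codex fixture: the legacy
# `--resume` flag must never appear in either shape.
assert "--resume" not in fresh
assert "--resume" not in resumed
```

Keeping the two shapes in one place makes the subcommand-shape gate a plain list comparison rather than a subprocess test.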

Test:
- 31 passed (was 28). Three new regression gates: subcommand-shape,
  PATH-prepend (launchd-default PATH stripped), and PATH-no-double
  (idempotent when codex_bin_dir already present). Verified that the
  two new behavioural tests FAIL on the old codex_runner.py by stashing
  only the source change and re-running.
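
The PATH-prepend and PATH-no-double gates could look roughly like the following. This is a sketch under assumptions: `prepend_bin_dir` is a hypothetical standalone mirror of `CodexRunner._build_env`, and `/opt/node/bin` is a made-up directory standing in for wherever codex and Node are installed. The real tests are not shown here and may be shaped differently.

```python
import os
from typing import Dict, Mapping


def prepend_bin_dir(base_env: Mapping[str, str], bin_dir: str) -> Dict[str, str]:
    """Copy *base_env* with *bin_dir* at the front of PATH, unless present."""
    env = dict(base_env)
    existing = env.get("PATH", "")
    if bin_dir not in existing.split(os.pathsep):
        env["PATH"] = bin_dir + (os.pathsep + existing if existing else "")
    return env


# A launchd-style stripped PATH (hypothetical values).
stripped = {"PATH": os.pathsep.join(["/usr/bin", "/bin"])}

# PATH-prepend: the bin dir lands first, so `env node` can find Node.
once = prepend_bin_dir(stripped, "/opt/node/bin")
assert once["PATH"].split(os.pathsep) == ["/opt/node/bin", "/usr/bin", "/bin"]

# PATH-no-double: applying it again leaves PATH unchanged.
twice = prepend_bin_dir(once, "/opt/node/bin")
assert twice["PATH"] == once["PATH"]
```

Comparing split PATH entries rather than substrings matters here: a substring check would treat `/opt/node/bin-old` as already containing `/opt/node/bin`.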

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 21:21:44 -07:00

180 lines
6.5 KiB
Python

"""Spawn ``codex exec`` per inbound message and capture the reply.

``codex exec`` is codex's non-interactive mode: it runs one turn and
prints the assistant text to stdout. ``codex exec resume <session-id>``
continues an existing session if one is given, so chat threads survive
across multiple inbound messages instead of starting fresh every time.

The session id mapping (chat_id → codex session_id) is persisted by
``bridge.py``; this module only handles spawning a single invocation.
"""
from __future__ import annotations

import asyncio
import dataclasses
import logging
import os
import shutil
from typing import Optional

logger = logging.getLogger(__name__)

# Codex's first turn in a fresh session writes a session_id to stderr
# (``session: <uuid>``). The bridge captures it from CodexResult.session_id
# and stores it for the next inbound on the same chat_id.

_DEFAULT_TIMEOUT_SECS = 600.0  # 10 minutes per turn, same budget as platform A2A


@dataclasses.dataclass(frozen=True)
class CodexResult:
    """Outcome of one ``codex exec`` invocation."""

    text: str
    """Codex's assistant reply, captured from stdout."""

    session_id: Optional[str]
    """The session_id codex used. Same as the input session_id when
    resuming; a freshly-issued uuid when starting a new session. None on
    invocation failure (no session was created)."""

    exit_code: int
    """Process exit code. 0 on success."""

    stderr_tail: str
    """Last ~2 KB of stderr, useful for surfacing codex errors back to
    the caller without dumping the full log."""


class CodexRunner:
    """Wraps ``codex exec`` invocations with session continuity.

    Stateless: the bridge owns the chat_id → session_id map and passes
    the right session_id in each call.
    """

    def __init__(
        self,
        codex_bin: str = "codex",
        timeout_secs: float = _DEFAULT_TIMEOUT_SECS,
    ) -> None:
        # Resolve the codex binary once at construction so a missing
        # install fails fast at daemon start, not on the first inbound.
        resolved = shutil.which(codex_bin)
        if resolved is None:
            raise FileNotFoundError(
                f"codex binary not found on PATH (looked for {codex_bin!r}). "
                f"Install with `npm install -g @openai/codex` or pass "
                f"--codex-bin /path/to/codex."
            )
        self._codex_bin = resolved
        self._timeout_secs = timeout_secs

        # The codex binary is a `#!/usr/bin/env node` shim, so spawning
        # it requires `node` to be discoverable via the subprocess PATH.
        # Under a shell launch this is fine (operator's interactive PATH
        # has Node), but launchd and most systemd unit defaults strip
        # PATH down to /usr/bin:/bin; `env node` then 127s out and the
        # daemon silently fails on every turn.
        #
        # Fix: remember the directory containing the resolved codex
        # binary. Node lives next to it under nvm / brew / pnpm
        # global installs. Subprocesses get that dir prepended to their
        # PATH (see _build_env). Operators don't have to thread PATH
        # through their LaunchAgent / unit file, just $WORKSPACE_ID,
        # $PLATFORM_URL, $MOLECULE_WORKSPACE_TOKEN per the install
        # contract.
        self._codex_bin_dir = os.path.dirname(resolved)

    def _build_env(self) -> dict[str, str]:
        env = os.environ.copy()
        existing_path = env.get("PATH", "")
        if self._codex_bin_dir not in existing_path.split(os.pathsep):
            env["PATH"] = (
                self._codex_bin_dir
                + (os.pathsep + existing_path if existing_path else "")
            )
        return env

    async def run(
        self,
        message: str,
        session_id: Optional[str] = None,
    ) -> CodexResult:
        """Run one ``codex exec`` turn with *message* and return the reply.

        When *session_id* is given, ``codex exec resume <session_id>``
        continues that session. When None, codex starts a fresh session
        and assigns a new id (returned in CodexResult.session_id).
        """
        if session_id:
            args = [
                self._codex_bin,
                "exec",
                "resume",
                "--skip-git-repo-check",
                session_id,
                message,
            ]
        else:
            args = [self._codex_bin, "exec", "--skip-git-repo-check", message]

        proc = await asyncio.create_subprocess_exec(
            *args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=self._build_env(),
        )
        try:
            stdout_bytes, stderr_bytes = await asyncio.wait_for(
                proc.communicate(), timeout=self._timeout_secs
            )
        except asyncio.TimeoutError:
            proc.kill()
            await proc.wait()
            return CodexResult(
                text=f"(codex exec timed out after {self._timeout_secs:.0f}s)",
                session_id=session_id,
                exit_code=-1,
                stderr_tail="timeout",
            )

        text = stdout_bytes.decode(errors="replace").strip()
        stderr = stderr_bytes.decode(errors="replace")
        # Cap stderr to keep CodexResult shallow; full stderr is in
        # codex's own log files for postmortems. Slicing a shorter
        # string returns it unchanged, so no length check is needed.
        stderr_tail = stderr[-2048:]
        # Best-effort session_id capture. Codex prints a banner like
        # ``session: 0123abcd-...`` to stderr at session start. When
        # we resumed, session_id is unchanged.
        new_session_id = session_id or _extract_session_id(stderr)
        return CodexResult(
            text=text,
            session_id=new_session_id,
            exit_code=proc.returncode if proc.returncode is not None else -1,
            stderr_tail=stderr_tail,
        )


def _extract_session_id(stderr: str) -> Optional[str]:
    """Pull a session uuid out of codex's stderr banner.

    Codex's exec banner format is fragile across versions, so we accept
    any of the common shapes and return None if nothing matches. The
    daemon falls back to "no session id" gracefully; that just means
    the next turn starts a fresh session, costing context continuity
    but not breaking the message flow.
    """
    import re

    # Two known shapes: "session: <uuid>" and "session_id=<uuid>".
    # Both are stderr-only.
    pattern = re.compile(
        r"session(?:[_ ]id)?[=:\s]+([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})",
        re.IGNORECASE,
    )
    m = pattern.search(stderr)
    return m.group(1) if m else None
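
The session-id extraction above can be exercised in isolation. The snippet below is a minimal sketch that copies the regex into a standalone function (`extract_session_id` and `_SESSION_RE` are illustrative names) and feeds it hand-written sample banners. The sample lines and UUID are invented for illustration; they follow the two shapes named in the comment and are not captured codex output.

```python
import re
from typing import Optional

# Same pattern as in the module, compiled once at import time.
_SESSION_RE = re.compile(
    r"session(?:[_ ]id)?[=:\s]+"
    r"([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})",
    re.IGNORECASE,
)


def extract_session_id(stderr: str) -> Optional[str]:
    """Return the first session uuid found in *stderr*, or None."""
    m = _SESSION_RE.search(stderr)
    return m.group(1) if m else None


SAMPLE_UUID = "01234567-89ab-cdef-0123-456789abcdef"  # made-up value

# Shape 1: "session: <uuid>", surrounded by unrelated stderr lines.
banner_colon = f"starting turn\nsession: {SAMPLE_UUID}\nworking...\n"
# Shape 2: "session_id=<uuid>".
banner_equals = f"session_id={SAMPLE_UUID}\n"

assert extract_session_id(banner_colon) == SAMPLE_UUID
assert extract_session_id(banner_equals) == SAMPLE_UUID
# No banner at all: the caller falls back to a fresh session next turn.
assert extract_session_id("error: something else went wrong\n") is None
```

Compiling the pattern at module level avoids rebuilding it on every turn; the in-function `import re` in the module works the same way but pays a small lookup cost per call.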