Three production bugs caught by the codex agent live-testing the daemon
end-to-end against codex-cli 0.128 + a real LaunchAgent install:
1. Codex CLI 0.6+ moved `--resume` from a flag on `exec` to a
`resume` subcommand. The daemon was sending
`codex exec --skip-git-repo-check --resume <sid> <prompt>`, which
parses on 0.5.x but fails on 0.6.x+. Fixed to:
fresh: codex exec --skip-git-repo-check <prompt>
resume: codex exec resume --skip-git-repo-check <sid> <prompt>
Verified on codex-cli 0.128 with a live binary; the 0.5.x behavior
is preserved by the fake-codex test fixture, which now SystemExits
if it sees the legacy `--resume` flag (regression gate).
2. wait_for_message() never returned anything because nothing in the
daemon ever called molecule_runtime.inbox.activate() or started
the poller thread. The wheel ships those primitives but expects
the embedding runtime to wire them up — the workspace runtime
does this in start.py, but a standalone daemon embedding the
tools must do it itself. Added the activation + poller-thread
start in _RealTools.__init__.
3. The codex binary is a `#!/usr/bin/env node` shim. Under launchd /
stripped systemd units, the parent process PATH is `/usr/bin:/bin`
and `env node` 127s out silently. CodexRunner now prepends the
directory of the resolved codex binary to the subprocess PATH at
spawn time — Node lives next to codex under nvm / brew /
pnpm-global, so this restores the discovery without operators
having to thread PATH through their LaunchAgent / unit file.
README updated with a note on the launchd/systemd interaction.
Test:
- 31 passed (was 28). Three new regression gates: subcommand-shape,
PATH-prepend (launchd-default PATH stripped), and PATH-no-double
(idempotent when codex_bin_dir already present). Verified the two
behavioural new tests FAIL on the old codex_runner.py by stashing
the source change only and re-running.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
180 lines
6.5 KiB
Python
180 lines
6.5 KiB
Python
"""Spawn ``codex exec`` per inbound message and capture the reply.
|
|
|
|
``codex exec`` is codex's non-interactive mode — runs one turn and
|
|
prints the assistant text to stdout. ``--resume <session-id>`` continues
|
|
an existing session if one is given, so chat threads survive across
|
|
multiple inbound messages instead of starting fresh every time.
|
|
|
|
The session id mapping (chat_id → codex session_id) is persisted by
|
|
``bridge.py`` — this module only handles spawning a single invocation.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import dataclasses
|
|
import logging
|
|
import os
|
|
import shutil
|
|
from typing import Optional
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Codex's first turn in a fresh session writes a session_id to stderr
|
|
# (``session: <uuid>``). The bridge captures it from CodexResult.session_id
|
|
# and stores it for the next inbound on the same chat_id.
|
|
_DEFAULT_TIMEOUT_SECS = 600.0 # 10 minutes per turn — same budget as platform A2A
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
|
|
class CodexResult:
|
|
"""Outcome of one ``codex exec`` invocation."""
|
|
|
|
text: str
|
|
"""Codex's assistant reply, captured from stdout."""
|
|
|
|
session_id: Optional[str]
|
|
"""The session_id codex used. Same as the input session_id when
|
|
resuming; a freshly-issued uuid when starting a new session. None on
|
|
invocation failure (no session was created)."""
|
|
|
|
exit_code: int
|
|
"""Process exit code. 0 on success."""
|
|
|
|
stderr_tail: str
|
|
"""Last ~2 KB of stderr — useful for surfacing codex errors back to
|
|
the caller without dumping the full log."""
|
|
|
|
|
|
class CodexRunner:
|
|
"""Wraps ``codex exec`` invocations with session continuity.
|
|
|
|
Stateless — the bridge owns the chat_id → session_id map and passes
|
|
the right session_id in each call.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
codex_bin: str = "codex",
|
|
timeout_secs: float = _DEFAULT_TIMEOUT_SECS,
|
|
) -> None:
|
|
# Resolve the codex binary once at construction so a missing
|
|
# install fails fast at daemon start, not on the first inbound.
|
|
resolved = shutil.which(codex_bin)
|
|
if resolved is None:
|
|
raise FileNotFoundError(
|
|
f"codex binary not found on PATH (looked for {codex_bin!r}). "
|
|
f"Install with `npm install -g @openai/codex` or pass "
|
|
f"--codex-bin /path/to/codex."
|
|
)
|
|
self._codex_bin = resolved
|
|
self._timeout_secs = timeout_secs
|
|
# The codex binary is a `#!/usr/bin/env node` shim, so spawning
|
|
# it requires `node` to be discoverable via the subprocess PATH.
|
|
# Under a shell launch this is fine (operator's interactive PATH
|
|
# has Node), but launchd and most systemd unit defaults strip
|
|
# PATH down to /usr/bin:/bin — `env node` then 127s out and the
|
|
# daemon silently fails on every turn.
|
|
#
|
|
# Fix: remember the directory containing the resolved codex
|
|
# binary. Node lives next to it under nvm / brew / pnpm
|
|
# global installs. Subprocesses get that dir prepended to their
|
|
# PATH (see _build_env). Operators don't have to thread PATH
|
|
# through their LaunchAgent / unit file — just $WORKSPACE_ID,
|
|
# $PLATFORM_URL, $MOLECULE_WORKSPACE_TOKEN per the install
|
|
# contract.
|
|
self._codex_bin_dir = os.path.dirname(resolved)
|
|
|
|
def _build_env(self) -> dict[str, str]:
|
|
env = os.environ.copy()
|
|
existing_path = env.get("PATH", "")
|
|
if self._codex_bin_dir not in existing_path.split(os.pathsep):
|
|
env["PATH"] = (
|
|
self._codex_bin_dir
|
|
+ (os.pathsep + existing_path if existing_path else "")
|
|
)
|
|
return env
|
|
|
|
async def run(
|
|
self,
|
|
message: str,
|
|
session_id: Optional[str] = None,
|
|
) -> CodexResult:
|
|
"""Run one ``codex exec`` turn with *message* and return the reply.
|
|
|
|
When *session_id* is given, ``--resume <session_id>`` continues
|
|
that session. When None, codex starts a fresh session and assigns
|
|
a new id (returned in CodexResult.session_id).
|
|
"""
|
|
if session_id:
|
|
args = [
|
|
self._codex_bin,
|
|
"exec",
|
|
"resume",
|
|
"--skip-git-repo-check",
|
|
session_id,
|
|
message,
|
|
]
|
|
else:
|
|
args = [self._codex_bin, "exec", "--skip-git-repo-check", message]
|
|
|
|
proc = await asyncio.create_subprocess_exec(
|
|
*args,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
env=self._build_env(),
|
|
)
|
|
|
|
try:
|
|
stdout_bytes, stderr_bytes = await asyncio.wait_for(
|
|
proc.communicate(), timeout=self._timeout_secs
|
|
)
|
|
except asyncio.TimeoutError:
|
|
proc.kill()
|
|
await proc.wait()
|
|
return CodexResult(
|
|
text=f"(codex exec timed out after {self._timeout_secs:.0f}s)",
|
|
session_id=session_id,
|
|
exit_code=-1,
|
|
stderr_tail="timeout",
|
|
)
|
|
|
|
text = stdout_bytes.decode(errors="replace").strip()
|
|
stderr = stderr_bytes.decode(errors="replace")
|
|
# Cap stderr to keep CodexResult shallow — full stderr is in
|
|
# codex's own log files for postmortems.
|
|
stderr_tail = stderr[-2048:] if len(stderr) > 2048 else stderr
|
|
|
|
# Best-effort session_id capture. Codex prints a banner like
|
|
# ``session: 0123abcd-...`` to stderr at session start. When
|
|
# we resumed, session_id is unchanged.
|
|
new_session_id = session_id or _extract_session_id(stderr)
|
|
|
|
return CodexResult(
|
|
text=text,
|
|
session_id=new_session_id,
|
|
exit_code=proc.returncode if proc.returncode is not None else -1,
|
|
stderr_tail=stderr_tail,
|
|
)
|
|
|
|
|
|
def _extract_session_id(stderr: str) -> Optional[str]:
|
|
"""Pull a session uuid out of codex's stderr banner.
|
|
|
|
Codex's exec banner format is fragile across versions, so we accept
|
|
any of the common shapes and return None if nothing matches. The
|
|
daemon falls back to "no session id" gracefully — that just means
|
|
the next turn starts a fresh session, costing context continuity
|
|
but not breaking the message flow.
|
|
"""
|
|
import re
|
|
|
|
# Two known shapes: "session: <uuid>" and "session_id=<uuid>".
|
|
# Both are stderr-only.
|
|
pattern = re.compile(
|
|
r"session(?:[_ ]id)?[=:\s]+([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})",
|
|
re.IGNORECASE,
|
|
)
|
|
m = pattern.search(stderr)
|
|
return m.group(1) if m else None
|