From d6eb78dcee974846e4fcb6d578581897574ad896 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Mon, 4 May 2026 18:09:09 -0700 Subject: [PATCH] feat: initial bridge daemon MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit codex-channel-molecule is the codex-side counterpart to hermes-channel-molecule. It long-polls the molecule platform inbox via molecule_runtime.a2a_tools.tool_wait_for_message, runs `codex exec --resume ` per inbound message, captures the assistant reply from stdout, and routes it back through send_message_to_user (canvas chat) or delegate_task (peer agent), then acks the inbox row. Per chat thread (one canvas-user thread or one peer-workspace thread) gets its own codex session_id, persisted to disk so daemon restarts keep conversation context. Reply-routing failures skip the inbox_pop ack so the platform's at-least-once delivery re-surfaces the row on the next poll. This daemon is the operator-unblock until openai/codex#17543 lands — once codex itself accepts MCP custom notifications as Op::UserInput through the wired-in MCP server, this daemon becomes redundant. The README's deprecation-path section calls that out so future operators know when to switch off. Tests cover the dispatch loop with fake tools (8 tests asserting exact contracts: canvas vs peer routing, session continuity, persistence across restarts, timeout sentinel handling, at-least-once on reply failure, exit-code surfacing, A2A multipart text). The codex_runner tests are real-subprocess (fake codex script spawned via asyncio.create_subprocess_exec) so the boot path matches production — no in-process mocking of the spawn boundary. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .github/workflows/ci.yml | 28 +++ .gitignore | 6 + README.md | 75 ++++++- codex_channel_molecule/__init__.py | 29 +++ codex_channel_molecule/bridge.py | 282 +++++++++++++++++++++++ codex_channel_molecule/codex_runner.py | 147 ++++++++++++ codex_channel_molecule/daemon.py | 118 ++++++++++ pyproject.toml | 36 +++ tests/__init__.py | 0 tests/test_bridge.py | 299 +++++++++++++++++++++++++ tests/test_codex_runner.py | 138 ++++++++++++ 11 files changed, 1157 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/ci.yml create mode 100644 .gitignore create mode 100644 codex_channel_molecule/__init__.py create mode 100644 codex_channel_molecule/bridge.py create mode 100644 codex_channel_molecule/codex_runner.py create mode 100644 codex_channel_molecule/daemon.py create mode 100644 pyproject.toml create mode 100644 tests/__init__.py create mode 100644 tests/test_bridge.py create mode 100644 tests/test_codex_runner.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..7ab8b9c --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,28 @@ +name: CI + +on: + push: + branches: [main] + pull_request: + +permissions: + contents: read + +jobs: + test: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.11", "3.12"] + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + - name: Install + run: | + python -m pip install --upgrade pip + pip install -e ".[test]" + - name: Run tests + run: pytest -q diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8e7a4f6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +__pycache__/ +*.py[cod] +*$py.class +*.egg-info/ +.pytest_cache/ +.venv/ diff --git a/README.md b/README.md index e7502d7..9b9aef4 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,75 @@ # codex-channel-molecule 
-Bridge daemon — long-polls the molecule platform inbox, runs codex exec --resume per inbound message, replies via send_message_to_user MCP tool. Counterpart to hermes-channel-molecule for codex CLI. + +Bridge daemon — gives [codex CLI](https://github.com/openai/codex) push parity with the [Molecule AI](https://moleculesai.com) platform's other external runtimes. + +The Molecule platform's [`hermes-channel-molecule`](https://github.com/Molecule-AI/hermes-channel-molecule) plugin gives `hermes-agent` true push delivery — peer agents and canvas-user messages land mid-session as conversation turns. Codex CLI has no plugin API today and its MCP runtime drops inbound notifications, so this daemon is the equivalent push surface — built outside the codex process. + +## How it works + +``` +canvas user / peer agent ──► molecule platform inbox + │ + wait_for_message (long-poll) + │ + ▼ + codex-channel-molecule daemon + │ + codex exec --resume "" + │ + capture stdout + │ + send_message_to_user / delegate_task + │ + inbox_pop(activity_id) + │ + ▼ + canvas chat / peer workspace +``` + +Per chat thread (one canvas-user thread or one peer-workspace thread) gets its own codex session_id, persisted to `~/.codex-channel-molecule/sessions.json` so daemon restarts don't lose conversation context. + +## When to use this vs. the codex tab in the External Connect modal + +The codex tab wires the molecule MCP server into `~/.codex/config.toml` so codex can call platform tools (`list_peers`, `delegate_task`, `send_message_to_user`, `commit_memory`, etc.). That's outbound — codex calls out to the platform. + +This daemon is the inbound counterpart — the platform pushes to codex. Run both for full bidirectional integration. 
+ +## Install + +```sh +npm install -g @openai/codex@^0.57 +pip install codex-channel-molecule +``` + +## Configure + run + +The same env-var contract as `hermes-channel-molecule`'s outbound MCP path (`WORKSPACE_ID`, `PLATFORM_URL`, `MOLECULE_WORKSPACE_TOKEN`): + +```sh +export WORKSPACE_ID= +export PLATFORM_URL=https://.moleculesai.app +export MOLECULE_WORKSPACE_TOKEN= + +codex-channel-molecule +``` + +The daemon runs in the foreground; logs go to stderr. For systemd hosts, register a unit; for one-off use, `nohup ... &` plus a log file works. + +## Deprecation path + +When [`openai/codex#17543`](https://github.com/openai/codex/issues/17543) lands upstream — a generic path for handling MCP custom notifications in codex and forwarding them into the active session as user submissions — this daemon becomes redundant. Codex itself will accept inbound molecule messages as `Op::UserInput` directly through the MCP server already wired in `~/.codex/config.toml`. Until then, this is the operator-facing answer. + +## Development + +```sh +git clone https://github.com/Molecule-AI/codex-channel-molecule +cd codex-channel-molecule +pip install -e ".[test]" +pytest -q +``` + +Tests are entirely real-subprocess (no mocking the spawn boundary) so the boot path is covered the same way the daemon runs in production. + +## License + +Apache-2.0 diff --git a/codex_channel_molecule/__init__.py b/codex_channel_molecule/__init__.py new file mode 100644 index 0000000..5422bbc --- /dev/null +++ b/codex_channel_molecule/__init__.py @@ -0,0 +1,29 @@ +"""codex-channel-molecule — bridge daemon for codex CLI ↔ Molecule platform. + +The platform's ``hermes-channel-molecule`` plugin gives hermes-agent push +parity (peer messages and canvas chat arrive mid-session as conversation +turns) by hooking the hermes platform-plugin API. Codex CLI has no +plugin API today and its MCP runtime drops inbound notifications, so +this daemon is the equivalent push surface — built outside the codex +process. 
+ +Loop: + 1. Long-poll ``molecule_runtime.a2a_tools.tool_wait_for_message`` + 2. On a real message, run ``codex exec --resume ""`` + 3. Capture codex's stdout, route back via ``tool_send_message_to_user`` + (canvas) or ``tool_delegate_task`` (peer) + 4. Ack with ``tool_inbox_pop(activity_id)`` + +Session continuity is per chat_id: the daemon keeps a small JSON map of +``chat_id → codex session_id`` so each peer or canvas-user thread +resumes its own codex session across messages. + +When `openai/codex#17543` lands upstream, this daemon becomes +redundant — codex itself will accept MCP notifications as +``Op::UserInput``. Until then, this is the operator-facing answer. +""" + +from .bridge import run_bridge +from .codex_runner import CodexRunner, CodexResult + +__all__ = ["run_bridge", "CodexRunner", "CodexResult"] diff --git a/codex_channel_molecule/bridge.py b/codex_channel_molecule/bridge.py new file mode 100644 index 0000000..6e70a51 --- /dev/null +++ b/codex_channel_molecule/bridge.py @@ -0,0 +1,282 @@ +"""Inbox poll → codex turn → reply route → ack loop. + +This is the only loop in the daemon. It calls into the molecule_runtime +tool implementations directly (no MCP-server subprocess in the middle — +this daemon IS the codex-side caller). + +Session continuity: each chat thread (one peer workspace, or one +canvas-user thread) gets its own codex session_id. The mapping is +persisted to ``$CODEX_CHANNEL_MOLECULE_STATE_DIR/sessions.json`` (default +``~/.codex-channel-molecule/sessions.json``) so daemon restarts don't +lose conversation context. +""" + +from __future__ import annotations + +import asyncio +import json +import logging +import os +from pathlib import Path +from typing import Any, Dict, Optional, Protocol + +from .codex_runner import CodexRunner, CodexResult + +logger = logging.getLogger(__name__) + +# Long-poll budget. 
# Long-poll budget per wait_for_message call. Kept below the platform's
# server-side 300s cap so the HTTP read never outlives the server's own
# timeout; 60s matches the MCP tool's advertised default.
_LONG_POLL_SECS = 60.0


class _Tools(Protocol):
    """Structural interface over the four molecule_runtime tools the
    bridge touches.

    A Protocol (not an ABC) so the test suite can hand in any object
    with matching async methods — no inheritance, no real platform
    endpoints required.
    """

    async def wait_for_message(self, timeout_secs: float) -> str: ...

    async def inbox_pop(self, activity_id: str) -> str: ...

    async def send_message_to_user(
        self, message: str, workspace_id: Optional[str]
    ) -> str: ...

    async def delegate_task(
        self, workspace_id: str, task: str, source_workspace_id: Optional[str]
    ) -> str: ...


class _RealTools:
    """Production ``_Tools`` backed by the installed
    molecule-ai-workspace-runtime wheel.

    Each method imports its tool at call time rather than module load
    time, so importing this module (e.g. under test) never requires the
    wheel to be present.
    """

    async def wait_for_message(self, timeout_secs: float) -> str:
        from molecule_runtime.a2a_tools import tool_wait_for_message

        result = await tool_wait_for_message(timeout_secs=timeout_secs)
        return result

    async def inbox_pop(self, activity_id: str) -> str:
        from molecule_runtime.a2a_tools import tool_inbox_pop

        result = await tool_inbox_pop(activity_id=activity_id)
        return result

    async def send_message_to_user(
        self, message: str, workspace_id: Optional[str]
    ) -> str:
        from molecule_runtime.a2a_tools import tool_send_message_to_user

        result = await tool_send_message_to_user(
            message=message, workspace_id=workspace_id
        )
        return result

    async def delegate_task(
        self, workspace_id: str, task: str, source_workspace_id: Optional[str]
    ) -> str:
        from molecule_runtime.a2a_tools import tool_delegate_task

        result = await tool_delegate_task(
            workspace_id=workspace_id,
            task=task,
            source_workspace_id=source_workspace_id,
        )
        return result
class _SessionStore:
    """Maps chat_id → codex session_id, persisted to disk.

    The map is flushed on every update with an atomic tmp-then-rename
    write so a crash mid-write can never leave a half-written (and
    therefore unparseable) sessions.json behind.
    """

    def __init__(self, path: Path) -> None:
        """Load the persisted map from *path*, or start empty.

        Any unreadable state — missing file, invalid JSON, or JSON that
        is valid but not an object (e.g. a bare list) — degrades to an
        empty map with a warning. The shape check is the fix here: the
        previous code stored whatever json.loads returned, so a JSON
        list on disk would crash later on ``self._sessions.get(...)``.
        """
        self._path = path
        self._sessions: Dict[str, str] = {}
        if not path.exists():
            return
        try:
            loaded = json.loads(path.read_text())
        except (json.JSONDecodeError, OSError) as exc:
            logging.getLogger(__name__).warning(
                "session store at %s is unreadable (%s) — starting empty",
                path, exc,
            )
            return
        if isinstance(loaded, dict):
            self._sessions = loaded
        else:
            # Valid JSON but wrong shape — treat exactly like corruption
            # rather than poisoning self._sessions with a non-dict.
            logging.getLogger(__name__).warning(
                "session store at %s is not a JSON object — starting empty",
                path,
            )

    def get(self, chat_id: str) -> Optional[str]:
        """Return the codex session_id for *chat_id*, or None if unseen."""
        return self._sessions.get(chat_id)

    def set(self, chat_id: str, session_id: str) -> None:
        """Record *session_id* for *chat_id* and flush to disk.

        No-op when the stored value is already *session_id* — avoids a
        disk write per message on long-running threads.
        """
        if self._sessions.get(chat_id) == session_id:
            return
        self._sessions[chat_id] = session_id
        self._flush()

    def _flush(self) -> None:
        # Atomic write: dump to a sibling .tmp file, then rename over the
        # real path. rename() is atomic on POSIX, so readers observe
        # either the old map or the new one, never a torn write.
        self._path.parent.mkdir(parents=True, exist_ok=True)
        tmp = self._path.with_suffix(self._path.suffix + ".tmp")
        tmp.write_text(json.dumps(self._sessions, indent=2, sort_keys=True))
        tmp.replace(self._path)


def _state_dir() -> Path:
    """Resolve the daemon's state directory.

    ``CODEX_CHANNEL_MOLECULE_STATE_DIR`` wins when set and non-blank;
    otherwise ``~/.codex-channel-molecule``.
    """
    explicit = os.environ.get("CODEX_CHANNEL_MOLECULE_STATE_DIR", "").strip()
    if explicit:
        return Path(explicit)
    return Path.home() / ".codex-channel-molecule"


def _chat_id_for(message: Dict[str, Any]) -> str:
    """Stable per-thread key for session continuity.

    canvas_user → ``canvas:<workspace>`` (one session per workspace's
    canvas chat — same encoding hermes-channel uses).
    peer_agent → ``peer:<peer_id>`` (one session per peer).

    Malformed messages fall back to ``unknown:<activity_id>`` so each
    gets its own one-off session rather than poisoning shared state.
    """
    kind = message.get("kind", "")
    if kind == "canvas_user":
        ws = message.get("arrival_workspace_id") or message.get("workspace_id") or ""
        return f"canvas:{ws}"
    if kind == "peer_agent":
        peer = message.get("peer_id", "")
        return f"peer:{peer}"
    return f"unknown:{message.get('activity_id', '')}"
def _extract_inbox_message(payload: str) -> Optional[Dict[str, Any]]:
    """Decode the wait_for_message JSON-string return value.

    Returns None for the timeout sentinel ``{"timeout": true}`` and for
    any payload that doesn't look like an inbox row (non-JSON, non-dict,
    or missing ``activity_id``).
    """
    try:
        decoded = json.loads(payload)
    except (json.JSONDecodeError, TypeError):
        return None
    if not isinstance(decoded, dict):
        return None
    if decoded.get("timeout") or "activity_id" not in decoded:
        return None
    return decoded


async def _route_reply(
    tools: _Tools,
    message: Dict[str, Any],
    reply_text: str,
) -> None:
    """Deliver codex's output to the message's origin.

    canvas_user → send_message_to_user on the arrival workspace;
    peer_agent → delegate_task back to the peer. Unknown kinds (and
    peer messages without a peer_id) are logged and dropped.
    """
    log = logging.getLogger(__name__)
    kind = message.get("kind", "")
    if kind == "canvas_user":
        workspace = message.get("arrival_workspace_id") or message.get("workspace_id")
        await tools.send_message_to_user(message=reply_text, workspace_id=workspace)
        return
    if kind == "peer_agent":
        peer = message.get("peer_id", "")
        if not peer:
            log.warning("peer_agent message lacks peer_id, dropping reply")
            return
        await tools.delegate_task(
            workspace_id=peer,
            task=reply_text,
            source_workspace_id=message.get("arrival_workspace_id"),
        )
        return
    log.warning("unknown message kind %r, dropping reply", kind)
+ """ + tools = tools or _RealTools() + session_store = session_store or _SessionStore(_state_dir() / "sessions.json") + + cycle = 0 + while iterations is None or cycle < iterations: + cycle += 1 + try: + payload = await tools.wait_for_message(timeout_secs=_LONG_POLL_SECS) + except Exception as exc: + logger.warning("wait_for_message raised: %s", exc) + await asyncio.sleep(1.0) + continue + + message = _extract_inbox_message(payload) + if message is None: + continue + + await _handle_one(message, runner=runner, tools=tools, store=session_store) + + +async def _handle_one( + message: Dict[str, Any], + *, + runner: CodexRunner, + tools: _Tools, + store: _SessionStore, +) -> None: + """Process a single inbound message, then ack.""" + activity_id = message.get("activity_id", "") + chat_id = _chat_id_for(message) + session_id = store.get(chat_id) + body = _message_body(message) + + logger.info( + "inbound %s (chat=%s session=%s) — %d chars", + activity_id, chat_id, session_id or "new", len(body), + ) + + result: CodexResult = await runner.run(message=body, session_id=session_id) + if result.session_id and result.session_id != session_id: + store.set(chat_id, result.session_id) + + reply = result.text or "(codex returned empty output)" + if result.exit_code != 0: + reply = f"{reply}\n\n(codex exec failed: exit={result.exit_code})" + + try: + await _route_reply(tools, message, reply) + except Exception as exc: + # Don't ack if we couldn't deliver — let the next poll cycle + # retry. Inbox is at-least-once delivery; the platform will + # re-surface the activity. + logger.error("reply routing failed for %s: %s", activity_id, exc) + return + + try: + await tools.inbox_pop(activity_id=activity_id) + except Exception as exc: + logger.warning("inbox_pop(%s) failed: %s", activity_id, exc) + + +def _message_body(message: Dict[str, Any]) -> str: + """Extract the inbound text. 
def _message_body(message: Dict[str, Any]) -> str:
    """Extract the inbound text from an inbox row.

    Mirrors the MCP wait_for_message response shape: either a top-level
    ``text`` string, or an A2A multipart ``parts`` list whose dict
    entries carry ``text`` fields. Multipart text is joined with
    newlines; anything else yields "".
    """
    flat = message.get("text")
    if isinstance(flat, str) and flat:
        return flat

    parts = message.get("parts")
    if isinstance(parts, list):
        texts = [
            part["text"]
            for part in parts
            if isinstance(part, dict) and isinstance(part.get("text"), str)
        ]
        if texts:
            return "\n".join(texts)
    return ""
# Per-turn ceiling on codex runtime — same 10-minute budget as the
# platform's A2A turns.
_DEFAULT_TIMEOUT_SECS = 600.0


@dataclasses.dataclass(frozen=True)
class CodexResult:
    """Outcome of one ``codex exec`` invocation."""

    # Assistant reply, captured from stdout (stripped).
    text: str
    # Session id codex used: unchanged when resuming, parsed from the
    # stderr banner on a fresh session, None on invocation failure.
    session_id: Optional[str]
    # Process exit code; 0 on success, -1 on timeout/unknown.
    exit_code: int
    # Last ~2 KB of stderr for surfacing errors without the full log.
    stderr_tail: str


class CodexRunner:
    """Wraps ``codex exec`` invocations with session continuity.

    Stateless with respect to conversation threads — the bridge owns the
    chat_id → session_id map and supplies the right session_id per call.
    """

    def __init__(
        self,
        codex_bin: str = "codex",
        timeout_secs: float = _DEFAULT_TIMEOUT_SECS,
    ) -> None:
        """Resolve *codex_bin* on PATH up front.

        Raises FileNotFoundError immediately when codex is not
        installed, so the daemon fails at startup rather than on the
        first inbound message.
        """
        found = shutil.which(codex_bin)
        if found is None:
            raise FileNotFoundError(
                f"codex binary not found on PATH (looked for {codex_bin!r}). "
                f"Install with `npm install -g @openai/codex` or pass "
                f"--codex-bin /path/to/codex."
            )
        self._codex_bin = found
        self._timeout_secs = timeout_secs

    async def run(
        self,
        message: str,
        session_id: Optional[str] = None,
    ) -> CodexResult:
        """Run one ``codex exec`` turn with *message* and return the reply.

        With *session_id*, ``--resume <id>`` continues that session;
        without, codex starts fresh and the new id is scraped from its
        stderr banner into ``CodexResult.session_id``.
        """
        argv: list[str] = [self._codex_bin, "exec", "--skip-git-repo-check"]
        if session_id:
            argv += ["--resume", session_id]
        # NOTE(review): message is appended as a bare positional — a
        # prompt starting with "-" could be parsed as a flag; confirm
        # whether codex exec accepts a "--" separator.
        argv.append(message)

        proc = await asyncio.create_subprocess_exec(
            *argv,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=os.environ.copy(),
        )

        try:
            out_bytes, err_bytes = await asyncio.wait_for(
                proc.communicate(), timeout=self._timeout_secs
            )
        except asyncio.TimeoutError:
            # Kill the runaway turn and return a synthetic failure
            # result instead of raising into the bridge loop.
            proc.kill()
            await proc.wait()
            return CodexResult(
                text=f"(codex exec timed out after {self._timeout_secs:.0f}s)",
                session_id=session_id,
                exit_code=-1,
                stderr_tail="timeout",
            )

        reply = out_bytes.decode(errors="replace").strip()
        err_text = err_bytes.decode(errors="replace")
        # Keep only the tail — full stderr lives in codex's own logs.
        tail = err_text[-2048:] if len(err_text) > 2048 else err_text

        # When resuming, the session id is already known; otherwise try
        # to scrape the freshly issued one from the stderr banner.
        resolved_session = session_id or _extract_session_id(err_text)

        rc = proc.returncode
        return CodexResult(
            text=reply,
            session_id=resolved_session,
            exit_code=rc if rc is not None else -1,
            stderr_tail=tail,
        )


def _extract_session_id(stderr: str) -> Optional[str]:
    """Scrape a session uuid from codex's stderr banner, if present.

    Accepts the two known shapes ("session: <uuid>" and
    "session_id=<uuid>") case-insensitively; returns None otherwise,
    which just means the next turn starts a fresh session — context
    continuity is lost but the message flow keeps working.
    """
    import re

    banner = re.compile(
        r"session(?:[_ ]id)?[=:\s]+([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})",
        re.IGNORECASE,
    )
    match = banner.search(stderr)
    return match.group(1) if match else None
_REQUIRED_ENV = ("WORKSPACE_ID", "PLATFORM_URL", "MOLECULE_WORKSPACE_TOKEN")


def _check_env() -> list[str]:
    """Return the required env-var names that are unset or blank."""
    return [name for name in _REQUIRED_ENV if not os.environ.get(name, "").strip()]


def _parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse CLI flags from *argv* (program name excluded)."""
    parser = argparse.ArgumentParser(
        prog="codex-channel-molecule",
        description=(
            "Bridge daemon — long-polls the molecule platform inbox and "
            "runs `codex exec --resume ` per inbound message. "
            "Reply routes back via send_message_to_user (canvas) or "
            "delegate_task (peer). Per-thread codex session continuity "
            "is persisted to disk."
        ),
    )
    parser.add_argument(
        "--codex-bin",
        default="codex",
        help="codex CLI binary name or absolute path (default: codex).",
    )
    parser.add_argument(
        "--turn-timeout-secs",
        type=float,
        default=600.0,
        help="Per-turn ceiling on `codex exec` runtime (default: 600).",
    )
    parser.add_argument(
        "--log-level",
        default="INFO",
        choices=("DEBUG", "INFO", "WARNING", "ERROR"),
        help="Logging level (default: INFO).",
    )
    return parser.parse_args(argv)


async def _async_main(args: argparse.Namespace) -> int:
    """Wire up signals and run the bridge until it exits or a signal lands.

    Returns 0 on clean shutdown, 1 when the bridge loop dies with an
    unexpected exception. (Fix: the previous version never retrieved a
    completed bridge task's exception, so a crashed bridge returned 0
    and left an "exception was never retrieved" warning behind.)
    """
    runner = CodexRunner(
        codex_bin=args.codex_bin,
        timeout_secs=args.turn_timeout_secs,
    )

    loop = asyncio.get_running_loop()
    stop_event = asyncio.Event()

    def _request_stop() -> None:
        stop_event.set()

    # Unix-only: translate SIGINT/SIGTERM into a graceful stop.
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, _request_stop)

    bridge_task = asyncio.create_task(run_bridge(runner=runner))
    stop_task = asyncio.create_task(stop_event.wait())

    done, pending = await asyncio.wait(
        {bridge_task, stop_task},
        return_when=asyncio.FIRST_COMPLETED,
    )
    for task in pending:
        task.cancel()
    for task in pending:
        try:
            await task
        except asyncio.CancelledError:
            pass  # expected: we just cancelled it
        except Exception:
            # Fix: don't silently swallow real shutdown errors.
            logging.getLogger(__name__).exception("task failed during shutdown")

    # Surface a bridge crash as a nonzero exit instead of masking it.
    if bridge_task in done and not bridge_task.cancelled():
        exc = bridge_task.exception()
        if exc is not None:
            logging.getLogger(__name__).error("bridge loop died: %s", exc)
            return 1

    return 0


def main(argv: list[str] | None = None) -> int:
    """Console-script entry point; returns the process exit code.

    2  — env-var contract not satisfied
    130 — interrupted (Ctrl-C before/around the event loop)
    """
    args = _parse_args(argv if argv is not None else sys.argv[1:])

    logging.basicConfig(
        level=args.log_level,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )

    missing = _check_env()
    if missing:
        sys.stderr.write(
            "codex-channel-molecule: required env vars not set: "
            + ", ".join(missing)
            + "\n WORKSPACE_ID — UUID of the workspace this daemon represents\n"
            + " PLATFORM_URL — https://.moleculesai.app\n"
            + " MOLECULE_WORKSPACE_TOKEN — bearer token from the platform's\n"
            + " External Connect modal for that workspace\n"
        )
        return 2

    try:
        return asyncio.run(_async_main(args))
    except KeyboardInterrupt:
        return 130


if __name__ == "__main__":
    raise SystemExit(main())
a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..8dc3c50 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,36 @@ +[build-system] +requires = ["setuptools>=68", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "codex-channel-molecule" +version = "0.1.0" +description = "Bridge daemon for codex CLI ↔ Molecule platform — long-polls the platform inbox, runs `codex exec --resume ` per inbound message, replies via send_message_to_user MCP tool. Counterpart to hermes-channel-molecule." +readme = "README.md" +requires-python = ">=3.11" +license = { text = "Apache-2.0" } +authors = [{ name = "Molecule AI" }] +dependencies = [ + "molecule-ai-workspace-runtime>=0.1.110", +] + +[project.optional-dependencies] +test = [ + "pytest>=8", + "pytest-asyncio>=0.23", +] + +[project.scripts] +codex-channel-molecule = "codex_channel_molecule.daemon:main" + +[project.urls] +Homepage = "https://github.com/Molecule-AI/codex-channel-molecule" +Repository = "https://github.com/Molecule-AI/codex-channel-molecule" + +[tool.setuptools.packages.find] +include = ["codex_channel_molecule*"] +exclude = ["tests*"] + +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["tests"] diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_bridge.py b/tests/test_bridge.py new file mode 100644 index 0000000..9b76768 --- /dev/null +++ b/tests/test_bridge.py @@ -0,0 +1,299 @@ +"""Bridge-loop tests with fake tools + fake codex runner. + +The fakes capture every call so each test asserts exact contracts: +which kind of message reaches which reply tool, what activity_ids are +acked, how session_id continuity is maintained across turns. 
+""" + +from __future__ import annotations + +import asyncio +import json +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +import pytest + +from codex_channel_molecule.bridge import _SessionStore, run_bridge +from codex_channel_molecule.codex_runner import CodexResult + + +# ---------------------------------------------------------------------- +# Fakes +# ---------------------------------------------------------------------- + + +class FakeTools: + """Records every tool call. wait_for_message replays from a queue + seeded by the test.""" + + def __init__(self, inbox: List[Dict[str, Any]]) -> None: + self._inbox: asyncio.Queue[Dict[str, Any]] = asyncio.Queue() + for m in inbox: + self._inbox.put_nowait(m) + self.popped: List[str] = [] + self.canvas_replies: List[Tuple[str, Optional[str]]] = [] # (text, ws) + self.peer_replies: List[Tuple[str, str, Optional[str]]] = [] + # (workspace_id, task, source_workspace_id) + + async def wait_for_message(self, timeout_secs: float) -> str: + # Drain the queue immediately; a real implementation would block + # for timeout_secs. After exhaustion, return the timeout sentinel + # so the bridge keeps cycling without hanging. + try: + msg = self._inbox.get_nowait() + return json.dumps(msg) + except asyncio.QueueEmpty: + return json.dumps({"timeout": True, "timeout_secs": timeout_secs}) + + async def inbox_pop(self, activity_id: str) -> str: + self.popped.append(activity_id) + return json.dumps({"removed": True, "activity_id": activity_id}) + + async def send_message_to_user( + self, message: str, workspace_id: Optional[str] + ) -> str: + self.canvas_replies.append((message, workspace_id)) + return "ok" + + async def delegate_task( + self, workspace_id: str, task: str, source_workspace_id: Optional[str] + ) -> str: + self.peer_replies.append((workspace_id, task, source_workspace_id)) + return "ok" + + +class FakeRunner: + """Returns scripted CodexResults; records every call. 
class FakeRunner:
    """Scripted stand-in for CodexRunner.

    Pops pre-seeded CodexResults in order and records every
    (message, session_id) pair, so tests can pin session-continuity
    behavior without spawning a real codex process.
    """

    def __init__(self, results: List[CodexResult]) -> None:
        self._results = list(results)
        self.calls: List[Tuple[str, Optional[str]]] = []  # (message, session_id)

    async def run(
        self, message: str, session_id: Optional[str] = None
    ) -> CodexResult:
        self.calls.append((message, session_id))
        if self._results:
            return self._results.pop(0)
        return CodexResult(
            text="(no scripted result)",
            session_id=session_id,
            exit_code=0,
            stderr_tail="",
        )


# ----------------------------------------------------------------------
# Tests
# ----------------------------------------------------------------------


@pytest.mark.asyncio
async def test_canvas_user_message_is_dispatched_acked_and_replied(tmp_path):
    """One canvas-user inbound, end to end: codex turn →
    send_message_to_user → inbox_pop, with the session id persisted."""
    tools = FakeTools([{
        "kind": "canvas_user",
        "activity_id": "act-1",
        "arrival_workspace_id": "ws-canvas",
        "text": "hi can you help",
    }])
    runner = FakeRunner([CodexResult(
        text="sure, what's up",
        session_id="sess-canvas-1",
        exit_code=0,
        stderr_tail="",
    )])
    store = _SessionStore(tmp_path / "sessions.json")

    await run_bridge(runner=runner, tools=tools, session_store=store, iterations=2)

    assert runner.calls == [("hi can you help", None)]
    assert tools.canvas_replies == [("sure, what's up", "ws-canvas")]
    assert tools.peer_replies == []
    assert tools.popped == ["act-1"]
    assert store.get("canvas:ws-canvas") == "sess-canvas-1"


@pytest.mark.asyncio
async def test_peer_agent_message_routes_to_delegate_task(tmp_path):
    """Peer-agent inbound routes through delegate_task, not canvas."""
    tools = FakeTools([{
        "kind": "peer_agent",
        "activity_id": "act-2",
        "peer_id": "ws-peer",
        "arrival_workspace_id": "ws-self",
        "text": "what's your status",
    }])
    runner = FakeRunner([CodexResult(
        text="all good",
        session_id="sess-peer-1",
        exit_code=0,
        stderr_tail="",
    )])
    store = _SessionStore(tmp_path / "sessions.json")

    await run_bridge(runner=runner, tools=tools, session_store=store, iterations=2)

    assert tools.peer_replies == [("ws-peer", "all good", "ws-self")]
    assert tools.canvas_replies == []
    assert tools.popped == ["act-2"]
    assert store.get("peer:ws-peer") == "sess-peer-1"


@pytest.mark.asyncio
async def test_session_continuity_resumes_same_codex_session(tmp_path):
    """Two messages on one chat_id: the second codex call must resume
    the session_id captured from the first."""
    tools = FakeTools([
        {"kind": "canvas_user", "activity_id": "act-a",
         "arrival_workspace_id": "ws-x", "text": "first"},
        {"kind": "canvas_user", "activity_id": "act-b",
         "arrival_workspace_id": "ws-x", "text": "second"},
    ])
    runner = FakeRunner([
        CodexResult(text="r1", session_id="sess-shared", exit_code=0, stderr_tail=""),
        CodexResult(text="r2", session_id="sess-shared", exit_code=0, stderr_tail=""),
    ])
    store = _SessionStore(tmp_path / "sessions.json")

    await run_bridge(runner=runner, tools=tools, session_store=store, iterations=3)

    # First call starts fresh (None); second resumes sess-shared.
    assert runner.calls == [("first", None), ("second", "sess-shared")]
    assert tools.popped == ["act-a", "act-b"]
@pytest.mark.asyncio
async def test_session_store_persists_across_runs(tmp_path):
    """The chat_id → session_id map survives a daemon restart: it is
    flushed to disk on each update and re-read by the next
    _SessionStore instance."""
    state_file = tmp_path / "sessions.json"

    first_tools = FakeTools([{
        "kind": "canvas_user", "activity_id": "act-1",
        "arrival_workspace_id": "ws-resume", "text": "first ever",
    }])
    first_runner = FakeRunner([CodexResult(
        text="hi", session_id="sess-persist", exit_code=0, stderr_tail="",
    )])
    await run_bridge(
        runner=first_runner, tools=first_tools,
        session_store=_SessionStore(state_file), iterations=2,
    )
    assert state_file.exists()

    # "Restart" the daemon: a brand-new store backed by the same file.
    second_tools = FakeTools([{
        "kind": "canvas_user", "activity_id": "act-2",
        "arrival_workspace_id": "ws-resume", "text": "follow up",
    }])
    second_runner = FakeRunner([CodexResult(
        text="ok", session_id="sess-persist", exit_code=0, stderr_tail="",
    )])
    await run_bridge(
        runner=second_runner, tools=second_tools,
        session_store=_SessionStore(state_file), iterations=2,
    )

    # The fresh instance must have resumed the persisted session id.
    assert second_runner.calls == [("follow up", "sess-persist")]


@pytest.mark.asyncio
async def test_timeout_sentinel_does_not_call_codex(tmp_path):
    """An empty inbox yields only timeout sentinels — codex must never
    be spawned, and nothing is replied or acked."""
    tools = FakeTools(inbox=[])  # empty queue → timeout sentinel
    runner = FakeRunner(results=[])
    store = _SessionStore(tmp_path / "sessions.json")

    await run_bridge(runner=runner, tools=tools, session_store=store, iterations=3)

    assert runner.calls == []
    assert tools.popped == []
    assert tools.canvas_replies == []
    assert tools.peer_replies == []


@pytest.mark.asyncio
async def test_failed_reply_routing_skips_inbox_pop(tmp_path):
    """At-least-once semantics: when the reply can't be delivered, the
    inbox row must stay unacked so the platform re-surfaces it on the
    next poll."""

    class FlakyTools(FakeTools):
        async def send_message_to_user(self, message, workspace_id):
            raise RuntimeError("simulated 502 from platform")

    tools = FlakyTools([{
        "kind": "canvas_user", "activity_id": "act-err",
        "arrival_workspace_id": "ws-x", "text": "msg",
    }])
    runner = FakeRunner([CodexResult(
        text="reply", session_id="sess", exit_code=0, stderr_tail="",
    )])
    store = _SessionStore(tmp_path / "sessions.json")

    await run_bridge(runner=runner, tools=tools, session_store=store, iterations=2)

    assert runner.calls == [("msg", None)]
    # Unacked → re-delivered on a later poll.
    assert tools.popped == []


@pytest.mark.asyncio
async def test_nonzero_exit_code_surfaces_in_reply(tmp_path):
    """A codex failure becomes a visible reply rather than a silent
    drop — the operator sees the error where the answer would be."""
    tools = FakeTools([{
        "kind": "canvas_user", "activity_id": "act-fail",
        "arrival_workspace_id": "ws-x", "text": "ping",
    }])
    runner = FakeRunner([CodexResult(
        text="(codex exec timed out after 600s)",
        session_id=None,
        exit_code=-1,
        stderr_tail="timeout",
    )])
    store = _SessionStore(tmp_path / "sessions.json")

    await run_bridge(runner=runner, tools=tools, session_store=store, iterations=2)

    assert len(tools.canvas_replies) == 1
    reply_text, _ = tools.canvas_replies[0]
    assert "timed out" in reply_text
    assert "exit=-1" in reply_text
    # Acked: the verdict (even a failure) was delivered, so the row is
    # fully handled.
    assert tools.popped == ["act-fail"]


@pytest.mark.asyncio
async def test_a2a_multipart_text_is_concatenated(tmp_path):
    """``parts``-shaped A2A messages are joined into one codex prompt
    instead of requiring a flat ``text`` field."""
    tools = FakeTools([{
        "kind": "peer_agent", "activity_id": "act-p", "peer_id": "ws-peer",
        "parts": [
            {"type": "text", "text": "first chunk"},
            {"type": "text", "text": "second chunk"},
        ],
    }])
    runner = FakeRunner([CodexResult(
        text="ack", session_id="s", exit_code=0, stderr_tail="",
    )])
    store = _SessionStore(tmp_path / "sessions.json")

    await run_bridge(runner=runner, tools=tools, session_store=store, iterations=2)

    prompt, _ = runner.calls[0]
    assert prompt == "first chunk\nsecond chunk"
The fake script is a +real Python program spawned via asyncio.create_subprocess_exec, exactly +as a real codex install would be. +""" + +from __future__ import annotations + +import os +import textwrap +from pathlib import Path + +import pytest + +from codex_channel_molecule.codex_runner import ( + CodexRunner, + _extract_session_id, +) + + +_FAKE_CODEX_SCRIPT = textwrap.dedent("""\ + #!/usr/bin/env python3 + \"\"\"Fake codex CLI for tests. + + Behaviors keyed on argv shape and env: + argv: codex exec [--skip-git-repo-check] [--resume ] + + Echoes a banner to stderr (\"session: \") and the input message + to stdout. Honors FAKE_EXIT_CODE for failure-path tests. + \"\"\" + import os, sys + + args = sys.argv[1:] + assert args[0] == \"exec\", f\"unexpected first arg: {args[0]!r}\" + + resume_id = None + i = 1 + while i < len(args): + if args[i] == \"--resume\": + resume_id = args[i + 1] + i += 2 + elif args[i].startswith(\"--\"): + i += 1 # skip unrecognized flag + else: + break + msg = args[i] if i < len(args) else \"\" + + sid = resume_id or os.environ.get(\"FAKE_NEW_SESSION_ID\", \"a1b2c3d4-1111-2222-3333-444455556666\") + sys.stderr.write(f\"session: {sid}\\n\") + sys.stderr.flush() + + sys.stdout.write(f\"echo: {msg}\\n\") + sys.stdout.flush() + + sys.exit(int(os.environ.get(\"FAKE_EXIT_CODE\", \"0\"))) +""") + + +@pytest.fixture +def fake_codex(tmp_path: Path) -> Path: + p = tmp_path / "codex" + p.write_text(_FAKE_CODEX_SCRIPT) + p.chmod(0o755) + return p + + +@pytest.mark.asyncio +async def test_run_returns_stdout_text(fake_codex): + runner = CodexRunner(codex_bin=str(fake_codex), timeout_secs=10.0) + result = await runner.run(message="hello world") + assert result.text == "echo: hello world" + assert result.exit_code == 0 + + +@pytest.mark.asyncio +async def test_run_extracts_new_session_id_from_stderr(fake_codex, monkeypatch): + runner = CodexRunner(codex_bin=str(fake_codex), timeout_secs=10.0) + monkeypatch.setenv("FAKE_NEW_SESSION_ID", 
"deadbeef-0000-1111-2222-333344445555") + result = await runner.run(message="any") + assert result.session_id == "deadbeef-0000-1111-2222-333344445555" + + +@pytest.mark.asyncio +async def test_run_resumes_existing_session(fake_codex): + runner = CodexRunner(codex_bin=str(fake_codex), timeout_secs=10.0) + given = "11111111-2222-3333-4444-555555555555" + result = await runner.run(message="follow up", session_id=given) + # Resume → no new session id is captured (input is echoed back as-is). + assert result.session_id == given + + +@pytest.mark.asyncio +async def test_run_surfaces_nonzero_exit_code(fake_codex, monkeypatch): + runner = CodexRunner(codex_bin=str(fake_codex), timeout_secs=10.0) + monkeypatch.setenv("FAKE_EXIT_CODE", "7") + result = await runner.run(message="any") + assert result.exit_code == 7 + + +@pytest.mark.asyncio +async def test_run_kills_subprocess_on_timeout(tmp_path): + """A codex turn that hangs past the timeout must be killed and + surfaced as a timeout result — not block the bridge forever.""" + sleeper = tmp_path / "codex" + sleeper.write_text(textwrap.dedent("""\ + #!/usr/bin/env python3 + import time + time.sleep(60) + """)) + sleeper.chmod(0o755) + + runner = CodexRunner(codex_bin=str(sleeper), timeout_secs=0.5) + result = await runner.run(message="hang please") + assert "timed out" in result.text + assert result.exit_code == -1 + + +def test_constructor_fails_fast_when_codex_missing(): + with pytest.raises(FileNotFoundError) as excinfo: + CodexRunner(codex_bin="/nonexistent/path/to/codex-binary-xyzzy") + assert "codex" in str(excinfo.value).lower() + + +def test_extract_session_id_matches_canonical_banner(): + assert _extract_session_id("session: a1b2c3d4-aaaa-bbbb-cccc-dddddddddddd\n") == \ + "a1b2c3d4-aaaa-bbbb-cccc-dddddddddddd" + + +def test_extract_session_id_matches_alternate_banner_shape(): + assert _extract_session_id( + "blah blah\nsession_id=12345678-aaaa-bbbb-cccc-dddddddddddd\nmore\n" + ) == 
"12345678-aaaa-bbbb-cccc-dddddddddddd" + + +def test_extract_session_id_returns_none_on_no_match(): + assert _extract_session_id("nothing relevant here") is None