molecule-ai-workspace-templ…/executor.py
core-be 5a7ef7354a
CI / Adapter unit tests (push) Successful in 1m16s
CI / Template validation (static) (push) Successful in 2m15s
CI / Adapter unit tests (pull_request) Successful in 1m12s
CI / Template validation (static) (pull_request) Successful in 1m44s
CI / Template validation (runtime) (push) Successful in 3m4s
CI / T4 tier-4 conformance (live) (push) Successful in 2m14s
CI / validate (push) Successful in 2s
CI / Template validation (runtime) (pull_request) Successful in 3m47s
CI / T4 tier-4 conformance (live) (pull_request) Successful in 3m43s
CI / validate (pull_request) Successful in 2s
fix(executor): file-only messages no longer return opaque "(empty prompt — nothing to do)"
Phase 1 of the chloe-dong PDF-upload P0 (a1ea2200 archaeology). The
empty-prompt guard in execute() short-circuited the moment
extract_message_text() returned an empty string — even when the A2A
message carried attached files. Result: PDF-only / image-only / any
file-only turn fell through to the opaque "(empty prompt — nothing to
do)" reply with no signal to the user that the agent had received the
file at all.

Canary evidence: the CTO sent a PDF-only message at 2026-05-20
01:04:27Z local and got the opaque string. The actionable-failure-reason
principle (feedback_surface_actionable_failure_reason_to_user, set in
internal#211) requires the user be able to see WHY their turn produced
nothing.

Fix mirrors the claude-code reference (claude_sdk_executor.py:638-653,
already in production): use the existing extract_attached_files()
helper from molecule_runtime.executor_helpers to walk parts[*] for
kind="file" / v1 protobuf, and synthesize a manifest line into the
prompt so codex can act on the files via its own Read/Glob tools.

  - Text + files: prompt = text + manifest
  - File-only:    prompt = manifest (codex sees the file list)
  - Truly empty (no text AND no files): actionable user-facing reason
    ("Your message was empty. Please send text or a file with
    instructions.") instead of the opaque "(empty prompt — nothing
    to do)"
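The three prompt cases above can be sketched as a pure function. This is a minimal sketch, not the shipped code: `build_prompt`, `AttachedFile`, and `EMPTY_REPLY` are hypothetical names, and the per-file shape (`name`, `mime_type`, `path`) is assumed from the keys `execute()` reads, not taken from the `extract_attached_files()` documentation.

```python
from __future__ import annotations

from typing import List, Optional, TypedDict


class AttachedFile(TypedDict):
    # Hypothetical shape: only the keys executor.py reads from each
    # extract_attached_files() entry; the real helper may return more.
    name: str
    mime_type: Optional[str]
    path: str


# Actionable copy for a turn with neither text nor files.
EMPTY_REPLY = "Your message was empty. Please send text or a file with instructions."


def build_prompt(text: str, attached: List[AttachedFile]) -> Optional[str]:
    """Return the prompt for codex, or None when the turn is truly empty."""
    if attached:
        manifest = "\n\nAttached files:\n" + "\n".join(
            f"- {f['name']} ({f['mime_type'] or 'unknown type'}) at {f['path']}"
            for f in attached
        )
        # Text + files: text then manifest. File-only: manifest alone,
        # with its leading blank lines stripped.
        text = (text + manifest) if text.strip() else manifest.lstrip()
    return text if text.strip() else None


if __name__ == "__main__":
    pdf: AttachedFile = {
        "name": "report.pdf",
        "mime_type": "application/pdf",
        "path": "/workspace/uploads/report.pdf",  # made-up path
    }
    print(build_prompt("", [pdf]))
    print(build_prompt("", []) or EMPTY_REPLY)
```

Returning `None` (rather than an empty string) keeps the "truly empty" decision in one place, so the caller only has to choose between forwarding the prompt and sending the actionable reply.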

NOT in this PR (Phase 2, separate follow-up):
  - Actual file-content forwarding to codex via input parts /
    attachment flags. Phase 1 = relax the guard + give actionable
    feedback so file-only messages stop falling on the floor; Phase 2
    = per-runtime content-forwarding where supported.

The extract_attached_files() helper ships in
molecule-ai-workspace-runtime 0.1.1000 today (verified — published
2026-05-19 monorepo cascade). No monorepo change is needed for Phase 1.

Tests:
  - test_execute_file_only_no_longer_returns_opaque_empty pins the
    regression (file-only message → no opaque string + file name in
    the prompt that reaches the codex app-server).
  - test_execute_truly_empty_surfaces_actionable_reason pins the
    new actionable copy for truly empty turns (no text, no files).
  - test_execute_text_only_still_passes_prompt_unchanged keeps the
    text-only happy path green.

Refs: a1ea2200, claude-code Phase 1 reference impl, RFC internal#211
2026-05-20 01:41:30 -07:00

529 lines
23 KiB
Python

"""A2A → codex app-server bridge.

Holds one persistent `codex app-server` child + one thread per
workspace session, and dispatches each A2A message as a `turn/start`
RPC (or a `turn/steer` into an in-flight turn) against the existing
thread.

Design rationale lives in
``docs/integrations/codex-app-server-adapter-design.md`` (in
molecule-core). The short version:

- Persistent child gives us session continuity (the agent's
  conversation history, tool state, and config persist across A2A
  turns) without serializing through disk.
- Per-thread serialization (``_turn_lock``) gives us safe, ordered
  handling of mid-turn arrivals — A2A peers see their messages
  processed in arrival order, matching OpenClaw's per-chat
  sequentializer behavior.
- Notification-driven response assembly: the executor accumulates
  ``agent_message_delta`` chunks and emits the final assembled text
  on the ``task_complete`` event. Streaming forward is a future
  upgrade once the molecule-runtime contract supports incremental
  events.

The riskiest module of this stack is ``app_server.AppServerProcess``
(the raw JSON-RPC client) — that has its own unit tests. This file
focuses on the protocol-level lifecycle: thread bootstrap, turn
dispatch, notification accumulation, error surface.
"""
from __future__ import annotations

import asyncio
import logging
import os
from dataclasses import dataclass, field
from typing import Any

from a2a.helpers import new_text_message
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from molecule_runtime.adapters.base import AdapterConfig
from molecule_runtime.executor_helpers import (
    extract_attached_files,
    extract_message_text,
)

from app_server import AppServerError, AppServerProcess

logger = logging.getLogger(__name__)

# Per-turn timeout. Codex turns can run minutes during heavy tool use
# (test runs, edits, web fetches). Tighter than infinite to bound
# debug-time hangs.
_TURN_TIMEOUT = 600.0

# Inactivity watchdog: cap the gap BETWEEN events from codex. A healthy
# turn emits frequent ``codex/event/*`` notifications (token deltas,
# tool I/O, reasoning markers) — minutes-long gaps are themselves
# evidence the channel is wedged, not work-in-progress. Smaller than
# ``_TURN_TIMEOUT`` so a stuck child surfaces an error promptly to the
# user instead of holding the lock for 10 minutes.
#
# Tuned from the production wedge:
# - Healthy fresh turn (gpt-5.5, no tool use): 2-3 s end-to-end.
# - Heavy tool-use turn: deltas every few seconds at most.
# - Wedged channel: zero events, zero rollout bytes for the full
#   ``_TURN_TIMEOUT`` window. The watchdog catches that in 90 s
#   instead of 600 s and logs a diagnostic warning.
_TURN_INACTIVITY_TIMEOUT = 90.0

# Bootstrap RPC timeouts. ``initialize`` and ``thread/start`` are
# exchanges a healthy child should answer in well under a second;
# capping them means a child that wedges during the handshake surfaces
# fast instead of stalling the executor's first turn for 10 minutes.
_INITIALIZE_TIMEOUT = 30.0
_THREAD_START_TIMEOUT = 30.0


@dataclass
class _TurnState:
    """Mutable state accumulated during one turn lifecycle.

    Owned by the running ``_run_turn`` invocation; the notification
    subscriber appends to it under ``_turn_lock``.

    ``activity`` is bumped on every notification the subscriber sees,
    even ones we don't materially care about (debug-level events,
    reasoning markers, tool I/O). It's the heartbeat the inactivity
    watchdog reads — if the watchdog ticks and ``activity`` has not
    advanced since the last tick, the channel is wedged and we surface
    a diagnostic error.
    """

    deltas: list[str] = field(default_factory=list)
    completed: asyncio.Event = field(default_factory=asyncio.Event)
    error: Exception | None = None
    turn_id: str | None = None
    activity: int = 0


class CodexAppServerExecutor(AgentExecutor):
    """A2A executor that proxies turns to a long-lived codex app-server."""

    def __init__(self, config: AdapterConfig):
        self._config = config
        self._app_server: AppServerProcess | None = None
        self._thread_id: str | None = None
        # Serialize turns per thread. Mid-turn A2A arrivals that cannot
        # be steered into the active turn (see execute()) queue and run
        # after the current turn completes — same shape OpenClaw's
        # per-chat sequentializer uses.
        self._turn_lock = asyncio.Lock()
        # Tracked so cancel() can fire turn/interrupt against the
        # currently-running turn (best-effort).
        self._current_turn_id: str | None = None

    # ------------------------------------------------------------------
    # Bootstrap
    # ------------------------------------------------------------------

    async def _ensure_thread(self) -> str:
        """Lazy-init the app-server child + thread on first turn."""
        if self._app_server is None:
            env = {
                # Codex picks up OPENAI_API_KEY from the environment.
                # We pass through everything; container start.sh is
                # responsible for ensuring the key is present.
                **os.environ,
            }
            self._app_server = await AppServerProcess.start(env=env)
            # Bounded handshake — a child wedged on initialize (rare but
            # observed when stdio fights with a debug-attached pty)
            # would otherwise stall the FIRST turn for the full
            # _DEFAULT_REQUEST_TIMEOUT (10 minutes).
            await asyncio.wait_for(
                self._app_server.initialize(client_info={
                    "name": "molecule-runtime-codex",
                    "version": "0.1.0",
                }),
                timeout=_INITIALIZE_TIMEOUT,
            )
            logger.info("codex app-server child initialized")
        if self._thread_id is None:
            params: dict[str, Any] = {}
            if self._config.model:
                params["model"] = self._config.model
            if self._config.system_prompt:
                params["developerInstructions"] = self._config.system_prompt
            # Workspace agents can't prompt a human, so approval policy
            # must be `never`. Sandbox `workspace-write` lets the agent
            # edit the workspace tree but not arbitrary disk.
            params["approvalPolicy"] = "never"
            params["sandboxPolicy"] = {"mode": "workspace-write"}
            resp = await self._app_server.request(
                "thread/start", params, timeout=_THREAD_START_TIMEOUT,
            )
            # Field name varies between the v2 JSON schema (threadId) and
            # the running binary 0.72.x (id). Accept either — verified
            # 2026-05-02 against codex-cli 0.72.0 which returns `id`.
            thread = resp.get("thread") or {}
            self._thread_id = thread.get("id") or thread.get("threadId")
            if not self._thread_id:
                raise RuntimeError(
                    f"thread/start did not return an id; got keys: {list(thread.keys())}"
                )
            logger.info("codex thread started: %s", self._thread_id)
        return self._thread_id

    # ------------------------------------------------------------------
    # AgentExecutor contract
    # ------------------------------------------------------------------

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        text = extract_message_text(context.message) or ""
        # Phase 1 file-only message support (a1ea2200 archaeology — chloe-dong
        # PDF-only canary 2026-05-20 01:04:27Z surfaced the opaque
        # "(empty prompt — nothing to do)" reply). Mirror the claude-code
        # reference impl (claude_sdk_executor.py:644-650): surface attached
        # files to codex as a manifest in the prompt — codex reads files
        # through its own tools by path. Phase 2 will wire actual
        # file-content forwarding via codex's input parts.
        attached = extract_attached_files(context.message)
        if attached:
            manifest = "\n\nAttached files:\n" + "\n".join(
                f"- {f['name']} ({f['mime_type'] or 'unknown type'}) at {f['path']}"
                for f in attached
            )
            text = (text + manifest) if text.strip() else manifest.lstrip()
        if not text.strip():
            # Truly empty — actionable per
            # feedback_surface_actionable_failure_reason_to_user.
            await event_queue.enqueue_event(
                new_text_message(
                    "Your message was empty. Please send text or a file "
                    "with instructions."
                )
            )
            return
        prompt = text

        # Push parity with claude-code: when a new message arrives while
        # a turn is already in flight, inject it into the active turn
        # via codex's `turn/steer` RPC instead of blocking on the lock
        # for ~minutes until the prior turn finishes. This is the
        # documented v2 codex app-server protocol primitive — see
        # codex-rs/app-server/README.md§Steer-an-active-turn — and
        # gives codex true mid-turn push semantics matching the
        # `notifications/claude/channel` path Claude Code uses.
        #
        # The agent then sees the new prompt as additional input in the
        # active turn's context. Per the molecule MCP server's
        # instructions string, the agent replies via send_message_to_user
        # (canvas) or delegate_task (peer) — the platform's reply path
        # is tool-based, not the A2A response shape — so this execute()
        # returning a placeholder is correct: the actual reply lands
        # via the tool-call route, not through this event_queue.
        if (
            self._turn_lock.locked()
            and self._app_server is not None
            and self._thread_id is not None
            and self._current_turn_id is not None
        ):
            try:
                await self._app_server.request(
                    "turn/steer",
                    {
                        "threadId": self._thread_id,
                        "input": [{"type": "text", "text": prompt}],
                        "expectedTurnId": self._current_turn_id,
                    },
                    timeout=5.0,
                )
                logger.info(
                    "codex push: steered into active turn %s",
                    self._current_turn_id,
                )
                # Status placeholder for the A2A response. The peer or
                # canvas wrapper sees this; the agent's substantive
                # reply comes via send_message_to_user / delegate_task
                # MCP tool calls within the steered turn's response.
                await event_queue.enqueue_event(
                    new_text_message(
                        "[steered into in-flight turn — agent will reply "
                        "via send_message_to_user / delegate_task]"
                    )
                )
                return
            except (AppServerError, asyncio.TimeoutError) as exc:
                # Steer failed — common causes:
                # - ActiveTurnNotSteerable (review/manual-compact turn)
                # - expectedTurnId mismatch (turn ended between our
                #   locked-check and the steer request)
                # - app-server transport hiccup
                # Fall through to the lock-and-wait path so the message
                # still gets processed, just as a queued new turn.
                logger.debug(
                    "codex turn/steer failed (%s) — falling through to new-turn path",
                    exc,
                )

        async with self._turn_lock:
            try:
                text = await self._run_turn(prompt)
            except AppServerError as exc:
                logger.warning("codex app-server error: %s", exc)
                await event_queue.enqueue_event(
                    new_text_message(f"[codex error] {exc}")
                )
                return
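            except asyncio.TimeoutError as exc:
                # Raised by the inactivity watchdog (90 s), the total turn
                # budget (600 s), or a bootstrap handshake timeout, so
                # report the actual reason instead of always claiming the
                # full _TURN_TIMEOUT elapsed.
                detail = str(exc) or f"after {_TURN_TIMEOUT:.0f}s"
                logger.warning("codex turn timed out: %s", detail)
                await event_queue.enqueue_event(
                    new_text_message(f"[codex turn timed out: {detail}]")
                )
                return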
            except ConnectionError as exc:
                logger.exception("codex app-server connection lost")
                # On connection loss, drop our cached app-server +
                # thread so the next turn starts fresh.
                await self._reset_app_server()
                await event_queue.enqueue_event(
                    new_text_message(f"[codex unreachable] {exc!s}")
                )
                return
            except RuntimeError as exc:
                # Surfaced from `state.error` in `_run_turn` — codex emitted
                # an `error` notification (typically an upstream HTTP failure
                # from the model provider, e.g. `unexpected status 401
                # Unauthorized`). Wrapping with the same `[codex error]`
                # prefix the AppServerError path uses keeps the canvas-side
                # behavior consistent: a clear inline message instead of a
                # bare JSON-RPC -32603 leak from the a2a-sdk top-level
                # handler.
                logger.warning("codex turn surfaced error: %s", exc)
                await event_queue.enqueue_event(
                    new_text_message(f"[codex error] {exc}")
                )
                return
        await event_queue.enqueue_event(new_text_message(text))

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        """Best-effort interrupt of the in-flight turn.

        Race-prone (the turn may have completed between our last
        poll and this call) but the app-server treats a stale
        interrupt as a no-op, so we don't need to lock around it.
        """
        if (
            self._app_server is not None
            and self._thread_id is not None
            and self._current_turn_id is not None
        ):
            try:
                await self._app_server.request(
                    "turn/interrupt",
                    {"threadId": self._thread_id, "turnId": self._current_turn_id},
                    timeout=5.0,
                )
            except (AppServerError, asyncio.TimeoutError, ConnectionError) as exc:
                logger.debug("turn/interrupt failed (expected if turn already done): %s", exc)

    async def shutdown(self) -> None:
        """Tear down the app-server child cleanly. Idempotent."""
        await self._reset_app_server()

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------

    async def _run_turn(self, prompt: str) -> str:
        """Fire turn/start, accumulate deltas, return assembled text.

        Splits the AgentExecutor contract into a pure-data path so
        unit tests can drive it without standing up an A2A
        EventQueue.
        """
        thread_id = await self._ensure_thread()
        assert self._app_server is not None  # set by _ensure_thread
        state = _TurnState()
        loop = asyncio.get_running_loop()

        def on_notification(method: str, params: dict[str, Any]) -> None:
            # Codex 0.72 wraps all event notifications under a single
            # `codex/event/<type>` JSON-RPC method, with the actual
            # event under `params.msg` and `params.msg.type` carrying
            # the event-type tag. There's a parallel set of bare
            # methods (`item/started`, `turn/started`, `error`) that
            # mirror a subset for legacy clients — we ignore those
            # (except bare `error`, handled below) and read the
            # canonical `codex/event/*` stream.
            #
            # Captured live by running `codex app-server` directly
            # against a fresh thread (2026-05-03). Pre-fix the
            # executor matched on `agent_message_delta` /
            # `turn/completed` directly as the JSON-RPC method, which
            # never fires in codex 0.72 — every probe returned empty
            # text + the workspace looked healthy.
            #
            # Surfaced events (msg.type values):
            #   - agent_message_delta — streamed chunk (delta)
            #   - agent_message — whole reply (when model didn't stream)
            #   - task_complete — turn finished cleanly
            #   - error — fatal turn error
            # Reasoning / item / tool events are debug-logged.
            #
            # Activity bump: every notification (matched or unmatched)
            # is the heartbeat for the inactivity watchdog. We bump
            # before the early returns so even ignored bare-method
            # events keep the channel "alive".
            state.activity += 1
            if method == "error":
                # Bare-method `error` notifications (parallel schema)
                # carry the error payload under `params.error`. These
                # often duplicate a `codex/event/stream_error` —
                # surface only the final non-retry one so the operator
                # sees the real failure.
                err = params.get("error") or {}
                if not params.get("willRetry"):
                    state.error = RuntimeError(
                        str(err.get("message") or "unknown codex error")
                    )
                    loop.call_soon_threadsafe(state.completed.set)
                return
            if not method.startswith("codex/event/"):
                logger.debug("codex notification: %s %s", method, params)
                return
            msg = params.get("msg") or {}
            mtype = msg.get("type", "")
            if mtype == "agent_message_delta":
                delta = msg.get("delta") or msg.get("text") or ""
                if delta:
                    state.deltas.append(delta)
            elif mtype == "agent_message":
                # Whole-message form: codex emits this when the model
                # response wasn't streamed as chunks (most non-OpenAI
                # backends). Append as a single delta so the assembled
                # string is complete even without `_delta` fragments.
                whole = msg.get("message") or msg.get("text") or ""
                if whole:
                    state.deltas.append(whole)
            elif mtype == "task_complete":
                # task_complete carries `last_agent_message` — when
                # the model returned a single message and skipped
                # streaming, this is the only place the text shows
                # up. Append it only if the text accumulated so far
                # does not already end with it; checking the joined
                # text (not just individual entries) also dedupes
                # against streamed `agent_message_delta` chunks.
                last = msg.get("last_agent_message") or ""
                if last and not "".join(state.deltas).endswith(last):
                    state.deltas.append(last)
                loop.call_soon_threadsafe(state.completed.set)
            elif mtype == "error":
                state.error = RuntimeError(
                    str(msg.get("message") or "unknown codex error")
                )
                loop.call_soon_threadsafe(state.completed.set)
            elif mtype == "stream_error":
                # Retry signal — codex retries internally. Log it
                # but don't surface; the final `error` (or
                # task_complete) will resolve the turn.
                logger.info(
                    "codex stream_error (will retry): %s",
                    msg.get("message", "")
                )
            else:
                logger.debug("codex event: %s %s", mtype, msg)

        unsubscribe = self._app_server.subscribe(on_notification)
        try:
            resp = await self._app_server.request("turn/start", {
                "threadId": thread_id,
                "input": [{"type": "text", "text": prompt}],
            })
            # Mirror the same id/threadId tolerance we have for thread/start.
            turn = resp.get("turn") or {}
            state.turn_id = turn.get("id") or turn.get("turnId")
            if not state.turn_id:
                raise RuntimeError(
                    f"turn/start did not return an id; got keys: {list(turn.keys())}"
                )
            self._current_turn_id = state.turn_id
            await self._await_turn_completion(state)
        finally:
            unsubscribe()
            self._current_turn_id = None
        if state.error:
            raise state.error
        return "".join(state.deltas)

    async def _await_turn_completion(self, state: _TurnState) -> None:
        """Wait for turn completion with two stacked timeouts.

        Stacked bounds:

        - ``_TURN_INACTIVITY_TIMEOUT`` (90 s) — max gap between events.
          A healthy turn emits ``codex/event/*`` notifications
          continuously; a wedged channel emits zero. If the activity
          counter does not advance for this long, we raise
          ``asyncio.TimeoutError`` instead of waiting the full
          ``_TURN_TIMEOUT``. This is the safety net for the 2026-05-18
          production wedge: the executor would otherwise hold the
          turn-lock for 10 minutes per stuck request, masking the
          real channel failure.
        - ``_TURN_TIMEOUT`` (600 s) — hard upper bound for total turn
          duration even if events keep arriving. Preserves the
          previous-generation bound for legitimately-long tool-use
          turns (test runs, etc.).

        The watchdog runs in 5 s ticks. Each tick:

        1. If the completion event is set, return.
        2. If the activity counter has not changed since the last
           tick AND the inactivity window has elapsed, raise
           TimeoutError.
        3. If the total elapsed time exceeds ``_TURN_TIMEOUT``, raise
           TimeoutError.
        """
        loop = asyncio.get_running_loop()
        started_at = loop.time()
        last_seen_activity = state.activity
        last_activity_at = started_at
        tick = 5.0
        while True:
            try:
                await asyncio.wait_for(state.completed.wait(), timeout=tick)
                return
            except asyncio.TimeoutError:
                pass
            now = loop.time()
            if state.activity != last_seen_activity:
                last_seen_activity = state.activity
                last_activity_at = now
            if now - last_activity_at >= _TURN_INACTIVITY_TIMEOUT:
                logger.warning(
                    "codex turn %s wedged: no events for %.0fs "
                    "(deltas=%d) — failing turn",
                    state.turn_id,
                    now - last_activity_at,
                    len(state.deltas),
                )
                raise asyncio.TimeoutError(
                    f"codex emitted no events for "
                    f"{_TURN_INACTIVITY_TIMEOUT:.0f}s — channel wedged"
                )
            if now - started_at >= _TURN_TIMEOUT:
                raise asyncio.TimeoutError(
                    f"codex turn exceeded total budget "
                    f"{_TURN_TIMEOUT:.0f}s"
                )

    async def _reset_app_server(self) -> None:
        """Tear down + clear cached child. Idempotent."""
        proc = self._app_server
        self._app_server = None
        self._thread_id = None
        self._current_turn_id = None
        if proc is not None:
            try:
                await proc.close()
            except Exception:
                logger.exception("error closing codex app-server")
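The stacked-timeout loop in `_await_turn_completion` can be exercised without a codex child. This is a minimal sketch, assuming a stripped-down `TurnState` (hypothetical, keeping only `completed` and `activity`) and with the 90 s / 600 s / 5 s bounds turned into parameters so the demo finishes in about a second; `await_completion` and `demo` are illustrative names, not part of the module.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field


@dataclass
class TurnState:
    # Hypothetical stand-in for _TurnState with only the two fields
    # the watchdog reads.
    completed: asyncio.Event = field(default_factory=asyncio.Event)
    activity: int = 0


async def await_completion(
    state: TurnState,
    *,
    inactivity_timeout: float,
    total_timeout: float,
    tick: float,
) -> None:
    """Return once state.completed is set.

    Raise asyncio.TimeoutError if state.activity stops advancing for
    inactivity_timeout seconds, or if the whole wait exceeds
    total_timeout seconds (same tick loop as _await_turn_completion).
    """
    loop = asyncio.get_running_loop()
    started_at = loop.time()
    last_seen = state.activity
    last_activity_at = started_at
    while True:
        try:
            await asyncio.wait_for(state.completed.wait(), timeout=tick)
            return
        except asyncio.TimeoutError:
            pass
        now = loop.time()
        if state.activity != last_seen:
            # Heartbeat moved since the previous tick: reset the idle clock.
            last_seen = state.activity
            last_activity_at = now
        if now - last_activity_at >= inactivity_timeout:
            raise asyncio.TimeoutError(f"no events for {inactivity_timeout}s")
        if now - started_at >= total_timeout:
            raise asyncio.TimeoutError(f"exceeded total budget {total_timeout}s")


async def demo() -> tuple[str, str]:
    # Wedged channel: nothing ever bumps activity or sets completed.
    wedged = TurnState()
    try:
        await await_completion(
            wedged, inactivity_timeout=0.2, total_timeout=5.0, tick=0.05
        )
        wedged_result = "completed"
    except asyncio.TimeoutError as exc:
        wedged_result = f"timeout: {exc}"

    # Healthy channel: events arrive every 0.1 s, then the turn completes.
    healthy = TurnState()

    async def producer() -> None:
        for _ in range(5):
            await asyncio.sleep(0.1)
            healthy.activity += 1
        healthy.completed.set()

    task = asyncio.create_task(producer())
    await await_completion(
        healthy, inactivity_timeout=0.5, total_timeout=5.0, tick=0.05
    )
    await task
    return wedged_result, "completed"


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the heartbeat is a plain counter compared once per tick, the subscriber callback stays a cheap increment; the cost is that an idle gap is measured with up to one tick of slack.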
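The push path in `execute()` (try `turn/steer` while `_turn_lock` is held, otherwise queue behind the lock) reduces to a small concurrency pattern. The sketch below is hypothetical: `TurnRouter`, `SteerError`, and the injected `steer` / `run_turn` coroutines stand in for the app-server RPCs, the placeholder string is shortened, and the real code additionally checks `_app_server` and `_thread_id` before steering.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable, List, Optional


class SteerError(Exception):
    """Hypothetical stand-in for an AppServerError raised by turn/steer."""


class TurnRouter:
    """Illustrative reduction of execute()'s steer-or-queue decision.

    `steer` and `run_turn` are injected coroutines standing in for the
    turn/steer RPC and _run_turn; they are not the real app-server API.
    """

    def __init__(
        self,
        steer: Callable[[str, str], Awaitable[None]],
        run_turn: Callable[[str], Awaitable[str]],
    ) -> None:
        self._steer = steer
        self._run_turn = run_turn
        self._lock = asyncio.Lock()
        self.current_turn_id: Optional[str] = None

    async def handle(self, prompt: str) -> str:
        # Fast path: a turn is already running, so try to inject the
        # new prompt into it instead of waiting for the lock.
        if self._lock.locked() and self.current_turn_id is not None:
            try:
                await asyncio.wait_for(
                    self._steer(self.current_turn_id, prompt), timeout=5.0
                )
                return "[steered into in-flight turn]"
            except (SteerError, asyncio.TimeoutError):
                pass  # not steerable: queue as a fresh turn instead
        # Slow path: serialize behind whichever turn holds the lock.
        async with self._lock:
            self.current_turn_id = f"turn-{prompt}"
            try:
                return await self._run_turn(prompt)
            finally:
                self.current_turn_id = None


async def demo(steer_fails: bool) -> List[str]:
    async def steer(turn_id: str, prompt: str) -> None:
        if steer_fails:
            raise SteerError("ActiveTurnNotSteerable")

    async def run_turn(prompt: str) -> str:
        await asyncio.sleep(0.1)  # simulate a turn in progress
        return f"reply to {prompt}"

    router = TurnRouter(steer, run_turn)
    first = asyncio.create_task(router.handle("a"))
    await asyncio.sleep(0.01)  # let the first turn take the lock
    second = await router.handle("b")
    return [await first, second]


if __name__ == "__main__":
    print(asyncio.run(demo(steer_fails=False)))
    print(asyncio.run(demo(steer_fails=True)))
```

The `locked()` check is racy by design: the turn can finish between the check and the steer call. That is why a failed steer falls through to the queued path rather than being reported as an error, mirroring the `expectedTurnId` mismatch case listed in `execute()`.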