Files
molecule-ai-workspace-templ…/tests/test_executor.py
core-be 5a7ef7354a
CI / Adapter unit tests (push) Successful in 1m16s
CI / Template validation (static) (push) Successful in 2m15s
CI / Adapter unit tests (pull_request) Successful in 1m12s
CI / Template validation (static) (pull_request) Successful in 1m44s
CI / Template validation (runtime) (push) Successful in 3m4s
CI / T4 tier-4 conformance (live) (push) Successful in 2m14s
CI / validate (push) Successful in 2s
CI / Template validation (runtime) (pull_request) Successful in 3m47s
CI / T4 tier-4 conformance (live) (pull_request) Successful in 3m43s
CI / validate (pull_request) Successful in 2s
fix(executor): file-only messages no longer return opaque "(empty prompt — nothing to do)"
Phase 1 of the chloe-dong PDF-upload P0 (a1ea2200 archaeology). The
empty-prompt guard in execute() short-circuited the moment
extract_message_text() returned an empty string — even when the A2A
message carried attached files. Result: PDF-only / image-only / any
file-only turn fell through to the opaque "(empty prompt — nothing to
do)" reply with no signal to the user that the agent had received the
file at all.

Canary evidence: CTO ran a PDF-only message at 2026-05-20 01:04:27Z
local and got the opaque string. The actionable-failure-reason
principle (feedback_surface_actionable_failure_reason_to_user, set in
internal#211) requires the user be able to see WHY their turn produced
nothing.

Fix mirrors the claude-code reference (claude_sdk_executor.py:638-653,
already in production): use the existing extract_attached_files()
helper from molecule_runtime.executor_helpers to walk parts[*] for
kind="file" / v1 protobuf, and synthesize a manifest line into the
prompt so codex can act on the files via its own Read/Glob tools.

  - Text + files: prompt = text + manifest
  - File-only:    prompt = manifest (codex sees the file list)
  - Truly empty (no text AND no files): actionable user-facing reason
    ("Your message was empty. Please send text or a file with
    instructions.") instead of the opaque "(empty prompt — nothing
    to do)"

NOT in this PR (Phase 2, separate follow-up):
  - Actual file-content forwarding to codex via input parts /
    attachment flags. Phase 1 = relax the guard + give actionable
    feedback so file-only messages stop falling on the floor; Phase 2
    = per-runtime content-forwarding where supported.

The extract_attached_files() helper ships in
molecule-ai-workspace-runtime 0.1.1000 today (verified — published
2026-05-19 monorepo cascade). No monorepo change is needed for Phase 1.

Tests:
  - test_execute_file_only_no_longer_returns_opaque_empty pins the
    regression (file-only message → no opaque string + file name in
    the prompt that reaches the codex app-server).
  - test_execute_truly_empty_surfaces_actionable_reason pins the
    new actionable copy for empty-empty turns.
  - test_execute_text_only_still_passes_prompt_unchanged keeps the
    text-only happy path green.

Refs: a1ea2200, claude-code Phase 1 reference impl, RFC internal#211
2026-05-20 01:41:30 -07:00

577 lines
21 KiB
Python

"""Unit tests for CodexAppServerExecutor's internal turn lifecycle.
We don't stand up a real codex app-server — those tests live in
test_app_server.py which validates the JSON-RPC plumbing against a
mock binary. Here we focus on the protocol-level behavior of
``_run_turn``: thread bootstrap, notification accumulation, completion
detection, error surfacing, mid-turn serialization, and (the recent
addition) the no-deadlock guarantees when the channel goes wedged.
The fake AppServerProcess records every request sent and exposes a
helper to drive notifications + responses on demand. It deliberately
mirrors the JSON-RPC notification shape codex 0.72+ emits in
production (``method = "codex/event/<type>"`` with the payload under
``params.msg``), not the bare-method legacy form, so test failures
catch the same protocol bugs production hits.
"""
from __future__ import annotations
import asyncio
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
# Import the executor module — relies on a2a + molecule_runtime being
# installed locally. If not, skip these tests; the executor will still
# be exercised in container CI.
pytest.importorskip("a2a.helpers")
pytest.importorskip("molecule_runtime.adapters.base")
from executor import ( # noqa: E402
CodexAppServerExecutor,
_TURN_INACTIVITY_TIMEOUT,
)
from molecule_runtime.adapters.base import AdapterConfig # noqa: E402
class FakeAppServer:
"""Drop-in for AppServerProcess that lets tests script responses + notifications.
Honors only the shape AppServerProcess presents to the executor:
initialize / request / subscribe / close. Each scripted turn lets
the test push delta notifications and resolve the response.
"""
def __init__(self) -> None:
self.requests: list[tuple[str, dict]] = []
self._subscribers: list = []
self._next_thread = 0
self._next_turn = 0
self.closed = False
# Test-controllable knobs
self.thread_start_response: dict | None = None
self.turn_start_responses: list[dict] = []
self.turn_start_raises: Exception | None = None
# When set, request() raises this on the Nth call (1-indexed).
# Lets a test simulate the channel going dead between turns.
self.fail_request_n: int | None = None
self.fail_request_exc: Exception | None = None
self._request_count = 0
async def initialize(self, *, client_info: dict) -> dict:
return {"userAgent": "fake/0.0"}
async def request(self, method: str, params: dict | None = None, *, timeout: float | None = None) -> dict:
self._request_count += 1
self.requests.append((method, params or {}))
if (
self.fail_request_n is not None
and self._request_count >= self.fail_request_n
and self.fail_request_exc is not None
):
raise self.fail_request_exc
if method == "thread/start":
if self.thread_start_response is not None:
return self.thread_start_response
self._next_thread += 1
# Use the real binary's `id` shape (verified 2026-05-02
# against codex 0.72) — the schema's `threadId` is also
# accepted by the executor but `id` is what production hits.
return {"thread": {"id": f"th_{self._next_thread}"}}
if method == "turn/start":
if self.turn_start_raises:
raise self.turn_start_raises
if self.turn_start_responses:
return self.turn_start_responses.pop(0)
self._next_turn += 1
return {"turn": {"id": f"tu_{self._next_turn}"}}
if method == "turn/interrupt":
return {}
raise AssertionError(f"unexpected method: {method}")
def subscribe(self, callback):
self._subscribers.append(callback)
def unsubscribe() -> None:
try:
self._subscribers.remove(callback)
except ValueError:
pass
return unsubscribe
async def close(self) -> int | None:
self.closed = True
return 0
# --- test helpers ---------------------------------------------------
def push_delta(self, text: str) -> None:
"""Push a streamed agent_message_delta (codex 0.72+ shape)."""
self.push(
"codex/event/agent_message_delta",
{"msg": {"type": "agent_message_delta", "delta": text}},
)
def push_task_complete(self, last_message: str | None = None) -> None:
"""Push the canonical end-of-turn event (codex 0.72+ shape)."""
msg: dict = {"type": "task_complete"}
if last_message is not None:
msg["last_agent_message"] = last_message
self.push("codex/event/task_complete", {"msg": msg})
def push_event_error(self, message: str) -> None:
"""Push a fatal error notification under the codex/event envelope."""
self.push(
"codex/event/error",
{"msg": {"type": "error", "message": message}},
)
def push(self, method: str, params: dict | None = None) -> None:
"""Synchronously deliver a notification to all subscribers."""
for cb in list(self._subscribers):
cb(method, params or {})
def _make_executor(fake: FakeAppServer, *, model: str = "gpt-5.5", system_prompt: str = "be helpful") -> CodexAppServerExecutor:
cfg = AdapterConfig(model=model, system_prompt=system_prompt)
ex = CodexAppServerExecutor(cfg)
# Pre-inject the fake so _ensure_thread skips spawning codex.
ex._app_server = fake # type: ignore[assignment]
return ex
async def _wait_for_method(fake: FakeAppServer, method: str, *, after_count: int = 0) -> None:
"""Yield until ``method`` has been recorded at least ``after_count + 1`` times."""
for _ in range(500):
seen = sum(1 for m, _ in fake.requests if m == method)
if seen > after_count:
return
await asyncio.sleep(0.005)
raise AssertionError(
f"never saw {after_count + 1} call(s) to {method}; "
f"requests so far: {[m for m, _ in fake.requests]}"
)
@pytest.mark.asyncio
async def test_run_turn_starts_thread_and_returns_assembled_deltas() -> None:
fake = FakeAppServer()
ex = _make_executor(fake)
async def driver() -> None:
await _wait_for_method(fake, "turn/start")
fake.push_delta("hello ")
fake.push_delta("world")
fake.push_task_complete()
driver_task = asyncio.create_task(driver())
text = await ex._run_turn("hi")
await driver_task
assert text == "hello world"
methods = [m for m, _ in fake.requests]
assert "thread/start" in methods
assert "turn/start" in methods
@pytest.mark.asyncio
async def test_run_turn_reuses_thread_on_second_call() -> None:
"""The regression that wedged prod-Reviewer/Researcher 2026-05-18.
Pre-fix the second ``_run_turn`` returned (FakeAppServer side it
worked) but the real app-server's reader loop had exited on stdout
EOF without failing pending requests — so the second
``state.completed.wait()`` would block until ``_TURN_TIMEOUT``.
This unit exercises the executor's protocol contract (two turns
reuse the thread, both assemble their deltas), and the live
failure mode of the same multi-turn shape is covered in
test_app_server.py::test_eof_fails_pending_requests.
"""
fake = FakeAppServer()
ex = _make_executor(fake)
async def drive_one(text: str, turn_index: int) -> None:
await _wait_for_method(fake, "turn/start", after_count=turn_index)
fake.push_delta(text)
fake.push_task_complete()
t1 = asyncio.create_task(drive_one("first", 0))
text1 = await ex._run_turn("ping")
await t1
t2 = asyncio.create_task(drive_one("second", 1))
text2 = await ex._run_turn("pong")
await t2
assert text1 == "first"
assert text2 == "second"
# thread/start should fire EXACTLY once across both turns — turn 2
# MUST reuse the thread, not re-bootstrap.
thread_starts = sum(1 for m, _ in fake.requests if m == "thread/start")
assert thread_starts == 1
@pytest.mark.asyncio
async def test_run_turn_surfaces_error_notification() -> None:
fake = FakeAppServer()
ex = _make_executor(fake)
async def driver() -> None:
await _wait_for_method(fake, "turn/start")
fake.push_event_error("model rate limited")
driver_task = asyncio.create_task(driver())
with pytest.raises(RuntimeError, match="rate limited"):
await ex._run_turn("hi")
await driver_task
@pytest.mark.asyncio
async def test_thread_start_passes_config() -> None:
fake = FakeAppServer()
ex = _make_executor(fake, model="o4-mini", system_prompt="custom prompt")
async def driver() -> None:
await _wait_for_method(fake, "turn/start")
fake.push_task_complete()
driver_task = asyncio.create_task(driver())
await ex._run_turn("hi")
await driver_task
thread_start = next(p for m, p in fake.requests if m == "thread/start")
assert thread_start["model"] == "o4-mini"
assert thread_start["developerInstructions"] == "custom prompt"
assert thread_start["approvalPolicy"] == "never"
assert thread_start["sandboxPolicy"] == {"mode": "workspace-write"}
@pytest.mark.asyncio
async def test_turn_lock_serializes_concurrent_executes() -> None:
"""Two concurrent execute()s should run their turns one-at-a-time."""
fake = FakeAppServer()
ex = _make_executor(fake)
# Track the order in which turns START vs COMPLETE.
starts: list[int] = []
completes: list[int] = []
async def execute_turn(idx: int, prompt: str) -> str:
# Drive completion AFTER seeing this turn's turn/start in the
# request log. Because of the lock, turn idx 2 won't start
# until turn idx 1 is acked.
async def driver() -> None:
await _wait_for_method(fake, "turn/start", after_count=idx)
starts.append(idx)
fake.push_delta(f"r{idx}")
fake.push_task_complete()
completes.append(idx)
driver_task = asyncio.create_task(driver())
# Mirror the lock-and-run path execute() uses, without needing
# an EventQueue.
async with ex._turn_lock:
text = await ex._run_turn(prompt)
await driver_task
return text
results = await asyncio.gather(execute_turn(0, "first"), execute_turn(1, "second"))
assert results == ["r0", "r1"] or results == ["r1", "r0"]
# Whichever order tasks acquired the lock, the LOCK guarantees
# turn N+1 doesn't start until turn N has completed. So starts and
# completes should interleave one-at-a-time, not overlap.
assert sorted(starts) == [0, 1]
assert sorted(completes) == [0, 1]
# Strict ordering check: between any two `starts` events, there
# must be a corresponding `completes` event.
assert starts[0] in completes[:1]
@pytest.mark.asyncio
async def test_inactivity_watchdog_surfaces_error_on_silent_channel(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""The 2026-05-18 wedge: codex stops emitting events mid-turn.
Pre-fix, ``_run_turn`` would block on ``state.completed.wait()``
for the full ``_TURN_TIMEOUT`` (10 minutes) when codex stopped
sending events. Post-fix, the inactivity watchdog raises
TimeoutError after ``_TURN_INACTIVITY_TIMEOUT`` seconds.
We monkeypatch the watchdog timeout to a fraction of a second so
the test runs in well under a second.
"""
import executor as exec_mod
monkeypatch.setattr(exec_mod, "_TURN_INACTIVITY_TIMEOUT", 0.3)
fake = FakeAppServer()
ex = _make_executor(fake)
async def driver() -> None:
await _wait_for_method(fake, "turn/start")
# Emit ONE delta to confirm activity worked, then go silent.
fake.push_delta("hello, ")
# No task_complete, no further events — wedge.
driver_task = asyncio.create_task(driver())
with pytest.raises(asyncio.TimeoutError, match="channel wedged"):
await ex._run_turn("hi")
await driver_task
# Lock must be released so the next caller doesn't inherit the wedge.
assert not ex._turn_lock.locked()
@pytest.mark.asyncio
async def test_inactivity_watchdog_resets_on_each_event(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""A slow-but-alive channel must NOT trip the watchdog.
The inactivity watchdog fires only on gaps BETWEEN events. As long
as codex keeps emitting (deltas, reasoning, tool I/O — anything
that bumps ``state.activity``), the turn runs to its natural end
even if total time exceeds _TURN_INACTIVITY_TIMEOUT.
"""
import executor as exec_mod
monkeypatch.setattr(exec_mod, "_TURN_INACTIVITY_TIMEOUT", 0.4)
fake = FakeAppServer()
ex = _make_executor(fake)
async def driver() -> None:
await _wait_for_method(fake, "turn/start")
# Drip-feed events under the 0.4s inactivity bound.
for chunk in ("a", "b", "c", "d", "e"):
fake.push_delta(chunk)
await asyncio.sleep(0.15)
fake.push_task_complete()
driver_task = asyncio.create_task(driver())
text = await ex._run_turn("hi")
await driver_task
assert text == "abcde"
@pytest.mark.asyncio
async def test_second_turn_after_channel_dies_surfaces_error_promptly() -> None:
"""Second turn must NOT hang when the channel went dead after turn 1.
Mirrors the 2026-05-18 prod-Reviewer/Researcher wedge: first turn
completes, then the codex CLI's stdout closes (crash / EOF /
silent). Pre-fix turn 2 hung on state.completed.wait() for 10
minutes. Post-fix the executor surfaces the ConnectionError that
bubbles from AppServerProcess.request().
"""
fake = FakeAppServer()
ex = _make_executor(fake)
# Turn 1: succeeds cleanly.
async def drive_one() -> None:
await _wait_for_method(fake, "turn/start")
fake.push_delta("ok")
fake.push_task_complete()
t1 = asyncio.create_task(drive_one())
text1 = await ex._run_turn("hi 1")
await t1
assert text1 == "ok"
# Turn 2: app-server now dead. Any new request() raises
# ConnectionError (the same exception AppServerProcess raises when
# _reader_exc is set by EOF detection).
fake.fail_request_n = len(fake.requests) + 1
fake.fail_request_exc = ConnectionError(
"app-server stdout closed (EOF) — child exited or stopped writing"
)
with pytest.raises(ConnectionError, match="stdout closed"):
await ex._run_turn("hi 2")
# Lock must be released so the NEXT caller doesn't inherit the wedge.
assert not ex._turn_lock.locked()
@pytest.mark.asyncio
async def test_thread_start_timeout_is_bounded(monkeypatch: pytest.MonkeyPatch) -> None:
"""_ensure_thread() must NOT block indefinitely on a wedged child.
Pre-fix a child wedged during initialize / thread-start would hang
the executor's first turn for ``_DEFAULT_REQUEST_TIMEOUT`` (10 min).
Post-fix we cap initialize and thread/start so the failure surfaces
fast.
The fake here enforces the ``timeout=`` kwarg the same way
AppServerProcess.request does — wrapping the inner sleep in
asyncio.wait_for — so the test exercises the real contract the
executor relies on.
"""
import executor as exec_mod
# Drop the bootstrap timeouts to make the test run in well under a
# second.
monkeypatch.setattr(exec_mod, "_THREAD_START_TIMEOUT", 0.2)
monkeypatch.setattr(exec_mod, "_INITIALIZE_TIMEOUT", 0.2)
class WedgedAppServer(FakeAppServer):
async def request(self, method: str, params: dict | None = None, *, timeout: float | None = None) -> dict:
self.requests.append((method, params or {}))
if method == "thread/start":
# Mimic AppServerProcess.request: enforce caller-passed
# timeout via wait_for. A real wedged child would have
# its request future never resolve; the wait_for is
# what surfaces the TimeoutError.
async def _never() -> dict:
await asyncio.sleep(10.0)
return {"thread": {"id": "never"}}
return await asyncio.wait_for(
_never(),
timeout=timeout if timeout is not None else 10.0,
)
return await super().request(method, params, timeout=timeout)
fake = WedgedAppServer()
ex = _make_executor(fake)
with pytest.raises(asyncio.TimeoutError):
await ex._run_turn("hi")
# ----------------------------------------------------------------------
# Phase 1 file-only message support (a1ea2200 archaeology)
#
# chloe-dong canary 2026-05-20 01:04:27Z local time: PDF-only message
# returned opaque "(empty prompt — nothing to do)". The fix relaxes the
# empty-prompt guard so a file-only message synthesizes a prompt instead
# of short-circuiting, and the truly-empty case surfaces an actionable
# reason per feedback_surface_actionable_failure_reason_to_user.
# ----------------------------------------------------------------------
from types import SimpleNamespace
from unittest.mock import MagicMock
def _ctx_with_parts(parts: list) -> SimpleNamespace:
"""Build a minimal RequestContext stub the executor reads from.
Only ``context.message.parts`` is touched by the guard path under
test, so the surrounding object can stay light.
"""
msg = SimpleNamespace(parts=parts, task_id=None, context_id=None)
return SimpleNamespace(message=msg, task_id=None, session_id=None, context_id=None)
def _text_part(text: str) -> SimpleNamespace:
return SimpleNamespace(kind="text", text=text)
def _file_part(*, name: str, mime_type: str, path: str) -> SimpleNamespace:
"""Build a v0-flat FilePart-shaped object.
``extract_attached_files`` calls ``resolve_attachment_uri`` which
requires the uri to point at an existing file on disk — tests pass a
real tmp_path.
"""
file_obj = SimpleNamespace(uri=f"file://{path}", name=name, mimeType=mime_type)
return SimpleNamespace(kind="file", file=file_obj)
class _CapturingQueue:
def __init__(self) -> None:
self.events: list = []
async def enqueue_event(self, event) -> None: # type: ignore[no-untyped-def]
self.events.append(event)
@pytest.mark.asyncio
async def test_execute_file_only_no_longer_returns_opaque_empty(
tmp_path, monkeypatch
) -> None:
"""File-only message must not short-circuit with the opaque
'(empty prompt — nothing to do)' string."""
# extract_attached_files / resolve_attachment_uri refuse paths
# outside WORKSPACE_MOUNT. Point that at the test's tmp_path so the
# helper accepts our fixture file.
import molecule_runtime.executor_helpers as _helpers
monkeypatch.setattr(_helpers, "WORKSPACE_MOUNT", str(tmp_path))
fake = FakeAppServer()
ex = _make_executor(fake)
pdf = tmp_path / "chloe.pdf"
pdf.write_bytes(b"%PDF-1.4 stub\n")
ctx = _ctx_with_parts([
_file_part(name="chloe.pdf", mime_type="application/pdf", path=str(pdf)),
])
queue = _CapturingQueue()
captured_prompts: list[str] = []
async def fake_run_turn(prompt: str) -> str:
captured_prompts.append(prompt)
return "ack"
ex._run_turn = fake_run_turn # type: ignore[assignment,method-assign]
await ex.execute(ctx, queue)
# Either the synthesized prompt landed in _run_turn OR the reply
# event went out — never the opaque empty-prompt string.
blob = repr(queue.events) + repr(captured_prompts)
assert "empty prompt — nothing to do" not in blob
# Prompt must mention the file name so codex can act on it.
assert any("chloe.pdf" in p for p in captured_prompts)
@pytest.mark.asyncio
async def test_execute_truly_empty_surfaces_actionable_reason() -> None:
"""Empty text AND no files → actionable user-facing reason, NOT
opaque error type."""
fake = FakeAppServer()
ex = _make_executor(fake)
ctx = _ctx_with_parts([_text_part(" ")])
queue = _CapturingQueue()
await ex.execute(ctx, queue)
assert len(queue.events) == 1
msg_repr = repr(queue.events[0])
assert "Your message was empty" in msg_repr
assert "send text or a file" in msg_repr
# The old opaque string must NOT appear.
assert "(empty prompt — nothing to do)" not in msg_repr
@pytest.mark.asyncio
async def test_execute_text_only_still_passes_prompt_unchanged() -> None:
"""Regression-pin: text-only messages keep working exactly as
before — the file-aware branch must not perturb the text path."""
fake = FakeAppServer()
ex = _make_executor(fake)
ctx = _ctx_with_parts([_text_part("write a haiku")])
queue = _CapturingQueue()
captured_prompts: list[str] = []
async def fake_run_turn(prompt: str) -> str:
captured_prompts.append(prompt)
return "ok"
ex._run_turn = fake_run_turn # type: ignore[assignment,method-assign]
await ex.execute(ctx, queue)
assert captured_prompts == ["write a haiku"]