codex-channel-molecule/tests/test_bridge.py
Hongming Wang 467d47e3a3 v0.1.1: exponential backoff on platform errors + error-path test coverage
Production fix:
- wait_for_message exceptions now trigger exponential backoff (1s → 60s
  cap, resets to 0 on first successful poll) instead of a flat 1s retry.
  Under platform outage, N daemons under flat 1Hz retry would hammer the
  endpoint unnecessarily; the cap-and-reset shape keeps the daemon
  responsive while being a good citizen.

Correctness gate:
- Test coverage for the eight error branches that operators actually hit:
  the backoff progression itself, backoff reset on first success,
  inbox_pop failure (codex must not re-run the same message),
  peer_agent without peer_id (poison drained, not looped),
  unknown message kind (poison drained, not looped),
  empty codex output (placeholder reply, not silent drop),
  canvas_user falling back to workspace_id when arrival_workspace_id
  absent, and four malformed-payload shapes from wait_for_message
  (parametrised: invalid JSON, non-dict, timeout sentinel, missing
  activity_id).
- Backoff tests verified to FAIL on the old flat-1s code by stashing
  only bridge.py and re-running — pinning a real regression, not a
  tautology.

Cleanup:
- _RealTools imports molecule_runtime.a2a_tools once at construction
  instead of four times per message.
- README documents CODEX_CHANNEL_MOLECULE_STATE_DIR override.

Test: pytest -q → 28 passed (was 17).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 20:10:17 -07:00

544 lines
20 KiB
Python

"""Bridge-loop tests with fake tools + fake codex runner.
The fakes capture every call so each test asserts exact contracts:
which kind of message reaches which reply tool, what activity_ids are
acked, how session_id continuity is maintained across turns.
"""
from __future__ import annotations
import asyncio
import json
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import pytest
from codex_channel_molecule.bridge import _SessionStore, run_bridge
from codex_channel_molecule.codex_runner import CodexResult
# ----------------------------------------------------------------------
# Fakes
# ----------------------------------------------------------------------
class FakeTools:
    """Records every tool call. wait_for_message replays from a queue
    seeded by the test."""

    def __init__(self, inbox: List[Dict[str, Any]]) -> None:
        self._inbox: asyncio.Queue[Dict[str, Any]] = asyncio.Queue()
        for seeded in inbox:
            self._inbox.put_nowait(seeded)
        # Call records — the tests assert on these directly.
        self.popped: List[str] = []
        self.canvas_replies: List[Tuple[str, Optional[str]]] = []  # (text, ws)
        self.peer_replies: List[Tuple[str, str, Optional[str]]] = []
        # (workspace_id, task, source_workspace_id)

    async def wait_for_message(self, timeout_secs: float) -> str:
        # Drain the queue immediately; a real implementation would block
        # for timeout_secs. After exhaustion, return the timeout sentinel
        # so the bridge keeps cycling without hanging.
        if self._inbox.empty():
            return json.dumps({"timeout": True, "timeout_secs": timeout_secs})
        return json.dumps(self._inbox.get_nowait())

    async def inbox_pop(self, activity_id: str) -> str:
        self.popped.append(activity_id)
        return json.dumps({"removed": True, "activity_id": activity_id})

    async def send_message_to_user(
        self, message: str, workspace_id: Optional[str]
    ) -> str:
        self.canvas_replies.append((message, workspace_id))
        return "ok"

    async def delegate_task(
        self, workspace_id: str, task: str, source_workspace_id: Optional[str]
    ) -> str:
        self.peer_replies.append((workspace_id, task, source_workspace_id))
        return "ok"
class FakeRunner:
    """Returns scripted CodexResults; records every call. Lets tests
    pin session-continuity behavior without spawning real codex."""

    def __init__(self, results: List[CodexResult]) -> None:
        self._results = list(results)
        self.calls: List[Tuple[str, Optional[str]]] = []  # (message, session_id)

    async def run(
        self, message: str, session_id: Optional[str] = None
    ) -> CodexResult:
        self.calls.append((message, session_id))
        if self._results:
            return self._results.pop(0)
        # Script exhausted: synthesize a benign result so over-long runs
        # fail on assertions rather than on IndexError.
        return CodexResult(
            text="(no scripted result)",
            session_id=session_id,
            exit_code=0,
            stderr_tail="",
        )
# ----------------------------------------------------------------------
# Tests
# ----------------------------------------------------------------------
@pytest.mark.asyncio
async def test_canvas_user_message_is_dispatched_acked_and_replied(tmp_path):
    """Canvas-user inbound → CodexRunner.run → send_message_to_user →
    inbox_pop. Assert the full chain in one go."""
    message = {
        "kind": "canvas_user",
        "activity_id": "act-1",
        "arrival_workspace_id": "ws-canvas",
        "text": "hi can you help",
    }
    fake_tools = FakeTools([message])
    fake_runner = FakeRunner([CodexResult(
        text="sure, what's up",
        session_id="sess-canvas-1",
        exit_code=0,
        stderr_tail="",
    )])
    sessions = _SessionStore(tmp_path / "sessions.json")
    await run_bridge(
        runner=fake_runner, tools=fake_tools, session_store=sessions, iterations=2,
    )
    # Exactly one codex turn on a fresh session.
    assert fake_runner.calls == [("hi can you help", None)]
    # Reply routed to the canvas workspace and nowhere else.
    assert fake_tools.canvas_replies == [("sure, what's up", "ws-canvas")]
    assert fake_tools.peer_replies == []
    # Row acked; new session id persisted under the canvas key.
    assert fake_tools.popped == ["act-1"]
    assert sessions.get("canvas:ws-canvas") == "sess-canvas-1"
@pytest.mark.asyncio
async def test_peer_agent_message_routes_to_delegate_task(tmp_path):
    """A peer_agent inbound is answered via delegate_task, and the
    session is keyed by the peer's workspace id."""
    peer_msg = {
        "kind": "peer_agent",
        "activity_id": "act-2",
        "peer_id": "ws-peer",
        "arrival_workspace_id": "ws-self",
        "text": "what's your status",
    }
    tools = FakeTools([peer_msg])
    runner = FakeRunner([CodexResult(
        text="all good", session_id="sess-peer-1", exit_code=0, stderr_tail="",
    )])
    store = _SessionStore(tmp_path / "sessions.json")
    await run_bridge(runner=runner, tools=tools, session_store=store, iterations=2)
    # Reply went to the peer, not the canvas; row was acked.
    assert tools.peer_replies == [("ws-peer", "all good", "ws-self")]
    assert tools.canvas_replies == []
    assert tools.popped == ["act-2"]
    assert store.get("peer:ws-peer") == "sess-peer-1"
@pytest.mark.asyncio
async def test_session_continuity_resumes_same_codex_session(tmp_path):
    """Two messages on the same chat_id → second call resumes the
    session_id captured from the first."""
    def _canvas_msg(activity_id: str, text: str) -> Dict[str, Any]:
        # Both messages arrive on the same workspace → same chat key.
        return {"kind": "canvas_user", "activity_id": activity_id,
                "arrival_workspace_id": "ws-x", "text": text}

    tools = FakeTools([_canvas_msg("act-a", "first"), _canvas_msg("act-b", "second")])
    runner = FakeRunner([
        CodexResult(text="r1", session_id="sess-shared", exit_code=0, stderr_tail=""),
        CodexResult(text="r2", session_id="sess-shared", exit_code=0, stderr_tail=""),
    ])
    store = _SessionStore(tmp_path / "sessions.json")
    await run_bridge(runner=runner, tools=tools, session_store=store, iterations=3)
    # First call: no session_id (new). Second call: resume sess-shared.
    assert runner.calls == [("first", None), ("second", "sess-shared")]
    assert tools.popped == ["act-a", "act-b"]
@pytest.mark.asyncio
async def test_session_store_persists_across_runs(tmp_path):
    """Session map survives daemon restart — written to disk on each
    update, re-read on the next instantiation."""
    state_file = tmp_path / "sessions.json"
    # --- First daemon lifetime: establish and persist a session. ---
    first_tools = FakeTools([{
        "kind": "canvas_user", "activity_id": "act-1",
        "arrival_workspace_id": "ws-resume", "text": "first ever",
    }])
    first_runner = FakeRunner([CodexResult(
        text="hi", session_id="sess-persist", exit_code=0, stderr_tail="",
    )])
    await run_bridge(
        runner=first_runner, tools=first_tools,
        session_store=_SessionStore(state_file), iterations=2,
    )
    assert state_file.exists()
    # --- Simulated restart: a brand-new store reads the same file. ---
    second_tools = FakeTools([{
        "kind": "canvas_user", "activity_id": "act-2",
        "arrival_workspace_id": "ws-resume", "text": "follow up",
    }])
    second_runner = FakeRunner([CodexResult(
        text="ok", session_id="sess-persist", exit_code=0, stderr_tail="",
    )])
    await run_bridge(
        runner=second_runner, tools=second_tools,
        session_store=_SessionStore(state_file), iterations=2,
    )
    # Second instance must have resumed the persisted session id.
    assert second_runner.calls == [("follow up", "sess-persist")]
@pytest.mark.asyncio
async def test_timeout_sentinel_does_not_call_codex(tmp_path):
    """An empty inbox returns a timeout sentinel — the bridge must
    keep polling without spawning codex."""
    tools = FakeTools(inbox=[])  # empty queue → timeout sentinel every poll
    runner = FakeRunner(results=[])
    store = _SessionStore(tmp_path / "sessions.json")
    await run_bridge(runner=runner, tools=tools, session_store=store, iterations=3)
    # No side effects of any kind across three poll cycles.
    assert runner.calls == []
    assert tools.popped == []
    assert tools.canvas_replies == []
    assert tools.peer_replies == []
@pytest.mark.asyncio
async def test_failed_reply_routing_skips_inbox_pop(tmp_path):
    """If sending the reply fails, do NOT ack the inbox row — the
    platform will re-deliver on the next poll. At-least-once semantics.
    """
    class FlakyTools(FakeTools):
        """Reply delivery always fails, simulating a platform-side error."""
        async def send_message_to_user(self, message, workspace_id):
            raise RuntimeError("simulated 502 from platform")

    tools = FlakyTools([{
        "kind": "canvas_user", "activity_id": "act-err",
        "arrival_workspace_id": "ws-x", "text": "msg",
    }])
    runner = FakeRunner([CodexResult(
        text="reply", session_id="sess", exit_code=0, stderr_tail="",
    )])
    store = _SessionStore(tmp_path / "sessions.json")
    await run_bridge(runner=runner, tools=tools, session_store=store, iterations=2)
    assert runner.calls == [("msg", None)]
    # Must NOT have popped — at-least-once requires the unacked row to
    # re-surface next poll.
    assert tools.popped == []
@pytest.mark.asyncio
async def test_nonzero_exit_code_surfaces_in_reply(tmp_path):
    """Codex failure (e.g. timeout, crash) becomes a visible reply
    instead of silently dropping. Operator sees the failure where the
    answer was expected."""
    tools = FakeTools([{
        "kind": "canvas_user", "activity_id": "act-fail",
        "arrival_workspace_id": "ws-x", "text": "ping",
    }])
    runner = FakeRunner([CodexResult(
        text="(codex exec timed out after 600s)",
        session_id=None,
        exit_code=-1,
        stderr_tail="timeout",
    )])
    store = _SessionStore(tmp_path / "sessions.json")
    await run_bridge(runner=runner, tools=tools, session_store=store, iterations=2)
    assert len(tools.canvas_replies) == 1
    reply_text, _ = tools.canvas_replies[0]
    # The failure text and the exit code must both be visible to the user.
    assert "timed out" in reply_text
    assert "exit=-1" in reply_text
    # The row is acked — codex's verdict (success or failure) is
    # delivered, so the inbox row is fully handled.
    assert tools.popped == ["act-fail"]
@pytest.mark.asyncio
async def test_a2a_multipart_text_is_concatenated(tmp_path):
    """A2A messages can arrive as ``parts: [{type: text, text: ...}, ...]``
    instead of a flat ``text`` field. Bridge concatenates parts into a
    single codex prompt."""
    chunks = [
        {"type": "text", "text": "first chunk"},
        {"type": "text", "text": "second chunk"},
    ]
    tools = FakeTools([{
        "kind": "peer_agent", "activity_id": "act-p", "peer_id": "ws-peer",
        "parts": chunks,
    }])
    runner = FakeRunner([CodexResult(
        text="ack", session_id="s", exit_code=0, stderr_tail="",
    )])
    store = _SessionStore(tmp_path / "sessions.json")
    await run_bridge(runner=runner, tools=tools, session_store=store, iterations=2)
    prompt, _ = runner.calls[0]
    assert prompt == "first chunk\nsecond chunk"
# ----------------------------------------------------------------------
# Error-path coverage — branches operators actually hit in production
# when the platform is flaky, the message is malformed, or codex returns
# nothing useful. Each fake supplies the exact failure shape.
# ----------------------------------------------------------------------
@pytest.mark.asyncio
async def test_wait_for_message_raises_triggers_exponential_backoff(
    tmp_path, monkeypatch
):
    """When wait_for_message raises (DNS failure, dead platform, expired
    token), the bridge must back off exponentially so N daemons under a
    common outage don't melt the platform with 1 Hz retries.
    Sleep is monkeypatched to a recorder so the test runs instantly while
    still asserting the exact 1, 2, 4, 8, 16, 32, 60-cap progression.
    """
    class AlwaysRaisingTools(FakeTools):
        """Every poll fails — a total platform outage."""
        async def wait_for_message(self, timeout_secs: float) -> str:
            raise ConnectionError("simulated platform outage")

    tools = AlwaysRaisingTools(inbox=[])
    runner = FakeRunner(results=[])
    store = _SessionStore(tmp_path / "sessions.json")
    sleeps: List[float] = []

    async def _record_sleep(secs):
        sleeps.append(secs)

    import codex_channel_molecule.bridge as bridge_mod
    # ``bridge_mod.asyncio`` IS the global asyncio module, so this patch is
    # process-wide; monkeypatch guarantees restoration at teardown even if
    # run_bridge raises — safer than manual save/restore of the attribute.
    monkeypatch.setattr(bridge_mod.asyncio, "sleep", _record_sleep)
    await run_bridge(runner=runner, tools=tools, session_store=store, iterations=8)
    # Expected progression: 1 → 2 → 4 → 8 → 16 → 32 → 60 (cap) → 60.
    assert sleeps == [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
    assert runner.calls == []  # never reached codex
@pytest.mark.asyncio
async def test_backoff_resets_on_first_successful_poll(tmp_path, monkeypatch):
    """A success after consecutive failures must clear the backoff so
    the next failure starts at 1s again — not stuck at the cap."""
    sequence: List[Any] = ["raise", "raise", {
        "kind": "canvas_user", "activity_id": "act-1",
        "arrival_workspace_id": "ws-x", "text": "real",
    }, "raise"]

    class StepTools(FakeTools):
        """Replays ``sequence`` in order: 'raise' → ConnectionError,
        dict → JSON payload."""
        def __init__(self):
            super().__init__(inbox=[])
            self._idx = 0

        async def wait_for_message(self, timeout_secs: float) -> str:
            item = sequence[self._idx]
            self._idx += 1
            if item == "raise":
                raise ConnectionError("flake")
            return json.dumps(item)

    tools = StepTools()
    runner = FakeRunner([CodexResult(
        text="ok", session_id="sess", exit_code=0, stderr_tail="",
    )])
    store = _SessionStore(tmp_path / "sessions.json")
    sleeps: List[float] = []

    async def _record_sleep(secs):
        sleeps.append(secs)

    import codex_channel_molecule.bridge as bridge_mod
    # ``bridge_mod.asyncio`` IS the global asyncio module, so this patch is
    # process-wide; monkeypatch guarantees restoration at teardown even if
    # run_bridge raises — safer than manual save/restore of the attribute.
    monkeypatch.setattr(bridge_mod.asyncio, "sleep", _record_sleep)
    await run_bridge(runner=runner, tools=tools, session_store=store, iterations=4)
    # 1s after first raise, 2s after second, success in between resets,
    # then 1s again after the trailing raise.
    assert sleeps == [1.0, 2.0, 1.0]
@pytest.mark.asyncio
async def test_inbox_pop_failure_logs_but_does_not_re_run_codex(tmp_path):
    """If pop fails after a successful reply, codex must NOT run again
    on the same message — the platform still has the row but our turn
    is done. We accept duplicate delivery on the next poll cycle."""
    class PopFailTools(FakeTools):
        """Acking always fails, simulating a platform-side 503."""
        async def inbox_pop(self, activity_id):
            raise RuntimeError("simulated 503")

    tools = PopFailTools([{
        "kind": "canvas_user", "activity_id": "act-pop-fail",
        "arrival_workspace_id": "ws-x", "text": "msg",
    }])
    runner = FakeRunner([CodexResult(
        text="reply", session_id="sess", exit_code=0, stderr_tail="",
    )])
    store = _SessionStore(tmp_path / "sessions.json")
    await run_bridge(runner=runner, tools=tools, session_store=store, iterations=2)
    # Reply was sent; codex ran exactly once even though pop blew up.
    assert runner.calls == [("msg", None)]
    assert tools.canvas_replies == [("reply", "ws-x")]
@pytest.mark.asyncio
async def test_peer_agent_without_peer_id_drops_reply_and_acks(tmp_path):
    """A malformed peer_agent message (no peer_id — registry lookup
    failure or platform bug) must not crash the bridge or call
    delegate_task with an empty workspace_id.
    The row IS acked: keeping an undeliverable message in the queue
    would loop forever on every poll. Operator sees the warning in the
    log and can investigate — at-least-once means "we won't lose a
    message we CAN deliver", not "we'll loop on poison forever".
    """
    orphan = {
        "kind": "peer_agent", "activity_id": "act-no-peer",
        "text": "from a peer with no id",
    }
    tools = FakeTools([orphan])
    runner = FakeRunner([CodexResult(
        text="reply", session_id="sess", exit_code=0, stderr_tail="",
    )])
    store = _SessionStore(tmp_path / "sessions.json")
    await run_bridge(runner=runner, tools=tools, session_store=store, iterations=2)
    assert runner.calls == [("from a peer with no id", None)]
    assert tools.peer_replies == []  # no destination → no delegate_task call
    assert tools.canvas_replies == []
    assert tools.popped == ["act-no-peer"]  # poison drained
@pytest.mark.asyncio
async def test_unknown_kind_drops_reply_and_acks(tmp_path):
    """Future or malformed message kinds must not crash the daemon —
    log + drop + ack so the queue isn't blocked. A future daemon that
    learns the new kind reads it from the platform's persistent log,
    not from the inbox."""
    mystery = {
        "kind": "future_kind_we_dont_know", "activity_id": "act-?",
        "text": "??",
    }
    tools = FakeTools([mystery])
    runner = FakeRunner([CodexResult(
        text="r", session_id="s", exit_code=0, stderr_tail="",
    )])
    store = _SessionStore(tmp_path / "sessions.json")
    await run_bridge(runner=runner, tools=tools, session_store=store, iterations=2)
    # No reply went anywhere, but the row was still acked.
    assert tools.canvas_replies == []
    assert tools.peer_replies == []
    assert tools.popped == ["act-?"]  # drained, not looped
@pytest.mark.asyncio
async def test_empty_codex_output_falls_back_to_placeholder(tmp_path):
    """If codex returns empty stdout (silent failure, model refused, etc.)
    we still send SOMETHING to the user — silent inbound never gets a
    reply and looks like the daemon ate it."""
    tools = FakeTools([{
        "kind": "canvas_user", "activity_id": "act-silent",
        "arrival_workspace_id": "ws-x", "text": "hi",
    }])
    silent_runner = FakeRunner([CodexResult(
        text="", session_id="sess", exit_code=0, stderr_tail="",
    )])
    store = _SessionStore(tmp_path / "sessions.json")
    await run_bridge(runner=silent_runner, tools=tools, session_store=store, iterations=2)
    assert len(tools.canvas_replies) == 1
    reply_text, _ = tools.canvas_replies[0]
    # Placeholder instead of an empty (dropped-looking) reply.
    assert reply_text == "(codex returned empty output)"
    assert tools.popped == ["act-silent"]
@pytest.mark.asyncio
async def test_canvas_user_falls_back_to_workspace_id(tmp_path):
    """The chat-id encoder accepts ``workspace_id`` when
    ``arrival_workspace_id`` is absent — older platform builds use the
    short field name. Same session keying either way."""
    legacy_shape = {
        "kind": "canvas_user", "activity_id": "act-old-shape",
        "workspace_id": "ws-legacy",  # no arrival_workspace_id
        "text": "msg",
    }
    tools = FakeTools([legacy_shape])
    runner = FakeRunner([CodexResult(
        text="ack", session_id="sess-legacy", exit_code=0, stderr_tail="",
    )])
    store = _SessionStore(tmp_path / "sessions.json")
    await run_bridge(runner=runner, tools=tools, session_store=store, iterations=2)
    # Reply routed and session keyed using the legacy field.
    assert tools.canvas_replies == [("ack", "ws-legacy")]
    assert store.get("canvas:ws-legacy") == "sess-legacy"
@pytest.mark.parametrize(
    "payload",
    [
        "not json at all",              # JSONDecodeError
        '"a string, not an object"',    # decoded but not a dict
        '{"timeout": true}',            # real timeout sentinel
        '{"no_activity_id": "field"}',  # dict but missing activity_id
    ],
)
@pytest.mark.asyncio
async def test_malformed_payloads_are_silently_dropped(tmp_path, payload):
    """Each shape that wait_for_message can return must skip the message
    handler without raising. Invariant comes from _extract_inbox_message."""
    class StaticTools(FakeTools):
        """Always hands back the same raw payload string."""
        def __init__(self, raw):
            super().__init__(inbox=[])
            self._payload = raw

        async def wait_for_message(self, timeout_secs):
            return self._payload

    tools = StaticTools(payload)
    runner = FakeRunner(results=[])
    store = _SessionStore(tmp_path / "sessions.json")
    await run_bridge(runner=runner, tools=tools, session_store=store, iterations=2)
    # Nothing reached codex and nothing was acked.
    assert runner.calls == []
    assert tools.popped == []