From 467d47e3a3109056873b7d1cfe4f903cbb3dd1fd Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Mon, 4 May 2026 20:10:17 -0700 Subject: [PATCH] v0.1.1: exponential backoff on platform errors + error-path test coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Production fix: - wait_for_message exceptions now trigger exponential backoff (1s → 60s cap, resets to 0 on first successful poll) instead of a flat 1s retry. Under platform outage, N daemons under flat 1Hz retry would hammer the endpoint unnecessarily; the cap-and-reset shape keeps the daemon responsive while being a good citizen. Correctness gate: - Test coverage for the new backoff (the progression itself and the reset on first success) plus the six error branches that operators actually hit: inbox_pop failure (codex must not re-run the same message), peer_agent without peer_id (poison drained, not looped), unknown message kind (poison drained, not looped), empty codex output (placeholder reply, not silent drop), canvas_user falling back to workspace_id when arrival_workspace_id is absent, and four malformed-payload shapes from wait_for_message (parametrised: invalid JSON, non-dict, timeout sentinel, missing activity_id). - Backoff tests verified to FAIL on the old flat-1s code by stashing only bridge.py and re-running — pinning a real regression, not a tautology. Cleanup: - _RealTools imports molecule_runtime.a2a_tools once at construction instead of four times per message. - README documents CODEX_CHANNEL_MOLECULE_STATE_DIR override. Test: pytest -q → 28 passed (was 17). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- README.md | 2 +- codex_channel_molecule/bridge.py | 41 ++++-- pyproject.toml | 2 +- tests/test_bridge.py | 244 +++++++++++++++++++++++++++++++ 4 files changed, 272 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index f6350b0..0cd48bd 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ canvas user / peer agent ──► molecule platform inbox canvas chat / peer workspace ``` -Per chat thread (one canvas-user thread or one peer-workspace thread) gets its own codex session_id, persisted to `~/.codex-channel-molecule/sessions.json` so daemon restarts don't lose conversation context. +Per chat thread (one canvas-user thread or one peer-workspace thread) gets its own codex session_id, persisted to `~/.codex-channel-molecule/sessions.json` so daemon restarts don't lose conversation context. Set `CODEX_CHANNEL_MOLECULE_STATE_DIR` to override the default location (e.g. when running under systemd with a per-instance state dir). ## When to use this vs. the codex tab in the External Connect modal diff --git a/codex_channel_molecule/bridge.py b/codex_channel_molecule/bridge.py index 6e70a51..90634ce 100644 --- a/codex_channel_molecule/bridge.py +++ b/codex_channel_molecule/bridge.py @@ -29,6 +29,14 @@ logger = logging.getLogger(__name__) # read timeout. 60s is the same default the MCP tool advertises. _LONG_POLL_SECS = 60.0 +# Exponential backoff bounds for wait_for_message errors. Doubles from +# _BACKOFF_INITIAL up to _BACKOFF_MAX after each consecutive failure; +# resets to zero on the first successful poll. Without this, a stale +# token / DNS failure / dead platform turns into a tight 1-Hz retry +# storm — bad citizenship across N operator daemons. +_BACKOFF_INITIAL = 1.0 +_BACKOFF_MAX = 60.0 + class _Tools(Protocol): """Minimal interface the bridge needs from molecule_runtime. @@ -51,34 +59,32 @@ class _RealTools: """Production implementation — calls into the installed molecule-ai-workspace-runtime wheel. 
- Imported lazily so test runs don't require the wheel. + The wheel is imported lazily on first instantiation so unit tests + that supply a fake `_Tools` don't require it. """ - async def wait_for_message(self, timeout_secs: float) -> str: - from molecule_runtime.a2a_tools import tool_wait_for_message + def __init__(self) -> None: + from molecule_runtime import a2a_tools - return await tool_wait_for_message(timeout_secs=timeout_secs) + self._tools = a2a_tools + + async def wait_for_message(self, timeout_secs: float) -> str: + return await self._tools.tool_wait_for_message(timeout_secs=timeout_secs) async def inbox_pop(self, activity_id: str) -> str: - from molecule_runtime.a2a_tools import tool_inbox_pop - - return await tool_inbox_pop(activity_id=activity_id) + return await self._tools.tool_inbox_pop(activity_id=activity_id) async def send_message_to_user( self, message: str, workspace_id: Optional[str] ) -> str: - from molecule_runtime.a2a_tools import tool_send_message_to_user - - return await tool_send_message_to_user( + return await self._tools.tool_send_message_to_user( message=message, workspace_id=workspace_id ) async def delegate_task( self, workspace_id: str, task: str, source_workspace_id: Optional[str] ) -> str: - from molecule_runtime.a2a_tools import tool_delegate_task - - return await tool_delegate_task( + return await self._tools.tool_delegate_task( workspace_id=workspace_id, task=task, source_workspace_id=source_workspace_id, @@ -207,14 +213,19 @@ async def run_bridge( session_store = session_store or _SessionStore(_state_dir() / "sessions.json") cycle = 0 + backoff = 0.0 while iterations is None or cycle < iterations: cycle += 1 try: payload = await tools.wait_for_message(timeout_secs=_LONG_POLL_SECS) except Exception as exc: - logger.warning("wait_for_message raised: %s", exc) - await asyncio.sleep(1.0) + backoff = min(backoff * 2, _BACKOFF_MAX) if backoff else _BACKOFF_INITIAL + logger.warning( + "wait_for_message raised: %s — backing off 
%.1fs", exc, backoff, + ) + await asyncio.sleep(backoff) continue + backoff = 0.0 message = _extract_inbox_message(payload) if message is None: diff --git a/pyproject.toml b/pyproject.toml index 8dc3c50..341ef24 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "codex-channel-molecule" -version = "0.1.0" +version = "0.1.1" description = "Bridge daemon for codex CLI ↔ Molecule platform — long-polls the platform inbox, runs `codex exec --resume ` per inbound message, replies via send_message_to_user MCP tool. Counterpart to hermes-channel-molecule." readme = "README.md" requires-python = ">=3.11" diff --git a/tests/test_bridge.py b/tests/test_bridge.py index 9b76768..7034394 100644 --- a/tests/test_bridge.py +++ b/tests/test_bridge.py @@ -297,3 +297,247 @@ async def test_a2a_multipart_text_is_concatenated(tmp_path): msg, _ = runner.calls[0] assert msg == "first chunk\nsecond chunk" + + +# ---------------------------------------------------------------------- +# Error-path coverage — branches operators actually hit in production +# when the platform is flaky, the message is malformed, or codex returns +# nothing useful. Each fake supplies the exact failure shape. +# ---------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_wait_for_message_raises_triggers_exponential_backoff(tmp_path): + """When wait_for_message raises (DNS failure, dead platform, expired + token), the bridge must back off exponentially so N daemons under a + common outage don't melt the platform with 1 Hz retries. + + Sleep is monkeypatched to a recorder so the test runs instantly while + still asserting the exact 1, 2, 4, 8, 16, 32, 60-cap progression. 
+ """ + class AlwaysRaisingTools(FakeTools): + async def wait_for_message(self, timeout_secs: float) -> str: + raise ConnectionError("simulated platform outage") + + tools = AlwaysRaisingTools(inbox=[]) + runner = FakeRunner(results=[]) + store = _SessionStore(tmp_path / "sessions.json") + + sleeps: List[float] = [] + + async def _record_sleep(secs): + sleeps.append(secs) + + import codex_channel_molecule.bridge as bridge_mod + + orig_sleep = bridge_mod.asyncio.sleep + bridge_mod.asyncio.sleep = _record_sleep + try: + await run_bridge(runner=runner, tools=tools, session_store=store, iterations=8) + finally: + bridge_mod.asyncio.sleep = orig_sleep + + # Expected progression: 1 → 2 → 4 → 8 → 16 → 32 → 60 (cap) → 60. + assert sleeps == [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0] + assert runner.calls == [] # never reached codex + + +@pytest.mark.asyncio +async def test_backoff_resets_on_first_successful_poll(tmp_path): + """A success after consecutive failures must clear the backoff so + the next failure starts at 1s again — not stuck at the cap.""" + sequence: List[Any] = ["raise", "raise", { + "kind": "canvas_user", "activity_id": "act-1", + "arrival_workspace_id": "ws-x", "text": "real", + }, "raise"] + + class StepTools(FakeTools): + def __init__(self): + super().__init__(inbox=[]) + self._idx = 0 + + async def wait_for_message(self, timeout_secs: float) -> str: + item = sequence[self._idx] + self._idx += 1 + if item == "raise": + raise ConnectionError("flake") + return json.dumps(item) + + tools = StepTools() + runner = FakeRunner([CodexResult( + text="ok", session_id="sess", exit_code=0, stderr_tail="", + )]) + store = _SessionStore(tmp_path / "sessions.json") + + sleeps: List[float] = [] + + async def _record_sleep(secs): + sleeps.append(secs) + + import codex_channel_molecule.bridge as bridge_mod + + orig_sleep = bridge_mod.asyncio.sleep + bridge_mod.asyncio.sleep = _record_sleep + try: + await run_bridge(runner=runner, tools=tools, session_store=store, 
iterations=4) + finally: + bridge_mod.asyncio.sleep = orig_sleep + + # 1s after first raise, 2s after second, success in between resets, + # then 1s again after the trailing raise. + assert sleeps == [1.0, 2.0, 1.0] + + +@pytest.mark.asyncio +async def test_inbox_pop_failure_logs_but_does_not_re_run_codex(tmp_path): + """If pop fails after a successful reply, codex must NOT run again + on the same message — the platform still has the row but our turn + is done. We accept duplicate delivery on the next poll cycle.""" + inbox = [{ + "kind": "canvas_user", "activity_id": "act-pop-fail", + "arrival_workspace_id": "ws-x", "text": "msg", + }] + + class PopFailTools(FakeTools): + async def inbox_pop(self, activity_id): + raise RuntimeError("simulated 503") + + tools = PopFailTools(inbox) + runner = FakeRunner([CodexResult( + text="reply", session_id="sess", exit_code=0, stderr_tail="", + )]) + store = _SessionStore(tmp_path / "sessions.json") + + await run_bridge(runner=runner, tools=tools, session_store=store, iterations=2) + + # Reply was sent; codex ran exactly once even though pop blew up. + assert runner.calls == [("msg", None)] + assert tools.canvas_replies == [("reply", "ws-x")] + + +@pytest.mark.asyncio +async def test_peer_agent_without_peer_id_drops_reply_and_acks(tmp_path): + """A malformed peer_agent message (no peer_id — registry lookup + failure or platform bug) must not crash the bridge or call + delegate_task with an empty workspace_id. + + The row IS acked: keeping an undeliverable message in the queue + would loop forever on every poll. Operator sees the warning in the + log and can investigate — at-least-once means "we won't lose a + message we CAN deliver", not "we'll loop on poison forever". 
+ """ + inbox = [{ + "kind": "peer_agent", "activity_id": "act-no-peer", + "text": "from a peer with no id", + }] + tools = FakeTools(inbox) + runner = FakeRunner([CodexResult( + text="reply", session_id="sess", exit_code=0, stderr_tail="", + )]) + store = _SessionStore(tmp_path / "sessions.json") + + await run_bridge(runner=runner, tools=tools, session_store=store, iterations=2) + + assert runner.calls == [("from a peer with no id", None)] + assert tools.peer_replies == [] # no destination → no delegate_task call + assert tools.canvas_replies == [] + assert tools.popped == ["act-no-peer"] # poison drained + + +@pytest.mark.asyncio +async def test_unknown_kind_drops_reply_and_acks(tmp_path): + """Future or malformed message kinds must not crash the daemon — + log + drop + ack so the queue isn't blocked. A future daemon that + learns the new kind reads it from the platform's persistent log, + not from the inbox.""" + inbox = [{ + "kind": "future_kind_we_dont_know", "activity_id": "act-?", + "text": "??", + }] + tools = FakeTools(inbox) + runner = FakeRunner([CodexResult( + text="r", session_id="s", exit_code=0, stderr_tail="", + )]) + store = _SessionStore(tmp_path / "sessions.json") + + await run_bridge(runner=runner, tools=tools, session_store=store, iterations=2) + + assert tools.canvas_replies == [] + assert tools.peer_replies == [] + assert tools.popped == ["act-?"] # drained, not looped + + +@pytest.mark.asyncio +async def test_empty_codex_output_falls_back_to_placeholder(tmp_path): + """If codex returns empty stdout (silent failure, model refused, etc.) 
+ we still send SOMETHING to the user — silent inbound never gets a + reply and looks like the daemon ate it.""" + inbox = [{ + "kind": "canvas_user", "activity_id": "act-silent", + "arrival_workspace_id": "ws-x", "text": "hi", + }] + tools = FakeTools(inbox) + runner = FakeRunner([CodexResult( + text="", session_id="sess", exit_code=0, stderr_tail="", + )]) + store = _SessionStore(tmp_path / "sessions.json") + + await run_bridge(runner=runner, tools=tools, session_store=store, iterations=2) + + assert len(tools.canvas_replies) == 1 + text, _ = tools.canvas_replies[0] + assert text == "(codex returned empty output)" + assert tools.popped == ["act-silent"] + + +@pytest.mark.asyncio +async def test_canvas_user_falls_back_to_workspace_id(tmp_path): + """The chat-id encoder accepts ``workspace_id`` when + ``arrival_workspace_id`` is absent — older platform builds use the + short field name. Same session keying either way.""" + inbox = [{ + "kind": "canvas_user", "activity_id": "act-old-shape", + "workspace_id": "ws-legacy", # no arrival_workspace_id + "text": "msg", + }] + tools = FakeTools(inbox) + runner = FakeRunner([CodexResult( + text="ack", session_id="sess-legacy", exit_code=0, stderr_tail="", + )]) + store = _SessionStore(tmp_path / "sessions.json") + + await run_bridge(runner=runner, tools=tools, session_store=store, iterations=2) + + assert tools.canvas_replies == [("ack", "ws-legacy")] + assert store.get("canvas:ws-legacy") == "sess-legacy" + + +@pytest.mark.parametrize( + "payload", + [ + "not json at all", # JSONDecodeError + '"a string, not an object"', # decoded but not a dict + '{"timeout": true}', # real timeout sentinel + '{"no_activity_id": "field"}', # dict but missing activity_id + ], +) +@pytest.mark.asyncio +async def test_malformed_payloads_are_silently_dropped(tmp_path, payload): + """Each shape that wait_for_message can return must skip the message + handler without raising. 
Invariant comes from _extract_inbox_message.""" + class StaticTools(FakeTools): + def __init__(self, payload): + super().__init__(inbox=[]) + self._payload = payload + + async def wait_for_message(self, timeout_secs): + return self._payload + + tools = StaticTools(payload) + runner = FakeRunner(results=[]) + store = _SessionStore(tmp_path / "sessions.json") + + await run_bridge(runner=runner, tools=tools, session_store=store, iterations=2) + + assert runner.calls == [] + assert tools.popped == []