Merge pull request #2 from Molecule-AI/feat/v0.1.1-backoff-and-tests
v0.1.1: exponential backoff on platform errors + error-path tests
This commit is contained in:
commit
9abf6bfa79
@ -26,7 +26,7 @@ canvas user / peer agent ──► molecule platform inbox
|
||||
canvas chat / peer workspace
|
||||
```
|
||||
|
||||
Per chat thread (one canvas-user thread or one peer-workspace thread) gets its own codex session_id, persisted to `~/.codex-channel-molecule/sessions.json` so daemon restarts don't lose conversation context.
|
||||
Per chat thread (one canvas-user thread or one peer-workspace thread) gets its own codex session_id, persisted to `~/.codex-channel-molecule/sessions.json` so daemon restarts don't lose conversation context. Set `CODEX_CHANNEL_MOLECULE_STATE_DIR` to override the default location (e.g. when running under systemd with a per-instance state dir).
|
||||
|
||||
## When to use this vs. the codex tab in the External Connect modal
|
||||
|
||||
|
||||
@ -29,6 +29,14 @@ logger = logging.getLogger(__name__)
|
||||
# read timeout. 60s is the same default the MCP tool advertises.
|
||||
_LONG_POLL_SECS = 60.0
|
||||
|
||||
# Exponential backoff window for wait_for_message failures. Each
# consecutive error doubles the delay, starting at _BACKOFF_INITIAL and
# capped at _BACKOFF_MAX; the first successful poll resets it to zero.
# Without this, a stale token, DNS failure, or dead platform degenerates
# into a tight once-per-second retry storm — multiplied across every
# operator daemon sharing the outage.
_BACKOFF_INITIAL = 1.0
_BACKOFF_MAX = 60.0
|
||||
|
||||
|
||||
class _Tools(Protocol):
|
||||
"""Minimal interface the bridge needs from molecule_runtime.
|
||||
@ -51,34 +59,32 @@ class _RealTools:
|
||||
"""Production implementation — calls into the installed
|
||||
molecule-ai-workspace-runtime wheel.
|
||||
|
||||
Imported lazily so test runs don't require the wheel.
|
||||
The wheel is imported lazily on first instantiation so unit tests
|
||||
that supply a fake `_Tools` don't require it.
|
||||
"""
|
||||
|
||||
async def wait_for_message(self, timeout_secs: float) -> str:
|
||||
from molecule_runtime.a2a_tools import tool_wait_for_message
|
||||
def __init__(self) -> None:
|
||||
from molecule_runtime import a2a_tools
|
||||
|
||||
return await tool_wait_for_message(timeout_secs=timeout_secs)
|
||||
self._tools = a2a_tools
|
||||
|
||||
async def wait_for_message(self, timeout_secs: float) -> str:
|
||||
return await self._tools.tool_wait_for_message(timeout_secs=timeout_secs)
|
||||
|
||||
async def inbox_pop(self, activity_id: str) -> str:
|
||||
from molecule_runtime.a2a_tools import tool_inbox_pop
|
||||
|
||||
return await tool_inbox_pop(activity_id=activity_id)
|
||||
return await self._tools.tool_inbox_pop(activity_id=activity_id)
|
||||
|
||||
async def send_message_to_user(
|
||||
self, message: str, workspace_id: Optional[str]
|
||||
) -> str:
|
||||
from molecule_runtime.a2a_tools import tool_send_message_to_user
|
||||
|
||||
return await tool_send_message_to_user(
|
||||
return await self._tools.tool_send_message_to_user(
|
||||
message=message, workspace_id=workspace_id
|
||||
)
|
||||
|
||||
async def delegate_task(
|
||||
self, workspace_id: str, task: str, source_workspace_id: Optional[str]
|
||||
) -> str:
|
||||
from molecule_runtime.a2a_tools import tool_delegate_task
|
||||
|
||||
return await tool_delegate_task(
|
||||
return await self._tools.tool_delegate_task(
|
||||
workspace_id=workspace_id,
|
||||
task=task,
|
||||
source_workspace_id=source_workspace_id,
|
||||
@ -207,14 +213,19 @@ async def run_bridge(
|
||||
session_store = session_store or _SessionStore(_state_dir() / "sessions.json")
|
||||
|
||||
cycle = 0
|
||||
backoff = 0.0
|
||||
while iterations is None or cycle < iterations:
|
||||
cycle += 1
|
||||
try:
|
||||
payload = await tools.wait_for_message(timeout_secs=_LONG_POLL_SECS)
|
||||
except Exception as exc:
|
||||
logger.warning("wait_for_message raised: %s", exc)
|
||||
await asyncio.sleep(1.0)
|
||||
backoff = min(backoff * 2, _BACKOFF_MAX) if backoff else _BACKOFF_INITIAL
|
||||
logger.warning(
|
||||
"wait_for_message raised: %s — backing off %.1fs", exc, backoff,
|
||||
)
|
||||
await asyncio.sleep(backoff)
|
||||
continue
|
||||
backoff = 0.0
|
||||
|
||||
message = _extract_inbox_message(payload)
|
||||
if message is None:
|
||||
|
||||
@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "codex-channel-molecule"
|
||||
version = "0.1.0"
|
||||
version = "0.1.1"
|
||||
description = "Bridge daemon for codex CLI ↔ Molecule platform — long-polls the platform inbox, runs `codex exec --resume <session>` per inbound message, replies via send_message_to_user MCP tool. Counterpart to hermes-channel-molecule."
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.11"
|
||||
|
||||
@ -297,3 +297,247 @@ async def test_a2a_multipart_text_is_concatenated(tmp_path):
|
||||
|
||||
msg, _ = runner.calls[0]
|
||||
assert msg == "first chunk\nsecond chunk"
|
||||
|
||||
|
||||
# ----------------------------------------------------------------------
|
||||
# Error-path coverage — branches operators actually hit in production
|
||||
# when the platform is flaky, the message is malformed, or codex returns
|
||||
# nothing useful. Each fake supplies the exact failure shape.
|
||||
# ----------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_wait_for_message_raises_triggers_exponential_backoff(tmp_path):
    """Consecutive wait_for_message failures (DNS outage, dead platform,
    expired token) must widen the retry delay exponentially — otherwise
    N daemons sharing one outage each hammer the platform at 1 Hz.

    asyncio.sleep is swapped for a recorder so the test finishes
    instantly while still pinning the exact 1, 2, 4, 8, 16, 32,
    60-cap schedule.
    """
    class OutageTools(FakeTools):
        async def wait_for_message(self, timeout_secs: float) -> str:
            raise ConnectionError("simulated platform outage")

    store = _SessionStore(tmp_path / "sessions.json")
    runner = FakeRunner(results=[])
    tools = OutageTools(inbox=[])

    recorded: List[float] = []

    async def _capture(secs):
        recorded.append(secs)

    import codex_channel_molecule.bridge as bridge_mod

    # NOTE(review): this rebinds asyncio.sleep on the shared asyncio
    # module (bridge_mod.asyncio IS asyncio); safe here only because the
    # finally block restores it and pytest runs tests sequentially.
    real_sleep = bridge_mod.asyncio.sleep
    bridge_mod.asyncio.sleep = _capture
    try:
        await run_bridge(runner=runner, tools=tools, session_store=store, iterations=8)
    finally:
        bridge_mod.asyncio.sleep = real_sleep

    # Doubles every cycle, then pins at the 60s ceiling.
    assert recorded == [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
    assert recorded == sorted(recorded)  # monotone non-decreasing
    assert runner.calls == []  # codex never invoked during the outage
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_backoff_resets_on_first_successful_poll(tmp_path):
    """One good poll between failures must zero the backoff so the next
    failure starts over at 1s instead of sticking at the cap."""
    script: List[Any] = [
        "raise",
        "raise",
        {
            "kind": "canvas_user", "activity_id": "act-1",
            "arrival_workspace_id": "ws-x", "text": "real",
        },
        "raise",
    ]

    class ScriptedTools(FakeTools):
        def __init__(self):
            super().__init__(inbox=[])
            self._steps = iter(script)

        async def wait_for_message(self, timeout_secs: float) -> str:
            step = next(self._steps)
            if step == "raise":
                raise ConnectionError("flake")
            return json.dumps(step)

    tools = ScriptedTools()
    runner = FakeRunner([CodexResult(
        text="ok", session_id="sess", exit_code=0, stderr_tail="",
    )])
    store = _SessionStore(tmp_path / "sessions.json")

    recorded: List[float] = []

    async def _capture(secs):
        recorded.append(secs)

    import codex_channel_molecule.bridge as bridge_mod

    real_sleep = bridge_mod.asyncio.sleep
    bridge_mod.asyncio.sleep = _capture
    try:
        await run_bridge(runner=runner, tools=tools, session_store=store, iterations=4)
    finally:
        bridge_mod.asyncio.sleep = real_sleep

    # Two escalating sleeps, a reset on the successful poll in between,
    # then back to 1s for the trailing failure.
    assert recorded == [1.0, 2.0, 1.0]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_inbox_pop_failure_logs_but_does_not_re_run_codex(tmp_path):
    """A pop failure after a successful reply must not run codex a second
    time on the same message — our turn is done even though the platform
    still holds the row. Duplicate delivery on the next poll cycle is the
    accepted trade-off."""
    class PopFailTools(FakeTools):
        async def inbox_pop(self, activity_id):
            raise RuntimeError("simulated 503")

    message = {
        "kind": "canvas_user", "activity_id": "act-pop-fail",
        "arrival_workspace_id": "ws-x", "text": "msg",
    }
    tools = PopFailTools([message])
    runner = FakeRunner([CodexResult(
        text="reply", session_id="sess", exit_code=0, stderr_tail="",
    )])
    store = _SessionStore(tmp_path / "sessions.json")

    await run_bridge(runner=runner, tools=tools, session_store=store, iterations=2)

    # Exactly one codex run and one outbound reply despite the pop blowing up.
    assert runner.calls == [("msg", None)]
    assert tools.canvas_replies == [("reply", "ws-x")]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_peer_agent_without_peer_id_drops_reply_and_acks(tmp_path):
    """A peer_agent row missing its peer_id (registry lookup failure or
    platform bug) must neither crash the bridge nor call delegate_task
    with an empty workspace_id.

    The row IS acked: an undeliverable message left in the queue would
    replay on every poll forever. At-least-once means "we won't lose a
    message we CAN deliver", not "we'll loop on poison forever" — the
    operator sees the warning in the log and can investigate.
    """
    tools = FakeTools([{
        "kind": "peer_agent", "activity_id": "act-no-peer",
        "text": "from a peer with no id",
    }])
    runner = FakeRunner([CodexResult(
        text="reply", session_id="sess", exit_code=0, stderr_tail="",
    )])
    store = _SessionStore(tmp_path / "sessions.json")

    await run_bridge(runner=runner, tools=tools, session_store=store, iterations=2)

    assert runner.calls == [("from a peer with no id", None)]
    assert tools.peer_replies == []    # nowhere to send → delegate_task skipped
    assert tools.canvas_replies == []
    assert tools.popped == ["act-no-peer"]  # poison drained
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_unknown_kind_drops_reply_and_acks(tmp_path):
    """Unrecognized message kinds — future or malformed — must not take
    the daemon down: log + drop + ack keeps the queue moving. A later
    daemon that understands the new kind reads it from the platform's
    persistent log, not from the inbox."""
    tools = FakeTools([{
        "kind": "future_kind_we_dont_know", "activity_id": "act-?",
        "text": "??",
    }])
    runner = FakeRunner([CodexResult(
        text="r", session_id="s", exit_code=0, stderr_tail="",
    )])
    store = _SessionStore(tmp_path / "sessions.json")

    await run_bridge(runner=runner, tools=tools, session_store=store, iterations=2)

    assert tools.canvas_replies == []
    assert tools.peer_replies == []
    assert tools.popped == ["act-?"]  # drained rather than looped on
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_empty_codex_output_falls_back_to_placeholder(tmp_path):
    """Empty codex stdout (silent failure, model refusal, etc.) must
    still produce SOME outbound reply — an unanswered inbound looks like
    the daemon swallowed the message."""
    tools = FakeTools([{
        "kind": "canvas_user", "activity_id": "act-silent",
        "arrival_workspace_id": "ws-x", "text": "hi",
    }])
    runner = FakeRunner([CodexResult(
        text="", session_id="sess", exit_code=0, stderr_tail="",
    )])
    store = _SessionStore(tmp_path / "sessions.json")

    await run_bridge(runner=runner, tools=tools, session_store=store, iterations=2)

    assert len(tools.canvas_replies) == 1
    body, _ = tools.canvas_replies[0]
    assert body == "(codex returned empty output)"
    assert tools.popped == ["act-silent"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_canvas_user_falls_back_to_workspace_id(tmp_path):
    """Older platform builds send ``workspace_id`` instead of
    ``arrival_workspace_id``; the chat-id encoder accepts either, so
    session keying is identical across both shapes."""
    tools = FakeTools([{
        "kind": "canvas_user", "activity_id": "act-old-shape",
        "workspace_id": "ws-legacy",  # legacy shape: no arrival_workspace_id
        "text": "msg",
    }])
    runner = FakeRunner([CodexResult(
        text="ack", session_id="sess-legacy", exit_code=0, stderr_tail="",
    )])
    store = _SessionStore(tmp_path / "sessions.json")

    await run_bridge(runner=runner, tools=tools, session_store=store, iterations=2)

    assert tools.canvas_replies == [("ack", "ws-legacy")]
    assert store.get("canvas:ws-legacy") == "sess-legacy"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "payload",
    [
        "not json at all",              # JSONDecodeError
        '"a string, not an object"',    # decodes, but not a dict
        '{"timeout": true}',            # genuine timeout sentinel
        '{"no_activity_id": "field"}',  # dict but missing activity_id
    ],
)
@pytest.mark.asyncio
async def test_malformed_payloads_are_silently_dropped(tmp_path, payload):
    """Every shape wait_for_message can hand back must bypass the
    message handler without raising — the invariant lives in
    _extract_inbox_message."""
    class FixedPayloadTools(FakeTools):
        def __init__(self, raw):
            super().__init__(inbox=[])
            self._raw = raw

        async def wait_for_message(self, timeout_secs):
            return self._raw

    tools = FixedPayloadTools(payload)
    runner = FakeRunner(results=[])
    store = _SessionStore(tmp_path / "sessions.json")

    await run_bridge(runner=runner, tools=tools, session_store=store, iterations=2)

    assert runner.calls == []
    assert tools.popped == []
|
||||
|
||||
Loading…
Reference in New Issue
Block a user