molecule-ai-workspace-templ.../tests/test_runtime_wedge_mirror.py
Hongming Wang 02e4520cf3 chore(executor): runtime_wedge mirror follow-ups from PR #29 review
Two review nits:

1. Narrow the import-arm catch in _mark_sdk_wedged and
   _clear_sdk_wedge_on_success to (ImportError, ModuleNotFoundError).
   The bare `except Exception:` would have masked an AttributeError /
   TypeError from a runtime_wedge API rename — silently degrading the
   mirror to "no-op" and making heartbeat + the smoke gate (#131)
   blind to claude-code wedges. The structural snapshot test in
   molecule-core (task #169) catches the rename at PR-time. Older
   runtimes that don't ship runtime_wedge at all still hit ImportError
   and silently no-op — the local sticky flag still gates is_wedged()
   inside this module so internal callers keep working.

2. Add mirror-CALL-failure injection tests. The recorder used by the
   original tests never raised, so the inner try around
   _mark_runtime_wedged(reason) (and the symmetric clear) wasn't
   pinned. New tests inject a recorder whose mark/clear raise on call,
   then assert: (a) the call attempt was recorded, (b) the local
   sticky flag stayed correct, (c) the failure was logged at ERROR.
   Pins both the contract ("mirror is best-effort, local is source of
   truth") AND the operator-visible signal (an ERROR log line is the
   only way to see a silent mirror regression).

Regression-injection-checked: removing the call-side try arm makes
both new tests fail with clear messages. Tests: 7 in
test_runtime_wedge_mirror.py, 45 across the whole tests/ tree.
2026-05-01 18:04:24 -07:00

328 lines
14 KiB
Python

"""Pin _mark_sdk_wedged + _clear_sdk_wedge_on_success mirror into
molecule_runtime.runtime_wedge.
The local _sdk_wedged_reason flag (module-level in claude_sdk_executor)
must be mirrored into the universal runtime_wedge module so two
consumers can observe the wedge:
1. Heartbeat (workspace/heartbeat.py:_runtime_state_payload) — flips
workspace status to `degraded` on the canvas. WITHOUT the mirror,
a wedged workspace stays green-dot while every chat hangs.
2. Boot smoke (workspace/smoke_mode.py:run_executor_smoke) — task
#131. Catches PR-25-class regressions (malformed CLI argv → SDK
init wedge) BEFORE the broken image ships to GHCR. WITHOUT the
mirror, the smoke sees the outer wait_for time out and reports
PASS even though the runtime self-reported wedged.
Stubs molecule_runtime.runtime_wedge as a recorder, then asserts the
mirror calls land. Regression-injection-checked: deleting either of
the new try/except blocks in _mark_sdk_wedged / _clear_sdk_wedge_on_success
makes these tests fail with a clear message naming the missing call.
"""
import os
import sys
import types
from unittest.mock import MagicMock
# ---- Stubs ----
#
# claude_sdk_executor.py imports a tall stack at module load. We
# replace each with the minimum surface needed so the test file runs
# in CI without the real packages installed. Patterns mirror
# test_dev_channels_flag.py — same _ensure_module/_ensure_attr
# helpers so a real-package install on workstation still wins over
# the stubs.
def _ensure_module(dotted: str) -> types.ModuleType:
if dotted not in sys.modules:
sys.modules[dotted] = types.ModuleType(dotted)
return sys.modules[dotted]
def _ensure_attr(mod: types.ModuleType, name: str, value: object) -> None:
if not hasattr(mod, name):
setattr(mod, name, value)
def _install_executor_stubs():
"""Mirror of test_dev_channels_flag._install_stubs — same surface."""
sdk = _ensure_module("claude_agent_sdk")
_ensure_attr(sdk, "ClaudeAgentOptions", MagicMock(name="ClaudeAgentOptions"))
_ensure_attr(sdk, "AssistantMessage", type("AssistantMessage", (), {}))
_ensure_attr(sdk, "TextBlock", type("TextBlock", (), {}))
_ensure_attr(sdk, "ResultMessage", type("ResultMessage", (), {}))
_ensure_attr(sdk, "query", MagicMock(name="query"))
_ensure_module("a2a")
_ensure_module("a2a.server")
a2a_exec = _ensure_module("a2a.server.agent_execution")
_ensure_attr(a2a_exec, "AgentExecutor", type("AgentExecutor", (), {}))
_ensure_attr(a2a_exec, "RequestContext", type("RequestContext", (), {}))
a2a_events = _ensure_module("a2a.server.events")
_ensure_attr(a2a_events, "EventQueue", type("EventQueue", (), {}))
a2a_helpers = _ensure_module("a2a.helpers")
_ensure_attr(a2a_helpers, "new_text_message", lambda *_a, **_kw: None)
_ensure_module("molecule_runtime")
helpers = _ensure_module("molecule_runtime.executor_helpers")
_ensure_attr(helpers, "CONFIG_MOUNT", "/configs")
_ensure_attr(helpers, "WORKSPACE_MOUNT", "/workspace")
_ensure_attr(helpers, "MEMORY_CONTENT_MAX_CHARS", 10000)
_ensure_attr(helpers, "auto_push_hook", lambda *a, **kw: None)
_ensure_attr(helpers, "brief_summary", lambda *a, **kw: "")
_ensure_attr(helpers, "collect_outbound_files", lambda *a, **kw: [])
_ensure_attr(helpers, "commit_memory", lambda *a, **kw: None)
_ensure_attr(helpers, "extract_attached_files", lambda *a, **kw: [])
_ensure_attr(helpers, "extract_message_text", lambda *a, **kw: "")
_ensure_attr(helpers, "get_a2a_instructions", lambda **kw: "")
_ensure_attr(helpers, "get_hma_instructions", lambda *a, **kw: "")
_ensure_attr(helpers, "get_mcp_server_path", lambda *a, **kw: "/dev/null")
_ensure_attr(helpers, "get_system_prompt", lambda *a, **kw: "")
_ensure_attr(helpers, "read_delegation_results", lambda *a, **kw: "")
_ensure_attr(helpers, "recall_memories", lambda *a, **kw: "")
_ensure_attr(helpers, "sanitize_agent_error", lambda e: str(e))
_ensure_attr(helpers, "set_current_task", lambda *a, **kw: None)
def _install_runtime_wedge_recorder() -> dict:
"""Replace molecule_runtime.runtime_wedge with a recorder that
captures every (mark_wedged|clear_wedge) call. Returns the recorder
dict so tests can assert on it. Forces a fresh module each time so
state from a previous test doesn't bleed in."""
rec = {"mark_calls": [], "clear_calls": 0}
mod = types.ModuleType("molecule_runtime.runtime_wedge")
def _mark(reason: str) -> None:
rec["mark_calls"].append(reason)
def _clear() -> None:
rec["clear_calls"] += 1
mod.mark_wedged = _mark
mod.clear_wedge = _clear
sys.modules["molecule_runtime.runtime_wedge"] = mod
return rec
def _load_executor():
"""Re-import claude_sdk_executor with fresh stubs."""
_install_executor_stubs()
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if parent_dir not in sys.path:
sys.path.insert(0, parent_dir)
sys.modules.pop("claude_sdk_executor", None)
import claude_sdk_executor # noqa: WPS433
return claude_sdk_executor
# ─── Mirror tests ─────────────────────────────────────────────────────
def test_mark_sdk_wedged_mirrors_into_runtime_wedge():
"""_mark_sdk_wedged must call runtime_wedge.mark_wedged with the
same reason. Heartbeat reads runtime_wedge — without this mirror
the canvas keeps showing green-dot while every chat hangs."""
rec = _install_runtime_wedge_recorder()
mod = _load_executor()
mod._reset_sdk_wedge_for_test()
mod._mark_sdk_wedged("claude SDK init timeout — restart workspace")
assert rec["mark_calls"] == [
"claude SDK init timeout — restart workspace",
], (
"_mark_sdk_wedged did not mirror into runtime_wedge.mark_wedged. "
"Heartbeat + smoke_mode (#131) both observe the universal flag — "
"without the mirror, a wedged workspace looks healthy to both."
)
# Local flag should still be set — mirror is additive, not a replacement.
assert mod.is_wedged() is True
assert mod.wedge_reason() == "claude SDK init timeout — restart workspace"
def test_mark_sdk_wedged_first_call_wins_for_mirror_too():
"""The local flag has first-wins semantics so a transient secondary
wedge can't overwrite a more specific initial reason. The mirror
must follow the same rule — otherwise heartbeat banner text could
flip mid-incident."""
rec = _install_runtime_wedge_recorder()
mod = _load_executor()
mod._reset_sdk_wedge_for_test()
mod._mark_sdk_wedged("specific initial reason — restart workspace")
mod._mark_sdk_wedged("generic later reason")
assert rec["mark_calls"] == ["specific initial reason — restart workspace"], (
"Mirror fired more than once across repeated _mark_sdk_wedged calls. "
"Local flag has first-wins; mirror must too, or the canvas banner "
"and smoke gate will see the wrong reason."
)
def test_clear_sdk_wedge_on_success_mirrors_into_runtime_wedge():
"""Clear must propagate too — otherwise a transient wedge that the
next successful turn would clear locally would leave the universal
flag latched, and the workspace would stay degraded forever
(heartbeat would never report runtime_state empty)."""
rec = _install_runtime_wedge_recorder()
mod = _load_executor()
mod._reset_sdk_wedge_for_test()
mod._mark_sdk_wedged("transient blip")
mod._clear_sdk_wedge_on_success()
assert rec["clear_calls"] == 1, (
"_clear_sdk_wedge_on_success did not mirror into runtime_wedge.clear_wedge. "
"Local clear without mirror = workspace stays degraded forever after "
"an observed-success recovery."
)
assert mod.is_wedged() is False
def test_clear_when_not_wedged_does_not_call_runtime_wedge():
"""No-op symmetry: if local flag wasn't set, the mirror must not
fire either. Avoids clearing a wedge that some OTHER adapter set
in the same process (forward-cover for the future per-org
multi-executor design hinted at in the module docstring)."""
rec = _install_runtime_wedge_recorder()
mod = _load_executor()
mod._reset_sdk_wedge_for_test()
mod._clear_sdk_wedge_on_success()
assert rec["clear_calls"] == 0, (
"_clear_sdk_wedge_on_success fired the mirror even though the "
"local flag wasn't set — would stomp on a peer adapter's wedge "
"in a multi-executor setup."
)
def test_mirror_swallows_runtime_wedge_import_error():
"""Older runtime versions (pre-task-#131 wheel) don't ship
runtime_wedge. The mirror call must swallow ImportError so a
template pinned to an older runtime keeps booting — the local
sticky flag still gates is_wedged() inside this module so the
retry loop / cancel handler keep working."""
# Install all the executor stubs then explicitly REMOVE the
# runtime_wedge submodule so the import inside _mark_sdk_wedged
# raises ImportError.
_install_executor_stubs()
sys.modules.pop("molecule_runtime.runtime_wedge", None)
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if parent_dir not in sys.path:
sys.path.insert(0, parent_dir)
sys.modules.pop("claude_sdk_executor", None)
import claude_sdk_executor as mod # noqa: WPS433
mod._reset_sdk_wedge_for_test()
# Should not raise even though runtime_wedge import will fail.
mod._mark_sdk_wedged("init timeout")
assert mod.is_wedged() is True
assert mod.wedge_reason() == "init timeout"
# Clear path also swallows.
mod._clear_sdk_wedge_on_success()
assert mod.is_wedged() is False
# ─── Mirror-call-failure injection (review follow-up) ──────────────────
#
# The recorder above never raises, so the inner `try` arm around
# `_mark_runtime_wedged(reason)` (and the symmetric clear) wasn't
# pinned by the original mirror tests. Inject a recorder whose
# call-side raises so the catch arm is exercised: the mirror failure
# must be logged but must NOT suppress the local sticky flag.
def _install_runtime_wedge_raising_recorder() -> dict:
"""Replace molecule_runtime.runtime_wedge with a recorder whose
mark_wedged + clear_wedge implementations RAISE on call (not on
import). Captures the call-attempt count so the test can verify
the catch arm fired without leaking the exception. Returns the
recorder dict (mark_attempts, clear_attempts)."""
rec = {"mark_attempts": 0, "clear_attempts": 0}
mod = types.ModuleType("molecule_runtime.runtime_wedge")
def _mark(_reason: str) -> None:
rec["mark_attempts"] += 1
raise RuntimeError("simulated runtime_wedge.mark_wedged internal raise")
def _clear() -> None:
rec["clear_attempts"] += 1
raise RuntimeError("simulated runtime_wedge.clear_wedge internal raise")
mod.mark_wedged = _mark
mod.clear_wedge = _clear
sys.modules["molecule_runtime.runtime_wedge"] = mod
return rec
def test_mark_sdk_wedged_swallows_mirror_call_exception(caplog):
"""If runtime_wedge.mark_wedged itself raises (signature is fine,
body has a bug), the caller in claude_sdk_executor must log AND
keep the local sticky flag set. Otherwise an internal regression
in runtime_wedge would silently make this workspace appear healthy
while every chat actually hangs.
"""
import logging
rec = _install_runtime_wedge_raising_recorder()
mod = _load_executor()
mod._reset_sdk_wedge_for_test()
with caplog.at_level(logging.ERROR, logger="claude_sdk_executor"):
mod._mark_sdk_wedged("local-and-mirror reason")
assert rec["mark_attempts"] == 1, (
"executor never called runtime_wedge.mark_wedged — the inner "
"try block was skipped or short-circuited"
)
assert mod.is_wedged() is True, (
"mirror-call exception suppressed the local sticky flag — "
"violates the 'mirror is best-effort, local is source of truth' "
"contract"
)
assert mod.wedge_reason() == "local-and-mirror reason"
# Loud log line is the only operator-visible signal that the mirror
# silently failed — pin its presence so a future logger.exception →
# logger.debug downgrade can't sneak through.
assert any(
"runtime_wedge.mark_wedged mirror failed" in r.message
for r in caplog.records
), "mirror-call failure was not logged at ERROR — operator can't see the regression"
def test_clear_sdk_wedge_on_success_swallows_mirror_call_exception(caplog):
"""Symmetric to the mark test: a runtime_wedge.clear_wedge bug
must not leave the local flag stuck-on (which would make
auto-recovery silently broken even though the SDK started working
again)."""
import logging
rec = _install_runtime_wedge_raising_recorder()
mod = _load_executor()
mod._reset_sdk_wedge_for_test()
mod._mark_sdk_wedged("transient")
# Mark also raised but local flag is set — that's the precondition.
assert mod.is_wedged() is True
rec["mark_attempts"] = 0 # only count the clear attempt below
with caplog.at_level(logging.ERROR, logger="claude_sdk_executor"):
mod._clear_sdk_wedge_on_success()
assert rec["clear_attempts"] == 1, (
"executor never called runtime_wedge.clear_wedge — inner try "
"block was skipped"
)
assert mod.is_wedged() is False, (
"mirror clear-call exception left the local sticky flag set — "
"auto-recovery is silently broken"
)
assert any(
"runtime_wedge.clear_wedge mirror failed" in r.message
for r in caplog.records
), "clear mirror-call failure was not logged at ERROR"