From b2561aa825379343d0af9a610beeca7dbc5cd7cf Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Fri, 1 May 2026 17:52:24 -0700 Subject: [PATCH] feat(executor): mirror SDK wedge into molecule_runtime.runtime_wedge MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The local _sdk_wedged_reason flag was only observed inside this module — heartbeat reads runtime_wedge.is_wedged() (universal cross-cutting holder) and so does the new boot-smoke gate from molecule-core PR #2473 / task #131. Without the mirror, a wedged claude-code workspace stayed green-dot on the canvas while every chat hung, AND the publish-image gate could not catch PR-25-class init wedges before the broken image shipped to GHCR. _mark_sdk_wedged now mirrors into runtime_wedge.mark_wedged, and _clear_sdk_wedge_on_success mirrors into runtime_wedge.clear_wedge. Both are best-effort — older runtimes that don't ship runtime_wedge silently no-op the mirror, so a template pinned to an older runtime still boots. Mirror exceptions are logged but don't suppress the local sticky flag, so internal callers (retry loop, cancel handler) see consistent state regardless of the universal-side outcome. Tests cover: mark mirrors with reason, first-call-wins propagates, clear mirrors, no-op when not wedged, ImportError-resilience. Regression-injection-checked: silencing the mirror branch fails the mark+first-wins tests at unit-test time with a clear message naming the missing runtime_wedge call. --- claude_sdk_executor.py | 51 ++++++- tests/test_runtime_wedge_mirror.py | 230 +++++++++++++++++++++++++++++ 2 files changed, 276 insertions(+), 5 deletions(-) create mode 100644 tests/test_runtime_wedge_mirror.py diff --git a/claude_sdk_executor.py b/claude_sdk_executor.py index 99db015..c2a504e 100644 --- a/claude_sdk_executor.py +++ b/claude_sdk_executor.py @@ -95,10 +95,18 @@ _RETRYABLE_PATTERNS = ( # its read pipe are in an unrecoverable state); only a workspace restart # clears it. # -# The heartbeat task reads these helpers and reports -# `runtime_state="wedged"` to the platform, which flips the workspace to -# `degraded` so the canvas surfaces a Restart hint instead of leaving -# the user staring at a green dot while every chat hangs. +# Two consumers read these helpers: +# 1. Heartbeat (via molecule_runtime.runtime_wedge — see _mark_sdk_wedged +# below). Reports `runtime_state="wedged"` to the platform, which +# flips the workspace to `degraded` so the canvas surfaces a Restart +# hint instead of leaving the user staring at a green dot while +# every chat hangs. +# 2. Boot smoke (molecule-core task #131). When the publish-image +# workflow boots the image with MOLECULE_SMOKE_MODE=1, +# run_executor_smoke consults runtime_wedge.is_wedged() at the end +# of every result path and upgrades a provisional PASS to FAIL when +# the flag is set. Catches PR-25-class regressions (malformed CLI +# argv → SDK init wedge) BEFORE the broken image ships to GHCR. # # Module scope (not instance scope) is deliberate: the wedge is a # property of the Python process, not the executor. A future per-org @@ -123,11 +131,33 @@ def wedge_reason() -> str: def _mark_sdk_wedged(reason: str) -> None: """Internal — flag the SDK as wedged. Only the first call wins (subsequent identical wedges shouldn't overwrite a more specific - reason). Tests use `_reset_sdk_wedge_for_test()` to clear.""" + reason). Tests use `_reset_sdk_wedge_for_test()` to clear. + + Mirrors the flag into molecule_runtime.runtime_wedge — that's the + universal cross-cutting wedge holder that heartbeat.py reads (to + flip the workspace to `degraded`) and that smoke_mode reads (to + fail the publish-image gate on init wedges, task #131). Without + this mirror the local sticky flag is unobserved by both consumers. + Best-effort: a missing/older runtime that doesn't ship runtime_wedge + silently no-ops the mirror — the local flag still gates + is_wedged() inside this module so internal callers (retry loop, + cancel handler) keep working. + """ global _sdk_wedged_reason if _sdk_wedged_reason is None: _sdk_wedged_reason = reason logger.error("SDK wedge detected: %s — workspace will report degraded until a successful query clears it", reason) + try: + from molecule_runtime.runtime_wedge import mark_wedged as _mark_runtime_wedged + except Exception: + return + try: + _mark_runtime_wedged(reason) + except Exception: + # Mirror is best-effort — a runtime_wedge regression + # (signature change, internal raise) must not silently + # suppress the local wedge state. + logger.exception("runtime_wedge.mark_wedged mirror failed — local SDK wedge flag is still set") def _clear_sdk_wedge_on_success() -> None: @@ -139,11 +169,22 @@ def _clear_sdk_wedge_on_success() -> None: next heartbeat after a working query reports `runtime_state` empty and the platform flips status back to online. + Symmetric with _mark_sdk_wedged: also clears the universal + runtime_wedge flag so heartbeat + smoke_mode see the same state. + No-op when not wedged (the common case).""" global _sdk_wedged_reason if _sdk_wedged_reason is not None: logger.info("SDK wedge cleared after successful query — workspace will recover to online on next heartbeat") _sdk_wedged_reason = None + try: + from molecule_runtime.runtime_wedge import clear_wedge as _clear_runtime_wedge + except Exception: + return + try: + _clear_runtime_wedge() + except Exception: + logger.exception("runtime_wedge.clear_wedge mirror failed — local clear succeeded") def _reset_sdk_wedge_for_test() -> None: diff --git a/tests/test_runtime_wedge_mirror.py b/tests/test_runtime_wedge_mirror.py new file mode 100644 index 0000000..4c714bc --- /dev/null +++ b/tests/test_runtime_wedge_mirror.py @@ -0,0 +1,230 @@ +"""Pin _mark_sdk_wedged + _clear_sdk_wedge_on_success mirror into +molecule_runtime.runtime_wedge. + +The local _sdk_wedged_reason flag (module-level in claude_sdk_executor) +must be mirrored into the universal runtime_wedge module so two +consumers can observe the wedge: + + 1. Heartbeat (workspace/heartbeat.py:_runtime_state_payload) — flips + workspace status to `degraded` on the canvas. WITHOUT the mirror, + a wedged workspace stays green-dot while every chat hangs. + + 2. Boot smoke (workspace/smoke_mode.py:run_executor_smoke) — task + #131. Catches PR-25-class regressions (malformed CLI argv → SDK + init wedge) BEFORE the broken image ships to GHCR. WITHOUT the + mirror, the smoke sees the outer wait_for time out and reports + PASS even though the runtime self-reported wedged. + +Stubs molecule_runtime.runtime_wedge as a recorder, then asserts the +mirror calls land. Regression-injection-checked: deleting either of +the new try/except blocks in _mark_sdk_wedged / _clear_sdk_wedge_on_success +makes these tests fail with a clear message naming the missing call. +""" + +import os +import sys +import types +from unittest.mock import MagicMock + + +# ---- Stubs ---- +# +# claude_sdk_executor.py imports a tall stack at module load. We +# replace each with the minimum surface needed so the test file runs +# in CI without the real packages installed. Patterns mirror +# test_dev_channels_flag.py — same _ensure_module/_ensure_attr +# helpers so a real-package install on workstation still wins over +# the stubs. + + +def _ensure_module(dotted: str) -> types.ModuleType: + if dotted not in sys.modules: + sys.modules[dotted] = types.ModuleType(dotted) + return sys.modules[dotted] + + +def _ensure_attr(mod: types.ModuleType, name: str, value: object) -> None: + if not hasattr(mod, name): + setattr(mod, name, value) + + +def _install_executor_stubs(): + """Mirror of test_dev_channels_flag._install_stubs — same surface.""" + sdk = _ensure_module("claude_agent_sdk") + _ensure_attr(sdk, "ClaudeAgentOptions", MagicMock(name="ClaudeAgentOptions")) + _ensure_attr(sdk, "AssistantMessage", type("AssistantMessage", (), {})) + _ensure_attr(sdk, "TextBlock", type("TextBlock", (), {})) + _ensure_attr(sdk, "ResultMessage", type("ResultMessage", (), {})) + _ensure_attr(sdk, "query", MagicMock(name="query")) + + _ensure_module("a2a") + _ensure_module("a2a.server") + a2a_exec = _ensure_module("a2a.server.agent_execution") + _ensure_attr(a2a_exec, "AgentExecutor", type("AgentExecutor", (), {})) + _ensure_attr(a2a_exec, "RequestContext", type("RequestContext", (), {})) + a2a_events = _ensure_module("a2a.server.events") + _ensure_attr(a2a_events, "EventQueue", type("EventQueue", (), {})) + a2a_helpers = _ensure_module("a2a.helpers") + _ensure_attr(a2a_helpers, "new_text_message", lambda *_a, **_kw: None) + + _ensure_module("molecule_runtime") + helpers = _ensure_module("molecule_runtime.executor_helpers") + _ensure_attr(helpers, "CONFIG_MOUNT", "/configs") + _ensure_attr(helpers, "WORKSPACE_MOUNT", "/workspace") + _ensure_attr(helpers, "MEMORY_CONTENT_MAX_CHARS", 10000) + _ensure_attr(helpers, "auto_push_hook", lambda *a, **kw: None) + _ensure_attr(helpers, "brief_summary", lambda *a, **kw: "") + _ensure_attr(helpers, "collect_outbound_files", lambda *a, **kw: []) + _ensure_attr(helpers, "commit_memory", lambda *a, **kw: None) + _ensure_attr(helpers, "extract_attached_files", lambda *a, **kw: []) + _ensure_attr(helpers, "extract_message_text", lambda *a, **kw: "") + _ensure_attr(helpers, "get_a2a_instructions", lambda **kw: "") + _ensure_attr(helpers, "get_hma_instructions", lambda *a, **kw: "") + _ensure_attr(helpers, "get_mcp_server_path", lambda *a, **kw: "/dev/null") + _ensure_attr(helpers, "get_system_prompt", lambda *a, **kw: "") + _ensure_attr(helpers, "read_delegation_results", lambda *a, **kw: "") + _ensure_attr(helpers, "recall_memories", lambda *a, **kw: "") + _ensure_attr(helpers, "sanitize_agent_error", lambda e: str(e)) + _ensure_attr(helpers, "set_current_task", lambda *a, **kw: None) + + +def _install_runtime_wedge_recorder() -> dict: + """Replace molecule_runtime.runtime_wedge with a recorder that + captures every (mark_wedged|clear_wedge) call. Returns the recorder + dict so tests can assert on it. Forces a fresh module each time so + state from a previous test doesn't bleed in.""" + rec = {"mark_calls": [], "clear_calls": 0} + mod = types.ModuleType("molecule_runtime.runtime_wedge") + + def _mark(reason: str) -> None: + rec["mark_calls"].append(reason) + + def _clear() -> None: + rec["clear_calls"] += 1 + + mod.mark_wedged = _mark + mod.clear_wedge = _clear + sys.modules["molecule_runtime.runtime_wedge"] = mod + return rec + + +def _load_executor(): + """Re-import claude_sdk_executor with fresh stubs.""" + _install_executor_stubs() + parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + if parent_dir not in sys.path: + sys.path.insert(0, parent_dir) + sys.modules.pop("claude_sdk_executor", None) + import claude_sdk_executor # noqa: WPS433 + return claude_sdk_executor + + +# ─── Mirror tests ───────────────────────────────────────────────────── + + +def test_mark_sdk_wedged_mirrors_into_runtime_wedge(): + """_mark_sdk_wedged must call runtime_wedge.mark_wedged with the + same reason. Heartbeat reads runtime_wedge — without this mirror + the canvas keeps showing green-dot while every chat hangs.""" + rec = _install_runtime_wedge_recorder() + mod = _load_executor() + mod._reset_sdk_wedge_for_test() + + mod._mark_sdk_wedged("claude SDK init timeout — restart workspace") + + assert rec["mark_calls"] == [ + "claude SDK init timeout — restart workspace", + ], ( + "_mark_sdk_wedged did not mirror into runtime_wedge.mark_wedged. " + "Heartbeat + smoke_mode (#131) both observe the universal flag — " + "without the mirror, a wedged workspace looks healthy to both." + ) + # Local flag should still be set — mirror is additive, not a replacement. + assert mod.is_wedged() is True + assert mod.wedge_reason() == "claude SDK init timeout — restart workspace" + + +def test_mark_sdk_wedged_first_call_wins_for_mirror_too(): + """The local flag has first-wins semantics so a transient secondary + wedge can't overwrite a more specific initial reason. The mirror + must follow the same rule — otherwise heartbeat banner text could + flip mid-incident.""" + rec = _install_runtime_wedge_recorder() + mod = _load_executor() + mod._reset_sdk_wedge_for_test() + + mod._mark_sdk_wedged("specific initial reason — restart workspace") + mod._mark_sdk_wedged("generic later reason") + + assert rec["mark_calls"] == ["specific initial reason — restart workspace"], ( + "Mirror fired more than once across repeated _mark_sdk_wedged calls. " + "Local flag has first-wins; mirror must too, or the canvas banner " + "and smoke gate will see the wrong reason." + ) + + +def test_clear_sdk_wedge_on_success_mirrors_into_runtime_wedge(): + """Clear must propagate too — otherwise a transient wedge that the + next successful turn would clear locally would leave the universal + flag latched, and the workspace would stay degraded forever + (heartbeat would never report runtime_state empty).""" + rec = _install_runtime_wedge_recorder() + mod = _load_executor() + mod._reset_sdk_wedge_for_test() + mod._mark_sdk_wedged("transient blip") + + mod._clear_sdk_wedge_on_success() + + assert rec["clear_calls"] == 1, ( + "_clear_sdk_wedge_on_success did not mirror into runtime_wedge.clear_wedge. " + "Local clear without mirror = workspace stays degraded forever after " + "an observed-success recovery." + ) + assert mod.is_wedged() is False + + +def test_clear_when_not_wedged_does_not_call_runtime_wedge(): + """No-op symmetry: if local flag wasn't set, the mirror must not + fire either. Avoids clearing a wedge that some OTHER adapter set + in the same process (forward-cover for the future per-org + multi-executor design hinted at in the module docstring).""" + rec = _install_runtime_wedge_recorder() + mod = _load_executor() + mod._reset_sdk_wedge_for_test() + + mod._clear_sdk_wedge_on_success() + + assert rec["clear_calls"] == 0, ( + "_clear_sdk_wedge_on_success fired the mirror even though the " + "local flag wasn't set — would stomp on a peer adapter's wedge " + "in a multi-executor setup." + ) + + +def test_mirror_swallows_runtime_wedge_import_error(): + """Older runtime versions (pre-task-#131 wheel) don't ship + runtime_wedge. The mirror call must swallow ImportError so a + template pinned to an older runtime keeps booting — the local + sticky flag still gates is_wedged() inside this module so the + retry loop / cancel handler keep working.""" + # Install all the executor stubs then explicitly REMOVE the + # runtime_wedge submodule so the import inside _mark_sdk_wedged + # raises ImportError. + _install_executor_stubs() + sys.modules.pop("molecule_runtime.runtime_wedge", None) + + parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + if parent_dir not in sys.path: + sys.path.insert(0, parent_dir) + sys.modules.pop("claude_sdk_executor", None) + import claude_sdk_executor as mod # noqa: WPS433 + mod._reset_sdk_wedge_for_test() + + # Should not raise even though runtime_wedge import will fail. + mod._mark_sdk_wedged("init timeout") + assert mod.is_wedged() is True + assert mod.wedge_reason() == "init timeout" + + # Clear path also swallows. + mod._clear_sdk_wedge_on_success() + assert mod.is_wedged() is False