diff --git a/workspace/claude_sdk_executor.py b/workspace/claude_sdk_executor.py index a05823c8..670020c9 100644 --- a/workspace/claude_sdk_executor.py +++ b/workspace/claude_sdk_executor.py @@ -87,71 +87,26 @@ _RETRYABLE_PATTERNS = ( "try again", ) -# Module-level SDK-wedge flag. When claude_agent_sdk's `query.initialize()` -# raises `Control request timeout: initialize`, the SDK's internal client- -# process state is corrupted for the rest of the Python process — every -# subsequent `_run_query()` call hits the same wedge and re-throws. The -# executor itself can't auto-recover (the underlying CLI subprocess and -# its read pipe are in an unrecoverable state); only a workspace restart -# clears it. +# SDK-wedge state lives in the runtime-side module (runtime_wedge) so +# heartbeat.py and any future cross-cutting consumer can read it without +# importing this adapter-specific executor. Decoupling was the prerequisite +# for moving claude_sdk_executor out of molecule-runtime into the +# claude-code template repo (task #87 — universal-runtime refactor). # -# The heartbeat task reads these helpers and reports -# `runtime_state="wedged"` to the platform, which flips the workspace to -# `degraded` so the canvas surfaces a Restart hint instead of leaving -# the user staring at a green dot while every chat hangs. -# -# Module scope (not instance scope) is deliberate: the wedge is a -# property of the Python process, not the executor. A future per-org -# multi-executor design could move this to a shared registry, but with -# one executor per workspace process today the simplest lock-free -# read+write fits. -_sdk_wedged_reason: str | None = None - - -def is_wedged() -> bool: - """True if the Claude SDK has hit a non-recoverable init wedge in - this process. Sticky until process restart.""" - return _sdk_wedged_reason is not None - - -def wedge_reason() -> str: - """Human-readable description of the wedge cause, or empty string - when not wedged. Surfaced to the canvas via heartbeat sample_error.""" - return _sdk_wedged_reason or "" - - -def _mark_sdk_wedged(reason: str) -> None: - """Internal — flag the SDK as wedged. Only the first call wins - (subsequent identical wedges shouldn't overwrite a more specific - reason). Tests use `_reset_sdk_wedge_for_test()` to clear.""" - global _sdk_wedged_reason - if _sdk_wedged_reason is None: - _sdk_wedged_reason = reason - logger.error("SDK wedge detected: %s — workspace will report degraded until a successful query clears it", reason) - - -def _clear_sdk_wedge_on_success() -> None: - """Auto-recovery — called from _run_query after a successful - completion. The original wedge could be transient (a single network - blip during the SDK's first-message handshake), and a sticky-only - flag would lock the workspace into degraded forever even after the - SDK started working again. Clearing on observed success means the - next heartbeat after a working query reports `runtime_state` empty - and the platform flips status back to online. - - No-op when not wedged (the common case).""" - global _sdk_wedged_reason - if _sdk_wedged_reason is not None: - logger.info("SDK wedge cleared after successful query — workspace will recover to online on next heartbeat") - _sdk_wedged_reason = None - - -def _reset_sdk_wedge_for_test() -> None: - """Test-only escape hatch. Production code clears the wedge via - `_clear_sdk_wedge_on_success` when a query succeeds; this helper - is for unit tests that need to reset between cases.""" - global _sdk_wedged_reason - _sdk_wedged_reason = None +# Local re-exports keep the in-file call sites (_run_query etc.) terse +# and preserve the historical names so the behavior is identical to +# the pre-extraction version. is_wedged/wedge_reason are also re-exported +# so any external consumer that imported them from this module keeps +# working — heartbeat.py has been updated to import from runtime_wedge +# directly, but a third-party adapter copying our wedge convention may +# still expect them here. +from runtime_wedge import ( # noqa: E402 + clear_wedge as _clear_sdk_wedge_on_success, + is_wedged, + mark_wedged as _mark_sdk_wedged, + reset_for_test as _reset_sdk_wedge_for_test, + wedge_reason, +) # Per-tool-use summarizers. Reads the most-useful argument from each diff --git a/workspace/heartbeat.py b/workspace/heartbeat.py index 6230c93a..e31fba0c 100644 --- a/workspace/heartbeat.py +++ b/workspace/heartbeat.py @@ -22,16 +22,25 @@ from platform_auth import auth_headers, refresh_cache, self_source_headers def _runtime_state_payload() -> dict: """Build the {runtime_state, sample_error} portion of the heartbeat - body when the Claude SDK has hit a wedge. Returns an empty dict - when the runtime is healthy so the heartbeat payload doesn't grow - fields the platform doesn't need. + body when SOME adapter executor has marked itself wedged. Returns + an empty dict when the runtime is healthy so the heartbeat payload + doesn't grow fields the platform doesn't need. - Imported lazily so workspaces running non-Claude runtimes (where - `claude_sdk_executor` may not be importable at all) keep working — - a missing import means "no Claude wedge possible here, healthy." + Source of truth is runtime_wedge (lives in molecule-runtime, + independent of any specific adapter). Pre task #87 this imported + from claude_sdk_executor — that worked because the executor was + bundled into molecule-runtime, but blocked moving it to the + claude-code template repo. The runtime_wedge module is now the + cross-cutting wedge-state holder; adapters mark/clear via it, + heartbeat reads it. + + Imported lazily so a workspace whose runtime image somehow ships + without runtime_wedge (corrupt install, mid-rolling-deploy state) + keeps heartbeating — a missing import means "no wedge info; assume + healthy." """ try: - from claude_sdk_executor import is_wedged, wedge_reason + from runtime_wedge import is_wedged, wedge_reason except Exception: return {} if not is_wedged(): diff --git a/workspace/runtime_wedge.py b/workspace/runtime_wedge.py new file mode 100644 index 00000000..96fad792 --- /dev/null +++ b/workspace/runtime_wedge.py @@ -0,0 +1,99 @@ +"""Per-process runtime-wedge state. + +Adapter executors that hit a non-recoverable wedge (e.g. claude-agent-sdk's +`Control request timeout: initialize` corrupting the client process's +internal state) call mark_wedged(reason). The heartbeat task reads +is_wedged() / wedge_reason() and forwards them in the heartbeat payload's +runtime_state field — the platform then flips workspace status to +`degraded` so the canvas surfaces a Restart hint instead of leaving the +user staring at a green dot while every chat hangs. + +Module scope (not instance scope) is deliberate: the wedge is a property +of the Python process, not any particular executor. With one executor +per workspace process today this is the simplest lock-free +read+write fit. A future per-org multi-executor design could move this +to a shared registry. + +This module lives in molecule-runtime (NOT in any adapter / template +repo) because: + + 1. workspace/heartbeat.py reads it on every heartbeat — cross-cutting + concern, runtime owns it. + 2. Multiple adapter executors can mark themselves wedged with their + own reason; the runtime aggregates one flag for the platform. + 3. Decoupling from claude_sdk_executor is the prerequisite for the + universal-runtime refactor (molecule-core task #87) — without + this extraction, claude_sdk_executor.py couldn't move to its + template repo because heartbeat would lose access to the wedge + state. + +Public API: mark_wedged(reason), clear_wedge(), is_wedged(), +wedge_reason(). The reset_for_test() helper is for unit tests only. +""" +from __future__ import annotations + +import logging + +logger = logging.getLogger(__name__) + + +# Single-flag state. None = healthy; non-empty string = wedged with that +# human-readable reason. Surfaced verbatim as the canvas's degraded-card +# banner text via heartbeat.sample_error. +_wedged_reason: str | None = None + + +def is_wedged() -> bool: + """True if some adapter executor in this process has marked itself + wedged. Sticky until the same executor calls clear_wedge() on + observed recovery (or the process restarts).""" + return _wedged_reason is not None + + +def wedge_reason() -> str: + """Human-readable description of the wedge cause, or empty string + when not wedged. Surfaced to the canvas via heartbeat sample_error.""" + return _wedged_reason or "" + + +def mark_wedged(reason: str) -> None: + """Flag the runtime as wedged. Only the FIRST call wins so a + subsequent identical-class wedge can't overwrite a more specific + initial reason — the operator-visible banner stays stable. + + Adapters call this from their executor's exception path when the + SDK has hit a non-recoverable error class. Safe to call multiple + times; the no-op when already wedged is intentional. + """ + global _wedged_reason + if _wedged_reason is None: + _wedged_reason = reason + logger.error( + "runtime wedge detected: %s — workspace will report degraded until cleared", + reason, + ) + + +def clear_wedge() -> None: + """Auto-recovery: adapter calls this after an observed successful + operation. The original wedge could be transient (single network + blip during the SDK's first-message handshake), and a sticky-only + flag would lock the workspace into degraded forever even after the + SDK started working again. Clearing on observed success means the + next heartbeat after a working query reports runtime_state empty + and the platform flips status back to online. + + No-op when not wedged (the common case).""" + global _wedged_reason + if _wedged_reason is not None: + logger.info("runtime wedge cleared after successful operation — workspace will recover to online on next heartbeat") + _wedged_reason = None + + +def reset_for_test() -> None: + """Test-only escape hatch. Production code clears the wedge via + clear_wedge() on observed success; this helper is for unit tests + that need to reset between cases without going through the full + SDK round-trip.""" + global _wedged_reason + _wedged_reason = None diff --git a/workspace/tests/test_runtime_wedge.py b/workspace/tests/test_runtime_wedge.py new file mode 100644 index 00000000..0df53e1d --- /dev/null +++ b/workspace/tests/test_runtime_wedge.py @@ -0,0 +1,103 @@ +"""Tests for runtime_wedge — the runtime-side wedge-state module that +heartbeat reads + adapter executors write. Extracted from claude_sdk_ +executor (task #87 universal-runtime refactor) so the executor can move +to its template repo without breaking heartbeat. + +The behavior is identical to the prior in-executor implementation; tests +pin the contract so the re-export shim in claude_sdk_executor.py can +later be deleted without surprise.""" +import pytest + +import runtime_wedge + + +@pytest.fixture(autouse=True) +def _reset(): + """Each test starts with a clean wedge state — production wedges are + sticky-per-process, but cross-test bleed would couple unrelated cases.""" + runtime_wedge.reset_for_test() + yield + runtime_wedge.reset_for_test() + + +class TestRuntimeWedge: + def test_starts_unwedged(self): + assert runtime_wedge.is_wedged() is False + assert runtime_wedge.wedge_reason() == "" + + def test_mark_wedged_sets_flag_and_reason(self): + runtime_wedge.mark_wedged("SDK init timeout") + assert runtime_wedge.is_wedged() is True + assert runtime_wedge.wedge_reason() == "SDK init timeout" + + def test_first_mark_wins(self): + # Stable banner text is more important than the most-recent + # cause. A second wedge while already wedged should NOT + # overwrite — operator sees the original (more diagnosable) + # reason, not whatever the SDK said next. + runtime_wedge.mark_wedged("SDK init timeout") + runtime_wedge.mark_wedged("Subsequent identical-class wedge") + assert runtime_wedge.wedge_reason() == "SDK init timeout" + + def test_clear_wedge_restores_healthy(self): + # Auto-recovery: when the SDK starts working again, the next + # heartbeat must report empty runtime_state so the platform + # flips status from degraded back to online. + runtime_wedge.mark_wedged("transient blip") + runtime_wedge.clear_wedge() + assert runtime_wedge.is_wedged() is False + assert runtime_wedge.wedge_reason() == "" + + def test_clear_wedge_when_not_wedged_is_noop(self): + # No-op safety — production calls clear_wedge() on every + # successful query (~thousands of times per session); throwing + # or logging when not wedged would spam. + runtime_wedge.clear_wedge() + runtime_wedge.clear_wedge() # still safe twice in a row + assert runtime_wedge.is_wedged() is False + + def test_re_marking_after_clear_is_allowed(self): + # Real production path: SDK wedges, recovers, wedges again. + # Each cycle should land cleanly (not silently drop). + runtime_wedge.mark_wedged("first wedge") + runtime_wedge.clear_wedge() + runtime_wedge.mark_wedged("second wedge — different reason") + assert runtime_wedge.is_wedged() is True + assert runtime_wedge.wedge_reason() == "second wedge — different reason" + + +class TestClaudeSdkExecutorReExportShim: + """claude_sdk_executor.py keeps re-exporting the old names for one + release cycle so any third-party adapter copying our wedge convention + has time to migrate. These tests pin the shim — when removed, the + test file goes too.""" + + def test_is_wedged_re_exported(self): + from claude_sdk_executor import is_wedged + assert is_wedged is runtime_wedge.is_wedged + + def test_wedge_reason_re_exported(self): + from claude_sdk_executor import wedge_reason + assert wedge_reason is runtime_wedge.wedge_reason + + def test_internal_helpers_re_exported(self): + # Keep the underscore names too — claude_sdk_executor's own + # _run_query calls _mark_sdk_wedged / _clear_sdk_wedge_on_success + # via these re-exports. + from claude_sdk_executor import ( + _mark_sdk_wedged, + _clear_sdk_wedge_on_success, + _reset_sdk_wedge_for_test, + ) + assert _mark_sdk_wedged is runtime_wedge.mark_wedged + assert _clear_sdk_wedge_on_success is runtime_wedge.clear_wedge + assert _reset_sdk_wedge_for_test is runtime_wedge.reset_for_test + + def test_re_export_state_is_shared(self): + # The shim isn't a copy — both names refer to the same module + # state. Marking via the executor name must be observable via + # the runtime_wedge name (and vice versa). + from claude_sdk_executor import _mark_sdk_wedged + _mark_sdk_wedged("via executor shim") + assert runtime_wedge.is_wedged() is True + assert runtime_wedge.wedge_reason() == "via executor shim"