diff --git a/workspace/adapter_base.py b/workspace/adapter_base.py index 70a46e38..6442ecc0 100644 --- a/workspace/adapter_base.py +++ b/workspace/adapter_base.py @@ -116,11 +116,21 @@ class BaseAdapter(ABC): """Interface every agent infrastructure adapter must implement. To add a new agent infra: - 1. Create workspace/adapters// + 1. Create a standalone template repo (molecule-ai-workspace-template-<name>) 2. Implement adapter.py with a class extending BaseAdapter - 3. Add requirements.txt with your infra's dependencies - 4. Export as Adapter in __init__.py - 5. Submit a PR + 3. Add requirements.txt with your infra's dependencies + molecule-runtime + 4. Set ADAPTER_MODULE in the Dockerfile to your adapter module path + + Cross-cutting capabilities your adapter can opt into: + - capabilities() — declare native ownership of heartbeat, scheduler, + session, status mgmt, etc. (see RuntimeCapabilities above) + - idle_timeout_override() — extend the platform's per-dispatch + silence window for SDKs with long synth turns + - runtime_wedge.mark_wedged() / clear_wedge() — flip the workspace + to `degraded` + auto-recover when your SDK hits a non-recoverable + error class. Import directly from `runtime_wedge`; the heartbeat + forwards the state to the platform automatically. See the + runtime_wedge module docstring for the integration recipe. """ @staticmethod diff --git a/workspace/claude_sdk_executor.py b/workspace/claude_sdk_executor.py index a05823c8..57d53643 100644 --- a/workspace/claude_sdk_executor.py +++ b/workspace/claude_sdk_executor.py @@ -87,71 +87,29 @@ _RETRYABLE_PATTERNS = ( "try again", ) -# Module-level SDK-wedge flag. When claude_agent_sdk's `query.initialize()` -# raises `Control request timeout: initialize`, the SDK's internal client- -# process state is corrupted for the rest of the Python process — every -# subsequent `_run_query()` call hits the same wedge and re-throws. 
The -# executor itself can't auto-recover (the underlying CLI subprocess and -# its read pipe are in an unrecoverable state); only a workspace restart -# clears it. -# -# The heartbeat task reads these helpers and reports -# `runtime_state="wedged"` to the platform, which flips the workspace to -# `degraded` so the canvas surfaces a Restart hint instead of leaving -# the user staring at a green dot while every chat hangs. -# -# Module scope (not instance scope) is deliberate: the wedge is a -# property of the Python process, not the executor. A future per-org -# multi-executor design could move this to a shared registry, but with -# one executor per workspace process today the simplest lock-free -# read+write fits. -_sdk_wedged_reason: str | None = None +# Wedge state moved to runtime_wedge (see that module's docstring for +# the rationale + the broader "Compatibility shim" note). This block +# re-exports under the historical names so the in-file call sites in +# _run_query stay terse and any external consumer that imported them +# from claude_sdk_executor keeps working for one release cycle. +from runtime_wedge import ( # noqa: E402 + clear_wedge as _clear_sdk_wedge_on_success, + is_wedged, + mark_wedged as _mark_sdk_wedged, + reset_for_test as _reset_sdk_wedge_for_test, + wedge_reason, +) - -def is_wedged() -> bool: - """True if the Claude SDK has hit a non-recoverable init wedge in - this process. Sticky until process restart.""" - return _sdk_wedged_reason is not None - - -def wedge_reason() -> str: - """Human-readable description of the wedge cause, or empty string - when not wedged. Surfaced to the canvas via heartbeat sample_error.""" - return _sdk_wedged_reason or "" - - -def _mark_sdk_wedged(reason: str) -> None: - """Internal — flag the SDK as wedged. Only the first call wins - (subsequent identical wedges shouldn't overwrite a more specific - reason). 
Tests use `_reset_sdk_wedge_for_test()` to clear.""" - global _sdk_wedged_reason - if _sdk_wedged_reason is None: - _sdk_wedged_reason = reason - logger.error("SDK wedge detected: %s — workspace will report degraded until a successful query clears it", reason) - - -def _clear_sdk_wedge_on_success() -> None: - """Auto-recovery — called from _run_query after a successful - completion. The original wedge could be transient (a single network - blip during the SDK's first-message handshake), and a sticky-only - flag would lock the workspace into degraded forever even after the - SDK started working again. Clearing on observed success means the - next heartbeat after a working query reports `runtime_state` empty - and the platform flips status back to online. - - No-op when not wedged (the common case).""" - global _sdk_wedged_reason - if _sdk_wedged_reason is not None: - logger.info("SDK wedge cleared after successful query — workspace will recover to online on next heartbeat") - _sdk_wedged_reason = None - - -def _reset_sdk_wedge_for_test() -> None: - """Test-only escape hatch. Production code clears the wedge via - `_clear_sdk_wedge_on_success` when a query succeeds; this helper - is for unit tests that need to reset between cases.""" - global _sdk_wedged_reason - _sdk_wedged_reason = None +# Names below are re-exported (not consumed inside this file) for +# backwards compatibility with third-party adapters that imported them +# from claude_sdk_executor before the wedge state moved to runtime_wedge. +# Listing them in __all__ marks the intent explicitly and stops static +# analysis from flagging the imports as unused. +__all__ = [ + "is_wedged", + "wedge_reason", + "_reset_sdk_wedge_for_test", +] # Per-tool-use summarizers. 
Reads the most-useful argument from each diff --git a/workspace/heartbeat.py b/workspace/heartbeat.py index 6230c93a..e31fba0c 100644 --- a/workspace/heartbeat.py +++ b/workspace/heartbeat.py @@ -22,16 +22,25 @@ from platform_auth import auth_headers, refresh_cache, self_source_headers def _runtime_state_payload() -> dict: """Build the {runtime_state, sample_error} portion of the heartbeat - body when the Claude SDK has hit a wedge. Returns an empty dict - when the runtime is healthy so the heartbeat payload doesn't grow - fields the platform doesn't need. + body when SOME adapter executor has marked itself wedged. Returns + an empty dict when the runtime is healthy so the heartbeat payload + doesn't grow fields the platform doesn't need. - Imported lazily so workspaces running non-Claude runtimes (where - `claude_sdk_executor` may not be importable at all) keep working — - a missing import means "no Claude wedge possible here, healthy." + Source of truth is runtime_wedge (lives in molecule-runtime, + independent of any specific adapter). Pre task #87 this imported + from claude_sdk_executor — that worked because the executor was + bundled into molecule-runtime, but blocked moving it to the + claude-code template repo. The runtime_wedge module is now the + cross-cutting wedge-state holder; adapters mark/clear via it, + heartbeat reads it. + + Imported lazily so a workspace whose runtime image somehow ships + without runtime_wedge (corrupt install, mid-rolling-deploy state) + keeps heartbeating — a missing import means "no wedge info; assume + healthy." """ try: - from claude_sdk_executor import is_wedged, wedge_reason + from runtime_wedge import is_wedged, wedge_reason except Exception: return {} if not is_wedged(): diff --git a/workspace/runtime_wedge.py b/workspace/runtime_wedge.py new file mode 100644 index 00000000..ffc7f90f --- /dev/null +++ b/workspace/runtime_wedge.py @@ -0,0 +1,203 @@ +"""Per-process runtime-wedge state. 
+ +Adapter executors that hit a non-recoverable wedge (e.g. claude-agent-sdk's +`Control request timeout: initialize` corrupting the client process's +internal state) call mark_wedged(reason). The heartbeat task reads +is_wedged() / wedge_reason() and forwards them in the heartbeat payload's +runtime_state field — the platform then flips workspace status to +`degraded` so the canvas surfaces a Restart hint instead of leaving the +user staring at a green dot while every chat hangs. + +Module scope (not instance scope) is deliberate: the wedge is a property +of the Python process, not any particular executor. With one executor +per workspace process today this is the simplest lock-free +read+write fit. A future per-org multi-executor design could move this +to a shared registry. + +This module lives in molecule-runtime (NOT in any adapter / template +repo) because: + + 1. workspace/heartbeat.py reads it on every heartbeat — cross-cutting + concern, runtime owns it. + 2. Multiple adapter executors can mark themselves wedged with their + own reason; the runtime aggregates one flag for the platform. + 3. Decoupling from claude_sdk_executor is the prerequisite for the + universal-runtime refactor (molecule-core task #87) — without + this extraction, claude_sdk_executor.py couldn't move to its + template repo because heartbeat would lose access to the wedge + state. + +Public API: mark_wedged(reason), clear_wedge(), is_wedged(), +wedge_reason(). The reset_for_test() helper is for unit tests only. + +How to use from a NEW adapter (template repo) +--------------------------------------------- + +Hermes, Codex, LangGraph, or any future adapter that wants the same +"flip-to-degraded-on-fatal-wedge" UX should call mark_wedged + clear_wedge +from its executor. The runtime imports + heartbeat plumbing are already +in place — adapters do not change anything in molecule-runtime. 
+ +Minimum integration (~6 LOC inside the executor): + + # Import path: + # - In a TEMPLATE repo (the common case for new adapters), the + # runtime is installed via PyPI as `molecule-ai-workspace-runtime`, + # so the import is `from molecule_runtime.runtime_wedge import …`. + # - In molecule-core itself (when editing this repo's own + # workspace/ tree), the module is at the top level — import as + # `from runtime_wedge import …`. + from molecule_runtime.runtime_wedge import mark_wedged, clear_wedge + + async def execute(self, ctx, queue): + try: + result = await self._run_query(ctx) + except SomeFatalSdkError as e: + # Pick a short, operator-actionable reason. This becomes the + # banner text on the canvas's degraded card — keep it under + # ~80 chars and name the recovery action when possible. + mark_wedged(f"hermes init timeout — restart workspace ({e})") + raise + clear_wedge() # observed-success → next heartbeat reports healthy + return result + +What you get for free: + - Heartbeat payload sets runtime_state="wedged" + sample_error=<reason> + on the next 30s tick. + - registry.go's evaluateStatus flips the workspace to `degraded` and + broadcasts WORKSPACE_DEGRADED so the canvas card turns yellow with + your reason as the subtitle. + - clear_wedge() on the next successful turn flips the workspace back + to `online` automatically — no manual operator action. + +What NOT to do: + - Don't store wedge state in your adapter module. The runtime-side + consumer (heartbeat) imports from runtime_wedge by name; an adapter- + local copy won't be observed. + - Don't call mark_wedged for transient errors (rate limits, single + failed network call). The whole point is "the SDK process is in a + state that can only be cleared by restart" — false positives + train operators to ignore the degraded banner. + - Don't write your own clear logic. clear_wedge() is the only path + the heartbeat watches; a custom flag won't propagate. 
+ +When wedge is the WRONG primitive: if the failure is per-request (the +SDK works for some inputs but not others), surface as a normal A2A +error response, not a wedge. Wedge means "every subsequent request in +this process will fail until restart." + +Compatibility shim (will be removed once #87 Phase 2 lands) +----------------------------------------------------------- + +claude_sdk_executor.py re-exports five helpers under the historical names +(is_wedged, wedge_reason, _mark_sdk_wedged, _clear_sdk_wedge_on_success, +_reset_sdk_wedge_for_test) for one release cycle. New adapter code should +import from runtime_wedge directly; the shim only exists so third-party +adapters that copied our claude_sdk_executor wedge convention have time to migrate. +""" +from __future__ import annotations + +import logging + +logger = logging.getLogger(__name__) + + +class _WedgeState: + """Internal carrier for the wedge flag. Exposed only via the module- + level helpers below; adapters never see this class. + + Wrapping the state in a class (instead of a bare module-level global) + is forward-cover for the day a runtime hosts multiple executors per + process — a future per-scope variant can hand out keyed instances + without changing the public mark_wedged / clear_wedge / is_wedged / + wedge_reason API. Today there's exactly one instance (_DEFAULT). + """ + + def __init__(self) -> None: + # None = healthy; non-empty string = wedged with that human- + # readable reason. Surfaced verbatim as the canvas's degraded- + # card banner text via heartbeat.sample_error. + self._reason: str | None = None + + def is_wedged(self) -> bool: + return self._reason is not None + + def reason(self) -> str: + return self._reason or "" + + def mark(self, reason: str) -> None: + # First-write-wins: a subsequent identical-class wedge can't + # overwrite a more specific initial reason so the operator- + # visible banner stays stable. 
+ if self._reason is None: + self._reason = reason + logger.error( + "runtime wedge detected: %s — workspace will report degraded until cleared", + reason, + ) + + def clear(self) -> None: + # No-op when not wedged (the common case — adapters call this + # on every successful query). + if self._reason is not None: + logger.info( + "runtime wedge cleared after successful operation — workspace will recover to online on next heartbeat", + ) + self._reason = None + + def reset(self) -> None: + # Unconditional clear — for test fixtures only. Skips the + # info-level log line the production clear() path emits. + self._reason = None + + +# Single shared instance backing the module-level helpers. Today there's +# one executor per workspace process so this fits perfectly; the class +# wrap above is the seam for any future per-scope variant. +_DEFAULT = _WedgeState() + + +def is_wedged() -> bool: + """True if some adapter executor in this process has marked itself + wedged. Sticky until the same executor calls clear_wedge() on + observed recovery (or the process restarts).""" + return _DEFAULT.is_wedged() + + +def wedge_reason() -> str: + """Human-readable description of the wedge cause, or empty string + when not wedged. Surfaced to the canvas via heartbeat sample_error.""" + return _DEFAULT.reason() + + +def mark_wedged(reason: str) -> None: + """Flag the runtime as wedged. Only the FIRST call wins so a + subsequent identical-class wedge can't overwrite a more specific + initial reason — the operator-visible banner stays stable. + + Adapters call this from their executor's exception path when the + SDK has hit a non-recoverable error class. Safe to call multiple + times; the no-op when already wedged is intentional. + """ + _DEFAULT.mark(reason) + + +def clear_wedge() -> None: + """Auto-recovery: adapter calls this after an observed successful + operation. 
The original wedge could be transient (single network + blip during the SDK's first-message handshake), and a sticky-only + flag would lock the workspace into degraded forever even after the + SDK started working again. Clearing on observed success means the + next heartbeat after a working query reports runtime_state empty + and the platform flips status back to online. + + No-op when not wedged (the common case).""" + _DEFAULT.clear() + + +def reset_for_test() -> None: + """Test-only escape hatch. Production code clears the wedge via + clear_wedge() on observed success; this helper is for unit tests + that need to reset between cases without going through the full + SDK round-trip.""" + _DEFAULT.reset() diff --git a/workspace/tests/test_runtime_wedge.py b/workspace/tests/test_runtime_wedge.py new file mode 100644 index 00000000..0df53e1d --- /dev/null +++ b/workspace/tests/test_runtime_wedge.py @@ -0,0 +1,103 @@ +"""Tests for runtime_wedge — the runtime-side wedge-state module that +heartbeat reads + adapter executors write. Extracted from claude_sdk_ +executor (task #87 universal-runtime refactor) so the executor can move +to its template repo without breaking heartbeat. 
+ +The behavior is identical to the prior in-executor implementation; tests +pin the contract so the re-export shim in claude_sdk_executor.py can +later be deleted without surprise.""" +import pytest + +import runtime_wedge + + +@pytest.fixture(autouse=True) +def _reset(): + """Each test starts with a clean wedge state — production wedges are + sticky-per-process, but cross-test bleed would couple unrelated cases.""" + runtime_wedge.reset_for_test() + yield + runtime_wedge.reset_for_test() + + +class TestRuntimeWedge: + def test_starts_unwedged(self): + assert runtime_wedge.is_wedged() is False + assert runtime_wedge.wedge_reason() == "" + + def test_mark_wedged_sets_flag_and_reason(self): + runtime_wedge.mark_wedged("SDK init timeout") + assert runtime_wedge.is_wedged() is True + assert runtime_wedge.wedge_reason() == "SDK init timeout" + + def test_first_mark_wins(self): + # Stable banner text is more important than the most-recent + # cause. A second wedge while already wedged should NOT + # overwrite — operator sees the original (more diagnosable) + # reason, not whatever the SDK said next. + runtime_wedge.mark_wedged("SDK init timeout") + runtime_wedge.mark_wedged("Subsequent identical-class wedge") + assert runtime_wedge.wedge_reason() == "SDK init timeout" + + def test_clear_wedge_restores_healthy(self): + # Auto-recovery: when the SDK starts working again, the next + # heartbeat must report empty runtime_state so the platform + # flips status from degraded back to online. + runtime_wedge.mark_wedged("transient blip") + runtime_wedge.clear_wedge() + assert runtime_wedge.is_wedged() is False + assert runtime_wedge.wedge_reason() == "" + + def test_clear_wedge_when_not_wedged_is_noop(self): + # No-op safety — production calls clear_wedge() on every + # successful query (~thousands of times per session); throwing + # or logging when not wedged would spam. 
+ runtime_wedge.clear_wedge() + runtime_wedge.clear_wedge() # still safe twice in a row + assert runtime_wedge.is_wedged() is False + + def test_re_marking_after_clear_is_allowed(self): + # Real production path: SDK wedges, recovers, wedges again. + # Each cycle should land cleanly (not silently drop). + runtime_wedge.mark_wedged("first wedge") + runtime_wedge.clear_wedge() + runtime_wedge.mark_wedged("second wedge — different reason") + assert runtime_wedge.is_wedged() is True + assert runtime_wedge.wedge_reason() == "second wedge — different reason" + + +class TestClaudeSdkExecutorReExportShim: + """claude_sdk_executor.py keeps re-exporting the old names for one + release cycle so any third-party adapter copying our wedge convention + has time to migrate. These tests pin the shim — when removed, the + test file goes too.""" + + def test_is_wedged_re_exported(self): + from claude_sdk_executor import is_wedged + assert is_wedged is runtime_wedge.is_wedged + + def test_wedge_reason_re_exported(self): + from claude_sdk_executor import wedge_reason + assert wedge_reason is runtime_wedge.wedge_reason + + def test_internal_helpers_re_exported(self): + # Keep the underscore names too — claude_sdk_executor's own + # _run_query calls _mark_sdk_wedged / _clear_sdk_wedge_on_success + # via these re-exports. + from claude_sdk_executor import ( + _mark_sdk_wedged, + _clear_sdk_wedge_on_success, + _reset_sdk_wedge_for_test, + ) + assert _mark_sdk_wedged is runtime_wedge.mark_wedged + assert _clear_sdk_wedge_on_success is runtime_wedge.clear_wedge + assert _reset_sdk_wedge_for_test is runtime_wedge.reset_for_test + + def test_re_export_state_is_shared(self): + # The shim isn't a copy — both names refer to the same module + # state. Marking via the executor name must be observable via + # the runtime_wedge name (and vice versa). 
+ from claude_sdk_executor import _mark_sdk_wedged + _mark_sdk_wedged("via executor shim") + assert runtime_wedge.is_wedged() is True + assert runtime_wedge.wedge_reason() == "via executor shim"