molecule-core/workspace/runtime_wedge.py
Hongming Wang feb544938b refactor(wedge): address review feedback — class wrap + import-path doc + dedupe shim rationale
Three changes from /code-review-and-quality on PR #2154:

1. Optional (architecture): wrap state in a private _WedgeState class
   instead of bare module-level globals. Public API (mark_wedged /
   clear_wedge / is_wedged / wedge_reason / reset_for_test) is
   unchanged — adapters never see the class. The class is forward-cover
   for any future per-scope variant (multiple executors per process, a
   keyed registry, etc.) without churning the call sites. Today there's
   exactly one instance (_DEFAULT) so behavior is identical.

2. Optional (readability): clarify the import path in the integration
   recipe — in a TEMPLATE repo it's `from molecule_runtime.runtime_wedge`
   (PyPI package); in molecule-core itself it's `from runtime_wedge`
   (top-level module). Removes the trap where a contributor reading the
   docstring while editing in-repo copies the template-style import and
   gets ImportError.

3. Nit (readability): dedupe the shim rationale. claude_sdk_executor's
   re-export comment now points to runtime_wedge's "Compatibility shim"
   section as the source of truth instead of restating the same content.
   Avoids docs-in-two-places drift risk.

Verification:
  - 1251/1251 workspace pytest pass (no behavior change — class wrap
    is pure plumbing; module-level helpers delegate to the singleton)
  - All shim re-export identity tests still pass (the shim's
    `is_wedged is runtime_wedge.is_wedged` assertion holds because we
    re-export the SAME function object that delegates to _DEFAULT)

No new tests needed — the existing test suite covers the public API
contract; the class is an implementation detail behind that contract.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 00:16:33 -07:00

204 lines
8.6 KiB
Python

"""Per-process runtime-wedge state.
Adapter executors that hit a non-recoverable wedge (e.g. claude-agent-sdk's
`Control request timeout: initialize` corrupting the client process's
internal state) call mark_wedged(reason). The heartbeat task reads
is_wedged() / wedge_reason() and forwards them in the heartbeat payload's
runtime_state field — the platform then flips workspace status to
`degraded` so the canvas surfaces a Restart hint instead of leaving the
user staring at a green dot while every chat hangs.
Module scope (not instance scope) is deliberate: the wedge is a property
of the Python process, not any particular executor. With one executor
per workspace process today this is the simplest lock-free
read+write fit. A future per-org multi-executor design could move this
to a shared registry.
This module lives in molecule-runtime (NOT in any adapter / template
repo) because:
1. workspace/heartbeat.py reads it on every heartbeat — cross-cutting
concern, runtime owns it.
2. Multiple adapter executors can mark themselves wedged with their
own reason; the runtime aggregates one flag for the platform.
3. Decoupling from claude_sdk_executor is the prerequisite for the
universal-runtime refactor (molecule-core task #87) — without
this extraction, claude_sdk_executor.py couldn't move to its
template repo because heartbeat would lose access to the wedge
state.
Public API: mark_wedged(reason), clear_wedge(), is_wedged(),
wedge_reason(). The reset_for_test() helper is for unit tests only.
How to use from a NEW adapter (template repo)
---------------------------------------------
Hermes, Codex, LangGraph, or any future adapter that wants the same
"flip-to-degraded-on-fatal-wedge" UX should call mark_wedged + clear_wedge
from its executor. The runtime imports + heartbeat plumbing are already
in place — adapters do not change anything in molecule-runtime.
Minimum integration (~6 LOC inside the executor):
# Import path:
# - In a TEMPLATE repo (the common case for new adapters), the
# runtime is installed via PyPI as `molecule-ai-workspace-runtime`,
# so the import is `from molecule_runtime.runtime_wedge import …`.
# - In molecule-core itself (when editing this repo's own
# workspace/ tree), the module is at the top level — import as
# `from runtime_wedge import …`.
from molecule_runtime.runtime_wedge import mark_wedged, clear_wedge
async def execute(self, ctx, queue):
try:
result = await self._run_query(ctx)
except SomeFatalSdkError as e:
# Pick a short, operator-actionable reason. This becomes the
# banner text on the canvas's degraded card — keep it under
# ~80 chars and name the recovery action when possible.
mark_wedged(f"hermes init timeout — restart workspace ({e})")
raise
clear_wedge() # observed-success → next heartbeat reports healthy
return result
What you get for free:
- Heartbeat payload sets runtime_state="wedged" + sample_error=<reason>
on the next 30s tick.
- registry.go's evaluateStatus flips the workspace to `degraded` and
broadcasts WORKSPACE_DEGRADED so the canvas card turns yellow with
your reason as the subtitle.
- clear_wedge() on the next successful turn flips the workspace back
to `online` automatically — no manual operator action.
What NOT to do:
- Don't store wedge state in your adapter module. The platform-side
consumer (heartbeat) imports from runtime_wedge by name; an adapter-
local copy won't be observed.
- Don't call mark_wedged for transient errors (rate limits, single
failed network call). The whole point is "the SDK process is in a
state that can only be cleared by restart" — false positives
train operators to ignore the degraded banner.
- Don't write your own clear logic. clear_wedge() is the only path
the heartbeat watches; a custom flag won't propagate.
When wedge is the WRONG primitive: if the failure is per-request (the
SDK works for some inputs but not others), surface as a normal A2A
error response, not a wedge. Wedge means "every subsequent request in
this process will fail until restart."
Compatibility shim (will be removed once #87 Phase 2 lands)
-----------------------------------------------------------
claude_sdk_executor.py re-exports the four functions under the historical
names (is_wedged, wedge_reason, _mark_sdk_wedged, _clear_sdk_wedge_on_success)
for one release cycle. New adapter code should import from runtime_wedge
directly; the shim only exists so existing third-party adapters that
copied our claude_sdk_executor wedge convention have time to migrate.
"""
from __future__ import annotations
import logging
logger = logging.getLogger(__name__)
class _WedgeState:
"""Internal carrier for the wedge flag. Exposed only via the module-
level helpers below; adapters never see this class.
Wrapping the state in a class (instead of a bare module-level global)
is forward-cover for the day a runtime hosts multiple executors per
process — a future per-scope variant can hand out keyed instances
without changing the public mark_wedged / clear_wedge / is_wedged /
wedge_reason API. Today there's exactly one instance (_DEFAULT).
"""
def __init__(self) -> None:
# None = healthy; non-empty string = wedged with that human-
# readable reason. Surfaced verbatim as the canvas's degraded-
# card banner text via heartbeat.sample_error.
self._reason: str | None = None
def is_wedged(self) -> bool:
return self._reason is not None
def reason(self) -> str:
return self._reason or ""
def mark(self, reason: str) -> None:
# First-write-wins: a subsequent identical-class wedge can't
# overwrite a more specific initial reason so the operator-
# visible banner stays stable.
if self._reason is None:
self._reason = reason
logger.error(
"runtime wedge detected: %s — workspace will report degraded until cleared",
reason,
)
def clear(self) -> None:
# No-op when not wedged (the common case — adapters call this
# on every successful query).
if self._reason is not None:
logger.info(
"runtime wedge cleared after successful operation — workspace will recover to online on next heartbeat",
)
self._reason = None
def reset(self) -> None:
# Unconditional clear — for test fixtures only. Skips the
# info-level log line the production clear() path emits.
self._reason = None
# Single shared instance backing the module-level helpers. Today there's
# one executor per workspace process so this fits perfectly; the class
# wrap above is the seam for any future per-scope variant.
_DEFAULT = _WedgeState()
def is_wedged() -> bool:
"""True if some adapter executor in this process has marked itself
wedged. Sticky until the same executor calls clear_wedge() on
observed recovery (or the process restarts)."""
return _DEFAULT.is_wedged()
def wedge_reason() -> str:
"""Human-readable description of the wedge cause, or empty string
when not wedged. Surfaced to the canvas via heartbeat sample_error."""
return _DEFAULT.reason()
def mark_wedged(reason: str) -> None:
"""Flag the runtime as wedged. Only the FIRST call wins so a
subsequent identical-class wedge can't overwrite a more specific
initial reason — the operator-visible banner stays stable.
Adapters call this from their executor's exception path when the
SDK has hit a non-recoverable error class. Safe to call multiple
times; the no-op when already wedged is intentional.
"""
_DEFAULT.mark(reason)
def clear_wedge() -> None:
"""Auto-recovery: adapter calls this after an observed successful
operation. The original wedge could be transient (single network
blip during the SDK's first-message handshake), and a sticky-only
flag would lock the workspace into degraded forever even after the
SDK started working again. Clearing on observed success means the
next heartbeat after a working query reports runtime_state empty
and the platform flips status back to online.
No-op when not wedged (the common case)."""
_DEFAULT.clear()
def reset_for_test() -> None:
"""Test-only escape hatch. Production code clears the wedge via
clear_wedge() on observed success; this helper is for unit tests
that need to reset between cases without going through the full
SDK round-trip."""
_DEFAULT.reset()