molecule-core/workspace/runtime_wedge.py
Hongming Wang cd899c969f docs(wedge): integration recipe for adapters that want to flip-to-degraded
Doc-only follow-up to the wedge-state extraction. Adds proactive
guidance so the next adapter (hermes / codex / langgraph / a future
template) discovers the runtime_wedge primitive and integrates the
~6 LOC pattern uniformly instead of inventing its own wedge state.

Two additions:

  - workspace/runtime_wedge.py — new "How to use from a NEW adapter"
    section in the module docstring with the minimum viable
    integration recipe, what-you-get-for-free list, and explicit
    DON'TS (don't store local wedge state, don't mark for transient
    errors, don't write your own clear logic). Plus a "when wedge is
    the WRONG primitive" note to keep adopters from over-using it.

  - workspace/adapter_base.py — adds runtime_wedge to the
    "Cross-cutting capabilities your adapter can opt into" list in
    BaseAdapter's docstring (alongside capabilities() and
    idle_timeout_override()). Discoverability path: adapter author
    reads BaseAdapter docstring → sees runtime_wedge mention → reads
    runtime_wedge module docstring → has the recipe.

Also tightens the "to add a new agent infra" steps in BaseAdapter to
match the actual current model (standalone template repo + ADAPTER_MODULE
env var) rather than the obsolete workspace/adapters/<infra>/ layout
that hasn't been the path since the universal-runtime extraction
started.

Zero code change. Tests untouched (1251/1251 still pass).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 00:12:14 -07:00

158 lines
6.7 KiB
Python

"""Per-process runtime-wedge state.
Adapter executors that hit a non-recoverable wedge (e.g. claude-agent-sdk's
`Control request timeout: initialize` corrupting the client process's
internal state) call mark_wedged(reason). The heartbeat task reads
is_wedged() / wedge_reason() and forwards them in the heartbeat payload's
runtime_state field — the platform then flips workspace status to
`degraded` so the canvas surfaces a Restart hint instead of leaving the
user staring at a green dot while every chat hangs.
Module scope (not instance scope) is deliberate: the wedge is a property
of the Python process, not any particular executor. With one executor
per workspace process today this is the simplest lock-free
read+write fit. A future per-org multi-executor design could move this
to a shared registry.
This module lives in molecule-runtime (NOT in any adapter / template
repo) because:
1. workspace/heartbeat.py reads it on every heartbeat — cross-cutting
concern, runtime owns it.
2. Multiple adapter executors can mark themselves wedged with their
own reason; the runtime aggregates one flag for the platform.
3. Decoupling from claude_sdk_executor is the prerequisite for the
universal-runtime refactor (molecule-core task #87) — without
this extraction, claude_sdk_executor.py couldn't move to its
template repo because heartbeat would lose access to the wedge
state.
Public API: mark_wedged(reason), clear_wedge(), is_wedged(),
wedge_reason(). The reset_for_test() helper is for unit tests only.
How to use from a NEW adapter (template repo)
---------------------------------------------
Hermes, Codex, LangGraph, or any future adapter that wants the same
"flip-to-degraded-on-fatal-wedge" UX should call mark_wedged + clear_wedge
from its executor. The runtime imports + heartbeat plumbing are already
in place — adapters do not change anything in molecule-runtime.
Minimum integration (~6 LOC inside the executor):
from molecule_runtime.runtime_wedge import mark_wedged, clear_wedge
async def execute(self, ctx, queue):
try:
result = await self._run_query(ctx)
except SomeFatalSdkError as e:
# Pick a short, operator-actionable reason. This becomes the
# banner text on the canvas's degraded card — keep it under
# ~80 chars and name the recovery action when possible.
mark_wedged(f"hermes init timeout — restart workspace ({e})")
raise
clear_wedge() # observed-success → next heartbeat reports healthy
return result
What you get for free:
- Heartbeat payload sets runtime_state="wedged" + sample_error=<reason>
on the next 30s tick.
- registry.go's evaluateStatus flips the workspace to `degraded` and
broadcasts WORKSPACE_DEGRADED so the canvas card turns yellow with
your reason as the subtitle.
- clear_wedge() on the next successful turn flips the workspace back
to `online` automatically — no manual operator action.
What NOT to do:
- Don't store wedge state in your adapter module. The platform-side
consumer (heartbeat) imports from runtime_wedge by name; an adapter-
local copy won't be observed.
- Don't call mark_wedged for transient errors (rate limits, single
failed network call). The whole point is "the SDK process is in a
state that can only be cleared by restart" — false positives
train operators to ignore the degraded banner.
- Don't write your own clear logic. clear_wedge() is the only path
the heartbeat watches; a custom flag won't propagate.
When wedge is the WRONG primitive: if the failure is per-request (the
SDK works for some inputs but not others), surface as a normal A2A
error response, not a wedge. Wedge means "every subsequent request in
this process will fail until restart."
Compatibility shim (will be removed once #87 Phase 2 lands)
-----------------------------------------------------------
claude_sdk_executor.py re-exports the four functions under the historical
names (is_wedged, wedge_reason, _mark_sdk_wedged, _clear_sdk_wedge_on_success)
for one release cycle. New adapter code should import from runtime_wedge
directly; the shim only exists so existing third-party adapters that
copied our claude_sdk_executor wedge convention have time to migrate.
"""
from __future__ import annotations
import logging
logger = logging.getLogger(__name__)
# Single-flag state. None = healthy; non-empty string = wedged with that
# human-readable reason. Surfaced verbatim as the canvas's degraded-card
# banner text via heartbeat.sample_error.
_wedged_reason: str | None = None
def is_wedged() -> bool:
"""True if some adapter executor in this process has marked itself
wedged. Sticky until the same executor calls clear_wedge() on
observed recovery (or the process restarts)."""
return _wedged_reason is not None
def wedge_reason() -> str:
"""Human-readable description of the wedge cause, or empty string
when not wedged. Surfaced to the canvas via heartbeat sample_error."""
return _wedged_reason or ""
def mark_wedged(reason: str) -> None:
"""Flag the runtime as wedged. Only the FIRST call wins so a
subsequent identical-class wedge can't overwrite a more specific
initial reason — the operator-visible banner stays stable.
Adapters call this from their executor's exception path when the
SDK has hit a non-recoverable error class. Safe to call multiple
times; the no-op when already wedged is intentional.
"""
global _wedged_reason
if _wedged_reason is None:
_wedged_reason = reason
logger.error(
"runtime wedge detected: %s — workspace will report degraded until cleared",
reason,
)
def clear_wedge() -> None:
"""Auto-recovery: adapter calls this after an observed successful
operation. The original wedge could be transient (single network
blip during the SDK's first-message handshake), and a sticky-only
flag would lock the workspace into degraded forever even after the
SDK started working again. Clearing on observed success means the
next heartbeat after a working query reports runtime_state empty
and the platform flips status back to online.
No-op when not wedged (the common case)."""
global _wedged_reason
if _wedged_reason is not None:
logger.info("runtime wedge cleared after successful operation — workspace will recover to online on next heartbeat")
_wedged_reason = None
def reset_for_test() -> None:
"""Test-only escape hatch. Production code clears the wedge via
clear_wedge() on observed success; this helper is for unit tests
that need to reset between cases without going through the full
SDK round-trip."""
global _wedged_reason
_wedged_reason = None