feat(platform,workspace): SDK-wedge detection + workspace_status ENUM
Heartbeat lies. The asyncio task that POSTs /registry/heartbeat lives in its own process slot, so a workspace whose claude_agent_sdk has wedged on `Control request timeout: initialize` keeps reporting "online" — every chat send hangs the full 5-min platform deadline even though the runtime is dead in the water. This commit teaches the workspace to admit it's wedged and the platform to honor that admission by flipping status → degraded. Five layers, all in one commit because they share a contract: 1. Migration 043 — convert workspaces.status from free-form TEXT to a real `workspace_status` Postgres ENUM with the 6 values production code actually writes (provisioning, online, offline, degraded, failed, removed). Locks the value set; future typo writes error at the DB instead of silently storing rogue strings. Down migration reverts to TEXT and drops the type. 2. workspace-server/internal/models — `HeartbeatPayload` gains a `runtime_state string` field. Empty = healthy. Currently the only non-empty value the handler honors is "wedged"; future symptoms can extend without another migration. 3. workspace-server/internal/handlers/registry.go — `evaluateStatus` gains a wedge branch BEFORE the existing error_rate >= 0.5 path: if `RuntimeState=="wedged"` and currently online, flip to degraded and broadcast WORKSPACE_DEGRADED with the wedge sample error. Recovery (`degraded → online`) now requires BOTH error_rate < 0.1 AND runtime_state cleared, so a workspace still reporting wedged stays degraded even when its error count happens to be 0 (the wedge captures a runtime state, not an error count). 4. workspace/claude_sdk_executor.py — module-level `_sdk_wedged_reason` flag set when execute()'s catch block sees an error matching `_WEDGE_ERROR_PATTERNS` (currently just "control request timeout"). Sticky for the process lifetime; the SDK's internal client-process state is corrupted on this error and only a workspace restart (= new Python process = fresh module state) clears it. Helpers `is_wedged()` / `wedge_reason()` / `_reset_sdk_wedge_for_test()` exposed. 5. workspace/heartbeat.py — heartbeat body now layers on `_runtime_state_payload()` for both the happy path and the 401-retry path. Lazy-imports claude_sdk_executor so non-Claude runtimes (where the module may not even be importable) keep working unchanged. Canvas required no changes — `STATUS_CONFIG.degraded` was already defined in design-tokens.ts (amber dot, "Degraded" label) and WorkspaceNode.tsx already renders `lastSampleError` underneath the status pill when status === "degraded". The existing wiring just never fired because nothing was writing degraded in this code path. Tests: - 3 Go handler tests for the new transitions (online → degraded on wedged, degraded stays put while still wedged, degraded → online after wedge clears) - 5 Python wedge-detector tests (default clean, mark sets flag, sticky-first-wins, execute() flips on Control request timeout, execute() does NOT flip on unrelated errors) - Migration smoke-tested against the local dev DB (3 existing rows, all enum-compatible; migration applied cleanly, post-state has the column as workspace_status type and the index preserved) Verified: 79 Python tests pass; full Go test suite passes; migration applies clean on a real DB; reverse migration restores the column to TEXT. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
c159d85eb5
commit
4eb09e2146
@ -454,6 +454,29 @@ func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.Heartbea
|
||||
return
|
||||
}
|
||||
|
||||
// Self-reported runtime wedge: takes precedence over the error_rate
|
||||
// path. The heartbeat task lives in its own asyncio task and keeps
|
||||
// firing 200s even after claude_agent_sdk locks up on
|
||||
// `Control request timeout: initialize` — so error_rate stays at 0
|
||||
// (no calls have been recorded as errors yet) while every actual
|
||||
// /a2a POST hangs. The workspace tells us about that case via
|
||||
// runtime_state="wedged"; we honor it directly. Sample_error from
|
||||
// the heartbeat carries the human-readable reason ("SDK init
|
||||
// timeout — restart workspace"), which the canvas surfaces in the
|
||||
// degraded card without the operator scraping container logs.
|
||||
if payload.RuntimeState == "wedged" && currentStatus == "online" {
|
||||
_, err := db.DB.ExecContext(ctx,
|
||||
`UPDATE workspaces SET status = 'degraded', updated_at = now() WHERE id = $1 AND status = 'online'`,
|
||||
payload.WorkspaceID)
|
||||
if err != nil {
|
||||
log.Printf("Heartbeat: failed to mark %s degraded (wedged): %v", payload.WorkspaceID, err)
|
||||
}
|
||||
h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_DEGRADED", payload.WorkspaceID, map[string]interface{}{
|
||||
"runtime_state": "wedged",
|
||||
"sample_error": payload.SampleError,
|
||||
})
|
||||
}
|
||||
|
||||
if currentStatus == "online" && payload.ErrorRate >= 0.5 {
|
||||
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'degraded', updated_at = now() WHERE id = $1`, payload.WorkspaceID); err != nil {
|
||||
log.Printf("Heartbeat: failed to mark %s degraded: %v", payload.WorkspaceID, err)
|
||||
@ -464,7 +487,13 @@ func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.Heartbea
|
||||
})
|
||||
}
|
||||
|
||||
if currentStatus == "degraded" && payload.ErrorRate < 0.1 {
|
||||
// Recovery from degraded → online when BOTH the error rate has
|
||||
// fallen back AND the workspace is no longer reporting a wedge.
|
||||
// The wedge condition is sticky for the process lifetime
|
||||
// (claude_sdk_executor only clears it on restart), so when the
|
||||
// container restarts and starts heartbeating fresh — RuntimeState
|
||||
// is empty, error_rate is 0 — this branch flips us back to online.
|
||||
if currentStatus == "degraded" && payload.ErrorRate < 0.1 && payload.RuntimeState == "" {
|
||||
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'online', updated_at = now() WHERE id = $1`, payload.WorkspaceID); err != nil {
|
||||
log.Printf("Heartbeat: failed to recover %s to online: %v", payload.WorkspaceID, err)
|
||||
}
|
||||
|
||||
@ -298,6 +298,163 @@ func TestHeartbeatHandler_OnlineStaysOnline(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== Heartbeat — runtime wedge (claude_agent_sdk init timeout) ====================
|
||||
|
||||
// TestHeartbeatHandler_RuntimeWedged_FlipsOnlineToDegraded verifies the
|
||||
// runtime_state="wedged" path. Heartbeat task in the workspace lives in
|
||||
// its own asyncio task and keeps reporting online while the Claude SDK
|
||||
// is wedged on Control request timeout; the workspace tells us about
|
||||
// the wedge via this field, and we honor it by flipping status →
|
||||
// degraded with the wedge reason in last_sample_error.
|
||||
func TestHeartbeatHandler_RuntimeWedged_FlipsOnlineToDegraded(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewRegistryHandler(broadcaster)
|
||||
|
||||
wedgeMsg := "claude_agent_sdk wedge: Control request timeout: initialize — restart workspace to recover"
|
||||
|
||||
mock.ExpectQuery("SELECT COALESCE\\(current_task").
|
||||
WithArgs("ws-wedged").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
|
||||
|
||||
// Heartbeat UPDATE — sample_error carries the wedge reason from the
|
||||
// workspace's _runtime_state_payload() helper.
|
||||
mock.ExpectExec("UPDATE workspaces SET").
|
||||
WithArgs("ws-wedged", 0.0, wedgeMsg, 0, 600, "").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// evaluateStatus: currentStatus = online
|
||||
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
|
||||
WithArgs("ws-wedged").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online"))
|
||||
|
||||
// The wedge-handling branch fires the degraded UPDATE with the
|
||||
// `AND status = 'online'` guard (race-safe against concurrent
|
||||
// removal). Match the SQL with the guard included.
|
||||
mock.ExpectExec("UPDATE workspaces SET status = 'degraded'.*status = 'online'").
|
||||
WithArgs("ws-wedged").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// RecordAndBroadcast for WORKSPACE_DEGRADED
|
||||
mock.ExpectExec("INSERT INTO structure_events").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
|
||||
body := `{"workspace_id":"ws-wedged","error_rate":0.0,"sample_error":"` + wedgeMsg + `","active_tasks":0,"uptime_seconds":600,"runtime_state":"wedged"}`
|
||||
c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Heartbeat(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected status 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestHeartbeatHandler_DegradedRecoversOnlyAfterWedgeClears verifies that
|
||||
// the degraded → online recovery path requires BOTH error_rate < 0.1
|
||||
// AND runtime_state cleared. A workspace still reporting wedged stays
|
||||
// degraded even when error_rate happens to be 0 (no calls have been
|
||||
// recorded as errors yet — the wedge is captured as a runtime state,
|
||||
// not an error count).
|
||||
func TestHeartbeatHandler_DegradedRecoversOnlyAfterWedgeClears(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewRegistryHandler(broadcaster)
|
||||
|
||||
mock.ExpectQuery("SELECT COALESCE\\(current_task").
|
||||
WithArgs("ws-still-wedged").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
|
||||
|
||||
mock.ExpectExec("UPDATE workspaces SET").
|
||||
WithArgs("ws-still-wedged", 0.0, "still broken", 0, 800, "").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// currentStatus = degraded
|
||||
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
|
||||
WithArgs("ws-still-wedged").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("degraded"))
|
||||
|
||||
// No additional UPDATE expected — the recovery branch's
|
||||
// `runtime_state == ""` guard blocks the flip back to online.
|
||||
// (sqlmock fails the test if any unmocked Exec runs.)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
|
||||
body := `{"workspace_id":"ws-still-wedged","error_rate":0.0,"sample_error":"still broken","active_tasks":0,"uptime_seconds":800,"runtime_state":"wedged"}`
|
||||
c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Heartbeat(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected status 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestHeartbeatHandler_DegradedToOnline_AfterWedgeClears verifies the
|
||||
// happy-path recovery: a workspace previously marked degraded is
|
||||
// post-restart, error_rate is back to 0, and runtime_state is empty
|
||||
// (the new process re-imported claude_sdk_executor with the flag
|
||||
// fresh). Status flips back to online and a WORKSPACE_ONLINE event
|
||||
// fires.
|
||||
func TestHeartbeatHandler_DegradedToOnline_AfterWedgeClears(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewRegistryHandler(broadcaster)
|
||||
|
||||
mock.ExpectQuery("SELECT COALESCE\\(current_task").
|
||||
WithArgs("ws-recovered").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
|
||||
|
||||
mock.ExpectExec("UPDATE workspaces SET").
|
||||
WithArgs("ws-recovered", 0.0, "", 0, 30, "").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
|
||||
WithArgs("ws-recovered").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("degraded"))
|
||||
|
||||
// Recovery UPDATE fires (degraded → online).
|
||||
mock.ExpectExec("UPDATE workspaces SET status = 'online'").
|
||||
WithArgs("ws-recovered").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
mock.ExpectExec("INSERT INTO structure_events").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
|
||||
// runtime_state intentionally absent (== ""); error_rate = 0; this
|
||||
// is exactly what a freshly-restarted workspace's first heartbeat
|
||||
// looks like.
|
||||
body := `{"workspace_id":"ws-recovered","error_rate":0.0,"sample_error":"","active_tasks":0,"uptime_seconds":30}`
|
||||
c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Heartbeat(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected status 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== UpdateCard ====================
|
||||
|
||||
func TestUpdateCard_Success(t *testing.T) {
|
||||
|
||||
@ -51,6 +51,19 @@ type HeartbeatPayload struct {
|
||||
// a previously-reported spend value. Any non-zero value is clamped to
|
||||
// [0, maxMonthlySpend] before the DB write. (#615)
|
||||
MonthlySpend int64 `json:"monthly_spend"`
|
||||
// RuntimeState is a self-reported runtime health flag separate from
|
||||
// "is the heartbeat task firing at all". The heartbeat task lives in
|
||||
// its own asyncio task and keeps pinging even when the agent runtime
|
||||
// is wedged (e.g. claude_agent_sdk's `Control request timeout:
|
||||
// initialize` leaves the SDK in a permanent error state for the
|
||||
// process lifetime). RuntimeState is how the workspace tells the
|
||||
// platform "I'm alive but my Claude runtime is broken — flip me to
|
||||
// degraded so the canvas can show a Restart hint."
|
||||
//
|
||||
// Empty string = healthy / no signal. The only currently-recognised
|
||||
// non-empty value is "wedged"; future values can extend this without
|
||||
// migration.
|
||||
RuntimeState string `json:"runtime_state"`
|
||||
}
|
||||
|
||||
type UpdateCardPayload struct {
|
||||
|
||||
@ -0,0 +1,19 @@
|
||||
-- 043_workspace_status_enum.down.sql
|
||||
--
|
||||
-- Reverse 043_workspace_status_enum.up.sql: convert workspaces.status
|
||||
-- back to plain TEXT and drop the workspace_status enum type.
|
||||
|
||||
BEGIN;
|
||||
|
||||
ALTER TABLE workspaces
|
||||
ALTER COLUMN status DROP DEFAULT;
|
||||
|
||||
ALTER TABLE workspaces
|
||||
ALTER COLUMN status TYPE TEXT USING status::TEXT;
|
||||
|
||||
ALTER TABLE workspaces
|
||||
ALTER COLUMN status SET DEFAULT 'provisioning';
|
||||
|
||||
DROP TYPE workspace_status;
|
||||
|
||||
COMMIT;
|
||||
65
workspace-server/migrations/043_workspace_status_enum.up.sql
Normal file
65
workspace-server/migrations/043_workspace_status_enum.up.sql
Normal file
@ -0,0 +1,65 @@
|
||||
-- 043_workspace_status_enum.up.sql
|
||||
--
|
||||
-- Convert workspaces.status from free-form TEXT to a real Postgres
|
||||
-- ENUM type. The previous shape (TEXT DEFAULT 'provisioning' with no
|
||||
-- CHECK constraint, set by 001_workspaces.sql) let any handler write
|
||||
-- any string, including typos and stale values from older code paths.
|
||||
-- Locking the value set forces every writer to use one of the agreed
|
||||
-- states and lets us add a new state (`degraded`, used by the SDK
|
||||
-- wedge detector landing in this same change) without losing type
|
||||
-- safety on the column.
|
||||
--
|
||||
-- Value set covers every status the production codebase actually writes:
|
||||
--
|
||||
-- provisioning — workspace row exists, container is being created
|
||||
-- (initial INSERT default)
|
||||
-- online — heartbeat fresh + last response was successful
|
||||
-- offline — Redis liveness key expired (ws-side dead) or
|
||||
-- the proxy detected an unreachable upstream
|
||||
-- degraded — runtime is alive but reporting trouble (heartbeat
|
||||
-- error_rate >= 0.5, OR new in this change:
|
||||
-- workspace explicitly reported runtime_state="wedged")
|
||||
-- failed — provisioning never completed, or workspace marked
|
||||
-- itself failed via bundle import / runtime crash
|
||||
-- removed — soft-delete tombstone; the row stays so foreign-
|
||||
-- key references survive but no operations target it
|
||||
--
|
||||
-- Verified before writing this migration that production code in
|
||||
-- workspace-server/internal/{handlers,registry,bundle} writes only
|
||||
-- values from this list (test fixtures may write others; tests run
|
||||
-- against an isolated fixture DB so the cast doesn't affect them).
|
||||
|
||||
BEGIN;
|
||||
|
||||
CREATE TYPE workspace_status AS ENUM (
|
||||
'provisioning',
|
||||
'online',
|
||||
'offline',
|
||||
'degraded',
|
||||
'failed',
|
||||
'removed'
|
||||
);
|
||||
|
||||
-- The two-step ALTER (DROP DEFAULT then change type then SET DEFAULT)
|
||||
-- is required because Postgres rejects an ALTER COLUMN TYPE on a
|
||||
-- column that has a DEFAULT whose expression doesn't match the new
|
||||
-- type. The intermediate moment with no default is fine — no INSERT
|
||||
-- happens between these statements inside the same transaction.
|
||||
--
|
||||
-- The `USING status::workspace_status` cast is the type-conversion
|
||||
-- expression Postgres needs when the source and target types aren't
|
||||
-- assignment-compatible. If any existing row has a status value
|
||||
-- outside the enum's set, this statement aborts the transaction and
|
||||
-- the migration leaves the table untouched — that's the correct
|
||||
-- behavior (we'd want to know about the rogue value before locking
|
||||
-- the type).
|
||||
ALTER TABLE workspaces
|
||||
ALTER COLUMN status DROP DEFAULT;
|
||||
|
||||
ALTER TABLE workspaces
|
||||
ALTER COLUMN status TYPE workspace_status USING status::workspace_status;
|
||||
|
||||
ALTER TABLE workspaces
|
||||
ALTER COLUMN status SET DEFAULT 'provisioning'::workspace_status;
|
||||
|
||||
COMMIT;
|
||||
@ -87,6 +87,68 @@ _RETRYABLE_PATTERNS = (
|
||||
"try again",
|
||||
)
|
||||
|
||||
# Module-level SDK-wedge flag. When claude_agent_sdk's `query.initialize()`
|
||||
# raises `Control request timeout: initialize`, the SDK's internal client-
|
||||
# process state is corrupted for the rest of the Python process — every
|
||||
# subsequent `_run_query()` call hits the same wedge and re-throws. The
|
||||
# executor itself can't auto-recover (the underlying CLI subprocess and
|
||||
# its read pipe are in an unrecoverable state); only a workspace restart
|
||||
# clears it.
|
||||
#
|
||||
# The heartbeat task reads these helpers and reports
|
||||
# `runtime_state="wedged"` to the platform, which flips the workspace to
|
||||
# `degraded` so the canvas surfaces a Restart hint instead of leaving
|
||||
# the user staring at a green dot while every chat hangs.
|
||||
#
|
||||
# Module scope (not instance scope) is deliberate: the wedge is a
|
||||
# property of the Python process, not the executor. A future per-org
|
||||
# multi-executor design could move this to a shared registry, but with
|
||||
# one executor per workspace process today the simplest lock-free
|
||||
# read+write fits.
|
||||
_sdk_wedged_reason: str | None = None
|
||||
|
||||
|
||||
def is_wedged() -> bool:
|
||||
"""True if the Claude SDK has hit a non-recoverable init wedge in
|
||||
this process. Sticky until process restart."""
|
||||
return _sdk_wedged_reason is not None
|
||||
|
||||
|
||||
def wedge_reason() -> str:
|
||||
"""Human-readable description of the wedge cause, or empty string
|
||||
when not wedged. Surfaced to the canvas via heartbeat sample_error."""
|
||||
return _sdk_wedged_reason or ""
|
||||
|
||||
|
||||
def _mark_sdk_wedged(reason: str) -> None:
|
||||
"""Internal — flag the SDK as wedged. Only the first call wins
|
||||
(subsequent identical wedges shouldn't overwrite a more specific
|
||||
reason). Tests use `_reset_sdk_wedge_for_test()` to clear."""
|
||||
global _sdk_wedged_reason
|
||||
if _sdk_wedged_reason is None:
|
||||
_sdk_wedged_reason = reason
|
||||
logger.error("SDK wedge detected: %s — workspace will report degraded until restart", reason)
|
||||
|
||||
|
||||
def _reset_sdk_wedge_for_test() -> None:
|
||||
"""Test-only escape hatch. Production code never resets the flag —
|
||||
wedge clears via process restart, which naturally re-imports this
|
||||
module with the flag at None."""
|
||||
global _sdk_wedged_reason
|
||||
_sdk_wedged_reason = None
|
||||
|
||||
|
||||
# Substring patterns that classify an exception as the specific
|
||||
# claude_agent_sdk init-timeout wedge (vs. a rate-limit, transient
|
||||
# subprocess crash, etc.). Match is case-insensitive on the formatted
|
||||
# error string. Adding a new pattern here MUST come with a test in
|
||||
# tests/test_claude_sdk_executor.py — the flag is sticky and false-
|
||||
# positives lock the workspace into degraded for the whole process
|
||||
# lifetime.
|
||||
_WEDGE_ERROR_PATTERNS = (
|
||||
"control request timeout",
|
||||
)
|
||||
|
||||
|
||||
_SWALLOWED_STDERR_MARKER = "Check stderr output for details"
|
||||
|
||||
@ -506,6 +568,19 @@ class ClaudeSDKExecutor(AgentExecutor):
|
||||
# subprocess died.
|
||||
logger.error("SDK agent error [claude-code]: %s", formatted)
|
||||
logger.exception("SDK agent error [claude-code] — full traceback follows")
|
||||
# Detect the specific claude_agent_sdk init-wedge case
|
||||
# so the heartbeat task can flip the workspace to
|
||||
# `degraded`. Match on the lowercased formatted error;
|
||||
# `formatted` is whatever _format_process_error built,
|
||||
# which already includes both the message and the
|
||||
# exception class name.
|
||||
formatted_lc = formatted.lower()
|
||||
for pat in _WEDGE_ERROR_PATTERNS:
|
||||
if pat in formatted_lc:
|
||||
_mark_sdk_wedged(
|
||||
f"claude_agent_sdk wedge: {formatted[:200]} — restart workspace to recover"
|
||||
)
|
||||
break
|
||||
response_text = sanitize_agent_error(exc)
|
||||
break
|
||||
finally:
|
||||
|
||||
@ -19,6 +19,30 @@ import httpx
|
||||
|
||||
from platform_auth import auth_headers, refresh_cache, self_source_headers
|
||||
|
||||
|
||||
def _runtime_state_payload() -> dict:
|
||||
"""Build the {runtime_state, sample_error} portion of the heartbeat
|
||||
body when the Claude SDK has hit a wedge. Returns an empty dict
|
||||
when the runtime is healthy so the heartbeat payload doesn't grow
|
||||
fields the platform doesn't need.
|
||||
|
||||
Imported lazily so workspaces running non-Claude runtimes (where
|
||||
`claude_sdk_executor` may not be importable at all) keep working —
|
||||
a missing import means "no Claude wedge possible here, healthy."
|
||||
"""
|
||||
try:
|
||||
from claude_sdk_executor import is_wedged, wedge_reason
|
||||
except Exception:
|
||||
return {}
|
||||
if not is_wedged():
|
||||
return {}
|
||||
return {
|
||||
"runtime_state": "wedged",
|
||||
# sample_error doubles as the human-readable banner text on the
|
||||
# canvas's degraded card — keep it short and actionable.
|
||||
"sample_error": wedge_reason(),
|
||||
}
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
HEARTBEAT_INTERVAL = 30 # seconds
|
||||
@ -85,16 +109,23 @@ class HeartbeatLoop:
|
||||
while True:
|
||||
# 1. Send heartbeat (Phase 30.1: include auth header if token known)
|
||||
try:
|
||||
body = {
|
||||
"workspace_id": self.workspace_id,
|
||||
"error_rate": self.error_rate,
|
||||
"sample_error": self.sample_error,
|
||||
"active_tasks": self.active_tasks,
|
||||
"current_task": self.current_task,
|
||||
"uptime_seconds": int(time.time() - self.start_time),
|
||||
}
|
||||
# Layer the runtime-wedge fields on top so a
|
||||
# non-empty sample_error from the wedge wins
|
||||
# over the (typically empty) heartbeat
|
||||
# sample_error field. The platform reads
|
||||
# runtime_state to flip status → degraded.
|
||||
body.update(_runtime_state_payload())
|
||||
await client.post(
|
||||
f"{self.platform_url}/registry/heartbeat",
|
||||
json={
|
||||
"workspace_id": self.workspace_id,
|
||||
"error_rate": self.error_rate,
|
||||
"sample_error": self.sample_error,
|
||||
"active_tasks": self.active_tasks,
|
||||
"current_task": self.current_task,
|
||||
"uptime_seconds": int(time.time() - self.start_time),
|
||||
},
|
||||
json=body,
|
||||
headers=auth_headers(),
|
||||
)
|
||||
self.error_count = 0
|
||||
@ -113,16 +144,18 @@ class HeartbeatLoop:
|
||||
logger.warning("Heartbeat 401 for %s — refreshing token cache and retrying once", self.workspace_id)
|
||||
refresh_cache()
|
||||
try:
|
||||
retry_body = {
|
||||
"workspace_id": self.workspace_id,
|
||||
"error_rate": self.error_rate,
|
||||
"sample_error": self.sample_error,
|
||||
"active_tasks": self.active_tasks,
|
||||
"current_task": self.current_task,
|
||||
"uptime_seconds": int(time.time() - self.start_time),
|
||||
}
|
||||
retry_body.update(_runtime_state_payload())
|
||||
await client.post(
|
||||
f"{self.platform_url}/registry/heartbeat",
|
||||
json={
|
||||
"workspace_id": self.workspace_id,
|
||||
"error_rate": self.error_rate,
|
||||
"sample_error": self.sample_error,
|
||||
"active_tasks": self.active_tasks,
|
||||
"current_task": self.current_task,
|
||||
"uptime_seconds": int(time.time() - self.start_time),
|
||||
},
|
||||
json=retry_body,
|
||||
headers=auth_headers(),
|
||||
)
|
||||
self._consecutive_failures = 0
|
||||
|
||||
@ -1221,3 +1221,101 @@ def test_load_config_dict_empty_file_returns_empty(tmp_path):
|
||||
e = ClaudeSDKExecutor(system_prompt=None, config_path=str(tmp_path), heartbeat=None)
|
||||
result = e._load_config_dict()
|
||||
assert result == {}
|
||||
|
||||
|
||||
# ==================== SDK wedge detector ====================
|
||||
#
|
||||
# Exercises the module-level _sdk_wedged_reason flag set when the
|
||||
# claude_agent_sdk init handshake times out. The flag is sticky — the
|
||||
# heartbeat task reads it via is_wedged() / wedge_reason() and reports
|
||||
# runtime_state="wedged" so the platform flips status → degraded.
|
||||
|
||||
import claude_sdk_executor as _executor_mod
|
||||
|
||||
|
||||
def test_wedge_helpers_default_clean():
|
||||
"""Fresh module: no wedge."""
|
||||
_executor_mod._reset_sdk_wedge_for_test()
|
||||
assert _executor_mod.is_wedged() is False
|
||||
assert _executor_mod.wedge_reason() == ""
|
||||
|
||||
|
||||
def test_mark_sdk_wedged_sets_flag_and_reason():
|
||||
"""First mark wins and sets both is_wedged() and the reason text."""
|
||||
_executor_mod._reset_sdk_wedge_for_test()
|
||||
_executor_mod._mark_sdk_wedged("init timeout — restart")
|
||||
try:
|
||||
assert _executor_mod.is_wedged() is True
|
||||
assert "init timeout" in _executor_mod.wedge_reason()
|
||||
finally:
|
||||
_executor_mod._reset_sdk_wedge_for_test()
|
||||
|
||||
|
||||
def test_mark_sdk_wedged_sticky_first_wins():
|
||||
"""A second wedge call with a different reason does NOT overwrite
|
||||
the first. The first cause is the one the user needs to see; later
|
||||
knock-on errors from the same wedge would otherwise mask it."""
|
||||
_executor_mod._reset_sdk_wedge_for_test()
|
||||
_executor_mod._mark_sdk_wedged("first cause — Control request timeout")
|
||||
_executor_mod._mark_sdk_wedged("noise from a downstream symptom")
|
||||
try:
|
||||
assert _executor_mod.wedge_reason() == "first cause — Control request timeout"
|
||||
finally:
|
||||
_executor_mod._reset_sdk_wedge_for_test()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execute_marks_wedge_on_control_request_timeout():
|
||||
"""End-to-end: when _run_query raises an exception whose formatted
|
||||
error contains 'Control request timeout' (case-insensitive), the
|
||||
executor's catch block flags the SDK as wedged. Subsequent
|
||||
is_wedged() reads return True until process restart (or the
|
||||
test-only reset)."""
|
||||
_executor_mod._reset_sdk_wedge_for_test()
|
||||
e = _make_executor()
|
||||
ctx = _make_context(["test prompt"])
|
||||
eq = _make_event_queue()
|
||||
|
||||
async def boom(prompt, options):
|
||||
# Match the literal exception claude_agent_sdk raises in the
|
||||
# observed wedge path.
|
||||
raise Exception("Control request timeout: initialize")
|
||||
yield # pragma: no cover — make this an async generator
|
||||
|
||||
with patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")), \
|
||||
patch("claude_sdk_executor.read_delegation_results", return_value=""), \
|
||||
patch("claude_sdk_executor.commit_memory", new=AsyncMock()), \
|
||||
patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \
|
||||
patch("claude_agent_sdk.query", new=boom):
|
||||
try:
|
||||
await e.execute(ctx, eq)
|
||||
assert _executor_mod.is_wedged() is True, "wedge flag must be set"
|
||||
assert "Control request timeout" in _executor_mod.wedge_reason()
|
||||
finally:
|
||||
_executor_mod._reset_sdk_wedge_for_test()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_execute_does_not_mark_wedge_on_unrelated_error():
|
||||
"""Sanity: a generic non-wedge exception (e.g. ValueError) MUST
|
||||
NOT trigger the wedge flag. False-positives lock the workspace
|
||||
into degraded for the whole process lifetime."""
|
||||
_executor_mod._reset_sdk_wedge_for_test()
|
||||
e = _make_executor()
|
||||
ctx = _make_context(["test prompt"])
|
||||
eq = _make_event_queue()
|
||||
|
||||
async def boom(prompt, options):
|
||||
raise ValueError("ordinary tool failure, not a wedge")
|
||||
yield # pragma: no cover
|
||||
|
||||
with patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")), \
|
||||
patch("claude_sdk_executor.read_delegation_results", return_value=""), \
|
||||
patch("claude_sdk_executor.commit_memory", new=AsyncMock()), \
|
||||
patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \
|
||||
patch("claude_agent_sdk.query", new=boom):
|
||||
try:
|
||||
await e.execute(ctx, eq)
|
||||
assert _executor_mod.is_wedged() is False, "non-wedge error must not flip the flag"
|
||||
finally:
|
||||
_executor_mod._reset_sdk_wedge_for_test()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user