diff --git a/workspace-server/internal/handlers/registry.go b/workspace-server/internal/handlers/registry.go index e5be5553..2a07264e 100644 --- a/workspace-server/internal/handlers/registry.go +++ b/workspace-server/internal/handlers/registry.go @@ -454,6 +454,29 @@ func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.Heartbea return } + // Self-reported runtime wedge: takes precedence over the error_rate + // path. The heartbeat task lives in its own asyncio task and keeps + // firing 200s even after claude_agent_sdk locks up on + // `Control request timeout: initialize` — so error_rate stays at 0 + // (no calls have been recorded as errors yet) while every actual + // /a2a POST hangs. The workspace tells us about that case via + // runtime_state="wedged"; we honor it directly. Sample_error from + // the heartbeat carries the human-readable reason ("SDK init + // timeout — restart workspace"), which the canvas surfaces in the + // degraded card without the operator scraping container logs. + if payload.RuntimeState == "wedged" && currentStatus == "online" { + _, err := db.DB.ExecContext(ctx, + `UPDATE workspaces SET status = 'degraded', updated_at = now() WHERE id = $1 AND status = 'online'`, + payload.WorkspaceID) + if err != nil { + log.Printf("Heartbeat: failed to mark %s degraded (wedged): %v", payload.WorkspaceID, err) + } + h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_DEGRADED", payload.WorkspaceID, map[string]interface{}{ + "runtime_state": "wedged", + "sample_error": payload.SampleError, + }) + } + if currentStatus == "online" && payload.ErrorRate >= 0.5 { if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'degraded', updated_at = now() WHERE id = $1`, payload.WorkspaceID); err != nil { log.Printf("Heartbeat: failed to mark %s degraded: %v", payload.WorkspaceID, err) @@ -464,7 +487,13 @@ func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.Heartbea }) } - if currentStatus == "degraded" && payload.ErrorRate < 0.1 { + // Recovery from degraded → online when BOTH the error rate has + // fallen back AND the workspace is no longer reporting a wedge. + // The wedge condition is sticky for the process lifetime + // (claude_sdk_executor only clears it on restart), so when the + // container restarts and starts heartbeating fresh — RuntimeState + // is empty, error_rate is 0 — this branch flips us back to online. + if currentStatus == "degraded" && payload.ErrorRate < 0.1 && payload.RuntimeState == "" { if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'online', updated_at = now() WHERE id = $1`, payload.WorkspaceID); err != nil { log.Printf("Heartbeat: failed to recover %s to online: %v", payload.WorkspaceID, err) } diff --git a/workspace-server/internal/handlers/registry_test.go b/workspace-server/internal/handlers/registry_test.go index 62c9e984..9f33b140 100644 --- a/workspace-server/internal/handlers/registry_test.go +++ b/workspace-server/internal/handlers/registry_test.go @@ -298,6 +298,163 @@ func TestHeartbeatHandler_OnlineStaysOnline(t *testing.T) { } } +// ==================== Heartbeat — runtime wedge (claude_agent_sdk init timeout) ==================== + +// TestHeartbeatHandler_RuntimeWedged_FlipsOnlineToDegraded verifies the +// runtime_state="wedged" path. Heartbeat task in the workspace lives in +// its own asyncio task and keeps reporting online while the Claude SDK +// is wedged on Control request timeout; the workspace tells us about +// the wedge via this field, and we honor it by flipping status → +// degraded with the wedge reason in last_sample_error. +func TestHeartbeatHandler_RuntimeWedged_FlipsOnlineToDegraded(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewRegistryHandler(broadcaster) + + wedgeMsg := "claude_agent_sdk wedge: Control request timeout: initialize — restart workspace to recover" + + mock.ExpectQuery("SELECT COALESCE\\(current_task"). + WithArgs("ws-wedged"). + WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + + // Heartbeat UPDATE — sample_error carries the wedge reason from the + // workspace's _runtime_state_payload() helper. + mock.ExpectExec("UPDATE workspaces SET"). + WithArgs("ws-wedged", 0.0, wedgeMsg, 0, 600, ""). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // evaluateStatus: currentStatus = online + mock.ExpectQuery("SELECT status FROM workspaces WHERE id ="). + WithArgs("ws-wedged"). + WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online")) + + // The wedge-handling branch fires the degraded UPDATE with the + // `AND status = 'online'` guard (race-safe against concurrent + // removal). Match the SQL with the guard included. + mock.ExpectExec("UPDATE workspaces SET status = 'degraded'.*status = 'online'"). + WithArgs("ws-wedged"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // RecordAndBroadcast for WORKSPACE_DEGRADED + mock.ExpectExec("INSERT INTO structure_events"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + + body := `{"workspace_id":"ws-wedged","error_rate":0.0,"sample_error":"` + wedgeMsg + `","active_tasks":0,"uptime_seconds":600,"runtime_state":"wedged"}` + c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Heartbeat(c) + + if w.Code != http.StatusOK { + t.Errorf("expected status 200, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// TestHeartbeatHandler_DegradedRecoversOnlyAfterWedgeClears verifies that +// the degraded → online recovery path requires BOTH error_rate < 0.1 +// AND runtime_state cleared. A workspace still reporting wedged stays +// degraded even when error_rate happens to be 0 (no calls have been +// recorded as errors yet — the wedge is captured as a runtime state, +// not an error count). +func TestHeartbeatHandler_DegradedRecoversOnlyAfterWedgeClears(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewRegistryHandler(broadcaster) + + mock.ExpectQuery("SELECT COALESCE\\(current_task"). + WithArgs("ws-still-wedged"). + WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + + mock.ExpectExec("UPDATE workspaces SET"). + WithArgs("ws-still-wedged", 0.0, "still broken", 0, 800, ""). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // currentStatus = degraded + mock.ExpectQuery("SELECT status FROM workspaces WHERE id ="). + WithArgs("ws-still-wedged"). + WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("degraded")) + + // No additional UPDATE expected — the recovery branch's + // `runtime_state == ""` guard blocks the flip back to online. + // (sqlmock fails the test if any unmocked Exec runs.) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + + body := `{"workspace_id":"ws-still-wedged","error_rate":0.0,"sample_error":"still broken","active_tasks":0,"uptime_seconds":800,"runtime_state":"wedged"}` + c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Heartbeat(c) + + if w.Code != http.StatusOK { + t.Errorf("expected status 200, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + +// TestHeartbeatHandler_DegradedToOnline_AfterWedgeClears verifies the +// happy-path recovery: a workspace previously marked degraded is +// post-restart, error_rate is back to 0, and runtime_state is empty +// (the new process re-imported claude_sdk_executor with the flag +// fresh). Status flips back to online and a WORKSPACE_ONLINE event +// fires. +func TestHeartbeatHandler_DegradedToOnline_AfterWedgeClears(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewRegistryHandler(broadcaster) + + mock.ExpectQuery("SELECT COALESCE\\(current_task"). + WithArgs("ws-recovered"). + WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + + mock.ExpectExec("UPDATE workspaces SET"). + WithArgs("ws-recovered", 0.0, "", 0, 30, ""). + WillReturnResult(sqlmock.NewResult(0, 1)) + + mock.ExpectQuery("SELECT status FROM workspaces WHERE id ="). + WithArgs("ws-recovered"). + WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("degraded")) + + // Recovery UPDATE fires (degraded → online). + mock.ExpectExec("UPDATE workspaces SET status = 'online'"). + WithArgs("ws-recovered"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + mock.ExpectExec("INSERT INTO structure_events"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + + // runtime_state intentionally absent (== ""); error_rate = 0; this + // is exactly what a freshly-restarted workspace's first heartbeat + // looks like. + body := `{"workspace_id":"ws-recovered","error_rate":0.0,"sample_error":"","active_tasks":0,"uptime_seconds":30}` + c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Heartbeat(c) + + if w.Code != http.StatusOK { + t.Errorf("expected status 200, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + // ==================== UpdateCard ==================== func TestUpdateCard_Success(t *testing.T) { diff --git a/workspace-server/internal/models/workspace.go b/workspace-server/internal/models/workspace.go index 26061a1f..aeb961ea 100644 --- a/workspace-server/internal/models/workspace.go +++ b/workspace-server/internal/models/workspace.go @@ -51,6 +51,19 @@ type HeartbeatPayload struct { // a previously-reported spend value. Any non-zero value is clamped to // [0, maxMonthlySpend] before the DB write. (#615) MonthlySpend int64 `json:"monthly_spend"` + // RuntimeState is a self-reported runtime health flag separate from + // "is the heartbeat task firing at all". The heartbeat task lives in + // its own asyncio task and keeps pinging even when the agent runtime + // is wedged (e.g. claude_agent_sdk's `Control request timeout: + // initialize` leaves the SDK in a permanent error state for the + // process lifetime). RuntimeState is how the workspace tells the + // platform "I'm alive but my Claude runtime is broken — flip me to + // degraded so the canvas can show a Restart hint." + // + // Empty string = healthy / no signal. The only currently-recognised + // non-empty value is "wedged"; future values can extend this without + // migration. + RuntimeState string `json:"runtime_state"` } type UpdateCardPayload struct { diff --git a/workspace-server/migrations/043_workspace_status_enum.down.sql b/workspace-server/migrations/043_workspace_status_enum.down.sql new file mode 100644 index 00000000..88ecfbc1 --- /dev/null +++ b/workspace-server/migrations/043_workspace_status_enum.down.sql @@ -0,0 +1,19 @@ +-- 043_workspace_status_enum.down.sql +-- +-- Reverse 043_workspace_status_enum.up.sql: convert workspaces.status +-- back to plain TEXT and drop the workspace_status enum type. + +BEGIN; + +ALTER TABLE workspaces + ALTER COLUMN status DROP DEFAULT; + +ALTER TABLE workspaces + ALTER COLUMN status TYPE TEXT USING status::TEXT; + +ALTER TABLE workspaces + ALTER COLUMN status SET DEFAULT 'provisioning'; + +DROP TYPE workspace_status; + +COMMIT; diff --git a/workspace-server/migrations/043_workspace_status_enum.up.sql b/workspace-server/migrations/043_workspace_status_enum.up.sql new file mode 100644 index 00000000..1d42fec3 --- /dev/null +++ b/workspace-server/migrations/043_workspace_status_enum.up.sql @@ -0,0 +1,65 @@ +-- 043_workspace_status_enum.up.sql +-- +-- Convert workspaces.status from free-form TEXT to a real Postgres +-- ENUM type. The previous shape (TEXT DEFAULT 'provisioning' with no +-- CHECK constraint, set by 001_workspaces.sql) let any handler write +-- any string, including typos and stale values from older code paths. +-- Locking the value set forces every writer to use one of the agreed +-- states and lets us add a new state (`degraded`, used by the SDK +-- wedge detector landing in this same change) without losing type +-- safety on the column. +-- +-- Value set covers every status the production codebase actually writes: +-- +-- provisioning — workspace row exists, container is being created +-- (initial INSERT default) +-- online — heartbeat fresh + last response was successful +-- offline — Redis liveness key expired (ws-side dead) or +-- the proxy detected an unreachable upstream +-- degraded — runtime is alive but reporting trouble (heartbeat +-- error_rate >= 0.5, OR new in this change: +-- workspace explicitly reported runtime_state="wedged") +-- failed — provisioning never completed, or workspace marked +-- itself failed via bundle import / runtime crash +-- removed — soft-delete tombstone; the row stays so foreign- +-- key references survive but no operations target it +-- +-- Verified before writing this migration that production code in +-- workspace-server/internal/{handlers,registry,bundle} writes only +-- values from this list (test fixtures may write others; tests run +-- against an isolated fixture DB so the cast doesn't affect them). + +BEGIN; + +CREATE TYPE workspace_status AS ENUM ( + 'provisioning', + 'online', + 'offline', + 'degraded', + 'failed', + 'removed' +); + +-- The two-step ALTER (DROP DEFAULT then change type then SET DEFAULT) +-- is required because Postgres rejects an ALTER COLUMN TYPE on a +-- column that has a DEFAULT whose expression doesn't match the new +-- type. The intermediate moment with no default is fine — no INSERT +-- happens between these statements inside the same transaction. +-- +-- The `USING status::workspace_status` cast is the type-conversion +-- expression Postgres needs when the source and target types aren't +-- assignment-compatible. If any existing row has a status value +-- outside the enum's set, this statement aborts the transaction and +-- the migration leaves the table untouched — that's the correct +-- behavior (we'd want to know about the rogue value before locking +-- the type). +ALTER TABLE workspaces + ALTER COLUMN status DROP DEFAULT; + +ALTER TABLE workspaces + ALTER COLUMN status TYPE workspace_status USING status::workspace_status; + +ALTER TABLE workspaces + ALTER COLUMN status SET DEFAULT 'provisioning'::workspace_status; + +COMMIT; diff --git a/workspace/claude_sdk_executor.py b/workspace/claude_sdk_executor.py index ab8d107a..1c65c346 100644 --- a/workspace/claude_sdk_executor.py +++ b/workspace/claude_sdk_executor.py @@ -87,6 +87,68 @@ _RETRYABLE_PATTERNS = ( "try again", ) +# Module-level SDK-wedge flag. When claude_agent_sdk's `query.initialize()` +# raises `Control request timeout: initialize`, the SDK's internal client- +# process state is corrupted for the rest of the Python process — every +# subsequent `_run_query()` call hits the same wedge and re-throws. The +# executor itself can't auto-recover (the underlying CLI subprocess and +# its read pipe are in an unrecoverable state); only a workspace restart +# clears it. +# +# The heartbeat task reads these helpers and reports +# `runtime_state="wedged"` to the platform, which flips the workspace to +# `degraded` so the canvas surfaces a Restart hint instead of leaving +# the user staring at a green dot while every chat hangs. +# +# Module scope (not instance scope) is deliberate: the wedge is a +# property of the Python process, not the executor. A future per-org +# multi-executor design could move this to a shared registry, but with +# one executor per workspace process today the simplest lock-free +# read+write fits. +_sdk_wedged_reason: str | None = None + + +def is_wedged() -> bool: + """True if the Claude SDK has hit a non-recoverable init wedge in + this process. Sticky until process restart.""" + return _sdk_wedged_reason is not None + + +def wedge_reason() -> str: + """Human-readable description of the wedge cause, or empty string + when not wedged. Surfaced to the canvas via heartbeat sample_error.""" + return _sdk_wedged_reason or "" + + +def _mark_sdk_wedged(reason: str) -> None: + """Internal — flag the SDK as wedged. Only the first call wins + (subsequent identical wedges shouldn't overwrite a more specific + reason). Tests use `_reset_sdk_wedge_for_test()` to clear.""" + global _sdk_wedged_reason + if _sdk_wedged_reason is None: + _sdk_wedged_reason = reason + logger.error("SDK wedge detected: %s — workspace will report degraded until restart", reason) + + +def _reset_sdk_wedge_for_test() -> None: + """Test-only escape hatch. Production code never resets the flag — + wedge clears via process restart, which naturally re-imports this + module with the flag at None.""" + global _sdk_wedged_reason + _sdk_wedged_reason = None + + +# Substring patterns that classify an exception as the specific +# claude_agent_sdk init-timeout wedge (vs. a rate-limit, transient +# subprocess crash, etc.). Match is case-insensitive on the formatted +# error string. Adding a new pattern here MUST come with a test in +# tests/test_claude_sdk_executor.py — the flag is sticky and false- +# positives lock the workspace into degraded for the whole process +# lifetime. +_WEDGE_ERROR_PATTERNS = ( + "control request timeout", +) + _SWALLOWED_STDERR_MARKER = "Check stderr output for details" @@ -506,6 +568,19 @@ class ClaudeSDKExecutor(AgentExecutor): # subprocess died. logger.error("SDK agent error [claude-code]: %s", formatted) logger.exception("SDK agent error [claude-code] — full traceback follows") + # Detect the specific claude_agent_sdk init-wedge case + # so the heartbeat task can flip the workspace to + # `degraded`. Match on the lowercased formatted error; + # `formatted` is whatever _format_process_error built, + # which already includes both the message and the + # exception class name. + formatted_lc = formatted.lower() + for pat in _WEDGE_ERROR_PATTERNS: + if pat in formatted_lc: + _mark_sdk_wedged( + f"claude_agent_sdk wedge: {formatted[:200]} — restart workspace to recover" + ) + break response_text = sanitize_agent_error(exc) break finally: diff --git a/workspace/heartbeat.py b/workspace/heartbeat.py index 7e8b1c69..c0fc2f1d 100644 --- a/workspace/heartbeat.py +++ b/workspace/heartbeat.py @@ -19,6 +19,30 @@ import httpx from platform_auth import auth_headers, refresh_cache, self_source_headers + +def _runtime_state_payload() -> dict: + """Build the {runtime_state, sample_error} portion of the heartbeat + body when the Claude SDK has hit a wedge. Returns an empty dict + when the runtime is healthy so the heartbeat payload doesn't grow + fields the platform doesn't need. + + Imported lazily so workspaces running non-Claude runtimes (where + `claude_sdk_executor` may not be importable at all) keep working — + a missing import means "no Claude wedge possible here, healthy." + """ + try: + from claude_sdk_executor import is_wedged, wedge_reason + except Exception: + return {} + if not is_wedged(): + return {} + return { + "runtime_state": "wedged", + # sample_error doubles as the human-readable banner text on the + # canvas's degraded card — keep it short and actionable. + "sample_error": wedge_reason(), + } + logger = logging.getLogger(__name__) HEARTBEAT_INTERVAL = 30 # seconds @@ -85,16 +109,23 @@ class HeartbeatLoop: while True: # 1. Send heartbeat (Phase 30.1: include auth header if token known) try: + body = { + "workspace_id": self.workspace_id, + "error_rate": self.error_rate, + "sample_error": self.sample_error, + "active_tasks": self.active_tasks, + "current_task": self.current_task, + "uptime_seconds": int(time.time() - self.start_time), + } + # Layer the runtime-wedge fields on top so a + # non-empty sample_error from the wedge wins + # over the (typically empty) heartbeat + # sample_error field. The platform reads + # runtime_state to flip status → degraded. + body.update(_runtime_state_payload()) await client.post( f"{self.platform_url}/registry/heartbeat", - json={ - "workspace_id": self.workspace_id, - "error_rate": self.error_rate, - "sample_error": self.sample_error, - "active_tasks": self.active_tasks, - "current_task": self.current_task, - "uptime_seconds": int(time.time() - self.start_time), - }, + json=body, headers=auth_headers(), ) self.error_count = 0 @@ -113,16 +144,18 @@ class HeartbeatLoop: logger.warning("Heartbeat 401 for %s — refreshing token cache and retrying once", self.workspace_id) refresh_cache() try: + retry_body = { + "workspace_id": self.workspace_id, + "error_rate": self.error_rate, + "sample_error": self.sample_error, + "active_tasks": self.active_tasks, + "current_task": self.current_task, + "uptime_seconds": int(time.time() - self.start_time), + } + retry_body.update(_runtime_state_payload()) await client.post( f"{self.platform_url}/registry/heartbeat", - json={ - "workspace_id": self.workspace_id, - "error_rate": self.error_rate, - "sample_error": self.sample_error, - "active_tasks": self.active_tasks, - "current_task": self.current_task, - "uptime_seconds": int(time.time() - self.start_time), - }, + json=retry_body, headers=auth_headers(), ) self._consecutive_failures = 0 diff --git a/workspace/tests/test_claude_sdk_executor.py b/workspace/tests/test_claude_sdk_executor.py index ac3b0c3d..5b50a950 100644 --- a/workspace/tests/test_claude_sdk_executor.py +++ b/workspace/tests/test_claude_sdk_executor.py @@ -1221,3 +1221,101 @@ def test_load_config_dict_empty_file_returns_empty(tmp_path): e = ClaudeSDKExecutor(system_prompt=None, config_path=str(tmp_path), heartbeat=None) result = e._load_config_dict() assert result == {} + + +# ==================== SDK wedge detector ==================== +# +# Exercises the module-level _sdk_wedged_reason flag set when the +# claude_agent_sdk init handshake times out. The flag is sticky — the +# heartbeat task reads it via is_wedged() / wedge_reason() and reports +# runtime_state="wedged" so the platform flips status → degraded. + +import claude_sdk_executor as _executor_mod + + +def test_wedge_helpers_default_clean(): + """Fresh module: no wedge.""" + _executor_mod._reset_sdk_wedge_for_test() + assert _executor_mod.is_wedged() is False + assert _executor_mod.wedge_reason() == "" + + +def test_mark_sdk_wedged_sets_flag_and_reason(): + """First mark wins and sets both is_wedged() and the reason text.""" + _executor_mod._reset_sdk_wedge_for_test() + _executor_mod._mark_sdk_wedged("init timeout — restart") + try: + assert _executor_mod.is_wedged() is True + assert "init timeout" in _executor_mod.wedge_reason() + finally: + _executor_mod._reset_sdk_wedge_for_test() + + +def test_mark_sdk_wedged_sticky_first_wins(): + """A second wedge call with a different reason does NOT overwrite + the first. The first cause is the one the user needs to see; later + knock-on errors from the same wedge would otherwise mask it.""" + _executor_mod._reset_sdk_wedge_for_test() + _executor_mod._mark_sdk_wedged("first cause — Control request timeout") + _executor_mod._mark_sdk_wedged("noise from a downstream symptom") + try: + assert _executor_mod.wedge_reason() == "first cause — Control request timeout" + finally: + _executor_mod._reset_sdk_wedge_for_test() + + +@pytest.mark.asyncio +async def test_execute_marks_wedge_on_control_request_timeout(): + """End-to-end: when _run_query raises an exception whose formatted + error contains 'Control request timeout' (case-insensitive), the + executor's catch block flags the SDK as wedged. Subsequent + is_wedged() reads return True until process restart (or the + test-only reset).""" + _executor_mod._reset_sdk_wedge_for_test() + e = _make_executor() + ctx = _make_context(["test prompt"]) + eq = _make_event_queue() + + async def boom(prompt, options): + # Match the literal exception claude_agent_sdk raises in the + # observed wedge path. + raise Exception("Control request timeout: initialize") + yield # pragma: no cover — make this an async generator + + with patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")), \ + patch("claude_sdk_executor.read_delegation_results", return_value=""), \ + patch("claude_sdk_executor.commit_memory", new=AsyncMock()), \ + patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \ + patch("claude_agent_sdk.query", new=boom): + try: + await e.execute(ctx, eq) + assert _executor_mod.is_wedged() is True, "wedge flag must be set" + assert "Control request timeout" in _executor_mod.wedge_reason() + finally: + _executor_mod._reset_sdk_wedge_for_test() + + +@pytest.mark.asyncio +async def test_execute_does_not_mark_wedge_on_unrelated_error(): + """Sanity: a generic non-wedge exception (e.g. ValueError) MUST + NOT trigger the wedge flag. False-positives lock the workspace + into degraded for the whole process lifetime.""" + _executor_mod._reset_sdk_wedge_for_test() + e = _make_executor() + ctx = _make_context(["test prompt"]) + eq = _make_event_queue() + + async def boom(prompt, options): + raise ValueError("ordinary tool failure, not a wedge") + yield # pragma: no cover + + with patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")), \ + patch("claude_sdk_executor.read_delegation_results", return_value=""), \ + patch("claude_sdk_executor.commit_memory", new=AsyncMock()), \ + patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \ + patch("claude_agent_sdk.query", new=boom): + try: + await e.execute(ctx, eq) + assert _executor_mod.is_wedged() is False, "non-wedge error must not flip the flag" + finally: + _executor_mod._reset_sdk_wedge_for_test()