diff --git a/workspace/claude_sdk_executor.py b/workspace/claude_sdk_executor.py deleted file mode 100644 index 57d53643..00000000 --- a/workspace/claude_sdk_executor.py +++ /dev/null @@ -1,715 +0,0 @@ -"""SDK-based agent executor for Claude Code runtime. - -Uses the official `claude-agent-sdk` Python package to invoke the Claude Code -engine programmatically โ€” no subprocess, no stdout parsing, no zombie reap. - -Replaces CLIAgentExecutor for the `claude-code` runtime only. Other CLI runtimes -(codex, ollama) keep using `cli_executor.py`. - -Benefits over CLI subprocess: -- No per-message ~500ms startup overhead -- No stdout buffering issues -- Native Python session management (no JSON parsing of stdout) -- Real message stream โ€” can surface tool calls in future for live UX -- Cooperative cancel (closes the query async generator on cancel()) -- Same Claude Code engine, so plugins / skills / CLAUDE.md still apply - -Concurrency model ------------------ -Turns are serialized per-executor via an asyncio.Lock. The old CLI executor -serialized implicitly by spawning one subprocess per message and awaiting it; -the SDK removes that, so we re-introduce serialization explicitly. This keeps -session_id updates race-free and makes cancel() well-defined (there's at most -one active stream at any given moment). -""" - -from __future__ import annotations - -import asyncio -import logging -import os -import sys -from collections.abc import AsyncIterator, Callable -from dataclasses import dataclass -from typing import TYPE_CHECKING, Any - -import yaml - -import claude_agent_sdk as sdk - -from a2a.server.agent_execution import AgentExecutor, RequestContext -from a2a.server.events import EventQueue -from a2a.helpers import new_agent_text_message - -from executor_helpers import ( - CONFIG_MOUNT, - MEMORY_CONTENT_MAX_CHARS, - WORKSPACE_MOUNT, - auto_push_hook, - brief_summary, - collect_outbound_files, - commit_memory, - extract_attached_files, - extract_message_text, - get_a2a_instructions, - get_hma_instructions, - get_mcp_server_path, - get_system_prompt, - read_delegation_results, - recall_memories, - sanitize_agent_error, - set_current_task, -) - -if TYPE_CHECKING: - from heartbeat import HeartbeatLoop - -logger = logging.getLogger(__name__) - -_NO_TEXT_MSG = "Error: message contained no text content." -_NO_RESPONSE_MSG = "(no response generated)" -_MAX_RETRIES = 3 -_BASE_RETRY_DELAY_S = 5 -# Cap for stderr captured from the CLI subprocess in the executor log. Keeps -# log lines bounded while still surfacing enough context to diagnose crashes. -# Fixes #66 (previously the executor logged nothing beyond the generic -# "Check stderr output for details" message). -_PROCESS_ERROR_STDERR_MAX_CHARS = 4096 - -# Substrings in error messages that indicate a transient failure worth retrying. -_RETRYABLE_PATTERNS = ( - "rate", - "limit", - "429", - "overloaded", - "capacity", - "exit code 1", - "try again", -) - -# Wedge state moved to runtime_wedge (see that module's docstring for -# the rationale + the broader "Compatibility shim" note). This block -# re-exports under the historical names so the in-file call sites in -# _run_query stay terse and any external consumer that imported them -# from claude_sdk_executor keeps working for one release cycle. -from runtime_wedge import ( # noqa: E402 - clear_wedge as _clear_sdk_wedge_on_success, - is_wedged, - mark_wedged as _mark_sdk_wedged, - reset_for_test as _reset_sdk_wedge_for_test, - wedge_reason, -) - -# Names below are re-exported (not consumed inside this file) for -# backwards compatibility with third-party adapters that imported them -# from claude_sdk_executor before the wedge state moved to runtime_wedge. -# Listing them in __all__ marks the intent explicitly and stops static -# analysis from flagging the imports as unused. -__all__ = [ - "is_wedged", - "wedge_reason", - "_reset_sdk_wedge_for_test", -] - - -# Per-tool-use summarizers. Reads the most-useful argument from each -# tool's input dict so the canvas progress feed shows -# `๐Ÿ›  Read /tmp/foo` instead of the bare tool name. Anything not in the -# table falls through to a generic "๐Ÿ›  (โ€ฆ)" line. Order keys by -# tool frequency so a future contributor can see the high-traffic -# tools first. -_TOOL_USE_SUMMARIZERS: dict[str, Callable[[dict], str]] = { - "Read": lambda i: f"๐Ÿ“„ Read {i.get('file_path', '?')}", - "Write": lambda i: f"โœ๏ธ Write {i.get('file_path', '?')}", - "Edit": lambda i: f"โœ๏ธ Edit {i.get('file_path', '?')}", - "Bash": lambda i: f"โšก Bash: {(i.get('command') or '')[:80]}", - "Glob": lambda i: f"๐Ÿ” Glob {i.get('pattern', '?')}", - "Grep": lambda i: f"๐Ÿ” Grep {i.get('pattern', '?')}", - "WebFetch": lambda i: f"๐ŸŒ WebFetch {i.get('url', '?')}", - "WebSearch": lambda i: f"๐ŸŒ WebSearch {i.get('query', '?')}", - "Task": lambda i: f"๐Ÿค– Task: {(i.get('description') or '')[:60]}", - "TodoWrite": lambda _i: "๐Ÿ“ TodoWrite", -} - - -def _summarize_tool_use(tool_name: str, tool_input: dict) -> str: - summarizer = _TOOL_USE_SUMMARIZERS.get(tool_name) - if summarizer: - try: - return summarizer(tool_input or {})[:200] - except Exception: - pass - # Generic fallback. Truncated so a tool with a giant input dict - # doesn't write a 10kB activity row per call. - return f"๐Ÿ›  {tool_name}(โ€ฆ)"[:200] - - -async def _report_tool_use(block: Any) -> None: - """Fire-and-forget agent_log activity row per tool the SDK invoked, - so the canvas's MyChat live-progress feed can render each step - Claude is doing instead of staring at a single spinner. - - Posts directly to /workspaces/:id/activity rather than through - a2a_tools.report_activity โ€” that helper also pushes a current_task - heartbeat which would duplicate as a TASK_UPDATED line in the - chat feed. The workspace card's current_task is already set - once per turn by the executor's set_current_task(brief_summary) - call, so the per-tool telemetry stays a chat-only signal. - - Best-effort โ€” any failure (network blip, platform unreachable, the - block didn't have the attrs we expected) is swallowed silently. - The tool will still execute regardless; only the progress - telemetry is lost. Deliberately does NOT raise โ€” a malformed - block must not abort the message-stream iteration in - `_run_query`. - """ - try: - # Lazy imports to keep this helper non-essential โ€” the - # executor must still run when the workspace's network/auth - # plumbing isn't fully set up (e.g. unit tests). - import httpx - from a2a_client import PLATFORM_URL, WORKSPACE_ID - from platform_auth import auth_headers - except Exception: - return - try: - tool_name = getattr(block, "name", "") or "" - tool_input = getattr(block, "input", {}) or {} - if not tool_name: - return - summary = _summarize_tool_use(tool_name, tool_input) - # 5s budget โ€” long enough to absorb a single platform GC - # pause, short enough that a wedged platform doesn't slow - # the tool-iteration cadence beyond noticeable. - async with httpx.AsyncClient(timeout=5.0) as client: - await client.post( - f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/activity", - json={ - "activity_type": "agent_log", - "source_id": WORKSPACE_ID, - # target_id == source for self-actions. Matches the - # convention other self-logged activity rows use - # (a2a_receive when the workspace logs its own - # outbound reply) so DB consumers joining on - # target_id see a well-defined value. - "target_id": WORKSPACE_ID, - "summary": summary, - "status": "ok", - "method": tool_name, - }, - headers=auth_headers(), - ) - except Exception: - # Telemetry failures must not break the conversation. - return - - -# Substring patterns that classify an exception as the specific -# claude_agent_sdk init-timeout wedge (vs. a rate-limit, transient -# subprocess crash, etc.). Match is case-insensitive on the formatted -# error string. Adding a new pattern here MUST come with a test in -# tests/test_claude_sdk_executor.py โ€” false-positives lock the -# workspace into degraded until the next successful query clears it. -# -# `:initialize` suffix-anchored โ€” the SDK can theoretically time out -# on later control messages (in-flight tool callbacks), but those -# don't leave the SDK in the unrecoverable post-init state we're -# trying to detect. Limit the pattern to the specific wedge. -_WEDGE_ERROR_PATTERNS = ( - "control request timeout: initialize", -) - - -_SWALLOWED_STDERR_MARKER = "Check stderr output for details" - - -def _probe_claude_cli_error() -> str | None: - """Run ``claude --print`` directly and capture its stderr + stdout. - - Used as a fallback when the claude-agent-sdk raises a bare ``Exception`` - with the swallowed "Check stderr output for details" placeholder โ€” that - happens when the SDK wraps a stream error from the CLI subprocess and - loses both the ``.stderr`` attribute and the exit code. At that point - the only way to see the real failure reason (rate limit, auth error, - network outage, missing token) is to run the CLI ourselves. - - Bounded by a 30s timeout so a hung CLI can't stall the error path. - Returns None if the probe itself failed (wrong invariant โ€” don't - corrupt the main error message with probe noise). - """ - try: - import subprocess - # --print reads stdin, prints response, exits. Empty stdin gives the - # CLI something to work with without triggering an actual model call - # when it's going to fail anyway. - proc = subprocess.run( - ["claude", "--print"], - input="probe", - capture_output=True, - text=True, - timeout=30, - ) - if proc.returncode == 0: - # CLI succeeded โ€” the original error was a transient state that - # resolved between the SDK failure and our probe. Signal that. - return "" - raw = (proc.stderr or "") + (proc.stdout or "") - raw = raw.strip() - if not raw: - return f"" - if len(raw) > _PROCESS_ERROR_STDERR_MAX_CHARS: - raw = raw[:_PROCESS_ERROR_STDERR_MAX_CHARS] + "... [truncated]" - return raw - except Exception as probe_exc: # pragma: no cover โ€” best-effort diagnostic - return f"" - - -def _format_process_error(exc: BaseException) -> str: - """Render a Claude-SDK ProcessError (or any ClaudeSDKError) with its full - captured context โ€” exit code, stderr, exception type. Plain strings for - non-SDK exceptions fall back to str(exc). - - Bounded at _PROCESS_ERROR_STDERR_MAX_CHARS so a runaway CLI can't spam - the log. Used by the executor's error path (fixes #66 โ€” the SDK's - ProcessError carries `.stderr`/`.exit_code` attributes that the previous - code silently discarded, leaving every CLI crash with an identical - "Check stderr output for details" message in the workspace log). - - Fixes #160: when the SDK raises a bare ``Exception`` containing the - "Check stderr output for details" placeholder (which happens when the - CLI subprocess emits a stream error the SDK can't categorize โ€” rate - limit, auth, network), there's no ``.stderr``/``.exit_code`` to read. - In that case we fall back to running the CLI ourselves via - ``_probe_claude_cli_error`` so the operator sees the real failure - reason (e.g. ``You've hit your limit ยท resets Apr 17``) instead of - chasing ghosts in the workspace logs. - """ - parts = [f"{type(exc).__name__}: {exc}"] - exit_code = getattr(exc, "exit_code", None) - if exit_code is not None: - parts.append(f"exit_code={exit_code}") - stderr = getattr(exc, "stderr", None) - if stderr: - trimmed = stderr[:_PROCESS_ERROR_STDERR_MAX_CHARS] - if len(stderr) > _PROCESS_ERROR_STDERR_MAX_CHARS: - trimmed += f"... [{len(stderr) - _PROCESS_ERROR_STDERR_MAX_CHARS} more chars truncated]" - parts.append(f"stderr={trimmed!r}") - elif exit_code is None and _SWALLOWED_STDERR_MARKER in str(exc): - # #160: generic exception with the swallowed-stderr placeholder. - # Probe the CLI directly โ€” this is the only way to surface the real - # error when the SDK lost it in translation. - probed = _probe_claude_cli_error() - if probed: - parts.append(f"probed_cli_error={probed!r}") - return " | ".join(parts) - - -@dataclass -class QueryResult: - """Outcome of a single `query()` stream. - - `text` is the canonical final response; `session_id` is the id the SDK - reports in its ResultMessage (used for resume on the next turn). - """ - text: str - session_id: str | None - - -class ClaudeSDKExecutor(AgentExecutor): - """Executes agent tasks via the claude-agent-sdk programmatic API.""" - - def __init__( - self, - system_prompt: str | None, - config_path: str, - heartbeat: "HeartbeatLoop | None", - model: str = "sonnet", - ): - self.system_prompt = system_prompt - self.config_path = config_path - self.heartbeat = heartbeat - self.model = model - self._session_id: str | None = None - self._active_stream: AsyncIterator[Any] | None = None - # Serializes concurrent execute() calls on the same executor so - # session_id / _active_stream mutations stay race-free. - self._run_lock = asyncio.Lock() - - # ------------------------------------------------------------------ - # Prompt + options builders - # ------------------------------------------------------------------ - - def _resolve_cwd(self) -> str: - """Run in /workspace if it has been populated, otherwise /configs.""" - if os.path.isdir(WORKSPACE_MOUNT) and os.listdir(WORKSPACE_MOUNT): - return WORKSPACE_MOUNT - return CONFIG_MOUNT - - def _build_system_prompt(self) -> str | None: - """Compose system prompt from file + A2A + HMA memory instructions.""" - base = get_system_prompt(self.config_path, fallback=self.system_prompt) - a2a = get_a2a_instructions(mcp=True) - hma = get_hma_instructions() - parts = [p for p in (base, a2a, hma) if p] - return "\n\n".join(parts) if parts else None - - def _prepare_prompt(self, user_input: str) -> str: - """Prepend delegation results that arrived while idle.""" - delegation_context = read_delegation_results() - if delegation_context: - return ( - "[Delegation results received while you were idle]\n" - f"{delegation_context}\n\n[New message]\n{user_input}" - ) - return user_input - - async def _inject_memories_if_first_turn(self, prompt: str) -> str: - if self._session_id: - return prompt - memories = await recall_memories() - if not memories: - return prompt - return f"[Prior context from memory]\n{memories}\n\n{prompt}" - - def _load_config_dict(self) -> dict: - """Read config.yaml as a raw dict for field-level inspection. - - Returns an empty dict on any I/O or parse error so callers can - always use ``.get()`` without guards. - """ - try: - config_file = os.path.join(self.config_path, "config.yaml") - with open(config_file) as f: - return yaml.safe_load(f) or {} - except Exception: - return {} - - def _build_options(self) -> Any: - """Build ClaudeAgentOptions. - - No allowed_tools allowlist โ€” bypassPermissions grants full access, - matching the old CLI `--dangerously-skip-permissions` so Claude can - use every built-in tool (Task, TodoWrite, NotebookEdit, BashOutput/ - KillShell, ExitPlanMode, etc.) plus all MCP tools. - - The MCP server launcher uses `sys.executable` so tests and alternate - virtual-env layouts don't depend on a `python3` shim being on PATH. - - output_config wiring (issue #652) - ---------------------------------- - Reads ``effort`` and ``task_budget`` from config.yaml and populates - ``output_config`` on the SDK options before the API call: - - - ``effort`` (str): one of low|medium|high|xhigh|max. xhigh is the - Opus 4.7 recommended default for long agentic tasks. - - ``task_budget`` (int): advisory total-token budget across the full - agentic loop. Must be >= 20000 (API minimum) or 0/absent (unset). - When set, the ``task-budgets-2026-03-13`` beta header is added so - the API accepts the field. - """ - mcp_servers = { - "a2a": { - "command": sys.executable, - "args": [get_mcp_server_path()], - } - } - - create_kwargs: dict = dict( - model=self.model, - permission_mode="bypassPermissions", - cwd=self._resolve_cwd(), - mcp_servers=mcp_servers, - system_prompt=self._build_system_prompt(), - resume=self._session_id, - ) - - # --- output_config: effort + task_budget (issue #652) --- - config = self._load_config_dict() - output_config: dict = {} - effort = config.get("effort", "") - task_budget = config.get("task_budget", 0) - - if effort: - output_config["effort"] = effort # "low"|"medium"|"high"|"xhigh"|"max" - - if task_budget and int(task_budget) >= 20000: - output_config["task_budget"] = { - "type": "tokens", - "total": int(task_budget), - } - betas = list(create_kwargs.get("betas", [])) - if "task-budgets-2026-03-13" not in betas: - betas.append("task-budgets-2026-03-13") - create_kwargs["betas"] = betas - elif task_budget and int(task_budget) > 0: - # Below minimum โ€” reject clearly before any API call is made. - raise ValueError( - f"task_budget must be >= 20000 tokens (got {task_budget})" - ) - - if output_config: - create_kwargs["output_config"] = output_config - - return sdk.ClaudeAgentOptions(**create_kwargs) - - # ------------------------------------------------------------------ - # Query streaming - # ------------------------------------------------------------------ - - async def _run_query(self, prompt: str, options: Any) -> QueryResult: - """Drive the SDK query stream and return a QueryResult. - - Prefers ResultMessage.result (the canonical final text โ€” same field - the CLI's --output-format json used) and only falls back to the - concatenation of AssistantMessage TextBlocks when result is absent. - Otherwise pre-tool reasoning and post-tool summary get double-emitted. - - Pure: does not mutate executor state other than setting / clearing - `self._active_stream` so cancel() can reach in. The caller decides - whether to persist the returned session_id. - """ - assistant_chunks: list[str] = [] - result_text: str | None = None - session_id: str | None = None - self._active_stream = sdk.query(prompt=prompt, options=options) - try: - async for message in self._active_stream: - if isinstance(message, sdk.AssistantMessage): - for block in message.content: - if isinstance(block, sdk.TextBlock): - assistant_chunks.append(block.text) - else: - # ToolUseBlock / ServerToolUseBlock are present - # on the real SDK but not on the conftest stub โ€” - # check by class name to avoid an isinstance() - # against a class the stub doesn't define. - cls = type(block).__name__ - if cls in ("ToolUseBlock", "ServerToolUseBlock"): - await _report_tool_use(block) - elif isinstance(message, sdk.ResultMessage): - sid = getattr(message, "session_id", None) - if sid: - session_id = sid - result_text = getattr(message, "result", None) - finally: - self._active_stream = None - text = result_text if result_text is not None else "".join(assistant_chunks) - # Auto-recover the wedge flag โ€” if a previous query() left this - # process in `_sdk_wedged` and THIS query just completed - # cleanly, the SDK clearly works again. Clear so the next - # heartbeat reports runtime_state empty and the platform flips - # status degraded โ†’ online without a manual restart. - # - # Gate on actual content from the stream so a degenerate - # "iterator returned without raising but emitted nothing" - # case (possible from a partial stream or a stub SDK) doesn't - # falsely advertise recovery. A real successful query yields - # at least a ResultMessage (sets result_text) or one - # AssistantMessage TextBlock (populates assistant_chunks). - if result_text is not None or assistant_chunks: - _clear_sdk_wedge_on_success() - return QueryResult(text=text, session_id=session_id) - - # ------------------------------------------------------------------ - # AgentExecutor interface - # ------------------------------------------------------------------ - - async def execute(self, context: RequestContext, event_queue: EventQueue): - """Run a turn through the Claude Agent SDK and emit the response. - - Serialized via `self._run_lock` โ€” concurrent A2A messages to the same - workspace queue rather than racing on `_session_id` / `_active_stream`. - """ - user_input = extract_message_text(context.message) - # Surface attached files to claude-code via a manifest in the prompt. - # Claude Code reads files through its own Read/Glob tools by path โ€” - # as long as the prompt names the path, the CLI will open them on - # demand. Same contract every platform runtime uses so the UX is - # identical across hermes / langgraph / claude-code. - attached = extract_attached_files(context.message) - if attached: - manifest = "\n\nAttached files:\n" + "\n".join( - f"- {f['name']} ({f['mime_type'] or 'unknown type'}) at {f['path']}" - for f in attached - ) - user_input = (user_input + manifest) if user_input else manifest.lstrip() - if not user_input: - await event_queue.enqueue_event(new_agent_text_message(_NO_TEXT_MSG)) - return - - async with self._run_lock: - response_text = await self._execute_locked(user_input) - - # Enqueue outside the lock so the next queued turn can start - # preparing its prompt while this turn's response ships. Event - # ordering is preserved per-queue by the A2A server, so no races. - # If the response mentions /workspace/... files, stage each and - # emit FileParts alongside the text so the canvas can download. - outbound = collect_outbound_files(response_text) - if outbound: - from a2a.types import FilePart, FileWithUri, Message, Part, Role, TextPart - import uuid as _uuid - parts: list = [Part(root=TextPart(text=response_text))] if response_text else [] - for f in outbound: - parts.append(Part(root=FilePart(file=FileWithUri( - uri="workspace:" + f["path"], - name=f["name"], - mimeType=f["mime_type"], - )))) - await event_queue.enqueue_event(Message( - messageId=_uuid.uuid4().hex, - role=Role.agent, - parts=parts, - )) - else: - await event_queue.enqueue_event(new_agent_text_message(response_text)) - - @staticmethod - def _is_retryable(exc: BaseException) -> bool: - """Check if an SDK exception looks like a transient rate-limit or - capacity error that's worth retrying with backoff.""" - msg = str(exc).lower() - return any(p in msg for p in _RETRYABLE_PATTERNS) - - def _reset_session_after_error(self, exc: BaseException) -> None: - """Clear `_session_id` if the exception looks like a subprocess - crash (#75). On the next `_build_options()` call `resume=None` is - passed to the SDK, so the CLI boots a brand-new session instead of - trying to resume one the previous subprocess left in an - unrecoverable state. - - Kept in its own method so the policy can evolve (e.g. also clear - on MessageParseError) without touching the retry loop. Logs at - INFO when a session was actually cleared; silent when there was - nothing to reset. - """ - exc_name = type(exc).__name__ - # Conservative: reset only on subprocess-level failures. Pure - # rate-limit / capacity errors don't leave the session in a bad - # state โ€” keep the session_id so the resumed turn preserves - # conversational continuity. - is_subprocess_error = ( - exc_name in ("ProcessError", "CLIConnectionError") - or getattr(exc, "exit_code", None) is not None - or "exit code" in str(exc).lower() - ) - if not is_subprocess_error: - return - if self._session_id is None: - return - logger.info( - "SDK session reset after %s: clearing session_id so the next " - "attempt starts fresh (fixes #75 session contamination)", - exc_name, - ) - self._session_id = None - - async def _execute_locked(self, user_input: str) -> str: - """Body of execute() that runs under the run lock. - - Retries transient errors (rate limits, capacity, exit-code-1) up to - _MAX_RETRIES times with exponential backoff (5s, 10s, 20s). - """ - # Keep a clean copy of the user's actual message for the memory record, - # BEFORE any delegation or memory injection. - original_input = user_input - logger.debug("SDK execute [claude-code]: %s", user_input[:200]) - - prompt = self._prepare_prompt(user_input) - - response_text: str = "" - try: - # set_current_task INSIDE the try so active_tasks is always - # decremented by the finally block even if CancelledError hits - # during the heartbeat HTTP push. Moving it outside the try - # created a narrow window where cancellation left active_tasks - # stuck at 1 forever, permanently blocking queue drain. (#2026) - await set_current_task(self.heartbeat, brief_summary(user_input)) - prompt = await self._inject_memories_if_first_turn(prompt) - for attempt in range(_MAX_RETRIES): - options = self._build_options() - try: - result = await self._run_query(prompt=prompt, options=options) - if result.session_id: - self._session_id = result.session_id - response_text = result.text - break # success - except Exception as exc: - formatted = _format_process_error(exc) - # #75: CLI subprocess crashes leave our _session_id - # referencing a session the next subprocess can't - # resume. Without this reset the next attempt would - # crash identically even when the underlying cause - # was transient, cascading into "crashed once โ†’ - # crashes forever until container restart." Clear - # the session_id so the next attempt (retry or - # next user turn) starts fresh. - self._reset_session_after_error(exc) - if attempt < _MAX_RETRIES - 1 and self._is_retryable(exc): - delay = _BASE_RETRY_DELAY_S * (2 ** attempt) - logger.warning( - "SDK agent [claude-code] transient error (attempt %d/%d), " - "retrying in %ds: %s", - attempt + 1, _MAX_RETRIES, delay, formatted, - ) - await asyncio.sleep(delay) - continue - # Non-retryable or exhausted retries. Log exit_code + - # stderr explicitly (fixes #66) so operators don't have - # to reproduce the crash manually to find out why the - # subprocess died. - logger.error("SDK agent error [claude-code]: %s", formatted) - logger.exception("SDK agent error [claude-code] โ€” full traceback follows") - # Detect the specific claude_agent_sdk init-wedge case - # so the heartbeat task can flip the workspace to - # `degraded`. Match on the lowercased formatted error; - # `formatted` is whatever _format_process_error built, - # which already includes both the message and the - # exception class name. - formatted_lc = formatted.lower() - for pat in _WEDGE_ERROR_PATTERNS: - if pat in formatted_lc: - _mark_sdk_wedged( - f"claude_agent_sdk wedge: {formatted[:200]} โ€” restart workspace to recover" - ) - break - response_text = sanitize_agent_error(exc) - break - finally: - await set_current_task(self.heartbeat, "") - await commit_memory( - f"Conversation: {original_input[:MEMORY_CONTENT_MAX_CHARS]}" - ) - # Auto-push unpushed commits and open PR (non-blocking, best-effort). - await auto_push_hook() - - return response_text or _NO_RESPONSE_MSG - - async def cancel(self, context: RequestContext, event_queue: EventQueue): - """Cooperatively cancel the currently running turn. - - cancel() targets whatever turn is in flight *right now*, not the - specific turn the caller may have been looking at when they sent - the cancel request. If turn A has finished and turn B is already - running under the run lock by the time cancel arrives, turn B is - the one that gets aborted. This matches how a "stop" button in a - chat UI typically behaves (stop whatever is running) and is a - conscious trade-off against per-turn bookkeeping. - - Implementation: the SDK's query() is an async generator; calling - aclose() raises GeneratorExit inside the running turn and unwinds - cleanly. We read `self._active_stream` into a local BEFORE calling - aclose so the reference can't be reassigned by another turn - mid-cancel. Best-effort โ€” if no stream is active (cancel arrived - between turns, or the stream has no aclose), this is a no-op. - """ - stream = self._active_stream - if stream is None: - return - aclose = getattr(stream, "aclose", None) - if aclose is None: - return - try: - await aclose() - except Exception: - logger.exception("SDK cancel: aclose() raised") diff --git a/workspace/cli_executor.py b/workspace/cli_executor.py index ce180f82..0396c61f 100644 --- a/workspace/cli_executor.py +++ b/workspace/cli_executor.py @@ -5,10 +5,11 @@ Supports CLI agents that accept a prompt and output a response: - Ollama: ollama run "..." - Custom: any command that reads stdin or accepts -p -NOTE: the `claude-code` runtime no longer routes here. It uses -ClaudeSDKExecutor (see claude_sdk_executor.py) which wraps the -claude-agent-sdk Python package. This executor is reserved for CLI-only -runtimes that don't yet have a programmatic SDK integration. +NOTE: the `claude-code` runtime no longer routes here โ€” its template +repo (molecule-ai-workspace-template-claude-code) ships its own +ClaudeSDKExecutor wrapping the claude-agent-sdk Python package as of +#87 Phase 2. This executor is reserved for CLI-only runtimes that +don't yet have a programmatic SDK integration. The runtime is selected via config.yaml: runtime: codex | ollama | custom @@ -59,8 +60,8 @@ logger = logging.getLogger(__name__) # Built-in runtime presets. -# The `claude-code` runtime uses ClaudeSDKExecutor (claude_sdk_executor.py) -# and intentionally has no entry here. +# The `claude-code` runtime uses ClaudeSDKExecutor in its own template +# repo (post-#87 Phase 2) and intentionally has no entry here. RUNTIME_PRESETS: dict[str, dict] = { "codex": { "command": "codex", @@ -117,9 +118,12 @@ class CLIAgentExecutor(AgentExecutor): if runtime == "claude-code": # Defensive โ€” the adapter should never construct a CLI executor # for claude-code. Fail loud rather than silently falling back. + # The claude-code template owns its own ClaudeSDKExecutor in + # molecule-ai-workspace-template-claude-code (post-#87 Phase 2). raise ValueError( - "claude-code runtime is served by ClaudeSDKExecutor, not " - "CLIAgentExecutor. Check adapters/claude_code/adapter.py." + "claude-code runtime is served by ClaudeSDKExecutor in its " + "template repo, not CLIAgentExecutor. If you're seeing this " + "in molecule-runtime, the adapter wiring is wrong." ) self.runtime = runtime self.config = runtime_config diff --git a/workspace/runtime_wedge.py b/workspace/runtime_wedge.py index ffc7f90f..c33ecb10 100644 --- a/workspace/runtime_wedge.py +++ b/workspace/runtime_wedge.py @@ -85,15 +85,6 @@ When wedge is the WRONG primitive: if the failure is per-request (the SDK works for some inputs but not others), surface as a normal A2A error response, not a wedge. Wedge means "every subsequent request in this process will fail until restart." - -Compatibility shim (will be removed once #87 Phase 2 lands) ------------------------------------------------------------ - -claude_sdk_executor.py re-exports the four functions under the historical -names (is_wedged, wedge_reason, _mark_sdk_wedged, _clear_sdk_wedge_on_success) -for one release cycle. New adapter code should import from runtime_wedge -directly; the shim only exists so existing third-party adapters that -copied our claude_sdk_executor wedge convention have time to migrate. """ from __future__ import annotations diff --git a/workspace/tests/conftest.py b/workspace/tests/conftest.py index 4c1c5f04..a67bd08b 100644 --- a/workspace/tests/conftest.py +++ b/workspace/tests/conftest.py @@ -209,53 +209,14 @@ def _make_tools_mocks(): sys.modules["builtin_tools.security"] = _sec_mod -def _make_claude_agent_sdk_mock(): - """Stub claude_agent_sdk so claude_sdk_executor can be imported without - the real SDK installed. Tests that exercise execute() patch query(). - - Installed at collection time so a top-level `import claude_agent_sdk` - in claude_sdk_executor.py resolves to this stub. Real tests can override - individual attributes via patch(). - """ - mod = ModuleType("claude_agent_sdk") - - class _StubTextBlock: - def __init__(self, text=""): - self.text = text - - class _StubAssistantMessage: - def __init__(self, blocks=None): - self.content = blocks or [] - - class _StubResultMessage: - def __init__(self, session_id=None, result=None): - self.session_id = session_id - self.result = result - - class _StubOptions: - def __init__(self, **kwargs): - self.kwargs = kwargs - - async def _stub_query(prompt, options): # pragma: no cover โ€” overridden in tests - yield _StubAssistantMessage([_StubTextBlock("stub")]) - yield _StubResultMessage(session_id="stub-session") - - mod.TextBlock = _StubTextBlock - mod.AssistantMessage = _StubAssistantMessage - mod.ResultMessage = _StubResultMessage - mod.ClaudeAgentOptions = _StubOptions - mod.query = _stub_query - sys.modules["claude_agent_sdk"] = mod - - # Install mocks before any test collection imports a2a_executor if "a2a" not in sys.modules: _make_a2a_mocks() -# Install claude_agent_sdk stub unconditionally: the real SDK ships with -# workspace-template:claude-code but tests run outside the container. -if "claude_agent_sdk" not in sys.modules: - _make_claude_agent_sdk_mock() +# Note: the claude_agent_sdk stub was removed alongside +# workspace/claude_sdk_executor.py (#87 Phase 2). The executor + its +# tests now live in the claude-code template repo, where the real SDK +# IS installed via Dockerfile, so no stub is needed. if "langchain_core" not in sys.modules: _make_langchain_mocks() diff --git a/workspace/tests/test_claude_sdk_executor.py b/workspace/tests/test_claude_sdk_executor.py deleted file mode 100644 index 7122fe98..00000000 --- a/workspace/tests/test_claude_sdk_executor.py +++ /dev/null @@ -1,1408 +0,0 @@ -"""Tests for claude_sdk_executor.py โ€” Claude Agent SDK based executor. - -The claude_agent_sdk module is stubbed session-wide in conftest.py so that -`import claude_agent_sdk` at the top of claude_sdk_executor.py resolves to -a fake module. Tests override individual SDK attributes (notably query()) -via patch(). -""" - -import asyncio -import time -from unittest.mock import AsyncMock, MagicMock, patch - -import pytest - -# claude_agent_sdk is stubbed in conftest.py โ€” import the fake classes from -# the stub so tests can use them by identity with isinstance() checks inside -# the executor. -import claude_agent_sdk as _sdk_stub - -_FakeTextBlock = _sdk_stub.TextBlock -_FakeAssistantMessage = _sdk_stub.AssistantMessage -_FakeResultMessage = _sdk_stub.ResultMessage - -from claude_sdk_executor import ( # noqa: E402 - ClaudeSDKExecutor, - QueryResult, - _mark_sdk_wedged, - _reset_sdk_wedge_for_test, - is_wedged, - wedge_reason, -) - -# Module alias used by the wedge tests below โ€” they read -# `_executor_mod.` to make the module-state vs function-state -# distinction explicit at the call site, separate from the names -# imported above. Hoisted to the top-of-file imports because the late -# binding (originally at line ~1248) was invisible to @pytest.mark.asyncio -# wrappers under coverage instrumentation (--cov, added by #1817): -# sys.settrace + the asyncio wrapper combination caused a -# `NameError: name '_executor_mod' is not defined` on every async wedge -# test. Hoisting the alias fixes that scope-resolution issue. -import claude_sdk_executor as _executor_mod # noqa: E402 - - -# ---------- Helpers ---------- - -def _make_context(text_parts): - parts = [] - for t in text_parts: - p = MagicMock() - p.text = t - # The extract_message_text helper checks for `.root.text` as a fallback; - # ensure that path doesn't accidentally double-up. - del p.root - parts.append(p) - ctx = MagicMock() - ctx.message.parts = parts - return ctx - - -def _make_event_queue(): - return AsyncMock() - - -def _make_executor(model="sonnet"): - return ClaudeSDKExecutor( - system_prompt="You are a helpful agent.", - config_path="/configs", - heartbeat=MagicMock(current_task="", active_tasks=0), - model=model, - ) - - -# ---------- Construction ---------- - - -def test_constructor_sets_fields(): - hb = MagicMock() - e = ClaudeSDKExecutor( - system_prompt="sys", - config_path="/cfg", - heartbeat=hb, - model="opus", - ) - assert e.system_prompt == "sys" - assert e.config_path == "/cfg" - assert e.heartbeat is hb - assert e.model == "opus" - assert e._session_id is None - - -def test_resolve_cwd_prefers_workspace_when_populated(tmp_path, monkeypatch): - e = _make_executor() - with patch("os.path.isdir", return_value=True), \ - patch("os.listdir", return_value=["repo.txt"]): - assert e._resolve_cwd() == "/workspace" - - -def test_resolve_cwd_falls_back_to_configs_when_workspace_empty(): - e = _make_executor() - with patch("os.path.isdir", return_value=True), \ - patch("os.listdir", return_value=[]): - assert e._resolve_cwd() == "/configs" - - -def test_build_system_prompt_combines_base_and_a2a(): - e = _make_executor() - with patch("claude_sdk_executor.get_system_prompt", return_value="BASE"), \ - patch("claude_sdk_executor.get_a2a_instructions", return_value="A2A"): - prompt = e._build_system_prompt() - assert prompt is not None - assert "BASE" in prompt - assert "A2A" in prompt - - -# ---------- execute() ---------- - - -@pytest.mark.asyncio -async def test_execute_empty_message_emits_error(): - e = _make_executor() - ctx = _make_context([""]) - eq = _make_event_queue() - await e.execute(ctx, eq) - eq.enqueue_event.assert_called_once() - msg = eq.enqueue_event.call_args[0][0] - # new_agent_text_message returns a Message; check it has a text part - assert "no text content" in str(msg).lower() or "no text" in repr(msg).lower() - - -@pytest.mark.asyncio -async def test_execute_collects_assistant_text_blocks(): - e = _make_executor() - ctx = _make_context(["Hello"]) - eq = _make_event_queue() - - async def fake_query(prompt, options): - assert prompt == "Hello" - yield _FakeAssistantMessage([_FakeTextBlock("Hi "), _FakeTextBlock("there")]) - yield _FakeResultMessage(session_id="sess-xyz") - - with patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")), \ - patch("claude_sdk_executor.read_delegation_results", return_value=""), \ - patch("claude_sdk_executor.commit_memory", new=AsyncMock()), \ - patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \ - patch("claude_agent_sdk.query", new=fake_query): - await e.execute(ctx, eq) - - assert e._session_id == "sess-xyz" - eq.enqueue_event.assert_called_once() - - -@pytest.mark.asyncio -async def test_execute_injects_delegation_results(): - e = _make_executor() - ctx = _make_context(["Original"]) - eq = _make_event_queue() - captured = {} - - async def fake_query(prompt, options): - captured["prompt"] = prompt - yield _FakeAssistantMessage([_FakeTextBlock("done")]) - yield _FakeResultMessage(session_id="s1") - - with patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")), \ - patch("claude_sdk_executor.read_delegation_results", - return_value="- [ok] sub-task complete"), \ - patch("claude_sdk_executor.commit_memory", new=AsyncMock()), \ - patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \ - patch("claude_agent_sdk.query", new=fake_query): - await e.execute(ctx, eq) - - assert "Delegation results" in captured["prompt"] - assert "Original" in captured["prompt"] - - -@pytest.mark.asyncio -async def test_execute_injects_memories_on_first_turn(): - e = _make_executor() - assert e._session_id is None # first turn - ctx = _make_context(["Q"]) - eq = _make_event_queue() - captured = {} - - async def fake_query(prompt, options): - captured["prompt"] = prompt - yield _FakeAssistantMessage([_FakeTextBlock("a")]) - yield _FakeResultMessage(session_id="s2") - - with patch("claude_sdk_executor.recall_memories", - new=AsyncMock(return_value="- [LOCAL] previous fact")), \ - patch("claude_sdk_executor.read_delegation_results", return_value=""), \ - patch("claude_sdk_executor.commit_memory", new=AsyncMock()), \ - patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \ - patch("claude_agent_sdk.query", new=fake_query): - await e.execute(ctx, eq) - - assert "Prior context from memory" in captured["prompt"] - assert "previous fact" in captured["prompt"] - - -@pytest.mark.asyncio -async def test_execute_skips_memory_recall_after_session_established(): - e = _make_executor() - e._session_id = "already-resumed" # not first turn - ctx = _make_context(["Q"]) - eq = _make_event_queue() - recall_mock = AsyncMock(return_value="- [LOCAL] should not appear") - - async def fake_query(prompt, options): - # Memory should NOT have been injected - assert "Prior context" not in prompt - yield _FakeAssistantMessage([_FakeTextBlock("ok")]) - yield _FakeResultMessage(session_id="already-resumed") - - with patch("claude_sdk_executor.recall_memories", new=recall_mock), \ - patch("claude_sdk_executor.read_delegation_results", return_value=""), \ - patch("claude_sdk_executor.commit_memory", new=AsyncMock()), \ - patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \ - patch("claude_agent_sdk.query", new=fake_query): - await e.execute(ctx, eq) - - recall_mock.assert_not_called() - - -@pytest.mark.asyncio -async def test_execute_passes_options_with_resume_when_session_present(): - e = _make_executor() - e._session_id = "sess-prev" - ctx = _make_context(["q"]) - eq = _make_event_queue() - captured = {} - - async def fake_query(prompt, options): - captured["options"] = options - yield _FakeAssistantMessage([_FakeTextBlock("a")]) - yield _FakeResultMessage(session_id="sess-prev") - - with patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")), \ - patch("claude_sdk_executor.read_delegation_results", return_value=""), \ - patch("claude_sdk_executor.commit_memory", new=AsyncMock()), \ - patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \ - patch("claude_agent_sdk.query", new=fake_query): - await e.execute(ctx, eq) - - opts = captured["options"] - assert opts.kwargs.get("resume") == "sess-prev" - assert opts.kwargs.get("model") == "sonnet" - assert opts.kwargs.get("permission_mode") == "bypassPermissions" - assert "a2a" in opts.kwargs.get("mcp_servers", {}) - # No allowed_tools restriction โ€” bypass permission mode grants full access, - # matching the old CLI `--dangerously-skip-permissions` behavior. - assert "allowed_tools" not in opts.kwargs - - -@pytest.mark.asyncio -async def test_execute_handles_sdk_exception_gracefully(): - """A raised exception becomes a sanitized user message (no raw `e`).""" - e = _make_executor() - ctx = _make_context(["q"]) - eq = _make_event_queue() - - class SecretLeak(RuntimeError): - pass - - async def boom_query(prompt, options): - if False: - yield # pragma: no cover โ€” makes the function an async generator - raise SecretLeak("token=abc-123-XYZ leaking") - - with patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")), \ - patch("claude_sdk_executor.read_delegation_results", return_value=""), \ - patch("claude_sdk_executor.commit_memory", new=AsyncMock()) as commit, \ - patch("claude_sdk_executor.set_current_task", new=AsyncMock()) as set_task, \ - patch("claude_agent_sdk.query", new=boom_query): - await e.execute(ctx, eq) - - # Error reported, but sanitized โ€” the exception class is visible, - # the secret-laden body is not. - eq.enqueue_event.assert_called_once() - msg = eq.enqueue_event.call_args[0][0] - rendered = str(msg) - assert "SecretLeak" in rendered - assert "abc-123-XYZ" not in rendered - assert "workspace logs" in rendered - # Cleanup still ran - commit.assert_called_once() - # set_current_task called twice: once with summary, once with "" - assert set_task.call_count == 2 - assert set_task.call_args_list[-1].args[1] == "" - - -@pytest.mark.asyncio -async def test_execute_commits_memory_with_original_input(): - e = _make_executor() - ctx = _make_context(["Build me a thing"]) - eq = _make_event_queue() - - async def fake_query(prompt, options): - yield _FakeAssistantMessage([_FakeTextBlock("done")]) - yield _FakeResultMessage(session_id="s") - - commit_mock = AsyncMock() - with patch("claude_sdk_executor.recall_memories", - new=AsyncMock(return_value="- [LOCAL] noise")), \ - patch("claude_sdk_executor.read_delegation_results", return_value=""), \ - patch("claude_sdk_executor.commit_memory", new=commit_mock), \ - patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \ - patch("claude_agent_sdk.query", new=fake_query): - await e.execute(ctx, eq) - - commit_mock.assert_called_once() - saved = commit_mock.call_args[0][0] - # Original user text is in the saved memory, not the prepended memory block - assert "Build me a thing" in saved - assert "noise" not in saved - - -@pytest.mark.asyncio -async def test_execute_prefers_result_message_text_over_assistant_chunks(): - """When ResultMessage.result is set, use it (avoids double-emitting - pre-tool reasoning + post-tool summary the way concat would).""" - e = _make_executor() - ctx = _make_context(["q"]) - eq = _make_event_queue() - - async def fake_query(prompt, options): - # Pre-tool reasoning - yield _FakeAssistantMessage([_FakeTextBlock("Let me check...")]) - # Post-tool summary - yield _FakeAssistantMessage([_FakeTextBlock("FINAL_ANSWER")]) - # Result with the canonical final text - yield _FakeResultMessage(session_id="s", result="FINAL_ANSWER") - - with patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")), \ - patch("claude_sdk_executor.read_delegation_results", return_value=""), \ - patch("claude_sdk_executor.commit_memory", new=AsyncMock()), \ - patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \ - patch("claude_agent_sdk.query", new=fake_query): - await e.execute(ctx, eq) - - msg = eq.enqueue_event.call_args[0][0] - rendered = str(msg) - assert "FINAL_ANSWER" in rendered - # The pre-tool "Let me check..." text must NOT leak through - assert "Let me check" not in rendered - - -@pytest.mark.asyncio -async def test_execute_falls_back_to_assistant_chunks_when_result_missing(): - """If ResultMessage has no .result, concatenate AssistantMessage text.""" - e = _make_executor() - ctx = _make_context(["q"]) - eq = _make_event_queue() - - async def fake_query(prompt, options): - yield _FakeAssistantMessage([_FakeTextBlock("hello ")]) - yield _FakeAssistantMessage([_FakeTextBlock("world")]) - yield _FakeResultMessage(session_id="s", result=None) - - with patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")), \ - patch("claude_sdk_executor.read_delegation_results", return_value=""), \ - patch("claude_sdk_executor.commit_memory", new=AsyncMock()), \ - patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \ - patch("claude_agent_sdk.query", new=fake_query): - await e.execute(ctx, eq) - - msg = eq.enqueue_event.call_args[0][0] - assert "hello world" in str(msg) - - -@pytest.mark.asyncio -async def test_execute_emits_placeholder_when_no_text(): - e = _make_executor() - ctx = _make_context(["q"]) - eq = _make_event_queue() - - async def empty_query(prompt, options): - # No AssistantMessage at all, just a result - yield _FakeResultMessage(session_id="s") - - with patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")), \ - patch("claude_sdk_executor.read_delegation_results", return_value=""), \ - patch("claude_sdk_executor.commit_memory", new=AsyncMock()), \ - patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \ - patch("claude_agent_sdk.query", new=empty_query): - await e.execute(ctx, eq) - - eq.enqueue_event.assert_called_once() - msg = eq.enqueue_event.call_args[0][0] - assert "no response" in str(msg).lower() - - -# ---------- Empty-string result vs None (regression: `or` vs `is not None`) ---------- - - -@pytest.mark.asyncio -async def test_execute_empty_string_result_is_respected_over_chunks(): - """If ResultMessage.result is an explicit empty string, honor it โ€” - do NOT fall back to concatenated assistant chunks (Python `or` bug).""" - e = _make_executor() - ctx = _make_context(["q"]) - eq = _make_event_queue() - - async def fake_query(prompt, options): - yield _FakeAssistantMessage([_FakeTextBlock("chatter that should be suppressed")]) - yield _FakeResultMessage(session_id="s", result="") - - with patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")), \ - patch("claude_sdk_executor.read_delegation_results", return_value=""), \ - patch("claude_sdk_executor.commit_memory", new=AsyncMock()), \ - patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \ - patch("claude_agent_sdk.query", new=fake_query): - await e.execute(ctx, eq) - - msg = eq.enqueue_event.call_args[0][0] - rendered = str(msg) - assert "chatter" not in rendered - # Empty response_text โ†’ placeholder - assert "no response" in rendered.lower() - - -# ---------- Delegation injection does NOT pollute the memory record ---------- - - -@pytest.mark.asyncio -async def test_execute_memory_commit_excludes_delegation_preamble(): - """original_input is captured BEFORE delegation injection so the - committed memory is the user's real message, not the prepended block.""" - e = _make_executor() - ctx = _make_context(["Genuine user question"]) - eq = _make_event_queue() - - async def fake_query(prompt, options): - yield _FakeResultMessage(session_id="s", result="ok") - - commit_mock = AsyncMock() - with patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")), \ - patch("claude_sdk_executor.read_delegation_results", - return_value="- [done] Sub-agent finished the sync query"), \ - patch("claude_sdk_executor.commit_memory", new=commit_mock), \ - patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \ - patch("claude_agent_sdk.query", new=fake_query): - await e.execute(ctx, eq) - - saved = commit_mock.call_args[0][0] - assert "Genuine user question" in saved - assert "Delegation results" not in saved - assert "Sub-agent finished" not in saved - - -# ---------- cancel() ---------- - - -@pytest.mark.asyncio -async def test_cancel_no_active_stream_is_noop(): - e = _make_executor() - assert e._active_stream is None - await e.cancel(context=MagicMock(), event_queue=_make_event_queue()) - # Still None, no exception raised - assert e._active_stream is None - - -@pytest.mark.asyncio -async def test_cancel_closes_active_stream(): - e = _make_executor() - stream = MagicMock() - stream.aclose = AsyncMock() - e._active_stream = stream - await e.cancel(context=MagicMock(), event_queue=_make_event_queue()) - stream.aclose.assert_awaited_once() - - -@pytest.mark.asyncio -async def test_cancel_aclose_exception_is_logged_not_raised(): - e = _make_executor() - stream = MagicMock() - stream.aclose = AsyncMock(side_effect=RuntimeError("already closed")) - e._active_stream = stream - # Must not raise - await e.cancel(context=MagicMock(), event_queue=_make_event_queue()) - - -@pytest.mark.asyncio -async def test_cancel_stream_without_aclose_is_noop(): - e = _make_executor() - # A stream object that does not expose aclose (e.g. synchronous iterator) - e._active_stream = MagicMock(spec=["__iter__"]) - # Must not raise - await e.cancel(context=MagicMock(), event_queue=_make_event_queue()) - - -# ---------- _build_system_prompt / _prepare_prompt direct unit tests ---------- - - -def test_build_system_prompt_combines_base_and_a2a_via_fixture(): - """Direct test bypassing the execute() path.""" - e = _make_executor() - with patch("claude_sdk_executor.get_system_prompt", return_value="BASE"), \ - patch("claude_sdk_executor.get_a2a_instructions", return_value="A2A"), \ - patch("claude_sdk_executor.get_hma_instructions", return_value=""): - out = e._build_system_prompt() - assert out == "BASE\n\nA2A" - - -def test_build_system_prompt_base_only(): - e = _make_executor() - with patch("claude_sdk_executor.get_system_prompt", return_value="BASE"), \ - patch("claude_sdk_executor.get_a2a_instructions", return_value=""), \ - patch("claude_sdk_executor.get_hma_instructions", return_value=""): - assert e._build_system_prompt() == "BASE" - - -def test_build_system_prompt_a2a_only(): - e = _make_executor() - with patch("claude_sdk_executor.get_system_prompt", return_value=None), \ - patch("claude_sdk_executor.get_a2a_instructions", return_value="A2A"), \ - patch("claude_sdk_executor.get_hma_instructions", return_value=""): - assert e._build_system_prompt() == "A2A" - - -def test_build_system_prompt_includes_hma(): - """HMA instructions are appended when present.""" - e = _make_executor() - with patch("claude_sdk_executor.get_system_prompt", return_value="BASE"), \ - patch("claude_sdk_executor.get_a2a_instructions", return_value="A2A"), \ - patch("claude_sdk_executor.get_hma_instructions", return_value="## Hierarchical Memory"): - out = e._build_system_prompt() - assert "BASE" in out - assert "A2A" in out - assert "## Hierarchical Memory" in out - - -def test_prepare_prompt_no_delegation_returns_unchanged(): - e = _make_executor() - with patch("claude_sdk_executor.read_delegation_results", return_value=""): - assert e._prepare_prompt("hi") == "hi" - - -def test_prepare_prompt_with_delegation_prepends_block(): - e = _make_executor() - with patch("claude_sdk_executor.read_delegation_results", - return_value="- [ok] something"): - out = e._prepare_prompt("hi") - assert "Delegation results" in out - assert "- [ok] something" in out - assert out.endswith("hi") - - -@pytest.mark.asyncio -async def test_inject_memories_if_first_turn_skips_resumed_session(): - e = _make_executor() - e._session_id = "existing" - with patch("claude_sdk_executor.recall_memories", - new=AsyncMock(return_value="- [LOCAL] anything")) as recall: - out = await e._inject_memories_if_first_turn("hello") - recall.assert_not_called() - assert out == "hello" - - -@pytest.mark.asyncio -async def test_inject_memories_if_first_turn_no_memories(): - e = _make_executor() - with patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")): - out = await e._inject_memories_if_first_turn("hello") - assert out == "hello" - - -# ---------- Concurrency: _run_lock serializes turns ---------- - - -@pytest.mark.asyncio -async def test_concurrent_execute_calls_serialize_strictly(): - """Two execute() calls on the same executor must be mutually exclusive. - - Before the lock, `_session_id` and `_active_stream` were mutated without - coordination and concurrent turns clobbered each other. We verify: - - 1. The second turn's query() entry is strictly AFTER the first turn's - query() exit (timestamp-based, not just "it runs eventually"). - 2. Both turns complete and enqueue exactly one response each. - 3. `_active_stream` is None at the end โ€” no dangling reference. - 4. `_session_id` reflects the LAST turn to set it (the one with the - highest monotonic timestamp at exit). - """ - e = _make_executor() - ctx1 = _make_context(["first prompt"]) - ctx2 = _make_context(["second prompt"]) - eq1 = _make_event_queue() - eq2 = _make_event_queue() - - # Each query() call records {"enter": t0, "exit": t1} keyed by prompt. - timings: dict[str, dict[str, float]] = {} - - async def timed_query(prompt, options): - # Capture the user's original message text from the prompt. The - # executor may prepend delegation/memory sections, so strip to the - # trailing line. - key = prompt.strip().split("\n")[-1] - timings[key] = {"enter": time.monotonic()} - # Hold the lock long enough that any concurrency violation would - # show up as entry1_time โ‰ˆ entry2_time. - await asyncio.sleep(0.05) - timings[key]["exit"] = time.monotonic() - yield _FakeAssistantMessage([_FakeTextBlock("chunk")]) - yield _FakeResultMessage( - session_id=f"sess-for-{key}", - result=f"done-{key}", - ) - - with patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")), \ - patch("claude_sdk_executor.read_delegation_results", return_value=""), \ - patch("claude_sdk_executor.commit_memory", new=AsyncMock()), \ - patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \ - patch("claude_agent_sdk.query", new=timed_query): - # Fire both concurrently โ€” the lock must serialize them. - await asyncio.gather( - e.execute(ctx1, eq1), - e.execute(ctx2, eq2), - ) - - # Each turn produced exactly one response - eq1.enqueue_event.assert_called_once() - eq2.enqueue_event.assert_called_once() - - # Both turns recorded timings (so the query() body actually ran for each) - assert "first prompt" in timings - assert "second prompt" in timings - - # Whichever turn acquired the lock first must have FULLY EXITED query() - # before the other turn ENTERED query(). Find the earlier-entry turn and - # assert ordering. - first, second = sorted(timings.items(), key=lambda kv: kv[1]["enter"]) - first_key, first_times = first - second_key, second_times = second - assert second_times["enter"] >= first_times["exit"], ( - f"Concurrency bug: {second_key} entered query() at " - f"{second_times['enter']} before {first_key} exited at " - f"{first_times['exit']}" - ) - - # The stream reference is cleared after each turn - assert e._active_stream is None - - # The last turn to finish wrote the persisted session_id - assert e._session_id == f"sess-for-{second_key}" - - -@pytest.mark.asyncio -async def test_cancel_unwinds_async_generator_with_finally_cleanup(): - """A cancel while a turn is in-flight must close the stream and run - its cleanup (finally block). - - We use a real async generator with a cancellable `asyncio.Future` as - the blocking primitive. When cancel() calls `aclose()`, the generator - receives GeneratorExit and its finally block runs โ€” proving the actual - cleanup semantics the SDK's query() generator would exhibit. - """ - e = _make_executor() - ctx = _make_context(["q"]) - eq = _make_event_queue() - - inside_query = asyncio.Event() - finally_ran = asyncio.Event() - blocker: asyncio.Future[None] = asyncio.get_event_loop().create_future() - - async def cancellable_query(prompt, options): - try: - yield _FakeAssistantMessage([_FakeTextBlock("starting")]) - inside_query.set() - await blocker # cancel() will cancel this future via aclose - except (asyncio.CancelledError, GeneratorExit): - pass - finally: - finally_ran.set() - - with patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")), \ - patch("claude_sdk_executor.read_delegation_results", return_value=""), \ - patch("claude_sdk_executor.commit_memory", new=AsyncMock()), \ - patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \ - patch("claude_agent_sdk.query", new=cancellable_query): - turn = asyncio.create_task(e.execute(ctx, eq)) - await inside_query.wait() - # Cancel the future so aclose() can complete without hanging - blocker.cancel() - await e.cancel(context=ctx, event_queue=eq) - await turn - - # The generator's finally block executed โ€” proves cleanup propagated - assert finally_ran.is_set() - # The turn still emits a response (partial text or placeholder) and does - # not leak an active stream reference - eq.enqueue_event.assert_called_once() - assert e._active_stream is None - - -# ---------- Retry logic ---------- - - -def test_is_retryable_matches_known_patterns(): - """Transient errors containing rate-limit keywords are retryable.""" - assert ClaudeSDKExecutor._is_retryable(Exception("429 rate limit exceeded")) - assert ClaudeSDKExecutor._is_retryable(Exception("Server overloaded")) - assert ClaudeSDKExecutor._is_retryable(Exception("Command failed with exit code 1")) - assert ClaudeSDKExecutor._is_retryable(Exception("capacity reached, try again later")) - - -def test_is_retryable_rejects_non_transient(): - """Non-transient errors should not be retried.""" - assert not ClaudeSDKExecutor._is_retryable(Exception("invalid api key")) - assert not ClaudeSDKExecutor._is_retryable(Exception("permission denied")) - assert not ClaudeSDKExecutor._is_retryable(Exception("file not found")) - - -@pytest.mark.asyncio -async def test_execute_retries_on_transient_error_then_succeeds(): - """A rate-limit error on attempt 1 retries, and attempt 2 succeeds.""" - e = _make_executor() - ctx = _make_context(["do something"]) - eq = _make_event_queue() - - call_count = 0 - - async def flaky_query(prompt, options): - nonlocal call_count - call_count += 1 - if call_count == 1: - raise Exception("Command failed with exit code 1 (exit code: 1)") - yield _FakeAssistantMessage([_FakeTextBlock("recovered")]) - yield _FakeResultMessage(session_id="s-retry", result="recovered answer") - - with patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")), \ - patch("claude_sdk_executor.read_delegation_results", return_value=""), \ - patch("claude_sdk_executor.commit_memory", new=AsyncMock()), \ - patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \ - patch("claude_agent_sdk.query", new=flaky_query), \ - patch("asyncio.sleep", new=AsyncMock()) as mock_sleep: - await e.execute(ctx, eq) - - # Should have retried once - assert call_count == 2 - # Sleep was called with the backoff delay - mock_sleep.assert_called_once_with(5) # _BASE_RETRY_DELAY_S * 2^0 - # Final response is the recovered answer - eq.enqueue_event.assert_called_once() - assert "recovered" in str(eq.enqueue_event.call_args[0][0]) - assert e._session_id == "s-retry" - - -@pytest.mark.asyncio -async def test_execute_exhausts_retries_then_returns_error(): - """All retries fail โ†’ sanitized error returned.""" - e = _make_executor() - ctx = _make_context(["do something"]) - eq = _make_event_queue() - - async def always_fail(prompt, options): - if False: - yield # pragma: no cover - raise Exception("Command failed with exit code 1 (exit code: 1)") - - with patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")), \ - patch("claude_sdk_executor.read_delegation_results", return_value=""), \ - patch("claude_sdk_executor.commit_memory", new=AsyncMock()), \ - patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \ - patch("claude_agent_sdk.query", new=always_fail), \ - patch("asyncio.sleep", new=AsyncMock()) as mock_sleep: - await e.execute(ctx, eq) - - # Should have slept twice (attempts 1โ†’2 and 2โ†’3) - assert mock_sleep.call_count == 2 - assert mock_sleep.call_args_list[0].args == (5,) # 5 * 2^0 - assert mock_sleep.call_args_list[1].args == (10,) # 5 * 2^1 - # Final response is the sanitized error - eq.enqueue_event.assert_called_once() - assert "Agent error" in str(eq.enqueue_event.call_args[0][0]) - - -@pytest.mark.asyncio -async def test_execute_no_retry_on_non_transient_error(): - """Non-transient errors fail immediately without retry.""" - e = _make_executor() - ctx = _make_context(["do something"]) - eq = _make_event_queue() - - async def auth_fail(prompt, options): - if False: - yield # pragma: no cover - raise Exception("invalid authentication credentials") - - with patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")), \ - patch("claude_sdk_executor.read_delegation_results", return_value=""), \ - patch("claude_sdk_executor.commit_memory", new=AsyncMock()), \ - patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \ - patch("claude_agent_sdk.query", new=auth_fail), \ - patch("asyncio.sleep", new=AsyncMock()) as mock_sleep: - await e.execute(ctx, eq) - - # No sleep โ€” failed immediately without retry - mock_sleep.assert_not_called() - eq.enqueue_event.assert_called_once() - assert "Agent error" in str(eq.enqueue_event.call_args[0][0]) - - -# --------------------------------------------------------------------------- -# _format_process_error โ€” #66: surface CLI subprocess stderr + exit_code -# --------------------------------------------------------------------------- - - -def test_format_process_error_with_stderr_and_exit_code(): - """Rich ProcessError-style exception โ†’ log line includes all context.""" - from claude_sdk_executor import _format_process_error - - class FakeProcessError(Exception): - def __init__(self, msg, exit_code, stderr): - super().__init__(msg) - self.exit_code = exit_code - self.stderr = stderr - - exc = FakeProcessError("Command failed", exit_code=1, stderr="permission denied: /auth-token") - out = _format_process_error(exc) - assert "FakeProcessError" in out - assert "Command failed" in out - assert "exit_code=1" in out - assert "permission denied: /auth-token" in out - - -def test_format_process_error_truncates_huge_stderr(): - """Runaway CLI can't spam the log โ€” stderr is capped at _PROCESS_ERROR_STDERR_MAX_CHARS.""" - from claude_sdk_executor import _format_process_error, _PROCESS_ERROR_STDERR_MAX_CHARS - - class FakeProcessError(Exception): - def __init__(self, msg, stderr): - super().__init__(msg) - self.stderr = stderr - self.exit_code = None - - huge = "X" * (_PROCESS_ERROR_STDERR_MAX_CHARS + 5000) - out = _format_process_error(FakeProcessError("boom", huge)) - # Truncation note must mention how many chars were dropped - assert "more chars truncated" in out - # Must not contain the full huge string - assert out.count("X") <= _PROCESS_ERROR_STDERR_MAX_CHARS + 100 # slack for repr overhead - - -def test_format_process_error_plain_exception(): - """Non-SDK exceptions fall back to str(exc) without crashing on missing attrs.""" - from claude_sdk_executor import _format_process_error - - out = _format_process_error(RuntimeError("generic failure")) - assert "RuntimeError" in out - assert "generic failure" in out - # No exit_code / stderr pieces when the attrs don't exist - assert "exit_code=" not in out - assert "stderr=" not in out - - -def test_format_process_error_no_stderr_but_has_exit_code(): - """Exit code alone (no stderr) still gets surfaced.""" - from claude_sdk_executor import _format_process_error - - class PartialError(Exception): - def __init__(self, msg): - super().__init__(msg) - self.exit_code = 137 # SIGKILL - self.stderr = None - - out = _format_process_error(PartialError("killed")) - assert "exit_code=137" in out - assert "stderr" not in out - - -# --------------------------------------------------------------------------- -# _format_process_error โ€” #160: probe CLI when SDK swallowed the real stderr -# --------------------------------------------------------------------------- - - -def test_format_process_error_probes_cli_when_stderr_swallowed(monkeypatch): - """Generic Exception with "Check stderr output for details" โ†’ probe the - claude CLI directly so the real error (rate limit, auth, etc.) surfaces - in the log instead of the useless placeholder text. Fixes #160.""" - from claude_sdk_executor import _format_process_error - - def fake_probe(): - return "You've hit your limit ยท resets Apr 17, 11pm (UTC)" - - # Patch the probe helper so we don't actually spawn a subprocess in tests - monkeypatch.setattr( - "claude_sdk_executor._probe_claude_cli_error", - fake_probe, - ) - - swallowed = Exception( - "Command failed with exit code 1 (exit code: 1)\n" - "Error output: Check stderr output for details" - ) - out = _format_process_error(swallowed) - assert "probed_cli_error" in out - assert "hit your limit" in out - - -def test_format_process_error_does_not_probe_when_stderr_already_present(): - """If the exception already carries .stderr, we DO NOT run the probe โ€” - that'd be wasted subprocess work and could mask the real stderr with - probe-time state.""" - from claude_sdk_executor import _format_process_error - - # Track whether the probe was called - called = {"value": False} - - def fake_probe(): - called["value"] = True - return "probe ran" - - import claude_sdk_executor - original = claude_sdk_executor._probe_claude_cli_error - claude_sdk_executor._probe_claude_cli_error = fake_probe - try: - class FakeProcessError(Exception): - def __init__(self): - super().__init__("real error") - self.stderr = "real stderr content" - self.exit_code = 1 - - out = _format_process_error(FakeProcessError()) - assert "real stderr content" in out - assert called["value"] is False, "probe should not have been called" - finally: - claude_sdk_executor._probe_claude_cli_error = original - - -def test_format_process_error_does_not_probe_without_swallowed_marker(monkeypatch): - """Only probe when the exception message matches the specific swallowed- - stderr marker. Unrelated plain exceptions (e.g. 'generic failure') should - not trigger a CLI probe โ€” that'd make every error path 30s slower.""" - from claude_sdk_executor import _format_process_error - - called = {"value": False} - - def fake_probe(): - called["value"] = True - return "probe ran" - - monkeypatch.setattr( - "claude_sdk_executor._probe_claude_cli_error", - fake_probe, - ) - - out = _format_process_error(RuntimeError("generic failure")) - assert called["value"] is False - assert "probed_cli_error" not in out - - -def test_process_error_reaches_logs_via_execute(caplog): - """End-to-end: a ProcessError in query() โ†’ executor logs both the - formatted summary and the full traceback. Fixes #66 โ€” previously no - information leaked out of the subprocess.""" - import logging - from claude_sdk_executor import ClaudeSDKExecutor - - e = ClaudeSDKExecutor(system_prompt=None, config_path="/tmp", heartbeat=None) - ctx = _make_context(["do something"]) - eq = _make_event_queue() - - class FakeProcessError(Exception): - def __init__(self): - super().__init__("Command failed with exit code 1 (exit code: 1)") - self.exit_code = 1 - self.stderr = "claude: CLAUDE_CODE_OAUTH_TOKEN invalid" - - async def process_fail(prompt, options): - if False: - yield # pragma: no cover - raise FakeProcessError() - - with caplog.at_level(logging.ERROR), \ - patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")), \ - patch("claude_sdk_executor.read_delegation_results", return_value=""), \ - patch("claude_sdk_executor.commit_memory", new=AsyncMock()), \ - patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \ - patch("claude_agent_sdk.query", new=process_fail), \ - patch("asyncio.sleep", new=AsyncMock()): - asyncio.run(e.execute(ctx, eq)) - - # Error-level log must include exit_code and stderr content - error_messages = " | ".join(r.message for r in caplog.records if r.levelname == "ERROR") - assert "exit_code=1" in error_messages - assert "CLAUDE_CODE_OAUTH_TOKEN invalid" in error_messages - - -# --------------------------------------------------------------------------- -# _reset_session_after_error โ€” #75: session contamination after ProcessError -# --------------------------------------------------------------------------- - - -def test_reset_session_clears_session_id_on_process_error(): - """A ProcessError-like exception clears _session_id so the next - attempt doesn't try to resume a dead session.""" - e = ClaudeSDKExecutor(system_prompt=None, config_path="/tmp", heartbeat=None) - e._session_id = "sess-abc-123" - - class FakeProcessError(Exception): - def __init__(self): - super().__init__("Command failed with exit code 1 (exit code: 1)") - self.exit_code = 1 - self.stderr = "whatever" - - e._reset_session_after_error(FakeProcessError()) - assert e._session_id is None, "session_id must be cleared after a ProcessError" - - -def test_reset_session_respects_rate_limit_continuity(): - """Transient rate-limit errors leave the session alone โ€” resuming preserves - conversational continuity. Only subprocess-level failures need a reset.""" - e = ClaudeSDKExecutor(system_prompt=None, config_path="/tmp", heartbeat=None) - e._session_id = "sess-preserve-me" - - # A rate-limit error has no exit_code, no "exit code" in message, and - # its class name is a plain Exception. - rate_limit = Exception("Too many requests - rate limit exceeded") - e._reset_session_after_error(rate_limit) - assert e._session_id == "sess-preserve-me", ( - "Rate-limit error must NOT clear session_id โ€” it would break " - "conversational continuity across retries" - ) - - -def test_reset_session_handles_missing_session_id_gracefully(): - """Calling with no session_id set is a no-op (no crash, no log spam).""" - e = ClaudeSDKExecutor(system_prompt=None, config_path="/tmp", heartbeat=None) - assert e._session_id is None - - class FakeProcessError(Exception): - def __init__(self): - super().__init__("boom") - self.exit_code = 1 - self.stderr = "err" - - e._reset_session_after_error(FakeProcessError()) - assert e._session_id is None # still None, no exception raised - - -def test_reset_session_triggers_on_exit_code_message(): - """Some SDK errors don't have an exit_code attr but mention it in - their message. Treat those as subprocess errors too.""" - e = ClaudeSDKExecutor(system_prompt=None, config_path="/tmp", heartbeat=None) - e._session_id = "sess-xyz" - - # No exit_code attribute, but the message signals a subprocess crash - msg_only = Exception("Fatal error in message reader: Command failed with exit code 1") - e._reset_session_after_error(msg_only) - assert e._session_id is None - - -def test_execute_clears_session_between_retries_on_process_error(caplog): - """End-to-end: execute() retries a retryable ProcessError, and the - second retry sees a fresh session_id (=None) rather than the stale - one from before the crash. This proves #75 is actually wired.""" - import logging - from claude_sdk_executor import ClaudeSDKExecutor - - e = ClaudeSDKExecutor(system_prompt=None, config_path="/tmp", heartbeat=None) - e._session_id = "stale-session-doomed" - - ctx = _make_context(["do something"]) - eq = _make_event_queue() - - # Track the session_id visible on each attempt via the options builder - seen_session_ids = [] - original_build = e._build_options - - def spy_build(): - seen_session_ids.append(e._session_id) - return original_build() - - class FakeProcessError(Exception): - def __init__(self): - # "exit code 1" is in the retryable patterns, so we'll get the loop - super().__init__("Command failed with exit code 1 (exit code: 1)") - self.exit_code = 1 - self.stderr = "first crash" - - async def always_fail(prompt, options): - if False: - yield # pragma: no cover - raise FakeProcessError() - - with caplog.at_level(logging.INFO), \ - patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")), \ - patch("claude_sdk_executor.read_delegation_results", return_value=""), \ - patch("claude_sdk_executor.commit_memory", new=AsyncMock()), \ - patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \ - patch("claude_agent_sdk.query", new=always_fail), \ - patch.object(e, "_build_options", side_effect=spy_build), \ - patch("asyncio.sleep", new=AsyncMock()): - asyncio.run(e.execute(ctx, eq)) - - # First attempt sees the stale session; second/third attempts see None - assert seen_session_ids[0] == "stale-session-doomed" - assert all(s is None for s in seen_session_ids[1:]), ( - f"after first ProcessError, subsequent attempts should see a cleared " - f"session_id; got {seen_session_ids}" - ) - # INFO log confirms the reset fired - info_messages = " | ".join(r.message for r in caplog.records if r.levelname == "INFO") - assert "SDK session reset after FakeProcessError" in info_messages - - -# --------------------------------------------------------------------------- -# _build_options โ€” issue #652: effort + task_budget output_config wiring -# --------------------------------------------------------------------------- - - -def _build_options_with_config(config: dict): - """Helper: build ClaudeAgentOptions with the given config.yaml values. - - Stubs out all I/O helpers so only the output_config wiring logic is tested. - """ - e = ClaudeSDKExecutor(system_prompt=None, config_path="/tmp", heartbeat=None) - with patch.object(e, "_load_config_dict", return_value=config), \ - patch.object(e, "_resolve_cwd", return_value="/workspace"), \ - patch.object(e, "_build_system_prompt", return_value=None), \ - patch("claude_sdk_executor.get_mcp_server_path", return_value="/mcp.py"): - return e._build_options() - - -def test_build_options_effort_only_sets_output_config_no_beta(): - """effort='xhigh', no task_budget โ†’ output_config={'effort':'xhigh'}, no betas. - - Acceptance criterion: effort field wired into output_config without adding - the task-budgets beta header (beta is only required for task_budget). - """ - opts = _build_options_with_config({"effort": "xhigh"}) - assert opts.kwargs.get("output_config") == {"effort": "xhigh"} - assert "betas" not in opts.kwargs - - -def test_build_options_task_budget_sets_output_config_and_beta(): - """task_budget=128000 โ†’ output_config with token budget struct + beta header. - - Acceptance criterion: task_budget >= 20000 writes the nested - {'type':'tokens','total':N} struct and adds 'task-budgets-2026-03-13' to betas. - """ - opts = _build_options_with_config({"task_budget": 128000}) - assert opts.kwargs.get("output_config") == { - "task_budget": {"type": "tokens", "total": 128000} - } - assert "task-budgets-2026-03-13" in opts.kwargs.get("betas", []) - - -def test_build_options_both_effort_and_task_budget(): - """Both effort and task_budget โ†’ combined output_config + beta header. - - Acceptance criterion: both keys present in the single output_config dict; - betas includes the task-budget feature flag. - """ - opts = _build_options_with_config({"effort": "high", "task_budget": 50000}) - assert opts.kwargs.get("output_config") == { - "effort": "high", - "task_budget": {"type": "tokens", "total": 50000}, - } - assert "task-budgets-2026-03-13" in opts.kwargs.get("betas", []) - - -def test_build_options_neither_effort_nor_task_budget_no_output_config(): - """Empty config (effort='', task_budget=0) โ†’ output_config absent, no betas. - - Acceptance criterion: when neither field is configured the SDK options - are unchanged โ€” no spurious output_config or betas keys. - """ - opts = _build_options_with_config({}) - assert "output_config" not in opts.kwargs - assert "betas" not in opts.kwargs - - -def test_build_options_task_budget_below_minimum_raises_value_error(): - """task_budget=5000 (below 20000 API minimum) โ†’ ValueError before any API call. - - Acceptance criterion: the executor must refuse to build options when - task_budget is set but too small, so no invalid request reaches the API. - """ - e = ClaudeSDKExecutor(system_prompt=None, config_path="/tmp", heartbeat=None) - with patch.object(e, "_load_config_dict", return_value={"task_budget": 5000}), \ - patch.object(e, "_resolve_cwd", return_value="/workspace"), \ - patch.object(e, "_build_system_prompt", return_value=None), \ - patch("claude_sdk_executor.get_mcp_server_path", return_value="/mcp.py"): - with pytest.raises(ValueError, match="task_budget must be >= 20000"): - e._build_options() - - -# --------------------------------------------------------------------------- -# _load_config_dict โ€” exception-safety and happy-path (issue #652) -# --------------------------------------------------------------------------- - - -def test_load_config_dict_reads_valid_yaml(tmp_path): - """Valid config.yaml โ†’ returns the parsed dict. - - Acceptance criterion: normal I/O path returns the YAML contents as a dict. - """ - cfg = tmp_path / "config.yaml" - cfg.write_text("effort: xhigh\ntask_budget: 50000\n") - e = ClaudeSDKExecutor(system_prompt=None, config_path=str(tmp_path), heartbeat=None) - result = e._load_config_dict() - assert result == {"effort": "xhigh", "task_budget": 50000} - - -def test_load_config_dict_missing_file_returns_empty(tmp_path): - """Missing config.yaml โ†’ returns {} without raising. - - Acceptance criterion: FileNotFoundError is swallowed; callers can safely - use .get() without guards. - """ - e = ClaudeSDKExecutor(system_prompt=None, config_path=str(tmp_path), heartbeat=None) - result = e._load_config_dict() - assert result == {} - - -def test_load_config_dict_invalid_yaml_returns_empty(tmp_path): - """Malformed YAML โ†’ returns {} without raising. - - Acceptance criterion: a YAML parse error is swallowed; callers never see - an exception from _load_config_dict. - """ - cfg = tmp_path / "config.yaml" - cfg.write_text("effort: [unclosed\n") - e = ClaudeSDKExecutor(system_prompt=None, config_path=str(tmp_path), heartbeat=None) - result = e._load_config_dict() - assert result == {} - - -def test_load_config_dict_empty_file_returns_empty(tmp_path): - """Empty config.yaml (yaml.safe_load returns None) โ†’ returns {} via `or {}`. - - Acceptance criterion: None from safe_load is normalised to an empty dict. - """ - cfg = tmp_path / "config.yaml" - cfg.write_text("") - e = ClaudeSDKExecutor(system_prompt=None, config_path=str(tmp_path), heartbeat=None) - result = e._load_config_dict() - assert result == {} - - -# ==================== SDK wedge detector ==================== -# -# Exercises the module-level _sdk_wedged_reason flag set when the -# claude_agent_sdk init handshake times out. The flag is sticky โ€” the -# heartbeat task reads it via is_wedged() / wedge_reason() and reports -# runtime_state="wedged" so the platform flips status โ†’ degraded. - - - -def test_wedge_helpers_default_clean(): - """Fresh module: no wedge.""" - _reset_sdk_wedge_for_test() - assert is_wedged() is False - assert wedge_reason() == "" - - -def test_mark_sdk_wedged_sets_flag_and_reason(): - """First mark wins and sets both is_wedged() and the reason text.""" - _reset_sdk_wedge_for_test() - _mark_sdk_wedged("init timeout โ€” restart") - try: - assert is_wedged() is True - assert "init timeout" in wedge_reason() - finally: - _reset_sdk_wedge_for_test() - - -def test_mark_sdk_wedged_sticky_first_wins(): - """A second wedge call with a different reason does NOT overwrite - the first. The first cause is the one the user needs to see; later - knock-on errors from the same wedge would otherwise mask it.""" - _reset_sdk_wedge_for_test() - _mark_sdk_wedged("first cause โ€” Control request timeout") - _mark_sdk_wedged("noise from a downstream symptom") - try: - assert wedge_reason() == "first cause โ€” Control request timeout" - finally: - _reset_sdk_wedge_for_test() - - -@pytest.mark.asyncio -async def test_execute_marks_wedge_on_control_request_timeout(): - """End-to-end: when _run_query raises an exception whose formatted - error contains 'Control request timeout' (case-insensitive), the - executor's catch block flags the SDK as wedged. Subsequent - is_wedged() reads return True until process restart (or the - test-only reset).""" - _executor_mod._reset_sdk_wedge_for_test() - e = _make_executor() - ctx = _make_context(["test prompt"]) - eq = _make_event_queue() - - async def boom(prompt, options): - # Match the literal exception claude_agent_sdk raises in the - # observed wedge path. - raise Exception("Control request timeout: initialize") - yield # pragma: no cover โ€” make this an async generator - - with patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")), \ - patch("claude_sdk_executor.read_delegation_results", return_value=""), \ - patch("claude_sdk_executor.commit_memory", new=AsyncMock()), \ - patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \ - patch("claude_agent_sdk.query", new=boom): - try: - await e.execute(ctx, eq) - assert _executor_mod.is_wedged() is True, "wedge flag must be set" - assert "Control request timeout" in _executor_mod.wedge_reason() - finally: - _executor_mod._reset_sdk_wedge_for_test() - - -@pytest.mark.asyncio -async def test_execute_does_not_mark_wedge_on_unrelated_error(): - """Sanity: a generic non-wedge exception (e.g. ValueError) MUST - NOT trigger the wedge flag. False-positives lock the workspace - into degraded for the whole process lifetime.""" - _executor_mod._reset_sdk_wedge_for_test() - e = _make_executor() - ctx = _make_context(["test prompt"]) - eq = _make_event_queue() - - async def boom(prompt, options): - raise ValueError("ordinary tool failure, not a wedge") - yield # pragma: no cover - - with patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")), \ - patch("claude_sdk_executor.read_delegation_results", return_value=""), \ - patch("claude_sdk_executor.commit_memory", new=AsyncMock()), \ - patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \ - patch("claude_agent_sdk.query", new=boom): - try: - await e.execute(ctx, eq) - assert _executor_mod.is_wedged() is False, "non-wedge error must not flip the flag" - finally: - _executor_mod._reset_sdk_wedge_for_test() - - -@pytest.mark.asyncio -async def test_execute_clears_wedge_on_successful_query(): - """Auto-recovery: a process that previously hit a wedge should be - able to recover when the SDK starts working again. _run_query - calls _clear_sdk_wedge_on_success at the end of a clean - completion; the flag flips back to None and the next heartbeat - reports runtime_state empty so the platform recovers status โ†’ - online without forcing the user to restart the workspace.""" - # Pre-set the wedge as if a prior call had tripped it. - _executor_mod._reset_sdk_wedge_for_test() - _executor_mod._mark_sdk_wedged("transient: Control request timeout: initialize") - assert _executor_mod.is_wedged() is True - - e = _make_executor() - ctx = _make_context(["test prompt"]) - eq = _make_event_queue() - - async def good_query(prompt, options): - # Working SDK โ€” yield one normal assistant message + result. - yield _FakeAssistantMessage([_FakeTextBlock("hello back")]) - yield _FakeResultMessage(session_id="recovered-sess") - - with patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")), \ - patch("claude_sdk_executor.read_delegation_results", return_value=""), \ - patch("claude_sdk_executor.commit_memory", new=AsyncMock()), \ - patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \ - patch("claude_agent_sdk.query", new=good_query): - try: - await e.execute(ctx, eq) - assert _executor_mod.is_wedged() is False, "wedge flag must clear after a successful query" - assert _executor_mod.wedge_reason() == "" - finally: - _executor_mod._reset_sdk_wedge_for_test() - - -@pytest.mark.asyncio -async def test_execute_does_not_clear_wedge_on_empty_stream(): - """Regression for the gate added in 3c4eef49: a stream that - iterates without raising but emits NEITHER an AssistantMessage - NOR a ResultMessage (degenerate or stub-driven shape) must NOT - clear the wedge flag. A real successful query yields at least - one of those; treating an empty stream as "recovered" would - falsely flip the workspace back to online without any evidence - the SDK is actually working.""" - _executor_mod._reset_sdk_wedge_for_test() - _executor_mod._mark_sdk_wedged("pre-existing wedge โ€” must not clear on empty stream") - assert _executor_mod.is_wedged() is True - - e = _make_executor() - ctx = _make_context(["test prompt"]) - eq = _make_event_queue() - - async def empty_query(prompt, options): - # Iterator returns without yielding โ€” the degenerate case. - if False: - yield # pragma: no cover - - with patch("claude_sdk_executor.recall_memories", new=AsyncMock(return_value="")), \ - patch("claude_sdk_executor.read_delegation_results", return_value=""), \ - patch("claude_sdk_executor.commit_memory", new=AsyncMock()), \ - patch("claude_sdk_executor.set_current_task", new=AsyncMock()), \ - patch("claude_agent_sdk.query", new=empty_query): - try: - await e.execute(ctx, eq) - assert _executor_mod.is_wedged() is True, \ - "wedge must persist when the stream emitted no content" - finally: - _executor_mod._reset_sdk_wedge_for_test() diff --git a/workspace/tests/test_runtime_wedge.py b/workspace/tests/test_runtime_wedge.py index 0df53e1d..e9cdbd20 100644 --- a/workspace/tests/test_runtime_wedge.py +++ b/workspace/tests/test_runtime_wedge.py @@ -66,38 +66,7 @@ class TestRuntimeWedge: assert runtime_wedge.wedge_reason() == "second wedge โ€” different reason" -class TestClaudeSdkExecutorReExportShim: - """claude_sdk_executor.py keeps re-exporting the old names for one - release cycle so any third-party adapter copying our wedge convention - has time to migrate. These tests pin the shim โ€” when removed, the - test file goes too.""" - - def test_is_wedged_re_exported(self): - from claude_sdk_executor import is_wedged - assert is_wedged is runtime_wedge.is_wedged - - def test_wedge_reason_re_exported(self): - from claude_sdk_executor import wedge_reason - assert wedge_reason is runtime_wedge.wedge_reason - - def test_internal_helpers_re_exported(self): - # Keep the underscore names too โ€” claude_sdk_executor's own - # _run_query calls _mark_sdk_wedged / _clear_sdk_wedge_on_success - # via these re-exports. - from claude_sdk_executor import ( - _mark_sdk_wedged, - _clear_sdk_wedge_on_success, - _reset_sdk_wedge_for_test, - ) - assert _mark_sdk_wedged is runtime_wedge.mark_wedged - assert _clear_sdk_wedge_on_success is runtime_wedge.clear_wedge - assert _reset_sdk_wedge_for_test is runtime_wedge.reset_for_test - - def test_re_export_state_is_shared(self): - # The shim isn't a copy โ€” both names refer to the same module - # state. Marking via the executor name must be observable via - # the runtime_wedge name (and vice versa). - from claude_sdk_executor import _mark_sdk_wedged - _mark_sdk_wedged("via executor shim") - assert runtime_wedge.is_wedged() is True - assert runtime_wedge.wedge_reason() == "via executor shim" +# TestClaudeSdkExecutorReExportShim removed alongside +# workspace/claude_sdk_executor.py โ€” the shim served its one-release- +# cycle purpose during the universal-runtime refactor (#87 Phase 2). +# The executor + its shim now live in the claude-code template repo.