Files
molecule-ai-workspace-templ…/claude_sdk_executor.py
hongming-personal 98ea43dc29
CI / Template validation (static) (push) Successful in 7s
CI / Adapter unit tests (push) Successful in 8s
CI / T4 tier-4 conformance (live) (push) Successful in 14s
CI / Template validation (runtime) (push) Successful in 2m14s
CI / validate (push) Successful in 0s
fix: re-inject durable memory into the fresh session on context-overflow auto-heal
2026-06-14 08:08:08 +00:00

1919 lines
94 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""SDK-based agent executor for Claude Code runtime.
Uses the official `claude-agent-sdk` Python package to invoke the Claude Code
engine programmatically — no subprocess, no stdout parsing, no zombie reap.
Replaces CLIAgentExecutor for the `claude-code` runtime only. Other CLI runtimes
(codex, ollama) keep using `cli_executor.py`.
Benefits over CLI subprocess:
- No per-message ~500ms startup overhead
- No stdout buffering issues
- Native Python session management (no JSON parsing of stdout)
- Real message stream — can surface tool calls in future for live UX
- Cooperative cancel (closes the query async generator on cancel())
- Same Claude Code engine, so plugins / skills / CLAUDE.md still apply
Concurrency model
-----------------
Turns are serialized per-executor via an asyncio.Lock. The old CLI executor
serialized implicitly by spawning one subprocess per message and awaiting it;
the SDK removes that, so we re-introduce serialization explicitly. This keeps
session_id updates race-free and makes cancel() well-defined (there's at most
one active stream at any given moment).
"""
from __future__ import annotations
import asyncio
import glob
import logging
import os
import shutil
import sys
from collections.abc import AsyncIterator, Callable
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
import yaml
import claude_agent_sdk as sdk
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.helpers import new_text_message
from molecule_runtime.executor_helpers import (
CONFIG_MOUNT,
MEMORY_CONTENT_MAX_CHARS,
WORKSPACE_MOUNT,
auto_push_hook,
brief_summary,
collect_outbound_files,
commit_memory,
error_detail_for_external,
extract_attached_files,
extract_message_text,
get_a2a_instructions,
get_display_instructions,
get_hma_instructions,
get_mcp_server_path,
get_system_prompt,
read_delegation_results,
recall_memories,
sanitize_agent_error,
set_current_task,
)
if TYPE_CHECKING:
from molecule_runtime.heartbeat import HeartbeatLoop
logger = logging.getLogger(__name__)
_NO_TEXT_MSG = "Error: message contained no text content."
_NO_RESPONSE_MSG = "(no response generated)"
def _apply_extra_mcp_servers(mcp_servers: dict, config: dict) -> dict:
"""Merge config-declared MCP servers into the base ``mcp_servers`` dict.
The org-level platform agent declares a second MCP server (the platform-
management MCP) in its ``config.yaml`` under ``mcp_servers:`` so it can drive
the org alongside the always-on ``a2a`` server. Each entry is
``{name, command, args?, env?}``. Ordinary workspaces declare none, so this
is a no-op for them.
Defensive: entries that are not dicts, are missing ``name``/``command``, or
try to redefine the built-in ``a2a`` server are skipped — a malformed config
can neither crash the executor nor shadow the A2A mesh.
(RFC: molecule-core docs/design/rfc-platform-agent.md)
"""
for entry in config.get("mcp_servers") or []:
if not isinstance(entry, dict):
continue
name = entry.get("name")
command = entry.get("command")
if not name or not command or name == "a2a":
continue
server: dict = {"command": command, "args": entry.get("args") or []}
if entry.get("env"):
server["env"] = entry["env"]
mcp_servers[name] = server
return mcp_servers
_MAX_RETRIES = 3
_BASE_RETRY_DELAY_S = 5
# Cap for stderr captured from the CLI subprocess in the executor log. Keeps
# log lines bounded while still surfacing enough context to diagnose crashes.
# Fixes #66 (previously the executor logged nothing beyond the generic
# "Check stderr output for details" message).
_PROCESS_ERROR_STDERR_MAX_CHARS = 4096
# Substrings in error messages that indicate a transient failure worth retrying.
_RETRYABLE_PATTERNS = (
"rate",
"limit",
"429",
"overloaded",
"capacity",
"exit code 1",
"try again",
)
# Module-level SDK-wedge flag. When claude_agent_sdk's `query.initialize()`
# raises `Control request timeout: initialize`, the SDK's internal client-
# process state is corrupted for the rest of the Python process — every
# subsequent `_run_query()` call hits the same wedge and re-throws. The
# executor itself can't auto-recover (the underlying CLI subprocess and
# its read pipe are in an unrecoverable state); only a workspace restart
# clears it.
#
# Two consumers read these helpers:
# 1. Heartbeat (via molecule_runtime.runtime_wedge — see _mark_sdk_wedged
# below). Reports `runtime_state="wedged"` to the platform, which
# flips the workspace to `degraded` so the canvas surfaces a Restart
# hint instead of leaving the user staring at a green dot while
# every chat hangs.
# 2. Boot smoke (molecule-core task #131). When the publish-image
# workflow boots the image with MOLECULE_SMOKE_MODE=1,
# run_executor_smoke consults runtime_wedge.is_wedged() at the end
# of every result path and upgrades a provisional PASS to FAIL when
# the flag is set. Catches PR-25-class regressions (malformed CLI
# argv → SDK init wedge) BEFORE the broken image ships to GHCR.
#
# Module scope (not instance scope) is deliberate: the wedge is a
# property of the Python process, not the executor. A future per-org
# multi-executor design could move this to a shared registry, but with
# one executor per workspace process today the simplest lock-free
# read+write fits.
_sdk_wedged_reason: str | None = None
def is_wedged() -> bool:
"""True if the Claude SDK has hit a non-recoverable init wedge in
this process. Sticky until process restart."""
return _sdk_wedged_reason is not None
def wedge_reason() -> str:
"""Human-readable description of the wedge cause, or empty string
when not wedged. Surfaced to the canvas via heartbeat sample_error."""
return _sdk_wedged_reason or ""
def _mark_sdk_wedged(reason: str) -> None:
"""Internal — flag the SDK as wedged. Only the first call wins
(subsequent identical wedges shouldn't overwrite a more specific
reason). Tests use `_reset_sdk_wedge_for_test()` to clear.
Mirrors the flag into molecule_runtime.runtime_wedge — that's the
universal cross-cutting wedge holder that heartbeat.py reads (to
flip the workspace to `degraded`) and that smoke_mode reads (to
fail the publish-image gate on init wedges, task #131). Without
this mirror the local sticky flag is unobserved by both consumers.
Best-effort: a missing/older runtime that doesn't ship runtime_wedge
silently no-ops the mirror — the local flag still gates
is_wedged() inside this module so internal callers (retry loop,
cancel handler) keep working.
"""
global _sdk_wedged_reason
if _sdk_wedged_reason is None:
_sdk_wedged_reason = reason
logger.error("SDK wedge detected: %s — workspace will report degraded until a successful query clears it", reason)
# Catch is narrowed to import errors: a SIGNATURE drift
# (mark_wedged renamed/removed) must surface so the smoke gate
# + heartbeat aren't silently blind. The runtime's structural
# snapshot test (molecule-core task #169) catches the rename
# at PR-time. Older runtimes that don't ship runtime_wedge at
# all hit ImportError here and silently no-op the mirror —
# the local sticky flag still gates is_wedged() inside this
# module so internal callers (retry loop, cancel handler)
# keep working.
try:
from molecule_runtime.runtime_wedge import mark_wedged as _mark_runtime_wedged
except (ImportError, ModuleNotFoundError):
return
try:
_mark_runtime_wedged(reason)
except Exception:
# Mirror call (not import) is still best-effort — a
# runtime_wedge internal raise must not silently suppress
# the local wedge state. Logged loudly so the regression
# is at least visible in the executor log.
logger.exception("runtime_wedge.mark_wedged mirror failed — local SDK wedge flag is still set")
def _clear_sdk_wedge_on_success() -> None:
"""Auto-recovery — called from _run_query after a successful
completion. The original wedge could be transient (a single network
blip during the SDK's first-message handshake), and a sticky-only
flag would lock the workspace into degraded forever even after the
SDK started working again. Clearing on observed success means the
next heartbeat after a working query reports `runtime_state` empty
and the platform flips status back to online.
Symmetric with _mark_sdk_wedged: also clears the universal
runtime_wedge flag so heartbeat + smoke_mode see the same state.
No-op when not wedged (the common case)."""
global _sdk_wedged_reason
if _sdk_wedged_reason is not None:
logger.info("SDK wedge cleared after successful query — workspace will recover to online on next heartbeat")
_sdk_wedged_reason = None
# Same import-narrowing rationale as _mark_sdk_wedged above.
try:
from molecule_runtime.runtime_wedge import clear_wedge as _clear_runtime_wedge
except (ImportError, ModuleNotFoundError):
return
try:
_clear_runtime_wedge()
except Exception:
logger.exception("runtime_wedge.clear_wedge mirror failed — local clear succeeded")
def _reset_sdk_wedge_for_test() -> None:
"""Test-only escape hatch. Production code clears the wedge via
`_clear_sdk_wedge_on_success` when a query succeeds; this helper
is for unit tests that need to reset between cases."""
global _sdk_wedged_reason
_sdk_wedged_reason = None
# Per-tool-use summarizers. Reads the most-useful argument from each
# tool's input dict so the canvas progress feed shows
# `🛠 Read /tmp/foo` instead of the bare tool name. Anything not in the
# table falls through to a generic "🛠 <tool>(…)" line. Order keys by
# tool frequency so a future contributor can see the high-traffic
# tools first.
_TOOL_USE_SUMMARIZERS: dict[str, Callable[[dict], str]] = {
"Read": lambda i: f"📄 Read {i.get('file_path', '?')}",
"Write": lambda i: f"✍️ Write {i.get('file_path', '?')}",
"Edit": lambda i: f"✏️ Edit {i.get('file_path', '?')}",
"Bash": lambda i: f"⚡ Bash: {(i.get('command') or '')[:80]}",
"Glob": lambda i: f"🔍 Glob {i.get('pattern', '?')}",
"Grep": lambda i: f"🔍 Grep {i.get('pattern', '?')}",
"WebFetch": lambda i: f"🌐 WebFetch {i.get('url', '?')}",
"WebSearch": lambda i: f"🌐 WebSearch {i.get('query', '?')}",
"Task": lambda i: f"🤖 Task: {(i.get('description') or '')[:60]}",
"TodoWrite": lambda _i: "📝 TodoWrite",
}
def _summarize_tool_use(tool_name: str, tool_input: dict) -> str:
summarizer = _TOOL_USE_SUMMARIZERS.get(tool_name)
if summarizer:
try:
return summarizer(tool_input or {})[:200]
except Exception:
pass
# Generic fallback. Truncated so a tool with a giant input dict
# doesn't write a 10kB activity row per call.
return f"🛠 {tool_name}(…)"[:200]
async def _report_tool_use(block: Any) -> None:
"""Fire-and-forget agent_log activity row per tool the SDK invoked,
so the canvas's MyChat live-progress feed can render each step
Claude is doing instead of staring at a single spinner.
Posts directly to /workspaces/:id/activity rather than through
a2a_tools.report_activity — that helper also pushes a current_task
heartbeat which would duplicate as a TASK_UPDATED line in the
chat feed. The workspace card's current_task is already set
once per turn by the executor's set_current_task(brief_summary)
call, so the per-tool telemetry stays a chat-only signal.
Best-effort — any failure (network blip, platform unreachable, the
block didn't have the attrs we expected) is swallowed silently.
The tool will still execute regardless; only the progress
telemetry is lost. Deliberately does NOT raise — a malformed
block must not abort the message-stream iteration in
`_run_query`.
"""
try:
# Lazy imports to keep this helper non-essential — the
# executor must still run when the workspace's network/auth
# plumbing isn't fully set up (e.g. unit tests).
import httpx
from molecule_runtime.a2a_client import PLATFORM_URL, WORKSPACE_ID
from molecule_runtime.platform_auth import auth_headers
except Exception:
return
try:
tool_name = getattr(block, "name", "") or ""
tool_input = getattr(block, "input", {}) or {}
if not tool_name:
return
summary = _summarize_tool_use(tool_name, tool_input)
# 5s budget — long enough to absorb a single platform GC
# pause, short enough that a wedged platform doesn't slow
# the tool-iteration cadence beyond noticeable.
async with httpx.AsyncClient(timeout=5.0) as client:
await client.post(
f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/activity",
json={
"activity_type": "agent_log",
"source_id": WORKSPACE_ID,
# target_id == source for self-actions. Matches the
# convention other self-logged activity rows use
# (a2a_receive when the workspace logs its own
# outbound reply) so DB consumers joining on
# target_id see a well-defined value.
"target_id": WORKSPACE_ID,
"summary": summary,
"status": "ok",
"method": tool_name,
},
headers=auth_headers(),
)
except Exception:
# Telemetry failures must not break the conversation.
return
# Substring patterns that classify an exception as the specific
# claude_agent_sdk init-timeout wedge (vs. a rate-limit, transient
# subprocess crash, etc.). Match is case-insensitive on the formatted
# error string. Adding a new pattern here MUST come with a test in
# tests/test_claude_sdk_executor.py — false-positives lock the
# workspace into degraded until the next successful query clears it.
#
# `:initialize` suffix-anchored — the SDK can theoretically time out
# on later control messages (in-flight tool callbacks), but those
# don't leave the SDK in the unrecoverable post-init state we're
# trying to detect. Limit the pattern to the specific wedge.
_WEDGE_ERROR_PATTERNS = (
"control request timeout: initialize",
)
# Substrings that classify an error as a CONTEXT-WINDOW OVERFLOW — the
# accumulated session transcript grew past the model's context window, so
# the next request's input tokens alone exceed the limit and EVERY
# subsequent dispatch on the same (resumed) session re-overflows and fails.
# This is the Kimi wedge: claude-code routed at a 262144-token model
# reported `token limit 262144 requested 268132`; once the session crossed
# the window, every A2A turn 400'd identically and the agent was stuck.
#
# WHY claude-code's own auto-compact didn't save Kimi
# ---------------------------------------------------
# claude-code DOES auto-compact, but the compaction threshold is derived
# from the model's context window via the CLI's internal resolver (`B2`):
# it returns 1e6 for known long-context Anthropic models, a cached value
# for `claude-sonnet-4-6`, and otherwise falls through to a hard-coded
# `pi6 = 200000` default. A non-Anthropic model reached through the
# molecule LLM proxy (Kimi/MiniMax/GLM/DeepSeek) is NOT in that table, so
# the resolver returns the 200k fallback. Kimi's REAL window is 262144 —
# LARGER than the assumed 200k — so claude-code believes it has *more*
# headroom than it does only when the model is smaller, but the deeper
# failure is the inverse: the proxy advertises the model's true 262144
# window to claude-code's token accounting in some paths while the
# compaction trigger uses the 200k fallback, so the session is allowed to
# grow into a band (200k262k) where claude-code thinks compaction already
# ran "enough" but the upstream still rejects. Net effect for the operator:
# auto-compact fired against the wrong number and the session wedged. The
# durable prevention is to tell claude-code the model's real window via
# `CLAUDE_CODE_MAX_CONTEXT_TOKENS` (see _maybe_set_context_window_env); the
# auto-heal below is the RECOVERY half that un-sticks an already-wedged
# agent.
#
# Matching mirrors claude-code's own overflow regex
# \b(too long|too large|exceeds|token limit|prompt is too long)\b
# plus the proxy-shaped `token limit <N> requested <M>` body and the
# Anthropic-native phrasings, so we catch the error whether it surfaces as
# a raised ProcessError/Exception OR as an `is_error` ResultMessage.
# Case-insensitive substring match on the formatted error text.
#
# Adding a pattern here MUST come with a test in
# tests/test_context_overflow_autoheal.py — a false positive throws away a
# healthy session and forces a (recoverable but wasteful) re-summarization.
_CONTEXT_OVERFLOW_PATTERNS = (
"prompt is too long",
"token limit", # proxy: "token limit 262144 requested 268132"
"context window",
"context_length_exceeded",
"maximum context length",
"exceeds the context",
"input length and `max_tokens`",
"too many tokens",
)
# Known context windows for proxy-routed (non-Anthropic) models that
# claude-code's own resolver doesn't know (it falls back to 200k for these).
# Keyed by a substring of the resolved model id (matched case-insensitively,
# longest key first so e.g. "kimi-k2.6" can override a generic "kimi"). Values
# are the model's REAL token window. Used ONLY as the last fallback in
# _maybe_set_context_window_env (env + config.yaml take precedence). Adding an
# entry requires confidence in the value — too high re-introduces the overflow
# wedge, too low wastes context. SSOT: centralize via config.yaml/provisioner
# (runtime#133). Confirmed: Kimi K2 = 262144 (the JRS incident model; reported
# `token limit 262144`).
_KNOWN_PROXY_MODEL_CONTEXT_WINDOWS: tuple[tuple[str, int], ...] = (
("kimi", 262144), # Moonshot Kimi K2 family — 256K
("moonshot", 262144), # provider-prefixed form (moonshot/kimi-*)
)
def _known_model_context_window(model: str | None) -> int | None:
"""Best-effort context window for a proxy-routed model id, or None when
unknown (leaving claude-code's default resolver in place — no regression
for Anthropic models)."""
low = (model or "").lower()
if not low:
return None
# longest key first for specificity
for key, window in sorted(_KNOWN_PROXY_MODEL_CONTEXT_WINDOWS, key=lambda kv: -len(kv[0])):
if key in low:
return window
return None
def _is_context_overflow(text: str) -> bool:
"""True if `text` looks like a context-window overflow (vs. a generic
rate-limit or subprocess crash). Case-insensitive substring match
against `_CONTEXT_OVERFLOW_PATTERNS`.
Deliberately NARROW: `_RETRYABLE_PATTERNS` already contains the broad
word "limit", which would match almost any rate-limit string; this
classifier exists to distinguish the *context* overflow (heal by
resetting the session) from a *rate* limit (heal by backing off and
retrying the SAME session). The two need opposite remedies — resetting
the session on a rate-limit would needlessly discard good context.
"""
low = (text or "").lower()
return any(p in low for p in _CONTEXT_OVERFLOW_PATTERNS)
_SWALLOWED_STDERR_MARKER = "Check stderr output for details"
def _probe_claude_cli_error() -> str | None:
"""Run ``claude --print`` directly and capture its stderr + stdout.
Used as a fallback when the claude-agent-sdk raises a bare ``Exception``
with the swallowed "Check stderr output for details" placeholder — that
happens when the SDK wraps a stream error from the CLI subprocess and
loses both the ``.stderr`` attribute and the exit code. At that point
the only way to see the real failure reason (rate limit, auth error,
network outage, missing token) is to run the CLI ourselves.
Bounded by a 30s timeout so a hung CLI can't stall the error path.
Returns None if the probe itself failed (wrong invariant — don't
corrupt the main error message with probe noise).
"""
try:
import subprocess
# --print reads stdin, prints response, exits. Empty stdin gives the
# CLI something to work with without triggering an actual model call
# when it's going to fail anyway.
proc = subprocess.run(
["claude", "--print"],
input="probe",
capture_output=True,
text=True,
timeout=30,
)
if proc.returncode == 0:
# CLI succeeded — the original error was a transient state that
# resolved between the SDK failure and our probe. Signal that.
return "<cli probe succeeded — error was transient>"
raw = (proc.stderr or "") + (proc.stdout or "")
raw = raw.strip()
if not raw:
return f"<cli exited {proc.returncode} with empty output>"
if len(raw) > _PROCESS_ERROR_STDERR_MAX_CHARS:
raw = raw[:_PROCESS_ERROR_STDERR_MAX_CHARS] + "... [truncated]"
return raw
except Exception as probe_exc: # pragma: no cover — best-effort diagnostic
return f"<probe failed: {type(probe_exc).__name__}: {probe_exc}>"
def _format_process_error(exc: BaseException) -> str:
"""Render a Claude-SDK ProcessError (or any ClaudeSDKError) with its full
captured context — exit code, stderr, exception type. Plain strings for
non-SDK exceptions fall back to str(exc).
Bounded at _PROCESS_ERROR_STDERR_MAX_CHARS so a runaway CLI can't spam
the log. Used by the executor's error path (fixes #66 — the SDK's
ProcessError carries `.stderr`/`.exit_code` attributes that the previous
code silently discarded, leaving every CLI crash with an identical
"Check stderr output for details" message in the workspace log).
Fixes #160: when the SDK raises a bare ``Exception`` containing the
"Check stderr output for details" placeholder (which happens when the
CLI subprocess emits a stream error the SDK can't categorize — rate
limit, auth, network), there's no ``.stderr``/``.exit_code`` to read.
In that case we fall back to running the CLI ourselves via
``_probe_claude_cli_error`` so the operator sees the real failure
reason (e.g. ``You've hit your limit · resets Apr 17``) instead of
chasing ghosts in the workspace logs.
"""
parts = [f"{type(exc).__name__}: {exc}"]
exit_code = getattr(exc, "exit_code", None)
if exit_code is not None:
parts.append(f"exit_code={exit_code}")
stderr = getattr(exc, "stderr", None)
if stderr:
trimmed = stderr[:_PROCESS_ERROR_STDERR_MAX_CHARS]
if len(stderr) > _PROCESS_ERROR_STDERR_MAX_CHARS:
trimmed += f"... [{len(stderr) - _PROCESS_ERROR_STDERR_MAX_CHARS} more chars truncated]"
parts.append(f"stderr={trimmed!r}")
elif exit_code is None and _SWALLOWED_STDERR_MARKER in str(exc):
# #160: generic exception with the swallowed-stderr placeholder.
# Probe the CLI directly — this is the only way to surface the real
# error when the SDK lost it in translation.
probed = _probe_claude_cli_error()
if probed:
parts.append(f"probed_cli_error={probed!r}")
return " | ".join(parts)
# --- Stuck-MCP readiness gate (the concierge "lost its platform tools" bug) ---
#
# The org-level platform agent declares a second MCP server (`platform`, the
# molecule-mcp-server with ~88 org-admin tools like create_workspace) in its
# config.yaml. That server is a `node` stdio subprocess that takes ~5-8s to
# finish its MCP handshake. The one-shot `claude_agent_sdk.query()` API spawns
# a COLD CLI subprocess per turn and ships its `init` system message — which
# carries the tool list the LLM will see — the instant the CLI boots, while the
# `node` server is still `status: pending`. Result: the 88 `mcp__platform__*`
# tools are absent from that turn's tool list, so the concierge "loses"
# create_workspace etc. A fresh `query()` doesn't reliably fix it because each
# query is a cold subprocess that re-races the same handshake (confirmed live:
# 3/3 fresh `query()` sessions showed `platform: pending` / 0 platform tools at
# init, while a persistent client that POLLS `get_mcp_status()` settles to
# `connected` / 88 tools after ~5s).
#
# Fix: when the config declares extra (non-`a2a`) MCP servers, route the turn
# through a persistent `ClaudeSDKClient` and GATE on `get_mcp_status()` until
# every declared server reports `connected` (bounded poll) BEFORE sending the
# prompt — so the LLM's tool list includes the platform tools. Ordinary
# workspaces declare no extra servers and keep the fast `query()` path
# untouched. If the gate times out with a server still not connected, we raise
# `_McpNotReadyError` so `_execute_locked` can self-heal (reset session + retry
# once), mirroring the context-overflow auto-heal.
#
# Connection-status classification for the readiness gate.
# `connected` — the only success; the server's tools are live.
# `pending` — still handshaking; keep polling within the budget.
# `failed` — the CLI's own MCP-connect timeout (MCP_TIMEOUT, default 30s)
# elapsed before the slow `node` server finished. Observed
# live to be INTERMITTENT under load (the same server connects
# fine on a fresh subprocess), so it is treated as RETRYABLE,
# not hard-terminal: the gate surfaces it as _McpNotReadyError
# which the executor heals by resetting the session and
# retrying on a fresh subprocess (a new MCP-connect attempt).
# `disabled` / `needs-auth` — genuinely terminal: a config/auth problem no
# retry can fix, so the gate raises immediately and the heal's
# retry is wasted but bounded (one reset, then a hard error).
_MCP_READY_STATUS = "connected"
# Hard-terminal: retrying cannot help (misconfiguration / auth).
_MCP_TERMINAL_FAIL_STATUSES = ("disabled", "needs-auth")
# Bound on the readiness poll. The platform `node` MCP was observed to connect
# in ~6-13s; budget 50 × 0.5s = 25s gives ample headroom under load while
# staying under the SDK's 60s initialize timeout so the gate can't itself trip
# an init wedge.
_MCP_READY_MAX_POLLS = 50
_MCP_READY_POLL_INTERVAL_S = 0.5
# How many times the stuck-MCP heal may reset+retry within a single dispatch.
# Each retry is a FRESH subprocess with an independent MCP-connect attempt, so
# a transient `failed`/timeout on one attempt usually clears on the next. Two
# retries (three total attempts) makes an intermittent connect failure
# vanishingly unlikely to surface to the user, while staying bounded so a
# genuinely-broken server still terminates loudly.
_MCP_HEAL_MAX_RETRIES = 2
# CLI MCP-connection timeout (ms) the executor pins for the platform agent so
# the slow `node` handshake isn't marked `failed` prematurely under load. The
# CLI default is 30000; 60000 doubles the headroom. Never clobbers an operator
# pin. (Preventive half of the stuck-MCP fix — the readiness gate is recovery.)
_MCP_CONNECT_TIMEOUT_MS = "60000"
class _McpNotReadyError(Exception):
"""Raised by the readiness-gated query path when a config-declared MCP
server (e.g. `platform`) never reached `connected` within the bounded
poll window, so its tools would be absent from the LLM's tool list.
Routed through `_execute_locked`'s heal classifier (analogous to
`_ResultError` for overflow): the first occurrence resets the session
and retries ONCE; a second occurrence on the retry is a hard, loud
error (the server is genuinely broken, not just slow). Carries the
offending server name + last-seen status for the log + activity row.
"""
def __init__(self, server: str, status: str) -> None:
self.server = server
self.status = status
super().__init__(f"MCP server {server!r} not ready (status={status!r})")
class _ResultError(Exception):
"""Raised by `_run_query` when the SDK completes the stream but the
terminal `ResultMessage` carries `is_error=True`.
A context overflow does NOT always surface as a raised SDK exception:
when the CLI subprocess reaches the model proxy and the upstream
rejects the request body with a 400 (`token limit … requested …`), the
CLI can emit a normal `result` message with `is_error=True` and the
error text in `.result` instead of crashing. Without this, that path
returned the error string as if it were a successful agent reply — the
overflow looked like a (broken) answer and the heal never triggered.
Re-raising as a typed exception routes the `is_error` result through
the exact same retry/heal/wedge classification the raised-exception
path uses, so detection lives in ONE place (`_execute_locked`).
Carries the rendered error text so the classifier can match on it.
"""
def __init__(self, text: str) -> None:
self.text = text or ""
super().__init__(self.text)
def _result_error_detail(
result_text,
subtype=None,
api_error_status=None,
errors=None,
):
"""Build the text carried by a `_ResultError`.
When the SDK ResultMessage actually carried a `result` string (the
common context-overflow case -- `token limit ... requested ...`), use
it verbatim so the existing overflow classifier keeps matching.
When `result` is EMPTY (the opaque agents-team-engine case: the SDK
yields `is_error=True` with `result=None`), synthesize a concise,
NON-SECRET detail from the diagnostic fields the ResultMessage DID
carry -- `subtype` (e.g. "error_during_execution", "error_max_turns")
and `api_error_status` (the upstream HTTP status, e.g. 401/429/404/500;
the SDK documents this field as "Safe to log -- no message content").
A short summary of `errors[]` is appended when present. This turns the
previously opaque `_ResultError("")` -- which rendered as the useless
"Agent error (_ResultError) -- see workspace logs for details" -- into
a self-diagnosing string like
`error_during_execution (api_error_status=401)`.
Safety: subtype + status code are categorical/non-secret. Any
`errors[]` text still flows through the runtime `sanitize_agent_error`,
which truncates to ~1KB and scrubs token/key-shaped substrings before
the string reaches the A2A response -- so this never leaks credentials
or request bodies. We additionally cap the errors summary here so a
pathological list can't dominate the message.
"""
text = (result_text or "").strip()
if text:
return text
parts = []
if subtype:
parts.append(str(subtype))
if api_error_status is not None:
parts.append("(api_error_status=%s)" % api_error_status)
if errors:
try:
summary = "; ".join(str(e) for e in errors if e)
except Exception:
summary = ""
summary = summary.strip()
if summary:
# Cap so a long errors list can't bloat the message; the
# runtime sanitizer is the authoritative token/length scrubber.
if len(summary) > 300:
summary = summary[:300] + ""
parts.append("errors: " + summary)
detail = " ".join(parts).strip()
# Last-resort: never return "" (that is exactly the opaque case we are
# fixing). A generic-but-honest marker still beats an empty string.
return detail or "engine error with no detail (is_error result, empty result/subtype)"
@dataclass
class QueryResult:
"""Outcome of a single `query()` stream.
`text` is the canonical final response; `session_id` is the id the SDK
reports in its ResultMessage (used for resume on the next turn).
`tool_uses` is the ordered list of tool names invoked during the turn
— used as a UX-friendly fallback when `text` is empty (the agent did
only tool calls and no final text block, common for autonomous-tick
ticks that delegate or send_message_to_user without explanation).
"""
text: str
session_id: str | None
tool_uses: list[str] = field(default_factory=list)
class ClaudeSDKExecutor(AgentExecutor):
"""Executes agent tasks via the claude-agent-sdk programmatic API."""
def __init__(
self,
system_prompt: str | None,
config_path: str,
heartbeat: "HeartbeatLoop | None",
model: str = "sonnet",
):
self.system_prompt = system_prompt
self.config_path = config_path
self.heartbeat = heartbeat
self.model = model
self._session_id: str | None = None
self._active_stream: AsyncIterator[Any] | None = None
# Serializes concurrent execute() calls on the same executor so
# session_id / _active_stream mutations stay race-free.
self._run_lock = asyncio.Lock()
# ------------------------------------------------------------------
# Prompt + options builders
# ------------------------------------------------------------------
def _resolve_cwd(self) -> str:
"""Run in /workspace if it has been populated, otherwise /configs."""
if os.path.isdir(WORKSPACE_MOUNT) and os.listdir(WORKSPACE_MOUNT):
return WORKSPACE_MOUNT
return CONFIG_MOUNT
def _build_system_prompt(self) -> str | None:
"""Compose system prompt from file + A2A + HMA memory instructions."""
base = get_system_prompt(self.config_path, fallback=self.system_prompt)
a2a = get_a2a_instructions(mcp=True)
display = get_display_instructions()
hma = get_hma_instructions()
parts = [p for p in (base, a2a, display, hma) if p]
return "\n\n".join(parts) if parts else None
def _prepare_prompt(self, user_input: str) -> str:
"""Prepend delegation results that arrived while idle."""
delegation_context = read_delegation_results()
if delegation_context:
return (
"[Delegation results received while you were idle]\n"
f"{delegation_context}\n\n[New message]\n{user_input}"
)
return user_input
async def _inject_memories_if_first_turn(self, prompt: str) -> str:
if self._session_id:
return prompt
memories = await recall_memories()
if not memories:
return prompt
return f"[Prior context from memory]\n{memories}\n\n{prompt}"
def _load_mcp_fragment(self) -> dict:
"""Read the standalone mcp_servers.yaml overlay fragment, if present.
Same defensive posture as _load_config_dict: any I/O or parse error
returns {} so a malformed fragment can never crash the executor.
"""
try:
fragment_file = os.path.join(self.config_path, "mcp_servers.yaml")
with open(fragment_file) as f:
return yaml.safe_load(f) or {}
except Exception:
return {}
def _load_config_dict(self) -> dict:
"""Read config.yaml as a raw dict for field-level inspection.
Returns an empty dict on any I/O or parse error so callers can
always use ``.get()`` without guards.
"""
try:
config_file = os.path.join(self.config_path, "config.yaml")
with open(config_file) as f:
return yaml.safe_load(f) or {}
except Exception:
return {}
def _declared_extra_mcp_names(self) -> list[str]:
"""Names of config-declared MCP servers other than the built-in
``a2a`` server.
These are the servers the readiness gate must wait for before the
turn's prompt is sent — ``a2a`` is an in-process stdio MCP that the
executor controls and that connects immediately, but extra servers
(the platform agent's ``platform`` / molecule-mcp-server) are slow
external ``node``/binary subprocesses whose handshake races the CLI's
init message. Empty for ordinary workspaces → the fast `query()` path
stays in effect.
Filtering mirrors `_apply_extra_mcp_servers`: skip non-dict entries,
entries missing ``name``/``command``, and any attempt to redefine
``a2a``.
"""
names: list[str] = []
for entry in self._load_config_dict().get("mcp_servers") or []:
if not isinstance(entry, dict):
continue
name = entry.get("name")
command = entry.get("command")
if not name or not command or name == "a2a":
continue
names.append(name)
return names
def _maybe_set_context_window_env(self) -> None:
"""Tell claude-code the model's REAL context window (deeper fix).
Root cause of the Kimi wedge: claude-code's auto-compact threshold
is derived from its internal context-window resolver, which only
knows Anthropic models and falls back to a hard-coded 200000 for
anything reached through the molecule LLM proxy (Kimi/MiniMax/GLM/
DeepSeek). With the wrong window, compaction fires against the
wrong number and the session is allowed to drift into a band the
upstream still rejects.
claude-code honors ``CLAUDE_CODE_MAX_CONTEXT_TOKENS`` as an
explicit override of that resolver (see the bundled CLI's `B2`
function). Setting it to the model's true window makes auto-compact
trigger at the correct point — PREVENTING the overflow rather than
only recovering from it. The auto-heal stays as the safety net for
any window we don't have configured.
Source of the window (first hit wins):
1. ``MODEL_CONTEXT_WINDOW`` env (persona/operator override).
2. ``context_window`` in config.yaml.
Absent/invalid → leave the env untouched so claude-code keeps its
own default behavior (no regression for Anthropic models, whose
resolver is already correct).
Idempotent + non-destructive: if the env is already set (operator
pinned it, or a prior call set it) we don't overwrite it.
"""
if os.environ.get("CLAUDE_CODE_MAX_CONTEXT_TOKENS"):
return
raw = os.environ.get("MODEL_CONTEXT_WINDOW")
if not raw:
raw = self._load_config_dict().get("context_window")
if raw in (None, ""):
# 3rd source (the gap that wedged JRS): proxy-routed models
# (Kimi/MiniMax/GLM/DeepSeek) are NOT in claude-code's window
# resolver AND usually have no MODEL_CONTEXT_WINDOW/config set, so
# without this the env stays unset → claude-code uses its 200k
# fallback → auto-compact fires against the wrong number → the
# session overflows the model's REAL window (Kimi hit 262191 >
# 262144) → the wipe-reset auto-heal loses all task memory. Map
# the known proxy models to their true window so claude-code's
# NATIVE compaction (which summarizes, NOT wipes) triggers
# correctly and the overflow never happens. Substring match on the
# resolved model id. SSOT note: ideally the provisioner sets
# `context_window` in config.yaml per model (the 2nd source above);
# this map is the safety net until that lands (tracked: runtime#133).
raw = _known_model_context_window(self.model)
if raw in (None, ""):
return
try:
window = int(raw)
except (TypeError, ValueError):
logger.warning(
"context_window=%r is not an integer — leaving claude-code's "
"default window resolver in place", raw,
)
return
if window <= 0:
return
os.environ["CLAUDE_CODE_MAX_CONTEXT_TOKENS"] = str(window)
logger.info(
"set CLAUDE_CODE_MAX_CONTEXT_TOKENS=%d so auto-compact triggers "
"against the model's real context window (model=%s) — prevents "
"the proxy-routed-model context-overflow wedge",
window, self.model,
)
def _maybe_set_mcp_connect_timeout_env(self) -> None:
"""Give slow stdio MCP servers more time to connect (preventive half
of the stuck-MCP fix).
The platform agent's `platform` MCP is a `node` subprocess whose
handshake was observed to take ~6-13s and, under load, occasionally
exceed the claude CLI's default 30s MCP-connect timeout — at which
point the CLI marks the server `failed` and its tools never reach the
LLM. The CLI honors ``MCP_TIMEOUT`` (ms) as an override of that
connect timeout; pinning it higher makes premature `failed` far less
likely, complementing the readiness gate (which RECOVERS) with
PREVENTION.
Only applied when the config declares extra (non-`a2a`) MCP servers —
ordinary workspaces keep the CLI default. Idempotent + non-destructive:
never clobbers an operator pin.
"""
if os.environ.get("MCP_TIMEOUT"):
return
if not self._declared_extra_mcp_names():
return
os.environ["MCP_TIMEOUT"] = _MCP_CONNECT_TIMEOUT_MS
logger.info(
"set MCP_TIMEOUT=%s so the slow platform MCP isn't marked failed "
"before its handshake completes (prevents the stuck-MCP tool-loss)",
_MCP_CONNECT_TIMEOUT_MS,
)
def _build_options(self) -> Any:
"""Build ClaudeAgentOptions.
No allowed_tools allowlist — bypassPermissions grants full access,
matching the old CLI `--dangerously-skip-permissions` so Claude can
use every built-in tool (Task, TodoWrite, NotebookEdit, BashOutput/
KillShell, ExitPlanMode, etc.) plus all MCP tools.
The MCP server launcher uses `sys.executable` so tests and alternate
virtual-env layouts don't depend on a `python3` shim being on PATH.
output_config wiring (issue #652)
----------------------------------
Reads ``effort`` and ``task_budget`` from config.yaml and populates
``output_config`` on the SDK options before the API call:
- ``effort`` (str): one of low|medium|high|xhigh|max. xhigh is the
Opus 4.7 recommended default for long agentic tasks.
- ``task_budget`` (int): advisory total-token budget across the full
agentic loop. Must be >= 20000 (API minimum) or 0/absent (unset).
When set, the ``task-budgets-2026-03-13`` beta header is added so
the API accepts the field.
"""
# Deeper fix for the context-overflow wedge: pin the model's real
# context window so claude-code's auto-compact triggers against the
# right number instead of its 200k fallback for proxy-routed
# models. No-op when unconfigured (Anthropic models keep their
# correct built-in resolver).
self._maybe_set_context_window_env()
# Preventive half of the stuck-MCP fix: give the slow platform MCP
# more connect headroom so it isn't marked `failed` before its
# handshake completes (no-op for ordinary workspaces).
self._maybe_set_mcp_connect_timeout_env()
mcp_servers = {
"a2a": {
"command": sys.executable,
"args": [get_mcp_server_path()],
}
}
# Merge any config-declared MCP servers (e.g. the platform-management
# MCP for the org-level platform agent). No-op for ordinary workspaces.
_apply_extra_mcp_servers(mcp_servers, self._load_config_dict())
# Overlay fragment (core#2522): the provisioner ships the concierge's
# platform-MCP declaration as a standalone /configs/mcp_servers.yaml,
# because on the SaaS restart-provision path no base config.yaml is
# resolvable to append onto (the pilot's TOOLS-FAIL RCA, 2026-06-10).
# Applied AFTER config.yaml so the platform-authored fragment wins on
# a same-name entry. Absent file -> {} -> no-op for every ordinary
# workspace.
_apply_extra_mcp_servers(mcp_servers, self._load_mcp_fragment())
create_kwargs: dict = dict(
model=self.model,
permission_mode="bypassPermissions",
cwd=self._resolve_cwd(),
mcp_servers=mcp_servers,
system_prompt=self._build_system_prompt(),
resume=self._session_id,
# Forward --dangerously-load-development-channels to the spawned
# claude CLI so the host registers our experimental.claude/channel
# capability instead of dropping the notification on the allowlist
# check. The wheel ships the gates (PR molecule-core#2463) and the
# inbox bridge fires the notification, but without this flag the
# CLI silently filters it during the channels research preview.
#
# The flag's signature in Claude Code 2.1.x takes an *allowlist*
# of tagged entries — `server:<name>` for manually-configured
# MCP servers, `plugin:<name>@<marketplace>` for plugin
# channels. Passing `None` (the original PR #25 shape) renders
# as a bare `--<flag>` with no value; the CLI rejects with
# `argument missing` and the SDK times out at `initialize`,
# surfacing as `Control request timeout: initialize` upstream
# (caught live on workspace dd40faf8 on 2026-05-01 — every
# A2A turn wedged 100% of the time). Verified live: with the
# tagged value, A2A returns coherent replies AND the host
# claude session renders inbound messages as `<channel>` tags
# inline (no inbox poll needed). Drop once channels graduate
# to the default allowlist.
#
# Task #214 — CLI 2.1.143 made the flag variadic (nargs='+').
# The `{flag: value}` shape renders as TWO argv elements (see
# claude_agent_sdk subprocess_cli.py:340) and the channels
# parser then greedily absorbs the SDK's downstream `--print
# <prompt>` argv pair, wedging the SDK at initialize. Fix:
# pack `=value` into the key so the renderer's None-value
# path emits a single argv element which the variadic parser
# cannot reach across.
extra_args={"dangerously-load-development-channels=server:molecule": None},
)
# --- output_config: effort + task_budget (issue #652) ---
config = self._load_config_dict()
output_config: dict = {}
effort = config.get("effort", "")
task_budget = config.get("task_budget", 0)
if effort:
output_config["effort"] = effort # "low"|"medium"|"high"|"xhigh"|"max"
if task_budget and int(task_budget) >= 20000:
output_config["task_budget"] = {
"type": "tokens",
"total": int(task_budget),
}
betas = list(create_kwargs.get("betas", []))
if "task-budgets-2026-03-13" not in betas:
betas.append("task-budgets-2026-03-13")
create_kwargs["betas"] = betas
elif task_budget and int(task_budget) > 0:
# Below minimum — reject clearly before any API call is made.
raise ValueError(
f"task_budget must be >= 20000 tokens (got {task_budget})"
)
if output_config:
create_kwargs["output_config"] = output_config
return sdk.ClaudeAgentOptions(**create_kwargs)
# ------------------------------------------------------------------
# Query streaming
# ------------------------------------------------------------------
class _StreamAccumulator:
"""Mutable scratch state shared by the plain and gated query paths.
Both `_run_query` (one-shot `sdk.query()`) and `_run_query_gated`
(persistent `ClaudeSDKClient`) consume the SAME message stream shape
(AssistantMessage / ResultMessage). This holds the per-turn
accumulators so the message-handling logic lives in exactly one place
(`_accumulate_message`) and can't drift between the two paths.
"""
def __init__(self) -> None:
self.assistant_chunks: list[str] = []
self.tool_uses: list[str] = []
self.result_text: str | None = None
self.session_id: str | None = None
self.result_is_error: bool = False
# Diagnostic fields off the terminal ResultMessage, kept so an
# opaque is_error result (result=None) still yields a useful,
# non-secret _ResultError detail (subtype + upstream HTTP status).
self.result_subtype: str | None = None
self.result_api_error_status: int | None = None
self.result_errors: list | None = None
async def _accumulate_message(self, message: Any, acc: "ClaudeSDKExecutor._StreamAccumulator") -> None:
"""Fold one SDK stream message into `acc`.
Extracted verbatim from the original `_run_query` loop so the gated
path (`_run_query_gated`) reuses identical text / tool-use / session-id
/ is_error handling. Keep this the ONLY place that reads message
blocks.
"""
if isinstance(message, sdk.AssistantMessage):
for block in message.content:
if isinstance(block, sdk.TextBlock):
acc.assistant_chunks.append(block.text)
else:
# Handle thinking/reasoning blocks from Anthropic-
# compatible upstreams (MiniMax M2/M2.7, Moonshot
# K2.6) so reasoning-only output doesn't surface as
# empty content. Duck-typing: real SDK objects have
# a `.thinking` attr; dict-shaped blocks have
# `type: "thinking"`.
thinking_text = None
if hasattr(block, "thinking"):
thinking_text = getattr(block, "thinking", None)
elif isinstance(block, dict) and block.get("type") == "thinking":
thinking_text = block.get("thinking")
if thinking_text:
acc.assistant_chunks.append(thinking_text)
return
# ToolUseBlock / ServerToolUseBlock are present
# on the real SDK but not on the conftest stub —
# check by class name to avoid an isinstance()
# against a class the stub doesn't define.
cls = type(block).__name__
if cls in ("ToolUseBlock", "ServerToolUseBlock"):
await _report_tool_use(block)
name = getattr(block, "name", "") or ""
if name:
acc.tool_uses.append(name)
elif isinstance(message, sdk.ResultMessage):
sid = getattr(message, "session_id", None)
if sid:
acc.session_id = sid
acc.result_text = getattr(message, "result", None)
# The SDK reports an upstream-rejected request (e.g. a 400
# context overflow from the model proxy) as a terminal result
# with is_error=True rather than a raised exception. Capture it
# so the error path can re-raise it into the unified
# classification.
acc.result_is_error = bool(getattr(message, "is_error", False))
# Diagnostic fields the SDK ResultMessage carries on a terminal
# error (verified against claude-agent-sdk ResultMessage: .subtype,
# .api_error_status:int|None, .errors:list[str]|None). Captured so
# an opaque is_error result (result=None) still self-diagnoses.
acc.result_subtype = getattr(message, "subtype", None)
acc.result_api_error_status = getattr(message, "api_error_status", None)
acc.result_errors = getattr(message, "errors", None)
@staticmethod
def _mcp_servers_from_status(status: Any) -> list[dict]:
"""Normalize a `get_mcp_status()` response to a list of server dicts.
The SDK returns ``{"mcpServers": [...]}`` (wire shape); be tolerant of
a bare list too so a future SDK shape-change degrades gracefully.
"""
if isinstance(status, dict):
servers = status.get("mcpServers")
else:
servers = status
return [s for s in (servers or []) if isinstance(s, dict)]
async def _await_mcp_ready(self, client: Any, declared: list[str]) -> None:
"""Block (bounded) until every declared MCP server is `connected`.
Polls `client.get_mcp_status()` up to `_MCP_READY_MAX_POLLS` times.
Returns as soon as all `declared` servers report `connected`. Raises
`_McpNotReadyError` if any declared server hits a TERMINAL failure
status (no wait helps) or is still not connected when the poll budget
is exhausted — the caller turns that into a session-reset + one retry.
This is THE fix for the stuck-`platform`-MCP bug: it holds the CLI
subprocess open through the `node` server's slow handshake so the
prompt is only sent once the 88 `mcp__platform__*` tools are live and
will appear in the model's tool list.
"""
declared_set = set(declared)
last_status: dict[str, str] = {}
for _poll in range(_MCP_READY_MAX_POLLS):
try:
status = await client.get_mcp_status()
except Exception:
# A transient status-probe failure shouldn't abort the gate;
# keep polling. If it's genuinely broken we'll exhaust the
# budget and raise below with the last-seen status.
await asyncio.sleep(_MCP_READY_POLL_INTERVAL_S)
continue
servers = self._mcp_servers_from_status(status)
last_status = {
s.get("name", ""): s.get("status", "") for s in servers
}
# Terminal failure on any declared server → stop early, heal/retry.
for name in declared_set:
st = last_status.get(name, "pending")
if st in _MCP_TERMINAL_FAIL_STATUSES:
raise _McpNotReadyError(name, st)
if all(
last_status.get(name) == _MCP_READY_STATUS for name in declared_set
):
logger.debug(
"MCP readiness gate: all declared servers connected (%s)",
declared,
)
return
await asyncio.sleep(_MCP_READY_POLL_INTERVAL_S)
# Budget exhausted — surface the first still-unconnected server so the
# heal path can reset + retry once.
for name in declared:
if last_status.get(name) != _MCP_READY_STATUS:
raise _McpNotReadyError(name, last_status.get(name, "pending"))
# Defensive: loop exited without all-connected yet no name flagged
# (e.g. empty status). Treat as not-ready on the first declared name.
raise _McpNotReadyError(declared[0], last_status.get(declared[0], "unknown"))
async def _run_query_gated(self, prompt: str, options: Any, declared: list[str]) -> QueryResult:
"""Readiness-gated variant of `_run_query` for the platform agent.
Uses a persistent `ClaudeSDKClient` so we can poll `get_mcp_status()`
and WAIT for the slow `platform` MCP to connect BEFORE the prompt is
sent — otherwise the one-shot `query()` ships its init (and the tool
list the LLM sees) while the server is still `pending`, hiding the 88
org-admin tools. Only used when `declared` is non-empty (extra MCP
servers configured); ordinary workspaces never reach here.
Same message handling + wedge-clear + is_error re-raise semantics as
`_run_query` (via the shared `_accumulate_message`), and the same
`self._active_stream` contract so `cancel()` can reach in.
"""
acc = self._StreamAccumulator()
client = sdk.ClaudeSDKClient(options=options)
await client.connect()
try:
# Gate: hold the subprocess open until the platform MCP's tools
# are live. Raises _McpNotReadyError → reset+retry in _execute_locked.
await self._await_mcp_ready(client, declared)
await client.query(prompt)
self._active_stream = client.receive_response()
try:
async for message in self._active_stream:
await self._accumulate_message(message, acc)
except Exception:
# Mirror _run_query: prefer the captured is_error text over a
# trailing generic CLI exception so overflow detection works.
if acc.result_is_error:
raise _ResultError(_result_error_detail(
acc.result_text,
acc.result_subtype,
acc.result_api_error_status,
acc.result_errors,
))
raise
finally:
self._active_stream = None
finally:
# Always tear down the persistent subprocess — one client per turn
# (matches the one-shot query() lifecycle; no cross-turn warm cache
# exists for stdio MCP subprocesses anyway).
try:
await client.disconnect()
except Exception:
logger.debug("MCP-gated client disconnect raised (ignored)")
if acc.result_is_error:
raise _ResultError(_result_error_detail(
acc.result_text,
acc.result_subtype,
acc.result_api_error_status,
acc.result_errors,
))
text = acc.result_text if acc.result_text is not None else "".join(acc.assistant_chunks)
if acc.result_text is not None or acc.assistant_chunks:
_clear_sdk_wedge_on_success()
return QueryResult(text=text, session_id=acc.session_id, tool_uses=acc.tool_uses)
async def _run_query(self, prompt: str, options: Any) -> QueryResult:
"""Drive the SDK query stream and return a QueryResult.
Prefers ResultMessage.result (the canonical final text — same field
the CLI's --output-format json used) and only falls back to the
concatenation of AssistantMessage TextBlocks when result is absent.
Otherwise pre-tool reasoning and post-tool summary get double-emitted.
Pure: does not mutate executor state other than setting / clearing
`self._active_stream` so cancel() can reach in. The caller decides
whether to persist the returned session_id.
Dispatch: when the config declares extra (non-`a2a`) MCP servers (the
platform agent), delegate to the readiness-gated path so the slow
`platform` MCP is `connected` BEFORE the prompt is sent — otherwise its
88 org-admin tools are hidden from the turn's tool list. Ordinary
workspaces declare none and keep this fast one-shot `query()` path.
"""
declared_extra = self._declared_extra_mcp_names()
if declared_extra:
return await self._run_query_gated(prompt, options, declared_extra)
assistant_chunks: list[str] = []
tool_uses: list[str] = []
result_text: str | None = None
session_id: str | None = None
result_is_error: bool = False
result_subtype: str | None = None
result_api_error_status: int | None = None
result_errors: list | None = None
self._active_stream = sdk.query(prompt=prompt, options=options)
try:
try:
async for message in self._active_stream:
if isinstance(message, sdk.AssistantMessage):
for block in message.content:
if isinstance(block, sdk.TextBlock):
assistant_chunks.append(block.text)
else:
# Handle thinking/reasoning blocks from Anthropic-
# compatible upstreams (MiniMax M2/M2.7, Moonshot
# K2.6) so reasoning-only output doesn't surface as
# empty content. Duck-typing: real SDK objects have
# a `.thinking` attr; dict-shaped blocks have
# `type: "thinking"`.
thinking_text = None
if hasattr(block, "thinking"):
thinking_text = getattr(block, "thinking", None)
elif isinstance(block, dict) and block.get("type") == "thinking":
thinking_text = block.get("thinking")
if thinking_text:
assistant_chunks.append(thinking_text)
continue
# ToolUseBlock / ServerToolUseBlock are present
# on the real SDK but not on the conftest stub —
# check by class name to avoid an isinstance()
# against a class the stub doesn't define.
cls = type(block).__name__
if cls in ("ToolUseBlock", "ServerToolUseBlock"):
await _report_tool_use(block)
name = getattr(block, "name", "") or ""
if name:
tool_uses.append(name)
elif isinstance(message, sdk.ResultMessage):
sid = getattr(message, "session_id", None)
if sid:
session_id = sid
result_text = getattr(message, "result", None)
# The SDK reports an upstream-rejected request (e.g. a
# 400 context overflow from the model proxy) as a
# terminal result with is_error=True rather than a
# raised exception. Capture it so the error path below
# can re-raise it into the unified classification.
result_is_error = bool(getattr(message, "is_error", False))
# Same diagnostic capture as the gated path so an
# opaque is_error result (result=None) self-diagnoses.
result_subtype = getattr(message, "subtype", None)
result_api_error_status = getattr(message, "api_error_status", None)
result_errors = getattr(message, "errors", None)
except Exception:
# REAL-SDK SHAPE (claude CLI 2.1.163 / SDK 0.1.72): on a
# terminal error result the SDK yields the is_error=True
# ResultMessage AND THEN raises a generic
# `Exception("Command failed with exit code 1 / Check stderr
# output for details")` because the CLI process exits
# non-zero. That trailing raise pre-empts the post-loop
# `_ResultError` re-raise below, so the REAL overflow text
# ("token limit … requested …" / "Prompt is too long") would
# be discarded and `_is_context_overflow` would only ever see
# the generic "Command failed" string → the heal never fires.
#
# If an is_error result was already seen, that captured text
# is the authoritative failure reason — re-raise it as a
# typed `_ResultError` IN PREFERENCE to the generic CLI
# exception so the overflow text reaches `_is_context_overflow`
# in `_execute_locked`. Any other stream exception (no
# is_error result captured) propagates unchanged.
if result_is_error:
raise _ResultError(_result_error_detail(
result_text,
result_subtype,
result_api_error_status,
result_errors,
))
raise
finally:
self._active_stream = None
# An is_error result is an upstream rejection (e.g. a 400 context
# overflow from the model proxy) that the SDK surfaced as a normal
# terminal message instead of a raised exception. Re-raise it so it
# flows through the same retry/heal/wedge classification in
# _execute_locked as a raised SDK error — detection lives in one
# place. The session_id (if any) was NOT persisted to self by this
# method; the caller's heal path clears it regardless.
#
# This handles the variant where the SDK yields the is_error result
# and then completes the iterator cleanly (no trailing raise); the
# except-branch above handles the real-SDK variant where it raises.
if result_is_error:
raise _ResultError(_result_error_detail(
result_text,
result_subtype,
result_api_error_status,
result_errors,
))
text = result_text if result_text is not None else "".join(assistant_chunks)
# Auto-recover the wedge flag — if a previous query() left this
# process in `_sdk_wedged` and THIS query just completed
# cleanly, the SDK clearly works again. Clear so the next
# heartbeat reports runtime_state empty and the platform flips
# status degraded → online without a manual restart.
#
# Gate on actual content from the stream so a degenerate
# "iterator returned without raising but emitted nothing"
# case (possible from a partial stream or a stub SDK) doesn't
# falsely advertise recovery. A real successful query yields
# at least a ResultMessage (sets result_text) or one
# AssistantMessage TextBlock (populates assistant_chunks).
if result_text is not None or assistant_chunks:
_clear_sdk_wedge_on_success()
return QueryResult(text=text, session_id=session_id, tool_uses=tool_uses)
# ------------------------------------------------------------------
# AgentExecutor interface
# ------------------------------------------------------------------
async def execute(self, context: RequestContext, event_queue: EventQueue):
"""Run a turn through the Claude Agent SDK and emit the response.
Serialized via `self._run_lock` — concurrent A2A messages to the same
workspace queue rather than racing on `_session_id` / `_active_stream`.
"""
user_input = extract_message_text(context.message)
# Surface attached files to claude-code via a manifest in the prompt.
# Claude Code reads files through its own Read/Glob tools by path —
# as long as the prompt names the path, the CLI will open them on
# demand. Same contract every platform runtime uses so the UX is
# identical across hermes / langgraph / claude-code.
attached = extract_attached_files(context.message)
if attached:
manifest = "\n\nAttached files:\n" + "\n".join(
f"- {f['name']} ({f['mime_type'] or 'unknown type'}) at {f['path']}"
for f in attached
)
user_input = (user_input + manifest) if user_input else manifest.lstrip()
if not user_input:
await event_queue.enqueue_event(new_text_message(_NO_TEXT_MSG))
return
async with self._run_lock:
response_text = await self._execute_locked(user_input)
# Enqueue outside the lock so the next queued turn can start
# preparing its prompt while this turn's response ships. Event
# ordering is preserved per-queue by the A2A server, so no races.
# If the response mentions /workspace/... files, stage each and
# emit file parts alongside the text so the canvas can download.
#
# a2a-sdk v1 uses protobuf, NOT the v0 Pydantic discriminated-union
# types. There is no FilePart / TextPart / FileWithUri class — Part
# is one struct with optional `text`, `url`, `raw`, `data`,
# `filename`, `media_type` fields (plus `metadata`). Set the field
# that matches the part's nature; leave the rest unset.
outbound = collect_outbound_files(response_text)
if outbound:
from a2a.types import Message, Part, Role
import uuid as _uuid
parts: list = [Part(text=response_text)] if response_text else []
for f in outbound:
parts.append(Part(
url="workspace:" + f["path"],
filename=f["name"],
media_type=f["mime_type"],
))
await event_queue.enqueue_event(Message(
message_id=_uuid.uuid4().hex,
role=Role.ROLE_AGENT,
parts=parts,
))
else:
await event_queue.enqueue_event(new_text_message(response_text))
@staticmethod
def _is_retryable(exc: BaseException) -> bool:
"""Check if an SDK exception looks like a transient rate-limit or
capacity error that's worth retrying with backoff."""
msg = str(exc).lower()
return any(p in msg for p in _RETRYABLE_PATTERNS)
def _reset_session_after_error(self, exc: BaseException) -> None:
"""Clear `_session_id` if the exception looks like a subprocess
crash (#75). On the next `_build_options()` call `resume=None` is
passed to the SDK, so the CLI boots a brand-new session instead of
trying to resume one the previous subprocess left in an
unrecoverable state.
Kept in its own method so the policy can evolve (e.g. also clear
on MessageParseError) without touching the retry loop. Logs at
INFO when a session was actually cleared; silent when there was
nothing to reset.
"""
exc_name = type(exc).__name__
# Conservative: reset only on subprocess-level failures. Pure
# rate-limit / capacity errors don't leave the session in a bad
# state — keep the session_id so the resumed turn preserves
# conversational continuity.
is_subprocess_error = (
exc_name in ("ProcessError", "CLIConnectionError")
or getattr(exc, "exit_code", None) is not None
or "exit code" in str(exc).lower()
)
if not is_subprocess_error:
return
if self._session_id is None:
return
logger.info(
"SDK session reset after %s: clearing session_id so the next "
"attempt starts fresh (fixes #75 session contamination)",
exc_name,
)
self._session_id = None
def _reset_session_for_context_overflow(self) -> None:
"""Hard session reset for a context-window overflow auto-heal.
Stronger than `_reset_session_after_error`: the bloated transcript
on disk is the *cause* of the overflow, so we both (a) clear
`self._session_id` (next `_build_options()` passes `resume=None`,
so the SDK boots a brand-new, empty session) AND (b) best-effort
purge the stale on-disk session transcripts so the oversized
history can never be accidentally resumed by a later boot.
The SDK stores per-session transcripts as
``~/.claude/projects/<project_key>/<session>.jsonl`` (honoring
``CLAUDE_CONFIG_DIR`` when set). We don't have a cheap
project_key→path mapping here, so we purge ALL ``*.jsonl`` session
files under the projects tree — this workspace runs exactly one
agent, so there is no other agent's session to protect, and a
fresh boot simply re-creates the dir. Bounded + best-effort: any
filesystem error is swallowed (the in-memory `resume=None` reset
alone is sufficient to recover; the disk purge is belt-and-braces
so a future explicit-resume path can't reach the bloated file).
"""
self._session_id = None
config_dir = os.environ.get("CLAUDE_CONFIG_DIR") or os.path.join(
os.path.expanduser("~"), ".claude"
)
projects_root = os.path.join(config_dir, "projects")
if not os.path.isdir(projects_root):
return
purged = 0
try:
for jsonl in glob.glob(
os.path.join(projects_root, "**", "*.jsonl"), recursive=True
):
try:
os.remove(jsonl)
purged += 1
except OSError:
# A single un-removable file (perm drift, race) must
# not abort the heal — the resume=None reset already
# guarantees a fresh session next turn.
continue
except Exception:
logger.exception(
"context-overflow heal: session-transcript purge raised "
"(resume=None reset still in effect — recovery proceeds)"
)
return
if purged:
logger.info(
"context-overflow heal: purged %d stale session transcript(s) "
"under %s",
purged,
projects_root,
)
async def _notify_context_overflow_heal(self, detail: str) -> None:
"""Best-effort operator-visible signal that an auto-heal fired.
The ERROR log is the durable record; this posts a one-line
agent_log activity row so the canvas's live feed shows the heal in
real time instead of only surfacing it in container logs. Mirrors
`_report_tool_use`: lazy imports, short timeout, every failure
swallowed — telemetry must never break (or block) the heal+retry.
Deliberately a chat-feed signal, NOT a runtime wedge: the workspace
self-recovers on the very next (retried) turn, so flipping it to
`degraded` would be a false alarm the operator can't action.
"""
try:
import httpx
from molecule_runtime.a2a_client import PLATFORM_URL, WORKSPACE_ID
from molecule_runtime.platform_auth import auth_headers
except Exception:
return
try:
summary = (
"♻️ Auto-heal: context window overflowed — reset session and "
"retried on a fresh session. "
f"({detail[:120]})"
)
async with httpx.AsyncClient(timeout=5.0) as client:
await client.post(
f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/activity",
json={
"activity_type": "agent_log",
"source_id": WORKSPACE_ID,
"target_id": WORKSPACE_ID,
"summary": summary[:300],
"status": "warning",
"method": "context_overflow_autoheal",
},
headers=auth_headers(),
)
except Exception:
return
def _reset_session_for_stuck_mcp(self) -> None:
"""Session reset for a stuck-MCP auto-heal.
Unlike the context-overflow reset, the bloated on-disk transcript is
NOT the cause here — the cause is a slow/failed MCP handshake on this
turn's subprocess. So we only clear `self._session_id` (next turn boots
a fresh, non-resumed session) and DON'T purge transcripts: a resumed
session can re-poison the tool list (the platform server connects
after init either way), but the durable fix is the readiness gate in
`_run_query_gated`; clearing the session id just guarantees the retry
starts from a clean, un-resumed state so nothing about a stale session
interferes with the gate.
"""
if self._session_id is not None:
logger.info(
"stuck-MCP heal: clearing session_id so the retry starts on a "
"fresh, un-resumed session"
)
self._session_id = None
async def _notify_stuck_mcp_heal(self, server: str, status: str) -> None:
"""Best-effort operator-visible signal that a stuck-MCP heal fired.
Mirrors `_notify_context_overflow_heal`: lazy imports, short timeout,
every failure swallowed. NOT a runtime wedge — the workspace recovers
on the retried (readiness-gated) turn, so `degraded` would be a false
alarm.
"""
try:
import httpx
from molecule_runtime.a2a_client import PLATFORM_URL, WORKSPACE_ID
from molecule_runtime.platform_auth import auth_headers
except Exception:
return
try:
summary = (
f"♻️ Auto-heal: MCP server '{server}' was not ready "
f"(status={status}) — reset session and retried with a "
"readiness gate so its tools are live."
)
async with httpx.AsyncClient(timeout=5.0) as client:
await client.post(
f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/activity",
json={
"activity_type": "agent_log",
"source_id": WORKSPACE_ID,
"target_id": WORKSPACE_ID,
"summary": summary[:300],
"status": "warning",
"method": "stuck_mcp_autoheal",
},
headers=auth_headers(),
)
except Exception:
return
async def _execute_locked(self, user_input: str) -> str:
"""Body of execute() that runs under the run lock.
Retries transient errors (rate limits, capacity, exit-code-1) up to
_MAX_RETRIES times with exponential backoff (5s, 10s, 20s).
"""
# Keep a clean copy of the user's actual message for the memory record,
# BEFORE any delegation or memory injection.
original_input = user_input
logger.debug("SDK execute [claude-code]: %s", user_input[:200])
prompt = self._prepare_prompt(user_input)
response_text: str = ""
tool_uses_for_turn: list[str] = []
try:
# set_current_task INSIDE the try so active_tasks is always
# decremented by the finally block even if CancelledError hits
# during the heartbeat HTTP push. Moving it outside the try
# created a narrow window where cancellation left active_tasks
# stuck at 1 forever, permanently blocking queue drain. (#2026)
await set_current_task(self.heartbeat, brief_summary(user_input))
prompt = await self._inject_memories_if_first_turn(prompt)
# Bound the context-overflow auto-heal to ONE reset per
# dispatch. The first overflow resets the session and retries
# on a fresh (empty) session; if THAT immediately overflows
# again, the prompt itself — not accumulated history — exceeds
# the window, which a reset cannot fix. Surface a hard error
# instead of looping the reset forever.
overflow_healed = False
# Bound the stuck-MCP auto-heal to `_MCP_HEAL_MAX_RETRIES` resets
# per dispatch. Each _McpNotReadyError resets the session and
# retries on a fresh subprocess (an independent MCP-connect
# attempt), so an INTERMITTENT connect failure usually clears on
# the next try; only after exhausting the retries do we treat the
# server as genuinely broken and surface a hard error. (Unlike the
# overflow heal, where one reset is definitive, MCP connects are
# probabilistic under load, so a couple of fresh attempts is the
# right bound.)
mcp_heals = 0
for attempt in range(_MAX_RETRIES):
options = self._build_options()
try:
result = await self._run_query(prompt=prompt, options=options)
if result.session_id:
self._session_id = result.session_id
response_text = result.text
tool_uses_for_turn = result.tool_uses
break # success
except Exception as exc:
formatted = _format_process_error(exc)
# #75: CLI subprocess crashes leave our _session_id
# referencing a session the next subprocess can't
# resume. Without this reset the next attempt would
# crash identically even when the underlying cause
# was transient, cascading into "crashed once →
# crashes forever until container restart." Clear
# the session_id so the next attempt (retry or
# next user turn) starts fresh.
self._reset_session_after_error(exc)
# --- Context-overflow auto-heal (the Kimi wedge) ---
# If the error is a context-window overflow, the
# accumulated session transcript has grown past the
# model's window: resuming it makes EVERY future
# dispatch overflow identically (agent stuck forever
# until a manual restart). Heal it in-band: reset the
# session (resume=None + purge the bloated transcript)
# and retry ONCE on a fresh, empty session.
#
# Bounded by `overflow_healed`: at most one reset per
# dispatch. A second overflow after a fresh-session
# reset means the single prompt is itself too large
# (not history) — a reset can't fix that, so we fall
# through to the terminal error path below.
#
# Loud + observable per the fail-loud SOP: ERROR-level
# structured log on detect, plus a best-effort
# operator notification. NOT a runtime wedge — a wedge
# means "only a restart recovers"; this self-recovers,
# so flipping the workspace to degraded would be a
# false alarm.
if not overflow_healed and _is_context_overflow(formatted):
overflow_healed = True
logger.error(
"auto-heal: session reset on context-overflow "
"[claude-code] (attempt %d/%d) — model=%s, "
"resetting session + retrying once on a fresh "
"session: %s",
attempt + 1, _MAX_RETRIES, self.model,
formatted[:200],
)
self._reset_session_for_context_overflow()
await self._notify_context_overflow_heal(formatted)
# RE-INJECT durable memory into the fresh session.
# `prompt` was built at the top of this dispatch when a
# session existed, so `_inject_memories_if_first_turn`
# returned it WITHOUT the recalled memory snapshot. The
# heal just reset `_session_id` to None, so the retry
# runs on a brand-new session — without this re-inject it
# starts amnesiac (the "agent forgets everything, even
# things I asked it to remember, after an overflow reset"
# bug: the durable commit_memory store is never re-fed
# into the post-reset session). Rebuild the prompt from
# the clean input so the fresh session gets the recalled
# memory back. (System-prompt memory snapshot files —
# MEMORY.md/CLAUDE.md — already ride along via `options`,
# but the dynamic recall prefix does not; this restores
# it.)
prompt = await self._inject_memories_if_first_turn(
self._prepare_prompt(original_input)
)
# No backoff: the fresh session is independent of
# any upstream rate state; retry immediately.
continue
# --- Stuck-MCP auto-heal (the concierge "lost its
# platform tools" bug) ---
# The readiness-gated query path raises _McpNotReadyError
# when a config-declared MCP server (e.g. `platform`) never
# reached `connected` within the bounded poll window — its
# tools would be hidden from the LLM. Heal in-band: reset
# the session and retry ONCE (the retry re-runs the gate,
# which gives the slow `node` handshake another, full poll
# window from a clean subprocess). Bounded by `mcp_healed`:
# a second not-ready after a reset means the server is
# genuinely broken, not just slow → fall through to the
# terminal error path. Loud + observable, NOT a wedge.
if isinstance(exc, _McpNotReadyError) and mcp_heals < _MCP_HEAL_MAX_RETRIES:
mcp_heals += 1
logger.error(
"auto-heal: MCP server %r not ready (status=%s) "
"[claude-code] (heal %d/%d) — resetting session "
"+ retrying on a fresh subprocess with the "
"readiness gate",
exc.server, exc.status, mcp_heals,
_MCP_HEAL_MAX_RETRIES,
)
self._reset_session_for_stuck_mcp()
await self._notify_stuck_mcp_heal(exc.server, exc.status)
# No backoff: the retry's readiness gate handles the
# wait; retry immediately.
continue
if isinstance(exc, _McpNotReadyError):
# Exhausted the heal retries: every fresh subprocess
# failed to connect the server. It's broken, not just
# slow — hard, loud, operator-actionable error. Do NOT
# fall into the transient-retry branch.
logger.error(
"stuck-MCP auto-heal exhausted [claude-code]: MCP "
"server %r still not ready (status=%s) after %d "
"fresh-session attempts — the server is failing to "
"connect, not merely slow; check the MCP server "
"command/env",
exc.server, exc.status, _MCP_HEAL_MAX_RETRIES + 1,
)
response_text = sanitize_agent_error(
exc=exc, stderr=error_detail_for_external(exc)
)
break
# A context overflow that survives a heal (overflow_healed
# already True) must go straight to the terminal error
# path — NOT the transient-retry branch below. The
# overflow text ("token limit …") also matches the broad
# `_RETRYABLE_PATTERNS` ("limit"), so without this guard
# the loop would back-off-and-retry a third time, which
# (a) re-overflows identically and (b) defeats the
# one-reset-per-dispatch cap. The single prompt is too
# big; backoff cannot shrink it.
is_unhealable_overflow = _is_context_overflow(formatted)
if (
not is_unhealable_overflow
and attempt < _MAX_RETRIES - 1
and self._is_retryable(exc)
):
delay = _BASE_RETRY_DELAY_S * (2 ** attempt)
logger.warning(
"SDK agent [claude-code] transient error (attempt %d/%d), "
"retrying in %ds: %s",
attempt + 1, _MAX_RETRIES, delay, formatted,
)
await asyncio.sleep(delay)
continue
# Non-retryable or exhausted retries. Log exit_code +
# stderr explicitly (fixes #66) so operators don't have
# to reproduce the crash manually to find out why the
# subprocess died.
if overflow_healed and _is_context_overflow(formatted):
# A context overflow that survived a fresh-session
# reset: the single prompt (this one message + its
# injected memory/delegation context) is itself
# larger than the model's window. A session reset
# cannot shrink one oversized request — surface a
# hard, operator-actionable error rather than
# looping the reset.
logger.error(
"context-overflow auto-heal exhausted [claude-code]: "
"the request still overflows on a FRESH session, so "
"the single prompt exceeds the model window "
"(model=%s) — not a stale-history problem; shrink the "
"input or raise the model's context window: %s",
self.model, formatted[:200],
)
logger.error("SDK agent error [claude-code]: %s", formatted)
logger.exception("SDK agent error [claude-code] — full traceback follows")
# Detect the specific claude_agent_sdk init-wedge case
# so the heartbeat task can flip the workspace to
# `degraded`. Match on the lowercased formatted error;
# `formatted` is whatever _format_process_error built,
# which already includes both the message and the
# exception class name.
formatted_lc = formatted.lower()
for pat in _WEDGE_ERROR_PATTERNS:
if pat in formatted_lc:
_mark_sdk_wedged(
f"claude_agent_sdk wedge: {formatted[:200]} — restart workspace to recover"
)
break
response_text = sanitize_agent_error(
exc=exc, stderr=error_detail_for_external(exc)
)
break
finally:
await set_current_task(self.heartbeat, "")
await commit_memory(
f"Conversation: {original_input[:MEMORY_CONTENT_MAX_CHARS]}"
)
# Auto-push unpushed commits and open PR (non-blocking, best-effort).
await auto_push_hook()
# If the agent produced no text but did call tools, surface a brief
# summary of which tools were used instead of the bare
# "(no response generated)" sentinel. Common case: autonomous-tick
# ticks that only do delegate_task_async / send_message_to_user with
# no final text block. Canvas users seeing "(no response generated)"
# under a fired schedule have no signal that work actually happened;
# the tool list makes that visible.
if not response_text and tool_uses_for_turn:
# Order-preserving de-dupe so e.g. 4× TaskCreate collapses to 1.
seen: set[str] = set()
unique: list[str] = []
for name in tool_uses_for_turn:
if name not in seen:
seen.add(name)
unique.append(name)
counts: dict[str, int] = {}
for name in tool_uses_for_turn:
counts[name] = counts.get(name, 0) + 1
tool_summary = ", ".join(
f"{name}×{counts[name]}" if counts[name] > 1 else name
for name in unique
)
return f"(no text reply — used tools: {tool_summary})"
return response_text or _NO_RESPONSE_MSG
async def cancel(self, context: RequestContext, event_queue: EventQueue):
"""Cooperatively cancel the currently running turn.
cancel() targets whatever turn is in flight *right now*, not the
specific turn the caller may have been looking at when they sent
the cancel request. If turn A has finished and turn B is already
running under the run lock by the time cancel arrives, turn B is
the one that gets aborted. This matches how a "stop" button in a
chat UI typically behaves (stop whatever is running) and is a
conscious trade-off against per-turn bookkeeping.
Implementation: the SDK's query() is an async generator; calling
aclose() raises GeneratorExit inside the running turn and unwinds
cleanly. We read `self._active_stream` into a local BEFORE calling
aclose so the reference can't be reassigned by another turn
mid-cancel. Best-effort — if no stream is active (cancel arrived
between turns, or the stream has no aclose), this is a no-op.
"""
stream = self._active_stream
if stream is None:
return
aclose = getattr(stream, "aclose", None)
if aclose is None:
return
try:
await aclose()
except Exception:
logger.exception("SDK cancel: aclose() raised")