forked from molecule-ai/molecule-core
chore(workspace): drop cli_executor — Phase 3 of #87 (DRAFT, blocked on gemini-cli image rebuild)
DRAFT — do NOT merge until gemini-cli template image rebuilds with its local cli_executor.py copy (template PR #9 just merged at 07:59 UTC; image build kicks off now). Final adapter-specific deletion from molecule-runtime, completing #87 for the priority adapters (claude-code via PR #2156, plus gemini-cli via this PR + template #9). Deletes: - workspace/cli_executor.py (461 LOC) — CLIAgentExecutor + the RUNTIME_PRESETS dict for codex / ollama / gemini-cli. The file moved to molecule-ai-workspace-template-gemini-cli (PR #9, merged). - workspace/tests/test_agent_base_urls.py — only consumer of CLIAgentExecutor in the test suite. Tests for the executor behavior live in the template repo now. Updates: - workspace/tests/test_executor_helpers.py — docstring refresh: executor_helpers.py is the runtime-agnostic shared helpers; the executor classes themselves live in template repos post-#87. Codex / ollama presets disappear naturally with the file. They never had template repos, so no production path could invoke them anyway — this is dead-code removal as a side effect of the move. Verified-safe-to-delete: - heartbeat.py: doesn't import cli_executor - claude_sdk_executor.py: deleted by PR #2156 (in flight) - preflight.py: only references runtime names by string; no import - main.py: doesn't import cli_executor (uses adapter discovery via ADAPTER_MODULE; the template's adapter constructs the executor) - Only test_agent_base_urls.py + test_executor_helpers.py docstring referenced cli_executor Verification: - 1249/1249 workspace pytest pass (was 1251; -2 = test_agent_base_urls.py cases — exact match) - No live import of cli_executor anywhere in molecule-core after deletion (grep verified) Sequencing: 1. ✅ Template PR #9 (gemini-cli local copy) — MERGED 2. ⏳ Template image rebuild — running 3. 
THIS PR — wait until image is published, then mark ready-for-review Closes #87 for the priority adapters: workspace/ is now adapter- agnostic except for adapter discovery (ADAPTER_MODULE) + the runtime_wedge primitive. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
4e6030d783
commit
98ca5c50fa
@ -1,465 +0,0 @@
|
||||
"""CLI-based agent executor for A2A protocol.
|
||||
|
||||
Supports CLI agents that accept a prompt and output a response:
|
||||
- OpenAI Codex: codex --print -p "..."
|
||||
- Ollama: ollama run <model> "..."
|
||||
- Custom: any command that reads stdin or accepts -p
|
||||
|
||||
NOTE: the `claude-code` runtime no longer routes here — its template
|
||||
repo (molecule-ai-workspace-template-claude-code) ships its own
|
||||
ClaudeSDKExecutor wrapping the claude-agent-sdk Python package as of
|
||||
#87 Phase 2. This executor is reserved for CLI-only runtimes that
|
||||
don't yet have a programmatic SDK integration.
|
||||
|
||||
The runtime is selected via config.yaml:
|
||||
runtime: codex | ollama | custom
|
||||
runtime_config:
|
||||
command: "codex" # for custom
|
||||
args: ["--extra-flag"] # additional CLI args
|
||||
auth_token_env: "OPENAI_API_KEY"
|
||||
auth_token_file: ".auth-token"
|
||||
timeout: 300
|
||||
model: "sonnet"
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import atexit
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import shlex
|
||||
import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
from a2a.server.agent_execution import AgentExecutor, RequestContext
|
||||
from a2a.server.events import EventQueue
|
||||
# KI-009: a2a-sdk v1 renames a2a.utils → a2a.helpers
|
||||
from a2a.helpers import new_agent_text_message
|
||||
|
||||
from config import RuntimeConfig
|
||||
from executor_helpers import (
|
||||
CONFIG_MOUNT,
|
||||
MEMORY_CONTENT_MAX_CHARS,
|
||||
WORKSPACE_MOUNT,
|
||||
brief_summary,
|
||||
classify_subprocess_error,
|
||||
commit_memory,
|
||||
extract_message_text,
|
||||
get_a2a_instructions,
|
||||
get_mcp_server_path,
|
||||
get_system_prompt,
|
||||
read_delegation_results,
|
||||
recall_memories,
|
||||
sanitize_agent_error,
|
||||
set_current_task,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Built-in runtime presets, keyed by the `runtime` value from config.yaml.
# Each preset describes how to drive one CLI agent: the executable, its
# fixed arguments, and how the prompt / model / system prompt / auth are
# passed. The `claude-code` runtime uses ClaudeSDKExecutor in its own
# template repo (post-#87 Phase 2) and intentionally has no entry here.
RUNTIME_PRESETS: dict[str, dict] = {}

# OpenAI Codex CLI — flag-driven; auth flows through the OPENAI_API_KEY
# environment variable.
RUNTIME_PRESETS["codex"] = {
    "command": "codex",
    "base_args": ["--print", "--dangerously-skip-permissions"],
    "prompt_flag": "-p",
    "model_flag": "--model",
    "system_prompt_flag": "--system-prompt",
    "auth_pattern": "env",  # uses OPENAI_API_KEY env var
    "default_auth_env": "OPENAI_API_KEY",
    "default_auth_file": "",
}

# Ollama — `ollama run <model> "<prompt>"`: both the model and the prompt
# are positional, and a local daemon needs no auth token.
RUNTIME_PRESETS["ollama"] = {
    "command": "ollama",
    "base_args": ["run"],
    "prompt_flag": None,  # prompt is positional
    "model_flag": None,  # model is positional after "run"
    "system_prompt_flag": "--system",
    "auth_pattern": None,  # no auth needed
    "default_auth_env": "",
    "default_auth_file": "",
}

# Gemini CLI (github.com/google-gemini/gemini-cli, Apache 2.0).
# Auth via GEMINI_API_KEY env var; MCP is wired via ~/.gemini/settings.json
# (not --mcp-config) — the adapter's setup() handles that step.
# System prompt is seeded into GEMINI.md (equivalent of CLAUDE.md).
RUNTIME_PRESETS["gemini-cli"] = {
    "command": "gemini",
    "base_args": ["--yolo"],  # auto-approve all tool calls (non-interactive)
    "prompt_flag": "-p",
    "model_flag": "--model",
    "system_prompt_flag": None,  # GEMINI.md used instead; seeded by adapter.setup()
    "auth_pattern": "env",  # GEMINI_API_KEY; also enables A2A MCP instructions
    "default_auth_env": "GEMINI_API_KEY",
    "default_auth_file": "",
    "mcp_via_settings": True,  # MCP injected into ~/.gemini/settings.json, not --mcp-config
}
|
||||
|
||||
|
||||
class CLIAgentExecutor(AgentExecutor):
    """Executes agent tasks by invoking a CLI tool.

    Works with any CLI agent that accepts a prompt and outputs text.
    The concrete invocation shape is described by a preset (see
    RUNTIME_PRESETS) or, for runtime="custom", by the fields of the
    supplied RuntimeConfig.
    """

    def __init__(
        self,
        runtime: str,
        runtime_config: RuntimeConfig,
        system_prompt: str | None = None,
        config_path: str = "/configs",
        heartbeat: "HeartbeatLoop | None" = None,
    ):
        """Resolve the preset, auth token, and MCP config for one runtime.

        Args:
            runtime: key into RUNTIME_PRESETS, or "custom". "claude-code"
                is rejected outright (see below).
            runtime_config: per-agent settings read here: command, args,
                model, timeout, required_env, auth_token_env,
                auth_token_file.
            system_prompt: fallback system prompt used when none is found
                under config_path (see get_system_prompt in _build_command).
            config_path: directory holding agent config files and the
                optional auth token file.
            heartbeat: loop used to publish the current task via
                set_current_task; may be None. (HeartbeatLoop is declared
                elsewhere — hence the forward-ref string annotation.)

        Raises:
            ValueError: for runtime == "claude-code" or any runtime that
                is neither a preset nor "custom".
        """
        if runtime == "claude-code":
            # Defensive — the adapter should never construct a CLI executor
            # for claude-code. Fail loud rather than silently falling back.
            # The claude-code template owns its own ClaudeSDKExecutor in
            # molecule-ai-workspace-template-claude-code (post-#87 Phase 2).
            raise ValueError(
                "claude-code runtime is served by ClaudeSDKExecutor in its "
                "template repo, not CLIAgentExecutor. If you're seeing this "
                "in molecule-runtime, the adapter wiring is wrong."
            )
        self.runtime = runtime
        self.config = runtime_config
        self.system_prompt = system_prompt
        self.config_path = config_path
        self._heartbeat = heartbeat

        # Resolve preset or use custom
        if runtime in RUNTIME_PRESETS:
            self.preset = RUNTIME_PRESETS[runtime]
        elif runtime == "custom":
            # Synthetic preset: the command comes from config, extra flags
            # from config.args; no auth pattern, so no MCP config either.
            self.preset = {
                "command": runtime_config.command,
                "base_args": [],  # args go in config.args, appended at end
                "prompt_flag": "-p",
                "model_flag": None,
                "system_prompt_flag": None,
                "auth_pattern": None,
                "default_auth_env": "",
                "default_auth_file": "",
            }
        else:
            raise ValueError(f"Unknown runtime: {runtime}. Use: {', '.join(RUNTIME_PRESETS.keys())}, custom")

        # Resolve auth token
        self._auth_token = self._resolve_auth_token()
        self._auth_helper_path: str | None = None
        self._temp_files: list[str] = []  # Track temp files for cleanup

        if self._auth_token and self.preset.get("auth_pattern") == "apiKeyHelper":
            self._auth_helper_path = self._create_auth_helper(self._auth_token)

        # Create MCP config once (reuse across invocations)
        self._mcp_config_path: str | None = None
        if self.preset.get("auth_pattern") in ("apiKeyHelper", "env"):
            mcp_config = json.dumps({
                "mcpServers": {
                    "a2a": {"command": sys.executable, "args": [get_mcp_server_path()]}
                }
            })
            # mkstemp creates the file 0600; we close the raw fd and rewrite
            # via open() for simplicity.
            fd, self._mcp_config_path = tempfile.mkstemp(suffix=".json", prefix="a2a-mcp-")
            self._temp_files.append(self._mcp_config_path)  # Track immediately
            os.close(fd)
            with open(self._mcp_config_path, "w") as f:
                f.write(mcp_config)

        # Register cleanup for reliable temp file removal (atexit is more reliable than __del__)
        atexit.register(self._cleanup_temp_files)

        # Verify command exists — warn only; the subprocess call will surface
        # the hard failure at execution time.
        cmd = self.config.command or self.preset["command"]
        if not shutil.which(cmd):
            logger.warning(f"CLI command '{cmd}' not found in PATH")

    def _resolve_auth_token(self) -> str | None:
        """Resolve auth token from env var or file.

        Resolution order:
        1. required_env — first entry that exists in the environment
        2. auth_token_env (deprecated) — explicit env var name
        3. Preset default_auth_env — adapter-declared fallback
        4. auth_token_file (deprecated) — file on disk
        5. Preset default_auth_file — adapter-declared file fallback

        Returns None when no source yields a non-empty token.
        """
        # 1. New path: required_env (first match wins)
        for env_name in (self.config.required_env or []):
            token = os.environ.get(env_name)
            if token:
                return token

        # 2. Legacy: explicit env var from config
        env_name = self.config.auth_token_env or self.preset.get("default_auth_env", "")
        if env_name:
            token = os.environ.get(env_name)
            if token:
                return token

        # 3. Legacy: token file from config
        file_name = self.config.auth_token_file or self.preset.get("default_auth_file", "")
        if file_name:
            token_path = Path(self.config_path) / file_name
            if token_path.exists():
                return token_path.read_text().strip()

        return None

    def _create_auth_helper(self, token: str) -> str:
        """Create a shell script that outputs the auth token (for apiKeyHelper pattern).

        The script is written 0700 and its path is tracked for cleanup.
        shlex.quote guards against shell injection through the token value.
        """
        fd, helper_path = tempfile.mkstemp(suffix=".sh", prefix="agent-auth-")
        self._temp_files.append(helper_path)  # Track immediately before any exception can leak
        os.close(fd)
        with open(helper_path, "w") as f:
            f.write(f"#!/bin/sh\necho {shlex.quote(token)}\n")
        os.chmod(helper_path, 0o700)
        return helper_path

    def _build_command(self, message: str) -> list[str]:
        """Build the full CLI command from preset + config + message.

        Argument order: base_args, model, system prompt, auth settings,
        MCP config, config.args, then the prompt last.
        """
        cmd = self.config.command or self.preset["command"]
        args = list(self.preset.get("base_args", []))

        # Model
        model = self.config.model or None
        model_flag = self.preset.get("model_flag")
        if model and model_flag:
            args.extend([model_flag, model])
        elif model and self.runtime == "ollama":
            # Ollama: model is positional after "run"
            args.append(model)

        # System prompt (+ A2A instructions). The remaining CLI runtimes don't
        # support session resume, so we inject the system prompt on every call.
        system_prompt = get_system_prompt(self.config_path, fallback=self.system_prompt) or ""
        mcp_capable = self.preset.get("auth_pattern") in ("apiKeyHelper", "env")
        a2a_instructions = get_a2a_instructions(mcp=mcp_capable)
        if a2a_instructions:
            system_prompt = (
                f"{system_prompt}\n\n{a2a_instructions}" if system_prompt else a2a_instructions
            )
        system_flag = self.preset.get("system_prompt_flag")
        if system_prompt and system_flag:
            args.extend([system_flag, system_prompt])

        # Auth (apiKeyHelper pattern — reserved for future CLI runtimes)
        if self._auth_helper_path and self.preset.get("auth_pattern") == "apiKeyHelper":
            settings = json.dumps({"apiKeyHelper": self._auth_helper_path})
            args.extend(["--settings", settings])

        # A2A MCP server — inject for MCP-compatible runtimes (created once in __init__).
        # Runtimes that declare `mcp_via_settings: True` (e.g. gemini-cli) wire MCP
        # through their own settings file (adapter.setup()) instead of --mcp-config.
        if self._mcp_config_path and not self.preset.get("mcp_via_settings"):
            args.extend(["--mcp-config", self._mcp_config_path])

        # Extra args from config (before prompt so flags are parsed correctly)
        args.extend(self.config.args)

        # Prompt (must be last — some CLIs treat final arg as the prompt)
        prompt_flag = self.preset.get("prompt_flag")
        if prompt_flag:
            args.extend([prompt_flag, message])
        else:
            # Positional prompt (ollama)
            args.append(message)

        return [cmd] + args

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        """Execute a task by invoking the CLI agent.

        Enriches the user message with pending delegation results and
        recalled memories, runs the CLI (_run_cli), and always clears the
        heartbeat task + commits the original message to memory on exit.
        """
        user_input = extract_message_text(context.message)
        if not user_input:
            await event_queue.enqueue_event(
                new_agent_text_message("Error: message contained no text content.")
            )
            return

        # Keep a clean copy of the user's actual message for memory BEFORE any
        # delegation or memory injection happens.
        original_input = user_input

        logger.debug("CLI execute [%s]: %s", self.runtime, user_input[:200])

        # Inject delegation results that arrived since last message
        delegation_context = read_delegation_results()
        if delegation_context:
            user_input = f"[Delegation results received while you were idle]\n{delegation_context}\n\n[New message]\n{user_input}"

        try:
            # set_current_task INSIDE the try so active_tasks is always
            # decremented by the finally block even if CancelledError hits
            # during the heartbeat HTTP push. Moving it outside the try
            # created a window where cancellation left active_tasks stuck
            # at 1, permanently blocking queue drain. (#2026)
            await set_current_task(self._heartbeat, brief_summary(user_input))

            # Auto-recall: inject prior memories into every prompt. (The CLI
            # runtimes don't keep a session, so there's no "first turn" concept.)
            memories = await recall_memories()
            if memories:
                user_input = f"[Prior context from memory]\n{memories}\n\n{user_input}"

            await self._run_cli(user_input, event_queue)
        finally:
            await set_current_task(self._heartbeat, "")
            # Auto-commit: save the original user request (not the memory-injected version)
            await commit_memory(
                f"Conversation: {original_input[:MEMORY_CONTENT_MAX_CHARS]}"
            )

    async def _run_cli(self, user_input: str, event_queue: EventQueue) -> None:
        """Run the CLI subprocess and enqueue the result.

        Retries up to max_retries with exponential backoff on empty output
        and on retryable error categories; otherwise enqueues either the
        agent's stdout or a sanitized error message and returns.
        """
        cmd = self._build_command(user_input)
        timeout = self.config.timeout or None  # None = no timeout (wait until agent finishes)
        max_retries = 3
        base_delay = 5  # seconds

        # Build env — pass through auth env var if using env pattern
        env = dict(os.environ)
        if self._auth_token and self.preset.get("auth_pattern") == "env":
            # Use first required_env entry, or fall back to legacy auth_token_env
            auth_env = (self.config.required_env or [None])[0] if self.config.required_env else None
            auth_env = auth_env or self.config.auth_token_env or self.preset.get("default_auth_env", "")
            if auth_env:
                env[auth_env] = self._auth_token

        for attempt in range(max_retries):
            proc = None
            try:
                # Run in /workspace if it exists and has content (cloned repo),
                # otherwise /configs (agent config files)
                cwd = (
                    WORKSPACE_MOUNT
                    if os.path.isdir(WORKSPACE_MOUNT) and os.listdir(WORKSPACE_MOUNT)
                    else CONFIG_MOUNT
                )

                proc = await asyncio.create_subprocess_exec(
                    *cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    env=env,
                    cwd=cwd,
                )
                if timeout:
                    stdout, stderr = await asyncio.wait_for(
                        proc.communicate(), timeout=timeout
                    )
                else:
                    stdout, stderr = await proc.communicate()

                stdout_text = stdout.decode().strip()
                stderr_text = stderr.decode().strip()

                # Log every non-zero exit up front; the branches below decide
                # whether it is still treated as usable output.
                if proc.returncode != 0:
                    logger.error("CLI agent [%s] exit=%d stdout=%s stderr=%s",
                        self.runtime, proc.returncode,
                        stdout_text[:200] if stdout_text else "(empty)",
                        stderr_text[:500] if stderr_text else "(empty)")

                if proc.returncode == 0 or stdout_text:
                    # Success, or non-zero exit but produced output (some CLIs exit 1 with valid output)
                    result = stdout_text
                    if result:
                        await event_queue.enqueue_event(
                            new_agent_text_message(result)
                        )
                        return
                    else:
                        # Empty response — likely rate limited, retry with backoff
                        if attempt < max_retries - 1:
                            delay = base_delay * (2 ** attempt)
                            logger.warning("CLI agent [%s] returned empty (attempt %d/%d), retrying in %ds",
                                self.runtime, attempt + 1, max_retries, delay)
                            await asyncio.sleep(delay)
                            continue
                        await event_queue.enqueue_event(
                            new_agent_text_message("(no response generated after retries)")
                        )
                        return
                else:
                    error_msg = stderr_text or f"Exit code {proc.returncode}"
                    # Classify once — used both for retry policy and the
                    # sanitized user-facing error message.
                    category = classify_subprocess_error(error_msg, proc.returncode)
                    if category in ("rate_limited", "session_error", "auth_failed") \
                        and attempt < max_retries - 1:
                        delay = base_delay * (2 ** attempt)
                        logger.warning(
                            "CLI agent [%s] %s (attempt %d/%d), retrying in %ds",
                            self.runtime, category, attempt + 1, max_retries, delay,
                        )
                        await asyncio.sleep(delay)
                        continue

                    # Log the full stderr (may contain paths/tokens); surface
                    # only the sanitized category to the user.
                    logger.error("CLI agent error [%s]: %s", self.runtime, error_msg[:500])
                    await event_queue.enqueue_event(
                        new_agent_text_message(sanitize_agent_error(category=category))
                    )
                    return

            except asyncio.TimeoutError:
                logger.error("CLI agent timeout [%s] after %ds", self.runtime, timeout)
                if proc:
                    # Kill and reap the process to prevent zombies
                    try:
                        proc.kill()
                    except ProcessLookupError:
                        pass  # already exited
                    except Exception as kill_err:
                        logger.warning("CLI kill error: %s", kill_err)
                    # Always await wait() to reap zombie, even if kill failed
                    try:
                        await asyncio.wait_for(proc.wait(), timeout=5)
                    except asyncio.TimeoutError:
                        logger.error("CLI agent: proc.wait() also timed out — possible zombie")
                    except Exception as wait_err:
                        logger.warning("CLI wait error: %s", wait_err)
                await event_queue.enqueue_event(
                    new_agent_text_message(sanitize_agent_error(category="timeout"))
                )
                return
            except Exception as exc:
                logger.exception("CLI agent exception [%s]", self.runtime)
                # NOTE(review): called positionally with the exception here,
                # but with category= in the other branches — presumably
                # sanitize_agent_error accepts either form; confirm against
                # executor_helpers.
                await event_queue.enqueue_event(
                    new_agent_text_message(sanitize_agent_error(exc))
                )
                return

    def _cleanup_temp_files(self) -> None:  # pragma: no cover
        """Clean up temp files. Called via atexit for reliable cleanup."""
        for f in self._temp_files:
            try:
                os.unlink(f)
            except OSError:
                pass  # best-effort: file may already be gone
        if self._auth_helper_path:
            try:
                os.unlink(self._auth_helper_path)
            except OSError:
                pass

    def __del__(self):  # pragma: no cover
        """Clean up temp files (fallback — prefer atexit-registered _cleanup_temp_files).

        getattr guards handle partially-initialized instances (e.g. when
        __init__ raised before the attributes were assigned).
        """
        for f in getattr(self, "_temp_files", []):
            try:
                os.unlink(f)
            except OSError:
                pass
        if getattr(self, "_auth_helper_path", None):
            try:
                os.unlink(self._auth_helper_path)
            except OSError:
                pass

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:  # pragma: no cover
        """Cancel a running task. Intentionally a no-op for CLI runtimes."""
        pass
|
||||
@ -1,6 +1,8 @@
|
||||
"""Shared helpers for AgentExecutor implementations.
|
||||
|
||||
Used by both CLIAgentExecutor (codex, ollama) and ClaudeSDKExecutor (claude-code).
|
||||
Used by adapter executors that live in template repos (claude-code,
|
||||
gemini-cli, etc.) post-#87 — this module stays in molecule-runtime
|
||||
because the helpers are runtime-agnostic, not adapter-specific.
|
||||
Provides:
|
||||
- Memory recall/commit (HTTP to platform /memories endpoints)
|
||||
- Delegation results consumption (atomic file rename)
|
||||
|
||||
@ -95,7 +95,9 @@ HEARTBEAT_INTERVAL = 30 # seconds
|
||||
MAX_CONSECUTIVE_FAILURES = 10
|
||||
MAX_SEEN_DELEGATION_IDS = 200
|
||||
SELF_MESSAGE_COOLDOWN = 60 # seconds — minimum between self-messages to prevent loops
|
||||
# Shared path — also used by cli_executor._read_delegation_results()
|
||||
# Shared path — adapter executors (in their template repos) read this
|
||||
# same file via executor_helpers.read_delegation_results so heartbeat-
|
||||
# delivered async delegation results land in the next agent turn.
|
||||
DELEGATION_RESULTS_FILE = os.environ.get("DELEGATION_RESULTS_FILE", "/tmp/delegation_results.jsonl")
|
||||
|
||||
|
||||
|
||||
@ -1,94 +0,0 @@
|
||||
"""Tests for explicit base URL support in model and CLI runtimes."""
|
||||
|
||||
import asyncio
|
||||
import importlib
|
||||
import sys
|
||||
from types import ModuleType, SimpleNamespace
|
||||
|
||||
from cli_executor import CLIAgentExecutor
|
||||
from config import RuntimeConfig
|
||||
|
||||
|
||||
def _install_agent_mocks(monkeypatch, chat_module_name: str, class_name: str, captured: dict):
    """Install lightweight provider + langgraph mocks before importing agent.py."""

    # Stub langgraph.prebuilt.create_react_agent so it just records its inputs.
    prebuilt_stub = ModuleType("langgraph.prebuilt")

    def fake_create_react_agent(*, model, tools, prompt):
        captured["react_agent"] = {"model": model, "tools": tools, "prompt": prompt}
        return {"model": model, "tools": tools, "prompt": prompt}

    prebuilt_stub.create_react_agent = fake_create_react_agent

    monkeypatch.setitem(sys.modules, "langgraph", ModuleType("langgraph"))
    monkeypatch.setitem(sys.modules, "langgraph.prebuilt", prebuilt_stub)

    # Stub the provider module with a chat class that captures its kwargs.
    chat_stub = ModuleType(chat_module_name)

    class FakeLLM:
        def __init__(self, **kwargs):
            captured["llm_kwargs"] = kwargs

    setattr(chat_stub, class_name, FakeLLM)
    monkeypatch.setitem(sys.modules, chat_module_name, chat_stub)
|
||||
|
||||
|
||||
def test_create_agent_uses_anthropic_base_url(monkeypatch):
    """Anthropic models should pass ANTHROPIC_BASE_URL through explicitly.

    Installs fake langchain_anthropic + langgraph modules, then imports
    agent.py fresh so it binds to the fakes, and asserts the model name
    and base URL reach the ChatAnthropic constructor.
    """
    captured = {}
    _install_agent_mocks(monkeypatch, "langchain_anthropic", "ChatAnthropic", captured)
    monkeypatch.setenv("ANTHROPIC_BASE_URL", "https://anthropic.example/v1")

    # Re-import after mocks so agent.py binds to our fake modules.
    sys.modules.pop("agent", None)
    agent_mod = importlib.import_module("agent")

    agent_mod.create_agent("anthropic:claude-sonnet-4-6", [], "system prompt")

    assert captured["llm_kwargs"]["model"] == "claude-sonnet-4-6"
    assert captured["llm_kwargs"]["anthropic_api_url"] == "https://anthropic.example/v1"
|
||||
|
||||
|
||||
def test_codex_runtime_preserves_openai_base_url(monkeypatch):
    """Codex CLI runtime should pass OPENAI_BASE_URL into the subprocess env.

    Stubs asyncio.create_subprocess_exec to capture the command + env that
    CLIAgentExecutor._run_cli would hand to the real subprocess, then checks
    both OPENAI_API_KEY and OPENAI_BASE_URL survive into that env.
    """
    captured = {}

    class FakeProc:
        # Minimal stand-in for an asyncio subprocess: succeeds immediately.
        returncode = 0

        async def communicate(self):
            return (b"ok", b"")

    async def fake_create_subprocess_exec(*cmd, **kwargs):
        captured["cmd"] = cmd
        captured["env"] = kwargs["env"]
        return FakeProc()

    async def fake_set_current_task(_task: str):
        return None

    monkeypatch.setattr(asyncio, "create_subprocess_exec", fake_create_subprocess_exec)
    monkeypatch.setenv("OPENAI_BASE_URL", "https://codex.example/v1")
    monkeypatch.setenv("OPENAI_API_KEY", "sk-test")

    executor = CLIAgentExecutor(
        runtime="codex",
        runtime_config=RuntimeConfig(model="gpt-5.4"),
        system_prompt="system prompt",
        config_path="/tmp",
        heartbeat=None,
    )
    # NOTE(review): _run_cli doesn't appear to call _set_current_task; this
    # override looks vestigial — confirm before relying on it.
    executor._set_current_task = fake_set_current_task

    class FakeQueue:
        # Captures enqueued events instead of publishing them.
        def __init__(self):
            self.events = []

        async def enqueue_event(self, event):
            self.events.append(event)

    asyncio.run(executor._run_cli("hello", FakeQueue()))

    assert captured["env"]["OPENAI_API_KEY"] == "sk-test"
    assert captured["env"]["OPENAI_BASE_URL"] == "https://codex.example/v1"
|
||||
@ -1,5 +1,7 @@
|
||||
"""Tests for executor_helpers.py — the shared helpers that back both
|
||||
CLIAgentExecutor (codex, ollama) and ClaudeSDKExecutor (claude-code).
|
||||
"""Tests for executor_helpers.py — the shared helpers that back the
|
||||
adapter executors. Post-#87 the executors live in template repos
|
||||
(claude-code, gemini-cli, etc.); this module stays in molecule-runtime
|
||||
because the helpers are runtime-agnostic.
|
||||
|
||||
Covers 100% of the public surface:
|
||||
- get_mcp_server_path
|
||||
|
||||
Loading…
Reference in New Issue
Block a user