molecule-ai-workspace-runtime/molecule_runtime/executor_helpers.py
rabbitblood 18d904cfc1 fix: MCP server path resolution + absolute imports (2nd half of #507)
The a2a MCP subprocess was launched with a hard-coded /app/a2a_mcp_server.py
path that only existed in the legacy workspace-template layout. Current
templates copy adapter.py into /app but not the MCP server script, so
claude-code's mcp_servers={"a2a": ...} config tried to spawn a non-existent
file, the server never registered any tools, and every agent reported that
search_memory / commit_memory / list_peers / delegate_task / send_message_to_user
were unavailable in the tool registry.

Surfaced this cycle after the CRLF hook fix (PR molecule-core#508 plus the
plugin repo's .gitattributes) unblocked the primary "no response generated"
symptom. Before that, agents crashed before the missing-MCP issue was
observable; the two bugs stacked.

Changes
-------
* executor_helpers._default_mcp_server_path: resolves the installed
  molecule_runtime.a2a_mcp_server module's __file__ so the path is
  always correct regardless of template layout. Legacy /app path kept
  as last-resort fallback for any old images still in rotation.
* a2a_mcp_server.py, a2a_tools.py, a2a_client.py: convert bare module
  imports (from a2a_tools import ...) to absolute
  (from molecule_runtime.a2a_tools import ...). Previously this worked
  only when main.py injected the package dir onto sys.path; the MCP
  subprocess doesn't go through main.py, so the bare imports would fail.
  Added a sys.path shim at the top of a2a_mcp_server.py so running as a
  standalone script (python path/to/a2a_mcp_server.py) still works —
  the subprocess can now locate the package root automatically.
* consolidation.py, heartbeat.py, main.py: same bare-to-absolute
  conversion for platform_auth imports (unblocks the same class of
  failure if any of these modules are imported from a non-main.py
  entrypoint in the future).
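
The shim mentioned above can be sketched roughly as follows. This is
illustrative only: `ensure_package_root` is a hypothetical helper name, the
shipped shim may be a few inline lines instead, and the layout assumption
(script at <package-root>/molecule_runtime/a2a_mcp_server.py) is stated in
the comments.

```python
# Hypothetical sketch of the sys.path shim at the top of a2a_mcp_server.py.
# Assumption: the script lives at <package-root>/molecule_runtime/a2a_mcp_server.py,
# so the package root is two levels up from the script itself.
import sys
from pathlib import Path


def ensure_package_root(script_path: str) -> str:
    """Prepend the package root (two levels up from the script) to sys.path."""
    root = str(Path(script_path).resolve().parent.parent)
    if root not in sys.path:
        sys.path.insert(0, root)
    return root


# Invoked before any `from molecule_runtime import ...` statement:
# ensure_package_root(__file__)
```

With the shim in place, both entrypoints work: `python -m molecule_runtime.a2a_mcp_server`
(package already importable) and `python path/to/a2a_mcp_server.py` (shim adds the root).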

Verification
------------
Deployed the updated files into ws-8010dbd0 (PM) and ran an isolated
sdk.query() as agent user. SystemMessage.init.mcp_servers now reports
[{'name': 'a2a', 'status': 'connected'}] and the tools list includes
all 8 mcp__a2a__* entries:
  mcp__a2a__check_task_status, mcp__a2a__commit_memory,
  mcp__a2a__delegate_task, mcp__a2a__delegate_task_async,
  mcp__a2a__get_workspace_info, mcp__a2a__list_peers,
  mcp__a2a__recall_memory, mcp__a2a__send_message_to_user

Rolled the in-container hotfix across all 22 workspaces pending release
(docker cp of the 4 changed files into each site-packages/molecule_runtime/).

Fixes Molecule-AI/molecule-core#507 (secondary)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 13:28:57 -07:00


"""Shared helpers for AgentExecutor implementations.
Used by both CLIAgentExecutor (codex, ollama) and ClaudeSDKExecutor (claude-code).
Provides:
- Memory recall/commit (HTTP to platform /memories endpoints)
- Delegation results consumption (atomic file rename)
- Current task heartbeat updates
- System prompt loading from /configs
- A2A instructions text for system prompt injection (MCP and CLI variants)
- Brief task summary extraction (markdown-aware)
- Error message sanitization (exception classes and subprocess categories)
- Shared workspace path constants and the MCP server path resolver
"""

from __future__ import annotations

import json
import logging
import os
import re
from pathlib import Path
from typing import TYPE_CHECKING, Any

import httpx

if TYPE_CHECKING:
    from heartbeat import HeartbeatLoop

logger = logging.getLogger(__name__)

# ========================================================================
# Constants — workspace container layout
# ========================================================================
WORKSPACE_MOUNT = "/workspace"
CONFIG_MOUNT = "/configs"
# Legacy template layout copied a2a_mcp_server.py into /app. Current
# templates don't — the script lives inside the installed runtime package.
# Kept as a last-resort fallback only.
LEGACY_MCP_SERVER_PATH = "/app/a2a_mcp_server.py"
DEFAULT_DELEGATION_RESULTS_FILE = "/tmp/delegation_results.jsonl"
PLATFORM_HTTP_TIMEOUT_S = 5.0
MEMORY_RECALL_LIMIT = 10
MEMORY_CONTENT_MAX_CHARS = 200
BRIEF_SUMMARY_MAX_LEN = 80


def _default_mcp_server_path() -> str:
    """Resolve the installed ``a2a_mcp_server.py`` path from the package.

    Fix for the secondary half of #507: when agents started producing text
    again (after the CRLF hook fix), the a2a MCP server failed to start
    because the hard-coded ``/app/a2a_mcp_server.py`` doesn't exist in the
    current workspace-template image — the template's Dockerfile copies
    ``adapter.py`` into /app but not the MCP server script. claude-code
    then initialised with zero MCP tools, so every agent reported
    "search_memory / commit_memory / list_peers / delegate_task not
    available" on the first post-fix pulse.

    Resolve the path from the package itself so it always points at the
    real installed script, regardless of which template layout imported
    the runtime. Legacy /app/ path kept only as last-resort fallback.
    """
    try:
        from molecule_runtime import a2a_mcp_server as _mcp_mod

        path = getattr(_mcp_mod, "__file__", None)
        if path and os.path.isfile(path):
            return path
    except Exception:
        pass
    return LEGACY_MCP_SERVER_PATH


def get_mcp_server_path() -> str:
    """Return the path to the stdio MCP server script.

    Overridable via A2A_MCP_SERVER_PATH for tests and non-default layouts.
    """
    return os.environ.get("A2A_MCP_SERVER_PATH", _default_mcp_server_path())
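
# Illustrative usage (the override path below is an example, not a real file):
#
#     os.environ["A2A_MCP_SERVER_PATH"] = "/opt/custom/a2a_mcp_server.py"
#     get_mcp_server_path()  # -> "/opt/custom/a2a_mcp_server.py"
#
# Without the override, the installed package path is returned, falling back
# to LEGACY_MCP_SERVER_PATH only if the module can't be resolved.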
# ========================================================================
# HTTP client (shared, lazily initialised)
# ========================================================================
_http_client: httpx.AsyncClient | None = None


def get_http_client() -> httpx.AsyncClient:
    """Lazy-init a shared httpx client for platform API calls."""
    global _http_client
    if _http_client is None or _http_client.is_closed:
        _http_client = httpx.AsyncClient(timeout=PLATFORM_HTTP_TIMEOUT_S)
    return _http_client


def reset_http_client_for_tests() -> None:
    """Test helper — drop the shared client so the next call rebuilds it.

    Not for production use. Exposed so tests can guarantee a clean slate
    between cases without touching module internals.
    """
    global _http_client
    _http_client = None
# ========================================================================
# Memory recall + commit
# ========================================================================
async def recall_memories() -> str:
    """Recall recent memories from the platform API.

    Returns a newline-joined bullet list of up to MEMORY_RECALL_LIMIT most
    recent memories, or an empty string when the platform is unreachable,
    not configured, returns a non-2xx status, or returns an unexpected
    payload shape.
    """
    workspace_id = os.environ.get("WORKSPACE_ID", "")
    platform_url = os.environ.get("PLATFORM_URL", "")
    if not workspace_id or not platform_url:
        return ""
    # Fix E (Cycle 5): send auth headers so the WorkspaceAuth middleware
    # (Fix A) allows access once the workspace has a live token on file.
    try:
        from platform_auth import auth_headers as _platform_auth

        _auth = _platform_auth()
    except Exception:
        _auth = {}
    try:
        resp = await get_http_client().get(
            f"{platform_url}/workspaces/{workspace_id}/memories",
            headers=_auth,
        )
        if not 200 <= resp.status_code < 300:
            logger.debug(
                "recall_memories: non-2xx response %s from platform",
                resp.status_code,
            )
            return ""
        data = resp.json()
    except Exception as exc:
        logger.debug("recall_memories: request failed: %s", exc)
        return ""
    if not isinstance(data, list) or not data:
        return ""
    lines = [
        f"- [{m.get('scope', '?')}] {m.get('content', '')}"
        for m in data[-MEMORY_RECALL_LIMIT:]
    ]
    return "\n".join(lines)
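
# Shape of the rendered recall text (illustrative; scope values come from
# the platform payload, "LOCAL" is the only scope this module itself writes):
#
#     - [LOCAL] User prefers dark mode
#     - [LOCAL] Deploy window is Fridays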


async def commit_memory(content: str) -> None:
    """Save a memory to the platform API. Best-effort, no error propagation."""
    workspace_id = os.environ.get("WORKSPACE_ID", "")
    platform_url = os.environ.get("PLATFORM_URL", "")
    if not workspace_id or not platform_url or not content:
        return
    # Fix E (Cycle 5): include auth header so WorkspaceAuth middleware allows access.
    try:
        from platform_auth import auth_headers as _platform_auth

        _auth = _platform_auth()
    except Exception:
        _auth = {}
    try:
        await get_http_client().post(
            f"{platform_url}/workspaces/{workspace_id}/memories",
            json={"content": content, "scope": "LOCAL"},
            headers=_auth,
        )
    except Exception as exc:
        logger.debug("commit_memory: request failed: %s", exc)
# ========================================================================
# Delegation results — written by heartbeat loop, consumed atomically
# ========================================================================
def read_delegation_results() -> str:
    """Read and consume delegation results written by the heartbeat loop.

    Uses atomic rename to prevent races with the heartbeat writer.
    Returns formatted text suitable for prompt injection, or empty string.
    """
    results_file = Path(
        os.environ.get("DELEGATION_RESULTS_FILE", DEFAULT_DELEGATION_RESULTS_FILE)
    )
    if not results_file.exists():
        return ""
    consumed = results_file.with_suffix(".consumed")
    try:
        results_file.rename(consumed)
    except OSError:
        return ""  # File disappeared between exists() and rename()
    try:
        raw = consumed.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return ""
    finally:
        consumed.unlink(missing_ok=True)
    parts: list[str] = []
    for line in raw.strip().split("\n"):
        if not line.strip():
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue
        status = record.get("status", "?")
        summary = record.get("summary", "")
        preview = record.get("response_preview", "")
        parts.append(f"- [{status}] {summary}")
        if preview:
            parts.append(f" Response: {preview[:200]}")
    return "\n".join(parts)
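
# Each line of the results file is one JSON record written by the heartbeat
# loop. The shape below is inferred from the keys read above (illustrative):
#
#     {"status": "completed", "summary": "Audit finished", "response_preview": "All pages pass"}
#
# renders as:
#
#     - [completed] Audit finished
#      Response: All pages pass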
# ========================================================================
# Current task heartbeat update
# ========================================================================
async def set_current_task(heartbeat: "HeartbeatLoop | None", task: str) -> None:
    """Update current task on heartbeat and push immediately via platform API."""
    if heartbeat is not None:
        heartbeat.current_task = task
        heartbeat.active_tasks = 1 if task else 0
    workspace_id = os.environ.get("WORKSPACE_ID", "")
    platform_url = os.environ.get("PLATFORM_URL", "")
    if not (workspace_id and platform_url):
        return
    try:
        try:
            from platform_auth import auth_headers as _auth

            _headers = _auth()
        except Exception:
            _headers = {}
        await get_http_client().post(
            f"{platform_url}/registry/heartbeat",
            json={
                "workspace_id": workspace_id,
                "current_task": task,
                "active_tasks": 1 if task else 0,
                "error_rate": 0,
                "sample_error": "",
                "uptime_seconds": 0,
            },
            headers=_headers,
        )
    except Exception as exc:
        logger.debug("set_current_task: heartbeat push failed: %s", exc)
# ========================================================================
# System prompt loading
# ========================================================================
def get_system_prompt(config_path: str, fallback: str | None = None) -> str | None:
    """Read system-prompt.md from the config dir each call (supports hot-reload).

    Falls back to the provided string if the file doesn't exist.
    """
    prompt_file = Path(config_path) / "system-prompt.md"
    if prompt_file.exists():
        return prompt_file.read_text(encoding="utf-8", errors="replace").strip()
    return fallback
_A2A_INSTRUCTIONS_MCP = """## Inter-Agent Communication
You have MCP tools for communicating with other workspaces:
- list_peers: discover available peer workspaces (name, ID, status, role)
- delegate_task: send a task and WAIT for the response (for quick tasks)
- delegate_task_async: send a task and return immediately with a task_id (for long tasks)
- check_task_status: poll an async task's status and get results when done
- get_workspace_info: get your own workspace info
For quick questions, use delegate_task (synchronous).
For long-running work (building pages, running audits), use delegate_task_async + check_task_status.
Always use list_peers first to discover available workspace IDs.
Access control is enforced — you can only reach siblings and parent/children.
PROACTIVE MESSAGING: Use send_message_to_user to push messages to the user's chat at ANY time:
- Acknowledge tasks immediately: "Got it, delegating to the team now..."
- Send progress updates during long work: "Research Lead finished, waiting on Dev Lead..."
- Deliver follow-up results: "All teams reported back. Here's the synthesis: ..."
This lets you respond quickly ("I'll work on this") and come back later with results.
If delegate_task returns a DELEGATION FAILED message, do NOT forward the raw error to the user.
Instead: (1) try delegating to a different peer, (2) handle the task yourself, or
(3) tell the user which peer is unavailable and provide your own best answer."""
_A2A_INSTRUCTIONS_CLI = """## Inter-Agent Communication
You can delegate tasks to other workspaces using the a2a command:
python3 /app/a2a_cli.py peers # List available peers
python3 /app/a2a_cli.py delegate <workspace_id> <task> # Sync: wait for response
python3 /app/a2a_cli.py delegate --async <workspace_id> <task> # Async: return task_id
python3 /app/a2a_cli.py status <workspace_id> <task_id> # Check async task
python3 /app/a2a_cli.py info # Your workspace info
For quick questions, use sync delegate. For long tasks, use --async + status.
Only delegate to peers listed by the peers command (access control enforced)."""


def get_a2a_instructions(mcp: bool = True) -> str:
    """Return inter-agent communication instructions for system-prompt injection.

    Pass `mcp=True` (default) for MCP-capable runtimes (Claude Code via SDK,
    Codex). Pass `mcp=False` for CLI-only runtimes (Ollama, custom) that have
    to call a2a_cli.py as a subprocess.
    """
    return _A2A_INSTRUCTIONS_MCP if mcp else _A2A_INSTRUCTIONS_CLI
# ========================================================================
# Misc text helpers
# ========================================================================
_MARKDOWN_FENCE = "```"
_MARKDOWN_HR = "---"
_BRIEF_SUMMARY_MIN_LEN = 4 # 1 char + 3-char ellipsis


def brief_summary(text: str, max_len: int = BRIEF_SUMMARY_MAX_LEN) -> str:
    """Extract a one-line task summary for the canvas card display.

    Strips markdown headers (#, ##, ###), bold/italic markers (**, __),
    and skips code fences and horizontal rules. Returns the first meaningful
    line, truncated with an ellipsis when it exceeds `max_len`.

    `max_len` is clamped to at least 4 (one real character plus a 3-char
    ellipsis) so degenerate callers can't produce negative slice indices.
    """
    max_len = max(max_len, _BRIEF_SUMMARY_MIN_LEN)
    for raw_line in text.split("\n"):
        line = raw_line.strip()
        while line.startswith("#"):
            line = line[1:]
        line = line.strip()
        if not line or line.startswith(_MARKDOWN_FENCE) or line == _MARKDOWN_HR:
            continue
        line = line.replace("**", "").replace("__", "")
        if len(line) > max_len:
            return line[: max_len - 3] + "..."
        return line
    return text[:max_len]
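
# Example, traced against the loop above:
#
#     brief_summary("## **Build** the landing page\n\ndetails...")
#     # -> "Build the landing page"
#
# The header and bold markers are stripped; the first non-empty line that is
# not a code fence or horizontal rule becomes the summary.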


def extract_message_text(message: Any) -> str:
    """Extract text from an A2A message (handles both .text and .root.text patterns)."""
    parts = getattr(message, "parts", None) or []
    text_parts: list[str] = []
    for part in parts:
        text = getattr(part, "text", None)
        if text:
            text_parts.append(text)
            continue
        root = getattr(part, "root", None)
        if root is not None:
            root_text = getattr(root, "text", None)
            if root_text:
                text_parts.append(root_text)
    return " ".join(text_parts).strip()
# Word-boundary patterns for subprocess stderr classification. Using word
# boundaries avoids false positives like "author" matching "auth" or
# "generate" matching "rate".
_RATE_LIMIT_RE = re.compile(r"\brate\b|\b429\b|\boverloaded\b", re.IGNORECASE)
_AUTH_RE = re.compile(r"\bauth(?:entication|orization)?\b|\bapi[_-]?key\b", re.IGNORECASE)
_SESSION_RE = re.compile(r"\bsession\b|\bno conversation found\b", re.IGNORECASE)
def classify_subprocess_error(stderr_text: str, exit_code: int | None) -> str:
    """Map a subprocess stderr blob to a short, user-safe category tag.

    The full stderr goes to the workspace logs via `logger.error`; only the
    category is surfaced to the user to avoid leaking tokens, internal paths,
    or stack traces in the chat UI. Used with `sanitize_agent_error` to
    produce a user-facing message for subprocess failures.
    """
    if _RATE_LIMIT_RE.search(stderr_text):
        return "rate_limited"
    if _AUTH_RE.search(stderr_text):
        return "auth_failed"
    if _SESSION_RE.search(stderr_text):
        return "session_error"
    if exit_code is not None and exit_code != 0:
        return f"exit_{exit_code}"
    return "subprocess_error"
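
# Illustrative classifications (exercising the word-boundary regexes above):
#
#     classify_subprocess_error("HTTP 429: overloaded", 1)  # -> "rate_limited"
#     classify_subprocess_error("invalid api_key", 1)       # -> "auth_failed"
#     classify_subprocess_error("authored by bot", 2)       # -> "exit_2"
#                                    ("author" does not match \bauth\b)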


def sanitize_agent_error(
    exc: BaseException | None = None,
    category: str | None = None,
) -> str:
    """Render an agent-side failure into a user-safe error message.

    Either pass an exception (class name is used as the tag) or an explicit
    category string (e.g. from `classify_subprocess_error`). If both are
    given, `category` wins. If neither, the tag defaults to "unknown".

    The message body is deliberately dropped — exception messages and
    subprocess stderr frequently leak stack traces, paths, tokens, and
    API keys. Full detail is available in the workspace logs via
    `logger.exception()` / `logger.error()`.
    """
    if category:
        tag = category
    elif exc is not None:
        tag = type(exc).__name__
    else:
        tag = "unknown"
    return f"Agent error ({tag}) — see workspace logs for details."
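
# Illustrative end-to-end use with a subprocess failure:
#
#     category = classify_subprocess_error(stderr_text, proc.returncode)
#     user_msg = sanitize_agent_error(category=category)
#
# and with an exception:
#
#     sanitize_agent_error(exc=TimeoutError())
#     # -> "Agent error (TimeoutError) — see workspace logs for details."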