Adds a configurable instruction injection system that prepends rules to every agent's system prompt. Instructions are stored in the DB and fetched at workspace startup, supporting three scopes:

- Global: applies to all agents (e.g., "verify with tools before reporting")
- Team: applies to agents in a specific team
- Workspace: applies to a single agent (role-specific rules)

Components:

- Migration 040: platform_instructions table with scope hierarchy
- Go API: CRUD endpoints + resolve endpoint that merges scopes
- Python runtime: fetches instructions at startup via /instructions/resolve and prepends them to the system prompt as highest-priority context

Initial global instructions seeded:

1. Verify Before Acting (check issues/PRs/docs first)
2. Verify Output Before Reporting (second signal before reporting done)
3. Tool Usage Requirements (claims must include tool output)
4. No Hallucinated Emergencies (CRITICAL needs proof)
5. Staging-First Workflow (never push to main directly)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
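A rough sketch of the runtime-side half of this flow, which _common_setup() in the file below consumes via get_platform_instructions(platform_url, config.workspace_id). The /instructions/resolve path comes from the description above; the httpx client, query parameters, and response field names are assumptions for illustration, not the actual prompt.py implementation:

import httpx  # assumed HTTP client

async def get_platform_instructions(platform_url: str, workspace_id: str) -> str:
    """Fetch the merged global/team/workspace rules for this agent (hypothetical sketch)."""
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            f"{platform_url}/instructions/resolve",
            params={"workspace_id": workspace_id},  # query shape is an assumption
            timeout=10,
        )
        resp.raise_for_status()
    rules = resp.json().get("instructions", [])  # response field name assumed
    # The caller prepends this block as highest-priority context in the system prompt.
    return "\n".join(f"- {rule}" for rule in rules)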
"""Base adapter interface for agent infrastructure providers."""
|
|
|
|
import logging
|
|
import os
|
|
from abc import ABC, abstractmethod
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
|
|
from a2a.server.agent_execution import AgentExecutor
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class SetupResult:
|
|
"""Result from the shared _common_setup() pipeline."""
|
|
system_prompt: str
|
|
loaded_skills: list # LoadedSkill instances
|
|
langchain_tools: list # LangChain BaseTool instances
|
|
is_coordinator: bool
|
|
children: list # child workspace dicts
|
|
|
|
|
|
@dataclass
|
|
class AdapterConfig:
|
|
"""Standardized config passed to every adapter."""
|
|
model: str # e.g. "anthropic:claude-sonnet-4-6" or "openrouter:google/gemini-2.5-flash"
|
|
system_prompt: str | None = None # Assembled system prompt text
|
|
tools: list[str] = field(default_factory=list) # Tool names from config.yaml
|
|
runtime_config: dict[str, Any] = field(default_factory=dict) # Raw runtime_config block
|
|
config_path: str = "/configs" # Path to configs directory
|
|
workspace_id: str = "" # Workspace identifier
|
|
prompt_files: list[str] = field(default_factory=list) # Ordered prompt file names
|
|
a2a_port: int = 8000 # Port for A2A server
|
|
heartbeat: Any = None # HeartbeatLoop instance
|
|
|
|
|
|
class BaseAdapter(ABC):
|
|
"""Interface every agent infrastructure adapter must implement.
|
|
|
|
To add a new agent infra:
|
|
1. Create workspace/adapters/<your_infra>/
|
|
2. Implement adapter.py with a class extending BaseAdapter
|
|
3. Add requirements.txt with your infra's dependencies
|
|
4. Export as Adapter in __init__.py
|
|
5. Submit a PR
|
|
"""
|
|
|
|
@staticmethod
|
|
@abstractmethod
|
|
def name() -> str: # pragma: no cover
|
|
"""Return the runtime identifier (e.g. 'langgraph', 'crewai').
|
|
This must match the 'runtime' field in config.yaml."""
|
|
...
|
|
|
|
@staticmethod
|
|
@abstractmethod
|
|
def display_name() -> str: # pragma: no cover
|
|
"""Human-readable name for UI display."""
|
|
...
|
|
|
|
@staticmethod
|
|
@abstractmethod
|
|
def description() -> str: # pragma: no cover
|
|
"""Short description of what this adapter provides."""
|
|
...
|
|
|
|
@staticmethod
|
|
def get_config_schema() -> dict:
|
|
"""Return JSON Schema for runtime_config fields this adapter supports.
|
|
Used by the Config tab UI to render the right form fields.
|
|
Override in subclasses for adapter-specific settings."""
|
|
return {}
|
|
|
|
# ------------------------------------------------------------------
|
|
# Plugin install hooks
|
|
# ------------------------------------------------------------------
|
|
# New pipeline: each plugin ships per-runtime adaptors resolved via
|
|
# `plugins_registry.resolve()`. Adapters expose hooks below that
|
|
# adaptors call to wire plugin content into the runtime.
|
|
#
|
|
# Default implementations are filesystem-only (write to /configs,
|
|
# append to CLAUDE.md). Runtimes with a dynamic tool registry
|
|
# (e.g. DeepAgents sub-agents) override the hooks to also register
|
|
# in-process state.
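    #
    # Illustrative only: roughly what a plugin-side adaptor for this pipeline
    # might provide. The plugin file name, rule header, and InstallResult
    # constructor arguments below are assumptions, not code from this repo;
    # plugins_registry.resolve() decides how the adaptor itself is located.
    #
    #   async def install(ctx: InstallContext) -> InstallResult:
    #       # Wire the plugin's conventions into the runtime's memory file.
    #       rules = (ctx.plugin_root / "rules.md").read_text()
    #       ctx.append_to_memory(ctx.memory_filename, "# Plugin: example-conventions\n" + rules)
    #       # Runtimes with a dynamic registry also get tools via ctx.register_tool(...).
    #       return InstallResult(warnings=[])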

    def memory_filename(self) -> str:
        """File under /configs that the runtime treats as long-lived memory.

        Both Claude Code and DeepAgents read CLAUDE.md natively, so this is
        the sensible default. Override only if a runtime expects a different
        filename.
        """
        return "CLAUDE.md"

    def register_tool_hook(self, name: str, fn) -> None:
        """Default no-op. Override on runtimes with a dynamic tool registry.

        Runtimes that pick tools up at startup via filesystem scan (Claude
        Code reads /configs/skills, LangGraph globs **/*.py) don't need to
        do anything here — the adaptor's file-write step is enough.
        """
        return None

    async def transcript_lines(self, since: int = 0, limit: int = 100) -> dict:
        """Return live transcript entries for the most-recent agent session.

        Default implementation returns ``supported: False`` for runtimes
        that don't expose a per-session log on disk. Override in subclasses
        that DO (Claude Code reads ``~/.claude/projects/<cwd>/<session>.jsonl``).

        This is the "look over the agent's shoulder" feature — lets canvas /
        operators see live tool calls + AI thinking instead of waiting for
        the high-level activity log to flush.

        Args:
            since: line offset to skip — caller's last cursor (0 = from start)
            limit: max lines to return (caller-side cap, default 100, max 1000)

        Returns:
            ``{runtime, supported, lines, cursor, more, source}`` where
            ``cursor`` is the new offset to pass on the next poll, ``more``
            is True if additional lines remain past ``limit``, and ``source``
            is the file path lines were read from (useful for debugging).
        """
        return {
            "runtime": self.name(),
            "supported": False,
            "lines": [],
            "cursor": since,
            "more": False,
            "source": None,
        }
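
    # Caller-side sketch (illustrative, not code from this repo): how a poller
    # would page through the transcript using the cursor/more contract above.
    # render() and the idle interval are hypothetical placeholders.
    #
    #   cursor = 0
    #   while True:
    #       page = await adapter.transcript_lines(since=cursor, limit=100)
    #       if not page["supported"]:
    #           break
    #       render(page["lines"])
    #       cursor = page["cursor"]
    #       if not page["more"]:
    #           await asyncio.sleep(2)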

    def pre_stop_state(self) -> dict:
        """Capture in-memory state for pause/resume serialization.

        Called by main.py's shutdown handler just before the container exits.
        Returns a dict that will be scrubbed (via lib.snapshot_scrub) and
        written to /configs/.agent_snapshot.json.

        Default implementation:
        1. Attempts to read ``self._executor._session_id`` (set by
           create_executor) and includes it as ``session_id``.
        2. Includes up to 200 recent transcript lines via transcript_lines().

        Override in adapters that hold additional in-memory state that
        should survive a container stop.

        Returns:
            A JSON-serializable dict. All string values are scrubbed before
            persisting, so it is safe to include raw content from the
            agent's context.
        """
        from lib.pre_stop import MAX_TRANSCRIPT_LINES

        state: dict = {}

        # Session handle — critical for resuming the Claude Code session.
        executor = getattr(self, "_executor", None)
        if executor is not None:
            session_id = getattr(executor, "_session_id", None)
            if session_id:
                state["session_id"] = session_id

        # Recent conversation log — captures where the agent left off.
        # transcript_lines() may be async; call it synchronously if possible,
        # otherwise let async adapters override pre_stop_state entirely.
        try:
            import inspect as _inspect
            transcript_fn = self.transcript_lines
            if _inspect.iscoroutinefunction(transcript_fn):
                # Async adapter — override pre_stop_state() for transcript access.
                # The base impl still captures session_id above.
                pass
            else:
                transcript = transcript_fn(since=0, limit=MAX_TRANSCRIPT_LINES)
                if transcript.get("supported"):
                    state["transcript_lines"] = transcript.get("lines", [])
        except Exception:
            # Best-effort: never let transcript capture failure block serialization.
            pass

        return state

    def restore_state(self, snapshot: dict) -> None:
        """Restore in-memory state from a pause/resume snapshot.

        Called by main.py on first boot when /configs/.agent_snapshot.json
        exists. Gives the adapter a chance to restore session handles,
        conversation context, or any other in-memory state before the A2A
        server starts accepting requests.

        Default implementation stores ``snapshot["session_id"]`` and
        ``snapshot["transcript_lines"]`` as ``self._snapshot_session_id``
        and ``self._snapshot_transcript`` so that ``create_executor()`` or
        the executor itself can pick them up.

        Args:
            snapshot: The scrubbed snapshot dict previously written by
                pre_stop_state(). All secrets have already been redacted.
        """
        self._snapshot_session_id: str | None = snapshot.get("session_id")
        self._snapshot_transcript: list | None = snapshot.get("transcript_lines")

    def register_subagent_hook(self, name: str, spec: dict) -> None:
        """Default no-op. DeepAgents overrides to register a sub-agent."""
        return None

    def append_to_memory_hook(self, config: AdapterConfig, filename: str, content: str) -> None:
        """Append text to /configs/<filename> if the marker isn't already present.

        Idempotent: looks for the first line of `content` as a marker so a
        re-install doesn't duplicate the block. Adaptors should pass content
        beginning with a unique header (e.g. ``# Plugin: molecule-dev-conventions``).
        """
        import os
        target = os.path.join(config.config_path, filename)
        marker = content.splitlines()[0].strip() if content else ""
        existing = ""
        if os.path.exists(target):
            with open(target) as f:
                existing = f.read()
        if marker and marker in existing:
            logger.info("append_to_memory: %s already contains %r — skipping", filename, marker)
            return
        os.makedirs(os.path.dirname(target) or ".", exist_ok=True)
        with open(target, "a") as f:
            if existing and not existing.endswith("\n"):
                f.write("\n")
            f.write(content if content.endswith("\n") else content + "\n")
        logger.info("append_to_memory: appended %d chars to %s", len(content), filename)

    async def install_plugins_via_registry(
        self,
        config: AdapterConfig,
        plugins,
    ) -> list:
        """Drive the new per-runtime adaptor pipeline for every loaded plugin.

        For each plugin in `plugins.plugins`, resolve the adaptor for this
        runtime (via :func:`plugins_registry.resolve`) and invoke
        ``install(ctx)``. Returns the list of :class:`InstallResult` so
        callers can surface warnings (e.g. raw-drop fallback hits).

        Adapters whose runtime supports the new pipeline call this from
        ``setup()`` instead of the legacy ``inject_plugins()``.
        """
        from pathlib import Path
        from plugins_registry import InstallContext, resolve

        results = []
        runtime = self.name().replace("-", "_") # e.g. "claude-code" -> "claude_code"

        for plugin in plugins.plugins:
            adaptor, source = resolve(plugin.name, runtime, Path(plugin.path))
            ctx = InstallContext(
                configs_dir=Path(config.config_path),
                workspace_id=config.workspace_id,
                runtime=runtime,
                plugin_root=Path(plugin.path),
                memory_filename=self.memory_filename(),
                register_tool=self.register_tool_hook,
                register_subagent=self.register_subagent_hook,
                append_to_memory=lambda fn, c, _cfg=config: self.append_to_memory_hook(_cfg, fn, c),
            )
            try:
                result = await adaptor.install(ctx)
                results.append(result)
                logger.info(
                    "Plugin %s installed via %s adaptor (warnings: %d)",
                    plugin.name, source, len(result.warnings),
                )
            except Exception as exc:
                logger.exception("Plugin %s install via %s failed: %s", plugin.name, source, exc)

        return results

    async def inject_plugins(self, config: AdapterConfig, plugins) -> None:
        """Legacy hook — kept for backwards compatibility during migration.

        Default: drive the new per-runtime adaptor pipeline. Adapters not yet
        migrated may still override this with their own logic.
        """
        await self.install_plugins_via_registry(config, plugins)

    async def _common_setup(self, config: AdapterConfig) -> SetupResult:
        """Shared setup pipeline — loads plugins, skills, tools, coordinator, and builds system prompt.

        All adapters can call this to get the full platform feature set.
        Returns a SetupResult with LangChain BaseTool instances that adapters
        convert to their native format if needed.
        """
        from plugins import load_plugins
        from skill_loader.loader import load_skills
        from coordinator import get_children, get_parent_context, build_children_description
        from prompt import build_system_prompt, get_peer_capabilities, get_platform_instructions
        from builtin_tools.approval import request_approval
        from builtin_tools.delegation import delegate_to_workspace, check_delegation_status
        from builtin_tools.memory import commit_memory, search_memory
        from builtin_tools.sandbox import run_code

        platform_url = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")

        # Load plugins from per-workspace dir first, then shared fallback
        workspace_plugins_dir = os.path.join(config.config_path, "plugins")
        plugins = load_plugins(
            workspace_plugins_dir=workspace_plugins_dir,
            shared_plugins_dir=os.environ.get("PLUGINS_DIR", "/plugins"),
        )
        await self.inject_plugins(config, plugins)
        if plugins.plugin_names:
            logger.info(f"Plugins: {', '.join(plugins.plugin_names)}")

        # Load skills (workspace + plugin skills, deduped)
        loaded_skills = load_skills(config.config_path, config.tools)
        seen_skill_ids = {s.metadata.id for s in loaded_skills}
        for plugin_skills_dir in plugins.skill_dirs:
            plugin_skill_names = [
                d for d in os.listdir(plugin_skills_dir)
                if os.path.isdir(os.path.join(plugin_skills_dir, d))
            ]
            for skill in load_skills(plugin_skills_dir, plugin_skill_names):
                if skill.metadata.id not in seen_skill_ids:
                    loaded_skills.append(skill)
                    seen_skill_ids.add(skill.metadata.id)
        logger.info(f"Loaded {len(loaded_skills)} skills: {[s.metadata.id for s in loaded_skills]}")

        # Assemble tools: 6 core + skill tools
        all_tools = [delegate_to_workspace, check_delegation_status, request_approval, commit_memory, search_memory, run_code]
        for skill in loaded_skills:
            all_tools.extend(skill.tools)

        # Coordinator mode: detect children and add routing tool
        children = await get_children()
        is_coordinator = len(children) > 0
        if is_coordinator:
            from coordinator import route_task_to_team
            logger.info(f"Coordinator mode: {len(children)} children")
            all_tools.append(route_task_to_team)

        # Parent context (if this is a child workspace)
        parent_context = await get_parent_context()

        # Build system prompt with all context
        peers = await get_peer_capabilities(platform_url, config.workspace_id)
        platform_instructions = await get_platform_instructions(platform_url, config.workspace_id)
        coordinator_prompt = build_children_description(children) if is_coordinator else ""
        extra_prompts = list(plugins.prompt_fragments)
        if coordinator_prompt:
            extra_prompts.append(coordinator_prompt)

        system_prompt = build_system_prompt(
            config.config_path, config.workspace_id, loaded_skills, peers,
            prompt_files=config.prompt_files,
            plugin_rules=plugins.rules,
            plugin_prompts=extra_prompts,
            parent_context=parent_context,
            platform_instructions=platform_instructions,
        )

        return SetupResult(
            system_prompt=system_prompt,
            loaded_skills=loaded_skills,
            langchain_tools=all_tools,
            is_coordinator=is_coordinator,
            children=children,
        )

    @abstractmethod
    async def setup(self, config: AdapterConfig) -> None:
        """One-time setup: validate config, prepare internal state.
        Called after deps are installed but before create_executor().
        Raise RuntimeError if setup fails (missing deps, bad config, etc.)."""
        ... # pragma: no cover

    @abstractmethod
    async def create_executor(self, config: AdapterConfig) -> AgentExecutor:
        """Create and return an AgentExecutor ready for A2A integration.
        The returned executor's execute() method will be called by the
        A2A server's DefaultRequestHandler.

        Subclasses should also store the returned executor as ``self._executor``
        so ``pre_stop_state()`` can access it for serialization.
        """
        ... # pragma: no cover
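
# ---------------------------------------------------------------------------
# Illustrative skeleton (not shipped code): the minimal surface a new infra
# would implement, following the steps in the BaseAdapter docstring above.
# The "echo" runtime name and EchoExecutor class are hypothetical placeholders.
# ---------------------------------------------------------------------------
#
#   class Adapter(BaseAdapter):
#       @staticmethod
#       def name() -> str:
#           return "echo"
#
#       @staticmethod
#       def display_name() -> str:
#           return "Echo"
#
#       @staticmethod
#       def description() -> str:
#           return "Replies with the text it receives (demo only)."
#
#       async def setup(self, config: AdapterConfig) -> None:
#           self._setup = await self._common_setup(config)
#
#       async def create_executor(self, config: AdapterConfig) -> AgentExecutor:
#           self._executor = EchoExecutor(self._setup.system_prompt)
#           return self._executor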