"""Base adapter interface for agent infrastructure providers.""" import logging import os from abc import ABC, abstractmethod from dataclasses import dataclass, field from typing import Any from a2a.server.agent_execution import AgentExecutor logger = logging.getLogger(__name__) @dataclass class SetupResult: """Result from the shared _common_setup() pipeline.""" system_prompt: str loaded_skills: list # LoadedSkill instances langchain_tools: list # LangChain BaseTool instances is_coordinator: bool children: list # child workspace dicts @dataclass class AdapterConfig: """Standardized config passed to every adapter.""" model: str # e.g. "anthropic:claude-sonnet-4-6" or "openrouter:google/gemini-2.5-flash" system_prompt: str | None = None # Assembled system prompt text tools: list[str] = field(default_factory=list) # Tool names from config.yaml runtime_config: dict[str, Any] = field(default_factory=dict) # Raw runtime_config block config_path: str = "/configs" # Path to configs directory workspace_id: str = "" # Workspace identifier prompt_files: list[str] = field(default_factory=list) # Ordered prompt file names a2a_port: int = 8000 # Port for A2A server heartbeat: Any = None # HeartbeatLoop instance class BaseAdapter(ABC): """Interface every agent infrastructure adapter must implement. To add a new agent infra: 1. Create workspace/adapters// 2. Implement adapter.py with a class extending BaseAdapter 3. Add requirements.txt with your infra's dependencies 4. Export as Adapter in __init__.py 5. Submit a PR """ @staticmethod @abstractmethod def name() -> str: # pragma: no cover """Return the runtime identifier (e.g. 'langgraph', 'crewai'). This must match the 'runtime' field in config.yaml.""" ... @staticmethod @abstractmethod def display_name() -> str: # pragma: no cover """Human-readable name for UI display.""" ... @staticmethod @abstractmethod def description() -> str: # pragma: no cover """Short description of what this adapter provides.""" ... @staticmethod def get_config_schema() -> dict: """Return JSON Schema for runtime_config fields this adapter supports. Used by the Config tab UI to render the right form fields. Override in subclasses for adapter-specific settings.""" return {} # ------------------------------------------------------------------ # Plugin install hooks # ------------------------------------------------------------------ # New pipeline: each plugin ships per-runtime adaptors resolved via # `plugins_registry.resolve()`. Adapters expose hooks below that # adaptors call to wire plugin content into the runtime. # # Default implementations are filesystem-only (write to /configs, # append to CLAUDE.md). Runtimes with a dynamic tool registry # (e.g. DeepAgents sub-agents) override the hooks to also register # in-process state. def memory_filename(self) -> str: """File under /configs that the runtime treats as long-lived memory. Both Claude Code and DeepAgents read CLAUDE.md natively, so this is the sensible default. Override only if a runtime expects a different filename. """ return "CLAUDE.md" def register_tool_hook(self, name: str, fn) -> None: """Default no-op. Override on runtimes with a dynamic tool registry. Runtimes that pick tools up at startup via filesystem scan (Claude Code reads /configs/skills, LangGraph globs **/*.py) don't need to do anything here — the adaptor's file-write step is enough. """ return None async def transcript_lines(self, since: int = 0, limit: int = 100) -> dict: """Return live transcript entries for the most-recent agent session. Default implementation returns ``supported: False`` for runtimes that don't expose a per-session log on disk. Override in subclasses that DO (Claude Code reads ``~/.claude/projects//.jsonl``). This is the "look over the agent's shoulder" feature — lets canvas / operators see live tool calls + AI thinking instead of waiting for the high-level activity log to flush. Args: since: line offset to skip — caller's last cursor (0 = from start) limit: max lines to return (caller-side cap, default 100, max 1000) Returns: ``{runtime, supported, lines, cursor, more, source}`` where ``cursor`` is the new offset to pass on the next poll, ``more`` is True if additional lines remain past ``limit``, and ``source`` is the file path lines were read from (useful for debugging). """ return { "runtime": self.name(), "supported": False, "lines": [], "cursor": since, "more": False, "source": None, } def register_subagent_hook(self, name: str, spec: dict) -> None: """Default no-op. DeepAgents overrides to register a sub-agent.""" return None def append_to_memory_hook(self, config: AdapterConfig, filename: str, content: str) -> None: """Append text to /configs/ if the marker isn't already present. Idempotent: looks for the first line of `content` as a marker so a re-install doesn't duplicate the block. Adaptors should pass content beginning with a unique header (e.g. ``# Plugin: molecule-dev-conventions``). """ import os target = os.path.join(config.config_path, filename) marker = content.splitlines()[0].strip() if content else "" existing = "" if os.path.exists(target): with open(target) as f: existing = f.read() if marker and marker in existing: logger.info("append_to_memory: %s already contains %r — skipping", filename, marker) return os.makedirs(os.path.dirname(target) or ".", exist_ok=True) with open(target, "a") as f: if existing and not existing.endswith("\n"): f.write("\n") f.write(content if content.endswith("\n") else content + "\n") logger.info("append_to_memory: appended %d chars to %s", len(content), filename) async def install_plugins_via_registry( self, config: AdapterConfig, plugins, ) -> list: """Drive the new per-runtime adaptor pipeline for every loaded plugin. For each plugin in `plugins.plugins`, resolve the adaptor for this runtime (via :func:`plugins_registry.resolve`) and invoke ``install(ctx)``. Returns the list of :class:`InstallResult` so callers can surface warnings (e.g. raw-drop fallback hits). Adapters whose runtime supports the new pipeline call this from ``setup()`` instead of the legacy ``inject_plugins()``. """ from pathlib import Path from plugins_registry import InstallContext, resolve results = [] runtime = self.name().replace("-", "_") # e.g. "claude-code" -> "claude_code" for plugin in plugins.plugins: adaptor, source = resolve(plugin.name, runtime, Path(plugin.path)) ctx = InstallContext( configs_dir=Path(config.config_path), workspace_id=config.workspace_id, runtime=runtime, plugin_root=Path(plugin.path), memory_filename=self.memory_filename(), register_tool=self.register_tool_hook, register_subagent=self.register_subagent_hook, append_to_memory=lambda fn, c, _cfg=config: self.append_to_memory_hook(_cfg, fn, c), ) try: result = await adaptor.install(ctx) results.append(result) logger.info( "Plugin %s installed via %s adaptor (warnings: %d)", plugin.name, source, len(result.warnings), ) except Exception as exc: logger.exception("Plugin %s install via %s failed: %s", plugin.name, source, exc) return results async def inject_plugins(self, config: AdapterConfig, plugins) -> None: """Legacy hook — kept for backwards compatibility during migration. Default: drive the new per-runtime adaptor pipeline. Adapters not yet migrated may still override this with their own logic. """ await self.install_plugins_via_registry(config, plugins) async def _common_setup(self, config: AdapterConfig) -> SetupResult: """Shared setup pipeline — loads plugins, skills, tools, coordinator, and builds system prompt. All adapters can call this to get the full platform feature set. Returns a SetupResult with LangChain BaseTool instances that adapters convert to their native format if needed. """ from plugins import load_plugins from skill_loader.loader import load_skills from coordinator import get_children, get_parent_context, build_children_description from prompt import build_system_prompt, get_peer_capabilities from builtin_tools.approval import request_approval from builtin_tools.delegation import delegate_to_workspace, check_delegation_status from builtin_tools.memory import commit_memory, search_memory from builtin_tools.sandbox import run_code platform_url = os.environ.get("PLATFORM_URL", "http://platform:8080") # Load plugins from per-workspace dir first, then shared fallback workspace_plugins_dir = os.path.join(config.config_path, "plugins") plugins = load_plugins( workspace_plugins_dir=workspace_plugins_dir, shared_plugins_dir=os.environ.get("PLUGINS_DIR", "/plugins"), ) await self.inject_plugins(config, plugins) if plugins.plugin_names: logger.info(f"Plugins: {', '.join(plugins.plugin_names)}") # Load skills (workspace + plugin skills, deduped) loaded_skills = load_skills(config.config_path, config.tools) seen_skill_ids = {s.metadata.id for s in loaded_skills} for plugin_skills_dir in plugins.skill_dirs: plugin_skill_names = [ d for d in os.listdir(plugin_skills_dir) if os.path.isdir(os.path.join(plugin_skills_dir, d)) ] for skill in load_skills(plugin_skills_dir, plugin_skill_names): if skill.metadata.id not in seen_skill_ids: loaded_skills.append(skill) seen_skill_ids.add(skill.metadata.id) logger.info(f"Loaded {len(loaded_skills)} skills: {[s.metadata.id for s in loaded_skills]}") # Assemble tools: 6 core + skill tools all_tools = [delegate_to_workspace, check_delegation_status, request_approval, commit_memory, search_memory, run_code] for skill in loaded_skills: all_tools.extend(skill.tools) # Coordinator mode: detect children and add routing tool children = await get_children() is_coordinator = len(children) > 0 if is_coordinator: from coordinator import route_task_to_team logger.info(f"Coordinator mode: {len(children)} children") all_tools.append(route_task_to_team) # Parent context (if this is a child workspace) parent_context = await get_parent_context() # Build system prompt with all context peers = await get_peer_capabilities(platform_url, config.workspace_id) coordinator_prompt = build_children_description(children) if is_coordinator else "" extra_prompts = list(plugins.prompt_fragments) if coordinator_prompt: extra_prompts.append(coordinator_prompt) system_prompt = build_system_prompt( config.config_path, config.workspace_id, loaded_skills, peers, prompt_files=config.prompt_files, plugin_rules=plugins.rules, plugin_prompts=extra_prompts, parent_context=parent_context, ) return SetupResult( system_prompt=system_prompt, loaded_skills=loaded_skills, langchain_tools=all_tools, is_coordinator=is_coordinator, children=children, ) @abstractmethod async def setup(self, config: AdapterConfig) -> None: """One-time setup: validate config, prepare internal state. Called after deps are installed but before create_executor(). Raise RuntimeError if setup fails (missing deps, bad config, etc.).""" ... # pragma: no cover @abstractmethod async def create_executor(self, config: AdapterConfig) -> AgentExecutor: """Create and return an AgentExecutor ready for A2A integration. The returned executor's execute() method will be called by the A2A server's DefaultRequestHandler.""" ... # pragma: no cover