diff --git a/workspace/a2a_mcp_server.py b/workspace/a2a_mcp_server.py index a681e5a5..c9c00e47 100644 --- a/workspace/a2a_mcp_server.py +++ b/workspace/a2a_mcp_server.py @@ -13,6 +13,7 @@ Environment variables (set by the workspace container): """ import asyncio +import inspect import json import logging import sys @@ -27,6 +28,7 @@ from a2a_tools import ( tool_recall_memory, tool_send_message_to_user, ) +from platform_tools.registry import TOOLS as _PLATFORM_TOOL_SPECS logger = logging.getLogger(__name__) @@ -45,158 +47,27 @@ from a2a_client import ( # noqa: F401, E402 from a2a_tools import report_activity # noqa: F401, E402 # --- Tool definitions (schemas) --- +# +# Built once at import time from the platform_tools registry. The MCP +# `description` field is the spec's `short` line — that's the unified +# tool description used by both the MCP tool listing AND the bullet +# rendering in the agent-facing system-prompt section. The deeper +# `when_to_use` guidance is appended to the system prompt only (it's +# too long to live in MCP `description` without bloating every +# tool-list response the model sees). TOOLS = [ { - "name": "delegate_task", - "description": "Delegate a task to another workspace via A2A protocol and WAIT for the response. Use for quick tasks. The target must be a peer (sibling or parent/child). Use list_peers to find available targets.", - "inputSchema": { - "type": "object", - "properties": { - "workspace_id": { - "type": "string", - "description": "Target workspace ID (from list_peers)", - }, - "task": { - "type": "string", - "description": "The task description to send to the target workspace", - }, - }, - "required": ["workspace_id", "task"], - }, - }, - { - "name": "delegate_task_async", - "description": "Send a task to another workspace with a short timeout (fire-and-forget). Returns immediately — the target continues processing. Best when you don't need the result right away. 
Note: check_task_status may not work with all workspace implementations.", - "inputSchema": { - "type": "object", - "properties": { - "workspace_id": { - "type": "string", - "description": "Target workspace ID (from list_peers)", - }, - "task": { - "type": "string", - "description": "The task description to send to the target workspace", - }, - }, - "required": ["workspace_id", "task"], - }, - }, - { - "name": "check_task_status", - "description": "Check the status of a previously submitted async task via tasks/get. Note: only works if the target workspace's A2A implementation supports task persistence. May return 'not found' for completed tasks.", - "inputSchema": { - "type": "object", - "properties": { - "workspace_id": { - "type": "string", - "description": "The workspace ID the task was sent to", - }, - "task_id": { - "type": "string", - "description": "The task_id returned by delegate_task_async", - }, - }, - "required": ["workspace_id", "task_id"], - }, - }, - { - "name": "list_peers", - "description": "List all workspaces this agent can communicate with (siblings and parent/children). Returns name, ID, status, and role for each peer.", - "inputSchema": {"type": "object", "properties": {}}, - }, - { - "name": "get_workspace_info", - "description": "Get this workspace's own info — ID, name, role, tier, parent, status.", - "inputSchema": {"type": "object", "properties": {}}, - }, - { - "name": "send_message_to_user", - "description": "Send a message directly to the user's canvas chat — pushed instantly via WebSocket. Use this to: (1) acknowledge a task immediately ('Got it, I'll start working on this'), (2) send interim progress updates while doing long work, (3) deliver follow-up results after delegation completes, (4) attach files (zip, pdf, csv, image) for the user to download via the `attachments` field (NEVER paste file URLs in `message`). 
The message appears in the user's chat as if you're proactively reaching out.", - "inputSchema": { - "type": "object", - "properties": { - "message": { - "type": "string", - # The "no URLs in message text" rule is the single biggest - # cause of bad chat UX: agents drop catbox.moe / file:// - # / temporary upload-host links into the prose, the - # canvas renders them as plain markdown links the user - # can't preview, and SaaS deployments often can't even - # reach those external hosts. Every download MUST go - # through the structured `attachments` field below. - "description": ( - "Caption text for the chat bubble. Required even when sending " - "attachments — set to a short label like 'Here's the build:' " - "or 'Done — see attached.'\n\n" - "DO NOT paste file URLs, download links, or container paths in " - "this string. Files MUST go through the `attachments` field, " - "which renders as a clickable download chip and works on SaaS " - "deployments where external file-host URLs (catbox.moe, file://, " - "etc.) are unreachable from the user's browser." - ), - }, - "attachments": { - "type": "array", - "description": ( - "REQUIRED for any file delivery. Pass absolute file paths inside " - "THIS container (e.g. ['/tmp/build.zip', '/workspace/report.pdf']) " - "— the platform uploads each file and returns a download chip " - "with the file's icon + name + size in the user's chat. The chip " - "works in SaaS deployments because the URL is platform-served, " - "not an external host.\n\n" - "USE THIS instead of: pasting URLs in `message`, base64-encoding " - "in the body, or telling the user to look at a path on disk. " - "If the file isn't already on disk, write it first (Bash, Write " - "tool, etc.) then pass its path here. 25 MB per file cap." - ), - "items": {"type": "string"}, - }, - }, - "required": ["message"], - }, - }, - { - "name": "commit_memory", - "description": "Append a new memory row to persistent storage. 
Each call CREATES a row — does not overwrite existing memories with the same content. Use to remember decisions, task results, and context that should survive a restart. Scope: LOCAL (this workspace only), TEAM (parent + siblings), GLOBAL (entire org). GLOBAL writes require tier-0 (root) workspace; lower-tier callers get an RBAC error.", - "inputSchema": { - "type": "object", - "properties": { - "content": { - "type": "string", - "description": "The information to remember — be detailed and specific", - }, - "scope": { - "type": "string", - "enum": ["LOCAL", "TEAM", "GLOBAL"], - "description": "Memory scope (default: LOCAL)", - }, - }, - "required": ["content"], - }, - }, - { - "name": "recall_memory", - "description": "Substring-search persistent memory and return ALL matching rows (no pagination). Empty query returns every memory accessible at the given scope. Server-side filter is case-insensitive substring match on `content`. Use at the start of conversations to recall prior context — calling once with empty query is cheap and avoids missing relevant memories that don't match a narrow keyword.", - "inputSchema": { - "type": "object", - "properties": { - "query": { - "type": "string", - "description": "Search query (empty returns all memories)", - }, - "scope": { - "type": "string", - "enum": ["LOCAL", "TEAM", "GLOBAL", ""], - "description": "Filter by scope (empty returns all accessible)", - }, - }, - }, - }, + "name": _spec.name, + "description": _spec.short, + "inputSchema": _spec.input_schema, + } + for _spec in _PLATFORM_TOOL_SPECS ] + + # --- Tool dispatch --- async def handle_tool_call(name: str, arguments: dict) -> str: diff --git a/workspace/adapter_base.py b/workspace/adapter_base.py index de20dbb1..ecb8ff57 100644 --- a/workspace/adapter_base.py +++ b/workspace/adapter_base.py @@ -421,8 +421,8 @@ class BaseAdapter(ABC): from coordinator import get_children, get_parent_context, build_children_description from prompt import build_system_prompt, 
get_peer_capabilities, get_platform_instructions from builtin_tools.approval import request_approval - from builtin_tools.delegation import delegate_to_workspace, check_delegation_status - from builtin_tools.memory import commit_memory, search_memory + from builtin_tools.delegation import delegate_task, delegate_task_async, check_task_status + from builtin_tools.memory import commit_memory, recall_memory from builtin_tools.sandbox import run_code platform_url = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080") @@ -455,8 +455,14 @@ class BaseAdapter(ABC): seen_skill_ids.add(skill.metadata.id) logger.info(f"Loaded {len(loaded_skills)} skills: {[s.metadata.id for s in loaded_skills]}") - # Assemble tools: 6 core + skill tools - all_tools = [delegate_to_workspace, check_delegation_status, request_approval, commit_memory, search_memory, run_code] + # Core platform tools — names mirror the platform_tools registry, + # so the names referenced in get_a2a_instructions/get_hma_instructions + # are guaranteed to exist as @tool symbols here. The structural + # alignment test in tests/test_platform_tools.py pins this. + all_tools = [ + delegate_task, delegate_task_async, check_task_status, + request_approval, commit_memory, recall_memory, run_code, + ] for skill in loaded_skills: all_tools.extend(skill.tools) diff --git a/workspace/builtin_tools/delegation.py b/workspace/builtin_tools/delegation.py index 25d0ae55..01e4da00 100644 --- a/workspace/builtin_tools/delegation.py +++ b/workspace/builtin_tools/delegation.py @@ -2,7 +2,7 @@ Delegations are non-blocking: the tool fires the A2A request in the background and returns immediately with a task_id. The agent can check status anytime via -check_delegation_status, or just continue working and check later. +check_task_status, or just continue working and check later. When the delegate responds, the result is stored and the agent is notified via a status update. 
@@ -44,7 +44,7 @@ class DelegationStatus(str, Enum): # The reply will arrive via the platform's stitch path when the # peer finishes its current work. The LLM should WAIT, not retry, # and definitely not fall back to doing the work itself — see the - # check_delegation_status docstring for the prompt-side guidance. + # check_task_status docstring for the prompt-side guidance. QUEUED = "queued" COMPLETED = "completed" FAILED = "failed" @@ -110,7 +110,7 @@ async def _record_delegation_on_platform(task_id: str, target_workspace_id: str, Best-effort POST to /workspaces//delegations/record. The agent still fires A2A directly for speed + OTEL propagation, but the platform's GET /delegations endpoint now mirrors the same set an agent's local - check_delegation_status sees. + check_task_status sees. """ try: async with httpx.AsyncClient(timeout=10) as client: @@ -129,11 +129,11 @@ async def _record_delegation_on_platform(task_id: str, target_workspace_id: str, async def _refresh_queued_from_platform(task_id: str) -> bool: """Lazy-refresh a QUEUED delegation's local state from the platform. - Called by check_delegation_status when local status is QUEUED. The + Called by check_task_status when local status is QUEUED. The platform's drain stitch (a2a_queue.go) updates the delegate_result activity_logs row when a queued delegation eventually completes, but it has no callback to this runtime — without this lazy refresh, - the LLM polling check_delegation_status would see "queued" forever + the LLM polling check_task_status would see "queued" forever even after the platform has the result. Returns True if the local delegation was updated to a terminal state @@ -215,7 +215,7 @@ async def _execute_delegation(task_id: str, workspace_id: str, task: str): delegation.status = DelegationStatus.IN_PROGRESS # #64: register on the platform so GET /workspaces//delegations - # sees the same set as check_delegation_status. Best-effort — platform + # sees the same set as check_task_status. 
Best-effort — platform # unreachability must not block the actual A2A delegation. await _record_delegation_on_platform(task_id, workspace_id, task) @@ -286,7 +286,7 @@ async def _execute_delegation(task_id: str, workspace_id: str, task: str): # accepted the request but the peer's runtime is # mid-task. Platform-side drain will deliver the # reply asynchronously. Mark QUEUED locally so - # check_delegation_status can surface that state + # check_task_status can surface that state # to the LLM with explicit "wait, don't bypass" # guidance. Do NOT mark FAILED — the request is # alive in the platform's queue, not lost. @@ -371,14 +371,36 @@ async def _execute_delegation(task_id: str, workspace_id: str, task: str): @tool -async def delegate_to_workspace( +async def delegate_task( + workspace_id: str, + task: str, +) -> str: + """Delegate a task to a peer workspace via A2A and WAIT for the response. + + Synchronous variant — blocks until the peer replies (or the platform's + A2A round-trip times out). Use this for QUICK questions and small + sub-tasks where you can afford to wait inline. + + For longer-running work (research, multi-minute jobs) use + delegate_task_async + check_task_status instead so you don't hold + this workspace busy waiting. + + Tool name + description are sourced from the platform_tools registry — + a single ToolSpec drives MCP, LangChain, and system-prompt docs. + """ + from a2a_tools import tool_delegate_task + return await tool_delegate_task(workspace_id, task) + + +@tool +async def delegate_task_async( workspace_id: str, task: str, ) -> dict: """Delegate a task to a peer workspace via A2A protocol (non-blocking). Sends the task in the background and returns immediately with a task_id. - Use check_delegation_status to poll for the result, or continue working + Use check_task_status to poll for the result, or continue working and check later. The delegate works independently. 
Args: @@ -386,7 +408,7 @@ async def delegate_to_workspace( task: The task description to send to the peer. Returns: - A dict with task_id and status="delegated". Use check_delegation_status(task_id) to get results. + A dict with task_id and status="delegated". Use check_task_status(task_id) to get results. """ task_id = str(uuid.uuid4()) @@ -417,12 +439,12 @@ async def delegate_to_workspace( "success": True, "task_id": task_id, "status": "delegated", - "message": f"Task delegated to {workspace_id}. Use check_delegation_status('{task_id}') to get the result when ready.", + "message": f"Task delegated to {workspace_id}. Use check_task_status('{task_id}') to get the result when ready.", } @tool -async def check_delegation_status( +async def check_task_status( task_id: str = "", ) -> dict: """Check the status of a delegated task, or list all active delegations. @@ -434,7 +456,7 @@ async def check_delegation_status( processing a prior task. The reply WILL arrive — the platform's drain re-dispatches when the peer is free. This tool transparently polls the platform for the eventual outcome on each call, so - keep polling check_delegation_status periodically and you'll see + keep polling check_task_status periodically and you'll see the status flip to "completed" / "failed" automatically. Do NOT retry the delegation. Do NOT do the work yourself. Acknowledge to the user that the peer is busy and will reply, @@ -445,7 +467,7 @@ async def check_delegation_status( yourself if status is "failed", never if status is "queued". Args: - task_id: The task_id returned by delegate_to_workspace. If empty, lists all delegations. + task_id: The task_id returned by delegate_task_async. If empty, lists all delegations. Returns: Status and result (if completed) of the delegation. 
diff --git a/workspace/builtin_tools/memory.py b/workspace/builtin_tools/memory.py index e92bccab..484dc27a 100644 --- a/workspace/builtin_tools/memory.py +++ b/workspace/builtin_tools/memory.py @@ -8,7 +8,7 @@ Hierarchical Memory Architecture: RBAC enforcement ---------------- ``commit_memory`` requires the ``"memory.write"`` action. -``search_memory`` requires the ``"memory.read"`` action. +``recall_memory`` requires the ``"memory.read"`` action. Roles are read from ``config.yaml`` under ``rbac.roles`` (default: operator). Audit trail @@ -188,7 +188,7 @@ async def commit_memory(content: str, scope: str = "LOCAL") -> dict: @tool -async def search_memory(query: str = "", scope: str = "") -> dict: +async def recall_memory(query: str = "", scope: str = "") -> dict: """Search stored memories. Args: diff --git a/workspace/coordinator.py b/workspace/coordinator.py index b9df9cfa..7790262f 100644 --- a/workspace/coordinator.py +++ b/workspace/coordinator.py @@ -81,7 +81,7 @@ def build_children_description(children: list[dict]) -> str: children, heading="## Your Team (sub-workspaces you coordinate)", instruction=( - "Use the `delegate_to_workspace` tool to send tasks to the chosen member. " + "Use the `delegate_task_async` tool to send tasks to the chosen member. " "Only delegate to members listed above." ), ) @@ -92,7 +92,7 @@ def build_children_description(children: list[dict]) -> str: "", "### Coordination Rules — MANDATORY", "1. You are a COORDINATOR. Your ONLY job is to delegate and synthesize. NEVER do the work yourself.", - "2. For EVERY task, use `delegate_to_workspace` to send it to the appropriate team member(s). " + "2. For EVERY task, use `delegate_task_async` to send it to the appropriate team member(s). " "Do this BEFORE writing any analysis, code, or research yourself.", "3. If a task spans multiple members, delegate to ALL of them in parallel and aggregate results.", "4. If ALL members are offline/paused, tell the caller which members are unavailable. 
" @@ -120,7 +120,7 @@ async def route_task_to_team( task: The task description to route. preferred_member_id: Optional — directly delegate to this member. """ - from builtin_tools.delegation import delegate_to_workspace as delegate + from builtin_tools.delegation import delegate_task_async as delegate children = await get_children() decision = build_team_routing_payload( diff --git a/workspace/executor_helpers.py b/workspace/executor_helpers.py index dc40301e..757061b1 100644 --- a/workspace/executor_helpers.py +++ b/workspace/executor_helpers.py @@ -273,29 +273,19 @@ def get_system_prompt(config_path: str, fallback: str | None = None) -> str | No return fallback -_A2A_INSTRUCTIONS_MCP = """## Inter-Agent Communication -You have MCP tools for communicating with other workspaces: -- list_peers: discover available peer workspaces (name, ID, status, role) -- delegate_task: send a task and WAIT for the response (for quick tasks) -- delegate_task_async: send a task and return immediately with a task_id (for long tasks) -- check_task_status: poll an async task's status and get results when done -- get_workspace_info: get your own workspace info - -For quick questions, use delegate_task (synchronous). -For long-running work (building pages, running audits), use delegate_task_async + check_task_status. -Always use list_peers first to discover available workspace IDs. -Access control is enforced — you can only reach siblings and parent/children. - -PROACTIVE MESSAGING: Use send_message_to_user to push messages to the user's chat at ANY time: -- Acknowledge tasks immediately: "Got it, delegating to the team now..." -- Send progress updates during long work: "Research Lead finished, waiting on Dev Lead..." -- Deliver follow-up results: "All teams reported back. Here's the synthesis: ..." -This lets you respond quickly ("I'll work on this") and come back later with results. - -If delegate_task returns a DELEGATION FAILED message, do NOT forward the raw error to the user. 
-Instead: (1) try delegating to a different peer, (2) handle the task yourself, or -(3) tell the user which peer is unavailable and provide your own best answer.""" +# Tool-usage instructions for system-prompt injection. Generated from +# the platform_tools registry — every tool name, description, and usage +# guidance comes from the canonical ToolSpec. Adding/renaming a tool in +# registry.py automatically flows through here. +_A2A_FOOTER = ( + "Always use list_peers first to discover available workspace IDs. " + "Access control is enforced — you can only reach siblings and parent/children. " + "If a delegation returns a DELEGATION FAILED message, do NOT forward " + "the raw error to the user. Instead: (1) try a different peer, " + "(2) handle the task yourself, or (3) tell the user which peer is " + "unavailable and provide your own best answer." +) _A2A_INSTRUCTIONS_CLI = """## Inter-Agent Communication You can delegate tasks to other workspaces using the a2a command: @@ -309,39 +299,55 @@ For quick questions, use sync delegate. For long tasks, use --async + status. Only delegate to peers listed by the peers command (access control enforced).""" +def _render_section(heading: str, specs, footer: str = "") -> str: + """Render a section: heading, per-tool bullet, per-tool when_to_use, footer.""" + parts = [heading, ""] + for spec in specs: + parts.append(f"- **{spec.name}**: {spec.short}") + parts.append("") + for spec in specs: + parts.append(f"### {spec.name}") + parts.append(spec.when_to_use) + parts.append("") + if footer: + parts.append(footer) + return "\n".join(parts).rstrip() + "\n" + + def get_a2a_instructions(mcp: bool = True) -> str: """Return inter-agent communication instructions for system-prompt injection. - Pass `mcp=True` (default) for MCP-capable runtimes (Claude Code via SDK, - Codex). Pass `mcp=False` for CLI-only runtimes (Ollama, custom) that have - to call a2a_cli.py as a subprocess. + Generated from the platform_tools registry. 
Pass `mcp=True` (default) + for MCP-capable runtimes (claude-code, hermes, langchain, crewai). + Pass `mcp=False` for CLI-only runtimes (ollama, custom subprocess + runtimes that don't speak MCP) — those get a static block describing + the molecule_runtime.a2a_cli subprocess interface instead. """ - return _A2A_INSTRUCTIONS_MCP if mcp else _A2A_INSTRUCTIONS_CLI - - -_HMA_INSTRUCTIONS = """## Hierarchical Memory (HMA) -You have persistent memory tools that survive across sessions and restarts: - -- **commit_memory(content, scope)**: Save important information. - - LOCAL: private to you only (default) - - TEAM: shared with your parent workspace and siblings (same team) - - GLOBAL: shared with the entire org (only root workspaces can write) - -- **recall_memory(query)**: Search your accessible memories. Returns LOCAL + TEAM + GLOBAL matches. - -**When to use memory:** -- After making a decision or learning something non-obvious → commit_memory("decision X because Y", scope="TEAM") -- Before starting work → recall_memory("what did the team decide about X") -- When you discover org-wide knowledge (repo locations, API patterns, conventions) → commit_memory(fact, scope="GLOBAL") if you are a root workspace, or scope="TEAM" to share with your team -- After completing a task → commit_memory("completed task X, PR #N opened", scope="TEAM") so your lead and teammates know - -**Memory is automatically recalled** at the start of each new session. Use it proactively during work to share context. -""" + if not mcp: + return _A2A_INSTRUCTIONS_CLI + from platform_tools.registry import a2a_tools + return _render_section( + "## Inter-Agent Communication", + a2a_tools(), + footer=_A2A_FOOTER, + ) def get_hma_instructions() -> str: - """Return HMA memory instructions for system-prompt injection.""" - return _HMA_INSTRUCTIONS + """Return HMA persistent-memory instructions for system-prompt injection. + + Generated from the platform_tools registry. 
+ """ + from platform_tools.registry import memory_tools + return _render_section( + "## Hierarchical Memory (HMA)", + memory_tools(), + footer=( + "Memory is automatically recalled at the start of each new " + "session. Use commit_memory proactively during work so future " + "sessions and teammates can recall what you learned." + ), + ) # ======================================================================== diff --git a/workspace/main.py b/workspace/main.py index 85e891e2..da8e2f86 100644 --- a/workspace/main.py +++ b/workspace/main.py @@ -337,11 +337,16 @@ async def main(): # pragma: no cover # Rebuild the agent's tool list from updated skills if hasattr(adapter, "all_tools") and hasattr(adapter, "system_prompt"): from builtin_tools.approval import request_approval - from builtin_tools.delegation import delegate_to_workspace - from builtin_tools.memory import commit_memory, search_memory + from builtin_tools.delegation import delegate_task, delegate_task_async, check_task_status + from builtin_tools.memory import commit_memory, recall_memory from builtin_tools.sandbox import run_code - base_tools = [delegate_to_workspace, request_approval, - commit_memory, search_memory, run_code] + # Core platform tools mirror adapter_base.all_tools — must + # match the platform_tools registry names so docs and tools + # never drift. + base_tools = [ + delegate_task, delegate_task_async, check_task_status, + request_approval, commit_memory, recall_memory, run_code, + ] skill_tools = [] for sk in adapter.loaded_skills: skill_tools.extend(sk.tools) diff --git a/workspace/platform_tools/__init__.py b/workspace/platform_tools/__init__.py new file mode 100644 index 00000000..45e7b0dc --- /dev/null +++ b/workspace/platform_tools/__init__.py @@ -0,0 +1,13 @@ +"""Platform tools — single source of truth for tool naming and docs. + +The platform owns A2A and persistent-memory tooling (cross-cutting +runtime concerns per project memory project_runtime_native_pluggable.md). 
+Tools are defined ONCE in `registry.py`. Every adapter — MCP server, +LangChain wrapper, any future SDK integration — consumes the specs to +register the tool in its native format. Doc generators (system-prompt +injection, canvas help, future doc sites) read from the same place. + +Adding a tool: append a ToolSpec to TOOLS in registry.py. Every +adapter picks it up automatically; structural tests fail if any side +drifts from the registry. +""" diff --git a/workspace/platform_tools/registry.py b/workspace/platform_tools/registry.py new file mode 100644 index 00000000..3a3558cc --- /dev/null +++ b/workspace/platform_tools/registry.py @@ -0,0 +1,388 @@ +"""Canonical registry of platform tool specs. + +Every tool the platform offers to agents (A2A delegation, persistent +memory, broadcast, introspection) is defined ONCE in TOOLS below. +Adapters consume these specs to register the tool in their native +runtime format: + + - a2a_mcp_server.py iterates `TOOLS` to build the MCP TOOLS list + + dispatches calls to spec.impl. No tool name or description is + hardcoded there. + + - builtin_tools/{delegation,memory}.py define LangChain `@tool` + wrappers using `name=` from the spec; the wrapper body just + calls spec.impl. + + - executor_helpers.get_a2a_instructions() / get_hma_instructions() + GENERATE the system-prompt doc string from `TOOLS` — no + hand-maintained instruction text. + +Adding a new tool: append a ToolSpec to `TOOLS` below. Every adapter +picks it up. Structural alignment tests (workspace/tests/test_platform_tools.py) +fail if any side drifts from the registry. + +Renaming a tool: change `name` here. Search workspace/ for the old +literal in case any non-adapter consumer (tests, plugin code) hard-coded +it; update those manually. The grep is the audit, the test is the gate. + +Removing a tool: delete the entry. Adapters stop registering it +automatically; doc generators stop mentioning it. 
+""" + +from __future__ import annotations + +from collections.abc import Awaitable, Callable +from dataclasses import dataclass +from typing import Any, Literal + +from a2a_tools import ( + tool_check_task_status, + tool_commit_memory, + tool_delegate_task, + tool_delegate_task_async, + tool_get_workspace_info, + tool_list_peers, + tool_recall_memory, + tool_send_message_to_user, +) + +# Section name maps to the heading in the agent-facing system prompt. +# Adding a new section: add a constant + create a corresponding +# generator in executor_helpers (or generalize get_*_instructions). +A2A_SECTION = "a2a" +MEMORY_SECTION = "memory" + +Section = Literal["a2a", "memory"] + + +@dataclass(frozen=True) +class ToolSpec: + """Runtime-agnostic definition of one platform tool. + + Each adapter (MCP, LangChain, future SDK) consumes the same spec. + Doc generators consume the same spec. There is no other source + of truth for tool naming or description. + """ + + name: str + """The exact name agents see. MUST match every adapter's + registered name and the literal that appears in agent-facing + instruction docs. Structural test enforces this.""" + + short: str + """One-line description. Used as the MCP `description` field + AND as the bullet line in agent-facing instruction docs.""" + + when_to_use: str + """Two-to-three-sentence agent-facing usage guidance — when + to call this tool, what it returns, what NOT to confuse it + with. Concatenated into the system prompt below the tool list.""" + + input_schema: dict[str, Any] + """JSON Schema for the tool's input parameters. Consumed + directly by the MCP server. LangChain derives its schema from + Python type annotations on the @tool function — alignment is + pinned by the structural test.""" + + impl: Callable[..., Awaitable[str]] + """The actual coroutine. 
Both adapters call this; only the + wrapping differs.""" + + section: Section + """Which agent-prompt section this tool belongs to (controls + which instruction generator emits it).""" + + +# --------------------------------------------------------------------------- +# A2A — inter-agent communication & broadcast +# --------------------------------------------------------------------------- + +_DELEGATE_TASK = ToolSpec( + name="delegate_task", + short=( + "Delegate a task to a peer workspace via A2A and WAIT for the " + "response (synchronous)." + ), + when_to_use=( + "Use for QUICK questions and small sub-tasks where you can " + "afford to wait inline. Returns the peer's response text " + "directly. For longer-running work (research, multi-minute " + "jobs) use delegate_task_async + check_task_status instead " + "so you don't hold this workspace busy waiting." + ), + input_schema={ + "type": "object", + "properties": { + "workspace_id": { + "type": "string", + "description": "Target workspace ID (from list_peers).", + }, + "task": { + "type": "string", + "description": "Task description to send to the peer.", + }, + }, + "required": ["workspace_id", "task"], + }, + impl=tool_delegate_task, + section=A2A_SECTION, +) + +_DELEGATE_TASK_ASYNC = ToolSpec( + name="delegate_task_async", + short=( + "Send a task to a peer and return immediately with a task_id " + "(non-blocking)." + ), + when_to_use=( + "Use for long-running work where you want to keep doing other " + "things while the peer processes. Poll with check_task_status " + "to retrieve the result. The platform's A2A queue handles " + "delivery + retries; the peer works independently." 
+ ), + input_schema={ + "type": "object", + "properties": { + "workspace_id": { + "type": "string", + "description": "Target workspace ID (from list_peers).", + }, + "task": { + "type": "string", + "description": "Task description to send to the peer.", + }, + }, + "required": ["workspace_id", "task"], + }, + impl=tool_delegate_task_async, + section=A2A_SECTION, +) + +_CHECK_TASK_STATUS = ToolSpec( + name="check_task_status", + short=( + "Poll the status of a task started with delegate_task_async; " + "returns result when done." + ), + when_to_use=( + "Statuses: pending/in_progress (peer still working — wait), " + "queued (peer is busy with a prior task — DO NOT retry, the " + "platform stitches the response when it finishes), completed " + "(result available), failed (real error — fall back to a " + "different peer or handle it yourself)." + ), + input_schema={ + "type": "object", + "properties": { + "workspace_id": { + "type": "string", + "description": "Workspace ID the task was sent to.", + }, + "task_id": { + "type": "string", + "description": "task_id returned by delegate_task_async.", + }, + }, + "required": ["workspace_id", "task_id"], + }, + impl=tool_check_task_status, + section=A2A_SECTION, +) + +_LIST_PEERS = ToolSpec( + name="list_peers", + short=( + "List the workspaces this agent can communicate with — name, " + "ID, status, role for each." + ), + when_to_use=( + "Call this first when you need to delegate but don't know the " + "target's ID. Access control is enforced — you only see " + "siblings, parent, and direct children." + ), + input_schema={"type": "object", "properties": {}}, + impl=tool_list_peers, + section=A2A_SECTION, +) + +_GET_WORKSPACE_INFO = ToolSpec( + name="get_workspace_info", + short="Get this workspace's own info — ID, name, role, tier, parent, status.", + when_to_use=( + "Use to introspect your own identity (e.g. 
before reporting " + "back to the user, or to determine whether you're a tier-0 " + "root that can write GLOBAL memory)." + ), + input_schema={"type": "object", "properties": {}}, + impl=tool_get_workspace_info, + section=A2A_SECTION, +) + +_SEND_MESSAGE_TO_USER = ToolSpec( + name="send_message_to_user", + short=( + "Send a message directly to the user's canvas chat — pushed instantly " + "via WebSocket. Use this to: (1) acknowledge a task immediately ('Got " + "it, I'll start working on this'), (2) send interim progress updates " + "while doing long work, (3) deliver follow-up results after delegation " + "completes, (4) attach files (zip, pdf, csv, image) for the user to " + "download via the `attachments` field (NEVER paste file URLs in " + "`message`). The message appears in the user's chat as if you're " + "proactively reaching out." + ), + when_to_use=( + "Use proactively across the lifecycle of a task — early to " + "acknowledge, mid-flight to update, late to deliver. Never paste " + "file URLs in the message body — always pass absolute paths in " + "`attachments` so the platform serves them as download chips " + "(works on SaaS where external file hosts are unreachable)." + ), + input_schema={ + "type": "object", + "properties": { + "message": { + "type": "string", + # The "no URLs in message text" rule is the single biggest + # cause of bad chat UX: agents drop catbox.moe / file:// + # / temporary upload-host links into the prose, the + # canvas renders them as plain markdown links the user + # can't preview, and SaaS deployments often can't even + # reach those external hosts. Every download MUST go + # through the structured `attachments` field below. + "description": ( + "Caption text for the chat bubble. Required even when sending " + "attachments — set to a short label like 'Here's the build:' " + "or 'Done — see attached.'\n\n" + "DO NOT paste file URLs, download links, or container paths in " + "this string. 
Files MUST go through the `attachments` field, " + "which renders as a clickable download chip and works on SaaS " + "deployments where external file-host URLs (catbox.moe, file://, " + "etc.) are unreachable from the user's browser." + ), + }, + "attachments": { + "type": "array", + "description": ( + "REQUIRED for any file delivery. Pass absolute file paths inside " + "THIS container (e.g. ['/tmp/build.zip', '/workspace/report.pdf']) " + "— the platform uploads each file and returns a download chip " + "with the file's icon + name + size in the user's chat. The chip " + "works in SaaS deployments because the URL is platform-served, " + "not an external host.\n\n" + "USE THIS instead of: pasting URLs in `message`, base64-encoding " + "in the body, or telling the user to look at a path on disk. " + "If the file isn't already on disk, write it first (Bash, Write " + "tool, etc.) then pass its path here. 25 MB per file cap." + ), + "items": {"type": "string"}, + }, + }, + "required": ["message"], + }, + impl=tool_send_message_to_user, + section=A2A_SECTION, +) + + +# --------------------------------------------------------------------------- +# HMA — hierarchical persistent memory +# --------------------------------------------------------------------------- + +_COMMIT_MEMORY = ToolSpec( + name="commit_memory", + short="Save a fact to persistent memory; survives across sessions and restarts.", + when_to_use=( + "Scopes: LOCAL (private to you, default), TEAM (shared with " + "parent + siblings), GLOBAL (entire org — only tier-0 root " + "workspaces can write). Commit decisions, learned facts, and " + "completed-task summaries so future sessions and teammates " + "can recall them." 
+ ), + input_schema={ + "type": "object", + "properties": { + "content": { + "type": "string", + "description": "What to remember — be specific.", + }, + "scope": { + "type": "string", + "enum": ["LOCAL", "TEAM", "GLOBAL"], + "description": "Memory scope (default LOCAL).", + }, + }, + "required": ["content"], + }, + impl=tool_commit_memory, + section=MEMORY_SECTION, +) + +_RECALL_MEMORY = ToolSpec( + name="recall_memory", + short="Search persistent memory; returns matching LOCAL + TEAM + GLOBAL rows.", + when_to_use=( + "Call at the start of new work and when picking up something " + "you may have done before. Empty query returns ALL accessible " + "memories — cheap and avoids missing rows that don't match a " + "narrow keyword. Memory is automatically recalled at session " + "start; use this to refresh mid-session." + ), + input_schema={ + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "Search query (empty returns all).", + }, + "scope": { + "type": "string", + "enum": ["LOCAL", "TEAM", "GLOBAL", ""], + "description": "Filter by scope (empty = all accessible).", + }, + }, + }, + impl=tool_recall_memory, + section=MEMORY_SECTION, +) + + +# --------------------------------------------------------------------------- +# Public registry. Grouped by section, in registration order, for stable +# adapter listings + diff-friendly review. 
+# --------------------------------------------------------------------------- + +TOOLS: list[ToolSpec] = [ + # A2A + _DELEGATE_TASK, + _DELEGATE_TASK_ASYNC, + _CHECK_TASK_STATUS, + _LIST_PEERS, + _GET_WORKSPACE_INFO, + _SEND_MESSAGE_TO_USER, + # HMA + _COMMIT_MEMORY, + _RECALL_MEMORY, +] + + +def a2a_tools() -> list[ToolSpec]: + """All A2A-section tools, in registration order.""" + return [t for t in TOOLS if t.section == A2A_SECTION] + + +def memory_tools() -> list[ToolSpec]: + """All memory-section tools, in registration order.""" + return [t for t in TOOLS if t.section == MEMORY_SECTION] + + +def by_name(name: str) -> ToolSpec: + """Look up a spec by its canonical name. Raises KeyError if absent.""" + for t in TOOLS: + if t.name == name: + return t + raise KeyError(f"no platform tool named {name!r}") + + +def tool_names() -> list[str]: + """Canonical names in registration order.""" + return [t.name for t in TOOLS] diff --git a/workspace/policies/routing.py b/workspace/policies/routing.py index 908cd2b0..c9152cc3 100644 --- a/workspace/policies/routing.py +++ b/workspace/policies/routing.py @@ -64,7 +64,7 @@ def build_team_routing_payload( "action": "choose_member", "message": ( f"You have {len(members)} team members. " - "Choose the best one for this task and call delegate_to_workspace with their ID." + "Choose the best one for this task and call delegate_task_async with their ID." ), "task": task, "members": members, diff --git a/workspace/shared_runtime.py b/workspace/shared_runtime.py index a874356a..11358079 100644 --- a/workspace/shared_runtime.py +++ b/workspace/shared_runtime.py @@ -140,7 +140,7 @@ def build_peer_section( *, heading: str = "## Your Peers (workspaces you can delegate to)", instruction: str = ( - "Use the `delegate_to_workspace` tool to send tasks to peers. " + "Use the `delegate_task_async` tool to send tasks to peers. " "Only delegate to peers listed above." 
), ) -> str: diff --git a/workspace/tests/conftest.py b/workspace/tests/conftest.py index 6d35d737..066cc21b 100644 --- a/workspace/tests/conftest.py +++ b/workspace/tests/conftest.py @@ -113,10 +113,12 @@ def _make_tools_mocks(): tools_mod.__path__ = [] # Make it a proper package tools_delegation_mod = ModuleType("builtin_tools.delegation") - tools_delegation_mod.delegate_to_workspace = MagicMock() - tools_delegation_mod.delegate_to_workspace.name = "delegate_to_workspace" - tools_delegation_mod.check_delegation_status = MagicMock() - tools_delegation_mod.check_delegation_status.name = "check_delegation_status" + tools_delegation_mod.delegate_task = MagicMock() + tools_delegation_mod.delegate_task.name = "delegate_task" + tools_delegation_mod.delegate_task_async = MagicMock() + tools_delegation_mod.delegate_task_async.name = "delegate_task_async" + tools_delegation_mod.check_task_status = MagicMock() + tools_delegation_mod.check_task_status.name = "check_task_status" tools_approval_mod = ModuleType("builtin_tools.approval") tools_approval_mod.request_approval = MagicMock() @@ -125,8 +127,8 @@ def _make_tools_mocks(): tools_memory_mod = ModuleType("builtin_tools.memory") tools_memory_mod.commit_memory = MagicMock() tools_memory_mod.commit_memory.name = "commit_memory" - tools_memory_mod.search_memory = MagicMock() - tools_memory_mod.search_memory.name = "search_memory" + tools_memory_mod.recall_memory = MagicMock() + tools_memory_mod.recall_memory.name = "recall_memory" tools_sandbox_mod = ModuleType("builtin_tools.sandbox") tools_sandbox_mod.run_code = MagicMock() diff --git a/workspace/tests/test_coordinator_routing.py b/workspace/tests/test_coordinator_routing.py index 13abc6c1..1dfd9626 100644 --- a/workspace/tests/test_coordinator_routing.py +++ b/workspace/tests/test_coordinator_routing.py @@ -28,7 +28,7 @@ async def test_route_task_to_team_delegates_preferred_member(monkeypatch): delegate = MagicMock() delegate.ainvoke = AsyncMock(return_value={"ok": True}) 
- monkeypatch.setattr(sys.modules["builtin_tools.delegation"], "delegate_to_workspace", delegate) + monkeypatch.setattr(sys.modules["builtin_tools.delegation"], "delegate_task_async", delegate) result = await coordinator.route_task_to_team( "Do the thing", @@ -58,4 +58,4 @@ def test_build_children_description_reuses_shared_renderer(): assert "## Your Team (sub-workspaces you coordinate)" in description assert "**Alpha** (id: `child-1`, status: online)" in description assert "Skills: research" in description - assert "delegate_to_workspace" in description + assert "delegate_task_async" in description diff --git a/workspace/tests/test_delegation.py b/workspace/tests/test_delegation.py index 33d4f982..8d33e98d 100644 --- a/workspace/tests/test_delegation.py +++ b/workspace/tests/test_delegation.py @@ -4,7 +4,7 @@ The delegation tool now returns immediately with a task_id and runs the A2A request in the background. Tests verify: 1. Immediate return with task_id 2. Background task completion -3. check_delegation_status retrieval +3. check_task_status retrieval 4. 
Error handling (RBAC, discovery, network) """ @@ -109,22 +109,22 @@ def delegation_mocks(monkeypatch): async def _invoke(mod, workspace_id="target", task="do stuff"): - """Call delegate_to_workspace and return the immediate result.""" - fn = mod.delegate_to_workspace + """Call delegate_task_async and return the immediate result.""" + fn = mod.delegate_task_async if hasattr(fn, "ainvoke"): return await fn.ainvoke({"workspace_id": workspace_id, "task": task}) return await fn(workspace_id=workspace_id, task=task) async def _invoke_and_wait(mod, workspace_id="target", task="do stuff"): - """Call delegate_to_workspace, wait for background task, return status.""" + """Call delegate_task_async, wait for background task, return status.""" result = await _invoke(mod, workspace_id, task) # Wait for all background tasks to complete if mod._background_tasks: await asyncio.gather(*mod._background_tasks, return_exceptions=True) # Get final status if "task_id" in result: - fn = mod.check_delegation_status + fn = mod.check_task_status if hasattr(fn, "ainvoke"): return await fn.ainvoke({"task_id": result["task_id"]}) return await fn(task_id=result["task_id"]) @@ -182,7 +182,7 @@ class TestAsyncDelegation: await _invoke(mod, workspace_id="ws-a", task="task A") await _invoke(mod, workspace_id="ws-b", task="task B") - fn = mod.check_delegation_status + fn = mod.check_task_status if hasattr(fn, "ainvoke"): result = await fn.ainvoke({"task_id": ""}) else: @@ -194,7 +194,7 @@ class TestAsyncDelegation: async def test_check_delegation_not_found(self, delegation_mocks): mod, *_ = delegation_mocks - fn = mod.check_delegation_status + fn = mod.check_task_status if hasattr(fn, "ainvoke"): result = await fn.ainvoke({"task_id": "nonexistent"}) else: @@ -354,7 +354,7 @@ class TestA2AQueued: class TestQueuedLazyRefresh: - """When a delegation is QUEUED, check_delegation_status must lazily + """When a delegation is QUEUED, check_task_status must lazily refresh from the platform's GET /delegations 
to pick up drain-stitch completions. Without this refresh, the LLM sees "queued" forever because the platform never pushes back to the runtime. @@ -401,7 +401,7 @@ class TestQueuedLazyRefresh: refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False) with patch("httpx.AsyncClient", refresh_cls): - fn = mod.check_delegation_status + fn = mod.check_task_status if hasattr(fn, "ainvoke"): refreshed = await fn.ainvoke({"task_id": task_id}) else: @@ -443,7 +443,7 @@ class TestQueuedLazyRefresh: refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False) with patch("httpx.AsyncClient", refresh_cls): - fn = mod.check_delegation_status + fn = mod.check_task_status if hasattr(fn, "ainvoke"): refreshed = await fn.ainvoke({"task_id": task_id}) else: @@ -486,7 +486,7 @@ class TestQueuedLazyRefresh: refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False) with patch("httpx.AsyncClient", refresh_cls): - fn = mod.check_delegation_status + fn = mod.check_task_status if hasattr(fn, "ainvoke"): refreshed = await fn.ainvoke({"task_id": task_id}) else: @@ -515,7 +515,7 @@ class TestQueuedLazyRefresh: refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False) with patch("httpx.AsyncClient", refresh_cls): - fn = mod.check_delegation_status + fn = mod.check_task_status if hasattr(fn, "ainvoke"): refreshed = await fn.ainvoke({"task_id": task_id}) else: diff --git a/workspace/tests/test_executor_helpers.py b/workspace/tests/test_executor_helpers.py index 75869be2..884d9245 100644 --- a/workspace/tests/test_executor_helpers.py +++ b/workspace/tests/test_executor_helpers.py @@ -438,9 +438,12 @@ def test_get_system_prompt_handles_non_utf8(tmp_path): def test_get_a2a_instructions_mcp_default(): out = get_a2a_instructions() - assert "MCP tools" in out + # Section heading is the canonical agent-facing label. + assert "## Inter-Agent Communication" in out + # Every A2A tool from the registry must appear by name. 
assert "list_peers" in out assert "send_message_to_user" in out + assert "delegate_task" in out def test_get_a2a_instructions_cli_variant(): @@ -468,32 +471,27 @@ def test_a2a_cli_instructions_use_module_invocation_not_legacy_app_path(): def test_a2a_mcp_instructions_reference_existing_tools(): - """The MCP instructions text must only reference tools that are actually - registered in a2a_mcp_server.py. If someone renames a server tool, the - prompt text must be updated in lockstep — this test catches the drift. + """Pin the registry-driven alignment: every tool name appearing in the + agent-facing A2A instructions must be a tool the MCP server actually + registers. Both sides now derive from platform_tools.registry, so the + real test is that the registry's a2a_tools() set drives both surfaces + consistently. """ - import re - import pathlib - mcp_server = pathlib.Path(__file__).parent.parent / "a2a_mcp_server.py" - registered = set(re.findall(r'"name":\s*"([a-z_]+)"', mcp_server.read_text())) - # The server advertises itself by name; strip that false positive. - registered.discard("a2a-delegation") + from a2a_mcp_server import TOOLS as MCP_TOOLS + from platform_tools.registry import a2a_tools + registered = {t["name"] for t in MCP_TOOLS} instructions = get_a2a_instructions(mcp=True) - # Every tool called out by name in the instructions must exist on the - # server. (We allow the server to have extras the prompt doesn't mention.) - referenced = { - "list_peers", - "delegate_task", - "delegate_task_async", - "check_task_status", - "get_workspace_info", - "send_message_to_user", - } - for name in referenced: - assert name in instructions, f"prompt missing {name}" - assert name in registered, f"MCP server no longer registers {name}" + for spec in a2a_tools(): + assert spec.name in instructions, ( + f"A2A instructions are missing the tool {spec.name!r} that " + f"the registry declares — the doc generator drifted." 
+ ) + assert spec.name in registered, ( + f"MCP server no longer registers {spec.name!r} that the registry " + f"declares — the MCP TOOLS list drifted from the registry." + ) # ====================================================================== diff --git a/workspace/tests/test_memory.py b/workspace/tests/test_memory.py index 3e587a8c..cd6736b7 100644 --- a/workspace/tests/test_memory.py +++ b/workspace/tests/test_memory.py @@ -98,7 +98,7 @@ def test_commit_memory_uses_awareness_client_when_configured(monkeypatch, memory assert captured["json"] == {"content": "remember this", "scope": "TEAM"} -def test_search_memory_uses_platform_fallback_without_awareness(monkeypatch, memory_modules): +def test_recall_memory_uses_platform_fallback_without_awareness(monkeypatch, memory_modules): memory, _awareness_client = memory_modules captured = {} @@ -119,7 +119,7 @@ def test_search_memory_uses_platform_fallback_without_awareness(monkeypatch, mem monkeypatch.setattr(memory.httpx, "AsyncClient", FakeAsyncClient) - result = asyncio.run(memory.search_memory("status", "local")) + result = asyncio.run(memory.recall_memory("status", "local")) assert result == { "success": True, @@ -236,10 +236,10 @@ def test_commit_memory_promoted_packet_logs_skill_promotion(monkeypatch, tmp_pat assert not (tmp_path / "skills").exists() -def test_search_memory_rejects_invalid_scope(memory_modules): +def test_recall_memory_rejects_invalid_scope(memory_modules): memory, _awareness_client = memory_modules - result = asyncio.run(memory.search_memory("status", "bad")) + result = asyncio.run(memory.recall_memory("status", "bad")) assert result == {"error": "scope must be LOCAL, TEAM, GLOBAL, or empty"} @@ -457,15 +457,15 @@ def test_commit_memory_result_failure(memory_modules_with_mocks): # --------------------------------------------------------------------------- -# search_memory — RBAC deny +# recall_memory — RBAC deny # --------------------------------------------------------------------------- -def 
test_search_memory_rbac_deny(memory_modules_with_mocks): +def test_recall_memory_rbac_deny(memory_modules_with_mocks): memory, mock_audit, _ = memory_modules_with_mocks mock_audit.check_permission.return_value = False mock_audit.get_workspace_roles.return_value = (["read-only-special"], {}) - result = asyncio.run(memory.search_memory("find something", "local")) + result = asyncio.run(memory.recall_memory("find something", "local")) assert result["success"] is False assert "RBAC" in result["error"] @@ -473,22 +473,22 @@ def test_search_memory_rbac_deny(memory_modules_with_mocks): # --------------------------------------------------------------------------- -# search_memory — invalid scope +# recall_memory — invalid scope # --------------------------------------------------------------------------- -def test_search_memory_invalid_scope(memory_modules_with_mocks): +def test_recall_memory_invalid_scope(memory_modules_with_mocks): memory, _mock_audit, _ = memory_modules_with_mocks - result = asyncio.run(memory.search_memory("q", "BAD")) + result = asyncio.run(memory.recall_memory("q", "BAD")) assert result == {"error": "scope must be LOCAL, TEAM, GLOBAL, or empty"} # --------------------------------------------------------------------------- -# search_memory — awareness_client success +# recall_memory — awareness_client success # --------------------------------------------------------------------------- -def test_search_memory_awareness_client_success(memory_modules_with_mocks): +def test_recall_memory_awareness_client_success(memory_modules_with_mocks): from unittest.mock import AsyncMock, MagicMock memory, mock_audit, mock_awareness_mod = memory_modules_with_mocks @@ -501,7 +501,7 @@ def test_search_memory_awareness_client_success(memory_modules_with_mocks): # Patch directly on the loaded module since it imported the name at load time memory.build_awareness_client = MagicMock(return_value=mock_ac) - result = asyncio.run(memory.search_memory("find", "team")) + result 
= asyncio.run(memory.recall_memory("find", "team")) assert result["success"] is True assert result["count"] == 2 @@ -509,10 +509,10 @@ def test_search_memory_awareness_client_success(memory_modules_with_mocks): # --------------------------------------------------------------------------- -# search_memory — awareness_client raises +# recall_memory — awareness_client raises # --------------------------------------------------------------------------- -def test_search_memory_awareness_client_exception(memory_modules_with_mocks): +def test_recall_memory_awareness_client_exception(memory_modules_with_mocks): from unittest.mock import AsyncMock, MagicMock memory, mock_audit, mock_awareness_mod = memory_modules_with_mocks @@ -521,7 +521,7 @@ def test_search_memory_awareness_client_exception(memory_modules_with_mocks): # Patch directly on the loaded module since it imported the name at load time memory.build_awareness_client = MagicMock(return_value=mock_ac) - result = asyncio.run(memory.search_memory("query", "local")) + result = asyncio.run(memory.recall_memory("query", "local")) assert result["success"] is False assert "awareness search failed" in result["error"] @@ -530,10 +530,10 @@ def test_search_memory_awareness_client_exception(memory_modules_with_mocks): # --------------------------------------------------------------------------- -# search_memory — httpx 200 success (no awareness_client) +# recall_memory — httpx 200 success (no awareness_client) # --------------------------------------------------------------------------- -def test_search_memory_httpx_200_success(memory_modules_with_mocks): +def test_recall_memory_httpx_200_success(memory_modules_with_mocks): memory, _mock_audit, _ = memory_modules_with_mocks class FakeAsyncClient: @@ -545,7 +545,7 @@ def test_search_memory_httpx_200_success(memory_modules_with_mocks): memory.httpx.AsyncClient = FakeAsyncClient - result = asyncio.run(memory.search_memory("find", "global")) + result = 
asyncio.run(memory.recall_memory("find", "global")) assert result["success"] is True assert result["count"] == 2 @@ -553,10 +553,10 @@ def test_search_memory_httpx_200_success(memory_modules_with_mocks): # --------------------------------------------------------------------------- -# search_memory — httpx non-200 +# recall_memory — httpx non-200 # --------------------------------------------------------------------------- -def test_search_memory_httpx_non_200(memory_modules_with_mocks): +def test_recall_memory_httpx_non_200(memory_modules_with_mocks): memory, mock_audit, _ = memory_modules_with_mocks class FakeAsyncClient: @@ -568,17 +568,17 @@ def test_search_memory_httpx_non_200(memory_modules_with_mocks): memory.httpx.AsyncClient = FakeAsyncClient - result = asyncio.run(memory.search_memory("q", "")) + result = asyncio.run(memory.recall_memory("q", "")) assert result["success"] is False assert "server error" in result["error"] # --------------------------------------------------------------------------- -# search_memory — httpx raises +# recall_memory — httpx raises # --------------------------------------------------------------------------- -def test_search_memory_httpx_exception(memory_modules_with_mocks): +def test_recall_memory_httpx_exception(memory_modules_with_mocks): memory, mock_audit, _ = memory_modules_with_mocks class FakeAsyncClient: @@ -590,7 +590,7 @@ def test_search_memory_httpx_exception(memory_modules_with_mocks): memory.httpx.AsyncClient = FakeAsyncClient - result = asyncio.run(memory.search_memory("query", "local")) + result = asyncio.run(memory.recall_memory("query", "local")) assert result["success"] is False assert "request timed out" in result["error"] @@ -672,7 +672,7 @@ def test_commit_memory_awareness_exception_span_record_fails(memory_modules_with assert result["success"] is False # error propagated despite span failure -def test_search_memory_awareness_exception_span_record_fails(memory_modules_with_mocks): +def 
test_recall_memory_awareness_exception_span_record_fails(memory_modules_with_mocks): """awareness_client.search raises + span.record_exception also raises: error still returned.""" from unittest.mock import AsyncMock, MagicMock memory, mock_audit, mock_awareness_mod = memory_modules_with_mocks @@ -685,7 +685,7 @@ def test_search_memory_awareness_exception_span_record_fails(memory_modules_with mock_ac.search = AsyncMock(side_effect=RuntimeError("awareness down")) memory.build_awareness_client = MagicMock(return_value=mock_ac) - result = asyncio.run(memory.search_memory("test", "local")) + result = asyncio.run(memory.recall_memory("test", "local")) assert result["success"] is False @@ -711,8 +711,8 @@ def test_commit_memory_httpx_exception_span_record_fails(memory_modules_with_moc assert result["success"] is False -def test_search_memory_httpx_exception_span_record_fails(memory_modules_with_mocks): - """httpx raises in search_memory + span.record_exception also raises: error still returned.""" +def test_recall_memory_httpx_exception_span_record_fails(memory_modules_with_mocks): + """httpx raises in recall_memory + span.record_exception also raises: error still returned.""" from unittest.mock import MagicMock memory, mock_audit, mock_awareness_mod = memory_modules_with_mocks @@ -729,7 +729,7 @@ def test_search_memory_httpx_exception_span_record_fails(memory_modules_with_moc memory.httpx.AsyncClient = FakeAsyncClient - result = asyncio.run(memory.search_memory("query", "local")) + result = asyncio.run(memory.recall_memory("query", "local")) assert result["success"] is False diff --git a/workspace/tests/test_platform_tools.py b/workspace/tests/test_platform_tools.py new file mode 100644 index 00000000..6c375f0f --- /dev/null +++ b/workspace/tests/test_platform_tools.py @@ -0,0 +1,123 @@ +"""Structural alignment tests — every adapter must agree with the registry. 
+ +The registry in workspace/platform_tools/registry.py is the single source +of truth for tool naming + docs. These tests fail if any consumer +(MCP server, LangChain @tool wrappers, doc generators) drifts. + +If you add a tool: append a ToolSpec to registry.TOOLS, then add the +matching @tool wrapper in builtin_tools/. These tests catch the case +where the registry has a name that has no LangChain @tool counterpart +(or vice versa). + +If you rename a tool: edit registry.TOOLS only. These tests fail loudly +if the LangChain @tool name or MCP TOOLS["name"] still has the old name. +""" + +from __future__ import annotations + +import pytest + +from platform_tools.registry import TOOLS, a2a_tools, by_name, memory_tools, tool_names + + +def test_registry_names_are_unique(): + """Every ToolSpec must have a distinct name — duplicate is a typo.""" + names = tool_names() + assert len(names) == len(set(names)), f"duplicate tool names: {names}" + + +def test_registry_a2a_and_memory_partition_is_complete(): + """Every tool belongs to exactly one section. No orphans.""" + a2a = {t.name for t in a2a_tools()} + mem = {t.name for t in memory_tools()} + all_names = set(tool_names()) + assert a2a | mem == all_names + assert not (a2a & mem), f"tool in both sections: {a2a & mem}" + + +def test_by_name_lookup_works(): + spec = by_name("delegate_task") + assert spec.name == "delegate_task" + assert spec.section == "a2a" + with pytest.raises(KeyError): + by_name("nonexistent_tool") + + +def test_mcp_server_registers_every_registry_tool(): + """The MCP server's TOOLS list is built from the registry. Every + spec must produce a corresponding entry — if not, the import-time + list comprehension is broken or the registry has an entry the + server isn't picking up. + """ + from a2a_mcp_server import TOOLS as MCP_TOOLS + + mcp_names = {t["name"] for t in MCP_TOOLS} + registry_names = set(tool_names()) + assert mcp_names == registry_names, ( + f"MCP and registry diverged. 
MCP-only: {mcp_names - registry_names}; " + f"registry-only: {registry_names - mcp_names}" + ) + + +def test_mcp_tool_descriptions_match_registry_short(): + """Each MCP tool's description IS the registry's `short` field — + the bullet-line description shown to the model. The deeper + when_to_use guidance lives only in the system prompt. + """ + from a2a_mcp_server import TOOLS as MCP_TOOLS + + by_mcp_name = {t["name"]: t for t in MCP_TOOLS} + for spec in TOOLS: + assert by_mcp_name[spec.name]["description"] == spec.short, ( + f"MCP description for {spec.name!r} drifted from registry.short. " + f"Edit registry.py, not the MCP server's TOOLS list." + ) + + +def test_mcp_tool_input_schemas_match_registry(): + """Schemas must come from the registry, never duplicated in the server.""" + from a2a_mcp_server import TOOLS as MCP_TOOLS + + by_mcp_name = {t["name"]: t for t in MCP_TOOLS} + for spec in TOOLS: + assert by_mcp_name[spec.name]["inputSchema"] == spec.input_schema, ( + f"MCP inputSchema for {spec.name!r} drifted from registry." + ) + + +def test_a2a_instructions_text_includes_every_a2a_tool(): + """get_a2a_instructions must mention every a2a-section tool by name.""" + from executor_helpers import get_a2a_instructions + + instructions = get_a2a_instructions(mcp=True) + for spec in a2a_tools(): + assert spec.name in instructions, ( + f"agent-facing A2A docs missing tool {spec.name!r} from registry" + ) + + +def test_hma_instructions_text_includes_every_memory_tool(): + """get_hma_instructions must mention every memory-section tool by name.""" + from executor_helpers import get_hma_instructions + + instructions = get_hma_instructions() + for spec in memory_tools(): + assert spec.name in instructions, ( + f"agent-facing HMA docs missing tool {spec.name!r} from registry" + ) + + +def test_old_pre_rename_names_not_present_in_docs(): + """Pre-rename names (delegate_to_workspace, search_memory, + check_delegation_status) must not leak back into the agent-facing + docs. 
They're not in the registry; their absence is the canonical + state. + """ + from executor_helpers import get_a2a_instructions, get_hma_instructions + + blob = get_a2a_instructions(mcp=True) + get_hma_instructions() + for stale in ("delegate_to_workspace", "search_memory", "check_delegation_status"): + assert stale not in blob, ( + f"pre-rename name {stale!r} leaked into docs — registry " + f"is the source of truth, not the doc generator." + ) diff --git a/workspace/tests/test_prompt.py b/workspace/tests/test_prompt.py index 5f868c81..5969de2b 100644 --- a/workspace/tests/test_prompt.py +++ b/workspace/tests/test_prompt.py @@ -202,7 +202,7 @@ def test_peer_capabilities_format(tmp_path): assert "## Your Peers" in result assert "**Echo Agent** (id: `peer-1`, status: online)" in result assert "Skills: echo, repeat" in result - assert "delegate_to_workspace" in result + assert "delegate_task_async" in result # peer-2 has no agent_card but DOES have a DB name + status — must # still render so coordinators can delegate to freshly-created peers # whose A2A discovery hasn't populated a card yet (regression of the