feat(platform): single-source-of-truth tool registry — adapters consume, no drift

Establishes workspace/platform_tools/registry.py as THE place where tool
naming and docs live. Every consumer reads from it; nothing duplicates
the source. Closes the architectural gap behind the doc/tool drift
discussion on 2026-04-28 — adding hundreds of future runtime SDK adapters
should not require touching tool names anywhere except the registry.

What the registry owns

  ToolSpec dataclass with: name, short (one-line description), when_to_use
  (multi-paragraph agent-facing usage guidance), input_schema (JSON Schema),
  impl (the actual coroutine in a2a_tools.py), section ('a2a' | 'memory').

  TOOLS list with 8 entries — delegate_task, delegate_task_async,
  check_task_status, list_peers, get_workspace_info, send_message_to_user,
  commit_memory, recall_memory.

What now reads from the registry

  - workspace/a2a_mcp_server.py
      The hardcoded TOOLS list (167 lines of hand-maintained dicts) is
      gone. Replaced with a 6-line list comprehension over the registry.
      MCP description = spec.short. inputSchema = spec.input_schema.

  - workspace/executor_helpers.py
      get_a2a_instructions(mcp=True) and get_hma_instructions() now
      GENERATE the agent-facing system-prompt text from the registry.
      Heading + per-tool bullet (spec.short) + per-tool when_to_use +
      a section-specific footer. No more hand-maintained instruction
      blocks that drift from reality.

  - workspace/builtin_tools/delegation.py
      Renamed delegate_to_workspace -> delegate_task_async to match
      registry. check_delegation_status -> check_task_status. Added
      sync delegate_task @tool wrapping a2a_tools.tool_delegate_task
      (was missing for LangChain runtimes — CP review Issue 3).

  - workspace/builtin_tools/memory.py
      Renamed search_memory -> recall_memory to match registry.

  - workspace/adapter_base.py, workspace/main.py
      Bundle all 7 core tools (was 6) into all_tools / base_tools.

  - workspace/coordinator.py, shared_runtime.py, policies/routing.py
      Updated system-prompt-text references to use the registry names.
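  The registry-driven TOOLS comprehension described above for
  a2a_mcp_server.py can be sketched as follows (a self-contained
  approximation: the `_Spec` stub and the two example specs stand in
  for the real registry import):

```python
from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True)
class _Spec:
    name: str
    short: str
    input_schema: dict[str, Any]

# Stand-in for `from platform_tools.registry import TOOLS as _PLATFORM_TOOL_SPECS`.
_PLATFORM_TOOL_SPECS = [
    _Spec("delegate_task", "Delegate a task and WAIT for the response.",
          {"type": "object", "required": ["workspace_id", "task"]}),
    _Spec("list_peers", "List reachable peer workspaces.",
          {"type": "object", "properties": {}}),
]

# The entire MCP tool list: no hardcoded names, descriptions, or schemas.
TOOLS = [
    {
        "name": _spec.name,
        "description": _spec.short,
        "inputSchema": _spec.input_schema,
    }
    for _spec in _PLATFORM_TOOL_SPECS
]
```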

Structural alignment tests

  workspace/tests/test_platform_tools.py — 9 tests pin every
  registry-to-adapter mapping:
    - registry names are unique
    - a2a + memory partition is complete (no orphans)
    - by_name lookup works
    - MCP server registers exactly the registry's tool set
    - MCP description equals registry.short for every tool
    - MCP inputSchema equals registry.input_schema for every tool
    - get_a2a_instructions text contains every a2a tool name
    - get_hma_instructions text contains every memory tool name
    - pre-rename names (delegate_to_workspace, search_memory,
      check_delegation_status) cannot leak back

  Adding a future tool means adding one ToolSpec; the test failure
  list tells the author exactly which adapter to update.
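  The set-equality and leak-back pins look roughly like this (a
  self-contained sketch; the real tests import the registry and the MCP
  server rather than these stand-in lists):

```python
# Stand-ins: in the real tests these come from platform_tools.registry
# and from the tool set a2a_mcp_server.py actually registers.
REGISTRY_NAMES = ["delegate_task", "delegate_task_async", "check_task_status",
                  "list_peers", "get_workspace_info", "send_message_to_user",
                  "commit_memory", "recall_memory"]
MCP_REGISTERED = set(REGISTRY_NAMES)
RETIRED = {"delegate_to_workspace", "search_memory", "check_delegation_status"}

def test_registry_names_unique():
    assert len(REGISTRY_NAMES) == len(set(REGISTRY_NAMES))

def test_mcp_registers_exactly_the_registry_set():
    assert set(REGISTRY_NAMES) == MCP_REGISTERED

def test_pre_rename_names_cannot_leak_back():
    # renamed tools must not resurface under their old names
    assert RETIRED.isdisjoint(REGISTRY_NAMES)
```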

Adapter pattern for future SDK support

  When (e.g.) AutoGen or Pydantic AI gets adapters, the only work
  needed for tool surfacing is "wrap registry.TOOLS in your SDK's
  tool format." Names, descriptions, schemas, impl come from the
  registry — adapter author writes zero strings.
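  A minimal sketch of that future-adapter loop, assuming a hypothetical
  `register_tool` as the SDK's native registration call (nothing below
  is a real SDK API; `_Spec` and `_impl` are stand-ins for ToolSpec and
  the a2a_tools coroutine):

```python
registered: dict = {}

def register_tool(name, description, schema, fn):  # assumed SDK surface
    registered[name] = {"description": description, "schema": schema, "fn": fn}

class _Spec:  # stand-in for platform_tools.registry.ToolSpec
    def __init__(self, name, short, input_schema, impl):
        self.name, self.short = name, short
        self.input_schema, self.impl = input_schema, impl

async def _impl(**kwargs):  # stand-in for the coroutine in a2a_tools.py
    return "ok"

TOOLS = [_Spec("delegate_task", "Delegate and wait for the response.",
               {"type": "object"}, _impl)]

# The whole adapter: every string comes from the spec, none from the author.
for spec in TOOLS:
    register_tool(spec.name, spec.short, spec.input_schema, spec.impl)
```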

Why this needed to ship now

  PR #2237 (already in staging) injected MCP-world docs as the
  default system-prompt content. Without the registry, those docs
  said "delegate_task" while LangChain runtimes only had
  "delegate_to_workspace" — workers see docs for tools that don't
exist (CP review Issues 1 and 3). PR #2239 was a tactical rename;
  this PR is the structural fix that prevents the same class of
  drift from recurring as new adapters ship.

  PR #2239 was closed in favor of this — same renames, plus the
  registry, plus structural tests. Single coherent change.

Tests: 1232 pass, 2 xfailed (pre-existing). 9 new in
test_platform_tools.py; 4 alignment tests in test_prompt.py from
#2237 still pass; original test_executor_helpers tests adapted to
the registry-driven world.

Refs: CP review Issues 1, 2, 3, 5; project memory
project_runtime_native_pluggable.md (platform owns A2A);
project memory feedback_doc_tool_alignment.md (this is the structural
fix for the tactical lesson).
Hongming Wang 2026-04-28 17:11:36 -07:00
parent 3f99fede5a
commit e9a59cda3b
18 changed files with 731 additions and 297 deletions

workspace/a2a_mcp_server.py

@@ -13,6 +13,7 @@ Environment variables (set by the workspace container):
"""
import asyncio
import inspect
import json
import logging
import sys
@@ -27,6 +28,7 @@ from a2a_tools import (
tool_recall_memory,
tool_send_message_to_user,
)
from platform_tools.registry import TOOLS as _PLATFORM_TOOL_SPECS
logger = logging.getLogger(__name__)
@@ -45,158 +47,27 @@ from a2a_client import ( # noqa: F401, E402
from a2a_tools import report_activity # noqa: F401, E402
# --- Tool definitions (schemas) ---
#
# Built once at import time from the platform_tools registry. The MCP
# `description` field is the spec's `short` line — that's the unified
# tool description used by both the MCP tool listing AND the bullet
# rendering in the agent-facing system-prompt section. The deeper
# `when_to_use` guidance is appended to the system prompt only (it's
# too long to live in MCP `description` without bloating every
# tool-list response the model sees).
TOOLS = [
{
"name": "delegate_task",
"description": "Delegate a task to another workspace via A2A protocol and WAIT for the response. Use for quick tasks. The target must be a peer (sibling or parent/child). Use list_peers to find available targets.",
"inputSchema": {
"type": "object",
"properties": {
"workspace_id": {
"type": "string",
"description": "Target workspace ID (from list_peers)",
},
"task": {
"type": "string",
"description": "The task description to send to the target workspace",
},
},
"required": ["workspace_id", "task"],
},
},
{
"name": "delegate_task_async",
"description": "Send a task to another workspace with a short timeout (fire-and-forget). Returns immediately — the target continues processing. Best when you don't need the result right away. Note: check_task_status may not work with all workspace implementations.",
"inputSchema": {
"type": "object",
"properties": {
"workspace_id": {
"type": "string",
"description": "Target workspace ID (from list_peers)",
},
"task": {
"type": "string",
"description": "The task description to send to the target workspace",
},
},
"required": ["workspace_id", "task"],
},
},
{
"name": "check_task_status",
"description": "Check the status of a previously submitted async task via tasks/get. Note: only works if the target workspace's A2A implementation supports task persistence. May return 'not found' for completed tasks.",
"inputSchema": {
"type": "object",
"properties": {
"workspace_id": {
"type": "string",
"description": "The workspace ID the task was sent to",
},
"task_id": {
"type": "string",
"description": "The task_id returned by delegate_task_async",
},
},
"required": ["workspace_id", "task_id"],
},
},
{
"name": "list_peers",
"description": "List all workspaces this agent can communicate with (siblings and parent/children). Returns name, ID, status, and role for each peer.",
"inputSchema": {"type": "object", "properties": {}},
},
{
"name": "get_workspace_info",
"description": "Get this workspace's own info — ID, name, role, tier, parent, status.",
"inputSchema": {"type": "object", "properties": {}},
},
{
"name": "send_message_to_user",
"description": "Send a message directly to the user's canvas chat — pushed instantly via WebSocket. Use this to: (1) acknowledge a task immediately ('Got it, I'll start working on this'), (2) send interim progress updates while doing long work, (3) deliver follow-up results after delegation completes, (4) attach files (zip, pdf, csv, image) for the user to download via the `attachments` field (NEVER paste file URLs in `message`). The message appears in the user's chat as if you're proactively reaching out.",
"inputSchema": {
"type": "object",
"properties": {
"message": {
"type": "string",
# The "no URLs in message text" rule is the single biggest
# cause of bad chat UX: agents drop catbox.moe / file://
# / temporary upload-host links into the prose, the
# canvas renders them as plain markdown links the user
# can't preview, and SaaS deployments often can't even
# reach those external hosts. Every download MUST go
# through the structured `attachments` field below.
"description": (
"Caption text for the chat bubble. Required even when sending "
"attachments — set to a short label like 'Here's the build:' "
"or 'Done — see attached.'\n\n"
"DO NOT paste file URLs, download links, or container paths in "
"this string. Files MUST go through the `attachments` field, "
"which renders as a clickable download chip and works on SaaS "
"deployments where external file-host URLs (catbox.moe, file://, "
"etc.) are unreachable from the user's browser."
),
},
"attachments": {
"type": "array",
"description": (
"REQUIRED for any file delivery. Pass absolute file paths inside "
"THIS container (e.g. ['/tmp/build.zip', '/workspace/report.pdf']) "
"— the platform uploads each file and returns a download chip "
"with the file's icon + name + size in the user's chat. The chip "
"works in SaaS deployments because the URL is platform-served, "
"not an external host.\n\n"
"USE THIS instead of: pasting URLs in `message`, base64-encoding "
"in the body, or telling the user to look at a path on disk. "
"If the file isn't already on disk, write it first (Bash, Write "
"tool, etc.) then pass its path here. 25 MB per file cap."
),
"items": {"type": "string"},
},
},
"required": ["message"],
},
},
{
"name": "commit_memory",
"description": "Append a new memory row to persistent storage. Each call CREATES a row — does not overwrite existing memories with the same content. Use to remember decisions, task results, and context that should survive a restart. Scope: LOCAL (this workspace only), TEAM (parent + siblings), GLOBAL (entire org). GLOBAL writes require tier-0 (root) workspace; lower-tier callers get an RBAC error.",
"inputSchema": {
"type": "object",
"properties": {
"content": {
"type": "string",
"description": "The information to remember — be detailed and specific",
},
"scope": {
"type": "string",
"enum": ["LOCAL", "TEAM", "GLOBAL"],
"description": "Memory scope (default: LOCAL)",
},
},
"required": ["content"],
},
},
{
"name": "recall_memory",
"description": "Substring-search persistent memory and return ALL matching rows (no pagination). Empty query returns every memory accessible at the given scope. Server-side filter is case-insensitive substring match on `content`. Use at the start of conversations to recall prior context — calling once with empty query is cheap and avoids missing relevant memories that don't match a narrow keyword.",
"inputSchema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query (empty returns all memories)",
},
"scope": {
"type": "string",
"enum": ["LOCAL", "TEAM", "GLOBAL", ""],
"description": "Filter by scope (empty returns all accessible)",
},
},
},
},
{
"name": _spec.name,
"description": _spec.short,
"inputSchema": _spec.input_schema,
}
for _spec in _PLATFORM_TOOL_SPECS
]
# --- Tool dispatch ---
async def handle_tool_call(name: str, arguments: dict) -> str:

workspace/adapter_base.py

@@ -421,8 +421,8 @@ class BaseAdapter(ABC):
from coordinator import get_children, get_parent_context, build_children_description
from prompt import build_system_prompt, get_peer_capabilities, get_platform_instructions
from builtin_tools.approval import request_approval
from builtin_tools.delegation import delegate_to_workspace, check_delegation_status
from builtin_tools.memory import commit_memory, search_memory
from builtin_tools.delegation import delegate_task, delegate_task_async, check_task_status
from builtin_tools.memory import commit_memory, recall_memory
from builtin_tools.sandbox import run_code
platform_url = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
@@ -455,8 +455,14 @@ class BaseAdapter(ABC):
seen_skill_ids.add(skill.metadata.id)
logger.info(f"Loaded {len(loaded_skills)} skills: {[s.metadata.id for s in loaded_skills]}")
# Assemble tools: 6 core + skill tools
all_tools = [delegate_to_workspace, check_delegation_status, request_approval, commit_memory, search_memory, run_code]
# Core platform tools — names mirror the platform_tools registry,
# so the names referenced in get_a2a_instructions/get_hma_instructions
# are guaranteed to exist as @tool symbols here. The structural
# alignment test in tests/test_platform_tools.py pins this.
all_tools = [
delegate_task, delegate_task_async, check_task_status,
request_approval, commit_memory, recall_memory, run_code,
]
for skill in loaded_skills:
all_tools.extend(skill.tools)

workspace/builtin_tools/delegation.py

@@ -2,7 +2,7 @@
Delegations are non-blocking: the tool fires the A2A request in the background
and returns immediately with a task_id. The agent can check status anytime via
check_delegation_status, or just continue working and check later.
check_task_status, or just continue working and check later.
When the delegate responds, the result is stored and the agent is notified
via a status update.
@@ -44,7 +44,7 @@ class DelegationStatus(str, Enum):
# The reply will arrive via the platform's stitch path when the
# peer finishes its current work. The LLM should WAIT, not retry,
# and definitely not fall back to doing the work itself — see the
# check_delegation_status docstring for the prompt-side guidance.
# check_task_status docstring for the prompt-side guidance.
QUEUED = "queued"
COMPLETED = "completed"
FAILED = "failed"
@@ -110,7 +110,7 @@ async def _record_delegation_on_platform(task_id: str, target_workspace_id: str,
Best-effort POST to /workspaces/<self>/delegations/record. The agent still
fires A2A directly for speed + OTEL propagation, but the platform's
GET /delegations endpoint now mirrors the same set an agent's local
check_delegation_status sees.
check_task_status sees.
"""
try:
async with httpx.AsyncClient(timeout=10) as client:
@@ -129,11 +129,11 @@
async def _refresh_queued_from_platform(task_id: str) -> bool:
"""Lazy-refresh a QUEUED delegation's local state from the platform.
Called by check_delegation_status when local status is QUEUED. The
Called by check_task_status when local status is QUEUED. The
platform's drain stitch (a2a_queue.go) updates the delegate_result
activity_logs row when a queued delegation eventually completes,
but it has no callback to this runtime — without this lazy refresh,
the LLM polling check_delegation_status would see "queued" forever
the LLM polling check_task_status would see "queued" forever
even after the platform has the result.
Returns True if the local delegation was updated to a terminal state
@@ -215,7 +215,7 @@ async def _execute_delegation(task_id: str, workspace_id: str, task: str):
delegation.status = DelegationStatus.IN_PROGRESS
# #64: register on the platform so GET /workspaces/<self>/delegations
# sees the same set as check_delegation_status. Best-effort — platform
# sees the same set as check_task_status. Best-effort — platform
# unreachability must not block the actual A2A delegation.
await _record_delegation_on_platform(task_id, workspace_id, task)
@@ -286,7 +286,7 @@ async def _execute_delegation(task_id: str, workspace_id: str, task: str):
# accepted the request but the peer's runtime is
# mid-task. Platform-side drain will deliver the
# reply asynchronously. Mark QUEUED locally so
# check_delegation_status can surface that state
# check_task_status can surface that state
# to the LLM with explicit "wait, don't bypass"
# guidance. Do NOT mark FAILED — the request is
# alive in the platform's queue, not lost.
@@ -371,14 +371,36 @@ async def _execute_delegation(task_id: str, workspace_id: str, task: str):
@tool
async def delegate_to_workspace(
async def delegate_task(
workspace_id: str,
task: str,
) -> str:
"""Delegate a task to a peer workspace via A2A and WAIT for the response.
Synchronous variant blocks until the peer replies (or the platform's
A2A round-trip times out). Use this for QUICK questions and small
sub-tasks where you can afford to wait inline.
For longer-running work (research, multi-minute jobs) use
delegate_task_async + check_task_status instead so you don't hold
this workspace busy waiting.
Tool name + description are sourced from the platform_tools registry —
a single ToolSpec drives MCP, LangChain, and system-prompt docs.
"""
from a2a_tools import tool_delegate_task
return await tool_delegate_task(workspace_id, task)
@tool
async def delegate_task_async(
workspace_id: str,
task: str,
) -> dict:
"""Delegate a task to a peer workspace via A2A protocol (non-blocking).
Sends the task in the background and returns immediately with a task_id.
Use check_delegation_status to poll for the result, or continue working
Use check_task_status to poll for the result, or continue working
and check later. The delegate works independently.
Args:
@@ -386,7 +408,7 @@ async def delegate_to_workspace(
task: The task description to send to the peer.
Returns:
A dict with task_id and status="delegated". Use check_delegation_status(task_id) to get results.
A dict with task_id and status="delegated". Use check_task_status(task_id) to get results.
"""
task_id = str(uuid.uuid4())
@@ -417,12 +439,12 @@ async def delegate_to_workspace(
"success": True,
"task_id": task_id,
"status": "delegated",
"message": f"Task delegated to {workspace_id}. Use check_delegation_status('{task_id}') to get the result when ready.",
"message": f"Task delegated to {workspace_id}. Use check_task_status('{task_id}') to get the result when ready.",
}
@tool
async def check_delegation_status(
async def check_task_status(
task_id: str = "",
) -> dict:
"""Check the status of a delegated task, or list all active delegations.
@@ -434,7 +456,7 @@ async def check_delegation_status(
processing a prior task. The reply WILL arrive — the platform's
drain re-dispatches when the peer is free. This tool transparently
polls the platform for the eventual outcome on each call, so
keep polling check_delegation_status periodically and you'll see
keep polling check_task_status periodically and you'll see
the status flip to "completed" / "failed" automatically.
Do NOT retry the delegation. Do NOT do the work yourself.
Acknowledge to the user that the peer is busy and will reply,
@@ -445,7 +467,7 @@ async def check_delegation_status(
yourself if status is "failed", never if status is "queued".
Args:
task_id: The task_id returned by delegate_to_workspace. If empty, lists all delegations.
task_id: The task_id returned by delegate_task_async. If empty, lists all delegations.
Returns:
Status and result (if completed) of the delegation.

workspace/builtin_tools/memory.py

@@ -8,7 +8,7 @@ Hierarchical Memory Architecture:
RBAC enforcement
----------------
``commit_memory`` requires the ``"memory.write"`` action.
``search_memory`` requires the ``"memory.read"`` action.
``recall_memory`` requires the ``"memory.read"`` action.
Roles are read from ``config.yaml`` under ``rbac.roles`` (default: operator).
Audit trail
@@ -188,7 +188,7 @@ async def commit_memory(content: str, scope: str = "LOCAL") -> dict:
@tool
async def search_memory(query: str = "", scope: str = "") -> dict:
async def recall_memory(query: str = "", scope: str = "") -> dict:
"""Search stored memories.
Args:

workspace/coordinator.py

@@ -81,7 +81,7 @@ def build_children_description(children: list[dict]) -> str:
children,
heading="## Your Team (sub-workspaces you coordinate)",
instruction=(
"Use the `delegate_to_workspace` tool to send tasks to the chosen member. "
"Use the `delegate_task_async` tool to send tasks to the chosen member. "
"Only delegate to members listed above."
),
)
@@ -92,7 +92,7 @@ def build_children_description(children: list[dict]) -> str:
"",
"### Coordination Rules — MANDATORY",
"1. You are a COORDINATOR. Your ONLY job is to delegate and synthesize. NEVER do the work yourself.",
"2. For EVERY task, use `delegate_to_workspace` to send it to the appropriate team member(s). "
"2. For EVERY task, use `delegate_task_async` to send it to the appropriate team member(s). "
"Do this BEFORE writing any analysis, code, or research yourself.",
"3. If a task spans multiple members, delegate to ALL of them in parallel and aggregate results.",
"4. If ALL members are offline/paused, tell the caller which members are unavailable. "
@@ -120,7 +120,7 @@ async def route_task_to_team(
task: The task description to route.
preferred_member_id: Optional — directly delegate to this member.
"""
from builtin_tools.delegation import delegate_to_workspace as delegate
from builtin_tools.delegation import delegate_task_async as delegate
children = await get_children()
decision = build_team_routing_payload(

workspace/executor_helpers.py

@@ -273,29 +273,19 @@ def get_system_prompt(config_path: str, fallback: str | None = None) -> str | None:
return fallback
_A2A_INSTRUCTIONS_MCP = """## Inter-Agent Communication
You have MCP tools for communicating with other workspaces:
- list_peers: discover available peer workspaces (name, ID, status, role)
- delegate_task: send a task and WAIT for the response (for quick tasks)
- delegate_task_async: send a task and return immediately with a task_id (for long tasks)
- check_task_status: poll an async task's status and get results when done
- get_workspace_info: get your own workspace info
For quick questions, use delegate_task (synchronous).
For long-running work (building pages, running audits), use delegate_task_async + check_task_status.
Always use list_peers first to discover available workspace IDs.
Access control is enforced — you can only reach siblings and parent/children.
PROACTIVE MESSAGING: Use send_message_to_user to push messages to the user's chat at ANY time:
- Acknowledge tasks immediately: "Got it, delegating to the team now..."
- Send progress updates during long work: "Research Lead finished, waiting on Dev Lead..."
- Deliver follow-up results: "All teams reported back. Here's the synthesis: ..."
This lets you respond quickly ("I'll work on this") and come back later with results.
If delegate_task returns a DELEGATION FAILED message, do NOT forward the raw error to the user.
Instead: (1) try delegating to a different peer, (2) handle the task yourself, or
(3) tell the user which peer is unavailable and provide your own best answer."""
# Tool-usage instructions for system-prompt injection. Generated from
# the platform_tools registry — every tool name, description, and usage
# guidance comes from the canonical ToolSpec. Adding/renaming a tool in
# registry.py automatically flows through here.
_A2A_FOOTER = (
"Always use list_peers first to discover available workspace IDs. "
"Access control is enforced — you can only reach siblings and parent/children. "
"If a delegation returns a DELEGATION FAILED message, do NOT forward "
"the raw error to the user. Instead: (1) try a different peer, "
"(2) handle the task yourself, or (3) tell the user which peer is "
"unavailable and provide your own best answer."
)
_A2A_INSTRUCTIONS_CLI = """## Inter-Agent Communication
You can delegate tasks to other workspaces using the a2a command:
@@ -309,39 +299,55 @@ For quick questions, use sync delegate. For long tasks, use --async + status.
Only delegate to peers listed by the peers command (access control enforced)."""
def _render_section(heading: str, specs, footer: str = "") -> str:
"""Render a section: heading, per-tool bullet, per-tool when_to_use, footer."""
parts = [heading, ""]
for spec in specs:
parts.append(f"- **{spec.name}**: {spec.short}")
parts.append("")
for spec in specs:
parts.append(f"### {spec.name}")
parts.append(spec.when_to_use)
parts.append("")
if footer:
parts.append(footer)
return "\n".join(parts).rstrip() + "\n"
def get_a2a_instructions(mcp: bool = True) -> str:
"""Return inter-agent communication instructions for system-prompt injection.
Pass `mcp=True` (default) for MCP-capable runtimes (Claude Code via SDK,
Codex). Pass `mcp=False` for CLI-only runtimes (Ollama, custom) that have
to call a2a_cli.py as a subprocess.
Generated from the platform_tools registry. Pass `mcp=True` (default)
for MCP-capable runtimes (claude-code, hermes, langchain, crewai).
Pass `mcp=False` for CLI-only runtimes (ollama, custom subprocess
runtimes that don't speak MCP) — those get a static block describing
the molecule_runtime.a2a_cli subprocess interface instead.
"""
return _A2A_INSTRUCTIONS_MCP if mcp else _A2A_INSTRUCTIONS_CLI
_HMA_INSTRUCTIONS = """## Hierarchical Memory (HMA)
You have persistent memory tools that survive across sessions and restarts:
- **commit_memory(content, scope)**: Save important information.
- LOCAL: private to you only (default)
- TEAM: shared with your parent workspace and siblings (same team)
- GLOBAL: shared with the entire org (only root workspaces can write)
- **recall_memory(query)**: Search your accessible memories. Returns LOCAL + TEAM + GLOBAL matches.
**When to use memory:**
- After making a decision or learning something non-obvious → commit_memory("decision X because Y", scope="TEAM")
- Before starting work → recall_memory("what did the team decide about X")
- When you discover org-wide knowledge (repo locations, API patterns, conventions) → commit_memory(fact, scope="GLOBAL") if you are a root workspace, or scope="TEAM" to share with your team
- After completing a task → commit_memory("completed task X, PR #N opened", scope="TEAM") so your lead and teammates know
**Memory is automatically recalled** at the start of each new session. Use it proactively during work to share context.
"""
if not mcp:
return _A2A_INSTRUCTIONS_CLI
from platform_tools.registry import a2a_tools
return _render_section(
"## Inter-Agent Communication",
a2a_tools(),
footer=_A2A_FOOTER,
)
def get_hma_instructions() -> str:
"""Return HMA memory instructions for system-prompt injection."""
return _HMA_INSTRUCTIONS
"""Return HMA persistent-memory instructions for system-prompt injection.
Generated from the platform_tools registry.
"""
from platform_tools.registry import memory_tools
return _render_section(
"## Hierarchical Memory (HMA)",
memory_tools(),
footer=(
"Memory is automatically recalled at the start of each new "
"session. Use commit_memory proactively during work so future "
"sessions and teammates can recall what you learned."
),
)
# ========================================================================

workspace/main.py

@@ -337,11 +337,16 @@ async def main(): # pragma: no cover
# Rebuild the agent's tool list from updated skills
if hasattr(adapter, "all_tools") and hasattr(adapter, "system_prompt"):
from builtin_tools.approval import request_approval
from builtin_tools.delegation import delegate_to_workspace
from builtin_tools.memory import commit_memory, search_memory
from builtin_tools.delegation import delegate_task, delegate_task_async, check_task_status
from builtin_tools.memory import commit_memory, recall_memory
from builtin_tools.sandbox import run_code
base_tools = [delegate_to_workspace, request_approval,
commit_memory, search_memory, run_code]
# Core platform tools mirror adapter_base.all_tools — must
# match the platform_tools registry names so docs and tools
# never drift.
base_tools = [
delegate_task, delegate_task_async, check_task_status,
request_approval, commit_memory, recall_memory, run_code,
]
skill_tools = []
for sk in adapter.loaded_skills:
skill_tools.extend(sk.tools)

workspace/platform_tools/__init__.py

@@ -0,0 +1,13 @@
"""Platform tools — single source of truth for tool naming and docs.
The platform owns A2A and persistent-memory tooling (cross-cutting
runtime concerns per project memory project_runtime_native_pluggable.md).
Tools are defined ONCE in `registry.py`. Every adapter — MCP server,
LangChain wrapper, any future SDK integration — consumes the specs to
register the tool in its native format. Doc generators (system-prompt
injection, canvas help, future doc sites) read from the same place.
Adding a tool: append a ToolSpec to TOOLS in registry.py. Every
adapter picks it up automatically; structural tests fail if any side
drifts from the registry.
"""

workspace/platform_tools/registry.py

@@ -0,0 +1,388 @@
"""Canonical registry of platform tool specs.
Every tool the platform offers to agents (A2A delegation, persistent
memory, broadcast, introspection) is defined ONCE in TOOLS below.
Adapters consume these specs to register the tool in their native
runtime format:
- a2a_mcp_server.py iterates `TOOLS` to build the MCP TOOLS list +
dispatches calls to spec.impl. No tool name or description is
hardcoded there.
- builtin_tools/{delegation,memory}.py define LangChain `@tool`
wrappers using `name=` from the spec; the wrapper body just
calls spec.impl.
- executor_helpers.get_a2a_instructions() / get_hma_instructions()
GENERATE the system-prompt doc string from `TOOLS` — no
hand-maintained instruction text.
Adding a new tool: append a ToolSpec to `TOOLS` below. Every adapter
picks it up. Structural alignment tests (workspace/tests/test_platform_tools.py)
fail if any side drifts from the registry.
Renaming a tool: change `name` here. Search workspace/ for the old
literal in case any non-adapter consumer (tests, plugin code) hard-coded
it; update those manually. The grep is the audit, the test is the gate.
Removing a tool: delete the entry. Adapters stop registering it
automatically; doc generators stop mentioning it.
"""
from __future__ import annotations
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Any, Literal
from a2a_tools import (
tool_check_task_status,
tool_commit_memory,
tool_delegate_task,
tool_delegate_task_async,
tool_get_workspace_info,
tool_list_peers,
tool_recall_memory,
tool_send_message_to_user,
)
# Section name maps to the heading in the agent-facing system prompt.
# Adding a new section: add a constant + create a corresponding
# generator in executor_helpers (or generalize get_*_instructions).
A2A_SECTION = "a2a"
MEMORY_SECTION = "memory"
Section = Literal["a2a", "memory"]
@dataclass(frozen=True)
class ToolSpec:
"""Runtime-agnostic definition of one platform tool.
Each adapter (MCP, LangChain, future SDK) consumes the same spec.
Doc generators consume the same spec. There is no other source
of truth for tool naming or description.
"""
name: str
"""The exact name agents see. MUST match every adapter's
registered name and the literal that appears in agent-facing
instruction docs. Structural test enforces this."""
short: str
"""One-line description. Used as the MCP `description` field
AND as the bullet line in agent-facing instruction docs."""
when_to_use: str
"""Two-to-three-sentence agent-facing usage guidance — when
to call this tool, what it returns, what NOT to confuse it
with. Concatenated into the system prompt below the tool list."""
input_schema: dict[str, Any]
"""JSON Schema for the tool's input parameters. Consumed
directly by the MCP server. LangChain derives its schema from
Python type annotations on the @tool function — alignment is
pinned by the structural test."""
impl: Callable[..., Awaitable[str]]
"""The actual coroutine. Both adapters call this; only the
wrapping differs."""
section: Section
"""Which agent-prompt section this tool belongs to (controls
which instruction generator emits it)."""
# ---------------------------------------------------------------------------
# A2A — inter-agent communication & broadcast
# ---------------------------------------------------------------------------
_DELEGATE_TASK = ToolSpec(
name="delegate_task",
short=(
"Delegate a task to a peer workspace via A2A and WAIT for the "
"response (synchronous)."
),
when_to_use=(
"Use for QUICK questions and small sub-tasks where you can "
"afford to wait inline. Returns the peer's response text "
"directly. For longer-running work (research, multi-minute "
"jobs) use delegate_task_async + check_task_status instead "
"so you don't hold this workspace busy waiting."
),
input_schema={
"type": "object",
"properties": {
"workspace_id": {
"type": "string",
"description": "Target workspace ID (from list_peers).",
},
"task": {
"type": "string",
"description": "Task description to send to the peer.",
},
},
"required": ["workspace_id", "task"],
},
impl=tool_delegate_task,
section=A2A_SECTION,
)
_DELEGATE_TASK_ASYNC = ToolSpec(
name="delegate_task_async",
short=(
"Send a task to a peer and return immediately with a task_id "
"(non-blocking)."
),
when_to_use=(
"Use for long-running work where you want to keep doing other "
"things while the peer processes. Poll with check_task_status "
"to retrieve the result. The platform's A2A queue handles "
"delivery + retries; the peer works independently."
),
input_schema={
"type": "object",
"properties": {
"workspace_id": {
"type": "string",
"description": "Target workspace ID (from list_peers).",
},
"task": {
"type": "string",
"description": "Task description to send to the peer.",
},
},
"required": ["workspace_id", "task"],
},
impl=tool_delegate_task_async,
section=A2A_SECTION,
)
_CHECK_TASK_STATUS = ToolSpec(
name="check_task_status",
short=(
"Poll the status of a task started with delegate_task_async; "
"returns result when done."
),
when_to_use=(
"Statuses: pending/in_progress (peer still working — wait), "
"queued (peer is busy with a prior task — DO NOT retry, the "
"platform stitches the response when it finishes), completed "
"(result available), failed (real error — fall back to a "
"different peer or handle it yourself)."
),
input_schema={
"type": "object",
"properties": {
"workspace_id": {
"type": "string",
"description": "Workspace ID the task was sent to.",
},
"task_id": {
"type": "string",
"description": "task_id returned by delegate_task_async.",
},
},
"required": ["workspace_id", "task_id"],
},
impl=tool_check_task_status,
section=A2A_SECTION,
)
_LIST_PEERS = ToolSpec(
name="list_peers",
short=(
"List the workspaces this agent can communicate with — name, "
"ID, status, role for each."
),
when_to_use=(
"Call this first when you need to delegate but don't know the "
"target's ID. Access control is enforced — you only see "
"siblings, parent, and direct children."
),
input_schema={"type": "object", "properties": {}},
impl=tool_list_peers,
section=A2A_SECTION,
)
_GET_WORKSPACE_INFO = ToolSpec(
name="get_workspace_info",
short="Get this workspace's own info — ID, name, role, tier, parent, status.",
when_to_use=(
"Use to introspect your own identity (e.g. before reporting "
"back to the user, or to determine whether you're a tier-0 "
"root that can write GLOBAL memory)."
),
input_schema={"type": "object", "properties": {}},
impl=tool_get_workspace_info,
section=A2A_SECTION,
)
_SEND_MESSAGE_TO_USER = ToolSpec(
name="send_message_to_user",
short=(
"Send a message directly to the user's canvas chat — pushed instantly "
"via WebSocket. Use this to: (1) acknowledge a task immediately ('Got "
"it, I'll start working on this'), (2) send interim progress updates "
"while doing long work, (3) deliver follow-up results after delegation "
"completes, (4) attach files (zip, pdf, csv, image) for the user to "
"download via the `attachments` field (NEVER paste file URLs in "
"`message`). The message appears in the user's chat as if you're "
"proactively reaching out."
),
when_to_use=(
"Use proactively across the lifecycle of a task — early to "
"acknowledge, mid-flight to update, late to deliver. Never paste "
"file URLs in the message body — always pass absolute paths in "
"`attachments` so the platform serves them as download chips "
"(works on SaaS where external file hosts are unreachable)."
),
input_schema={
"type": "object",
"properties": {
"message": {
"type": "string",
# The "no URLs in message text" rule is the single biggest
# cause of bad chat UX: agents drop catbox.moe / file://
# / temporary upload-host links into the prose, the
# canvas renders them as plain markdown links the user
# can't preview, and SaaS deployments often can't even
# reach those external hosts. Every download MUST go
# through the structured `attachments` field below.
"description": (
"Caption text for the chat bubble. Required even when sending "
"attachments — set to a short label like 'Here's the build:' "
"or 'Done — see attached.'\n\n"
"DO NOT paste file URLs, download links, or container paths in "
"this string. Files MUST go through the `attachments` field, "
"which renders as a clickable download chip and works on SaaS "
"deployments where external file-host URLs (catbox.moe, file://, "
"etc.) are unreachable from the user's browser."
),
},
"attachments": {
"type": "array",
"description": (
"REQUIRED for any file delivery. Pass absolute file paths inside "
"THIS container (e.g. ['/tmp/build.zip', '/workspace/report.pdf']) "
"— the platform uploads each file and returns a download chip "
"with the file's icon + name + size in the user's chat. The chip "
"works in SaaS deployments because the URL is platform-served, "
"not an external host.\n\n"
"USE THIS instead of: pasting URLs in `message`, base64-encoding "
"in the body, or telling the user to look at a path on disk. "
"If the file isn't already on disk, write it first (Bash, Write "
"tool, etc.) then pass its path here. 25 MB per file cap."
),
"items": {"type": "string"},
},
},
"required": ["message"],
},
impl=tool_send_message_to_user,
section=A2A_SECTION,
)
# ---------------------------------------------------------------------------
# HMA — hierarchical persistent memory
# ---------------------------------------------------------------------------
_COMMIT_MEMORY = ToolSpec(
name="commit_memory",
short="Save a fact to persistent memory; survives across sessions and restarts.",
when_to_use=(
"Scopes: LOCAL (private to you, default), TEAM (shared with "
"parent + siblings), GLOBAL (entire org — only tier-0 root "
"workspaces can write). Commit decisions, learned facts, and "
"completed-task summaries so future sessions and teammates "
"can recall them."
),
input_schema={
"type": "object",
"properties": {
"content": {
"type": "string",
"description": "What to remember — be specific.",
},
"scope": {
"type": "string",
"enum": ["LOCAL", "TEAM", "GLOBAL"],
"description": "Memory scope (default LOCAL).",
},
},
"required": ["content"],
},
impl=tool_commit_memory,
section=MEMORY_SECTION,
)
_RECALL_MEMORY = ToolSpec(
name="recall_memory",
short="Search persistent memory; returns matching LOCAL + TEAM + GLOBAL rows.",
when_to_use=(
"Call at the start of new work and when picking up something "
"you may have done before. Empty query returns ALL accessible "
"memories — cheap and avoids missing rows that don't match a "
"narrow keyword. Memory is automatically recalled at session "
"start; use this to refresh mid-session."
),
input_schema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query (empty returns all).",
},
"scope": {
"type": "string",
"enum": ["LOCAL", "TEAM", "GLOBAL", ""],
"description": "Filter by scope (empty = all accessible).",
},
},
},
impl=tool_recall_memory,
section=MEMORY_SECTION,
)
# ---------------------------------------------------------------------------
# Public registry. Keep grouped by section, in registration order, for
# stable adapter listings + diff-friendly review.
# ---------------------------------------------------------------------------
TOOLS: list[ToolSpec] = [
# A2A
_DELEGATE_TASK,
_DELEGATE_TASK_ASYNC,
_CHECK_TASK_STATUS,
_LIST_PEERS,
_GET_WORKSPACE_INFO,
_SEND_MESSAGE_TO_USER,
# HMA
_COMMIT_MEMORY,
_RECALL_MEMORY,
]
def a2a_tools() -> list[ToolSpec]:
"""All A2A-section tools, in registration order."""
return [t for t in TOOLS if t.section == A2A_SECTION]
def memory_tools() -> list[ToolSpec]:
"""All memory-section tools, in registration order."""
return [t for t in TOOLS if t.section == MEMORY_SECTION]
def by_name(name: str) -> ToolSpec:
"""Look up a spec by its canonical name. Raises KeyError if absent."""
for t in TOOLS:
if t.name == name:
return t
raise KeyError(f"no platform tool named {name!r}")
def tool_names() -> list[str]:
"""Canonical names in registration order."""
return [t.name for t in TOOLS]
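What an adapter does with a spec reduces to a few lines. A minimal, self-contained sketch (the stand-in `ToolSpec` here is trimmed to the fields an MCP listing consumes; the real dataclass above also carries `when_to_use`, `impl`, and `section`):

```python
from dataclasses import dataclass
from typing import Any

# Stand-in for the ToolSpec dataclass above, trimmed to the fields an
# MCP tool listing consumes.
@dataclass(frozen=True)
class ToolSpec:
    name: str
    short: str
    input_schema: dict[str, Any]

TOOLS = [
    ToolSpec("list_peers", "List reachable workspaces.",
             {"type": "object", "properties": {}}),
    ToolSpec("get_workspace_info", "Get this workspace's own info.",
             {"type": "object", "properties": {}}),
]

# The MCP server's hand-maintained TOOLS list reduces to one
# comprehension over the registry -- rename a tool in the registry
# and every adapter listing follows automatically.
MCP_TOOLS = [
    {"name": s.name, "description": s.short, "inputSchema": s.input_schema}
    for s in TOOLS
]

print(sorted(t["name"] for t in MCP_TOOLS))
# → ['get_workspace_info', 'list_peers']
```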

View File

@@ -64,7 +64,7 @@ def build_team_routing_payload(
"action": "choose_member",
"message": (
f"You have {len(members)} team members. "
"Choose the best one for this task and call delegate_to_workspace with their ID."
"Choose the best one for this task and call delegate_task_async with their ID."
),
"task": task,
"members": members,

View File

@@ -140,7 +140,7 @@ def build_peer_section(
*,
heading: str = "## Your Peers (workspaces you can delegate to)",
instruction: str = (
"Use the `delegate_to_workspace` tool to send tasks to peers. "
"Use the `delegate_task_async` tool to send tasks to peers. "
"Only delegate to peers listed above."
),
) -> str:

View File

@@ -113,10 +113,12 @@ def _make_tools_mocks():
tools_mod.__path__ = [] # Make it a proper package
tools_delegation_mod = ModuleType("builtin_tools.delegation")
tools_delegation_mod.delegate_to_workspace = MagicMock()
tools_delegation_mod.delegate_to_workspace.name = "delegate_to_workspace"
tools_delegation_mod.check_delegation_status = MagicMock()
tools_delegation_mod.check_delegation_status.name = "check_delegation_status"
tools_delegation_mod.delegate_task = MagicMock()
tools_delegation_mod.delegate_task.name = "delegate_task"
tools_delegation_mod.delegate_task_async = MagicMock()
tools_delegation_mod.delegate_task_async.name = "delegate_task_async"
tools_delegation_mod.check_task_status = MagicMock()
tools_delegation_mod.check_task_status.name = "check_task_status"
tools_approval_mod = ModuleType("builtin_tools.approval")
tools_approval_mod.request_approval = MagicMock()
@@ -125,8 +127,8 @@ def _make_tools_mocks():
tools_memory_mod = ModuleType("builtin_tools.memory")
tools_memory_mod.commit_memory = MagicMock()
tools_memory_mod.commit_memory.name = "commit_memory"
tools_memory_mod.search_memory = MagicMock()
tools_memory_mod.search_memory.name = "search_memory"
tools_memory_mod.recall_memory = MagicMock()
tools_memory_mod.recall_memory.name = "recall_memory"
tools_sandbox_mod = ModuleType("builtin_tools.sandbox")
tools_sandbox_mod.run_code = MagicMock()

View File

@@ -28,7 +28,7 @@ async def test_route_task_to_team_delegates_preferred_member(monkeypatch):
delegate = MagicMock()
delegate.ainvoke = AsyncMock(return_value={"ok": True})
monkeypatch.setattr(sys.modules["builtin_tools.delegation"], "delegate_to_workspace", delegate)
monkeypatch.setattr(sys.modules["builtin_tools.delegation"], "delegate_task_async", delegate)
result = await coordinator.route_task_to_team(
"Do the thing",
@@ -58,4 +58,4 @@ def test_build_children_description_reuses_shared_renderer():
assert "## Your Team (sub-workspaces you coordinate)" in description
assert "**Alpha** (id: `child-1`, status: online)" in description
assert "Skills: research" in description
assert "delegate_to_workspace" in description
assert "delegate_task_async" in description

View File

@@ -4,7 +4,7 @@ The delegation tool now returns immediately with a task_id and runs the
A2A request in the background. Tests verify:
1. Immediate return with task_id
2. Background task completion
3. check_delegation_status retrieval
3. check_task_status retrieval
4. Error handling (RBAC, discovery, network)
"""
@@ -109,22 +109,22 @@ def delegation_mocks(monkeypatch):
async def _invoke(mod, workspace_id="target", task="do stuff"):
"""Call delegate_to_workspace and return the immediate result."""
fn = mod.delegate_to_workspace
"""Call delegate_task_async and return the immediate result."""
fn = mod.delegate_task_async
if hasattr(fn, "ainvoke"):
return await fn.ainvoke({"workspace_id": workspace_id, "task": task})
return await fn(workspace_id=workspace_id, task=task)
async def _invoke_and_wait(mod, workspace_id="target", task="do stuff"):
"""Call delegate_to_workspace, wait for background task, return status."""
"""Call delegate_task_async, wait for background task, return status."""
result = await _invoke(mod, workspace_id, task)
# Wait for all background tasks to complete
if mod._background_tasks:
await asyncio.gather(*mod._background_tasks, return_exceptions=True)
# Get final status
if "task_id" in result:
fn = mod.check_delegation_status
fn = mod.check_task_status
if hasattr(fn, "ainvoke"):
return await fn.ainvoke({"task_id": result["task_id"]})
return await fn(task_id=result["task_id"])
@@ -182,7 +182,7 @@ class TestAsyncDelegation:
await _invoke(mod, workspace_id="ws-a", task="task A")
await _invoke(mod, workspace_id="ws-b", task="task B")
fn = mod.check_delegation_status
fn = mod.check_task_status
if hasattr(fn, "ainvoke"):
result = await fn.ainvoke({"task_id": ""})
else:
@@ -194,7 +194,7 @@ class TestAsyncDelegation:
async def test_check_delegation_not_found(self, delegation_mocks):
mod, *_ = delegation_mocks
fn = mod.check_delegation_status
fn = mod.check_task_status
if hasattr(fn, "ainvoke"):
result = await fn.ainvoke({"task_id": "nonexistent"})
else:
@@ -354,7 +354,7 @@ class TestA2AQueued:
class TestQueuedLazyRefresh:
"""When a delegation is QUEUED, check_delegation_status must lazily
"""When a delegation is QUEUED, check_task_status must lazily
refresh from the platform's GET /delegations to pick up drain-stitch
completions. Without this refresh, the LLM sees "queued" forever
because the platform never pushes back to the runtime.
@@ -401,7 +401,7 @@ class TestQueuedLazyRefresh:
refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False)
with patch("httpx.AsyncClient", refresh_cls):
fn = mod.check_delegation_status
fn = mod.check_task_status
if hasattr(fn, "ainvoke"):
refreshed = await fn.ainvoke({"task_id": task_id})
else:
@@ -443,7 +443,7 @@ class TestQueuedLazyRefresh:
refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False)
with patch("httpx.AsyncClient", refresh_cls):
fn = mod.check_delegation_status
fn = mod.check_task_status
if hasattr(fn, "ainvoke"):
refreshed = await fn.ainvoke({"task_id": task_id})
else:
@@ -486,7 +486,7 @@ class TestQueuedLazyRefresh:
refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False)
with patch("httpx.AsyncClient", refresh_cls):
fn = mod.check_delegation_status
fn = mod.check_task_status
if hasattr(fn, "ainvoke"):
refreshed = await fn.ainvoke({"task_id": task_id})
else:
@@ -515,7 +515,7 @@ class TestQueuedLazyRefresh:
refresh_cls.return_value.__aexit__ = AsyncMock(return_value=False)
with patch("httpx.AsyncClient", refresh_cls):
fn = mod.check_delegation_status
fn = mod.check_task_status
if hasattr(fn, "ainvoke"):
refreshed = await fn.ainvoke({"task_id": task_id})
else:

View File

@@ -438,9 +438,12 @@ def test_get_system_prompt_handles_non_utf8(tmp_path):
def test_get_a2a_instructions_mcp_default():
out = get_a2a_instructions()
assert "MCP tools" in out
# Section heading is the canonical agent-facing label.
assert "## Inter-Agent Communication" in out
# Every A2A tool from the registry must appear by name.
assert "list_peers" in out
assert "send_message_to_user" in out
assert "delegate_task" in out
def test_get_a2a_instructions_cli_variant():
@@ -468,32 +471,27 @@ def test_a2a_cli_instructions_use_module_invocation_not_legacy_app_path():
def test_a2a_mcp_instructions_reference_existing_tools():
"""The MCP instructions text must only reference tools that are actually
registered in a2a_mcp_server.py. If someone renames a server tool, the
prompt text must be updated in lockstep; this test catches the drift.
"""Pin the registry-driven alignment: every tool name appearing in the
agent-facing A2A instructions must be a tool the MCP server actually
registers. Both sides now derive from platform_tools.registry, so the
real test is that the registry's a2a_tools() set drives both surfaces
consistently.
"""
import re
import pathlib
mcp_server = pathlib.Path(__file__).parent.parent / "a2a_mcp_server.py"
registered = set(re.findall(r'"name":\s*"([a-z_]+)"', mcp_server.read_text()))
# The server advertises itself by name; strip that false positive.
registered.discard("a2a-delegation")
from a2a_mcp_server import TOOLS as MCP_TOOLS
from platform_tools.registry import a2a_tools
registered = {t["name"] for t in MCP_TOOLS}
instructions = get_a2a_instructions(mcp=True)
# Every tool called out by name in the instructions must exist on the
# server. (We allow the server to have extras the prompt doesn't mention.)
referenced = {
"list_peers",
"delegate_task",
"delegate_task_async",
"check_task_status",
"get_workspace_info",
"send_message_to_user",
}
for name in referenced:
assert name in instructions, f"prompt missing {name}"
assert name in registered, f"MCP server no longer registers {name}"
for spec in a2a_tools():
assert spec.name in instructions, (
f"A2A instructions are missing the tool {spec.name!r} that "
f"the registry declares — the doc generator drifted."
)
assert spec.name in registered, (
f"MCP server no longer registers {spec.name!r} that the registry "
f"declares — the MCP TOOLS list drifted from the registry."
)
# ======================================================================

View File

@@ -98,7 +98,7 @@ def test_commit_memory_uses_awareness_client_when_configured(monkeypatch, memory
assert captured["json"] == {"content": "remember this", "scope": "TEAM"}
def test_search_memory_uses_platform_fallback_without_awareness(monkeypatch, memory_modules):
def test_recall_memory_uses_platform_fallback_without_awareness(monkeypatch, memory_modules):
memory, _awareness_client = memory_modules
captured = {}
@@ -119,7 +119,7 @@ def test_search_memory_uses_platform_fallback_without_awareness(monkeypatch, mem
monkeypatch.setattr(memory.httpx, "AsyncClient", FakeAsyncClient)
result = asyncio.run(memory.search_memory("status", "local"))
result = asyncio.run(memory.recall_memory("status", "local"))
assert result == {
"success": True,
@@ -236,10 +236,10 @@ def test_commit_memory_promoted_packet_logs_skill_promotion(monkeypatch, tmp_pat
assert not (tmp_path / "skills").exists()
def test_search_memory_rejects_invalid_scope(memory_modules):
def test_recall_memory_rejects_invalid_scope(memory_modules):
memory, _awareness_client = memory_modules
result = asyncio.run(memory.search_memory("status", "bad"))
result = asyncio.run(memory.recall_memory("status", "bad"))
assert result == {"error": "scope must be LOCAL, TEAM, GLOBAL, or empty"}
@@ -457,15 +457,15 @@ def test_commit_memory_result_failure(memory_modules_with_mocks):
# ---------------------------------------------------------------------------
# search_memory — RBAC deny
# recall_memory — RBAC deny
# ---------------------------------------------------------------------------
def test_search_memory_rbac_deny(memory_modules_with_mocks):
def test_recall_memory_rbac_deny(memory_modules_with_mocks):
memory, mock_audit, _ = memory_modules_with_mocks
mock_audit.check_permission.return_value = False
mock_audit.get_workspace_roles.return_value = (["read-only-special"], {})
result = asyncio.run(memory.search_memory("find something", "local"))
result = asyncio.run(memory.recall_memory("find something", "local"))
assert result["success"] is False
assert "RBAC" in result["error"]
@@ -473,22 +473,22 @@ def test_search_memory_rbac_deny(memory_modules_with_mocks):
# ---------------------------------------------------------------------------
# search_memory — invalid scope
# recall_memory — invalid scope
# ---------------------------------------------------------------------------
def test_search_memory_invalid_scope(memory_modules_with_mocks):
def test_recall_memory_invalid_scope(memory_modules_with_mocks):
memory, _mock_audit, _ = memory_modules_with_mocks
result = asyncio.run(memory.search_memory("q", "BAD"))
result = asyncio.run(memory.recall_memory("q", "BAD"))
assert result == {"error": "scope must be LOCAL, TEAM, GLOBAL, or empty"}
# ---------------------------------------------------------------------------
# search_memory — awareness_client success
# recall_memory — awareness_client success
# ---------------------------------------------------------------------------
def test_search_memory_awareness_client_success(memory_modules_with_mocks):
def test_recall_memory_awareness_client_success(memory_modules_with_mocks):
from unittest.mock import AsyncMock, MagicMock
memory, mock_audit, mock_awareness_mod = memory_modules_with_mocks
@@ -501,7 +501,7 @@ def test_search_memory_awareness_client_success(memory_modules_with_mocks):
# Patch directly on the loaded module since it imported the name at load time
memory.build_awareness_client = MagicMock(return_value=mock_ac)
result = asyncio.run(memory.search_memory("find", "team"))
result = asyncio.run(memory.recall_memory("find", "team"))
assert result["success"] is True
assert result["count"] == 2
@@ -509,10 +509,10 @@ def test_search_memory_awareness_client_success(memory_modules_with_mocks):
# ---------------------------------------------------------------------------
# search_memory — awareness_client raises
# recall_memory — awareness_client raises
# ---------------------------------------------------------------------------
def test_search_memory_awareness_client_exception(memory_modules_with_mocks):
def test_recall_memory_awareness_client_exception(memory_modules_with_mocks):
from unittest.mock import AsyncMock, MagicMock
memory, mock_audit, mock_awareness_mod = memory_modules_with_mocks
@@ -521,7 +521,7 @@ def test_search_memory_awareness_client_exception(memory_modules_with_mocks):
# Patch directly on the loaded module since it imported the name at load time
memory.build_awareness_client = MagicMock(return_value=mock_ac)
result = asyncio.run(memory.search_memory("query", "local"))
result = asyncio.run(memory.recall_memory("query", "local"))
assert result["success"] is False
assert "awareness search failed" in result["error"]
@@ -530,10 +530,10 @@ def test_search_memory_awareness_client_exception(memory_modules_with_mocks):
# ---------------------------------------------------------------------------
# search_memory — httpx 200 success (no awareness_client)
# recall_memory — httpx 200 success (no awareness_client)
# ---------------------------------------------------------------------------
def test_search_memory_httpx_200_success(memory_modules_with_mocks):
def test_recall_memory_httpx_200_success(memory_modules_with_mocks):
memory, _mock_audit, _ = memory_modules_with_mocks
class FakeAsyncClient:
@@ -545,7 +545,7 @@ def test_search_memory_httpx_200_success(memory_modules_with_mocks):
memory.httpx.AsyncClient = FakeAsyncClient
result = asyncio.run(memory.search_memory("find", "global"))
result = asyncio.run(memory.recall_memory("find", "global"))
assert result["success"] is True
assert result["count"] == 2
@@ -553,10 +553,10 @@ def test_search_memory_httpx_200_success(memory_modules_with_mocks):
# ---------------------------------------------------------------------------
# search_memory — httpx non-200
# recall_memory — httpx non-200
# ---------------------------------------------------------------------------
def test_search_memory_httpx_non_200(memory_modules_with_mocks):
def test_recall_memory_httpx_non_200(memory_modules_with_mocks):
memory, mock_audit, _ = memory_modules_with_mocks
class FakeAsyncClient:
@@ -568,17 +568,17 @@ def test_search_memory_httpx_non_200(memory_modules_with_mocks):
memory.httpx.AsyncClient = FakeAsyncClient
result = asyncio.run(memory.search_memory("q", ""))
result = asyncio.run(memory.recall_memory("q", ""))
assert result["success"] is False
assert "server error" in result["error"]
# ---------------------------------------------------------------------------
# search_memory — httpx raises
# recall_memory — httpx raises
# ---------------------------------------------------------------------------
def test_search_memory_httpx_exception(memory_modules_with_mocks):
def test_recall_memory_httpx_exception(memory_modules_with_mocks):
memory, mock_audit, _ = memory_modules_with_mocks
class FakeAsyncClient:
@@ -590,7 +590,7 @@ def test_search_memory_httpx_exception(memory_modules_with_mocks):
memory.httpx.AsyncClient = FakeAsyncClient
result = asyncio.run(memory.search_memory("query", "local"))
result = asyncio.run(memory.recall_memory("query", "local"))
assert result["success"] is False
assert "request timed out" in result["error"]
@@ -672,7 +672,7 @@ def test_commit_memory_awareness_exception_span_record_fails(memory_modules_with
assert result["success"] is False # error propagated despite span failure
def test_search_memory_awareness_exception_span_record_fails(memory_modules_with_mocks):
def test_recall_memory_awareness_exception_span_record_fails(memory_modules_with_mocks):
"""awareness_client.search raises + span.record_exception also raises: error still returned."""
from unittest.mock import AsyncMock, MagicMock
memory, mock_audit, mock_awareness_mod = memory_modules_with_mocks
@@ -685,7 +685,7 @@ def test_search_memory_awareness_exception_span_record_fails(memory_modules_with
mock_ac.search = AsyncMock(side_effect=RuntimeError("awareness down"))
memory.build_awareness_client = MagicMock(return_value=mock_ac)
result = asyncio.run(memory.search_memory("test", "local"))
result = asyncio.run(memory.recall_memory("test", "local"))
assert result["success"] is False
@@ -711,8 +711,8 @@ def test_commit_memory_httpx_exception_span_record_fails(memory_modules_with_moc
assert result["success"] is False
def test_search_memory_httpx_exception_span_record_fails(memory_modules_with_mocks):
"""httpx raises in search_memory + span.record_exception also raises: error still returned."""
def test_recall_memory_httpx_exception_span_record_fails(memory_modules_with_mocks):
"""httpx raises in recall_memory + span.record_exception also raises: error still returned."""
from unittest.mock import MagicMock
memory, mock_audit, mock_awareness_mod = memory_modules_with_mocks
@@ -729,7 +729,7 @@ def test_search_memory_httpx_exception_span_record_fails(memory_modules_with_moc
memory.httpx.AsyncClient = FakeAsyncClient
result = asyncio.run(memory.search_memory("query", "local"))
result = asyncio.run(memory.recall_memory("query", "local"))
assert result["success"] is False

View File

@@ -0,0 +1,123 @@
"""Structural alignment tests — every adapter must agree with the registry.
The registry in workspace/platform_tools/registry.py is the single source
of truth for tool naming + docs. These tests fail if any consumer
(MCP server, LangChain @tool wrappers, doc generators) drifts.
If you add a tool: append a ToolSpec to registry.TOOLS, then add the
matching @tool wrapper in builtin_tools/. These tests catch the case
where the registry has a name that has no LangChain @tool counterpart
(or vice versa).
If you rename a tool: edit registry.TOOLS only. These tests fail loudly
if the LangChain @tool name or MCP TOOLS["name"] still has the old name.
"""
from __future__ import annotations
import pytest
from platform_tools.registry import TOOLS, a2a_tools, by_name, memory_tools, tool_names
def test_registry_names_are_unique():
"""Every ToolSpec must have a distinct name — duplicate is a typo."""
names = tool_names()
assert len(names) == len(set(names)), f"duplicate tool names: {names}"
def test_registry_a2a_and_memory_partition_is_complete():
"""Every tool belongs to exactly one section. No orphans."""
a2a = {t.name for t in a2a_tools()}
mem = {t.name for t in memory_tools()}
all_names = set(tool_names())
assert a2a | mem == all_names
assert not (a2a & mem), f"tool in both sections: {a2a & mem}"
def test_by_name_lookup_works():
spec = by_name("delegate_task")
assert spec.name == "delegate_task"
assert spec.section == "a2a"
with pytest.raises(KeyError):
by_name("nonexistent_tool")
def test_mcp_server_registers_every_registry_tool():
"""The MCP server's TOOLS list is built from the registry. Every
spec must produce a corresponding entry; if not, the import-time
list comprehension is broken or the registry has an entry the
server isn't picking up.
"""
from a2a_mcp_server import TOOLS as MCP_TOOLS
mcp_names = {t["name"] for t in MCP_TOOLS}
registry_names = set(tool_names())
assert mcp_names == registry_names, (
f"MCP and registry diverged. MCP-only: {mcp_names - registry_names}; "
f"registry-only: {registry_names - mcp_names}"
)
def test_mcp_tool_descriptions_match_registry_short():
"""Each MCP tool's description IS the registry's `short` field —
the bullet-line description shown to the model. The deeper
when_to_use guidance lives only in the system prompt.
"""
from a2a_mcp_server import TOOLS as MCP_TOOLS
by_mcp_name = {t["name"]: t for t in MCP_TOOLS}
for spec in TOOLS:
assert by_mcp_name[spec.name]["description"] == spec.short, (
f"MCP description for {spec.name!r} drifted from registry.short. "
f"Edit registry.py, not the MCP server's TOOLS list."
)
def test_mcp_tool_input_schemas_match_registry():
"""Schemas must come from the registry, never duplicated in the server."""
from a2a_mcp_server import TOOLS as MCP_TOOLS
by_mcp_name = {t["name"]: t for t in MCP_TOOLS}
for spec in TOOLS:
assert by_mcp_name[spec.name]["inputSchema"] == spec.input_schema, (
f"MCP inputSchema for {spec.name!r} drifted from registry."
)
def test_a2a_instructions_text_includes_every_a2a_tool():
"""get_a2a_instructions must mention every a2a-section tool by name."""
from executor_helpers import get_a2a_instructions
instructions = get_a2a_instructions(mcp=True)
for spec in a2a_tools():
assert spec.name in instructions, (
f"agent-facing A2A docs missing tool {spec.name!r} from registry"
)
def test_hma_instructions_text_includes_every_memory_tool():
"""get_hma_instructions must mention every memory-section tool by name."""
from executor_helpers import get_hma_instructions
instructions = get_hma_instructions()
for spec in memory_tools():
assert spec.name in instructions, (
f"agent-facing HMA docs missing tool {spec.name!r} from registry"
)
def test_old_pre_rename_names_not_present_in_docs():
"""Pre-rename names (delegate_to_workspace, search_memory,
check_delegation_status) must not leak back into the agent-facing
docs. They're not in the registry; their absence is the canonical
state.
"""
from executor_helpers import get_a2a_instructions, get_hma_instructions
blob = get_a2a_instructions(mcp=True) + get_hma_instructions()
for stale in ("delegate_to_workspace", "search_memory", "check_delegation_status"):
assert stale not in blob, (
f"pre-rename name {stale!r} leaked into docs — registry "
f"is the source of truth, not the doc generator."
)
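The doc-generator side these tests pin can be sketched the same way: heading, one bullet per tool from `short`, then each tool's `when_to_use` guidance below the list. The `render_instructions` helper and the stand-in `ToolSpec` below are illustrative, not the real `executor_helpers` API:

```python
from dataclasses import dataclass

# Stand-in mirroring the registry fields the generator consumes; the
# real implementation iterates platform_tools.registry.a2a_tools().
@dataclass(frozen=True)
class ToolSpec:
    name: str
    short: str
    when_to_use: str

def render_instructions(heading: str, specs: list[ToolSpec]) -> str:
    # Heading + per-tool bullet (spec.short), then the deeper
    # when_to_use guidance for each tool under its own sub-heading.
    lines = [heading, ""]
    lines += [f"- `{s.name}`: {s.short}" for s in specs]
    for s in specs:
        lines += ["", f"### {s.name}", s.when_to_use]
    return "\n".join(lines)

specs = [
    ToolSpec("list_peers", "List reachable workspaces.",
             "Call this first when you need a target workspace ID."),
]
text = render_instructions("## Inter-Agent Communication", specs)
print(text)
```

Because every name in the output comes from the spec objects, the substring assertions in the tests above hold by construction.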

View File

@ -202,7 +202,7 @@ def test_peer_capabilities_format(tmp_path):
assert "## Your Peers" in result
assert "**Echo Agent** (id: `peer-1`, status: online)" in result
assert "Skills: echo, repeat" in result
assert "delegate_to_workspace" in result
assert "delegate_task_async" in result
# peer-2 has no agent_card but DOES have a DB name + status — must
# still render so coordinators can delegate to freshly-created peers
# whose A2A discovery hasn't populated a card yet (regression of the