molecule-core/workspace/shared_runtime.py
Hongming Wang e9a59cda3b feat(platform): single-source-of-truth tool registry — adapters consume, no drift
Establishes workspace/platform_tools/registry.py as THE place tool
naming and docs live. Every consumer reads from it; nothing duplicates
the source. Closes the architectural gap behind the doc/tool drift
discussion 2026-04-28 — adding hundreds of future runtime SDK adapters
should not require touching tool names anywhere except the registry.

What the registry owns

  ToolSpec dataclass with: name, short (one-line description), when_to_use
  (multi-paragraph agent-facing usage guidance), input_schema (JSON Schema),
  impl (the actual coroutine in a2a_tools.py), section ('a2a' | 'memory').

  TOOLS list with 8 entries — delegate_task, delegate_task_async,
  check_task_status, list_peers, get_workspace_info, send_message_to_user,
  commit_memory, recall_memory.
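  The registry file itself is not reproduced here, but the fields above pin its shape. A minimal sketch, assuming a frozen dataclass and a `by_name` lookup helper (the helper name appears in the test list; everything else in this sketch is illustrative, not the actual registry.py source):

  ```python
  from dataclasses import dataclass
  from typing import Any, Awaitable, Callable


  @dataclass(frozen=True)
  class ToolSpec:
      """One registry row; field names are the ones listed above."""
      name: str
      short: str                           # one-line description
      when_to_use: str                     # multi-paragraph agent-facing guidance
      input_schema: dict[str, Any]         # JSON Schema for the tool's arguments
      impl: Callable[..., Awaitable[Any]]  # the coroutine in a2a_tools.py
      section: str                         # 'a2a' | 'memory'


  # registry.py fills this with the 8 specs; empty here for illustration.
  TOOLS: list[ToolSpec] = []


  def by_name(name: str) -> ToolSpec:
      """Lookup helper pinned by the structural tests."""
      return next(spec for spec in TOOLS if spec.name == name)
  ```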

What now reads from the registry

  - workspace/a2a_mcp_server.py
      The hardcoded TOOLS list (167 lines of hand-maintained dicts) is
      gone. Replaced with a 6-line list comprehension over the registry.
      MCP description = spec.short. inputSchema = spec.input_schema.
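      For illustration, the replacement comprehension plausibly has this shape (stand-in specs below; the real server reads workspace/platform_tools/registry.py and emits the MCP SDK's tool type rather than plain dicts):

      ```python
      from types import SimpleNamespace

      # Stand-ins for registry.TOOLS; names come from the commit, the
      # schemas and descriptions here are placeholders.
      TOOLS = [
          SimpleNamespace(name="delegate_task", short="Delegate a task and wait.",
                          input_schema={"type": "object"}),
          SimpleNamespace(name="recall_memory", short="Search workspace memory.",
                          input_schema={"type": "object"}),
      ]

      # The replacement for the old 167-line hand-maintained dict list:
      # every string is read from the registry, none are typed here.
      MCP_TOOLS = [
          {"name": spec.name, "description": spec.short, "inputSchema": spec.input_schema}
          for spec in TOOLS
      ]
      ```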

  - workspace/executor_helpers.py
      get_a2a_instructions(mcp=True) and get_hma_instructions() now
      GENERATE the agent-facing system-prompt text from the registry.
      Heading + per-tool bullet (spec.short) + per-tool when_to_use +
      a section-specific footer. No more hand-maintained instruction
      blocks that drift from reality.
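      The generation step described above can be sketched as follows; the exact heading and footer wording is assumed, only the heading-plus-bullet-plus-when_to_use layout comes from the commit:

      ```python
      from types import SimpleNamespace

      # Stand-in spec (real ones live in workspace/platform_tools/registry.py).
      DEMO_TOOLS = [
          SimpleNamespace(name="commit_memory", short="Store a durable note.",
                          when_to_use="Use after learning something worth keeping.",
                          section="memory"),
      ]


      def render_instructions(tools, section: str, footer: str) -> str:
          """Heading + per-tool bullet (spec.short) + when_to_use + a
          section-specific footer, generated rather than hand-maintained."""
          lines = [f"## {section} tools", ""]
          for spec in tools:
              if spec.section != section:
                  continue
              lines.append(f"- `{spec.name}`: {spec.short}")
              lines.append(f"  {spec.when_to_use}")
          return "\n".join(lines + ["", footer])
      ```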

  - workspace/builtin_tools/delegation.py
      Renamed delegate_to_workspace -> delegate_task_async to match
      registry. check_delegation_status -> check_task_status. Added
      sync delegate_task @tool wrapping a2a_tools.tool_delegate_task
      (was missing for LangChain runtimes — CP review Issue 3).

  - workspace/builtin_tools/memory.py
      Renamed search_memory -> recall_memory to match registry.

  - workspace/adapter_base.py, workspace/main.py
      Bundle all 7 core tools (was 6) into all_tools / base_tools.

  - workspace/coordinator.py, shared_runtime.py, policies/routing.py
      Updated system-prompt-text references to use the registry names.

Structural alignment tests

  workspace/tests/test_platform_tools.py — 9 tests pin every
  registry-to-adapter mapping:
    - registry names are unique
    - a2a + memory partition is complete (no orphans)
    - by_name lookup works
    - MCP server registers exactly the registry's tool set
    - MCP description equals registry.short for every tool
    - MCP inputSchema equals registry.input_schema for every tool
    - get_a2a_instructions text contains every a2a tool name
    - get_hma_instructions text contains every memory tool name
    - pre-rename names (delegate_to_workspace, search_memory,
      check_delegation_status) cannot leak back

  Adding a future tool means adding one ToolSpec; the test failure
  list tells the author exactly which adapter to update.
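  A few of the pins above, sketched in assumed pytest style against a stand-in registry (the real tests import the actual registry and adapters):

  ```python
  from types import SimpleNamespace

  # Stand-in registry rows for illustration.
  TOOLS = [
      SimpleNamespace(name="delegate_task", section="a2a"),
      SimpleNamespace(name="recall_memory", section="memory"),
  ]
  RETIRED_NAMES = {"delegate_to_workspace", "search_memory", "check_delegation_status"}


  def test_names_unique():
      names = [spec.name for spec in TOOLS]
      assert len(names) == len(set(names))


  def test_sections_partition():
      # Every tool belongs to exactly one known section; no orphans.
      assert all(spec.section in ("a2a", "memory") for spec in TOOLS)


  def test_pre_rename_names_cannot_leak_back():
      assert RETIRED_NAMES.isdisjoint(spec.name for spec in TOOLS)
  ```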

Adapter pattern for future SDK support

  When (e.g.) AutoGen or Pydantic AI gets adapters, the only work
  needed for tool surfacing is "wrap registry.TOOLS in your SDK's
  tool format." Names, descriptions, schemas, impl come from the
  registry — adapter author writes zero strings.
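  One way such a shim could look, with `make_tool` standing in for whatever constructor the target SDK exposes (keyword names here are illustrative, not any SDK's real signature):

  ```python
  def as_sdk_tools(tools, make_tool):
      """Surface every registry spec through an SDK's own tool constructor.

      The adapter author supplies only `make_tool`; names, descriptions,
      schemas, and implementations all come from the registry specs.
      """
      return [
          make_tool(
              name=spec.name,
              description=spec.short,
              args_schema=spec.input_schema,
              coroutine=spec.impl,
          )
          for spec in tools
      ]
  ```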

Why this needed to ship now

  PR #2237 (already in staging) injected MCP-world docs as the
  default system-prompt content. Without the registry, those docs
  said "delegate_task" while LangChain runtimes only had
  "delegate_to_workspace" — workers see docs for tools that don't
  exist (CP review Issue 1+3). PR #2239 was a tactical rename;
  this PR is the structural fix that prevents the same class of
  drift from recurring as new adapters ship.

  PR #2239 was closed in favor of this — same renames, plus the
  registry, plus structural tests. Single coherent change.

Tests: 1232 pass, 2 xfailed (pre-existing). 9 new in
test_platform_tools.py; 4 alignment tests in test_prompt.py from
#2237 still pass; original test_executor_helpers tests adapted to
the registry-driven world.

Refs: CP review Issues 1, 2, 3, 5; project memory
project_runtime_native_pluggable.md (platform owns A2A);
project memory feedback_doc_tool_alignment.md (this is the structural
fix for the tactical lesson).
2026-04-28 17:11:36 -07:00


"""Shared runtime helpers for A2A-backed workspace executors."""
from __future__ import annotations
import json
from typing import Any
from a2a.server.agent_execution import RequestContext
def _extract_part_text(part) -> str:
"""Extract text from a message part, handling dicts and A2A objects."""
if isinstance(part, dict):
text = part.get("text", "")
if text:
return text
root = part.get("root")
if isinstance(root, dict):
return root.get("text", "")
return ""
if hasattr(part, "text") and part.text:
return part.text
if hasattr(part, "root") and hasattr(part.root, "text") and part.root.text:
return part.root.text
return ""


def extract_message_text(context_or_parts) -> str:
    """Extract concatenated plain text from A2A message parts."""
    parts = getattr(getattr(context_or_parts, "message", None), "parts", None)
    if parts is None:
        parts = context_or_parts
    return " ".join(
        text for part in (parts or []) if (text := _extract_part_text(part))
    ).strip()


def extract_history(context: RequestContext) -> list[tuple[str, str]]:
    """Extract conversation history from A2A request metadata."""
    messages: list[tuple[str, str]] = []
    request = getattr(context, "request", None)
    metadata = getattr(request, "metadata", None) if request else None
    if not isinstance(metadata, dict):
        metadata = getattr(context, "metadata", None) or {}
    history = metadata.get("history", []) if isinstance(metadata, dict) else []
    if not isinstance(history, list):
        return messages
    for entry in history:
        if not isinstance(entry, dict):
            continue
        role = entry.get("role", "user")
        parts = entry.get("parts", [])
        text = " ".join(
            text for part in (parts or []) if (text := _extract_part_text(part))
        ).strip()
        if text:
            mapped_role = "human" if role == "user" else "ai"
            messages.append((mapped_role, text))
    return messages


def format_conversation_history(history: list[tuple[str, str]]) -> str:
    """Render `(role, text)` history into a stable human-readable transcript."""
    return "\n".join(
        f"{'User' if role == 'human' else 'Agent'}: {text}" for role, text in history
    )


def build_task_text(user_message: str, history: list[tuple[str, str]]) -> str:
    """Build a single task/request string with optional prepended conversation history."""
    if not history:
        return user_message
    transcript = format_conversation_history(history)
    return f"Conversation so far:\n{transcript}\n\nCurrent request: {user_message}"


def append_peer_guidance(
    base_text: str | None,
    peers_info: str,
    *,
    default_text: str,
    tool_name: str,
) -> str:
    """Append peer guidance text when peers are available."""
    text = (base_text or default_text).strip()
    if peers_info:
        text += f"\n\n## Peers\n{peers_info}\nUse {tool_name} to communicate with them."
    return text


def summarize_peer_cards(peers: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return compact peer metadata for prompt rendering.

    Falls back to the registry row's `name` and `role` when `agent_card` is
    null or unparseable so peers stay visible to delegators even before
    their A2A discovery roundtrip has populated a card. Without this
    fallback a coordinator-tier workspace with N freshly-created worker
    peers would render an empty `## Your Peers` section and refuse to
    delegate (the regression behind the 2026-04-27 Design Director
    discovery bug).
    """
    summaries: list[dict[str, Any]] = []
    for peer in peers:
        agent_card = peer.get("agent_card")
        if isinstance(agent_card, str):
            try:
                agent_card = json.loads(agent_card)
            except Exception:
                agent_card = None
        if not isinstance(agent_card, dict):
            agent_card = None
        if agent_card:
            skills_raw = agent_card.get("skills") or []
            skills = [
                s.get("name", s.get("id", ""))
                for s in skills_raw
                if isinstance(s, dict)
            ]
            name = agent_card.get("name") or peer.get("name") or "Unknown"
        else:
            skills = []
            name = peer.get("name") or "Unknown"
        summaries.append(
            {
                "id": peer.get("id", "unknown"),
                "name": name,
                "role": peer.get("role") or "",
                "status": peer.get("status", "unknown"),
                "skills": skills,
            }
        )
    return summaries


def build_peer_section(
    peers: list[dict[str, Any]],
    *,
    heading: str = "## Your Peers (workspaces you can delegate to)",
    instruction: str = (
        "Use the `delegate_task_async` tool to send tasks to peers. "
        "Only delegate to peers listed above."
    ),
) -> str:
    """Render a stable peer section for system prompts."""
    summaries = summarize_peer_cards(peers)
    if not summaries:
        return ""
    parts = [heading, ""]
    for peer in summaries:
        parts.append(f"- **{peer['name']}** (id: `{peer['id']}`, status: {peer['status']})")
        if peer["skills"]:
            parts.append(f" Skills: {', '.join(peer['skills'])}")
        elif peer.get("role"):
            parts.append(f" Role: {peer['role']}")
    parts.append("")
    parts.append(instruction)
    return "\n".join(parts)


def brief_task(text: str, limit: int = 60) -> str:
    """Create a short human-readable task label for the heartbeat banner."""
    return text[:limit] + ("..." if len(text) > limit else "")


async def set_current_task(heartbeat: Any, task: str) -> None:
    """Update current task on heartbeat and push immediately to platform.

    Uses increment/decrement instead of binary 0/1 so agents can track
    multiple concurrent tasks (e.g. a cron running while an A2A delegation
    arrives). The counter never goes below 0.

    Pushes immediately on BOTH increment and decrement to avoid phantom-busy
    (#1372) where active_tasks=1 persisted in the platform DB indefinitely.
    """
    if heartbeat:
        if task:
            heartbeat.active_tasks = getattr(heartbeat, "active_tasks", 0) + 1
            heartbeat.current_task = task
        else:
            heartbeat.active_tasks = max(0, getattr(heartbeat, "active_tasks", 0) - 1)
            if heartbeat.active_tasks == 0:
                heartbeat.current_task = ""
    import os

    workspace_id = os.environ.get("WORKSPACE_ID", "")
    platform_url = os.environ.get("PLATFORM_URL", "")
    if workspace_id and platform_url:
        try:
            import httpx

            active = getattr(heartbeat, "active_tasks", 0) if heartbeat else (1 if task else 0)
            cur_task = getattr(heartbeat, "current_task", task or "") if heartbeat else (task or "")
            async with httpx.AsyncClient(timeout=3.0) as client:
                await client.post(
                    f"{platform_url}/registry/heartbeat",
                    json={
                        "workspace_id": workspace_id,
                        "current_task": cur_task,
                        "active_tasks": active,
                        "error_rate": 0,
                        "sample_error": "",
                        "uptime_seconds": 0,
                    },
                )
        except Exception:
            pass  # Best-effort