molecule-core/workspace/builtin_tools/memory.py
Hongming Wang e9a59cda3b feat(platform): single-source-of-truth tool registry — adapters consume, no drift
Establishes workspace/platform_tools/registry.py as THE place tool
naming and docs live. Every consumer reads from it; nothing duplicates
the source. Closes the architectural gap behind the doc/tool drift
discussion 2026-04-28 — adding hundreds of future runtime SDK adapters
should not require touching tool names anywhere except the registry.

What the registry owns

  ToolSpec dataclass with: name, short (one-line description), when_to_use
  (multi-paragraph agent-facing usage guidance), input_schema (JSON Schema),
  impl (the actual coroutine in a2a_tools.py), section ('a2a' | 'memory').

  TOOLS list with 8 entries — delegate_task, delegate_task_async,
  check_task_status, list_peers, get_workspace_info, send_message_to_user,
  commit_memory, recall_memory.

What now reads from the registry

  - workspace/a2a_mcp_server.py
      The hardcoded TOOLS list (167 lines of hand-maintained dicts) is
      gone. Replaced with a 6-line list comprehension over the registry.
      MCP description = spec.short. inputSchema = spec.input_schema.

  - workspace/executor_helpers.py
      get_a2a_instructions(mcp=True) and get_hma_instructions() now
      GENERATE the agent-facing system-prompt text from the registry.
      Heading + per-tool bullet (spec.short) + per-tool when_to_use +
      a section-specific footer. No more hand-maintained instruction
      blocks that drift from reality.

  - workspace/builtin_tools/delegation.py
      Renamed delegate_to_workspace -> delegate_task_async to match
      registry. check_delegation_status -> check_task_status. Added
      sync delegate_task @tool wrapping a2a_tools.tool_delegate_task
      (was missing for LangChain runtimes — CP review Issue 3).

  - workspace/builtin_tools/memory.py
      Renamed search_memory -> recall_memory to match registry.

  - workspace/adapter_base.py, workspace/main.py
      Bundle all 7 core tools (was 6) into all_tools / base_tools.

  - workspace/coordinator.py, shared_runtime.py, policies/routing.py
      Updated system-prompt-text references to use the registry names.
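
  The consumer side reduces to comprehensions over the registry. A rough
  sketch of the two main consumers above (ToolSpec fields and function names
  here are illustrative, not the real module contents):

```python
# Sketch of the two registry consumers described above. Field and function
# names follow this commit message; the real code may differ in detail.
from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True)
class ToolSpec:
    name: str
    short: str
    when_to_use: str
    input_schema: dict[str, Any]
    section: str

TOOLS = [
    ToolSpec("commit_memory", "Store a fact in memory.",
             "Use when the agent learns something durable.",
             {"type": "object"}, "memory"),
    ToolSpec("recall_memory", "Search stored memories.",
             "Use before re-deriving known facts.",
             {"type": "object"}, "memory"),
]

# a2a_mcp_server.py: the old hand-maintained dict list collapses to this.
def mcp_tool_dicts() -> list[dict]:
    return [
        {"name": s.name, "description": s.short, "inputSchema": s.input_schema}
        for s in TOOLS
    ]

# executor_helpers.py: section-specific system-prompt text is generated,
# never hand-written, so it cannot drift from the registry.
def instructions_for(section: str) -> str:
    lines = [f"{section} tools:"]
    for s in (t for t in TOOLS if t.section == section):
        lines.append(f"- {s.name}: {s.short}")
        lines.append(f"  {s.when_to_use}")
    return "\n".join(lines)
```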

Structural alignment tests

  workspace/tests/test_platform_tools.py — 9 tests pin every
  registry-to-adapter mapping:
    - registry names are unique
    - a2a + memory partition is complete (no orphans)
    - by_name lookup works
    - MCP server registers exactly the registry's tool set
    - MCP description equals registry.short for every tool
    - MCP inputSchema equals registry.input_schema for every tool
    - get_a2a_instructions text contains every a2a tool name
    - get_hma_instructions text contains every memory tool name
    - pre-rename names (delegate_to_workspace, search_memory,
      check_delegation_status) cannot leak back

  Adding a future tool means adding one ToolSpec; the test failure
  list tells the author exactly which adapter to update.
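
  The alignment checks have roughly this shape (shown here against plain
  dicts rather than the real registry and MCP server objects):

```python
# Hypothetical shape of the structural-alignment checks; the real tests in
# test_platform_tools.py run against the actual registry and MCP server.
STALE_NAMES = {"delegate_to_workspace", "search_memory", "check_delegation_status"}

def check_alignment(registry_tools: list[dict], mcp_tools: list[dict]) -> None:
    names = [t["name"] for t in registry_tools]
    assert len(names) == len(set(names)), "registry names must be unique"
    assert {t["name"] for t in mcp_tools} == set(names), \
        "MCP must expose exactly the registry's tool set"
    for reg, mcp in zip(sorted(registry_tools, key=lambda t: t["name"]),
                        sorted(mcp_tools, key=lambda t: t["name"])):
        assert mcp["description"] == reg["short"]
        assert mcp["inputSchema"] == reg["input_schema"]
    assert not STALE_NAMES & set(names), "pre-rename name leaked back"
```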

Adapter pattern for future SDK support

  When (e.g.) AutoGen or Pydantic AI gets adapters, the only work
  needed for tool surfacing is "wrap registry.TOOLS in your SDK's
  tool format." Names, descriptions, schemas, impl come from the
  registry — adapter author writes zero strings.
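
  Sketched, an adapter's whole tool-surfacing job is one generic wrapper
  (the factory argument stands in for whatever tool constructor the new
  SDK provides; nothing here is real adapter code):

```python
# Hypothetical future adapter: the SDK-specific part is only the factory
# passed in; every name, description, and schema comes from the registry.
from types import SimpleNamespace

def make_sdk_tools(registry_tools, sdk_tool_factory):
    return [
        sdk_tool_factory(
            name=spec.name,
            description=spec.short,
            parameters=spec.input_schema,
            func=spec.impl,
        )
        for spec in registry_tools
    ]

# Demo with a fake registry entry and a fake SDK whose "tool" is a dict:
demo = SimpleNamespace(name="list_peers", short="List peer workspaces.",
                       input_schema={"type": "object"}, impl=None)
sdk_tools = make_sdk_tools([demo], lambda **kw: kw)
```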

Why this needed to ship now

  PR #2237 (already in staging) injected MCP-world docs as the
  default system-prompt content. Without the registry, those docs
  said "delegate_task" while LangChain runtimes only had
  "delegate_to_workspace" — workers see docs for tools that don't
  exist (CP review Issue 1+3). PR #2239 was a tactical rename;
  this PR is the structural fix that prevents the same class of
  drift from recurring as new adapters ship.

  PR #2239 was closed in favor of this — same renames, plus the
  registry, plus structural tests. Single coherent change.

Tests: 1232 pass, 2 xfailed (pre-existing). 9 new in
test_platform_tools.py; 4 alignment tests in test_prompt.py from
#2237 still pass; original test_executor_helpers tests adapted to
the registry-driven world.

Refs: CP review Issues 1, 2, 3, 5; project memory
project_runtime_native_pluggable.md (platform owns A2A);
project memory feedback_doc_tool_alignment.md (this is the structural
fix for the tactical lesson).
2026-04-28 17:11:36 -07:00


"""HMA memory tools for agents.
Hierarchical Memory Architecture:
- LOCAL: private to this workspace, invisible to others
- TEAM: shared with parent + siblings (same team)
- GLOBAL: readable by all, writable by root workspaces only
RBAC enforcement
----------------
``commit_memory`` requires the ``"memory.write"`` action.
``recall_memory`` requires the ``"memory.read"`` action.
Roles are read from ``config.yaml`` under ``rbac.roles`` (default: operator).
Audit trail
-----------
Every memory operation appends a JSON Lines record to the audit log:
memory / memory.write / allowed — write permitted by RBAC
memory / memory.write / success — write committed successfully
memory / memory.write / failure — write failed (platform error)
memory / memory.read / allowed — read permitted by RBAC
memory / memory.read / success — search returned results
memory / memory.read / failure — search failed (platform error)
RBAC denials emit ``rbac / rbac.deny / denied`` events instead.
"""
import json
import os
import uuid
from types import SimpleNamespace
from typing import Any

from langchain_core.tools import tool

from builtin_tools.awareness_client import build_awareness_client
from builtin_tools.audit import check_permission, get_workspace_roles, log_event
from builtin_tools.security import _redact_secrets
from builtin_tools.telemetry import MEMORY_QUERY, MEMORY_SCOPE, WORKSPACE_ID_ATTR, get_tracer

try:  # pragma: no cover - optional runtime dependency in lightweight test envs
    import httpx  # type: ignore
except ImportError:  # pragma: no cover
    httpx = SimpleNamespace(AsyncClient=None)

PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
WORKSPACE_ID = os.environ.get("WORKSPACE_ID", "")

@tool
async def commit_memory(content: str, scope: str = "LOCAL") -> dict:
    """Store a fact in memory with a specific scope.

    Args:
        content: The fact or knowledge to remember.
        scope: Memory scope — LOCAL (private), TEAM (shared with team), or GLOBAL (company-wide, root only).
    """
    content = _redact_secrets(content)
    trace_id = str(uuid.uuid4())
    scope = scope.upper()
    if scope not in ("LOCAL", "TEAM", "GLOBAL"):
        return {"error": "scope must be LOCAL, TEAM, or GLOBAL"}

    # --- RBAC check -----------------------------------------------------------
    roles, custom_perms = get_workspace_roles()
    if not check_permission("memory.write", roles, custom_perms):
        log_event(
            event_type="rbac",
            action="rbac.deny",
            resource=scope,
            outcome="denied",
            trace_id=trace_id,
            attempted_action="memory.write",
            roles=roles,
        )
        return {
            "success": False,
            "error": (
                "RBAC: this workspace does not have the 'memory.write' permission. "
                f"Current roles: {roles}"
            ),
        }
    log_event(
        event_type="memory",
        action="memory.write",
        resource=scope,
        outcome="allowed",
        trace_id=trace_id,
        memory_scope=scope,
        content_length=len(content),
    )

    # ── OTEL: memory_write span ──────────────────────────────────────────────
    tracer = get_tracer()
    with tracer.start_as_current_span("memory_write") as mem_span:
        mem_span.set_attribute(WORKSPACE_ID_ATTR, WORKSPACE_ID)
        mem_span.set_attribute(MEMORY_SCOPE, scope)
        mem_span.set_attribute("memory.content_length", len(content))

        awareness_client = build_awareness_client()
        if awareness_client is not None:
            try:
                result = await awareness_client.commit(content, scope)
            except Exception as e:
                log_event(
                    event_type="memory",
                    action="memory.write",
                    resource=scope,
                    outcome="failure",
                    trace_id=trace_id,
                    memory_scope=scope,
                    error=str(e),
                )
                try:
                    mem_span.record_exception(e)
                except Exception:
                    pass
                return {"success": False, "error": str(e)}
        else:
            # #215-class bug: platform now gates /workspaces/:id/memories behind
            # workspace auth. Import auth_headers lazily (same pattern as the
            # activity-log path below) so test environments that don't ship
            # platform_auth still work.
            try:
                from platform_auth import auth_headers as _auth
                _headers = _auth()
            except Exception:
                _headers = {}
            async with httpx.AsyncClient(timeout=10.0) as client:
                try:
                    resp = await client.post(
                        f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/memories",
                        json={"content": content, "scope": scope},
                        headers=_headers,
                    )
                    if resp.status_code == 201:
                        result = {"success": True, "id": resp.json().get("id"), "scope": scope}
                    else:
                        result = {"success": False, "error": resp.json().get("error", resp.text)}
                except Exception as e:
                    log_event(
                        event_type="memory",
                        action="memory.write",
                        resource=scope,
                        outcome="failure",
                        trace_id=trace_id,
                        memory_scope=scope,
                        error=str(e),
                    )
                    try:
                        mem_span.record_exception(e)
                    except Exception:
                        pass
                    return {"success": False, "error": str(e)}

        if result.get("success"):
            mem_span.set_attribute("memory.id", result.get("id") or "")
            mem_span.set_attribute("memory.success", True)
            log_event(
                event_type="memory",
                action="memory.write",
                resource=scope,
                outcome="success",
                trace_id=trace_id,
                memory_scope=scope,
                memory_id=result.get("id"),
            )
            # #125: surface memory writes in /activity so the Canvas
            # "Agent Comms" tab shows what an agent chose to remember.
            # Fire-and-forget — failure here must not poison the tool
            # response since the memory write itself already succeeded.
            await _record_memory_activity(scope, content, result.get("id"))
            await _maybe_log_skill_promotion(content, scope, result)
        else:
            mem_span.set_attribute("memory.success", False)
            log_event(
                event_type="memory",
                action="memory.write",
                resource=scope,
                outcome="failure",
                trace_id=trace_id,
                memory_scope=scope,
                error=result.get("error"),
            )
        return result

@tool
async def recall_memory(query: str = "", scope: str = "") -> dict:
    """Search stored memories.

    Args:
        query: Text to search for (empty returns all).
        scope: Filter by scope — LOCAL, TEAM, GLOBAL, or empty for all accessible.
    """
    trace_id = str(uuid.uuid4())
    scope = scope.upper()
    if scope and scope not in ("LOCAL", "TEAM", "GLOBAL"):
        return {"error": "scope must be LOCAL, TEAM, GLOBAL, or empty"}

    # --- RBAC check -----------------------------------------------------------
    roles, custom_perms = get_workspace_roles()
    if not check_permission("memory.read", roles, custom_perms):
        log_event(
            event_type="rbac",
            action="rbac.deny",
            resource=scope or "all",
            outcome="denied",
            trace_id=trace_id,
            attempted_action="memory.read",
            roles=roles,
        )
        return {
            "success": False,
            "error": (
                "RBAC: this workspace does not have the 'memory.read' permission. "
                f"Current roles: {roles}"
            ),
        }
    log_event(
        event_type="memory",
        action="memory.read",
        resource=scope or "all",
        outcome="allowed",
        trace_id=trace_id,
        memory_scope=scope or "all",
        query_length=len(query),
    )

    # ── OTEL: memory_read span ───────────────────────────────────────────────
    tracer = get_tracer()
    with tracer.start_as_current_span("memory_read") as mem_span:
        mem_span.set_attribute(WORKSPACE_ID_ATTR, WORKSPACE_ID)
        mem_span.set_attribute(MEMORY_SCOPE, scope or "all")
        mem_span.set_attribute(MEMORY_QUERY, query[:256] if query else "")

        awareness_client = build_awareness_client()
        if awareness_client is not None:
            try:
                result = await awareness_client.search(query, scope)
                mem_span.set_attribute("memory.result_count", result.get("count", 0))
                mem_span.set_attribute("memory.success", result.get("success", False))
                log_event(
                    event_type="memory",
                    action="memory.read",
                    resource=scope or "all",
                    outcome="success" if result.get("success") else "failure",
                    trace_id=trace_id,
                    memory_scope=scope or "all",
                    result_count=result.get("count", 0),
                )
                return result
            except Exception as e:
                log_event(
                    event_type="memory",
                    action="memory.read",
                    resource=scope or "all",
                    outcome="failure",
                    trace_id=trace_id,
                    memory_scope=scope or "all",
                    error=str(e),
                )
                try:
                    mem_span.record_exception(e)
                except Exception:
                    pass
                return {"success": False, "error": str(e)}

        params = {}
        if query:
            params["q"] = query
        if scope:
            params["scope"] = scope.upper()

        # #215-class bug (search path): same fix as commit_memory above —
        # the platform gates GET /workspaces/:id/memories behind workspace
        # auth, so without auth_headers() every search silently 401s and the
        # agent thinks its backlog is empty (observed on Technical Researcher
        # idle-loop pilot 2026-04-15).
        try:
            from platform_auth import auth_headers as _auth
            _headers = _auth()
        except Exception:
            _headers = {}
        async with httpx.AsyncClient(timeout=10.0) as client:
            try:
                resp = await client.get(
                    f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/memories",
                    params=params,
                    headers=_headers,
                )
                if resp.status_code == 200:
                    memories = resp.json()
                    mem_span.set_attribute("memory.result_count", len(memories))
                    mem_span.set_attribute("memory.success", True)
                    log_event(
                        event_type="memory",
                        action="memory.read",
                        resource=scope or "all",
                        outcome="success",
                        trace_id=trace_id,
                        memory_scope=scope or "all",
                        result_count=len(memories),
                    )
                    return {
                        "success": True,
                        "count": len(memories),
                        "memories": memories,
                    }
                mem_span.set_attribute("memory.success", False)
                log_event(
                    event_type="memory",
                    action="memory.read",
                    resource=scope or "all",
                    outcome="failure",
                    trace_id=trace_id,
                    memory_scope=scope or "all",
                    http_status=resp.status_code,
                )
                return {"success": False, "error": resp.json().get("error", resp.text)}
            except Exception as e:
                log_event(
                    event_type="memory",
                    action="memory.read",
                    resource=scope or "all",
                    outcome="failure",
                    trace_id=trace_id,
                    memory_scope=scope or "all",
                    error=str(e),
                )
                try:
                    mem_span.record_exception(e)
                except Exception:
                    pass
                return {"success": False, "error": str(e)}

def _parse_promotion_packet(content: str) -> dict[str, Any] | None:
    """Return a structured memory packet when content looks like promotion metadata."""
    text = content.strip()
    if not text.startswith("{"):
        return None
    try:
        payload = json.loads(text)
    except json.JSONDecodeError:
        return None
    if not isinstance(payload, dict):  # pragma: no cover
        return None
    if not payload.get("promote_to_skill"):
        return None
    return payload

async def _record_memory_activity(scope: str, content: str, memory_id: str | None) -> None:
    """Surface a successful memory write as an activity row so the Canvas
    "Agent Comms" tab can display what an agent chose to remember.

    Fire-and-forget — never raises. #125.

    The summary is intentionally short (scope tag + first 80 chars of
    content with a ``…`` ellipsis when truncated) so the activity table
    stays readable; full content lives in ``agent_memories``.
    """
    workspace_id = WORKSPACE_ID.strip()
    platform_url = PLATFORM_URL.strip().rstrip("/")
    if not workspace_id or not platform_url:
        return
    preview = content.strip().replace("\n", " ")
    if len(preview) > 80:
        preview = preview[:80] + "…"
    summary = f"[{scope}] {preview}"
    # NOTE: target_id is a UUID column scoped to workspace_id references —
    # cannot hold awareness/memory IDs (which are arbitrary strings).
    # We embed the memory_id in the summary instead so it's still searchable.
    if memory_id:
        summary = f"{summary} (id={memory_id[:24]})"
    payload: dict[str, Any] = {
        "workspace_id": workspace_id,
        "activity_type": "memory_write",
        "summary": summary,
        "status": "ok",
    }
    try:
        try:
            from platform_auth import auth_headers as _auth
            _headers = _auth()
        except Exception:
            _headers = {}
        async with httpx.AsyncClient(timeout=5.0) as client:
            await client.post(
                f"{platform_url}/workspaces/{workspace_id}/activity",
                json=payload,
                headers=_headers,
            )
    except Exception:
        # Activity logging is purely observability — never poison the
        # tool response on a failure here. We don't even log_event the
        # failure since the memory write itself succeeded and that's
        # what matters to the caller.
        pass

async def _maybe_log_skill_promotion(content: str, scope: str, memory_result: dict) -> None:
    """Best-effort activity log for durable memory entries that should become skills."""
    packet = _parse_promotion_packet(content)
    if packet is None:
        return
    workspace_id = WORKSPACE_ID.strip()
    platform_url = PLATFORM_URL.strip().rstrip("/")
    if not workspace_id or not platform_url:
        return
    repetition_signal = packet.get("repetition_signal")
    summary = (
        packet.get("summary")
        or packet.get("title")
        or packet.get("what changed")
        or "Repeatable workflow promoted to skill candidate"
    )
    metadata: dict[str, Any] = {
        "source": "memory-curation",
        "scope": scope,
        "memory_id": memory_result.get("id"),
        "promote_to_skill": True,
        "repetition_signal": repetition_signal,
        "memory_packet": packet,
    }
    payload = {
        "activity_type": "skill_promotion",
        "method": "memory/skill-promotion",
        "summary": summary,
        "status": "ok",
        "source_id": workspace_id,
        "request_body": packet,
        "metadata": metadata,
    }
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            await client.post(
                f"{platform_url}/workspaces/{workspace_id}/activity",
                json=payload,
            )
            await client.post(
                f"{platform_url}/registry/heartbeat",
                json={
                    "workspace_id": workspace_id,
                    "error_rate": 0,
                    "sample_error": "",
                    "active_tasks": 1,
                    "uptime_seconds": 0,
                    "current_task": f"Skill promotion: {summary}",
                },
            )
    except Exception:
        # Best-effort observability only. Memory commits must never fail because
        # the promotion log could not be written.
        return