feat(a2a-mcp): enrich channel envelope with peer name/role/agent_card_url
The bare envelope only carried `peer_id` for peer_agent inbound, so a receiving agent had to round-trip to /registry to find out who's talking. Surface the sender's display name, role, and an agent-card URL alongside the routing fields so the agent can render "ops-agent (sre): ping" in one shot without an extra lookup. a2a_client.py: - Add _peer_metadata cache `dict[peer_id → (fetched_at, record)]` - Add enrich_peer_metadata(peer_id) — sync, hits cache or registry with a tight 2s timeout, returns None on validation/network/non-200 so callers can degrade gracefully - TTL = 5 min so a busy multi-peer chat doesn't hit registry on every push, but role/name renames propagate within a session - Add _agent_card_url_for(peer_id) — deterministic from peer_id alone a2a_mcp_server.py: - _build_channel_notification calls enrich_peer_metadata when peer_id is non-empty; meta carries peer_name + peer_role + agent_card_url alongside the existing routing fields - agent_card_url surfaces unconditionally (constructable from peer_id); peer_name/role only when registry lookup succeeds — never blocks the push on a registry stall Tests: 6 new branches (canvas_user no enrichment / cache hit no GET / cache miss fetches once / registry-fail graceful degrade / TTL expiry re-fetches / invalid peer_id skips lookup). Mutation-verified: 6/6 fail without prod code, 39/39 pass with. Tracks the broader RFC at #2469 (workspace-server activity_type rename to break the echo loop). Independent of PR #2470 — this is the metadata-enrichment half of the same UX improvement. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
e5a3b5282b
commit
050aa33fc1
@ -30,6 +30,87 @@ else:
|
||||
# Cache workspace ID → name mappings (populated by list_peers calls)
|
||||
_peer_names: dict[str, str] = {}
|
||||
|
||||
# Cache workspace ID → full peer record (id, name, role, status, url, ...).
|
||||
# Populated by tool_list_peers and by the lazy registry lookup in
|
||||
# enrich_peer_metadata. The notification-callback path (channel envelope
|
||||
# enrichment) reads this cache on every inbound peer_agent push, so a
|
||||
# bare ``dict[str, tuple[float, dict]]`` is the fastest read shape; entries carry their
|
||||
# fetched-at timestamp so TTL eviction is in-line with the lookup.
|
||||
_peer_metadata: dict[str, tuple[float, dict]] = {}
|
||||
|
||||
# How long an entry in ``_peer_metadata`` is treated as fresh. 5 minutes
|
||||
# is the same window we use for delegation routing — long enough that a
|
||||
# busy agent receiving repeated pushes from one peer doesn't hit the
|
||||
# registry on every push, short enough that role/name renames propagate
|
||||
# within a single agent session.
|
||||
_PEER_METADATA_TTL_SECONDS = 300.0
|
||||
|
||||
|
||||
def enrich_peer_metadata(peer_id: str, *, now: float | None = None) -> dict | None:
    """Return cached or freshly-fetched registry metadata for ``peer_id``.

    Synchronous helper intended for the inbox poller's notification
    callback, which is not async. The in-process ``_peer_metadata`` cache
    is consulted first; on a miss or once the entry is older than
    ``_PEER_METADATA_TTL_SECONDS``, ``/registry/discover/<peer_id>`` is
    fetched with a 2-second timeout.

    The fetched dict is cached and returned as-is, so callers can read
    whatever fields the registry exposes without a code change here.

    Args:
        peer_id: Workspace ID of the peer; validated with
            ``_validate_peer_id`` before it is used in a URL.
        now: Optional timestamp on the ``time.monotonic()`` scale used in
            place of the real clock (lets tests control TTL expiry).

    Returns:
        The peer record on a cache hit or a successful fetch. When the
        refresh fails (network error, non-200, non-JSON or non-object
        body) the stale cached record is returned if one exists, otherwise
        ``None``. ``None`` is also returned when ``peer_id`` fails
        validation. Never raises for registry problems, so callers can
        degrade to the bare ``peer_id``.
    """
    canon = _validate_peer_id(peer_id)
    if canon is None:
        return None

    current = time.monotonic() if now is None else now
    cached = _peer_metadata.get(canon)
    if cached is not None and current - cached[0] < _PEER_METADATA_TTL_SECONDS:
        return cached[1]

    # Whatever goes wrong below, serving an expired record beats serving
    # nothing: the caller only uses it for display enrichment.
    fallback = cached[1] if cached is not None else None

    url = f"{PLATFORM_URL}/registry/discover/{canon}"
    try:
        with httpx.Client(timeout=2.0) as client:
            resp = client.get(url, headers={"X-Workspace-ID": WORKSPACE_ID, **auth_headers()})
    except Exception as exc:  # noqa: BLE001
        logger.debug("enrich_peer_metadata: GET %s failed: %s", url, exc)
        return fallback

    if resp.status_code != 200:
        logger.debug(
            "enrich_peer_metadata: %s returned HTTP %d", url, resp.status_code
        )
        return fallback

    try:
        data = resp.json()
    except Exception as exc:  # noqa: BLE001
        logger.debug("enrich_peer_metadata: %s returned a non-JSON body: %s", url, exc)
        return fallback
    if not isinstance(data, dict):
        logger.debug(
            "enrich_peer_metadata: %s returned %s, expected a JSON object",
            url,
            type(data).__name__,
        )
        return fallback

    _peer_metadata[canon] = (current, data)
    # ``_peer_names`` is typed ``dict[str, str]``; only mirror real,
    # non-empty string names into it.
    name = data.get("name")
    if isinstance(name, str) and name:
        _peer_names[canon] = name
    return data
|
||||
|
||||
|
||||
def _agent_card_url_for(peer_id: str) -> str:
    """Build the registry discovery URL that serves as ``peer_id``'s agent card.

    The URL is derived purely from ``PLATFORM_URL`` and ``peer_id``; no
    lookup or validation happens here, so the result is available even
    when the registry itself cannot be reached.
    """
    discover_base = f"{PLATFORM_URL}/registry/discover"
    return f"{discover_base}/{peer_id}"
|
||||
|
||||
# Sentinel prefix for errors originating from send_a2a_message / child agents.
|
||||
# Used by delegate_task to distinguish real errors from normal response text.
|
||||
_A2A_ERROR_PREFIX = "[A2A_ERROR] "
|
||||
|
||||
@ -48,8 +48,10 @@ from a2a_client import ( # noqa: F401, E402
|
||||
PLATFORM_URL,
|
||||
WORKSPACE_ID,
|
||||
_A2A_ERROR_PREFIX,
|
||||
_agent_card_url_for,
|
||||
_peer_names,
|
||||
discover_peer,
|
||||
enrich_peer_metadata,
|
||||
get_peers,
|
||||
get_workspace_info,
|
||||
send_a2a_message,
|
||||
def _build_channel_notification(msg: dict) -> dict:
    """Transform an ``InboxMessage.to_dict()`` into the MCP notification
    envelope expected by Claude Code's channel-bridge contract.

    Kept free of asyncio so the wire shape is unit-testable without an
    event loop.

    When the message carries a non-empty ``peer_id`` this calls
    ``enrich_peer_metadata`` and, if a record comes back, adds
    ``peer_name`` / ``peer_role`` to ``meta`` (each only when the record
    has a truthy value for it). ``agent_card_url`` is added for every
    non-empty ``peer_id`` whether or not the lookup succeeded, because it
    is built from ``peer_id`` alone. Messages without a ``peer_id`` get
    none of the three enrichment keys.

    The lookup is synchronous: a cache miss costs one registry GET,
    bounded by the timeout inside ``enrich_peer_metadata``. A failed
    lookup degrades to the bare ``peer_id``.

    Args:
        msg: Inbox message dict; every key is optional and missing routing
            fields default to ``""``.

    Returns:
        A JSON-RPC 2.0 notification dict with ``params.content`` (the
        message text) and ``params.meta`` (routing plus enrichment fields).
    """
    meta = {
        "source": "molecule",
        "kind": msg.get("kind", ""),
        "peer_id": msg.get("peer_id", ""),
        "method": msg.get("method", ""),
        "activity_id": msg.get("activity_id", ""),
        "ts": msg.get("created_at", ""),
    }

    peer_id = msg.get("peer_id") or ""
    if peer_id:
        record = enrich_peer_metadata(peer_id)
        if record is not None:
            if name := record.get("name"):
                meta["peer_name"] = name
            if role := record.get("role"):
                meta["peer_role"] = role
        # agent_card_url is constructable from peer_id alone; surface it
        # even when enrichment fails so the receiving agent has a single
        # endpoint to hit for capabilities lookup.
        meta["agent_card_url"] = _agent_card_url_for(peer_id)

    return {
        "jsonrpc": "2.0",
        "method": _CHANNEL_NOTIFICATION_METHOD,
        "params": {
            "content": msg.get("text", ""),
            # Single "meta" entry: the enriched dict built above.
            "meta": meta,
        },
    }
|
||||
|
||||
|
||||
@ -3,7 +3,7 @@
|
||||
import asyncio
|
||||
import json
|
||||
|
||||
from unittest.mock import AsyncMock, patch
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
@ -242,6 +242,223 @@ def test_build_channel_notification_handles_missing_fields_gracefully():
|
||||
assert meta["kind"] == ""
|
||||
|
||||
|
||||
# ----- Channel envelope enrichment (peer_name / peer_role / agent_card_url) ---
|
||||
#
|
||||
# The bare envelope only carries `peer_id` for peer_agent inbound, so the
|
||||
# receiving agent has to round-trip to /registry to find out who's
|
||||
# talking. Enrichment surfaces the sender's display name, role, and an
|
||||
# agent-card URL alongside the routing fields so the agent can render
|
||||
# "ops-agent (sre): hi" in one shot. Cache-backed and TTL'd so a busy
|
||||
# multi-peer chat doesn't hit the registry on every push.
|
||||
#
|
||||
# Tests pin: cache hit, cache miss + registry hit, registry miss
|
||||
# (graceful degrade), TTL expiry, canvas_user (no enrichment), and the
|
||||
# agent_card_url surfaces even when the registry is reachable but
|
||||
# returns nothing usable.
|
||||
|
||||
|
||||
_PEER_UUID = "11111111-2222-3333-4444-555555555555"
|
||||
_OTHER_PEER = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
|
||||
|
||||
|
||||
@pytest.fixture()
def _reset_peer_metadata_cache(monkeypatch):
    """Isolate the peer caches in ``a2a_client`` for one test.

    ``_peer_metadata`` is emptied before and after the test so an earlier
    test's hit cannot satisfy a later test's miss. ``enrich_peer_metadata``
    also writes fetched names into ``_peer_names``, so that dict is
    snapshotted up front and restored afterwards; otherwise names fetched
    here would leak into unrelated tests.

    Both dicts are mutated in place rather than reassigned so modules that
    imported them by reference keep seeing the same objects.
    """
    import a2a_client

    saved_names = dict(a2a_client._peer_names)
    a2a_client._peer_metadata.clear()
    yield
    a2a_client._peer_metadata.clear()
    a2a_client._peer_names.clear()
    a2a_client._peer_names.update(saved_names)
|
||||
|
||||
|
||||
def _make_httpx_response(status_code: int, json_body: object) -> MagicMock:
|
||||
resp = MagicMock()
|
||||
resp.status_code = status_code
|
||||
resp.json.return_value = json_body
|
||||
return resp
|
||||
|
||||
|
||||
def _patch_httpx_client(returning: MagicMock):
|
||||
"""Replace httpx.Client with a context-manager mock returning
|
||||
``returning`` from .get(). Mirrors the inbox tests' pattern so a
|
||||
future refactor of the registry GET path can be re-tested with the
|
||||
same harness."""
|
||||
client = MagicMock()
|
||||
client.__enter__ = MagicMock(return_value=client)
|
||||
client.__exit__ = MagicMock(return_value=False)
|
||||
client.get = MagicMock(return_value=returning)
|
||||
return patch("httpx.Client", return_value=client), client
|
||||
|
||||
|
||||
def test_envelope_enrichment_canvas_user_has_no_peer_fields(_reset_peer_metadata_cache):
    """canvas_user pushes have no peer (peer_id=''). The enrichment
    block must short-circuit: no registry GET is fired, and no
    peer_name / peer_role / agent_card_url keys are added to meta."""
    from a2a_mcp_server import _build_channel_notification

    # Patch the HTTP client so the "no wasted GET" half of the contract is
    # asserted rather than merely assumed.
    p, client = _patch_httpx_client(_make_httpx_response(200, {}))
    with p:
        payload = _build_channel_notification({
            "activity_id": "act-1",
            "text": "hello from canvas",
            "peer_id": "",
            "kind": "canvas_user",
            "method": "message/send",
            "created_at": "2026-05-01T00:00:00Z",
        })

    assert client.get.call_count == 0, "empty peer_id must not fire a registry GET"
    meta = payload["params"]["meta"]
    assert "peer_name" not in meta
    assert "peer_role" not in meta
    assert "agent_card_url" not in meta
|
||||
|
||||
|
||||
def test_envelope_enrichment_uses_cache_when_present(_reset_peer_metadata_cache):
    """A fresh cache entry satisfies enrichment on its own: the registry
    is not contacted and meta carries the cached name and role. This is
    the hot path when one peer pushes repeatedly."""
    import time as _time

    import a2a_client
    from a2a_mcp_server import _build_channel_notification

    cached_record = {"id": _PEER_UUID, "name": "ops-agent", "role": "sre", "status": "online"}
    a2a_client._peer_metadata[_PEER_UUID] = (_time.monotonic(), cached_record)

    inbound = {
        "activity_id": "act-2",
        "text": "ping",
        "peer_id": _PEER_UUID,
        "kind": "peer_agent",
        "method": "message/send",
        "created_at": "2026-05-01T01:23:45Z",
    }
    patcher, http_client = _patch_httpx_client(_make_httpx_response(200, {}))
    with patcher:
        envelope = _build_channel_notification(inbound)

    assert http_client.get.call_count == 0, "cache hit must not fire a registry GET"
    meta = envelope["params"]["meta"]
    assert meta["peer_id"] == _PEER_UUID
    assert meta["peer_name"] == "ops-agent"
    assert meta["peer_role"] == "sre"
    assert meta["agent_card_url"].endswith(f"/registry/discover/{_PEER_UUID}")
|
||||
|
||||
|
||||
def test_envelope_enrichment_fetches_on_cache_miss(_reset_peer_metadata_cache):
    """Cache miss + registry hit: GET fires, response cached, meta
    carries fetched fields. Subsequent build for the same peer must
    NOT re-fetch (cache populated by first call)."""
    import a2a_client
    from a2a_mcp_server import _build_channel_notification

    fetched = {"id": _PEER_UUID, "name": "fetched-name", "role": "router", "status": "online"}
    p, client = _patch_httpx_client(_make_httpx_response(200, fetched))
    with p:
        payload1 = _build_channel_notification({
            "peer_id": _PEER_UUID, "kind": "peer_agent", "text": "first",
        })
        payload2 = _build_channel_notification({
            "peer_id": _PEER_UUID, "kind": "peer_agent", "text": "second",
        })

    assert client.get.call_count == 1, (
        f"second push for same peer must use cache, got {client.get.call_count} GETs"
    )
    # Pin the caching itself, not just its observable effect on GET count.
    assert _PEER_UUID in a2a_client._peer_metadata, "fetched record must be cached"
    assert a2a_client._peer_metadata[_PEER_UUID][1] == fetched
    assert payload1["params"]["meta"]["peer_name"] == "fetched-name"
    assert payload2["params"]["meta"]["peer_name"] == "fetched-name"
|
||||
|
||||
|
||||
def test_envelope_enrichment_degrades_on_registry_failure(_reset_peer_metadata_cache):
    """With an empty cache and the registry answering HTTP 500, the
    envelope is still built: peer_name / peer_role are absent, peer_id is
    passed through, and agent_card_url is present because it is derived
    from peer_id alone."""
    from a2a_mcp_server import _build_channel_notification

    inbound = {
        "activity_id": "act-3",
        "text": "ping",
        "peer_id": _PEER_UUID,
        "kind": "peer_agent",
        "method": "message/send",
        "created_at": "2026-05-01T00:00:00Z",
    }
    patcher, _unused_client = _patch_httpx_client(_make_httpx_response(500, {}))
    with patcher:
        envelope = _build_channel_notification(inbound)

    meta = envelope["params"]["meta"]
    assert meta["peer_id"] == _PEER_UUID
    for absent_key in ("peer_name", "peer_role"):
        assert absent_key not in meta
    expected_suffix = f"/registry/discover/{_PEER_UUID}"
    assert meta["agent_card_url"].endswith(expected_suffix), (
        "agent_card_url must be present even on registry failure — "
        "it's deterministic from peer_id and gives the agent a single "
        "endpoint to retry against"
    )
|
||||
|
||||
|
||||
def test_envelope_enrichment_re_fetches_after_ttl(_reset_peer_metadata_cache):
    """Cached entry past TTL: registry is hit again. Pin the TTL
    behaviour so a future caller bumping ``_PEER_METADATA_TTL_SECONDS``
    doesn't accidentally make the cache permanent."""
    import time as _time

    import a2a_client
    from a2a_mcp_server import _build_channel_notification

    # Stale entry: fetched one hour before the TTL window began. The age
    # is computed relative to ``time.monotonic()`` because that clock's
    # zero point is undefined; a literal 0.0 would look *fresh* on a host
    # whose monotonic clock is still below the TTL (e.g. just booted).
    stale_fetched_at = (
        _time.monotonic() - a2a_client._PEER_METADATA_TTL_SECONDS - 3600.0
    )
    a2a_client._peer_metadata[_PEER_UUID] = (
        stale_fetched_at,
        {"id": _PEER_UUID, "name": "stale-name", "role": "old"},
    )

    p, client = _patch_httpx_client(
        _make_httpx_response(
            200,
            {"id": _PEER_UUID, "name": "fresh-name", "role": "new", "status": "online"},
        )
    )
    with p:
        payload = _build_channel_notification({
            "peer_id": _PEER_UUID, "kind": "peer_agent", "text": "ping",
        })

    assert client.get.call_count == 1, "stale cache must trigger a re-fetch"
    assert payload["params"]["meta"]["peer_name"] == "fresh-name"
    assert payload["params"]["meta"]["peer_role"] == "new"
|
||||
|
||||
|
||||
def test_envelope_enrichment_invalid_peer_id_skips_lookup(_reset_peer_metadata_cache):
    """A peer_id that is not a UUID must neither crash the push path nor
    trigger a registry GET. The name/role enrichment fields are simply
    absent, while peer_id and agent_card_url are passed through verbatim."""
    import a2a_client
    from a2a_mcp_server import _build_channel_notification

    bad_peer_id = "not-a-uuid"
    patcher, http_client = _patch_httpx_client(_make_httpx_response(200, {}))
    with patcher:
        envelope = _build_channel_notification({
            "peer_id": bad_peer_id,
            "kind": "peer_agent",
            "text": "evil",
        })

    assert http_client.get.call_count == 0, (
        "invalid peer_id must not reach a network call — UUID validation "
        "guards the URL-construction surface"
    )
    meta = envelope["params"]["meta"]
    assert meta["peer_id"] == "not-a-uuid"
    assert "peer_name" not in meta
    assert "peer_role" not in meta
    # agent_card_url is built from peer_id without validation, so it is
    # present here too and simply echoes the malformed id.
    expected_url = f"{a2a_client.PLATFORM_URL}/registry/discover/not-a-uuid"
    assert meta["agent_card_url"] == expected_url
|
||||
|
||||
|
||||
# ============== initialize handshake — capability declaration ==============
|
||||
# Without `experimental.claude/channel`, Claude Code's MCP client drops
|
||||
# our notifications/claude/channel emissions instead of routing them as
|
||||
|
||||
Loading…
Reference in New Issue
Block a user