From 050aa33fc1ebdba12db7e13aac68db8f1661b493 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Fri, 1 May 2026 17:40:09 -0700 Subject: [PATCH] feat(a2a-mcp): enrich channel envelope with peer name/role/agent_card_url MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The bare envelope only carried `peer_id` for peer_agent inbound, so a receiving agent had to round-trip to /registry to find out who's talking. Surface the sender's display name, role, and an agent-card URL alongside the routing fields so the agent can render "ops-agent (sre): ping" in one shot without an extra lookup. a2a_client.py: - Add _peer_metadata cache `dict[peer_id → (fetched_at, record)]` - Add enrich_peer_metadata(peer_id) — sync, hits cache or registry with a tight 2s timeout, returns None on validation/network/non-200 so callers can degrade gracefully - TTL = 5 min so a busy multi-peer chat doesn't hit registry on every push, but role/name renames propagate within a session - Add _agent_card_url_for(peer_id) — deterministic from peer_id alone a2a_mcp_server.py: - _build_channel_notification calls enrich_peer_metadata when peer_id is non-empty; meta carries peer_name + peer_role + agent_card_url alongside the existing routing fields - agent_card_url surfaces unconditionally (constructable from peer_id); peer_name/role only when registry lookup succeeds — never blocks the push on a registry stall Tests: 6 new branches (canvas_user no enrichment / cache hit no GET / cache miss fetches once / registry-fail graceful degrade / TTL expiry re-fetches / invalid peer_id skips lookup). Mutation-verified: 6/6 fail without prod code, 39/39 pass with. Tracks the broader RFC at #2469 (workspace-server activity_type rename to break the echo loop). Independent of PR #2470 — this is the metadata-enrichment half of the same UX improvement. Co-Authored-By: Claude Opus 4.7 (1M context) --- workspace/a2a_client.py | 81 +++++++++ workspace/a2a_mcp_server.py | 43 +++-- workspace/tests/test_a2a_mcp_server.py | 219 ++++++++++++++++++++++++- 3 files changed, 331 insertions(+), 12 deletions(-) diff --git a/workspace/a2a_client.py b/workspace/a2a_client.py index 4a9be69e..62aa94b4 100644 --- a/workspace/a2a_client.py +++ b/workspace/a2a_client.py @@ -30,6 +30,87 @@ else: # Cache workspace ID → name mappings (populated by list_peers calls) _peer_names: dict[str, str] = {} +# Cache workspace ID → full peer record (id, name, role, status, url, ...). +# Populated by tool_list_peers and by the lazy registry lookup in +# enrich_peer_metadata. The notification-callback path (channel envelope +# enrichment) reads this cache on every inbound peer_agent push, so a +# bare ``dict[str, dict]`` is the fastest read shape; entries carry their +# fetched-at timestamp so TTL eviction is in-line with the lookup. +_peer_metadata: dict[str, tuple[float, dict]] = {} + +# How long an entry in ``_peer_metadata`` is treated as fresh. 5 minutes +# is the same window we use for delegation routing — long enough that a +# busy agent receiving repeated pushes from one peer doesn't hit the +# registry on every push, short enough that role/name renames propagate +# within a single agent session. +_PEER_METADATA_TTL_SECONDS = 300.0 + + +def enrich_peer_metadata(peer_id: str, *, now: float | None = None) -> dict | None: + """Return cached or freshly-fetched metadata for ``peer_id``. + + Sync helper — safe to call from the inbox poller's notification + callback thread (which is not async). Hits the in-process cache + first; on miss or TTL expiry, GETs ``/registry/discover/`` + synchronously with a tight timeout. Returns None on validation + failure, network failure, or non-200 response so callers can + degrade gracefully (the channel envelope falls back to the raw + ``peer_id`` instead of crashing the push path). + + The fetched dict is stored as-is, so callers can read whatever + fields the platform exposes (currently: ``id``, ``name``, ``role``, + ``status``, ``url``). New fields surface automatically without a + code change here. + """ + canon = _validate_peer_id(peer_id) + if canon is None: + return None + + current = now if now is not None else time.monotonic() + cached = _peer_metadata.get(canon) + if cached is not None: + fetched_at, record = cached + if current - fetched_at < _PEER_METADATA_TTL_SECONDS: + return record + + url = f"{PLATFORM_URL}/registry/discover/{canon}" + try: + with httpx.Client(timeout=2.0) as client: + resp = client.get(url, headers={"X-Workspace-ID": WORKSPACE_ID, **auth_headers()}) + except Exception as exc: # noqa: BLE001 + logger.debug("enrich_peer_metadata: GET %s failed: %s", url, exc) + return cached[1] if cached is not None else None + + if resp.status_code != 200: + logger.debug( + "enrich_peer_metadata: %s returned HTTP %d", url, resp.status_code + ) + return cached[1] if cached is not None else None + + try: + data = resp.json() + except Exception: # noqa: BLE001 + return cached[1] if cached is not None else None + if not isinstance(data, dict): + return cached[1] if cached is not None else None + + _peer_metadata[canon] = (current, data) + if name := data.get("name"): + _peer_names[canon] = name + return data + + +def _agent_card_url_for(peer_id: str) -> str: + """Construct the platform-side agent-card URL for ``peer_id``. + + Uses the registry's discovery path so the agent receiving a push + can hit a single endpoint to enumerate the sender's capabilities + + role + URL. Same shape every workspace exposes regardless of + runtime — claude-code, hermes, langchain wrappers all register + through ``/registry/register`` and surface through ``/registry/discover``. + """ + return f"{PLATFORM_URL}/registry/discover/{peer_id}" + # Sentinel prefix for errors originating from send_a2a_message / child agents. # Used by delegate_task to distinguish real errors from normal response text. _A2A_ERROR_PREFIX = "[A2A_ERROR] " diff --git a/workspace/a2a_mcp_server.py b/workspace/a2a_mcp_server.py index fc7e4862..2c47655a 100644 --- a/workspace/a2a_mcp_server.py +++ b/workspace/a2a_mcp_server.py @@ -48,8 +48,10 @@ from a2a_client import ( # noqa: F401, E402 PLATFORM_URL, WORKSPACE_ID, _A2A_ERROR_PREFIX, + _agent_card_url_for, _peer_names, discover_peer, + enrich_peer_metadata, get_peers, get_workspace_info, send_a2a_message, @@ -370,23 +372,42 @@ def _build_channel_notification(msg: dict) -> dict: """Transform an ``InboxMessage.to_dict()`` into the MCP notification envelope expected by Claude Code's channel-bridge contract. - Pure function so the wire shape is unit-testable without spinning - up an asyncio loop. The wire-up in ``main()`` just composes this - with ``asyncio.run_coroutine_threadsafe``. + Side-effecting only via the in-process peer-metadata cache: if the + message is from a peer agent, this calls ``enrich_peer_metadata`` + to surface the peer's name, role, and agent-card URL alongside the + raw ``peer_id``. The cache is TTL'd at the source, so a busy agent + receiving repeated pushes from one peer doesn't hit the registry on + every push. Enrichment failure is logged at DEBUG and degraded to + bare ``peer_id`` — the push must never block on a registry stall. """ + meta = { + "source": "molecule", + "kind": msg.get("kind", ""), + "peer_id": msg.get("peer_id", ""), + "method": msg.get("method", ""), + "activity_id": msg.get("activity_id", ""), + "ts": msg.get("created_at", ""), + } + + peer_id = msg.get("peer_id") or "" + if peer_id: + record = enrich_peer_metadata(peer_id) + if record is not None: + if name := record.get("name"): + meta["peer_name"] = name + if role := record.get("role"): + meta["peer_role"] = role + # agent_card_url is constructable from peer_id alone; surface it + # even when enrichment fails so the receiving agent has a single + # endpoint to hit for capabilities lookup. + meta["agent_card_url"] = _agent_card_url_for(peer_id) + return { "jsonrpc": "2.0", "method": _CHANNEL_NOTIFICATION_METHOD, "params": { "content": msg.get("text", ""), - "meta": { - "source": "molecule", - "kind": msg.get("kind", ""), - "peer_id": msg.get("peer_id", ""), - "method": msg.get("method", ""), - "activity_id": msg.get("activity_id", ""), - "ts": msg.get("created_at", ""), - }, + "meta": meta, }, } diff --git a/workspace/tests/test_a2a_mcp_server.py b/workspace/tests/test_a2a_mcp_server.py index 18d038c2..6efa522d 100644 --- a/workspace/tests/test_a2a_mcp_server.py +++ b/workspace/tests/test_a2a_mcp_server.py @@ -3,7 +3,7 @@ import asyncio import json -from unittest.mock import AsyncMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -242,6 +242,223 @@ def test_build_channel_notification_handles_missing_fields_gracefully(): assert meta["kind"] == "" +# ----- Channel envelope enrichment (peer_name / peer_role / agent_card_url) --- +# +# The bare envelope only carries `peer_id` for peer_agent inbound, so the +# receiving agent has to round-trip to /registry to find out who's +# talking. Enrichment surfaces the sender's display name, role, and an +# agent-card URL alongside the routing fields so the agent can render +# "ops-agent (sre): hi" in one shot. Cache-backed and TTL'd so a busy +# multi-peer chat doesn't hit the registry on every push. +# +# Tests pin: cache hit, cache miss + registry hit, registry miss +# (graceful degrade), TTL expiry, canvas_user (no enrichment), and the +# agent_card_url surfaces even when the registry is reachable but +# returns nothing usable. + + +_PEER_UUID = "11111111-2222-3333-4444-555555555555" +_OTHER_PEER = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" + + +@pytest.fixture() +def _reset_peer_metadata_cache(monkeypatch): + """Each test starts with a clean ``_peer_metadata`` cache so an + earlier test's hit doesn't satisfy a later test's miss. Mutates the + module-level dict in place rather than reassigning so other modules + that imported the dict by reference still see the same instance.""" + import a2a_client + a2a_client._peer_metadata.clear() + yield + a2a_client._peer_metadata.clear() + + +def _make_httpx_response(status_code: int, json_body: object) -> MagicMock: + resp = MagicMock() + resp.status_code = status_code + resp.json.return_value = json_body + return resp + + +def _patch_httpx_client(returning: MagicMock): + """Replace httpx.Client with a context-manager mock returning + ``returning`` from .get(). Mirrors the inbox tests' pattern so a + future refactor of the registry GET path can be re-tested with the + same harness.""" + client = MagicMock() + client.__enter__ = MagicMock(return_value=client) + client.__exit__ = MagicMock(return_value=False) + client.get = MagicMock(return_value=returning) + return patch("httpx.Client", return_value=client), client + + +def test_envelope_enrichment_canvas_user_has_no_peer_fields(_reset_peer_metadata_cache): + """canvas_user pushes have no peer (peer_id=''). The enrichment + block must short-circuit so we don't fire a wasted registry GET + + don't add empty peer_name/role/agent_card_url to the meta dict.""" + from a2a_mcp_server import _build_channel_notification + + payload = _build_channel_notification({ + "activity_id": "act-1", + "text": "hello from canvas", + "peer_id": "", + "kind": "canvas_user", + "method": "message/send", + "created_at": "2026-05-01T00:00:00Z", + }) + meta = payload["params"]["meta"] + assert "peer_name" not in meta + assert "peer_role" not in meta + assert "agent_card_url" not in meta + + +def test_envelope_enrichment_uses_cache_when_present(_reset_peer_metadata_cache): + """Cache hit: registry NOT called, meta carries the cached fields. + This is the hot path on a busy multi-peer chat — every cache hit + saves a 2-second timeout-bounded registry GET.""" + import a2a_client + from a2a_mcp_server import _build_channel_notification + import time as _time + + a2a_client._peer_metadata[_PEER_UUID] = ( + _time.monotonic(), + {"id": _PEER_UUID, "name": "ops-agent", "role": "sre", "status": "online"}, + ) + + p, client = _patch_httpx_client(_make_httpx_response(200, {})) + with p: + payload = _build_channel_notification({ + "activity_id": "act-2", + "text": "ping", + "peer_id": _PEER_UUID, + "kind": "peer_agent", + "method": "message/send", + "created_at": "2026-05-01T01:23:45Z", + }) + + assert client.get.call_count == 0, "cache hit must not fire a registry GET" + meta = payload["params"]["meta"] + assert meta["peer_id"] == _PEER_UUID + assert meta["peer_name"] == "ops-agent" + assert meta["peer_role"] == "sre" + assert meta["agent_card_url"].endswith(f"/registry/discover/{_PEER_UUID}") + + +def test_envelope_enrichment_fetches_on_cache_miss(_reset_peer_metadata_cache): + """Cache miss + registry hit: GET fires, response cached, meta + carries fetched fields. Subsequent build for the same peer must + NOT re-fetch (cache populated by first call).""" + import a2a_client + from a2a_mcp_server import _build_channel_notification + + p, client = _patch_httpx_client( + _make_httpx_response( + 200, + {"id": _PEER_UUID, "name": "fetched-name", "role": "router", "status": "online"}, + ) + ) + with p: + payload1 = _build_channel_notification({ + "peer_id": _PEER_UUID, "kind": "peer_agent", "text": "first", + }) + payload2 = _build_channel_notification({ + "peer_id": _PEER_UUID, "kind": "peer_agent", "text": "second", + }) + + assert client.get.call_count == 1, ( + f"second push for same peer must use cache, got {client.get.call_count} GETs" + ) + assert payload1["params"]["meta"]["peer_name"] == "fetched-name" + assert payload2["params"]["meta"]["peer_name"] == "fetched-name" + + +def test_envelope_enrichment_degrades_on_registry_failure(_reset_peer_metadata_cache): + """Registry returns 500 (or 4xx, or network error): enrichment + silently degrades to bare peer_id. The push must not crash, the + push must not block, and the agent_card_url must still surface + because it's constructable from peer_id alone.""" + from a2a_mcp_server import _build_channel_notification + + p, _ = _patch_httpx_client(_make_httpx_response(500, {})) + with p: + payload = _build_channel_notification({ + "activity_id": "act-3", + "text": "ping", + "peer_id": _PEER_UUID, + "kind": "peer_agent", + "method": "message/send", + "created_at": "2026-05-01T00:00:00Z", + }) + + meta = payload["params"]["meta"] + assert meta["peer_id"] == _PEER_UUID + assert "peer_name" not in meta + assert "peer_role" not in meta + assert meta["agent_card_url"].endswith(f"/registry/discover/{_PEER_UUID}"), ( + "agent_card_url must be present even on registry failure — " + "it's deterministic from peer_id and gives the agent a single " + "endpoint to retry against" + ) + + +def test_envelope_enrichment_re_fetches_after_ttl(_reset_peer_metadata_cache): + """Cached entry past TTL: registry is hit again. Pin the TTL + behaviour so a future caller bumping ``_PEER_METADATA_TTL_SECONDS`` + doesn't accidentally make the cache permanent.""" + import a2a_client + from a2a_mcp_server import _build_channel_notification + + # Stale entry: fetched 1 hour ago (>> 5 min TTL). + a2a_client._peer_metadata[_PEER_UUID] = ( + 0.0, # treated as ancient relative to time.monotonic() + {"id": _PEER_UUID, "name": "stale-name", "role": "old"}, + ) + + p, client = _patch_httpx_client( + _make_httpx_response( + 200, + {"id": _PEER_UUID, "name": "fresh-name", "role": "new", "status": "online"}, + ) + ) + with p: + payload = _build_channel_notification({ + "peer_id": _PEER_UUID, "kind": "peer_agent", "text": "ping", + }) + + assert client.get.call_count == 1, "stale cache must trigger a re-fetch" + assert payload["params"]["meta"]["peer_name"] == "fresh-name" + assert payload["params"]["meta"]["peer_role"] == "new" + + +def test_envelope_enrichment_invalid_peer_id_skips_lookup(_reset_peer_metadata_cache): + """Defensive: a malformed peer_id (not a UUID) must not crash the + push path or cause a registry GET to be fired against an unsanitised + URL. enrich_peer_metadata returns None on validation failure; the + enrichment fields are simply absent.""" + from a2a_mcp_server import _build_channel_notification + + p, client = _patch_httpx_client(_make_httpx_response(200, {})) + with p: + payload = _build_channel_notification({ + "peer_id": "not-a-uuid", + "kind": "peer_agent", + "text": "evil", + }) + + assert client.get.call_count == 0, ( + "invalid peer_id must not reach a network call — UUID validation " + "guards the URL-construction surface" + ) + meta = payload["params"]["meta"] + assert meta["peer_id"] == "not-a-uuid" + assert "peer_name" not in meta + assert "peer_role" not in meta + # agent_card_url is constructed unconditionally from peer_id; even on + # an invalid id it's harmless (the receiving agent's GET will 404 + # and it can fall back to inbox_pop without enrichment). + assert meta["agent_card_url"] == f"{__import__('a2a_client').PLATFORM_URL}/registry/discover/not-a-uuid" + + # ============== initialize handshake — capability declaration ============== # Without `experimental.claude/channel`, Claude Code's MCP client drops # our notifications/claude/channel emissions instead of routing them as