Merge pull request #2471 from Molecule-AI/feat/channel-envelope-enrichment

feat(a2a-mcp): enrich channel envelope with peer name/role/agent_card_url
This commit is contained in:
Hongming Wang 2026-05-02 01:31:15 +00:00 committed by GitHub
commit 9bbf32b526
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 464 additions and 14 deletions

View File

@ -30,6 +30,104 @@ else:
# Cache workspace ID → name mappings (populated by list_peers calls)
_peer_names: dict[str, str] = {}
# Cache workspace ID → full peer record (id, name, role, status, url, ...).
# Populated by tool_list_peers and by the lazy registry lookup in
# enrich_peer_metadata. The notification-callback path (channel envelope
# enrichment) reads this cache on every inbound peer_agent push, so a
# bare ``dict[str, tuple[float, dict | None]]`` is the fastest read
# shape; entries carry their fetched-at timestamp so TTL eviction is
# in-line with the lookup. ``None`` as the record is the negative-cache
# sentinel: registry failure is cached for one TTL window so we don't
# re-fire the 2s-bounded GET on every push from a flaky peer.
_peer_metadata: dict[str, tuple[float, dict | None]] = {}
# How long an entry in ``_peer_metadata`` is treated as fresh. 5 minutes
# is the same window we use for delegation routing — long enough that a
# busy agent receiving repeated pushes from one peer doesn't hit the
# registry on every push, short enough that role/name renames propagate
# within a single agent session.
_PEER_METADATA_TTL_SECONDS = 300.0
def enrich_peer_metadata(peer_id: str, *, now: float | None = None) -> dict | None:
"""Return cached or freshly-fetched metadata for ``peer_id``.
Sync helper safe to call from the inbox poller's notification
callback thread (which is not async). Hits the in-process cache
first; on miss or TTL expiry, GETs ``/registry/discover/<peer_id>``
synchronously with a tight timeout. Returns None on validation
failure, network failure, or non-200 response so callers can
degrade gracefully (the channel envelope falls back to the raw
``peer_id`` instead of crashing the push path).
Negative caching: failure outcomes (4xx/5xx/non-JSON/network
exception) are stored as ``(now, None)`` and treated as
fresh-but-empty for the TTL window. Without this, a peer with a
flaky/missing registry record would re-fire the 2s-bounded GET on
EVERY push turning the cache into a no-op for the exact failure
scenarios it most needs to defend against.
The fetched dict is stored as-is, so callers can read whatever
fields the platform exposes (currently: ``id``, ``name``, ``role``,
``status``, ``url``). New fields surface automatically without a
code change here.
"""
canon = _validate_peer_id(peer_id)
if canon is None:
return None
current = now if now is not None else time.monotonic()
cached = _peer_metadata.get(canon)
if cached is not None:
fetched_at, record = cached
if current - fetched_at < _PEER_METADATA_TTL_SECONDS:
# Fresh entry — return whatever's there. ``None`` is the
# negative-cache sentinel: caller treats absence of fields
# the same as a registry miss, which is the desired UX.
return record
url = f"{PLATFORM_URL}/registry/discover/{canon}"
try:
with httpx.Client(timeout=2.0) as client:
resp = client.get(url, headers={"X-Workspace-ID": WORKSPACE_ID, **auth_headers()})
except Exception as exc: # noqa: BLE001
logger.debug("enrich_peer_metadata: GET %s failed: %s", url, exc)
_peer_metadata[canon] = (current, None)
return None
if resp.status_code != 200:
logger.debug(
"enrich_peer_metadata: %s returned HTTP %d", url, resp.status_code
)
_peer_metadata[canon] = (current, None)
return None
try:
data = resp.json()
except Exception: # noqa: BLE001
_peer_metadata[canon] = (current, None)
return None
if not isinstance(data, dict):
_peer_metadata[canon] = (current, None)
return None
_peer_metadata[canon] = (current, data)
if name := data.get("name"):
_peer_names[canon] = name
return data
def _agent_card_url_for(peer_id: str) -> str:
"""Construct the platform-side agent-card URL for ``peer_id``.
Uses the registry's discovery path so the agent receiving a push
can hit a single endpoint to enumerate the sender's capabilities
+ role + URL. Same shape every workspace exposes regardless of
runtime claude-code, hermes, langchain wrappers all register
through ``/registry/register`` and surface through ``/registry/discover``.
"""
return f"{PLATFORM_URL}/registry/discover/{peer_id}"
# Sentinel prefix for errors originating from send_a2a_message / child agents.
# Used by delegate_task to distinguish real errors from normal response text.
_A2A_ERROR_PREFIX = "[A2A_ERROR] "

View File

@ -49,8 +49,10 @@ from a2a_client import ( # noqa: F401, E402
PLATFORM_URL,
WORKSPACE_ID,
_A2A_ERROR_PREFIX,
_agent_card_url_for,
_peer_names,
discover_peer,
enrich_peer_metadata,
get_peers,
get_workspace_info,
send_a2a_message,
@ -226,8 +228,9 @@ def _build_channel_instructions() -> str:
"\n"
"PUSH PATH (Claude Code with channel push enabled):\n"
"Messages arrive as <channel source=\"molecule\" kind=\"...\" "
"peer_id=\"...\" activity_id=\"...\" ts=\"...\"> tags as a "
"synthetic user turn — no agent action needed to surface them.\n"
"peer_id=\"...\" peer_name=\"...\" peer_role=\"...\" "
"agent_card_url=\"...\" activity_id=\"...\" ts=\"...\"> tags as "
"a synthetic user turn — no agent action needed to surface them.\n"
"\n"
"POLL PATH (every other MCP client + Claude Code without push "
"enabled — this is the universal default):\n"
@ -239,6 +242,16 @@ def _build_channel_instructions() -> str:
"delegating to you).\n"
"- `peer_id` is empty for canvas_user, set to the sender "
"workspace UUID for peer_agent.\n"
"- `peer_name` and `peer_role` are present for peer_agent when "
"the platform registry resolved the sender — e.g. "
"`peer_name=\"ops-agent\"`, `peer_role=\"sre\"`. Surface these "
"in your reasoning so the user can tell which peer is talking "
"without having to memorise UUIDs. Absent on canvas_user and "
"on a registry-lookup failure (the push still delivers).\n"
"- `agent_card_url` is present for peer_agent and points at "
"the platform's discover endpoint for that peer — fetch it if "
"you need the peer's full capability list (skills, role, "
"runtime).\n"
"- `activity_id` is the inbox row to acknowledge.\n"
"\n"
"Reply path:\n"
@ -377,23 +390,42 @@ def _build_channel_notification(msg: dict) -> dict:
"""Transform an ``InboxMessage.to_dict()`` into the MCP notification
envelope expected by Claude Code's channel-bridge contract.
Pure function so the wire shape is unit-testable without spinning
up an asyncio loop. The wire-up in ``main()`` just composes this
with ``asyncio.run_coroutine_threadsafe``.
Side-effecting only via the in-process peer-metadata cache: if the
message is from a peer agent, this calls ``enrich_peer_metadata``
to surface the peer's name, role, and agent-card URL alongside the
raw ``peer_id``. The cache is TTL'd at the source, so a busy agent
receiving repeated pushes from one peer doesn't hit the registry on
every push. Enrichment failure is logged at DEBUG and degraded to
bare ``peer_id`` the push must never block on a registry stall.
"""
meta = {
"source": "molecule",
"kind": msg.get("kind", ""),
"peer_id": msg.get("peer_id", ""),
"method": msg.get("method", ""),
"activity_id": msg.get("activity_id", ""),
"ts": msg.get("created_at", ""),
}
peer_id = msg.get("peer_id") or ""
if peer_id:
record = enrich_peer_metadata(peer_id)
if record is not None:
if name := record.get("name"):
meta["peer_name"] = name
if role := record.get("role"):
meta["peer_role"] = role
# agent_card_url is constructable from peer_id alone; surface it
# even when enrichment fails so the receiving agent has a single
# endpoint to hit for capabilities lookup.
meta["agent_card_url"] = _agent_card_url_for(peer_id)
return {
"jsonrpc": "2.0",
"method": _CHANNEL_NOTIFICATION_METHOD,
"params": {
"content": msg.get("text", ""),
"meta": {
"source": "molecule",
"kind": msg.get("kind", ""),
"peer_id": msg.get("peer_id", ""),
"method": msg.get("method", ""),
"activity_id": msg.get("activity_id", ""),
"ts": msg.get("created_at", ""),
},
"meta": meta,
},
}

View File

@ -3,7 +3,7 @@
import asyncio
import json
from unittest.mock import AsyncMock, patch
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
@ -242,6 +242,290 @@ def test_build_channel_notification_handles_missing_fields_gracefully():
assert meta["kind"] == ""
# ----- Channel envelope enrichment (peer_name / peer_role / agent_card_url) ---
#
# The bare envelope only carries `peer_id` for peer_agent inbound, so the
# receiving agent has to round-trip to /registry to find out who's
# talking. Enrichment surfaces the sender's display name, role, and an
# agent-card URL alongside the routing fields so the agent can render
# "ops-agent (sre): hi" in one shot. Cache-backed and TTL'd so a busy
# multi-peer chat doesn't hit the registry on every push.
#
# Tests pin: cache hit, cache miss + registry hit, registry miss
# (graceful degrade), TTL expiry, canvas_user (no enrichment), and the
# agent_card_url surfaces even when the registry is reachable but
# returns nothing usable.
_PEER_UUID = "11111111-2222-3333-4444-555555555555"
@pytest.fixture()
def _reset_peer_metadata_cache(monkeypatch):
"""Each test starts with a clean ``_peer_metadata`` cache so an
earlier test's hit doesn't satisfy a later test's miss. Mutates the
module-level dict in place rather than reassigning so other modules
that imported the dict by reference still see the same instance."""
import a2a_client
a2a_client._peer_metadata.clear()
yield
a2a_client._peer_metadata.clear()
def _make_httpx_response(status_code: int, json_body: object) -> MagicMock:
resp = MagicMock()
resp.status_code = status_code
resp.json.return_value = json_body
return resp
def _patch_httpx_client(returning: MagicMock):
"""Replace httpx.Client with a context-manager mock returning
``returning`` from .get(). Mirrors the inbox tests' pattern so a
future refactor of the registry GET path can be re-tested with the
same harness."""
client = MagicMock()
client.__enter__ = MagicMock(return_value=client)
client.__exit__ = MagicMock(return_value=False)
client.get = MagicMock(return_value=returning)
return patch("httpx.Client", return_value=client), client
def test_envelope_enrichment_canvas_user_has_no_peer_fields(_reset_peer_metadata_cache):
"""canvas_user pushes have no peer (peer_id=''). The enrichment
block must short-circuit so we don't fire a wasted registry GET +
don't add empty peer_name/role/agent_card_url to the meta dict."""
from a2a_mcp_server import _build_channel_notification
payload = _build_channel_notification({
"activity_id": "act-1",
"text": "hello from canvas",
"peer_id": "",
"kind": "canvas_user",
"method": "message/send",
"created_at": "2026-05-01T00:00:00Z",
})
meta = payload["params"]["meta"]
assert "peer_name" not in meta
assert "peer_role" not in meta
assert "agent_card_url" not in meta
def test_envelope_enrichment_uses_cache_when_present(_reset_peer_metadata_cache):
"""Cache hit: registry NOT called, meta carries the cached fields.
This is the hot path on a busy multi-peer chat every cache hit
saves a 2-second timeout-bounded registry GET."""
import a2a_client
from a2a_mcp_server import _build_channel_notification
import time as _time
a2a_client._peer_metadata[_PEER_UUID] = (
_time.monotonic(),
{"id": _PEER_UUID, "name": "ops-agent", "role": "sre", "status": "online"},
)
p, client = _patch_httpx_client(_make_httpx_response(200, {}))
with p:
payload = _build_channel_notification({
"activity_id": "act-2",
"text": "ping",
"peer_id": _PEER_UUID,
"kind": "peer_agent",
"method": "message/send",
"created_at": "2026-05-01T01:23:45Z",
})
assert client.get.call_count == 0, "cache hit must not fire a registry GET"
meta = payload["params"]["meta"]
assert meta["peer_id"] == _PEER_UUID
assert meta["peer_name"] == "ops-agent"
assert meta["peer_role"] == "sre"
assert meta["agent_card_url"].endswith(f"/registry/discover/{_PEER_UUID}")
def test_envelope_enrichment_fetches_on_cache_miss(_reset_peer_metadata_cache):
"""Cache miss + registry hit: GET fires, response cached, meta
carries fetched fields. Subsequent build for the same peer must
NOT re-fetch (cache populated by first call)."""
import a2a_client
from a2a_mcp_server import _build_channel_notification
p, client = _patch_httpx_client(
_make_httpx_response(
200,
{"id": _PEER_UUID, "name": "fetched-name", "role": "router", "status": "online"},
)
)
with p:
payload1 = _build_channel_notification({
"peer_id": _PEER_UUID, "kind": "peer_agent", "text": "first",
})
payload2 = _build_channel_notification({
"peer_id": _PEER_UUID, "kind": "peer_agent", "text": "second",
})
assert client.get.call_count == 1, (
f"second push for same peer must use cache, got {client.get.call_count} GETs"
)
assert payload1["params"]["meta"]["peer_name"] == "fetched-name"
assert payload2["params"]["meta"]["peer_name"] == "fetched-name"
def test_envelope_enrichment_degrades_on_registry_failure(_reset_peer_metadata_cache):
"""Registry returns 500 (or 4xx, or network error): enrichment
silently degrades to bare peer_id. The push must not crash, the
push must not block, and the agent_card_url must still surface
because it's constructable from peer_id alone."""
from a2a_mcp_server import _build_channel_notification
p, _ = _patch_httpx_client(_make_httpx_response(500, {}))
with p:
payload = _build_channel_notification({
"activity_id": "act-3",
"text": "ping",
"peer_id": _PEER_UUID,
"kind": "peer_agent",
"method": "message/send",
"created_at": "2026-05-01T00:00:00Z",
})
meta = payload["params"]["meta"]
assert meta["peer_id"] == _PEER_UUID
assert "peer_name" not in meta
assert "peer_role" not in meta
assert meta["agent_card_url"].endswith(f"/registry/discover/{_PEER_UUID}"), (
"agent_card_url must be present even on registry failure — "
"it's deterministic from peer_id and gives the agent a single "
"endpoint to retry against"
)
def test_envelope_enrichment_negative_caches_registry_failure(_reset_peer_metadata_cache):
"""Registry failure must be cached for the TTL window. Without
this, a peer with a flaky or missing registry record re-fires the
2s-bounded GET on EVERY push the cache becomes a no-op for the
exact scenarios it most needs to defend against, and the poller
thread stalls 2s per push for that peer until the registry comes
back. Pin: two pushes from a 5xx-returning peer fire exactly one
GET, not two."""
from a2a_mcp_server import _build_channel_notification
p, client = _patch_httpx_client(_make_httpx_response(500, {}))
with p:
payload1 = _build_channel_notification({
"peer_id": _PEER_UUID, "kind": "peer_agent", "text": "first",
})
payload2 = _build_channel_notification({
"peer_id": _PEER_UUID, "kind": "peer_agent", "text": "second",
})
assert client.get.call_count == 1, (
f"second push from a 5xx-returning peer must use the negative "
f"cache, got {client.get.call_count} GETs"
)
# Both pushes deliver without enrichment (peer_name/role absent),
# but agent_card_url surfaces unconditionally.
for payload in (payload1, payload2):
meta = payload["params"]["meta"]
assert "peer_name" not in meta
assert "peer_role" not in meta
assert meta["agent_card_url"].endswith(f"/registry/discover/{_PEER_UUID}")
def test_envelope_enrichment_negative_caches_network_exception(_reset_peer_metadata_cache):
"""Same negative-caching contract for network exceptions —
httpx.ConnectError, DNS failure, registry pod restart all
surface as exceptions from client.get(). Without negative
caching, a temporary network blip turns into a 2s stall on
every push for the duration."""
import a2a_client
from a2a_mcp_server import _build_channel_notification
client = MagicMock()
client.__enter__ = MagicMock(return_value=client)
client.__exit__ = MagicMock(return_value=False)
# Important: simulate the exception INSIDE the with-block (which
# is where the real httpx.Client raises) by making get() raise.
import httpx as _httpx
client.get = MagicMock(side_effect=_httpx.ConnectError("dns down"))
with patch("httpx.Client", return_value=client):
_build_channel_notification({"peer_id": _PEER_UUID, "kind": "peer_agent"})
_build_channel_notification({"peer_id": _PEER_UUID, "kind": "peer_agent"})
assert client.get.call_count == 1, (
f"network exceptions must be negative-cached, got "
f"{client.get.call_count} GETs"
)
# Sanity: the cache entry exists and carries None as the record.
cached = a2a_client._peer_metadata[_PEER_UUID]
assert cached[1] is None
def test_envelope_enrichment_re_fetches_after_ttl(_reset_peer_metadata_cache):
"""Cached entry past TTL: registry is hit again. Pin the TTL
behaviour so a future caller bumping ``_PEER_METADATA_TTL_SECONDS``
doesn't accidentally make the cache permanent."""
import time
import a2a_client
from a2a_mcp_server import _build_channel_notification
# Stale entry: anchored to *current* monotonic time minus TTL+slack
# so the entry is unambiguously past the freshness window. A naked
# `0.0` looked stale relative to wall-clock but `time.monotonic()`
# starts at process uptime — when this test ran early in the pytest
# run, current was <300s and the entry was treated as fresh,
# silently skipping the re-fetch the assertion expects.
a2a_client._peer_metadata[_PEER_UUID] = (
time.monotonic() - a2a_client._PEER_METADATA_TTL_SECONDS - 60.0,
{"id": _PEER_UUID, "name": "stale-name", "role": "old"},
)
p, client = _patch_httpx_client(
_make_httpx_response(
200,
{"id": _PEER_UUID, "name": "fresh-name", "role": "new", "status": "online"},
)
)
with p:
payload = _build_channel_notification({
"peer_id": _PEER_UUID, "kind": "peer_agent", "text": "ping",
})
assert client.get.call_count == 1, "stale cache must trigger a re-fetch"
assert payload["params"]["meta"]["peer_name"] == "fresh-name"
assert payload["params"]["meta"]["peer_role"] == "new"
def test_envelope_enrichment_invalid_peer_id_skips_lookup(_reset_peer_metadata_cache):
"""Defensive: a malformed peer_id (not a UUID) must not crash the
push path or cause a registry GET to be fired against an unsanitised
URL. enrich_peer_metadata returns None on validation failure; the
enrichment fields are simply absent."""
from a2a_mcp_server import _build_channel_notification
p, client = _patch_httpx_client(_make_httpx_response(200, {}))
with p:
payload = _build_channel_notification({
"peer_id": "not-a-uuid",
"kind": "peer_agent",
"text": "evil",
})
assert client.get.call_count == 0, (
"invalid peer_id must not reach a network call — UUID validation "
"guards the URL-construction surface"
)
meta = payload["params"]["meta"]
assert meta["peer_id"] == "not-a-uuid"
assert "peer_name" not in meta
assert "peer_role" not in meta
# agent_card_url is constructed unconditionally from peer_id; even on
# an invalid id it's harmless (the receiving agent's GET will 404
# and it can fall back to inbox_pop without enrichment).
assert meta["agent_card_url"] == f"{__import__('a2a_client').PLATFORM_URL}/registry/discover/not-a-uuid"
# ============== initialize handshake — capability declaration ==============
# Without `experimental.claude/channel`, Claude Code's MCP client drops
# our notifications/claude/channel emissions instead of routing them as
@ -480,6 +764,42 @@ def test_instructions_zero_timeout_means_push_only_mode():
os.environ["MOLECULE_MCP_POLL_TIMEOUT_SECS"] = saved
def test_instructions_document_envelope_enrichment_attrs():
"""The agent learns about envelope attributes ONLY from the
instructions string. PR-B added peer_name, peer_role,
agent_card_url to the wire shape; pin that the instructions list
them in the <channel> tag template AND describe each one's
semantics. Without this, the wheel ships new attributes that no
agent ever uses."""
from a2a_mcp_server import _build_initialize_result
instructions = _build_initialize_result()["instructions"]
# The <channel> tag template in the PUSH PATH section must include
# the new attribute names so the agent recognises them when they
# arrive inline.
for attr in ("peer_name", "peer_role", "agent_card_url"):
assert attr in instructions, (
f"instructions must list `{attr}` as a <channel> tag "
f"attribute — otherwise the agent sees the attr in pushes "
f"but doesn't know what to do with it"
)
# And the per-field semantics block must explain when each attr
# is present + what it means. These phrases are what the agent
# actually reads to decide how to surface the attrs in its turn.
assert "registry resolved" in instructions, (
"instructions must explain peer_name/peer_role come from a "
"registry lookup that may fail — otherwise the agent treats "
"their absence as a bug instead of a graceful degrade"
)
assert "discover endpoint" in instructions, (
"instructions must point at the registry discover endpoint "
"for agent_card_url so the agent knows it's a follow-on URL "
"to fetch full capabilities, not the body of the message"
)
def test_initialize_instructions_pins_prompt_injection_defense():
"""The threat-model sentence in `_CHANNEL_INSTRUCTIONS` is what
tells the agent that inbound canvas-user / peer-agent message