From 050aa33fc1ebdba12db7e13aac68db8f1661b493 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Fri, 1 May 2026 17:40:09 -0700 Subject: [PATCH 1/5] feat(a2a-mcp): enrich channel envelope with peer name/role/agent_card_url MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The bare envelope only carried `peer_id` for peer_agent inbound, so a receiving agent had to round-trip to /registry to find out who's talking. Surface the sender's display name, role, and an agent-card URL alongside the routing fields so the agent can render "ops-agent (sre): ping" in one shot without an extra lookup. a2a_client.py: - Add _peer_metadata cache `dict[peer_id → (fetched_at, record)]` - Add enrich_peer_metadata(peer_id) — sync, hits cache or registry with a tight 2s timeout, returns None on validation/network/non-200 so callers can degrade gracefully - TTL = 5 min so a busy multi-peer chat doesn't hit registry on every push, but role/name renames propagate within a session - Add _agent_card_url_for(peer_id) — deterministic from peer_id alone a2a_mcp_server.py: - _build_channel_notification calls enrich_peer_metadata when peer_id is non-empty; meta carries peer_name + peer_role + agent_card_url alongside the existing routing fields - agent_card_url surfaces unconditionally (constructable from peer_id); peer_name/role only when registry lookup succeeds — never blocks the push on a registry stall Tests: 6 new branches (canvas_user no enrichment / cache hit no GET / cache miss fetches once / registry-fail graceful degrade / TTL expiry re-fetches / invalid peer_id skips lookup). Mutation-verified: 6/6 fail without prod code, 39/39 pass with. Tracks the broader RFC at #2469 (workspace-server activity_type rename to break the echo loop). Independent of PR #2470 — this is the metadata-enrichment half of the same UX improvement. Co-Authored-By: Claude Opus 4.7 (1M context) --- workspace/a2a_client.py | 81 +++++++++ workspace/a2a_mcp_server.py | 43 +++-- workspace/tests/test_a2a_mcp_server.py | 219 ++++++++++++++++++++++++- 3 files changed, 331 insertions(+), 12 deletions(-) diff --git a/workspace/a2a_client.py b/workspace/a2a_client.py index 4a9be69e..62aa94b4 100644 --- a/workspace/a2a_client.py +++ b/workspace/a2a_client.py @@ -30,6 +30,87 @@ else: # Cache workspace ID → name mappings (populated by list_peers calls) _peer_names: dict[str, str] = {} +# Cache workspace ID → full peer record (id, name, role, status, url, ...). +# Populated by tool_list_peers and by the lazy registry lookup in +# enrich_peer_metadata. The notification-callback path (channel envelope +# enrichment) reads this cache on every inbound peer_agent push, so a +# bare ``dict[str, dict]`` is the fastest read shape; entries carry their +# fetched-at timestamp so TTL eviction is in-line with the lookup. +_peer_metadata: dict[str, tuple[float, dict]] = {} + +# How long an entry in ``_peer_metadata`` is treated as fresh. 5 minutes +# is the same window we use for delegation routing — long enough that a +# busy agent receiving repeated pushes from one peer doesn't hit the +# registry on every push, short enough that role/name renames propagate +# within a single agent session. +_PEER_METADATA_TTL_SECONDS = 300.0 + + +def enrich_peer_metadata(peer_id: str, *, now: float | None = None) -> dict | None: + """Return cached or freshly-fetched metadata for ``peer_id``. + + Sync helper — safe to call from the inbox poller's notification + callback thread (which is not async). Hits the in-process cache + first; on miss or TTL expiry, GETs ``/registry/discover/`` + synchronously with a tight timeout. Returns None on validation + failure, network failure, or non-200 response so callers can + degrade gracefully (the channel envelope falls back to the raw + ``peer_id`` instead of crashing the push path). + + The fetched dict is stored as-is, so callers can read whatever + fields the platform exposes (currently: ``id``, ``name``, ``role``, + ``status``, ``url``). New fields surface automatically without a + code change here. + """ + canon = _validate_peer_id(peer_id) + if canon is None: + return None + + current = now if now is not None else time.monotonic() + cached = _peer_metadata.get(canon) + if cached is not None: + fetched_at, record = cached + if current - fetched_at < _PEER_METADATA_TTL_SECONDS: + return record + + url = f"{PLATFORM_URL}/registry/discover/{canon}" + try: + with httpx.Client(timeout=2.0) as client: + resp = client.get(url, headers={"X-Workspace-ID": WORKSPACE_ID, **auth_headers()}) + except Exception as exc: # noqa: BLE001 + logger.debug("enrich_peer_metadata: GET %s failed: %s", url, exc) + return cached[1] if cached is not None else None + + if resp.status_code != 200: + logger.debug( + "enrich_peer_metadata: %s returned HTTP %d", url, resp.status_code + ) + return cached[1] if cached is not None else None + + try: + data = resp.json() + except Exception: # noqa: BLE001 + return cached[1] if cached is not None else None + if not isinstance(data, dict): + return cached[1] if cached is not None else None + + _peer_metadata[canon] = (current, data) + if name := data.get("name"): + _peer_names[canon] = name + return data + + +def _agent_card_url_for(peer_id: str) -> str: + """Construct the platform-side agent-card URL for ``peer_id``. + + Uses the registry's discovery path so the agent receiving a push + can hit a single endpoint to enumerate the sender's capabilities + + role + URL. Same shape every workspace exposes regardless of + runtime — claude-code, hermes, langchain wrappers all register + through ``/registry/register`` and surface through ``/registry/discover``. + """ + return f"{PLATFORM_URL}/registry/discover/{peer_id}" + # Sentinel prefix for errors originating from send_a2a_message / child agents. # Used by delegate_task to distinguish real errors from normal response text. _A2A_ERROR_PREFIX = "[A2A_ERROR] " diff --git a/workspace/a2a_mcp_server.py b/workspace/a2a_mcp_server.py index fc7e4862..2c47655a 100644 --- a/workspace/a2a_mcp_server.py +++ b/workspace/a2a_mcp_server.py @@ -48,8 +48,10 @@ from a2a_client import ( # noqa: F401, E402 PLATFORM_URL, WORKSPACE_ID, _A2A_ERROR_PREFIX, + _agent_card_url_for, _peer_names, discover_peer, + enrich_peer_metadata, get_peers, get_workspace_info, send_a2a_message, @@ -370,23 +372,42 @@ def _build_channel_notification(msg: dict) -> dict: """Transform an ``InboxMessage.to_dict()`` into the MCP notification envelope expected by Claude Code's channel-bridge contract. - Pure function so the wire shape is unit-testable without spinning - up an asyncio loop. The wire-up in ``main()`` just composes this - with ``asyncio.run_coroutine_threadsafe``. + Side-effecting only via the in-process peer-metadata cache: if the + message is from a peer agent, this calls ``enrich_peer_metadata`` + to surface the peer's name, role, and agent-card URL alongside the + raw ``peer_id``. The cache is TTL'd at the source, so a busy agent + receiving repeated pushes from one peer doesn't hit the registry on + every push. Enrichment failure is logged at DEBUG and degraded to + bare ``peer_id`` — the push must never block on a registry stall. """ + meta = { + "source": "molecule", + "kind": msg.get("kind", ""), + "peer_id": msg.get("peer_id", ""), + "method": msg.get("method", ""), + "activity_id": msg.get("activity_id", ""), + "ts": msg.get("created_at", ""), + } + + peer_id = msg.get("peer_id") or "" + if peer_id: + record = enrich_peer_metadata(peer_id) + if record is not None: + if name := record.get("name"): + meta["peer_name"] = name + if role := record.get("role"): + meta["peer_role"] = role + # agent_card_url is constructable from peer_id alone; surface it + # even when enrichment fails so the receiving agent has a single + # endpoint to hit for capabilities lookup. + meta["agent_card_url"] = _agent_card_url_for(peer_id) + return { "jsonrpc": "2.0", "method": _CHANNEL_NOTIFICATION_METHOD, "params": { "content": msg.get("text", ""), - "meta": { - "source": "molecule", - "kind": msg.get("kind", ""), - "peer_id": msg.get("peer_id", ""), - "method": msg.get("method", ""), - "activity_id": msg.get("activity_id", ""), - "ts": msg.get("created_at", ""), - }, + "meta": meta, }, } diff --git a/workspace/tests/test_a2a_mcp_server.py b/workspace/tests/test_a2a_mcp_server.py index 18d038c2..6efa522d 100644 --- a/workspace/tests/test_a2a_mcp_server.py +++ b/workspace/tests/test_a2a_mcp_server.py @@ -3,7 +3,7 @@ import asyncio import json -from unittest.mock import AsyncMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -242,6 +242,223 @@ def test_build_channel_notification_handles_missing_fields_gracefully(): assert meta["kind"] == "" +# ----- Channel envelope enrichment (peer_name / peer_role / agent_card_url) --- +# +# The bare envelope only carries `peer_id` for peer_agent inbound, so the +# receiving agent has to round-trip to /registry to find out who's +# talking. Enrichment surfaces the sender's display name, role, and an +# agent-card URL alongside the routing fields so the agent can render +# "ops-agent (sre): hi" in one shot. Cache-backed and TTL'd so a busy +# multi-peer chat doesn't hit the registry on every push. +# +# Tests pin: cache hit, cache miss + registry hit, registry miss +# (graceful degrade), TTL expiry, canvas_user (no enrichment), and the +# agent_card_url surfaces even when the registry is reachable but +# returns nothing usable. + + +_PEER_UUID = "11111111-2222-3333-4444-555555555555" +_OTHER_PEER = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" + + +@pytest.fixture() +def _reset_peer_metadata_cache(monkeypatch): + """Each test starts with a clean ``_peer_metadata`` cache so an + earlier test's hit doesn't satisfy a later test's miss. Mutates the + module-level dict in place rather than reassigning so other modules + that imported the dict by reference still see the same instance.""" + import a2a_client + a2a_client._peer_metadata.clear() + yield + a2a_client._peer_metadata.clear() + + +def _make_httpx_response(status_code: int, json_body: object) -> MagicMock: + resp = MagicMock() + resp.status_code = status_code + resp.json.return_value = json_body + return resp + + +def _patch_httpx_client(returning: MagicMock): + """Replace httpx.Client with a context-manager mock returning + ``returning`` from .get(). Mirrors the inbox tests' pattern so a + future refactor of the registry GET path can be re-tested with the + same harness.""" + client = MagicMock() + client.__enter__ = MagicMock(return_value=client) + client.__exit__ = MagicMock(return_value=False) + client.get = MagicMock(return_value=returning) + return patch("httpx.Client", return_value=client), client + + +def test_envelope_enrichment_canvas_user_has_no_peer_fields(_reset_peer_metadata_cache): + """canvas_user pushes have no peer (peer_id=''). The enrichment + block must short-circuit so we don't fire a wasted registry GET + + don't add empty peer_name/role/agent_card_url to the meta dict.""" + from a2a_mcp_server import _build_channel_notification + + payload = _build_channel_notification({ + "activity_id": "act-1", + "text": "hello from canvas", + "peer_id": "", + "kind": "canvas_user", + "method": "message/send", + "created_at": "2026-05-01T00:00:00Z", + }) + meta = payload["params"]["meta"] + assert "peer_name" not in meta + assert "peer_role" not in meta + assert "agent_card_url" not in meta + + +def test_envelope_enrichment_uses_cache_when_present(_reset_peer_metadata_cache): + """Cache hit: registry NOT called, meta carries the cached fields. + This is the hot path on a busy multi-peer chat — every cache hit + saves a 2-second timeout-bounded registry GET.""" + import a2a_client + from a2a_mcp_server import _build_channel_notification + import time as _time + + a2a_client._peer_metadata[_PEER_UUID] = ( + _time.monotonic(), + {"id": _PEER_UUID, "name": "ops-agent", "role": "sre", "status": "online"}, + ) + + p, client = _patch_httpx_client(_make_httpx_response(200, {})) + with p: + payload = _build_channel_notification({ + "activity_id": "act-2", + "text": "ping", + "peer_id": _PEER_UUID, + "kind": "peer_agent", + "method": "message/send", + "created_at": "2026-05-01T01:23:45Z", + }) + + assert client.get.call_count == 0, "cache hit must not fire a registry GET" + meta = payload["params"]["meta"] + assert meta["peer_id"] == _PEER_UUID + assert meta["peer_name"] == "ops-agent" + assert meta["peer_role"] == "sre" + assert meta["agent_card_url"].endswith(f"/registry/discover/{_PEER_UUID}") + + +def test_envelope_enrichment_fetches_on_cache_miss(_reset_peer_metadata_cache): + """Cache miss + registry hit: GET fires, response cached, meta + carries fetched fields. Subsequent build for the same peer must + NOT re-fetch (cache populated by first call).""" + import a2a_client + from a2a_mcp_server import _build_channel_notification + + p, client = _patch_httpx_client( + _make_httpx_response( + 200, + {"id": _PEER_UUID, "name": "fetched-name", "role": "router", "status": "online"}, + ) + ) + with p: + payload1 = _build_channel_notification({ + "peer_id": _PEER_UUID, "kind": "peer_agent", "text": "first", + }) + payload2 = _build_channel_notification({ + "peer_id": _PEER_UUID, "kind": "peer_agent", "text": "second", + }) + + assert client.get.call_count == 1, ( + f"second push for same peer must use cache, got {client.get.call_count} GETs" + ) + assert payload1["params"]["meta"]["peer_name"] == "fetched-name" + assert payload2["params"]["meta"]["peer_name"] == "fetched-name" + + +def test_envelope_enrichment_degrades_on_registry_failure(_reset_peer_metadata_cache): + """Registry returns 500 (or 4xx, or network error): enrichment + silently degrades to bare peer_id. The push must not crash, the + push must not block, and the agent_card_url must still surface + because it's constructable from peer_id alone.""" + from a2a_mcp_server import _build_channel_notification + + p, _ = _patch_httpx_client(_make_httpx_response(500, {})) + with p: + payload = _build_channel_notification({ + "activity_id": "act-3", + "text": "ping", + "peer_id": _PEER_UUID, + "kind": "peer_agent", + "method": "message/send", + "created_at": "2026-05-01T00:00:00Z", + }) + + meta = payload["params"]["meta"] + assert meta["peer_id"] == _PEER_UUID + assert "peer_name" not in meta + assert "peer_role" not in meta + assert meta["agent_card_url"].endswith(f"/registry/discover/{_PEER_UUID}"), ( + "agent_card_url must be present even on registry failure — " + "it's deterministic from peer_id and gives the agent a single " + "endpoint to retry against" + ) + + +def test_envelope_enrichment_re_fetches_after_ttl(_reset_peer_metadata_cache): + """Cached entry past TTL: registry is hit again. Pin the TTL + behaviour so a future caller bumping ``_PEER_METADATA_TTL_SECONDS`` + doesn't accidentally make the cache permanent.""" + import a2a_client + from a2a_mcp_server import _build_channel_notification + + # Stale entry: fetched 1 hour ago (>> 5 min TTL). + a2a_client._peer_metadata[_PEER_UUID] = ( + 0.0, # treated as ancient relative to time.monotonic() + {"id": _PEER_UUID, "name": "stale-name", "role": "old"}, + ) + + p, client = _patch_httpx_client( + _make_httpx_response( + 200, + {"id": _PEER_UUID, "name": "fresh-name", "role": "new", "status": "online"}, + ) + ) + with p: + payload = _build_channel_notification({ + "peer_id": _PEER_UUID, "kind": "peer_agent", "text": "ping", + }) + + assert client.get.call_count == 1, "stale cache must trigger a re-fetch" + assert payload["params"]["meta"]["peer_name"] == "fresh-name" + assert payload["params"]["meta"]["peer_role"] == "new" + + +def test_envelope_enrichment_invalid_peer_id_skips_lookup(_reset_peer_metadata_cache): + """Defensive: a malformed peer_id (not a UUID) must not crash the + push path or cause a registry GET to be fired against an unsanitised + URL. enrich_peer_metadata returns None on validation failure; the + enrichment fields are simply absent.""" + from a2a_mcp_server import _build_channel_notification + + p, client = _patch_httpx_client(_make_httpx_response(200, {})) + with p: + payload = _build_channel_notification({ + "peer_id": "not-a-uuid", + "kind": "peer_agent", + "text": "evil", + }) + + assert client.get.call_count == 0, ( + "invalid peer_id must not reach a network call — UUID validation " + "guards the URL-construction surface" + ) + meta = payload["params"]["meta"] + assert meta["peer_id"] == "not-a-uuid" + assert "peer_name" not in meta + assert "peer_role" not in meta + # agent_card_url is constructed unconditionally from peer_id; even on + # an invalid id it's harmless (the receiving agent's GET will 404 + # and it can fall back to inbox_pop without enrichment). + assert meta["agent_card_url"] == f"{__import__('a2a_client').PLATFORM_URL}/registry/discover/not-a-uuid" + + # ============== initialize handshake — capability declaration ============== # Without `experimental.claude/channel`, Claude Code's MCP client drops # our notifications/claude/channel emissions instead of routing them as From 0fec3d6fe44f82c1a0d3354158510e38738fe372 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Fri, 1 May 2026 17:45:05 -0700 Subject: [PATCH 2/5] fix(test): anchor envelope-enrichment TTL test to monotonic baseline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Setting fetched_at = 0.0 assumed wall-clock semantics, but time.monotonic() returns process uptime — when this test ran early in the pytest run, current was <300s and the entry was treated as fresh, silently skipping the re-fetch the assertion expects. Anchor to time.monotonic() - TTL - 60 so the entry is unambiguously past the freshness window regardless of when in the run the test fires. Co-Authored-By: Claude Opus 4.7 (1M context) --- workspace/tests/test_a2a_mcp_server.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/workspace/tests/test_a2a_mcp_server.py b/workspace/tests/test_a2a_mcp_server.py index 6efa522d..5d625775 100644 --- a/workspace/tests/test_a2a_mcp_server.py +++ b/workspace/tests/test_a2a_mcp_server.py @@ -405,12 +405,19 @@ def test_envelope_enrichment_re_fetches_after_ttl(_reset_peer_metadata_cache): """Cached entry past TTL: registry is hit again. Pin the TTL behaviour so a future caller bumping ``_PEER_METADATA_TTL_SECONDS`` doesn't accidentally make the cache permanent.""" + import time + import a2a_client from a2a_mcp_server import _build_channel_notification - # Stale entry: fetched 1 hour ago (>> 5 min TTL). + # Stale entry: anchored to *current* monotonic time minus TTL+slack + # so the entry is unambiguously past the freshness window. A naked + # `0.0` looked stale relative to wall-clock but `time.monotonic()` + # starts at process uptime — when this test ran early in the pytest + # run, current was <300s and the entry was treated as fresh, + # silently skipping the re-fetch the assertion expects. a2a_client._peer_metadata[_PEER_UUID] = ( - 0.0, # treated as ancient relative to time.monotonic() + time.monotonic() - a2a_client._PEER_METADATA_TTL_SECONDS - 60.0, {"id": _PEER_UUID, "name": "stale-name", "role": "old"}, ) From 103ac09aebdda3b96a3eff83ed3e9e3e80a081eb Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Fri, 1 May 2026 17:49:36 -0700 Subject: [PATCH 3/5] docs(a2a-mcp): list new envelope attrs in initialize instructions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The agent learns about tag attributes ONLY from the instructions string returned by initialize. Without this update the wheel ships peer_name / peer_role / agent_card_url on the wire but no agent ever uses them — they get printed inline in the push tag, the agent doesn't know they're there, and the UX gain from the enrichment is lost. Update _build_channel_instructions to: - List the new attrs in the tag template under PUSH PATH - Add per-attribute semantics (when present, what to do with them, what \"absent\" means — graceful-degrade vs bug) - Point at the discover endpoint for agent_card_url so the agent treats it as a follow-on URL not the body of the message Tests: structural pin asserting all three attr names appear in the instructions AND the per-field semantics phrases (\"registry resolved\", \"discover endpoint\") so a future copy-edit that shortens the prose can't silently drop the agent guidance. Co-Authored-By: Claude Opus 4.7 (1M context) --- workspace/a2a_mcp_server.py | 15 +++++++++-- workspace/tests/test_a2a_mcp_server.py | 36 ++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/workspace/a2a_mcp_server.py b/workspace/a2a_mcp_server.py index 2c47655a..aaa10bcf 100644 --- a/workspace/a2a_mcp_server.py +++ b/workspace/a2a_mcp_server.py @@ -221,8 +221,9 @@ def _build_channel_instructions() -> str: "\n" "PUSH PATH (Claude Code with channel push enabled):\n" "Messages arrive as tags as a " - "synthetic user turn — no agent action needed to surface them.\n" + "peer_id=\"...\" peer_name=\"...\" peer_role=\"...\" " + "agent_card_url=\"...\" activity_id=\"...\" ts=\"...\"> tags as " + "a synthetic user turn — no agent action needed to surface them.\n" "\n" "POLL PATH (every other MCP client + Claude Code without push " "enabled — this is the universal default):\n" @@ -234,6 +235,16 @@ def _build_channel_instructions() -> str: "delegating to you).\n" "- `peer_id` is empty for canvas_user, set to the sender " "workspace UUID for peer_agent.\n" + "- `peer_name` and `peer_role` are present for peer_agent when " + "the platform registry resolved the sender — e.g. " + "`peer_name=\"ops-agent\"`, `peer_role=\"sre\"`. Surface these " + "in your reasoning so the user can tell which peer is talking " + "without having to memorise UUIDs. Absent on canvas_user and " + "on a registry-lookup failure (the push still delivers).\n" + "- `agent_card_url` is present for peer_agent and points at " + "the platform's discover endpoint for that peer — fetch it if " + "you need the peer's full capability list (skills, role, " + "runtime).\n" "- `activity_id` is the inbox row to acknowledge.\n" "\n" "Reply path:\n" diff --git a/workspace/tests/test_a2a_mcp_server.py b/workspace/tests/test_a2a_mcp_server.py index 5d625775..b9e68e80 100644 --- a/workspace/tests/test_a2a_mcp_server.py +++ b/workspace/tests/test_a2a_mcp_server.py @@ -704,6 +704,42 @@ def test_instructions_zero_timeout_means_push_only_mode(): os.environ["MOLECULE_MCP_POLL_TIMEOUT_SECS"] = saved +def test_instructions_document_envelope_enrichment_attrs(): + """The agent learns about envelope attributes ONLY from the + instructions string. PR-B added peer_name, peer_role, + agent_card_url to the wire shape; pin that the instructions list + them in the tag template AND describe each one's + semantics. Without this, the wheel ships new attributes that no + agent ever uses.""" + from a2a_mcp_server import _build_initialize_result + + instructions = _build_initialize_result()["instructions"] + + # The tag template in the PUSH PATH section must include + # the new attribute names so the agent recognises them when they + # arrive inline. + for attr in ("peer_name", "peer_role", "agent_card_url"): + assert attr in instructions, ( + f"instructions must list `{attr}` as a tag " + f"attribute — otherwise the agent sees the attr in pushes " + f"but doesn't know what to do with it" + ) + + # And the per-field semantics block must explain when each attr + # is present + what it means. These phrases are what the agent + # actually reads to decide how to surface the attrs in its turn. + assert "registry resolved" in instructions, ( + "instructions must explain peer_name/peer_role come from a " + "registry lookup that may fail — otherwise the agent treats " + "their absence as a bug instead of a graceful degrade" + ) + assert "discover endpoint" in instructions, ( + "instructions must point at the registry discover endpoint " + "for agent_card_url so the agent knows it's a follow-on URL " + "to fetch full capabilities, not the body of the message" + ) + + def test_initialize_instructions_pins_prompt_injection_defense(): """The threat-model sentence in `_CHANNEL_INSTRUCTIONS` is what tells the agent that inbound canvas-user / peer-agent message From e6eda38318a2f392aac9f1aaf6e31e05400a7ab9 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Fri, 1 May 2026 18:16:35 -0700 Subject: [PATCH 4/5] fix(a2a-client): negative-cache registry failures in enrich_peer_metadata MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Self-review on PR #2471: failure outcomes (4xx/5xx/non-JSON/network exception) weren't writing to _peer_metadata, so a peer with a flaky or missing registry record re-fired the 2s-bounded GET on EVERY push. The cache became a no-op for the exact failure scenarios it most needs to defend against, and the poller thread stalled 2s per push for that peer until the registry came back. Cache the failure outcome as `(now, None)` so the TTL window suppresses re-fetch. Two new tests pin the behaviour for both HTTP failures (5xx) and transport exceptions (httpx.ConnectError). Type signature widens to `dict | None` on the value tuple's second slot to match the new sentinel; readers already handle `None` as "no enrichment available" — that's the documented graceful-degrade contract — so no caller change needed. Co-Authored-By: Claude Opus 4.7 (1M context) --- workspace/a2a_client.py | 31 ++++++++++--- workspace/tests/test_a2a_mcp_server.py | 61 ++++++++++++++++++++++++++ 2 files changed, 85 insertions(+), 7 deletions(-) diff --git a/workspace/a2a_client.py b/workspace/a2a_client.py index 62aa94b4..f583be61 100644 --- a/workspace/a2a_client.py +++ b/workspace/a2a_client.py @@ -34,9 +34,12 @@ _peer_names: dict[str, str] = {} # Populated by tool_list_peers and by the lazy registry lookup in # enrich_peer_metadata. The notification-callback path (channel envelope # enrichment) reads this cache on every inbound peer_agent push, so a -# bare ``dict[str, dict]`` is the fastest read shape; entries carry their -# fetched-at timestamp so TTL eviction is in-line with the lookup. -_peer_metadata: dict[str, tuple[float, dict]] = {} +# bare ``dict[str, tuple[float, dict | None]]`` is the fastest read +# shape; entries carry their fetched-at timestamp so TTL eviction is +# in-line with the lookup. ``None`` as the record is the negative-cache +# sentinel: registry failure is cached for one TTL window so we don't +# re-fire the 2s-bounded GET on every push from a flaky peer. +_peer_metadata: dict[str, tuple[float, dict | None]] = {} # How long an entry in ``_peer_metadata`` is treated as fresh. 5 minutes # is the same window we use for delegation routing — long enough that a @@ -57,6 +60,13 @@ def enrich_peer_metadata(peer_id: str, *, now: float | None = None) -> dict | No degrade gracefully (the channel envelope falls back to the raw ``peer_id`` instead of crashing the push path). + Negative caching: failure outcomes (4xx/5xx/non-JSON/network + exception) are stored as ``(now, None)`` and treated as + fresh-but-empty for the TTL window. Without this, a peer with a + flaky/missing registry record would re-fire the 2s-bounded GET on + EVERY push — turning the cache into a no-op for the exact failure + scenarios it most needs to defend against. + The fetched dict is stored as-is, so callers can read whatever fields the platform exposes (currently: ``id``, ``name``, ``role``, ``status``, ``url``). New fields surface automatically without a @@ -71,6 +81,9 @@ def enrich_peer_metadata(peer_id: str, *, now: float | None = None) -> dict | No if cached is not None: fetched_at, record = cached if current - fetched_at < _PEER_METADATA_TTL_SECONDS: + # Fresh entry — return whatever's there. ``None`` is the + # negative-cache sentinel: caller treats absence of fields + # the same as a registry miss, which is the desired UX. return record url = f"{PLATFORM_URL}/registry/discover/{canon}" @@ -79,20 +92,24 @@ def enrich_peer_metadata(peer_id: str, *, now: float | None = None) -> dict | No resp = client.get(url, headers={"X-Workspace-ID": WORKSPACE_ID, **auth_headers()}) except Exception as exc: # noqa: BLE001 logger.debug("enrich_peer_metadata: GET %s failed: %s", url, exc) - return cached[1] if cached is not None else None + _peer_metadata[canon] = (current, None) + return None if resp.status_code != 200: logger.debug( "enrich_peer_metadata: %s returned HTTP %d", url, resp.status_code ) - return cached[1] if cached is not None else None + _peer_metadata[canon] = (current, None) + return None try: data = resp.json() except Exception: # noqa: BLE001 - return cached[1] if cached is not None else None + _peer_metadata[canon] = (current, None) + return None if not isinstance(data, dict): - return cached[1] if cached is not None else None + _peer_metadata[canon] = (current, None) + return None _peer_metadata[canon] = (current, data) if name := data.get("name"): diff --git a/workspace/tests/test_a2a_mcp_server.py b/workspace/tests/test_a2a_mcp_server.py index b9e68e80..f8111410 100644 --- a/workspace/tests/test_a2a_mcp_server.py +++ b/workspace/tests/test_a2a_mcp_server.py @@ -401,6 +401,67 @@ def test_envelope_enrichment_degrades_on_registry_failure(_reset_peer_metadata_c ) +def test_envelope_enrichment_negative_caches_registry_failure(_reset_peer_metadata_cache): + """Registry failure must be cached for the TTL window. Without + this, a peer with a flaky or missing registry record re-fires the + 2s-bounded GET on EVERY push — the cache becomes a no-op for the + exact scenarios it most needs to defend against, and the poller + thread stalls 2s per push for that peer until the registry comes + back. Pin: two pushes from a 5xx-returning peer fire exactly one + GET, not two.""" + from a2a_mcp_server import _build_channel_notification + + p, client = _patch_httpx_client(_make_httpx_response(500, {})) + with p: + payload1 = _build_channel_notification({ + "peer_id": _PEER_UUID, "kind": "peer_agent", "text": "first", + }) + payload2 = _build_channel_notification({ + "peer_id": _PEER_UUID, "kind": "peer_agent", "text": "second", + }) + + assert client.get.call_count == 1, ( + f"second push from a 5xx-returning peer must use the negative " + f"cache, got {client.get.call_count} GETs" + ) + # Both pushes deliver without enrichment (peer_name/role absent), + # but agent_card_url surfaces unconditionally. + for payload in (payload1, payload2): + meta = payload["params"]["meta"] + assert "peer_name" not in meta + assert "peer_role" not in meta + assert meta["agent_card_url"].endswith(f"/registry/discover/{_PEER_UUID}") + + +def test_envelope_enrichment_negative_caches_network_exception(_reset_peer_metadata_cache): + """Same negative-caching contract for network exceptions — + httpx.ConnectError, DNS failure, registry pod restart all + surface as exceptions from client.get(). Without negative + caching, a temporary network blip turns into a 2s stall on + every push for the duration.""" + import a2a_client + from a2a_mcp_server import _build_channel_notification + + client = MagicMock() + client.__enter__ = MagicMock(return_value=client) + client.__exit__ = MagicMock(return_value=False) + # Important: simulate the exception INSIDE the with-block (which + # is where the real httpx.Client raises) by making get() raise. + import httpx as _httpx + client.get = MagicMock(side_effect=_httpx.ConnectError("dns down")) + with patch("httpx.Client", return_value=client): + _build_channel_notification({"peer_id": _PEER_UUID, "kind": "peer_agent"}) + _build_channel_notification({"peer_id": _PEER_UUID, "kind": "peer_agent"}) + + assert client.get.call_count == 1, ( + f"network exceptions must be negative-cached, got " + f"{client.get.call_count} GETs" + ) + # Sanity: the cache entry exists and carries None as the record. + cached = a2a_client._peer_metadata[_PEER_UUID] + assert cached[1] is None + + def test_envelope_enrichment_re_fetches_after_ttl(_reset_peer_metadata_cache): """Cached entry past TTL: registry is hit again. Pin the TTL behaviour so a future caller bumping ``_PEER_METADATA_TTL_SECONDS`` From 885eff2350fd95c0e0ee078e4e3e89aff000ce35 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Fri, 1 May 2026 18:28:24 -0700 Subject: [PATCH 5/5] test: drop unused _OTHER_PEER constant MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit github-code-quality bot flagged it as an unused module-level global — correctly. The earlier draft of the negative-cache test was going to exercise two distinct peer IDs hitting the registry concurrently, but the test was simplified to a single-peer flow before merge and the constant lost its consumer. Resolves the only blocking review thread on PR #2471. Co-Authored-By: Claude Opus 4.7 (1M context) --- workspace/tests/test_a2a_mcp_server.py | 1 - 1 file changed, 1 deletion(-) diff --git a/workspace/tests/test_a2a_mcp_server.py b/workspace/tests/test_a2a_mcp_server.py index f8111410..f847cf72 100644 --- a/workspace/tests/test_a2a_mcp_server.py +++ b/workspace/tests/test_a2a_mcp_server.py @@ -258,7 +258,6 @@ def test_build_channel_notification_handles_missing_fields_gracefully(): _PEER_UUID = "11111111-2222-3333-4444-555555555555" -_OTHER_PEER = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" @pytest.fixture()