# Cache workspace ID → name mappings (populated by list_peers calls)
_peer_names: dict[str, str] = {}

# Cache workspace ID → ``(fetched_at, record)``.
#
# ``fetched_at`` is the ``time.monotonic()`` reading (or caller-supplied
# ``now``) taken when the lookup started. ``record`` is the JSON object the
# registry returned for that peer, or ``None`` as the negative-cache
# sentinel for a lookup that failed. The notification-callback path reads
# this on every inbound peer_agent push, so TTL eviction happens in-line
# with the lookup instead of in a separate sweeper.
# NOTE(review): the original change also describes ``tool_list_peers`` as a
# writer of this cache — that code is not visible here; confirm.
_peer_metadata: dict[str, tuple[float, dict | None]] = {}

# How long an entry in ``_peer_metadata`` is treated as fresh. Long enough
# that repeated pushes from one peer don't hit the registry every time,
# short enough that role/name renames propagate within an agent session.
# Applies to negative entries too.
_PEER_METADATA_TTL_SECONDS = 300.0


def _fetch_peer_record(canon: str) -> dict | None:
    """GET the registry record for an already-validated peer id.

    Returns the decoded JSON object on an HTTP 200 whose body is a JSON
    object, and ``None`` for every other outcome (transport exception,
    non-200 status, undecodable body, JSON that is not an object). Every
    failure mode is logged at DEBUG so a silent degrade can still be
    diagnosed. Never raises for transport or decoding problems.
    """
    url = f"{PLATFORM_URL}/registry/discover/{canon}"
    try:
        # NOTE: httpx applies ``timeout=2.0`` to each phase (connect, read,
        # write, pool) separately, not to the request as a whole.
        with httpx.Client(timeout=2.0) as client:
            resp = client.get(
                url,
                headers={"X-Workspace-ID": WORKSPACE_ID, **auth_headers()},
            )
    except Exception as exc:  # noqa: BLE001 - best-effort lookup
        logger.debug("enrich_peer_metadata: GET %s failed: %s", url, exc)
        return None

    if resp.status_code != 200:
        logger.debug(
            "enrich_peer_metadata: %s returned HTTP %d", url, resp.status_code
        )
        return None

    try:
        data = resp.json()
    except Exception as exc:  # noqa: BLE001 - body is not valid JSON
        logger.debug(
            "enrich_peer_metadata: %s returned an undecodable body: %s", url, exc
        )
        return None

    if not isinstance(data, dict):
        logger.debug(
            "enrich_peer_metadata: %s returned %s, expected a JSON object",
            url,
            type(data).__name__,
        )
        return None
    return data


def enrich_peer_metadata(peer_id: str, *, now: float | None = None) -> dict | None:
    """Return cached or freshly fetched registry metadata for ``peer_id``.

    Synchronous helper. The id is first normalised by ``_validate_peer_id``;
    an id that fails validation returns ``None`` without touching the cache
    or the network. A cache entry younger than
    ``_PEER_METADATA_TTL_SECONDS`` is returned as-is. Otherwise the registry
    is queried with a blocking GET and the outcome is cached.

    Negative caching: a failed lookup is stored as ``(timestamp, None)``
    and served for one TTL window, so a peer with a flaky or missing
    registry record costs one GET per window instead of one per push.

    Args:
        peer_id: Workspace id of the sending peer.
        now: Clock reading to use instead of ``time.monotonic()``. It lets
            callers and tests control freshness deterministically.

    Returns:
        The registry's JSON object for the peer, stored and returned
        unmodified, or ``None`` when the id is invalid or the lookup
        failed (including a cached failure).
    """
    canon = _validate_peer_id(peer_id)
    if canon is None:
        return None

    current = time.monotonic() if now is None else now
    cached = _peer_metadata.get(canon)
    if cached is not None:
        fetched_at, record = cached
        if current - fetched_at < _PEER_METADATA_TTL_SECONDS:
            # Fresh entry. ``None`` here is the negative-cache sentinel.
            return record

    # Single write point: success and every failure mode are cached alike.
    record = _fetch_peer_record(canon)
    _peer_metadata[canon] = (current, record)

    if record is not None:
        name = record.get("name")
        # ``_peer_names`` is typed ``dict[str, str]``; don't let a malformed
        # registry payload smuggle a non-string into it.
        if isinstance(name, str) and name:
            _peer_names[canon] = name
    return record


def _agent_card_url_for(peer_id: str) -> str:
    """Build the registry discover URL for ``peer_id``.

    The id is percent-encoded as a single path segment. The channel
    notification builder calls this with the raw id from the inbound
    message even when validation failed, so separators, query/fragment
    markers and quotes in a hostile id must not change the URL's shape.
    UUIDs and other unreserved-only ids are returned unchanged.
    """
    from urllib.parse import quote

    return f"{PLATFORM_URL}/registry/discover/{quote(str(peer_id), safe='')}"


# Sentinel prefix for errors originating from send_a2a_message / child agents.
# Used by delegate_task to distinguish real errors from normal response text.
_A2A_ERROR_PREFIX = "[A2A_ERROR] " diff --git a/workspace/a2a_mcp_server.py b/workspace/a2a_mcp_server.py index 67e448d5..738b322a 100644 --- a/workspace/a2a_mcp_server.py +++ b/workspace/a2a_mcp_server.py @@ -49,8 +49,10 @@ from a2a_client import ( # noqa: F401, E402 PLATFORM_URL, WORKSPACE_ID, _A2A_ERROR_PREFIX, + _agent_card_url_for, _peer_names, discover_peer, + enrich_peer_metadata, get_peers, get_workspace_info, send_a2a_message, @@ -226,8 +228,9 @@ def _build_channel_instructions() -> str: "\n" "PUSH PATH (Claude Code with channel push enabled):\n" "Messages arrive as tags as a " - "synthetic user turn — no agent action needed to surface them.\n" + "peer_id=\"...\" peer_name=\"...\" peer_role=\"...\" " + "agent_card_url=\"...\" activity_id=\"...\" ts=\"...\"> tags as " + "a synthetic user turn — no agent action needed to surface them.\n" "\n" "POLL PATH (every other MCP client + Claude Code without push " "enabled — this is the universal default):\n" @@ -239,6 +242,16 @@ def _build_channel_instructions() -> str: "delegating to you).\n" "- `peer_id` is empty for canvas_user, set to the sender " "workspace UUID for peer_agent.\n" + "- `peer_name` and `peer_role` are present for peer_agent when " + "the platform registry resolved the sender — e.g. " + "`peer_name=\"ops-agent\"`, `peer_role=\"sre\"`. Surface these " + "in your reasoning so the user can tell which peer is talking " + "without having to memorise UUIDs. 
def _build_channel_notification(msg: dict) -> dict:
    """Turn an ``InboxMessage.to_dict()`` into the channel notification envelope.

    The routing fields (``kind``, ``peer_id``, ``method``, ``activity_id``,
    ``ts``) are copied from ``msg``, defaulting to ``""`` when absent.
    When the message carries a non-empty ``peer_id`` the envelope is
    enriched:

    * ``peer_name`` / ``peer_role`` are added when ``enrich_peer_metadata``
      returns a record with a truthy ``name`` / ``role``. A failed lookup
      simply leaves them out.
    * ``agent_card_url`` is always added, since it is derived from the
      ``peer_id`` alone.

    The only side effect is whatever ``enrich_peer_metadata`` does, which
    may be a blocking registry lookup on a cache miss.
    """
    sender = msg.get("peer_id") or ""

    envelope_meta = {
        "source": "molecule",
        "kind": msg.get("kind", ""),
        "peer_id": msg.get("peer_id", ""),
        "method": msg.get("method", ""),
        "activity_id": msg.get("activity_id", ""),
        "ts": msg.get("created_at", ""),
    }

    if sender:
        details = enrich_peer_metadata(sender)
        if details is not None:
            # Copy only the fields the registry actually filled in.
            for meta_key, record_key in (("peer_name", "name"), ("peer_role", "role")):
                value = details.get(record_key)
                if value:
                    envelope_meta[meta_key] = value
        # Surfaced even when enrichment failed: built from the id alone.
        envelope_meta["agent_card_url"] = _agent_card_url_for(sender)

    params = {
        "content": msg.get("text", ""),
        "meta": envelope_meta,
    }
    return {
        "jsonrpc": "2.0",
        "method": _CHANNEL_NOTIFICATION_METHOD,
        "params": params,
    }
import asyncio
import json
from unittest.mock import AsyncMock, MagicMock, patch

import pytest


# ----- Channel envelope enrichment (peer_name / peer_role / agent_card_url) ---
#
# The bare envelope only carries `peer_id` for peer_agent inbound, so the
# receiving agent would have to round-trip to the registry to find out who
# is talking. Enrichment adds the sender's display name, role and an
# agent-card URL next to the routing fields. Lookups are cached with a TTL.
#
# Tests pin: cache hit, cache miss + registry hit, registry failure
# (graceful degrade + negative caching), TTL expiry, canvas_user (no
# enrichment), invalid peer ids, and that agent_card_url is present even
# when the registry returns nothing usable.


_PEER_UUID = "11111111-2222-3333-4444-555555555555"


@pytest.fixture()
def _reset_peer_metadata_cache():
    """Isolate each test from peer-cache state left by other tests.

    ``_peer_metadata`` is emptied before and after the test.
    ``_peer_names`` is snapshotted and restored afterwards, because a
    successful ``enrich_peer_metadata`` fetch also writes the peer's name
    there. Both dicts are mutated in place rather than rebound, so modules
    that imported them by reference keep seeing the same objects.
    """
    import a2a_client

    saved_names = dict(a2a_client._peer_names)
    a2a_client._peer_metadata.clear()
    yield
    a2a_client._peer_metadata.clear()
    a2a_client._peer_names.clear()
    a2a_client._peer_names.update(saved_names)


def _make_httpx_response(status_code: int, json_body: object) -> MagicMock:
    """Return a stand-in response with ``status_code`` and a ``json()`` result."""
    resp = MagicMock()
    resp.status_code = status_code
    resp.json.return_value = json_body
    return resp


def _patch_httpx_client(returning: MagicMock):
    """Build a patcher that swaps ``httpx.Client`` for a context-manager mock.

    Returns ``(patcher, client)``. Entering ``client`` yields ``client``
    itself, and ``client.get(...)`` returns ``returning``. The caller
    activates the patch with ``with patcher:`` and inspects
    ``client.get.call_count`` afterwards.
    """
    client = MagicMock()
    client.__enter__ = MagicMock(return_value=client)
    client.__exit__ = MagicMock(return_value=False)
    client.get = MagicMock(return_value=returning)
    return patch("httpx.Client", return_value=client), client


def test_envelope_enrichment_canvas_user_has_no_peer_fields(_reset_peer_metadata_cache):
    """canvas_user pushes carry ``peer_id=''``, so nothing peer-related is added.

    The enrichment block must short-circuit: no ``peer_name``,
    ``peer_role`` or ``agent_card_url`` keys in ``meta``.
    """
    from a2a_mcp_server import _build_channel_notification

    inbound = {
        "activity_id": "act-1",
        "text": "hello from canvas",
        "peer_id": "",
        "kind": "canvas_user",
        "method": "message/send",
        "created_at": "2026-05-01T00:00:00Z",
    }
    meta = _build_channel_notification(inbound)["params"]["meta"]

    assert "peer_name" not in meta
    assert "peer_role" not in meta
    assert "agent_card_url" not in meta
def test_envelope_enrichment_uses_cache_when_present(_reset_peer_metadata_cache):
    """A fresh cache entry is served without any registry GET.

    Seeds ``_peer_metadata`` with a just-fetched record, builds a
    peer_agent notification, and checks that the cached name and role
    (plus the agent-card URL) land in ``meta`` while the patched HTTP
    client is never called.
    """
    import time as _time

    import a2a_client
    from a2a_mcp_server import _build_channel_notification

    seeded_record = {
        "id": _PEER_UUID,
        "name": "ops-agent",
        "role": "sre",
        "status": "online",
    }
    a2a_client._peer_metadata[_PEER_UUID] = (_time.monotonic(), seeded_record)

    inbound = {
        "activity_id": "act-2",
        "text": "ping",
        "peer_id": _PEER_UUID,
        "kind": "peer_agent",
        "method": "message/send",
        "created_at": "2026-05-01T01:23:45Z",
    }
    httpx_patch, fake_client = _patch_httpx_client(_make_httpx_response(200, {}))
    with httpx_patch:
        payload = _build_channel_notification(inbound)

    assert fake_client.get.call_count == 0, "cache hit must not fire a registry GET"

    meta = payload["params"]["meta"]
    assert meta["peer_id"] == _PEER_UUID
    assert meta["peer_name"] == "ops-agent"
    assert meta["peer_role"] == "sre"
    assert meta["agent_card_url"].endswith(f"/registry/discover/{_PEER_UUID}")
def test_envelope_enrichment_fetches_on_cache_miss(_reset_peer_metadata_cache):
    """Cache miss + registry hit: one GET, then the cache serves the peer.

    Two notifications are built back-to-back for the same peer. Only the
    first may reach the registry, and both must carry the fetched name.
    """
    from a2a_mcp_server import _build_channel_notification

    registry_body = {
        "id": _PEER_UUID,
        "name": "fetched-name",
        "role": "router",
        "status": "online",
    }
    httpx_patch, fake_client = _patch_httpx_client(
        _make_httpx_response(200, registry_body)
    )
    with httpx_patch:
        payloads = [
            _build_channel_notification(
                {"peer_id": _PEER_UUID, "kind": "peer_agent", "text": text}
            )
            for text in ("first", "second")
        ]

    assert fake_client.get.call_count == 1, (
        f"second push for same peer must use cache, got {fake_client.get.call_count} GETs"
    )
    for payload in payloads:
        assert payload["params"]["meta"]["peer_name"] == "fetched-name"
def test_envelope_enrichment_degrades_on_registry_failure(_reset_peer_metadata_cache):
    """A failing registry (HTTP 500 here) degrades to the bare ``peer_id``.

    The notification is still built, ``peer_name`` / ``peer_role`` are
    absent, and ``agent_card_url`` is still present because it is derived
    from the ``peer_id`` alone.
    """
    from a2a_mcp_server import _build_channel_notification

    inbound = {
        "activity_id": "act-3",
        "text": "ping",
        "peer_id": _PEER_UUID,
        "kind": "peer_agent",
        "method": "message/send",
        "created_at": "2026-05-01T00:00:00Z",
    }
    httpx_patch, _unused_client = _patch_httpx_client(_make_httpx_response(500, {}))
    with httpx_patch:
        meta = _build_channel_notification(inbound)["params"]["meta"]

    assert meta["peer_id"] == _PEER_UUID
    assert "peer_name" not in meta
    assert "peer_role" not in meta
    assert meta["agent_card_url"].endswith(f"/registry/discover/{_PEER_UUID}"), (
        "agent_card_url must be present even on registry failure — "
        "it's deterministic from peer_id and gives the agent a single "
        "endpoint to retry against"
    )
def test_envelope_enrichment_negative_caches_registry_failure(_reset_peer_metadata_cache):
    """A failed lookup is cached: two pushes from a 5xx peer cost one GET.

    Without negative caching every push from a peer with a flaky or
    missing registry record would repeat the timeout-bounded GET. Both
    pushes must still deliver, without name/role but with
    ``agent_card_url``.
    """
    from a2a_mcp_server import _build_channel_notification

    httpx_patch, fake_client = _patch_httpx_client(_make_httpx_response(500, {}))
    with httpx_patch:
        payloads = tuple(
            _build_channel_notification(
                {"peer_id": _PEER_UUID, "kind": "peer_agent", "text": text}
            )
            for text in ("first", "second")
        )

    assert fake_client.get.call_count == 1, (
        f"second push from a 5xx-returning peer must use the negative "
        f"cache, got {fake_client.get.call_count} GETs"
    )

    expected_suffix = f"/registry/discover/{_PEER_UUID}"
    for payload in payloads:
        meta = payload["params"]["meta"]
        assert "peer_name" not in meta
        assert "peer_role" not in meta
        assert meta["agent_card_url"].endswith(expected_suffix)
def test_envelope_enrichment_negative_caches_network_exception(_reset_peer_metadata_cache):
    """Transport exceptions get the same negative-caching treatment as 5xx.

    ``client.get`` raises ``httpx.ConnectError`` inside the ``with`` block,
    which is where a real client would raise. The second push must be
    answered from the cache, and the cached record must be the ``None``
    sentinel.
    """
    import httpx as _httpx

    import a2a_client
    from a2a_mcp_server import _build_channel_notification

    httpx_patch, failing_client = _patch_httpx_client(MagicMock())
    failing_client.get.side_effect = _httpx.ConnectError("dns down")

    with httpx_patch:
        for _ in range(2):
            _build_channel_notification({"peer_id": _PEER_UUID, "kind": "peer_agent"})

    assert failing_client.get.call_count == 1, (
        f"network exceptions must be negative-cached, got "
        f"{failing_client.get.call_count} GETs"
    )
    # Sanity: the entry exists and its record half is the None sentinel.
    _fetched_at, record = a2a_client._peer_metadata[_PEER_UUID]
    assert record is None
def test_envelope_enrichment_re_fetches_after_ttl(_reset_peer_metadata_cache):
    """An entry older than the TTL triggers a fresh registry GET.

    The stale timestamp is anchored to the *current* monotonic clock minus
    TTL plus slack. A literal ``0.0`` is not reliably stale, because
    ``time.monotonic()`` has an arbitrary origin and may itself be below
    the TTL.
    """
    import time

    import a2a_client
    from a2a_mcp_server import _build_channel_notification

    stale_at = time.monotonic() - a2a_client._PEER_METADATA_TTL_SECONDS - 60.0
    stale_record = {"id": _PEER_UUID, "name": "stale-name", "role": "old"}
    a2a_client._peer_metadata[_PEER_UUID] = (stale_at, stale_record)

    fresh_record = {
        "id": _PEER_UUID,
        "name": "fresh-name",
        "role": "new",
        "status": "online",
    }
    httpx_patch, fake_client = _patch_httpx_client(
        _make_httpx_response(200, fresh_record)
    )
    with httpx_patch:
        payload = _build_channel_notification(
            {"peer_id": _PEER_UUID, "kind": "peer_agent", "text": "ping"}
        )

    assert fake_client.get.call_count == 1, "stale cache must trigger a re-fetch"
    meta = payload["params"]["meta"]
    assert meta["peer_name"] == "fresh-name"
    assert meta["peer_role"] == "new"
def test_envelope_enrichment_invalid_peer_id_skips_lookup(_reset_peer_metadata_cache):
    """A malformed ``peer_id`` must neither crash the push nor hit the network.

    ``enrich_peer_metadata`` rejects the id before any URL is built for
    the GET, so name and role are absent. ``agent_card_url`` is still
    emitted because the builder derives it from the raw id
    unconditionally.
    """
    import a2a_client
    from a2a_mcp_server import _build_channel_notification

    httpx_patch, fake_client = _patch_httpx_client(_make_httpx_response(200, {}))
    with httpx_patch:
        payload = _build_channel_notification(
            {"peer_id": "not-a-uuid", "kind": "peer_agent", "text": "evil"}
        )

    assert fake_client.get.call_count == 0, (
        "invalid peer_id must not reach a network call — UUID validation "
        "guards the URL-construction surface"
    )

    meta = payload["params"]["meta"]
    assert meta["peer_id"] == "not-a-uuid"
    assert "peer_name" not in meta
    assert "peer_role" not in meta
    # Built from the raw id even when validation failed.
    expected_url = f"{a2a_client.PLATFORM_URL}/registry/discover/not-a-uuid"
    assert meta["agent_card_url"] == expected_url
def test_instructions_document_envelope_enrichment_attrs():
    """The initialize instructions must document the enrichment attributes.

    The agent learns about envelope attributes only from the instructions
    string, so the test pins two things: each new attribute name appears
    in the text, and the per-field semantics mention both the registry
    lookup and the discover endpoint.
    """
    from a2a_mcp_server import _build_initialize_result

    instructions = _build_initialize_result()["instructions"]

    # Attribute names must appear so the agent recognises them inline.
    for attr in ("peer_name", "peer_role", "agent_card_url"):
        assert attr in instructions, (
            f"instructions must list `{attr}` as a tag "
            f"attribute — otherwise the agent sees the attr in pushes "
            f"but doesn't know what to do with it"
        )

    # Phrases from the per-field semantics block, each with its own message.
    required_phrases = (
        (
            "registry resolved",
            "instructions must explain peer_name/peer_role come from a "
            "registry lookup that may fail — otherwise the agent treats "
            "their absence as a bug instead of a graceful degrade",
        ),
        (
            "discover endpoint",
            "instructions must point at the registry discover endpoint "
            "for agent_card_url so the agent knows it's a follow-on URL "
            "to fetch full capabilities, not the body of the message",
        ),
    )
    for phrase, failure_message in required_phrases:
        assert phrase in instructions, failure_message