diff --git a/workspace/a2a_client.py b/workspace/a2a_client.py index 62aa94b4..f583be61 100644 --- a/workspace/a2a_client.py +++ b/workspace/a2a_client.py @@ -34,9 +34,12 @@ _peer_names: dict[str, str] = {} # Populated by tool_list_peers and by the lazy registry lookup in # enrich_peer_metadata. The notification-callback path (channel envelope # enrichment) reads this cache on every inbound peer_agent push, so a -# bare ``dict[str, dict]`` is the fastest read shape; entries carry their -# fetched-at timestamp so TTL eviction is in-line with the lookup. -_peer_metadata: dict[str, tuple[float, dict]] = {} +# bare ``dict[str, tuple[float, dict | None]]`` is the fastest read +# shape; entries carry their fetched-at timestamp so TTL eviction is +# in-line with the lookup. ``None`` as the record is the negative-cache +# sentinel: registry failure is cached for one TTL window so we don't +# re-fire the 2s-bounded GET on every push from a flaky peer. +_peer_metadata: dict[str, tuple[float, dict | None]] = {} # How long an entry in ``_peer_metadata`` is treated as fresh. 5 minutes # is the same window we use for delegation routing — long enough that a @@ -57,6 +60,13 @@ def enrich_peer_metadata(peer_id: str, *, now: float | None = None) -> dict | No degrade gracefully (the channel envelope falls back to the raw ``peer_id`` instead of crashing the push path). + Negative caching: failure outcomes (4xx/5xx/non-JSON/network + exception) are stored as ``(now, None)`` and treated as + fresh-but-empty for the TTL window. Without this, a peer with a + flaky/missing registry record would re-fire the 2s-bounded GET on + EVERY push — turning the cache into a no-op for the exact failure + scenarios it most needs to defend against. + The fetched dict is stored as-is, so callers can read whatever fields the platform exposes (currently: ``id``, ``name``, ``role``, ``status``, ``url``). New fields surface automatically without a @@ -71,6 +81,9 @@ def enrich_peer_metadata(peer_id: str, *, now: float | None = None) -> dict | No if cached is not None: fetched_at, record = cached if current - fetched_at < _PEER_METADATA_TTL_SECONDS: + # Fresh entry — return whatever's there. ``None`` is the + # negative-cache sentinel: caller treats absence of fields + # the same as a registry miss, which is the desired UX. return record url = f"{PLATFORM_URL}/registry/discover/{canon}" @@ -79,20 +92,24 @@ def enrich_peer_metadata(peer_id: str, *, now: float | None = None) -> dict | No resp = client.get(url, headers={"X-Workspace-ID": WORKSPACE_ID, **auth_headers()}) except Exception as exc: # noqa: BLE001 logger.debug("enrich_peer_metadata: GET %s failed: %s", url, exc) - return cached[1] if cached is not None else None + _peer_metadata[canon] = (current, None) + return None if resp.status_code != 200: logger.debug( "enrich_peer_metadata: %s returned HTTP %d", url, resp.status_code ) - return cached[1] if cached is not None else None + _peer_metadata[canon] = (current, None) + return None try: data = resp.json() except Exception: # noqa: BLE001 - return cached[1] if cached is not None else None + _peer_metadata[canon] = (current, None) + return None if not isinstance(data, dict): - return cached[1] if cached is not None else None + _peer_metadata[canon] = (current, None) + return None _peer_metadata[canon] = (current, data) if name := data.get("name"): diff --git a/workspace/tests/test_a2a_mcp_server.py b/workspace/tests/test_a2a_mcp_server.py index b9e68e80..f8111410 100644 --- a/workspace/tests/test_a2a_mcp_server.py +++ b/workspace/tests/test_a2a_mcp_server.py @@ -401,6 +401,67 @@ def test_envelope_enrichment_degrades_on_registry_failure(_reset_peer_metadata_c ) +def test_envelope_enrichment_negative_caches_registry_failure(_reset_peer_metadata_cache): + """Registry failure must be cached for the TTL window. Without + this, a peer with a flaky or missing registry record re-fires the + 2s-bounded GET on EVERY push — the cache becomes a no-op for the + exact scenarios it most needs to defend against, and the poller + thread stalls 2s per push for that peer until the registry comes + back. Pin: two pushes from a 5xx-returning peer fire exactly one + GET, not two.""" + from a2a_mcp_server import _build_channel_notification + + p, client = _patch_httpx_client(_make_httpx_response(500, {})) + with p: + payload1 = _build_channel_notification({ + "peer_id": _PEER_UUID, "kind": "peer_agent", "text": "first", + }) + payload2 = _build_channel_notification({ + "peer_id": _PEER_UUID, "kind": "peer_agent", "text": "second", + }) + + assert client.get.call_count == 1, ( + f"second push from a 5xx-returning peer must use the negative " + f"cache, got {client.get.call_count} GETs" + ) + # Both pushes deliver without enrichment (peer_name/role absent), + # but agent_card_url surfaces unconditionally. + for payload in (payload1, payload2): + meta = payload["params"]["meta"] + assert "peer_name" not in meta + assert "peer_role" not in meta + assert meta["agent_card_url"].endswith(f"/registry/discover/{_PEER_UUID}") + + +def test_envelope_enrichment_negative_caches_network_exception(_reset_peer_metadata_cache): + """Same negative-caching contract for network exceptions — + httpx.ConnectError, DNS failure, registry pod restart all + surface as exceptions from client.get(). Without negative + caching, a temporary network blip turns into a 2s stall on + every push for the duration.""" + import a2a_client + from a2a_mcp_server import _build_channel_notification + + client = MagicMock() + client.__enter__ = MagicMock(return_value=client) + client.__exit__ = MagicMock(return_value=False) + # Important: simulate the exception INSIDE the with-block (which + # is where the real httpx.Client raises) by making get() raise. + import httpx as _httpx + client.get = MagicMock(side_effect=_httpx.ConnectError("dns down")) + with patch("httpx.Client", return_value=client): + _build_channel_notification({"peer_id": _PEER_UUID, "kind": "peer_agent"}) + _build_channel_notification({"peer_id": _PEER_UUID, "kind": "peer_agent"}) + + assert client.get.call_count == 1, ( + f"network exceptions must be negative-cached, got " + f"{client.get.call_count} GETs" + ) + # Sanity: the cache entry exists and carries None as the record. + cached = a2a_client._peer_metadata[_PEER_UUID] + assert cached[1] is None + + def test_envelope_enrichment_re_fetches_after_ttl(_reset_peer_metadata_cache): """Cached entry past TTL: registry is hit again. Pin the TTL behaviour so a future caller bumping ``_PEER_METADATA_TTL_SECONDS``