diff --git a/workspace/a2a_client.py b/workspace/a2a_client.py
index 3dbf33fa..e1451961 100644
--- a/workspace/a2a_client.py
+++ b/workspace/a2a_client.py
@@ -9,8 +9,10 @@ import logging
 import os
 import random
 import re
+import threading
 import time
 import uuid
+from concurrent.futures import ThreadPoolExecutor
 
 import httpx
 
@@ -66,6 +68,146 @@ _peer_metadata: dict[str, tuple[float, dict | None]] = {}
 _PEER_METADATA_TTL_SECONDS = 300.0
 
 
+# Background-fetch executor for enrich_peer_metadata_nonblocking (#2484).
+# A small pool — peer records are TTL-cached, so the steady-state load
+# is "one fetch per peer per 5 minutes." Two workers handle the cold-
+# start burst when an agent starts receiving pushes from a new peer for
+# the first time without backing up the inbox poller. Note: on Python
+# 3.9+ ThreadPoolExecutor threads are not daemonic, so interpreter exit
+# joins any in-flight fetch; each fetch is bounded by the registry
+# timeout, so shutdown stalls for at most one registry round trip.
+_enrich_executor: ThreadPoolExecutor | None = None
+_enrich_executor_lock = threading.Lock()
+
+# In-flight peer IDs — guards against a single peer's repeated pushes
+# scheduling N concurrent registry fetches before the first one fills
+# the cache. Set membership means "a worker is currently fetching this
+# peer; subsequent calls must NOT schedule another."
+_enrich_in_flight: set[str] = set()
+_enrich_in_flight_lock = threading.Lock()
+
+
+def _get_enrich_executor() -> ThreadPoolExecutor:
+    """Lazy-init the enrichment worker pool. Lazy because most test
+    fixtures and short-lived CLI invocations never need it; only the
+    long-running molecule-mcp / inbox-poller path actually schedules
+    background fetches.
+    """
+    global _enrich_executor
+    if _enrich_executor is not None:
+        return _enrich_executor
+    with _enrich_executor_lock:
+        if _enrich_executor is None:
+            _enrich_executor = ThreadPoolExecutor(
+                max_workers=2,
+                thread_name_prefix="enrich-peer",
+            )
+        return _enrich_executor
+
+
+def enrich_peer_metadata_nonblocking(
+    peer_id: str,
+    source_workspace_id: str | None = None,
+) -> dict | None:
+    """Cache-first variant of ``enrich_peer_metadata`` — returns
+    immediately instead of blocking on a registry GET.
+
+    Behavior (see the call-site sketch below):
+    - Cache hit (fresh): return the cached record.
+    - Cache miss or TTL expired: schedule a background fetch via the
+      worker pool and return ``None`` (caller renders bare peer_id).
+
+    Why this exists (#2484): the inbox poller's notification callback
+    in molecule-mcp called the synchronous ``enrich_peer_metadata`` on
+    every push, blocking the poller for up to 2s × N uncached peers
+    per batch. Push-delivery latency was gated on registry latency —
+    the exact amplification the negative-cache patch in PR #2471 was
+    supposed to avoid. Moving the fetch off the poller thread means
+    push delivery is bounded by the inbox poll interval, never by
+    registry RTT.
+
+    Trade-off: the FIRST push from a new peer arrives metadata-light
+    (no name/role) and the MCP host renders the bare peer_id; pushes
+    after that (within the 5-min TTL) hit the warm cache and get the
+    full record. Acceptable because:
+    - Channel-envelope enrichment is a UX nicety, not a correctness
+      invariant.
+    - The cold-cache window per peer is bounded to one push.
+    - The TTL is long enough that an active conversation never
+      re-enters the cold state.
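+
+    Call-site sketch (illustrative; the real caller is
+    ``_build_channel_notification`` in a2a_mcp_server.py):
+
+        record = enrich_peer_metadata_nonblocking(peer_id)
+        label = record["name"] if record else peer_id  # bare ID until warm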
+    """
+    canon = _validate_peer_id(peer_id)
+    if canon is None:
+        return None
+    current = time.monotonic()
+    cached = _peer_metadata.get(canon)
+    if cached is not None:
+        fetched_at, record = cached
+        if current - fetched_at < _PEER_METADATA_TTL_SECONDS:
+            return record
+    # Schedule a background fetch unless one is already in flight for
+    # this peer. The synchronous version atomically reads-then-writes;
+    # the nonblocking version splits that into "schedule fetch" +
+    # "fetch fills cache later." The in-flight set keeps a flurry of
+    # pushes from one peer (e.g., a chatty agent) from spawning N
+    # parallel GETs.
+    with _enrich_in_flight_lock:
+        if canon in _enrich_in_flight:
+            return None
+        _enrich_in_flight.add(canon)
+    try:
+        _get_enrich_executor().submit(
+            _enrich_peer_metadata_worker, canon, source_workspace_id
+        )
+    except RuntimeError:
+        # Executor was shut down (process exit path) — drop the request,
+        # let the caller render bare peer_id.
+        with _enrich_in_flight_lock:
+            _enrich_in_flight.discard(canon)
+    return None
+
+
+def _enrich_peer_metadata_worker(
+    canon: str, source_workspace_id: str | None
+) -> None:
+    """Background-thread body for ``enrich_peer_metadata_nonblocking``.
+    Runs the same fetch logic as the synchronous helper but discards
+    the return value — the cache write is the only output anyone
+    needs. Always clears the in-flight marker so a future cache miss
+    can retry.
+    """
+    try:
+        enrich_peer_metadata(canon, source_workspace_id)
+    except Exception as exc:  # noqa: BLE001
+        # Background workers must not crash the executor — log and
+        # move on. The negative-cache path inside enrich_peer_metadata
+        # already records failures, so re-attempts are rate-limited
+        # by the TTL.
+        logger.debug("_enrich_peer_metadata_worker: %s failed: %s", canon, exc)
+    finally:
+        with _enrich_in_flight_lock:
+            _enrich_in_flight.discard(canon)
+
+
+def _wait_for_enrichment_inflight_for_testing(timeout: float = 2.0) -> None:
+    """Block until all in-flight enrichment workers have completed.
+
+    Test-only helper. Production code never has a reason to wait — the
+    point of the nonblocking path is that callers don't care when the
+    cache fills. Tests that want to assert "after the worker runs, the
+    cache has the record" use this to synchronise without sleeping.
+
+    Polls ``_enrich_in_flight`` rather than holding a Condition so the
+    production path never has to notify on worker completion; a 10 ms
+    poll is plenty for a test-only barrier.
+    """
+    deadline = time.monotonic() + timeout
+    while time.monotonic() < deadline:
+        with _enrich_in_flight_lock:
+            if not _enrich_in_flight:
+                return
+        time.sleep(0.01)
+
+
 def enrich_peer_metadata(
     peer_id: str,
     source_workspace_id: str | None = None,
diff --git a/workspace/a2a_mcp_server.py b/workspace/a2a_mcp_server.py
index 687c62fd..72d5f33c 100644
--- a/workspace/a2a_mcp_server.py
+++ b/workspace/a2a_mcp_server.py
@@ -55,6 +55,7 @@ from a2a_client import (  # noqa: F401, E402
     _validate_peer_id,
     discover_peer,
     enrich_peer_metadata,
+    enrich_peer_metadata_nonblocking,
     get_peers,
     get_workspace_info,
     send_a2a_message,
@@ -498,7 +499,15 @@ def _build_channel_notification(msg: dict) -> dict:
             meta["peer_id"] = ""
         else:
             meta["peer_id"] = safe_peer_id
-            record = enrich_peer_metadata(safe_peer_id)
+            # Cache-first non-blocking enrichment (#2484): on cache miss
+            # this returns None immediately and schedules a background
+            # fetch.
+            # The first push for a new peer renders bare peer_id; the
+            # next push (within the 5-min TTL) hits the warm cache and
+            # gets full name/role. Push-delivery latency is bounded by
+            # the inbox poll interval, never by registry RTT — closing
+            # the amplification gap PR #2471's negative-cache path was
+            # meant to avoid.
+            record = enrich_peer_metadata_nonblocking(safe_peer_id)
             if record is not None:
                 # Sanitise BEFORE storing in meta so both the JSON-RPC
                 # envelope and the rendered content (via
diff --git a/workspace/tests/test_a2a_mcp_server.py b/workspace/tests/test_a2a_mcp_server.py
index 1a690830..6eb096da 100644
--- a/workspace/tests/test_a2a_mcp_server.py
+++ b/workspace/tests/test_a2a_mcp_server.py
@@ -507,11 +507,22 @@ def _reset_peer_metadata_cache(monkeypatch):
     """Each test starts with a clean ``_peer_metadata`` cache so an
     earlier test's hit doesn't satisfy a later test's miss. Mutates
     the module-level dict in place rather than reassigning so other modules
-    that imported the dict by reference still see the same instance."""
+    that imported the dict by reference still see the same instance.
+
+    Also drains and clears ``_enrich_in_flight`` (#2484): a previous
+    test's background fetch worker can leave a peer marked in-flight,
+    and the next test's nonblocking call would then short-circuit
+    without scheduling a fetch. Drain BEFORE clearing so a worker that
+    is mid-execution can't write to ``_peer_metadata`` after the clear.
+    """
     import a2a_client
+    a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
     a2a_client._peer_metadata.clear()
+    a2a_client._enrich_in_flight.clear()
     yield
+    a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
     a2a_client._peer_metadata.clear()
+    a2a_client._enrich_in_flight.clear()
 
 
 def _make_httpx_response(status_code: int, json_body: object) -> MagicMock:
@@ -586,9 +597,16 @@ def test_envelope_enrichment_uses_cache_when_present(_reset_peer_metadata_cache)
 
 
 def test_envelope_enrichment_fetches_on_cache_miss(_reset_peer_metadata_cache):
-    """Cache miss + registry hit: GET fires, response cached, meta
-    carries fetched fields. Subsequent build for the same peer must
-    NOT re-fetch (cache populated by first call)."""
+    """Cache miss: the nonblocking helper returns None on the first
+    push (which therefore arrives metadata-light) and schedules a
+    background fetch that populates the cache; the second push hits
+    the warm cache.
+
+    Pre-2026-05-05 (#2484) the first push was synchronous: the inbox
+    poller blocked up to 2s on the registry GET before delivering. The
+    nonblocking path means push delivery is bounded by the inbox poll
+    interval, never by registry RTT — at the cost of one push per peer
+    per TTL window arriving without name/role.
+    """
     import a2a_client
     from a2a_mcp_server import _build_channel_notification
 
@@ -602,22 +620,39 @@ def test_envelope_enrichment_fetches_on_cache_miss(_reset_peer_metadata_cache):
     payload1 = _build_channel_notification({
         "peer_id": _PEER_UUID, "kind": "peer_agent", "text": "first",
     })
+    # First push: bare peer_id, fetch is in-flight in the background.
+    # peer_name / peer_role are NOT yet present.
+    assert "peer_name" not in payload1["params"]["meta"]
+    assert "peer_role" not in payload1["params"]["meta"]
+
+    # Wait for the background worker to finish populating the cache.
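+    # Without this barrier the second build below could race the worker
+    # and find the cache still cold, making the test flaky.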
+    a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
+
     payload2 = _build_channel_notification({
         "peer_id": _PEER_UUID, "kind": "peer_agent", "text": "second",
     })
 
+    # Worker fired exactly one GET (cache miss → fetch); the second push
+    # hit the warm cache and DID NOT fire another GET.
     assert client.get.call_count == 1, (
         f"second push for same peer must use cache, got {client.get.call_count} GETs"
     )
-    assert payload1["params"]["meta"]["peer_name"] == "fetched-name"
+    # Second push carries the enriched fields the worker stored.
     assert payload2["params"]["meta"]["peer_name"] == "fetched-name"
+    assert payload2["params"]["meta"]["peer_role"] == "router"
 
 
 def test_envelope_enrichment_degrades_on_registry_failure(_reset_peer_metadata_cache):
     """Registry returns 500 (or 4xx, or network error): enrichment
     silently degrades to bare peer_id. The push must not crash, the
     push must not block, and the agent_card_url must still surface
-    because it's constructable from peer_id alone."""
+    because it's constructable from peer_id alone.
+
+    Post-#2484 the first push always degrades to bare peer_id (the
+    background fetch hasn't run yet); this test pins that the failure
+    path on a cache miss still degrades cleanly instead of breaking.
+    """
+    import a2a_client
     from a2a_mcp_server import _build_channel_notification
 
     p, _ = _patch_httpx_client(_make_httpx_response(500, {}))
@@ -630,6 +665,9 @@
         "method": "message/send",
         "created_at": "2026-05-01T00:00:00Z",
     })
+    # Drain the background fetch so a follow-up test starting with
+    # this peer in-flight doesn't see ghost state.
+    a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
 
     meta = payload["params"]["meta"]
     assert meta["peer_id"] == _PEER_UUID
 
@@ -649,7 +687,13 @@ def test_envelope_enrichment_negative_caches_registry_failure(_reset_peer_metada
     exact scenarios it most needs to defend against, and the poller
     thread stalls 2s per push for that peer until the registry comes
     back. Pin: two pushes from a 5xx-returning peer fire exactly one
-    GET, not two."""
+    GET, not two.
+
+    Post-#2484 the GETs run in a background worker, so the test waits
+    for in-flight to drain between pushes — the negative-cache write
+    must land in ``_peer_metadata`` before the second push consults it.
+    """
+    import a2a_client
     from a2a_mcp_server import _build_channel_notification
 
     p, client = _patch_httpx_client(_make_httpx_response(500, {}))
@@ -657,9 +701,13 @@ def test_envelope_enrichment_negative_caches_registry_failure(_reset_peer_metada
     payload1 = _build_channel_notification({
         "peer_id": _PEER_UUID, "kind": "peer_agent", "text": "first",
     })
+    # Wait for the worker to write the negative-cache entry before
+    # the second push reads it.
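+    # (The worker records the failure through enrich_peer_metadata's
+    # existing negative-cache path; the wait only sequences the test.)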
+    a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
     payload2 = _build_channel_notification({
         "peer_id": _PEER_UUID, "kind": "peer_agent", "text": "second",
     })
+    a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
 
     assert client.get.call_count == 1, (
         f"second push from a 5xx-returning peer must use the negative "
@@ -692,7 +740,9 @@ def test_envelope_enrichment_negative_caches_network_exception(_reset_peer_metad
     client.get = MagicMock(side_effect=_httpx.ConnectError("dns down"))
+    import a2a_client
     with patch("httpx.Client", return_value=client):
         _build_channel_notification({"peer_id": _PEER_UUID, "kind": "peer_agent"})
+        a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
         _build_channel_notification({"peer_id": _PEER_UUID, "kind": "peer_agent"})
+        a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
 
     assert client.get.call_count == 1, (
         f"network exceptions must be negative-cached, got "
@@ -728,7 +778,9 @@ def test_envelope_enrichment_negative_caches_non_json_200(_reset_peer_metadata_c
     p, client = _patch_httpx_client(resp)
+    import a2a_client
     with p:
         _build_channel_notification({"peer_id": _PEER_UUID, "kind": "peer_agent", "text": "first"})
+        a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
         _build_channel_notification({"peer_id": _PEER_UUID, "kind": "peer_agent", "text": "second"})
+        a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
 
     assert client.get.call_count == 1, (
         f"non-JSON 200 must be negative-cached, got {client.get.call_count} GETs"
@@ -756,7 +808,9 @@ def test_envelope_enrichment_negative_caches_non_dict_json_200(_reset_peer_metad
     )
+    import a2a_client
     with p:
         _build_channel_notification({"peer_id": _PEER_UUID, "kind": "peer_agent", "text": "first"})
+        a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
         _build_channel_notification({"peer_id": _PEER_UUID, "kind": "peer_agent", "text": "second"})
+        a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
 
     assert client.get.call_count == 1, (
         f"non-dict JSON 200 must be negative-cached, got {client.get.call_count} GETs"
@@ -792,13 +846,25 @@ def test_envelope_enrichment_re_fetches_after_ttl(_reset_peer_metadata_cache):
         )
     )
     with p:
-        payload = _build_channel_notification({
+        # First push: stale cache → background fetch scheduled; the
+        # nonblocking path returns None when the entry is past TTL,
+        # so this first push degrades to bare peer_id (no peer_name).
+        # Wait for the background worker to fill the cache, then issue
+        # a second push to confirm it picked up the fresh values.
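+        # (If the nonblocking path returned the stale record instead of
+        # None, payload1 would carry the expired fields; the first
+        # assertion below pins the degrade-to-bare-ID choice.)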
+        payload1 = _build_channel_notification({
+            "peer_id": _PEER_UUID, "kind": "peer_agent", "text": "ping",
+        })
+        a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
+        payload2 = _build_channel_notification({
+            "peer_id": _PEER_UUID, "kind": "peer_agent", "text": "pong",
+        })
 
     assert client.get.call_count == 1, "stale cache must trigger a re-fetch"
-    assert payload["params"]["meta"]["peer_name"] == "fresh-name"
-    assert payload["params"]["meta"]["peer_role"] == "new"
+    assert "peer_name" not in payload1["params"]["meta"], (
+        "first push past TTL degrades to bare peer_id under nonblocking enrichment"
+    )
+    assert payload2["params"]["meta"]["peer_name"] == "fresh-name"
+    assert payload2["params"]["meta"]["peer_role"] == "new"
 
 
 def test_envelope_enrichment_invalid_peer_id_skips_lookup(_reset_peer_metadata_cache):
@@ -1732,3 +1798,102 @@ def _readable(fd: int) -> bool:
     rlist, _, _ = select.select([fd], [], [], 0)
     return bool(rlist)
+
+
+# ---- #2484 nonblocking-enrichment dedicated tests ----
+
+
+def test_enrich_peer_metadata_nonblocking_cache_hit_returns_immediately(
+    _reset_peer_metadata_cache,
+):
+    """Cache hit (fresh entry within TTL): the nonblocking helper
+    returns the cached record without scheduling a worker. Pins the
+    fast path — the whole point of the helper is that steady-state
+    pushes for a known peer never touch the executor."""
+    import a2a_client
+    import time as _time
+
+    a2a_client._peer_metadata[_PEER_UUID] = (
+        _time.monotonic(),
+        {"id": _PEER_UUID, "name": "ops", "role": "sre"},
+    )
+
+    p, client = _patch_httpx_client(_make_httpx_response(200, {}))
+    with p:
+        record = a2a_client.enrich_peer_metadata_nonblocking(_PEER_UUID)
+
+    assert record is not None
+    assert record["name"] == "ops"
+    assert client.get.call_count == 0, "cache hit must not schedule a worker"
+    # No in-flight marker was added — the call returned synchronously.
+    assert _PEER_UUID not in a2a_client._enrich_in_flight
+
+
+def test_enrich_peer_metadata_nonblocking_cache_miss_schedules_fetch(
+    _reset_peer_metadata_cache,
+):
+    """Cache miss: the helper returns None immediately and schedules a
+    background fetch; the worker fills the cache. After draining the
+    in-flight marker, a follow-up call hits the warm cache."""
+    import a2a_client
+
+    p, client = _patch_httpx_client(
+        _make_httpx_response(
+            200,
+            {"id": _PEER_UUID, "name": "fresh", "role": "router"},
+        )
+    )
+    with p:
+        first = a2a_client.enrich_peer_metadata_nonblocking(_PEER_UUID)
+        assert first is None, "first call on cache miss must return None (bare peer_id)"
+        a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
+        second = a2a_client.enrich_peer_metadata_nonblocking(_PEER_UUID)
+
+    assert client.get.call_count == 1
+    assert second is not None
+    assert second["name"] == "fresh"
+
+
+def test_enrich_peer_metadata_nonblocking_coalesces_duplicate_pushes(
+    _reset_peer_metadata_cache,
+):
+    """A burst of pushes for the same uncached peer must schedule
+    exactly ONE background fetch. Without the in-flight gate, a chatty
+    peer's first 10 pushes would queue 10 GETs against the registry —
+    exactly the DoS-on-self pattern the negative cache was meant to
+    rate-limit, except amplified by concurrency.
+    """
+    import a2a_client
+    import threading
+
+    # Park the worker inside the mocked GET until the burst below has
+    # fired, so the in-flight gate (not a lucky fast fetch that warms
+    # the cache between calls) is what coalesces them.
+    release = threading.Event()
+    response = _make_httpx_response(
+        200, {"id": _PEER_UUID, "name": "x", "role": "y"}
+    )
+    p, client = _patch_httpx_client(response)
+
+    def _blocked_get(*args, **kwargs):
+        release.wait(2.0)
+        return response
+
+    client.get = MagicMock(side_effect=_blocked_get)
+    with p:
+        # Fire 5 nonblocking calls back-to-back while the worker is
+        # parked in the blocked GET and cannot drain.
+        # All 5 hit the in-flight gate; only the first schedules a
+        # worker.
+        for _ in range(5):
+            assert a2a_client.enrich_peer_metadata_nonblocking(_PEER_UUID) is None
+        release.set()
+        a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
+
+    assert client.get.call_count == 1, (
+        f"in-flight gate must coalesce concurrent pushes; got {client.get.call_count} GETs"
+    )
+
+
+def test_enrich_peer_metadata_nonblocking_invalid_peer_id_returns_none(
+    _reset_peer_metadata_cache,
+):
+    """Defensive: a malformed peer_id (not a UUID) must short-circuit
+    without touching the cache OR the executor."""
+    import a2a_client
+
+    p, client = _patch_httpx_client(_make_httpx_response(200, {}))
+    with p:
+        assert a2a_client.enrich_peer_metadata_nonblocking("not-a-uuid") is None
+
+    assert client.get.call_count == 0
+    assert "not-a-uuid" not in a2a_client._enrich_in_flight
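+
+
+# Sketch of a companion test for the executor-shutdown branch: submit()
+# raising RuntimeError must degrade to None and clear the in-flight
+# marker rather than crash. Assumes tests may reset _enrich_executor so
+# later tests lazily re-create a fresh pool.
+def test_enrich_peer_metadata_nonblocking_degrades_after_executor_shutdown(
+    _reset_peer_metadata_cache,
+):
+    import a2a_client
+
+    a2a_client._get_enrich_executor().shutdown(wait=True)
+    try:
+        p, client = _patch_httpx_client(_make_httpx_response(200, {}))
+        with p:
+            assert a2a_client.enrich_peer_metadata_nonblocking(_PEER_UUID) is None
+        assert client.get.call_count == 0
+        assert _PEER_UUID not in a2a_client._enrich_in_flight
+    finally:
+        a2a_client._enrich_executor = None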