perf(a2a): move enrichment GET off the inbox poller thread (#2484)

The inbox poller's notification callback called the synchronous
enrich_peer_metadata on every push, blocking the poller for up to
2s × N uncached peers per poll batch. Push delivery latency was
gated on registry RTT — exactly what PR #2471's negative-cache patch
was trying to avoid amplifying.

Fix: cache-first nonblocking path with a tiny background worker pool.

  enrich_peer_metadata_nonblocking(peer_id):
    - Cache hit (fresh, within TTL):  return cached record immediately
    - Cache miss / stale:              return None, schedule background
                                       fetch via ThreadPoolExecutor

The first push from a new peer arrives metadata-light (bare peer_id);
the next push within the 5-min TTL hits the warm cache and gets full
name/role. Acceptable trade-off because the channel-envelope
enrichment is a UX nicety, not a correctness invariant — and the
cold-cache window per peer is bounded to one push.

Defenses:
  - In-flight gate (_enrich_in_flight) — N concurrent pushes for the
    same uncached peer schedule exactly ONE worker, not N. Without
    this, a chatty peer's first burst of pushes would amplify into
    parallel registry GETs — the exact DoS-on-self pattern the
    negative cache was meant to rate-limit.
  - Lazy executor init — most test fixtures + short-lived CLI
    invocations never need it; only the long-running molecule-mcp
    path actually fires background work.
  - Worker threads named via thread_name_prefix for debuggability;
    the pool is tiny and its tasks short, so the executor does not
    hold up process exit.

Tests:
  - test_enrich_peer_metadata_nonblocking_cache_hit_returns_immediately
  - test_enrich_peer_metadata_nonblocking_cache_miss_schedules_fetch
  - test_enrich_peer_metadata_nonblocking_coalesces_duplicate_pushes
  - test_enrich_peer_metadata_nonblocking_invalid_peer_id_returns_none

Plus updates to the existing test_envelope_enrichment_* suite that
asserted synchronous behavior — they now drain the in-flight set via
_wait_for_enrichment_inflight_for_testing before checking cache state.

Existing synchronous enrich_peer_metadata is unchanged — Phase B (#2790)
schema↔dispatcher drift gate + the negative-cache contract from PR
#2471 still apply. The nonblocking variant is purely additive.

1739 passed, 0 failed locally.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Hongming Wang 2026-05-05 01:24:42 -07:00
parent d10c1a1a36
commit 35017c5452
3 changed files with 327 additions and 11 deletions

View File

@@ -9,8 +9,10 @@ import logging
import os
import random
import re
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
import httpx
@@ -66,6 +68,146 @@ _peer_metadata: dict[str, tuple[float, dict | None]] = {}
_PEER_METADATA_TTL_SECONDS = 300.0

# Background-fetch executor for enrich_peer_metadata_nonblocking (#2484).
# A small pool — peers are highly TTL-cached, so the steady-state load
# is "one fetch per peer per 5 minutes." Two workers handle the cold-
# start burst when an agent starts receiving pushes from a new peer for
# the first time without backing up the inbox poller. The executor
# must NOT block process exit if the inbox shuts down.
_enrich_executor: ThreadPoolExecutor | None = None
_enrich_executor_lock = threading.Lock()

# In-flight peer IDs — guards against a single peer's repeated pushes
# scheduling N concurrent registry fetches before the first one fills
# the cache. Set membership is "a worker is currently fetching this
# peer; subsequent calls should NOT schedule another."
_enrich_in_flight: set[str] = set()
_enrich_in_flight_lock = threading.Lock()


def _get_enrich_executor() -> ThreadPoolExecutor:
    """Lazy-init the enrichment worker pool.

    Lazy because most test fixtures and short-lived CLI invocations
    don't need it; only the long-running molecule-mcp / inbox-poller
    path actually schedules background fetches.
    """
    global _enrich_executor
    if _enrich_executor is not None:
        return _enrich_executor
    with _enrich_executor_lock:
        if _enrich_executor is None:
            _enrich_executor = ThreadPoolExecutor(
                max_workers=2,
                thread_name_prefix="enrich-peer",
            )
    return _enrich_executor


def enrich_peer_metadata_nonblocking(
    peer_id: str,
    source_workspace_id: str | None = None,
) -> dict | None:
    """Cache-first variant of ``enrich_peer_metadata`` — returns
    immediately without blocking on a registry GET.

    Behavior:
    - Cache hit (fresh): return the cached record.
    - Cache miss or TTL expired: schedule a background fetch via the
      worker pool, return ``None`` (caller renders bare peer_id).
      The next push for this peer hits the warm cache and gets the
      full record.

    Why this exists (#2484): the inbox poller's notification callback
    in molecule-mcp called the synchronous ``enrich_peer_metadata`` on
    every push, blocking the poller for up to 2s × N uncached peers
    per batch. Push-delivery latency was gated on registry latency,
    the exact thing the negative-cache patch in PR #2471 was supposed
    to avoid amplifying. Moving the fetch off the poller thread means
    push delivery is bounded by the inbox poll interval, never by
    registry RTT.

    Trade-off: the FIRST push from a new peer arrives metadata-light
    (no name/role). The MCP host renders the bare peer_id. Subsequent
    pushes (within the 5-min TTL) hit the warm cache and get the full
    record. Acceptable because:
    - Channel-envelope enrichment is a UX nicety, not a correctness
      invariant.
    - The cold-cache window per peer is bounded to one push.
    - The TTL is long enough that an active conversation never
      re-enters the cold state.
    """
    canon = _validate_peer_id(peer_id)
    if canon is None:
        return None

    current = time.monotonic()
    cached = _peer_metadata.get(canon)
    if cached is not None:
        fetched_at, record = cached
        if current - fetched_at < _PEER_METADATA_TTL_SECONDS:
            return record

    # Schedule a background fetch unless one is already in flight for
    # this peer. The synchronous version atomically reads-then-writes;
    # the async version splits that into "schedule fetch" + "fetch
    # fills cache later." The in-flight set keeps a flurry of pushes
    # from one peer (e.g., a chatty agent) from spawning N parallel GETs.
    with _enrich_in_flight_lock:
        if canon in _enrich_in_flight:
            return None
        _enrich_in_flight.add(canon)
    try:
        _get_enrich_executor().submit(
            _enrich_peer_metadata_worker, canon, source_workspace_id
        )
    except RuntimeError:
        # Executor was shut down (process exit path) — drop the request,
        # let the caller render bare peer_id.
        with _enrich_in_flight_lock:
            _enrich_in_flight.discard(canon)
    return None


def _enrich_peer_metadata_worker(
    canon: str, source_workspace_id: str | None
) -> None:
    """Background-thread body for ``enrich_peer_metadata_nonblocking``.

    Runs the same fetch logic as the synchronous helper but discards
    the return value; the cache write is the only output anyone
    needs. Always clears the in-flight marker so a future cache miss
    can retry.
    """
    try:
        enrich_peer_metadata(canon, source_workspace_id)
    except Exception as exc:  # noqa: BLE001
        # Background workers must not crash the executor — log and
        # move on. The negative-cache path inside enrich_peer_metadata
        # already records failures, so a re-attempt is rate-limited
        # by TTL.
        logger.debug("_enrich_peer_metadata_worker: %s failed: %s", canon, exc)
    finally:
        with _enrich_in_flight_lock:
            _enrich_in_flight.discard(canon)


def _wait_for_enrichment_inflight_for_testing(timeout: float = 2.0) -> None:
    """Block until all in-flight enrichment workers have completed.

    Test-only helper. Production code never has a reason to wait; the
    point of the nonblocking path is that callers don't care when the
    cache fills. Tests that want to assert "after the worker runs, the
    cache has the record" use this to synchronise without sleeping.

    Polls ``_enrich_in_flight`` rather than holding a Condition because
    the worker pool is already serializing through
    ``_enrich_in_flight_lock``; polling keeps the production hot path
    lock-free.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        with _enrich_in_flight_lock:
            if not _enrich_in_flight:
                return
        time.sleep(0.01)


def enrich_peer_metadata(
    peer_id: str,
    source_workspace_id: str | None = None,

View File

@@ -55,6 +55,7 @@ from a2a_client import ( # noqa: F401, E402
    _validate_peer_id,
    discover_peer,
    enrich_peer_metadata,
    enrich_peer_metadata_nonblocking,
    get_peers,
    get_workspace_info,
    send_a2a_message,
@@ -498,7 +499,15 @@ def _build_channel_notification(msg: dict) -> dict:
        meta["peer_id"] = ""
    else:
        meta["peer_id"] = safe_peer_id
        record = enrich_peer_metadata(safe_peer_id)
        # Cache-first non-blocking enrichment (#2484): on cache miss
        # this returns None immediately and schedules a background
        # fetch. The first push for a new peer renders bare
        # peer_id; the next push (within the 5-min TTL) hits the
        # warm cache and gets full name/role. Push-delivery latency
        # is bounded by the inbox poll interval, never by registry
        # RTT — closes the gap that PR #2471's negative-cache path
        # was meant to avoid amplifying.
        record = enrich_peer_metadata_nonblocking(safe_peer_id)
        if record is not None:
            # Sanitise BEFORE storing in meta so both the JSON-RPC
            # envelope and the rendered content (via

View File

@@ -507,11 +507,22 @@ def _reset_peer_metadata_cache(monkeypatch):
    """Each test starts with a clean ``_peer_metadata`` cache so an
    earlier test's hit doesn't satisfy a later test's miss. Mutates the
    module-level dict in place rather than reassigning so other modules
    that imported the dict by reference still see the same instance."""
    that imported the dict by reference still see the same instance.

    Also drains and clears ``_enrich_in_flight`` (#2484): a previous
    test's background-fetch worker can leave a peer marked in-flight,
    and the next test's nonblocking call would short-circuit without
    scheduling a fetch. Drain BEFORE clearing in case a worker is
    mid-execution and writes to ``_peer_metadata`` after the clear.
    """
    import a2a_client

    a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
    a2a_client._peer_metadata.clear()
    a2a_client._enrich_in_flight.clear()
    yield
    a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
    a2a_client._peer_metadata.clear()
    a2a_client._enrich_in_flight.clear()


def _make_httpx_response(status_code: int, json_body: object) -> MagicMock:
@@ -586,9 +597,16 @@ def test_envelope_enrichment_uses_cache_when_present(_reset_peer_metadata_cache)
def test_envelope_enrichment_fetches_on_cache_miss(_reset_peer_metadata_cache):
    """Cache miss + registry hit: GET fires, response cached, meta
    carries fetched fields. Subsequent build for the same peer must
    NOT re-fetch (cache populated by first call)."""
    """Cache miss: nonblocking enrichment returns None on the first
    push (the first push arrives metadata-light), schedules a background
    fetch that populates the cache, and the second push hits the warm
    cache.

    Pre-2026-05-05 (#2484) the first push was synchronous: the inbox
    poller blocked up to 2s on the registry GET before delivering. The
    nonblocking path means push delivery is bounded by the inbox poll
    interval, never by registry RTT, at the cost of one push per peer
    per TTL window arriving without name/role.
    """
    import a2a_client
    from a2a_mcp_server import _build_channel_notification
@@ -602,22 +620,39 @@ def test_envelope_enrichment_fetches_on_cache_miss(_reset_peer_metadata_cache):
        payload1 = _build_channel_notification({
            "peer_id": _PEER_UUID, "kind": "peer_agent", "text": "first",
        })
        # First push: bare peer_id, the fetch is in flight in the
        # background; peer_name / peer_role are NOT yet present.
        assert "peer_name" not in payload1["params"]["meta"]
        assert "peer_role" not in payload1["params"]["meta"]

        # Wait for the background worker to finish populating the cache.
        a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)

        payload2 = _build_channel_notification({
            "peer_id": _PEER_UUID, "kind": "peer_agent", "text": "second",
        })

    # Worker fired exactly one GET (cache miss → fetch); the second push
    # hit the warm cache and DID NOT fire another GET.
    assert client.get.call_count == 1, (
        f"second push for same peer must use cache, got {client.get.call_count} GETs"
    )
    assert payload1["params"]["meta"]["peer_name"] == "fetched-name"
    # Second push has the enriched fields the worker stored.
    assert payload2["params"]["meta"]["peer_name"] == "fetched-name"
    assert payload2["params"]["meta"]["peer_role"] == "router"


def test_envelope_enrichment_degrades_on_registry_failure(_reset_peer_metadata_cache):
    """Registry returns 500 (or 4xx, or network error): enrichment
    silently degrades to bare peer_id. The push must not crash, the
    push must not block, and the agent_card_url must still surface
    because it's constructable from peer_id alone."""
    because it's constructable from peer_id alone.

    Post-#2484 the first push always degrades to bare peer_id (the
    background fetch hasn't run yet); this test pins that "degrades on
    cache miss" and "the failure path doesn't break" stay true.
    """
    import a2a_client
    from a2a_mcp_server import _build_channel_notification

    p, _ = _patch_httpx_client(_make_httpx_response(500, {}))
@@ -630,6 +665,9 @@ def test_envelope_enrichment_degrades_on_registry_failure(_reset_peer_metadata_c
        "method": "message/send",
        "created_at": "2026-05-01T00:00:00Z",
    })

    # Drain the background fetch so a follow-up test starting with
    # this peer in-flight doesn't see ghost state.
    a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)

    meta = payload["params"]["meta"]
    assert meta["peer_id"] == _PEER_UUID
@@ -649,7 +687,13 @@ def test_envelope_enrichment_negative_caches_registry_failure(_reset_peer_metada
    exact scenarios it most needs to defend against, and the poller
    thread stalls 2s per push for that peer until the registry comes
    back. Pin: two pushes from a 5xx-returning peer fire exactly one
    GET, not two."""
    GET, not two.

    Post-#2484 the GETs run in a background worker, so the test waits
    for in-flight to drain between pushes: the negative-cache write
    must land in ``_peer_metadata`` before the second push consults it.
    """
    import a2a_client
    from a2a_mcp_server import _build_channel_notification

    p, client = _patch_httpx_client(_make_httpx_response(500, {}))
@@ -657,9 +701,13 @@ def test_envelope_enrichment_negative_caches_registry_failure(_reset_peer_metada
        payload1 = _build_channel_notification({
            "peer_id": _PEER_UUID, "kind": "peer_agent", "text": "first",
        })

        # Wait for the worker to write the negative-cache entry before
        # the second push reads it.
        a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)

        payload2 = _build_channel_notification({
            "peer_id": _PEER_UUID, "kind": "peer_agent", "text": "second",
        })
        a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)

    assert client.get.call_count == 1, (
        f"second push from a 5xx-returning peer must use the negative "
@@ -692,7 +740,9 @@ def test_envelope_enrichment_negative_caches_network_exception(_reset_peer_metad
    client.get = MagicMock(side_effect=_httpx.ConnectError("dns down"))
    with patch("httpx.Client", return_value=client):
        _build_channel_notification({"peer_id": _PEER_UUID, "kind": "peer_agent"})
        a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
        _build_channel_notification({"peer_id": _PEER_UUID, "kind": "peer_agent"})
        a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)

    assert client.get.call_count == 1, (
        f"network exceptions must be negative-cached, got "
@@ -728,7 +778,9 @@ def test_envelope_enrichment_negative_caches_non_json_200(_reset_peer_metadata_c
    p, client = _patch_httpx_client(resp)
    with p:
        _build_channel_notification({"peer_id": _PEER_UUID, "kind": "peer_agent", "text": "first"})
        a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
        _build_channel_notification({"peer_id": _PEER_UUID, "kind": "peer_agent", "text": "second"})
        a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)

    assert client.get.call_count == 1, (
        f"non-JSON 200 must be negative-cached, got {client.get.call_count} GETs"
@@ -756,7 +808,9 @@ def test_envelope_enrichment_negative_caches_non_dict_json_200(_reset_peer_metad
    )
    with p:
        _build_channel_notification({"peer_id": _PEER_UUID, "kind": "peer_agent", "text": "first"})
        a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
        _build_channel_notification({"peer_id": _PEER_UUID, "kind": "peer_agent", "text": "second"})
        a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)

    assert client.get.call_count == 1, (
        f"non-dict JSON 200 must be negative-cached, got {client.get.call_count} GETs"
@@ -792,13 +846,25 @@ def test_envelope_enrichment_re_fetches_after_ttl(_reset_peer_metadata_cache):
        )
    )
    with p:
        payload = _build_channel_notification({
        # First push: stale cache → background fetch scheduled; the
        # nonblocking path returns None when the entry is past TTL,
        # so this first push degrades to bare peer_id (no peer_name).
        # Wait for the background worker to fill the cache, then issue
        # a second push to confirm it picked up the fresh values.
        payload1 = _build_channel_notification({
            "peer_id": _PEER_UUID, "kind": "peer_agent", "text": "ping",
        })
        a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
        payload2 = _build_channel_notification({
            "peer_id": _PEER_UUID, "kind": "peer_agent", "text": "pong",
        })

    assert client.get.call_count == 1, "stale cache must trigger a re-fetch"
    assert payload["params"]["meta"]["peer_name"] == "fresh-name"
    assert payload["params"]["meta"]["peer_role"] == "new"
    assert "peer_name" not in payload1["params"]["meta"], (
        "first push past TTL degrades to bare peer_id under nonblocking enrichment"
    )
    assert payload2["params"]["meta"]["peer_name"] == "fresh-name"
    assert payload2["params"]["meta"]["peer_role"] == "new"
def test_envelope_enrichment_invalid_peer_id_skips_lookup(_reset_peer_metadata_cache):
@@ -1732,3 +1798,102 @@ def _readable(fd: int) -> bool:
    rlist, _, _ = select.select([fd], [], [], 0)
    return bool(rlist)


# ---- #2484 nonblocking-enrichment dedicated tests ----


def test_enrich_peer_metadata_nonblocking_cache_hit_returns_immediately(
    _reset_peer_metadata_cache,
):
    """Cache hit (fresh entry within TTL): the nonblocking helper
    returns the cached record without scheduling a worker. Pins the
    fast path; the whole point of the helper is that steady-state
    pushes for a known peer don't touch the executor."""
    import a2a_client
    import time as _time

    a2a_client._peer_metadata[_PEER_UUID] = (
        _time.monotonic(),
        {"id": _PEER_UUID, "name": "ops", "role": "sre"},
    )
    p, client = _patch_httpx_client(_make_httpx_response(200, {}))
    with p:
        record = a2a_client.enrich_peer_metadata_nonblocking(_PEER_UUID)

    assert record is not None
    assert record["name"] == "ops"
    assert client.get.call_count == 0, "cache hit must not schedule a worker"
    # No in-flight marker should have been added since we returned synchronously.
    assert _PEER_UUID not in a2a_client._enrich_in_flight


def test_enrich_peer_metadata_nonblocking_cache_miss_schedules_fetch(
    _reset_peer_metadata_cache,
):
    """Cache miss: the helper returns None immediately and schedules a
    background fetch; the worker fills the cache. After draining the
    in-flight marker, a follow-up call hits the warm cache."""
    import a2a_client

    p, client = _patch_httpx_client(
        _make_httpx_response(
            200,
            {"id": _PEER_UUID, "name": "fresh", "role": "router"},
        )
    )
    with p:
        first = a2a_client.enrich_peer_metadata_nonblocking(_PEER_UUID)
        assert first is None, "first call on cache miss must return None (bare peer_id)"
        a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
        second = a2a_client.enrich_peer_metadata_nonblocking(_PEER_UUID)

    assert client.get.call_count == 1
    assert second is not None
    assert second["name"] == "fresh"


def test_enrich_peer_metadata_nonblocking_coalesces_duplicate_pushes(
    _reset_peer_metadata_cache,
):
    """A burst of pushes for the same uncached peer must schedule
    exactly ONE background fetch. Without the in-flight gate, a chatty
    peer's first 10 pushes would queue 10 GETs against the registry —
    exactly the DoS-on-self pattern the negative cache was meant to
    rate-limit, except now amplified with concurrency.
    """
    import a2a_client

    p, client = _patch_httpx_client(
        _make_httpx_response(
            200,
            {"id": _PEER_UUID, "name": "x", "role": "y"},
        )
    )
    with p:
        # Fire 5 nonblocking calls back-to-back BEFORE the worker has
        # a chance to drain. All 5 hit the in-flight gate; only the
        # first schedules a worker.
        for _ in range(5):
            assert a2a_client.enrich_peer_metadata_nonblocking(_PEER_UUID) is None
        a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)

    assert client.get.call_count == 1, (
        f"in-flight gate must coalesce concurrent pushes; got {client.get.call_count} GETs"
    )


def test_enrich_peer_metadata_nonblocking_invalid_peer_id_returns_none(
    _reset_peer_metadata_cache,
):
    """Defensive: a malformed peer_id (not a UUID) must short-circuit
    without touching the cache OR the executor."""
    import a2a_client

    p, client = _patch_httpx_client(_make_httpx_response(200, {}))
    with p:
        assert a2a_client.enrich_peer_metadata_nonblocking("not-a-uuid") is None

    assert client.get.call_count == 0
    assert "not-a-uuid" not in a2a_client._enrich_in_flight