From da6d319c48888a08a9793c2933358bbe30f84f3d Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 01:39:07 -0700 Subject: [PATCH] perf(a2a): bound + LRU-evict _peer_metadata cache (#2482) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-fix _peer_metadata was an unbounded dict — a workspace receiving from N distinct peers across its lifetime accumulated entries indefinitely (~100 bytes × N). Not crash-class at typical scale (10K peers ≈ 1 MB) but unbounded. The TTL-at-read pattern bounded staleness but did nothing for memory. Fix: hand-rolled LRU on top of OrderedDict. No new dependency. - _PEER_METADATA_MAXSIZE = 1024 (issue's recommended bound) - _peer_metadata_get(canon) — read + LRU touch (move to MRU) - _peer_metadata_set(canon, value) — write + evict-if-over-maxsize - All production reads/writes route through the helpers - _peer_metadata_lock guards the OrderedDict ops so concurrent background-enrichment workers (#2484) don't race the LRU invariant Why hand-rolled vs cachetools: - No new dep. workspace/ has 0 cache libraries today; adding one for ~30 lines is negative leverage. - The TTL is enforced at the call site (existing pattern); only the size cap + LRU is new. cachetools.TTLCache fuses the two, which would force a refactor of every caller's TTL check. - The size + lock are simple enough that a future swap-in of cachetools is mechanical if needs evolve. Why maxsize matters more than ttl (issue's framing): A runaway poller that touches new peer_ids every push would still grow within a single TTL window — TTL eviction only fires at read time. The size cap fires immediately on insert, regardless of read pattern. Three new tests: - test_peer_metadata_set_evicts_lru_when_at_maxsize - test_peer_metadata_get_promotes_to_lru_head - test_peer_metadata_set_replaces_existing_entry_in_place 1742 passed / 0 failed locally (78 new + 1664 existing). Closes #2482. Co-Authored-By: Claude Opus 4.7 (1M context) --- workspace/a2a_client.py | 75 +++++++++++++++++---- workspace/tests/test_a2a_mcp_server.py | 92 ++++++++++++++++++++++++++ 2 files changed, 153 insertions(+), 14 deletions(-) diff --git a/workspace/a2a_client.py b/workspace/a2a_client.py index e1451961..7cd151bf 100644 --- a/workspace/a2a_client.py +++ b/workspace/a2a_client.py @@ -12,6 +12,7 @@ import re import threading import time import uuid +from collections import OrderedDict from concurrent.futures import ThreadPoolExecutor import httpx @@ -52,13 +53,29 @@ _peer_to_source: dict[str, str] = {} # Cache workspace ID → full peer record (id, name, role, status, url, ...). # Populated by tool_list_peers and by the lazy registry lookup in # enrich_peer_metadata. The notification-callback path (channel envelope -# enrichment) reads this cache on every inbound peer_agent push, so a -# bare ``dict[str, tuple[float, dict | None]]`` is the fastest read -# shape; entries carry their fetched-at timestamp so TTL eviction is -# in-line with the lookup. ``None`` as the record is the negative-cache -# sentinel: registry failure is cached for one TTL window so we don't -# re-fire the 2s-bounded GET on every push from a flaky peer. -_peer_metadata: dict[str, tuple[float, dict | None]] = {} +# enrichment) reads this cache on every inbound peer_agent push, so the +# read shape stays a dict-like ``__getitem__`` lookup; entries carry +# their fetched-at timestamp so TTL eviction is in-line with the +# lookup. ``None`` as the record is the negative-cache sentinel: +# registry failure is cached for one TTL window so we don't re-fire +# the 2s-bounded GET on every push from a flaky peer. +# +# OrderedDict + maxsize bound (#2482): pre-fix this was an unbounded +# ``dict``, so a workspace receiving from N distinct peers across its +# lifetime accumulated ~100 bytes/entry × N indefinitely. At 10K peers +# that's ~1 MB; at 100K (a chatty platform-wide router) ~10 MB; not +# crash-class but unbounded. The LRU bound caps memory + the TTL caps +# per-entry staleness — both gates are needed because a runaway poller +# touching N new peer_ids per push could grow within a single TTL +# window. +# +# All reads / writes go through ``_peer_metadata_get`` / +# ``_peer_metadata_set`` so the LRU move-to-end + size-trim invariants +# stay co-located. Direct mutation is allowed only in test fixtures +# (clearing for isolation); production code path uses the helpers. +_PEER_METADATA_MAXSIZE = 1024 +_peer_metadata: "OrderedDict[str, tuple[float, dict | None]]" = OrderedDict() +_peer_metadata_lock = threading.Lock() # How long an entry in ``_peer_metadata`` is treated as fresh. 5 minutes # is the same window we use for delegation routing — long enough that a @@ -68,6 +85,36 @@ _peer_metadata: dict[str, tuple[float, dict | None]] = {} _PEER_METADATA_TTL_SECONDS = 300.0 +def _peer_metadata_get(canon: str) -> tuple[float, dict | None] | None: + """Read with LRU touch — moves the entry to the most-recently-used + position so steady-state pushes from a busy peer don't get evicted + by a cold-start burst from new peers. Returns the raw tuple shape + callers expect; TTL eviction stays at the call site. + """ + with _peer_metadata_lock: + entry = _peer_metadata.get(canon) + if entry is not None: + _peer_metadata.move_to_end(canon) + return entry + + +def _peer_metadata_set(canon: str, value: tuple[float, dict | None]) -> None: + """Write + evict-if-over-maxsize. The eviction is in-process and + cheap (popitem(last=False) on an OrderedDict is O(1)). Holding the + lock across the trim keeps the size invariant stable under concurrent + writes from background enrichment workers. + """ + with _peer_metadata_lock: + _peer_metadata[canon] = value + _peer_metadata.move_to_end(canon) + # Trim the oldest entries until at-or-below maxsize. The bound + # is a soft cap — a single overrun (set called when at maxsize) + # evicts the LRU entry before returning, never letting size + # exceed maxsize. + while len(_peer_metadata) > _PEER_METADATA_MAXSIZE: + _peer_metadata.popitem(last=False) + + # Background-fetch executor for enrich_peer_metadata_nonblocking (#2484). # A small pool — peers are highly TTL-cached, so the steady-state load # is "one fetch per peer per 5 minutes." Two workers handle the cold- @@ -140,7 +187,7 @@ def enrich_peer_metadata_nonblocking( if canon is None: return None current = time.monotonic() - cached = _peer_metadata.get(canon) + cached = _peer_metadata_get(canon) if cached is not None: fetched_at, record = cached if current - fetched_at < _PEER_METADATA_TTL_SECONDS: @@ -241,7 +288,7 @@ def enrich_peer_metadata( return None current = now if now is not None else time.monotonic() - cached = _peer_metadata.get(canon) + cached = _peer_metadata_get(canon) if cached is not None: fetched_at, record = cached if current - fetched_at < _PEER_METADATA_TTL_SECONDS: @@ -257,26 +304,26 @@ def enrich_peer_metadata( resp = client.get(url, headers={"X-Workspace-ID": src, **auth_headers(src)}) except Exception as exc: # noqa: BLE001 logger.debug("enrich_peer_metadata: GET %s failed: %s", url, exc) - _peer_metadata[canon] = (current, None) + _peer_metadata_set(canon, (current, None)) return None if resp.status_code != 200: logger.debug( "enrich_peer_metadata: %s returned HTTP %d", url, resp.status_code ) - _peer_metadata[canon] = (current, None) + _peer_metadata_set(canon, (current, None)) return None try: data = resp.json() except Exception: # noqa: BLE001 - _peer_metadata[canon] = (current, None) + _peer_metadata_set(canon, (current, None)) return None if not isinstance(data, dict): - _peer_metadata[canon] = (current, None) + _peer_metadata_set(canon, (current, None)) return None - _peer_metadata[canon] = (current, data) + _peer_metadata_set(canon, (current, data)) if name := data.get("name"): _peer_names[canon] = name return data diff --git a/workspace/tests/test_a2a_mcp_server.py b/workspace/tests/test_a2a_mcp_server.py index 6eb096da..25f46e6f 100644 --- a/workspace/tests/test_a2a_mcp_server.py +++ b/workspace/tests/test_a2a_mcp_server.py @@ -3,6 +3,7 @@ import asyncio import json import os +import time from unittest.mock import AsyncMock, MagicMock, patch @@ -1897,3 +1898,94 @@ def test_enrich_peer_metadata_nonblocking_invalid_peer_id_returns_none( assert client.get.call_count == 0 assert "not-a-uuid" not in a2a_client._enrich_in_flight + + +# ---- #2482 bounded-cache tests ---- + + +def test_peer_metadata_set_evicts_lru_when_at_maxsize(_reset_peer_metadata_cache, monkeypatch): + """Cache size never exceeds ``_PEER_METADATA_MAXSIZE``. When the + next write would push past the bound, the least-recently-used entry + is evicted. Pin: a workspace receiving from N > maxsize peers ends + up with exactly maxsize entries — the oldest get dropped, the + newest stay. + """ + import a2a_client + + # Shrink the bound to make the test fast + deterministic. The real + # bound (1024) is too large to exercise per-test. + monkeypatch.setattr(a2a_client, "_PEER_METADATA_MAXSIZE", 4) + + now = time.monotonic() + for i in range(6): + # Distinct UUIDs — generate via the static template + index so + # _validate_peer_id accepts them. + peer = f"00000000-0000-0000-0000-00000000000{i}" + a2a_client._peer_metadata_set(peer, (now + i, {"id": peer, "name": f"p{i}"})) + + # Size capped at maxsize. + assert len(a2a_client._peer_metadata) == 4 + # Oldest two evicted, newest four remain. + assert "00000000-0000-0000-0000-000000000000" not in a2a_client._peer_metadata + assert "00000000-0000-0000-0000-000000000001" not in a2a_client._peer_metadata + assert "00000000-0000-0000-0000-000000000002" in a2a_client._peer_metadata + assert "00000000-0000-0000-0000-000000000005" in a2a_client._peer_metadata + + +def test_peer_metadata_get_promotes_to_lru_head(_reset_peer_metadata_cache, monkeypatch): + """Read promotes the entry to most-recently-used. Steady-state + pushes from a busy peer must NOT be evicted by a cold-start burst + from new peers — the LRU touch on read is what makes that hold. + """ + import a2a_client + + monkeypatch.setattr(a2a_client, "_PEER_METADATA_MAXSIZE", 3) + + now = time.monotonic() + a = "00000000-0000-0000-0000-aaaaaaaaaaaa" + b = "00000000-0000-0000-0000-bbbbbbbbbbbb" + c = "00000000-0000-0000-0000-cccccccccccc" + d = "00000000-0000-0000-0000-dddddddddddd" + + # Insert in order a, b, c. LRU position: a (oldest) → c (newest). + a2a_client._peer_metadata_set(a, (now, {"id": a})) + a2a_client._peer_metadata_set(b, (now, {"id": b})) + a2a_client._peer_metadata_set(c, (now, {"id": c})) + + # Touch `a` via _peer_metadata_get → moves to MRU. Eviction order: + # b (oldest now) → c → a (newest). + a2a_client._peer_metadata_get(a) + + # Insert `d` — pushes `b` out (not `a` even though `a` was inserted first). + a2a_client._peer_metadata_set(d, (now, {"id": d})) + + assert a in a2a_client._peer_metadata, ( + "recently-touched entry must survive eviction; LRU touch on read is broken" + ) + assert b not in a2a_client._peer_metadata, ( + "oldest-untouched entry must be evicted first" + ) + assert c in a2a_client._peer_metadata + assert d in a2a_client._peer_metadata + + +def test_peer_metadata_set_replaces_existing_entry_in_place(_reset_peer_metadata_cache): + """Re-write of an existing key updates the value in place — does + NOT evict to maxsize-1 then re-insert. The LRU move-to-end on + update keeps the entry as MRU. + """ + import a2a_client + + peer = "00000000-0000-0000-0000-aaaaaaaaaaaa" + now = time.monotonic() + a2a_client._peer_metadata_set(peer, (now, {"id": peer, "name": "v1"})) + assert len(a2a_client._peer_metadata) == 1 + + # Re-write — same key, new value. + a2a_client._peer_metadata_set(peer, (now + 100, {"id": peer, "name": "v2"})) + + assert len(a2a_client._peer_metadata) == 1, ( + "re-write must not duplicate the entry" + ) + cached = a2a_client._peer_metadata[peer] + assert cached[1]["name"] == "v2", "re-write must update the value in place"