perf(a2a): bound + LRU-evict _peer_metadata cache (#2482)
Pre-fix _peer_metadata was an unbounded dict — a workspace receiving
from N distinct peers across its lifetime accumulated entries
indefinitely (~100 bytes × N). Not crash-class at typical scale (10K
peers ≈ 1 MB) but unbounded. The TTL-at-read pattern bounded
staleness but did nothing for memory.
Fix: hand-rolled LRU on top of OrderedDict. No new dependency.
- _PEER_METADATA_MAXSIZE = 1024 (issue's recommended bound)
- _peer_metadata_get(canon) — read + LRU touch (move to MRU)
- _peer_metadata_set(canon, value) — write + evict-if-over-maxsize
- All production reads/writes route through the helpers
- _peer_metadata_lock guards the OrderedDict ops so concurrent
background-enrichment workers (#2484) don't race the LRU
invariant
Why hand-rolled vs cachetools:
- No new dep. workspace/ has 0 cache libraries today; adding one
for ~30 lines is negative leverage.
- The TTL is enforced at the call site (existing pattern); only
the size cap + LRU is new. cachetools.TTLCache fuses the two,
which would force a refactor of every caller's TTL check.
- The size + lock are simple enough that a future swap-in of
cachetools is mechanical if needs evolve.
Why maxsize matters more than ttl (issue's framing):
A runaway poller that touches new peer_ids every push would still
grow within a single TTL window — TTL eviction only fires at
read time. The size cap fires immediately on insert, regardless
of read pattern.
Three new tests:
- test_peer_metadata_set_evicts_lru_when_at_maxsize
- test_peer_metadata_get_promotes_to_lru_head
- test_peer_metadata_set_replaces_existing_entry_in_place
1742 passed / 0 failed locally (78 new + 1664 existing).
Closes #2482.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
76e9656a7b
commit
da6d319c48
@ -12,6 +12,7 @@ import re
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
from collections import OrderedDict
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
import httpx
|
||||
@ -52,13 +53,29 @@ _peer_to_source: dict[str, str] = {}
|
||||
# Cache workspace ID → full peer record (id, name, role, status, url, ...).
|
||||
# Populated by tool_list_peers and by the lazy registry lookup in
|
||||
# enrich_peer_metadata. The notification-callback path (channel envelope
|
||||
# enrichment) reads this cache on every inbound peer_agent push, so a
|
||||
# bare ``dict[str, tuple[float, dict | None]]`` is the fastest read
|
||||
# shape; entries carry their fetched-at timestamp so TTL eviction is
|
||||
# in-line with the lookup. ``None`` as the record is the negative-cache
|
||||
# sentinel: registry failure is cached for one TTL window so we don't
|
||||
# re-fire the 2s-bounded GET on every push from a flaky peer.
|
||||
_peer_metadata: dict[str, tuple[float, dict | None]] = {}
|
||||
# enrichment) reads this cache on every inbound peer_agent push, so the
|
||||
# read shape stays a dict-like ``__getitem__`` lookup; entries carry
|
||||
# their fetched-at timestamp so TTL eviction is in-line with the
|
||||
# lookup. ``None`` as the record is the negative-cache sentinel:
|
||||
# registry failure is cached for one TTL window so we don't re-fire
|
||||
# the 2s-bounded GET on every push from a flaky peer.
|
||||
#
|
||||
# OrderedDict + maxsize bound (#2482): pre-fix this was an unbounded
|
||||
# ``dict``, so a workspace receiving from N distinct peers across its
|
||||
# lifetime accumulated ~100 bytes/entry × N indefinitely. At 10K peers
|
||||
# that's ~1 MB; at 100K (a chatty platform-wide router) ~10 MB; not
|
||||
# crash-class but unbounded. The LRU bound caps memory + the TTL caps
|
||||
# per-entry staleness — both gates are needed because a runaway poller
|
||||
# touching N new peer_ids per push could grow within a single TTL
|
||||
# window.
|
||||
#
|
||||
# All reads / writes go through ``_peer_metadata_get`` /
|
||||
# ``_peer_metadata_set`` so the LRU move-to-end + size-trim invariants
|
||||
# stay co-located. Direct mutation is allowed only in test fixtures
|
||||
# (clearing for isolation); production code path uses the helpers.
|
||||
_PEER_METADATA_MAXSIZE = 1024
|
||||
_peer_metadata: "OrderedDict[str, tuple[float, dict | None]]" = OrderedDict()
|
||||
_peer_metadata_lock = threading.Lock()
|
||||
|
||||
# How long an entry in ``_peer_metadata`` is treated as fresh. 5 minutes
|
||||
# is the same window we use for delegation routing — long enough that a
|
||||
@ -68,6 +85,36 @@ _peer_metadata: dict[str, tuple[float, dict | None]] = {}
|
||||
_PEER_METADATA_TTL_SECONDS = 300.0
|
||||
|
||||
|
||||
def _peer_metadata_get(canon: str) -> tuple[float, dict | None] | None:
    """Look up ``canon`` and promote it to most-recently-used.

    The LRU touch keeps steady-state pushes from a busy peer from being
    evicted by a cold-start burst of new peers. Returns the cached
    ``(fetched_at, record)`` tuple unchanged — TTL checks remain the
    caller's responsibility — or ``None`` on a miss.
    """
    with _peer_metadata_lock:
        try:
            hit = _peer_metadata[canon]
        except KeyError:
            return None
        _peer_metadata.move_to_end(canon)
        return hit
|
||||
|
||||
|
||||
def _peer_metadata_set(canon: str, value: tuple[float, dict | None]) -> None:
    """Insert or replace ``canon`` as MRU, then trim back to the size cap.

    Eviction is in-process and cheap — ``popitem(last=False)`` removes
    the least-recently-used entry in O(1). Doing the write and the trim
    under one lock keeps the size invariant stable when concurrent
    background enrichment workers write at the same time.
    """
    with _peer_metadata_lock:
        _peer_metadata[canon] = value
        # Plain assignment does not reposition an existing key, so the
        # explicit move-to-end makes an update count as a fresh use.
        _peer_metadata.move_to_end(canon)
        # Soft cap: shed oldest-first until at-or-below the bound, so
        # the cache never leaves this function above maxsize. A negative
        # or zero overflow makes the range empty — no work on the common
        # path.
        overflow = len(_peer_metadata) - _PEER_METADATA_MAXSIZE
        for _ in range(overflow):
            _peer_metadata.popitem(last=False)
|
||||
|
||||
|
||||
# Background-fetch executor for enrich_peer_metadata_nonblocking (#2484).
|
||||
# A small pool — peers are highly TTL-cached, so the steady-state load
|
||||
# is "one fetch per peer per 5 minutes." Two workers handle the cold-
|
||||
@ -140,7 +187,7 @@ def enrich_peer_metadata_nonblocking(
|
||||
if canon is None:
|
||||
return None
|
||||
current = time.monotonic()
|
||||
cached = _peer_metadata.get(canon)
|
||||
cached = _peer_metadata_get(canon)
|
||||
if cached is not None:
|
||||
fetched_at, record = cached
|
||||
if current - fetched_at < _PEER_METADATA_TTL_SECONDS:
|
||||
@ -241,7 +288,7 @@ def enrich_peer_metadata(
|
||||
return None
|
||||
|
||||
current = now if now is not None else time.monotonic()
|
||||
cached = _peer_metadata.get(canon)
|
||||
cached = _peer_metadata_get(canon)
|
||||
if cached is not None:
|
||||
fetched_at, record = cached
|
||||
if current - fetched_at < _PEER_METADATA_TTL_SECONDS:
|
||||
@ -257,26 +304,26 @@ def enrich_peer_metadata(
|
||||
resp = client.get(url, headers={"X-Workspace-ID": src, **auth_headers(src)})
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.debug("enrich_peer_metadata: GET %s failed: %s", url, exc)
|
||||
_peer_metadata[canon] = (current, None)
|
||||
_peer_metadata_set(canon, (current, None))
|
||||
return None
|
||||
|
||||
if resp.status_code != 200:
|
||||
logger.debug(
|
||||
"enrich_peer_metadata: %s returned HTTP %d", url, resp.status_code
|
||||
)
|
||||
_peer_metadata[canon] = (current, None)
|
||||
_peer_metadata_set(canon, (current, None))
|
||||
return None
|
||||
|
||||
try:
|
||||
data = resp.json()
|
||||
except Exception: # noqa: BLE001
|
||||
_peer_metadata[canon] = (current, None)
|
||||
_peer_metadata_set(canon, (current, None))
|
||||
return None
|
||||
if not isinstance(data, dict):
|
||||
_peer_metadata[canon] = (current, None)
|
||||
_peer_metadata_set(canon, (current, None))
|
||||
return None
|
||||
|
||||
_peer_metadata[canon] = (current, data)
|
||||
_peer_metadata_set(canon, (current, data))
|
||||
if name := data.get("name"):
|
||||
_peer_names[canon] = name
|
||||
return data
|
||||
|
||||
@ -3,6 +3,7 @@
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
@ -1897,3 +1898,94 @@ def test_enrich_peer_metadata_nonblocking_invalid_peer_id_returns_none(
|
||||
|
||||
assert client.get.call_count == 0
|
||||
assert "not-a-uuid" not in a2a_client._enrich_in_flight
|
||||
|
||||
|
||||
# ---- #2482 bounded-cache tests ----
|
||||
|
||||
|
||||
def test_peer_metadata_set_evicts_lru_when_at_maxsize(_reset_peer_metadata_cache, monkeypatch):
    """The cache never holds more than ``_PEER_METADATA_MAXSIZE`` entries.

    Once the bound is reached, each further write drops the
    least-recently-used entry. Pin: after N > maxsize distinct peers,
    exactly the newest maxsize survive and the oldest are gone.
    """
    import a2a_client

    # A tiny bound keeps the test fast and deterministic; exercising the
    # real bound (1024) per-test would be pointlessly slow.
    monkeypatch.setattr(a2a_client, "_PEER_METADATA_MAXSIZE", 4)

    base = time.monotonic()
    # Distinct, _validate_peer_id-acceptable UUIDs built from a static
    # template plus the index digit.
    peers = [f"00000000-0000-0000-0000-00000000000{i}" for i in range(6)]
    for offset, peer in enumerate(peers):
        a2a_client._peer_metadata_set(peer, (base + offset, {"id": peer, "name": f"p{offset}"}))

    # Exactly maxsize entries survive.
    assert len(a2a_client._peer_metadata) == 4
    # The two oldest were evicted; the newest four remain.
    for evicted in peers[:2]:
        assert evicted not in a2a_client._peer_metadata
    assert peers[2] in a2a_client._peer_metadata
    assert peers[5] in a2a_client._peer_metadata
|
||||
|
||||
|
||||
def test_peer_metadata_get_promotes_to_lru_head(_reset_peer_metadata_cache, monkeypatch):
    """A read marks the entry most-recently-used.

    A busy peer read on every push must survive a cold-start burst of
    brand-new peers — the move-to-end inside the getter is what
    guarantees that.
    """
    import a2a_client

    monkeypatch.setattr(a2a_client, "_PEER_METADATA_MAXSIZE", 3)

    stamp = time.monotonic()
    first = "00000000-0000-0000-0000-aaaaaaaaaaaa"
    second = "00000000-0000-0000-0000-bbbbbbbbbbbb"
    third = "00000000-0000-0000-0000-cccccccccccc"
    fourth = "00000000-0000-0000-0000-dddddddddddd"

    # Fill to capacity in insertion order first → second → third, so the
    # initial eviction order is first (oldest) → third (newest).
    for key in (first, second, third):
        a2a_client._peer_metadata_set(key, (stamp, {"id": key}))

    # Reading `first` promotes it to MRU; `second` becomes the oldest.
    a2a_client._peer_metadata_get(first)

    # One more insert overflows the cache — `second` goes, not `first`,
    # even though `first` was inserted earlier.
    a2a_client._peer_metadata_set(fourth, (stamp, {"id": fourth}))

    assert first in a2a_client._peer_metadata, (
        "recently-touched entry must survive eviction; LRU touch on read is broken"
    )
    assert second not in a2a_client._peer_metadata, (
        "oldest-untouched entry must be evicted first"
    )
    assert third in a2a_client._peer_metadata
    assert fourth in a2a_client._peer_metadata
|
||||
|
||||
|
||||
def test_peer_metadata_set_replaces_existing_entry_in_place(_reset_peer_metadata_cache):
    """Writing an existing key overwrites the value without growing the
    cache — no evict-to-maxsize-1-then-reinsert dance. The move-to-end
    on update leaves the entry at the MRU position.
    """
    import a2a_client

    key = "00000000-0000-0000-0000-aaaaaaaaaaaa"
    stamp = time.monotonic()

    a2a_client._peer_metadata_set(key, (stamp, {"id": key, "name": "v1"}))
    assert len(a2a_client._peer_metadata) == 1

    # Overwrite the same key with a fresher timestamp and value.
    a2a_client._peer_metadata_set(key, (stamp + 100, {"id": key, "name": "v2"}))

    assert len(a2a_client._peer_metadata) == 1, (
        "re-write must not duplicate the entry"
    )
    _, record = a2a_client._peer_metadata[key]
    assert record["name"] == "v2", "re-write must update the value in place"
|
||||
|
||||
Loading…
Reference in New Issue
Block a user