diff --git a/workspace/a2a_client.py b/workspace/a2a_client.py
index e1451961..7cd151bf 100644
--- a/workspace/a2a_client.py
+++ b/workspace/a2a_client.py
@@ -12,6 +12,7 @@ import re
 import threading
 import time
 import uuid
+from collections import OrderedDict
 from concurrent.futures import ThreadPoolExecutor
 
 import httpx
@@ -52,13 +53,29 @@ _peer_to_source: dict[str, str] = {}
 # Cache workspace ID → full peer record (id, name, role, status, url, ...).
 # Populated by tool_list_peers and by the lazy registry lookup in
 # enrich_peer_metadata. The notification-callback path (channel envelope
-# enrichment) reads this cache on every inbound peer_agent push, so a
-# bare ``dict[str, tuple[float, dict | None]]`` is the fastest read
-# shape; entries carry their fetched-at timestamp so TTL eviction is
-# in-line with the lookup. ``None`` as the record is the negative-cache
-# sentinel: registry failure is cached for one TTL window so we don't
-# re-fire the 2s-bounded GET on every push from a flaky peer.
-_peer_metadata: dict[str, tuple[float, dict | None]] = {}
+# enrichment) reads this cache on every inbound peer_agent push, so the
+# read shape stays a dict-like ``__getitem__`` lookup; entries carry
+# their fetched-at timestamp so TTL eviction is in-line with the
+# lookup. ``None`` as the record is the negative-cache sentinel:
+# registry failure is cached for one TTL window so we don't re-fire
+# the 2s-bounded GET on every push from a flaky peer.
+#
+# OrderedDict + maxsize bound (#2482): pre-fix this was an unbounded
+# ``dict``, so a workspace receiving from N distinct peers across its
+# lifetime accumulated ~100 bytes/entry × N indefinitely. At 10K peers
+# that's ~1 MB; at 100K (a chatty platform-wide router) ~10 MB; not
+# crash-class, but unbounded. The LRU bound caps memory and the TTL
+# caps per-entry staleness — both gates are needed because a runaway
+# poller touching N new peer_ids per push could grow the cache
+# arbitrarily within a single TTL window.
+#
+# All reads and writes go through ``_peer_metadata_get`` and
+# ``_peer_metadata_set`` so the LRU move-to-end and size-trim
+# invariants stay co-located. Direct mutation is allowed only in test
+# fixtures (clearing for isolation); production code paths use the helpers.
+_PEER_METADATA_MAXSIZE = 1024
+_peer_metadata: "OrderedDict[str, tuple[float, dict | None]]" = OrderedDict()
+_peer_metadata_lock = threading.Lock()
 
 # How long an entry in ``_peer_metadata`` is treated as fresh. 5 minutes
 # is the same window we use for delegation routing — long enough that a
@@ -68,6 +85,36 @@ _peer_metadata: dict[str, tuple[float, dict | None]] = {}
 _PEER_METADATA_TTL_SECONDS = 300.0
 
 
+def _peer_metadata_get(canon: str) -> tuple[float, dict | None] | None:
+    """Read with LRU touch — moves the entry to the most-recently-used
+    position so steady-state pushes from a busy peer don't get evicted
+    by a cold-start burst from new peers. Returns the raw tuple shape
+    callers expect; TTL eviction stays at the call site.
+    """
+    with _peer_metadata_lock:
+        entry = _peer_metadata.get(canon)
+        if entry is not None:
+            _peer_metadata.move_to_end(canon)
+        return entry
+
+
+def _peer_metadata_set(canon: str, value: tuple[float, dict | None]) -> None:
+    """Write + evict-if-over-maxsize. The eviction is in-process and
+    cheap (popitem(last=False) on an OrderedDict is O(1)). Holding the
+    lock across the trim keeps the size invariant stable under concurrent
+    writes from background enrichment workers.
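+
+    A minimal usage sketch (illustrative values only; ``canon`` is a
+    canonicalized peer id and the dict is a registry record):
+
+        _peer_metadata_set(canon, (time.monotonic(), {"id": canon}))
+        _peer_metadata_set(canon, (time.monotonic(), None))  # negative cache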
+    """
+    with _peer_metadata_lock:
+        _peer_metadata[canon] = value
+        _peer_metadata.move_to_end(canon)
+        # Trim the oldest entries until the size is back at or below
+        # maxsize. The bound is a hard cap: a set that lands when the
+        # cache is already full evicts the LRU entry before returning,
+        # so the size never exceeds maxsize.
+        while len(_peer_metadata) > _PEER_METADATA_MAXSIZE:
+            _peer_metadata.popitem(last=False)
+
+
 # Background-fetch executor for enrich_peer_metadata_nonblocking (#2484).
 # A small pool — peers are highly TTL-cached, so the steady-state load
 # is "one fetch per peer per 5 minutes." Two workers handle the cold-
@@ -140,7 +187,7 @@ def enrich_peer_metadata_nonblocking(
     if canon is None:
         return None
     current = time.monotonic()
-    cached = _peer_metadata.get(canon)
+    cached = _peer_metadata_get(canon)
     if cached is not None:
         fetched_at, record = cached
         if current - fetched_at < _PEER_METADATA_TTL_SECONDS:
@@ -241,7 +288,7 @@ def enrich_peer_metadata(
         return None
 
     current = now if now is not None else time.monotonic()
-    cached = _peer_metadata.get(canon)
+    cached = _peer_metadata_get(canon)
     if cached is not None:
         fetched_at, record = cached
         if current - fetched_at < _PEER_METADATA_TTL_SECONDS:
@@ -257,26 +304,26 @@ def enrich_peer_metadata(
         resp = client.get(url, headers={"X-Workspace-ID": src, **auth_headers(src)})
     except Exception as exc:  # noqa: BLE001
         logger.debug("enrich_peer_metadata: GET %s failed: %s", url, exc)
-        _peer_metadata[canon] = (current, None)
+        _peer_metadata_set(canon, (current, None))
         return None
 
     if resp.status_code != 200:
         logger.debug(
             "enrich_peer_metadata: %s returned HTTP %d", url, resp.status_code
         )
-        _peer_metadata[canon] = (current, None)
+        _peer_metadata_set(canon, (current, None))
         return None
 
     try:
         data = resp.json()
     except Exception:  # noqa: BLE001
-        _peer_metadata[canon] = (current, None)
+        _peer_metadata_set(canon, (current, None))
         return None
     if not isinstance(data, dict):
-        _peer_metadata[canon] = (current, None)
+        _peer_metadata_set(canon, (current, None))
         return None
-    _peer_metadata[canon] = (current, data)
+    _peer_metadata_set(canon, (current, data))
     if name := data.get("name"):
         _peer_names[canon] = name
     return data
diff --git a/workspace/tests/test_a2a_mcp_server.py b/workspace/tests/test_a2a_mcp_server.py
index 6eb096da..25f46e6f 100644
--- a/workspace/tests/test_a2a_mcp_server.py
+++ b/workspace/tests/test_a2a_mcp_server.py
@@ -3,6 +3,7 @@
 import asyncio
 import json
 import os
+import time
 
 from unittest.mock import AsyncMock, MagicMock, patch
 
@@ -1897,3 +1898,94 @@ def test_enrich_peer_metadata_nonblocking_invalid_peer_id_returns_none(
 
     assert client.get.call_count == 0
     assert "not-a-uuid" not in a2a_client._enrich_in_flight
+
+
+# ---- #2482 bounded-cache tests ----
+
+
+def test_peer_metadata_set_evicts_lru_when_at_maxsize(_reset_peer_metadata_cache, monkeypatch):
+    """Cache size never exceeds ``_PEER_METADATA_MAXSIZE``. When the
+    next write would push past the bound, the least-recently-used entry
+    is evicted. Pin: a workspace receiving from N > maxsize peers ends
+    up with exactly maxsize entries — the oldest get dropped, the
+    newest stay.
+    """
+    import a2a_client
+
+    # Shrink the bound to make the test fast and deterministic. The
+    # real bound (1024) is too large to exercise in a unit test.
+    monkeypatch.setattr(a2a_client, "_PEER_METADATA_MAXSIZE", 4)
+
+    now = time.monotonic()
+    for i in range(6):
+        # Distinct UUIDs — built from a fixed template plus an index so
+        # _validate_peer_id accepts them.
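+        # (The set/get helpers themselves do no id validation; the
+        # realistic shape just keeps the fixture close to what
+        # production traffic would cache.)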
+        peer = f"00000000-0000-0000-0000-00000000000{i}"
+        a2a_client._peer_metadata_set(peer, (now + i, {"id": peer, "name": f"p{i}"}))
+
+    # Size capped at maxsize.
+    assert len(a2a_client._peer_metadata) == 4
+    # Oldest two evicted, newest four remain.
+    assert "00000000-0000-0000-0000-000000000000" not in a2a_client._peer_metadata
+    assert "00000000-0000-0000-0000-000000000001" not in a2a_client._peer_metadata
+    assert "00000000-0000-0000-0000-000000000002" in a2a_client._peer_metadata
+    assert "00000000-0000-0000-0000-000000000005" in a2a_client._peer_metadata
+
+
+def test_peer_metadata_get_promotes_to_lru_head(_reset_peer_metadata_cache, monkeypatch):
+    """Read promotes the entry to most-recently-used. Steady-state
+    pushes from a busy peer must NOT be evicted by a cold-start burst
+    from new peers — the LRU touch on read is what makes that hold.
+    """
+    import a2a_client
+
+    monkeypatch.setattr(a2a_client, "_PEER_METADATA_MAXSIZE", 3)
+
+    now = time.monotonic()
+    a = "00000000-0000-0000-0000-aaaaaaaaaaaa"
+    b = "00000000-0000-0000-0000-bbbbbbbbbbbb"
+    c = "00000000-0000-0000-0000-cccccccccccc"
+    d = "00000000-0000-0000-0000-dddddddddddd"
+
+    # Insert in order a, b, c. LRU order: a (oldest) → c (newest).
+    a2a_client._peer_metadata_set(a, (now, {"id": a}))
+    a2a_client._peer_metadata_set(b, (now, {"id": b}))
+    a2a_client._peer_metadata_set(c, (now, {"id": c}))
+
+    # Touch `a` via _peer_metadata_get → moves it to MRU. Eviction
+    # order is now: b (oldest) → c → a (newest).
+    a2a_client._peer_metadata_get(a)
+
+    # Insert `d` — pushes `b` out (not `a`, even though `a` was inserted first).
+    a2a_client._peer_metadata_set(d, (now, {"id": d}))
+
+    assert a in a2a_client._peer_metadata, (
+        "recently-touched entry must survive eviction; LRU touch on read is broken"
+    )
+    assert b not in a2a_client._peer_metadata, (
+        "oldest-untouched entry must be evicted first"
+    )
+    assert c in a2a_client._peer_metadata
+    assert d in a2a_client._peer_metadata
+
+
+def test_peer_metadata_set_replaces_existing_entry_in_place(_reset_peer_metadata_cache):
+    """Re-writing an existing key updates its value in place — it must
+    NOT count as a fresh insert that could evict another entry. The LRU
+    move-to-end on update keeps the entry as MRU.
+    """
+    import a2a_client
+
+    peer = "00000000-0000-0000-0000-aaaaaaaaaaaa"
+    now = time.monotonic()
+    a2a_client._peer_metadata_set(peer, (now, {"id": peer, "name": "v1"}))
+    assert len(a2a_client._peer_metadata) == 1
+
+    # Re-write — same key, new value.
+    a2a_client._peer_metadata_set(peer, (now + 100, {"id": peer, "name": "v2"}))
+
+    assert len(a2a_client._peer_metadata) == 1, (
+        "re-write must not duplicate the entry"
+    )
+    cached = a2a_client._peer_metadata[peer]
+    assert cached[1]["name"] == "v2", "re-write must update the value in place"
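+
+
+# A possible companion test (a sketch, not part of the change above):
+# _peer_metadata_get deliberately does no TTL filtering. It returns the
+# raw (fetched_at, record) tuple and leaves freshness checks to the
+# call site, per its docstring. Pinning that division of labor keeps
+# TTL logic from migrating into the helper. Assumes the same
+# _reset_peer_metadata_cache fixture used by the tests above.
+def test_peer_metadata_get_returns_stale_entry_unfiltered(_reset_peer_metadata_cache):
+    import a2a_client
+
+    peer = "00000000-0000-0000-0000-aaaaaaaaaaaa"
+    # A fetched-at stamp far outside the TTL window.
+    stale_fetched_at = time.monotonic() - 10 * a2a_client._PEER_METADATA_TTL_SECONDS
+    a2a_client._peer_metadata_set(peer, (stale_fetched_at, {"id": peer}))
+
+    cached = a2a_client._peer_metadata_get(peer)
+    assert cached is not None, "get returns the entry even when stale"
+    assert cached[0] == stale_fetched_at, "fetched-at must round-trip unchanged"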