Merge pull request #2850 from Molecule-AI/feat/enrich-off-poller-2484-1777965000
perf(a2a): move enrichment GET off the inbox poller thread (#2484)
This commit is contained in:
commit
76e9656a7b
@ -9,8 +9,10 @@ import logging
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
import httpx
|
||||
|
||||
@ -66,6 +68,146 @@ _peer_metadata: dict[str, tuple[float, dict | None]] = {}
|
||||
_PEER_METADATA_TTL_SECONDS = 300.0
|
||||
|
||||
|
||||
# Background-fetch executor for enrich_peer_metadata_nonblocking (#2484).
|
||||
# A small pool — peers are highly TTL-cached, so the steady-state load
|
||||
# is "one fetch per peer per 5 minutes." Two workers handle the cold-
|
||||
# start burst when an agent starts receiving pushes from a new peer for
|
||||
# the first time without backing up the inbox poller. Daemon threads:
|
||||
# the executor must NOT block process exit if the inbox shuts down.
|
||||
_enrich_executor: ThreadPoolExecutor | None = None
|
||||
_enrich_executor_lock = threading.Lock()
|
||||
|
||||
# In-flight peer IDs — guards against a single peer's repeated pushes
|
||||
# scheduling N concurrent registry fetches before the first one fills
|
||||
# the cache. Set membership is "a worker is currently fetching this
|
||||
# peer; subsequent calls should NOT schedule another."
|
||||
_enrich_in_flight: set[str] = set()
|
||||
_enrich_in_flight_lock = threading.Lock()
|
||||
|
||||
|
||||
def _get_enrich_executor() -> ThreadPoolExecutor:
|
||||
"""Lazy-init the enrichment worker pool. Lazy because most test
|
||||
fixtures and short-lived CLI invocations don't need it; only the
|
||||
long-running molecule-mcp / inbox-poller path actually schedules
|
||||
background fetches.
|
||||
"""
|
||||
global _enrich_executor
|
||||
if _enrich_executor is not None:
|
||||
return _enrich_executor
|
||||
with _enrich_executor_lock:
|
||||
if _enrich_executor is None:
|
||||
_enrich_executor = ThreadPoolExecutor(
|
||||
max_workers=2,
|
||||
thread_name_prefix="enrich-peer",
|
||||
)
|
||||
return _enrich_executor
|
||||
|
||||
|
||||
def enrich_peer_metadata_nonblocking(
|
||||
peer_id: str,
|
||||
source_workspace_id: str | None = None,
|
||||
) -> dict | None:
|
||||
"""Cache-first variant of ``enrich_peer_metadata`` — returns
|
||||
immediately without blocking on a registry GET.
|
||||
|
||||
Behavior:
|
||||
- Cache hit (fresh): return the cached record.
|
||||
- Cache miss or TTL expired: schedule a background fetch via the
|
||||
worker pool, return ``None`` (caller renders bare peer_id).
|
||||
The next push for this peer hits the warm cache and gets the
|
||||
full record.
|
||||
|
||||
Why this exists (#2484): the inbox poller's notification callback
|
||||
in molecule-mcp called the synchronous ``enrich_peer_metadata`` on
|
||||
every push, blocking the poller for up to 2s × N uncached peers
|
||||
per batch. Push-delivery latency was gated on registry latency —
|
||||
the exact thing the negative-cache patch in PR #2471 was supposed
|
||||
to avoid amplifying. Moving the fetch off the poller thread means
|
||||
push delivery is bounded by the inbox poll interval, never by
|
||||
registry RTT.
|
||||
|
||||
Trade-off: the FIRST push from a new peer arrives metadata-light
|
||||
(no name/role). The MCP host renders the bare peer_id. Subsequent
|
||||
pushes (within the 5-min TTL) hit the warm cache and get the full
|
||||
record. Acceptable because:
|
||||
- Channel-envelope enrichment is a UX nicety, not a correctness
|
||||
invariant.
|
||||
- The cold-cache window per peer is bounded to one push.
|
||||
- The TTL is long enough that an active conversation never
|
||||
re-enters the cold state.
|
||||
"""
|
||||
canon = _validate_peer_id(peer_id)
|
||||
if canon is None:
|
||||
return None
|
||||
current = time.monotonic()
|
||||
cached = _peer_metadata.get(canon)
|
||||
if cached is not None:
|
||||
fetched_at, record = cached
|
||||
if current - fetched_at < _PEER_METADATA_TTL_SECONDS:
|
||||
return record
|
||||
# Schedule background fetch unless one is already in flight for this
|
||||
# peer. The synchronous version atomically reads-then-writes; the
|
||||
# async version splits that into "schedule fetch" + "fetch fills
|
||||
# cache later." The in-flight set keeps a flurry of pushes from
|
||||
# one peer (e.g., a chatty agent) from spawning N parallel GETs.
|
||||
with _enrich_in_flight_lock:
|
||||
if canon in _enrich_in_flight:
|
||||
return None
|
||||
_enrich_in_flight.add(canon)
|
||||
try:
|
||||
_get_enrich_executor().submit(
|
||||
_enrich_peer_metadata_worker, canon, source_workspace_id
|
||||
)
|
||||
except RuntimeError:
|
||||
# Executor was shut down (process exit path) — drop the request,
|
||||
# let the caller render bare peer_id.
|
||||
with _enrich_in_flight_lock:
|
||||
_enrich_in_flight.discard(canon)
|
||||
return None
|
||||
|
||||
|
||||
def _enrich_peer_metadata_worker(
|
||||
canon: str, source_workspace_id: str | None
|
||||
) -> None:
|
||||
"""Background-thread body for ``enrich_peer_metadata_nonblocking``.
|
||||
Runs the same fetch logic as the synchronous helper but discards
|
||||
the return value — the cache write is the only output anyone
|
||||
needs. Always clears the in-flight marker so a future cache miss
|
||||
can retry.
|
||||
"""
|
||||
try:
|
||||
enrich_peer_metadata(canon, source_workspace_id)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
# Background workers must not crash the executor — log and
|
||||
# move on. The negative-cache path inside enrich_peer_metadata
|
||||
# already records failures, so a re-attempt is rate-limited
|
||||
# by TTL.
|
||||
logger.debug("_enrich_peer_metadata_worker: %s failed: %s", canon, exc)
|
||||
finally:
|
||||
with _enrich_in_flight_lock:
|
||||
_enrich_in_flight.discard(canon)
|
||||
|
||||
|
||||
def _wait_for_enrichment_inflight_for_testing(timeout: float = 2.0) -> None:
|
||||
"""Block until all in-flight enrichment workers have completed.
|
||||
|
||||
Test-only helper. Production code never has a reason to wait — the
|
||||
point of the nonblocking path is that callers don't care when the
|
||||
cache fills. Tests that want to assert "after the worker runs, the
|
||||
cache has the record" use this to synchronise without sleeping.
|
||||
|
||||
Polls ``_enrich_in_flight`` rather than holding a Condition because
|
||||
the worker pool is already serializing through ``_enrich_in_flight_lock``;
|
||||
poll keeps the production hot path lock-free.
|
||||
"""
|
||||
deadline = time.monotonic() + timeout
|
||||
while time.monotonic() < deadline:
|
||||
with _enrich_in_flight_lock:
|
||||
if not _enrich_in_flight:
|
||||
return
|
||||
time.sleep(0.01)
|
||||
|
||||
|
||||
def enrich_peer_metadata(
|
||||
peer_id: str,
|
||||
source_workspace_id: str | None = None,
|
||||
|
||||
@ -55,6 +55,7 @@ from a2a_client import ( # noqa: F401, E402
|
||||
_validate_peer_id,
|
||||
discover_peer,
|
||||
enrich_peer_metadata,
|
||||
enrich_peer_metadata_nonblocking,
|
||||
get_peers,
|
||||
get_workspace_info,
|
||||
send_a2a_message,
|
||||
@ -498,7 +499,15 @@ def _build_channel_notification(msg: dict) -> dict:
|
||||
meta["peer_id"] = ""
|
||||
else:
|
||||
meta["peer_id"] = safe_peer_id
|
||||
record = enrich_peer_metadata(safe_peer_id)
|
||||
# Cache-first non-blocking enrichment (#2484): on cache miss
|
||||
# this returns None immediately and schedules a background
|
||||
# fetch. The first push for a new peer renders bare
|
||||
# peer_id; the next push (within the 5-min TTL) hits the
|
||||
# warm cache and gets full name/role. Push-delivery latency
|
||||
# is bounded by the inbox poll interval, never by registry
|
||||
# RTT — closes the gap that PR #2471's negative-cache path
|
||||
# was meant to avoid amplifying.
|
||||
record = enrich_peer_metadata_nonblocking(safe_peer_id)
|
||||
if record is not None:
|
||||
# Sanitise BEFORE storing in meta so both the JSON-RPC
|
||||
# envelope and the rendered content (via
|
||||
|
||||
@ -507,11 +507,22 @@ def _reset_peer_metadata_cache(monkeypatch):
|
||||
"""Each test starts with a clean ``_peer_metadata`` cache so an
|
||||
earlier test's hit doesn't satisfy a later test's miss. Mutates the
|
||||
module-level dict in place rather than reassigning so other modules
|
||||
that imported the dict by reference still see the same instance."""
|
||||
that imported the dict by reference still see the same instance.
|
||||
|
||||
Also drains and clears ``_enrich_in_flight`` (#2484): a previous
|
||||
test's background fetch worker can leave a peer marked in-flight,
|
||||
and the next test's nonblocking call would short-circuit without
|
||||
scheduling a fetch. Drain BEFORE clearing in case a worker is
|
||||
mid-execution and writes to ``_peer_metadata`` after the clear.
|
||||
"""
|
||||
import a2a_client
|
||||
a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
|
||||
a2a_client._peer_metadata.clear()
|
||||
a2a_client._enrich_in_flight.clear()
|
||||
yield
|
||||
a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
|
||||
a2a_client._peer_metadata.clear()
|
||||
a2a_client._enrich_in_flight.clear()
|
||||
|
||||
|
||||
def _make_httpx_response(status_code: int, json_body: object) -> MagicMock:
|
||||
@ -586,9 +597,16 @@ def test_envelope_enrichment_uses_cache_when_present(_reset_peer_metadata_cache)
|
||||
|
||||
|
||||
def test_envelope_enrichment_fetches_on_cache_miss(_reset_peer_metadata_cache):
|
||||
"""Cache miss + registry hit: GET fires, response cached, meta
|
||||
carries fetched fields. Subsequent build for the same peer must
|
||||
NOT re-fetch (cache populated by first call)."""
|
||||
"""Cache miss: nonblocking enrichment returns None on the first
|
||||
push (first push arrives metadata-light), schedules a background
|
||||
fetch that populates the cache, second push hits the warm cache.
|
||||
|
||||
Pre-2026-05-05 (#2484) the first push was synchronous: the inbox
|
||||
poller blocked up to 2s on the registry GET before delivering. The
|
||||
nonblocking path means push delivery is bounded by the inbox poll
|
||||
interval, never by registry RTT — at the cost of one push per peer
|
||||
per TTL window arriving without name/role.
|
||||
"""
|
||||
import a2a_client
|
||||
from a2a_mcp_server import _build_channel_notification
|
||||
|
||||
@ -602,22 +620,39 @@ def test_envelope_enrichment_fetches_on_cache_miss(_reset_peer_metadata_cache):
|
||||
payload1 = _build_channel_notification({
|
||||
"peer_id": _PEER_UUID, "kind": "peer_agent", "text": "first",
|
||||
})
|
||||
# First push: bare peer_id, fetch is in-flight in the background.
|
||||
# peer_name / peer_role NOT yet present.
|
||||
assert "peer_name" not in payload1["params"]["meta"]
|
||||
assert "peer_role" not in payload1["params"]["meta"]
|
||||
|
||||
# Wait for the background worker to finish populating the cache.
|
||||
a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
|
||||
|
||||
payload2 = _build_channel_notification({
|
||||
"peer_id": _PEER_UUID, "kind": "peer_agent", "text": "second",
|
||||
})
|
||||
|
||||
# Worker fired exactly one GET (cache miss → fetch); the second push
|
||||
# hit the warm cache and DID NOT fire another GET.
|
||||
assert client.get.call_count == 1, (
|
||||
f"second push for same peer must use cache, got {client.get.call_count} GETs"
|
||||
)
|
||||
assert payload1["params"]["meta"]["peer_name"] == "fetched-name"
|
||||
# Second push has the enriched fields the worker stored.
|
||||
assert payload2["params"]["meta"]["peer_name"] == "fetched-name"
|
||||
assert payload2["params"]["meta"]["peer_role"] == "router"
|
||||
|
||||
|
||||
def test_envelope_enrichment_degrades_on_registry_failure(_reset_peer_metadata_cache):
|
||||
"""Registry returns 500 (or 4xx, or network error): enrichment
|
||||
silently degrades to bare peer_id. The push must not crash, the
|
||||
push must not block, and the agent_card_url must still surface
|
||||
because it's constructable from peer_id alone."""
|
||||
because it's constructable from peer_id alone.
|
||||
|
||||
Post-#2484 the first push always degrades to bare peer_id (the
|
||||
background fetch hasn't run yet); this test captures that
|
||||
"degrades on cache miss + failure path doesn't break" stays true.
|
||||
"""
|
||||
import a2a_client
|
||||
from a2a_mcp_server import _build_channel_notification
|
||||
|
||||
p, _ = _patch_httpx_client(_make_httpx_response(500, {}))
|
||||
@ -630,6 +665,9 @@ def test_envelope_enrichment_degrades_on_registry_failure(_reset_peer_metadata_c
|
||||
"method": "message/send",
|
||||
"created_at": "2026-05-01T00:00:00Z",
|
||||
})
|
||||
# Drain the background fetch so a follow-up test starting with
|
||||
# this peer in-flight doesn't see ghost state.
|
||||
a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
|
||||
|
||||
meta = payload["params"]["meta"]
|
||||
assert meta["peer_id"] == _PEER_UUID
|
||||
@ -649,7 +687,13 @@ def test_envelope_enrichment_negative_caches_registry_failure(_reset_peer_metada
|
||||
exact scenarios it most needs to defend against, and the poller
|
||||
thread stalls 2s per push for that peer until the registry comes
|
||||
back. Pin: two pushes from a 5xx-returning peer fire exactly one
|
||||
GET, not two."""
|
||||
GET, not two.
|
||||
|
||||
Post-#2484 the GETs run in a background worker, so the test waits
|
||||
for in-flight to drain between pushes — the negative-cache write
|
||||
must land in `_peer_metadata` before the second push consults it.
|
||||
"""
|
||||
import a2a_client
|
||||
from a2a_mcp_server import _build_channel_notification
|
||||
|
||||
p, client = _patch_httpx_client(_make_httpx_response(500, {}))
|
||||
@ -657,9 +701,13 @@ def test_envelope_enrichment_negative_caches_registry_failure(_reset_peer_metada
|
||||
payload1 = _build_channel_notification({
|
||||
"peer_id": _PEER_UUID, "kind": "peer_agent", "text": "first",
|
||||
})
|
||||
# Wait for the worker to write the negative-cache entry before
|
||||
# the second push reads it.
|
||||
a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
|
||||
payload2 = _build_channel_notification({
|
||||
"peer_id": _PEER_UUID, "kind": "peer_agent", "text": "second",
|
||||
})
|
||||
a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
|
||||
|
||||
assert client.get.call_count == 1, (
|
||||
f"second push from a 5xx-returning peer must use the negative "
|
||||
@ -692,7 +740,9 @@ def test_envelope_enrichment_negative_caches_network_exception(_reset_peer_metad
|
||||
client.get = MagicMock(side_effect=_httpx.ConnectError("dns down"))
|
||||
with patch("httpx.Client", return_value=client):
|
||||
_build_channel_notification({"peer_id": _PEER_UUID, "kind": "peer_agent"})
|
||||
a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
|
||||
_build_channel_notification({"peer_id": _PEER_UUID, "kind": "peer_agent"})
|
||||
a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
|
||||
|
||||
assert client.get.call_count == 1, (
|
||||
f"network exceptions must be negative-cached, got "
|
||||
@ -728,7 +778,9 @@ def test_envelope_enrichment_negative_caches_non_json_200(_reset_peer_metadata_c
|
||||
p, client = _patch_httpx_client(resp)
|
||||
with p:
|
||||
_build_channel_notification({"peer_id": _PEER_UUID, "kind": "peer_agent", "text": "first"})
|
||||
a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
|
||||
_build_channel_notification({"peer_id": _PEER_UUID, "kind": "peer_agent", "text": "second"})
|
||||
a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
|
||||
|
||||
assert client.get.call_count == 1, (
|
||||
f"non-JSON 200 must be negative-cached, got {client.get.call_count} GETs"
|
||||
@ -756,7 +808,9 @@ def test_envelope_enrichment_negative_caches_non_dict_json_200(_reset_peer_metad
|
||||
)
|
||||
with p:
|
||||
_build_channel_notification({"peer_id": _PEER_UUID, "kind": "peer_agent", "text": "first"})
|
||||
a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
|
||||
_build_channel_notification({"peer_id": _PEER_UUID, "kind": "peer_agent", "text": "second"})
|
||||
a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
|
||||
|
||||
assert client.get.call_count == 1, (
|
||||
f"non-dict JSON 200 must be negative-cached, got {client.get.call_count} GETs"
|
||||
@ -792,13 +846,25 @@ def test_envelope_enrichment_re_fetches_after_ttl(_reset_peer_metadata_cache):
|
||||
)
|
||||
)
|
||||
with p:
|
||||
payload = _build_channel_notification({
|
||||
# First push: stale cache → background fetch scheduled; the
|
||||
# nonblocking path returns None when the entry is past TTL,
|
||||
# so this first push degrades to bare peer_id (no peer_name).
|
||||
# Wait for the background worker to fill the cache, then issue
|
||||
# a second push to confirm it picked up the fresh values.
|
||||
payload1 = _build_channel_notification({
|
||||
"peer_id": _PEER_UUID, "kind": "peer_agent", "text": "ping",
|
||||
})
|
||||
a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
|
||||
payload2 = _build_channel_notification({
|
||||
"peer_id": _PEER_UUID, "kind": "peer_agent", "text": "pong",
|
||||
})
|
||||
|
||||
assert client.get.call_count == 1, "stale cache must trigger a re-fetch"
|
||||
assert payload["params"]["meta"]["peer_name"] == "fresh-name"
|
||||
assert payload["params"]["meta"]["peer_role"] == "new"
|
||||
assert "peer_name" not in payload1["params"]["meta"], (
|
||||
"first push past TTL degrades to bare peer_id under nonblocking enrichment"
|
||||
)
|
||||
assert payload2["params"]["meta"]["peer_name"] == "fresh-name"
|
||||
assert payload2["params"]["meta"]["peer_role"] == "new"
|
||||
|
||||
|
||||
def test_envelope_enrichment_invalid_peer_id_skips_lookup(_reset_peer_metadata_cache):
|
||||
@ -1732,3 +1798,102 @@ def _readable(fd: int) -> bool:
|
||||
|
||||
rlist, _, _ = select.select([fd], [], [], 0)
|
||||
return bool(rlist)
|
||||
|
||||
|
||||
# ---- #2484 nonblocking-enrichment dedicated tests ----
|
||||
|
||||
|
||||
def test_enrich_peer_metadata_nonblocking_cache_hit_returns_immediately(
|
||||
_reset_peer_metadata_cache,
|
||||
):
|
||||
"""Cache hit (fresh entry within TTL): nonblocking helper returns
|
||||
the cached record without scheduling a worker. Pin the fast path —
|
||||
the whole point of the helper is that the steady-state pushes for
|
||||
a known peer don't touch the executor."""
|
||||
import a2a_client
|
||||
import time as _time
|
||||
|
||||
a2a_client._peer_metadata[_PEER_UUID] = (
|
||||
_time.monotonic(),
|
||||
{"id": _PEER_UUID, "name": "ops", "role": "sre"},
|
||||
)
|
||||
|
||||
p, client = _patch_httpx_client(_make_httpx_response(200, {}))
|
||||
with p:
|
||||
record = a2a_client.enrich_peer_metadata_nonblocking(_PEER_UUID)
|
||||
|
||||
assert record is not None
|
||||
assert record["name"] == "ops"
|
||||
assert client.get.call_count == 0, "cache hit must not schedule a worker"
|
||||
# No in-flight marker should have been added since we returned synchronously.
|
||||
assert _PEER_UUID not in a2a_client._enrich_in_flight
|
||||
|
||||
|
||||
def test_enrich_peer_metadata_nonblocking_cache_miss_schedules_fetch(
|
||||
_reset_peer_metadata_cache,
|
||||
):
|
||||
"""Cache miss: helper returns None immediately, schedules a
|
||||
background fetch, the worker fills the cache. After draining the
|
||||
in-flight marker, a follow-up call hits the warm cache."""
|
||||
import a2a_client
|
||||
|
||||
p, client = _patch_httpx_client(
|
||||
_make_httpx_response(
|
||||
200,
|
||||
{"id": _PEER_UUID, "name": "fresh", "role": "router"},
|
||||
)
|
||||
)
|
||||
with p:
|
||||
first = a2a_client.enrich_peer_metadata_nonblocking(_PEER_UUID)
|
||||
assert first is None, "first call on cache miss must return None (bare peer_id)"
|
||||
a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
|
||||
second = a2a_client.enrich_peer_metadata_nonblocking(_PEER_UUID)
|
||||
|
||||
assert client.get.call_count == 1
|
||||
assert second is not None
|
||||
assert second["name"] == "fresh"
|
||||
|
||||
|
||||
def test_enrich_peer_metadata_nonblocking_coalesces_duplicate_pushes(
|
||||
_reset_peer_metadata_cache,
|
||||
):
|
||||
"""A burst of pushes for the same uncached peer must schedule
|
||||
exactly ONE background fetch. Without the in-flight gate, a chatty
|
||||
peer's first 10 pushes would queue 10 GETs against the registry —
|
||||
exactly the DoS-on-self pattern the negative cache was meant to
|
||||
rate-limit, except now we're amplifying with concurrency.
|
||||
"""
|
||||
import a2a_client
|
||||
|
||||
p, client = _patch_httpx_client(
|
||||
_make_httpx_response(
|
||||
200,
|
||||
{"id": _PEER_UUID, "name": "x", "role": "y"},
|
||||
)
|
||||
)
|
||||
with p:
|
||||
# Fire 5 nonblocking calls back-to-back BEFORE the worker has
|
||||
# a chance to drain. All 5 hit the in-flight gate; only the
|
||||
# first schedules a worker.
|
||||
for _ in range(5):
|
||||
assert a2a_client.enrich_peer_metadata_nonblocking(_PEER_UUID) is None
|
||||
a2a_client._wait_for_enrichment_inflight_for_testing(timeout=2.0)
|
||||
|
||||
assert client.get.call_count == 1, (
|
||||
f"in-flight gate must coalesce concurrent pushes; got {client.get.call_count} GETs"
|
||||
)
|
||||
|
||||
|
||||
def test_enrich_peer_metadata_nonblocking_invalid_peer_id_returns_none(
|
||||
_reset_peer_metadata_cache,
|
||||
):
|
||||
"""Defensive: malformed peer_id (not a UUID) must short-circuit
|
||||
without touching the cache OR the executor."""
|
||||
import a2a_client
|
||||
|
||||
p, client = _patch_httpx_client(_make_httpx_response(200, {}))
|
||||
with p:
|
||||
assert a2a_client.enrich_peer_metadata_nonblocking("not-a-uuid") is None
|
||||
|
||||
assert client.get.call_count == 0
|
||||
assert "not-a-uuid" not in a2a_client._enrich_in_flight
|
||||
|
||||
Loading…
Reference in New Issue
Block a user