test(workspace): add 17-case coverage for enrich_peer_metadata + nonblocking + worker (#502)
Some checks failed
E2E Staging Canvas (Playwright) / detect-changes (push) Successful in 24s
E2E API Smoke Test / detect-changes (push) Successful in 25s
Handlers Postgres Integration / detect-changes (push) Successful in 24s
CI / Python Lint & Test (push) Failing after 6m53s
Runtime PR-Built Compatibility / detect-changes (push) Successful in 22s
Staging SaaS smoke (every 30 min) / Staging SaaS smoke (push) Failing after 4m40s
CI / Platform (Go) (push) Successful in 6s
main-red-watchdog / watchdog (push) Successful in 25s
CI / Canvas (Next.js) (push) Successful in 6s
Sweep stale e2e-* orgs (staging) / Sweep e2e orgs (push) Successful in 8s
Continuous synthetic E2E (staging) / Synthetic E2E against staging (push) Failing after 5m30s
CI / Shellcheck (E2E scripts) (push) Successful in 3s
E2E API Smoke Test / E2E API Smoke Test (push) Successful in 5s
Handlers Postgres Integration / Handlers Postgres Integration (push) Successful in 6s
CI / Canvas Deploy Reminder (push) Has been skipped
E2E Staging Canvas (Playwright) / Canvas tabs E2E (push) Successful in 9s
Block internal-flavored paths / Block forbidden paths (push) Successful in 10s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (push) Successful in 2m10s
publish-runtime-autobump / autobump-and-tag (push) Failing after 46s
Secret scan / Scan diff for credential-shaped strings (push) Successful in 7s
CI / Detect changes (push) Successful in 23s

Co-authored-by: Molecule AI · core-be <core-be@agents.moleculesai.app>
Co-committed-by: Molecule AI · core-be <core-be@agents.moleculesai.app>
This commit is contained in:
Molecule AI · core-be 2026-05-11 15:56:25 +00:00 committed by Molecule AI · core-lead
parent 7b783aa2ed
commit 92f3a17a17
2 changed files with 428 additions and 6 deletions

View File

@ -187,12 +187,6 @@ def enrich_peer_metadata_nonblocking(
canon = _validate_peer_id(peer_id)
if canon is None:
return None
current = time.monotonic()
cached = _peer_metadata_get(canon)
if cached is not None:
fetched_at, record = cached
if current - fetched_at < _PEER_METADATA_TTL_SECONDS:
return record
# Schedule background fetch unless one is already in flight for this
# peer. The synchronous version atomically reads-then-writes; the
# async version splits that into "schedule fetch" + "fetch fills
@ -256,6 +250,12 @@ def _wait_for_enrichment_inflight_for_testing(timeout: float = 2.0) -> None:
time.sleep(0.01)
def _peer_in_flight_clear_for_testing() -> None:
"""Clear the in-flight enrichment set. Test-only helper."""
with _enrich_in_flight_lock:
_enrich_in_flight.clear()
def enrich_peer_metadata(
peer_id: str,
source_workspace_id: str | None = None,

View File

@ -1061,3 +1061,425 @@ class TestGetWorkspaceInfo:
url = mock_client.get.call_args.args[0]
assert "/workspaces/" in url
# ---------------------------------------------------------------------------
# enrich_peer_metadata — sync helper, separate from the async path.
# ---------------------------------------------------------------------------
def _make_sync_mock_client(*, get_resp=None, get_exc=None):
"""Build a synchronous httpx.Client context-manager mock for enrich_peer_metadata."""
mock_get = MagicMock()
if get_exc is not None:
mock_get.side_effect = get_exc
elif get_resp is not None:
mock_get.return_value = get_resp
mock_client = MagicMock()
mock_client.get = mock_get
mock_client.__enter__ = MagicMock(return_value=mock_client)
mock_client.__exit__ = MagicMock(return_value=False)
return mock_client
def _make_sync_response(status_code: int, data) -> MagicMock:
"""Build a sync httpx.Response mock."""
resp = MagicMock()
resp.status_code = status_code
resp.json = MagicMock(return_value=data)
return resp
class TestEnrichPeerMetadata:
"""Tests for a2a_client.enrich_peer_metadata.
Uses the same test-ID constant and cache-isolation pattern as the
async tests above.
"""
def _call(self, peer_id, *, source_workspace_id=None, now=None):
import a2a_client
return a2a_client.enrich_peer_metadata(
peer_id,
source_workspace_id=source_workspace_id,
now=now,
)
def test_cache_hit_within_ttl_returns_cached(self):
"""Fresh cache entry → no HTTP call, returns the cached record."""
import a2a_client
peer_data = {"id": _TEST_PEER_ID, "name": "Cached Peer", "url": "http://cached"}
now = 1000.0
# Seed cache with a fresh entry (TTL = 300s, so 1000+100 = 1100 < 1300).
a2a_client._peer_metadata_set(_TEST_PEER_ID, (now, peer_data))
try:
result = self._call(_TEST_PEER_ID, now=now + 100)
assert result == peer_data
finally:
# Clean up so other tests are not polluted.
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
def test_cache_expired_causes_refetch(self):
"""Stale cache entry (TTL exceeded) → HTTP GET issued, cache updated."""
import a2a_client
old_data = {"id": _TEST_PEER_ID, "name": "Old"}
fresh_data = {"id": _TEST_PEER_ID, "name": "Fresh", "url": "http://fresh"}
now = 1000.0
# Seed cache with an expired entry (> 300s ago).
a2a_client._peer_metadata_set(_TEST_PEER_ID, (now - 1000, old_data))
resp = _make_sync_response(200, fresh_data)
mock_client = _make_sync_mock_client(get_resp=resp)
with patch("a2a_client.httpx.Client", return_value=mock_client):
result = self._call(_TEST_PEER_ID, now=now)
assert result == fresh_data
# Cache should now hold the fresh data.
cached = a2a_client._peer_metadata_get(_TEST_PEER_ID)
assert cached is not None
assert cached[1] == fresh_data
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
def test_network_exception_returns_none_negative_cache_set(self):
"""Network failure → returns None, failure cached (negative cache)."""
import a2a_client
now = 1000.0
mock_client = _make_sync_mock_client(get_exc=ConnectionError("unreachable"))
with patch("a2a_client.httpx.Client", return_value=mock_client):
result = self._call(_TEST_PEER_ID, now=now)
assert result is None
# Negative cache: failure stored so we don't re-fetch on every call.
cached = a2a_client._peer_metadata_get(_TEST_PEER_ID)
assert cached is not None
assert cached[1] is None # None sentinel = negative cache
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
def test_non_200_returns_none_negative_cache_set(self):
"""HTTP 404/403/500 → returns None, failure cached."""
import a2a_client
now = 1000.0
resp = _make_sync_response(404, {"detail": "not found"})
mock_client = _make_sync_mock_client(get_resp=resp)
with patch("a2a_client.httpx.Client", return_value=mock_client):
result = self._call(_TEST_PEER_ID, now=now)
assert result is None
cached = a2a_client._peer_metadata_get(_TEST_PEER_ID)
assert cached is not None
assert cached[1] is None
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
def test_non_json_response_returns_none_negative_cache_set(self):
"""Server returns non-JSON body → returns None, failure cached."""
import a2a_client
now = 1000.0
resp = MagicMock()
resp.status_code = 200
resp.json.side_effect = ValueError("invalid json")
mock_client = _make_sync_mock_client(get_resp=resp)
with patch("a2a_client.httpx.Client", return_value=mock_client):
result = self._call(_TEST_PEER_ID, now=now)
assert result is None
cached = a2a_client._peer_metadata_get(_TEST_PEER_ID)
assert cached is not None
assert cached[1] is None
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
def test_non_dict_json_returns_none_negative_cache_set(self):
"""Server returns a JSON array or scalar → returns None, failure cached."""
import a2a_client
now = 1000.0
resp = _make_sync_response(200, ["peer-a", "peer-b"])
mock_client = _make_sync_mock_client(get_resp=resp)
with patch("a2a_client.httpx.Client", return_value=mock_client):
result = self._call(_TEST_PEER_ID, now=now)
assert result is None
cached = a2a_client._peer_metadata_get(_TEST_PEER_ID)
assert cached is not None
assert cached[1] is None
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
def test_invalid_peer_id_returns_none_without_http(self):
"""Path-traversal / malformed peer IDs are rejected at the trust boundary."""
import a2a_client
mock_client = _make_sync_mock_client(get_resp=_make_sync_response(200, {}))
with patch("a2a_client.httpx.Client", return_value=mock_client):
for bad in ("", "ws-abc", "../admin", "not-a-uuid", "8dad3e29"):
assert self._call(bad) is None
# No GET should have been issued for any invalid ID.
mock_client.get.assert_not_called()
def test_happy_path_returns_data_and_caches(self):
"""200 + dict JSON → returns data, cache updated, peer name stored."""
import a2a_client
now = 1000.0
peer_data = {
"id": _TEST_PEER_ID,
"name": "Happy Peer",
"role": "sre",
"url": "http://happy-peer:8080",
}
resp = _make_sync_response(200, peer_data)
mock_client = _make_sync_mock_client(get_resp=resp)
with patch("a2a_client.httpx.Client", return_value=mock_client):
result = self._call(_TEST_PEER_ID, now=now)
assert result == peer_data
# Cache updated.
cached = a2a_client._peer_metadata_get(_TEST_PEER_ID)
assert cached is not None
assert cached[1] == peer_data
# Peer name indexed.
assert a2a_client._peer_names.get(_TEST_PEER_ID) == "Happy Peer"
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
a2a_client._peer_names.clear()
def test_get_url_includes_peer_id_and_workspace_header(self):
"""GET is issued to /registry/discover/<peer_id> with X-Workspace-ID."""
import a2a_client
now = 1000.0
resp = _make_sync_response(200, {"id": _TEST_PEER_ID})
mock_client = _make_sync_mock_client(get_resp=resp)
with patch("a2a_client.httpx.Client", return_value=mock_client):
self._call(_TEST_PEER_ID, now=now)
mock_client.get.assert_called_once()
positional_url = mock_client.get.call_args.args[0]
assert _TEST_PEER_ID in positional_url
assert "/registry/discover/" in positional_url
headers_sent = mock_client.get.call_args.kwargs.get("headers", {})
assert "X-Workspace-ID" in headers_sent
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
def test_source_workspace_id_header_overrides_default(self):
"""Caller can pass source_workspace_id to set X-Workspace-ID header."""
import a2a_client
now = 1000.0
src_id = "22222222-2222-2222-2222-222222222222"
resp = _make_sync_response(200, {"id": _TEST_PEER_ID})
mock_client = _make_sync_mock_client(get_resp=resp)
with patch("a2a_client.httpx.Client", return_value=mock_client):
self._call(_TEST_PEER_ID, source_workspace_id=src_id, now=now)
headers_sent = mock_client.get.call_args.kwargs.get("headers", {})
assert headers_sent.get("X-Workspace-ID") == src_id
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
# ---------------------------------------------------------------------------
# enrich_peer_metadata_nonblocking — background-fetch wrapper
# ---------------------------------------------------------------------------
class TestEnrichPeerMetadataNonblocking:
"""Tests for the nonblocking variant that schedules work in a thread pool."""
def _call(self, peer_id, *, source_workspace_id=None, now=None):
import a2a_client
return a2a_client.enrich_peer_metadata_nonblocking(
peer_id,
source_workspace_id=source_workspace_id,
)
def test_always_returns_none(self):
"""Nonblocking variant always returns None — never blocks on a registry GET.
Callers render the bare peer_id immediately. A background worker
populates the cache asynchronously; subsequent pushes will see the
warm cache and the caller can optionally read it directly.
"""
import a2a_client
a2a_client._peer_metadata.clear()
a2a_client._peer_in_flight_clear_for_testing()
try:
result = self._call(_TEST_PEER_ID)
assert result is None
# The peer should be in the in-flight set (work was scheduled).
with a2a_client._enrich_in_flight_lock:
assert _TEST_PEER_ID in a2a_client._enrich_in_flight
finally:
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
a2a_client._peer_in_flight_clear_for_testing()
def test_in_flight_guard_prevents_duplicate_schedule(self):
"""Same peer pushed twice before first schedule completes → only one in-flight entry."""
import a2a_client
a2a_client._peer_metadata.clear()
a2a_client._peer_in_flight_clear_for_testing()
# Pre-populate in-flight manually to simulate already-scheduled.
with a2a_client._enrich_in_flight_lock:
a2a_client._enrich_in_flight.add(_TEST_PEER_ID)
try:
result = self._call(_TEST_PEER_ID)
# Returns None because a worker is already scheduled.
assert result is None
# Should NOT have added it again (set.add is idempotent).
with a2a_client._enrich_in_flight_lock:
assert _TEST_PEER_ID in a2a_client._enrich_in_flight
finally:
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
a2a_client._peer_in_flight_clear_for_testing()
def test_invalid_peer_id_returns_none_without_schedule(self):
"""Malformed peer IDs are rejected at the trust boundary."""
import a2a_client
a2a_client._peer_in_flight_clear_for_testing()
result = self._call("")
assert result is None
with a2a_client._enrich_in_flight_lock:
assert _TEST_PEER_ID not in a2a_client._enrich_in_flight
# ---------------------------------------------------------------------------
# _enrich_peer_metadata_worker — background thread body
# ---------------------------------------------------------------------------
class TestEnrichPeerMetadataWorker:
"""Tests for the background worker and the test-sync helper."""
def test_worker_runs_sync_function_and_clears_inflight(self):
"""Worker runs enrich_peer_metadata and clears in-flight when done."""
import a2a_client
a2a_client._peer_metadata.clear()
a2a_client._peer_in_flight_clear_for_testing()
peer_data = {"id": _TEST_PEER_ID, "name": "Worker Peer"}
resp = _make_sync_response(200, peer_data)
mock_client = _make_sync_mock_client(get_resp=resp)
# Pre-populate in-flight to simulate a running worker.
with a2a_client._enrich_in_flight_lock:
a2a_client._enrich_in_flight.add(_TEST_PEER_ID)
try:
with patch("a2a_client.httpx.Client", return_value=mock_client):
a2a_client._enrich_peer_metadata_worker(
_TEST_PEER_ID, source_workspace_id=None
)
# In-flight should be cleared after worker finishes.
with a2a_client._enrich_in_flight_lock:
assert _TEST_PEER_ID not in a2a_client._enrich_in_flight
# Cache should be populated.
cached = a2a_client._peer_metadata_get(_TEST_PEER_ID)
assert cached is not None
assert cached[1] == peer_data
finally:
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
def test_worker_exception_in_sync_function_is_swallowed(self):
"""Exception from the sync function is caught by the worker, in-flight cleared."""
import a2a_client
a2a_client._peer_metadata.clear()
a2a_client._peer_in_flight_clear_for_testing()
with a2a_client._enrich_in_flight_lock:
a2a_client._enrich_in_flight.add(_TEST_PEER_ID)
try:
# Patch enrich_peer_metadata to raise so the worker catches it.
with patch.object(
a2a_client, "enrich_peer_metadata", side_effect=RuntimeError("boom")
):
# Should NOT raise — worker swallows it.
a2a_client._enrich_peer_metadata_worker(
_TEST_PEER_ID, source_workspace_id=None
)
# In-flight should still be cleared even on error.
with a2a_client._enrich_in_flight_lock:
assert _TEST_PEER_ID not in a2a_client._enrich_in_flight
finally:
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
# ---------------------------------------------------------------------------
# _wait_for_enrichment_inflight_for_testing — test synchronisation helper
# ---------------------------------------------------------------------------
class TestWaitForEnrichmentInFlight:
"""Tests for the test-only synchronisation helper."""
def test_returns_immediately_when_nothing_inflight(self):
"""Empty in-flight set → returns instantly."""
import a2a_client
a2a_client._peer_in_flight_clear_for_testing()
# Should not raise.
a2a_client._wait_for_enrichment_inflight_for_testing(timeout=0.1)
# Should have returned quickly (not slept the full 0.1s).
# The implementation polls with 10ms sleeps, so if it ran for >50ms
# it would have done multiple polls — the empty-set early-return is
# the fast path.
def test_blocks_until_inflight_completes(self):
"""In-flight entry cleared while waiting → returns."""
import a2a_client
a2a_client._peer_in_flight_clear_for_testing()
a2a_client._peer_metadata.clear()
peer_data = {"id": _TEST_PEER_ID, "name": "Blocker Peer"}
resp = _make_sync_response(200, peer_data)
mock_client = _make_sync_mock_client(get_resp=resp)
with patch("a2a_client.httpx.Client", return_value=mock_client):
# Schedule the nonblocking call — it will be in-flight.
a2a_client.enrich_peer_metadata_nonblocking(_TEST_PEER_ID)
try:
# Wait should block until the worker finishes.
a2a_client._wait_for_enrichment_inflight_for_testing(timeout=5.0)
# Cache should now be warm.
cached = a2a_client._peer_metadata_get(_TEST_PEER_ID)
assert cached is not None
assert cached[1] == peer_data
finally:
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
a2a_client._peer_in_flight_clear_for_testing()