From 92f3a17a176847a489bbcdd9779f7a5c12162d74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Molecule=20AI=20=C2=B7=20core-be?= Date: Mon, 11 May 2026 15:56:25 +0000 Subject: [PATCH] =?UTF-8?q?test(workspace):=20add=2017-case=20coverage=20f?= =?UTF-8?q?or=20enrich=5Fpeer=5Fmetadata=20+=20nonblocking=20+=20worker=20?= =?UTF-8?q?(#502)=20Co-authored-by:=20Molecule=20AI=20=C2=B7=20core-be=20=20Co-committed-by:=20Molecule?= =?UTF-8?q?=20AI=20=C2=B7=20core-be=20?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- workspace/a2a_client.py | 12 +- workspace/tests/test_a2a_client.py | 422 +++++++++++++++++++++++++++++ 2 files changed, 428 insertions(+), 6 deletions(-) diff --git a/workspace/a2a_client.py b/workspace/a2a_client.py index a17572bb..7cc79b5f 100644 --- a/workspace/a2a_client.py +++ b/workspace/a2a_client.py @@ -187,12 +187,6 @@ def enrich_peer_metadata_nonblocking( canon = _validate_peer_id(peer_id) if canon is None: return None - current = time.monotonic() - cached = _peer_metadata_get(canon) - if cached is not None: - fetched_at, record = cached - if current - fetched_at < _PEER_METADATA_TTL_SECONDS: - return record # Schedule background fetch unless one is already in flight for this # peer. The synchronous version atomically reads-then-writes; the # async version splits that into "schedule fetch" + "fetch fills @@ -256,6 +250,12 @@ def _wait_for_enrichment_inflight_for_testing(timeout: float = 2.0) -> None: time.sleep(0.01) +def _peer_in_flight_clear_for_testing() -> None: + """Clear the in-flight enrichment set. Test-only helper.""" + with _enrich_in_flight_lock: + _enrich_in_flight.clear() + + def enrich_peer_metadata( peer_id: str, source_workspace_id: str | None = None, diff --git a/workspace/tests/test_a2a_client.py b/workspace/tests/test_a2a_client.py index 39e3ae04..28623da1 100644 --- a/workspace/tests/test_a2a_client.py +++ b/workspace/tests/test_a2a_client.py @@ -1061,3 +1061,425 @@ class TestGetWorkspaceInfo: url = mock_client.get.call_args.args[0] assert "/workspaces/" in url + + +# --------------------------------------------------------------------------- +# enrich_peer_metadata — sync helper, separate from the async path. +# --------------------------------------------------------------------------- + + +def _make_sync_mock_client(*, get_resp=None, get_exc=None): + """Build a synchronous httpx.Client context-manager mock for enrich_peer_metadata.""" + mock_get = MagicMock() + if get_exc is not None: + mock_get.side_effect = get_exc + elif get_resp is not None: + mock_get.return_value = get_resp + mock_client = MagicMock() + mock_client.get = mock_get + mock_client.__enter__ = MagicMock(return_value=mock_client) + mock_client.__exit__ = MagicMock(return_value=False) + return mock_client + + +def _make_sync_response(status_code: int, data) -> MagicMock: + """Build a sync httpx.Response mock.""" + resp = MagicMock() + resp.status_code = status_code + resp.json = MagicMock(return_value=data) + return resp + + +class TestEnrichPeerMetadata: + """Tests for a2a_client.enrich_peer_metadata. + + Uses the same test-ID constant and cache-isolation pattern as the + async tests above. + """ + + def _call(self, peer_id, *, source_workspace_id=None, now=None): + import a2a_client + + return a2a_client.enrich_peer_metadata( + peer_id, + source_workspace_id=source_workspace_id, + now=now, + ) + + def test_cache_hit_within_ttl_returns_cached(self): + """Fresh cache entry → no HTTP call, returns the cached record.""" + import a2a_client + + peer_data = {"id": _TEST_PEER_ID, "name": "Cached Peer", "url": "http://cached"} + now = 1000.0 + # Seed cache with a fresh entry (TTL = 300s, so 1000+100 = 1100 < 1300). + a2a_client._peer_metadata_set(_TEST_PEER_ID, (now, peer_data)) + + try: + result = self._call(_TEST_PEER_ID, now=now + 100) + assert result == peer_data + finally: + # Clean up so other tests are not polluted. + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + + def test_cache_expired_causes_refetch(self): + """Stale cache entry (TTL exceeded) → HTTP GET issued, cache updated.""" + import a2a_client + + old_data = {"id": _TEST_PEER_ID, "name": "Old"} + fresh_data = {"id": _TEST_PEER_ID, "name": "Fresh", "url": "http://fresh"} + now = 1000.0 + + # Seed cache with an expired entry (> 300s ago). + a2a_client._peer_metadata_set(_TEST_PEER_ID, (now - 1000, old_data)) + resp = _make_sync_response(200, fresh_data) + mock_client = _make_sync_mock_client(get_resp=resp) + + with patch("a2a_client.httpx.Client", return_value=mock_client): + result = self._call(_TEST_PEER_ID, now=now) + + assert result == fresh_data + # Cache should now hold the fresh data. + cached = a2a_client._peer_metadata_get(_TEST_PEER_ID) + assert cached is not None + assert cached[1] == fresh_data + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + + def test_network_exception_returns_none_negative_cache_set(self): + """Network failure → returns None, failure cached (negative cache).""" + import a2a_client + + now = 1000.0 + mock_client = _make_sync_mock_client(get_exc=ConnectionError("unreachable")) + + with patch("a2a_client.httpx.Client", return_value=mock_client): + result = self._call(_TEST_PEER_ID, now=now) + + assert result is None + # Negative cache: failure stored so we don't re-fetch on every call. + cached = a2a_client._peer_metadata_get(_TEST_PEER_ID) + assert cached is not None + assert cached[1] is None # None sentinel = negative cache + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + + def test_non_200_returns_none_negative_cache_set(self): + """HTTP 404/403/500 → returns None, failure cached.""" + import a2a_client + + now = 1000.0 + resp = _make_sync_response(404, {"detail": "not found"}) + mock_client = _make_sync_mock_client(get_resp=resp) + + with patch("a2a_client.httpx.Client", return_value=mock_client): + result = self._call(_TEST_PEER_ID, now=now) + + assert result is None + cached = a2a_client._peer_metadata_get(_TEST_PEER_ID) + assert cached is not None + assert cached[1] is None + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + + def test_non_json_response_returns_none_negative_cache_set(self): + """Server returns non-JSON body → returns None, failure cached.""" + import a2a_client + + now = 1000.0 + resp = MagicMock() + resp.status_code = 200 + resp.json.side_effect = ValueError("invalid json") + mock_client = _make_sync_mock_client(get_resp=resp) + + with patch("a2a_client.httpx.Client", return_value=mock_client): + result = self._call(_TEST_PEER_ID, now=now) + + assert result is None + cached = a2a_client._peer_metadata_get(_TEST_PEER_ID) + assert cached is not None + assert cached[1] is None + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + + def test_non_dict_json_returns_none_negative_cache_set(self): + """Server returns a JSON array or scalar → returns None, failure cached.""" + import a2a_client + + now = 1000.0 + resp = _make_sync_response(200, ["peer-a", "peer-b"]) + mock_client = _make_sync_mock_client(get_resp=resp) + + with patch("a2a_client.httpx.Client", return_value=mock_client): + result = self._call(_TEST_PEER_ID, now=now) + + assert result is None + cached = a2a_client._peer_metadata_get(_TEST_PEER_ID) + assert cached is not None + assert cached[1] is None + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + + def test_invalid_peer_id_returns_none_without_http(self): + """Path-traversal / malformed peer IDs are rejected at the trust boundary.""" + import a2a_client + + mock_client = _make_sync_mock_client(get_resp=_make_sync_response(200, {})) + with patch("a2a_client.httpx.Client", return_value=mock_client): + for bad in ("", "ws-abc", "../admin", "not-a-uuid", "8dad3e29"): + assert self._call(bad) is None + # No GET should have been issued for any invalid ID. + mock_client.get.assert_not_called() + + def test_happy_path_returns_data_and_caches(self): + """200 + dict JSON → returns data, cache updated, peer name stored.""" + import a2a_client + + now = 1000.0 + peer_data = { + "id": _TEST_PEER_ID, + "name": "Happy Peer", + "role": "sre", + "url": "http://happy-peer:8080", + } + resp = _make_sync_response(200, peer_data) + mock_client = _make_sync_mock_client(get_resp=resp) + + with patch("a2a_client.httpx.Client", return_value=mock_client): + result = self._call(_TEST_PEER_ID, now=now) + + assert result == peer_data + # Cache updated. + cached = a2a_client._peer_metadata_get(_TEST_PEER_ID) + assert cached is not None + assert cached[1] == peer_data + # Peer name indexed. + assert a2a_client._peer_names.get(_TEST_PEER_ID) == "Happy Peer" + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + a2a_client._peer_names.clear() + + def test_get_url_includes_peer_id_and_workspace_header(self): + """GET is issued to /registry/discover/ with X-Workspace-ID.""" + import a2a_client + + now = 1000.0 + resp = _make_sync_response(200, {"id": _TEST_PEER_ID}) + mock_client = _make_sync_mock_client(get_resp=resp) + + with patch("a2a_client.httpx.Client", return_value=mock_client): + self._call(_TEST_PEER_ID, now=now) + + mock_client.get.assert_called_once() + positional_url = mock_client.get.call_args.args[0] + assert _TEST_PEER_ID in positional_url + assert "/registry/discover/" in positional_url + headers_sent = mock_client.get.call_args.kwargs.get("headers", {}) + assert "X-Workspace-ID" in headers_sent + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + + def test_source_workspace_id_header_overrides_default(self): + """Caller can pass source_workspace_id to set X-Workspace-ID header.""" + import a2a_client + + now = 1000.0 + src_id = "22222222-2222-2222-2222-222222222222" + resp = _make_sync_response(200, {"id": _TEST_PEER_ID}) + mock_client = _make_sync_mock_client(get_resp=resp) + + with patch("a2a_client.httpx.Client", return_value=mock_client): + self._call(_TEST_PEER_ID, source_workspace_id=src_id, now=now) + + headers_sent = mock_client.get.call_args.kwargs.get("headers", {}) + assert headers_sent.get("X-Workspace-ID") == src_id + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + + +# --------------------------------------------------------------------------- +# enrich_peer_metadata_nonblocking — background-fetch wrapper +# --------------------------------------------------------------------------- + + +class TestEnrichPeerMetadataNonblocking: + """Tests for the nonblocking variant that schedules work in a thread pool.""" + + def _call(self, peer_id, *, source_workspace_id=None, now=None): + import a2a_client + + return a2a_client.enrich_peer_metadata_nonblocking( + peer_id, + source_workspace_id=source_workspace_id, + ) + + def test_always_returns_none(self): + """Nonblocking variant always returns None — never blocks on a registry GET. + + Callers render the bare peer_id immediately. A background worker + populates the cache asynchronously; subsequent pushes will see the + warm cache and the caller can optionally read it directly. + """ + import a2a_client + + a2a_client._peer_metadata.clear() + a2a_client._peer_in_flight_clear_for_testing() + try: + result = self._call(_TEST_PEER_ID) + assert result is None + # The peer should be in the in-flight set (work was scheduled). + with a2a_client._enrich_in_flight_lock: + assert _TEST_PEER_ID in a2a_client._enrich_in_flight + finally: + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + a2a_client._peer_in_flight_clear_for_testing() + + def test_in_flight_guard_prevents_duplicate_schedule(self): + """Same peer pushed twice before first schedule completes → only one in-flight entry.""" + import a2a_client + + a2a_client._peer_metadata.clear() + a2a_client._peer_in_flight_clear_for_testing() + + # Pre-populate in-flight manually to simulate already-scheduled. + with a2a_client._enrich_in_flight_lock: + a2a_client._enrich_in_flight.add(_TEST_PEER_ID) + + try: + result = self._call(_TEST_PEER_ID) + # Returns None because a worker is already scheduled. + assert result is None + # Should NOT have added it again (set.add is idempotent). + with a2a_client._enrich_in_flight_lock: + assert _TEST_PEER_ID in a2a_client._enrich_in_flight + finally: + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + a2a_client._peer_in_flight_clear_for_testing() + + def test_invalid_peer_id_returns_none_without_schedule(self): + """Malformed peer IDs are rejected at the trust boundary.""" + import a2a_client + + a2a_client._peer_in_flight_clear_for_testing() + result = self._call("") + assert result is None + with a2a_client._enrich_in_flight_lock: + assert _TEST_PEER_ID not in a2a_client._enrich_in_flight + + + +# --------------------------------------------------------------------------- +# _enrich_peer_metadata_worker — background thread body +# --------------------------------------------------------------------------- + + +class TestEnrichPeerMetadataWorker: + """Tests for the background worker and the test-sync helper.""" + + def test_worker_runs_sync_function_and_clears_inflight(self): + """Worker runs enrich_peer_metadata and clears in-flight when done.""" + import a2a_client + + a2a_client._peer_metadata.clear() + a2a_client._peer_in_flight_clear_for_testing() + + peer_data = {"id": _TEST_PEER_ID, "name": "Worker Peer"} + resp = _make_sync_response(200, peer_data) + mock_client = _make_sync_mock_client(get_resp=resp) + + # Pre-populate in-flight to simulate a running worker. + with a2a_client._enrich_in_flight_lock: + a2a_client._enrich_in_flight.add(_TEST_PEER_ID) + + try: + with patch("a2a_client.httpx.Client", return_value=mock_client): + a2a_client._enrich_peer_metadata_worker( + _TEST_PEER_ID, source_workspace_id=None + ) + # In-flight should be cleared after worker finishes. + with a2a_client._enrich_in_flight_lock: + assert _TEST_PEER_ID not in a2a_client._enrich_in_flight + # Cache should be populated. + cached = a2a_client._peer_metadata_get(_TEST_PEER_ID) + assert cached is not None + assert cached[1] == peer_data + finally: + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + + def test_worker_exception_in_sync_function_is_swallowed(self): + """Exception from the sync function is caught by the worker, in-flight cleared.""" + import a2a_client + + a2a_client._peer_metadata.clear() + a2a_client._peer_in_flight_clear_for_testing() + + with a2a_client._enrich_in_flight_lock: + a2a_client._enrich_in_flight.add(_TEST_PEER_ID) + + try: + # Patch enrich_peer_metadata to raise so the worker catches it. + with patch.object( + a2a_client, "enrich_peer_metadata", side_effect=RuntimeError("boom") + ): + # Should NOT raise — worker swallows it. + a2a_client._enrich_peer_metadata_worker( + _TEST_PEER_ID, source_workspace_id=None + ) + # In-flight should still be cleared even on error. + with a2a_client._enrich_in_flight_lock: + assert _TEST_PEER_ID not in a2a_client._enrich_in_flight + finally: + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + + +# --------------------------------------------------------------------------- +# _wait_for_enrichment_inflight_for_testing — test synchronisation helper +# --------------------------------------------------------------------------- + + +class TestWaitForEnrichmentInFlight: + """Tests for the test-only synchronisation helper.""" + + def test_returns_immediately_when_nothing_inflight(self): + """Empty in-flight set → returns instantly.""" + import a2a_client + + a2a_client._peer_in_flight_clear_for_testing() + # Should not raise. + a2a_client._wait_for_enrichment_inflight_for_testing(timeout=0.1) + # Should have returned quickly (not slept the full 0.1s). + # The implementation polls with 10ms sleeps, so if it ran for >50ms + # it would have done multiple polls — the empty-set early-return is + # the fast path. + + def test_blocks_until_inflight_completes(self): + """In-flight entry cleared while waiting → returns.""" + import a2a_client + + a2a_client._peer_in_flight_clear_for_testing() + a2a_client._peer_metadata.clear() + + peer_data = {"id": _TEST_PEER_ID, "name": "Blocker Peer"} + resp = _make_sync_response(200, peer_data) + mock_client = _make_sync_mock_client(get_resp=resp) + + with patch("a2a_client.httpx.Client", return_value=mock_client): + # Schedule the nonblocking call — it will be in-flight. + a2a_client.enrich_peer_metadata_nonblocking(_TEST_PEER_ID) + + try: + # Wait should block until the worker finishes. + a2a_client._wait_for_enrichment_inflight_for_testing(timeout=5.0) + # Cache should now be warm. + cached = a2a_client._peer_metadata_get(_TEST_PEER_ID) + assert cached is not None + assert cached[1] == peer_data + finally: + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + a2a_client._peer_in_flight_clear_for_testing()