test(workspace): add 17-case coverage for enrich_peer_metadata + nonblocking + worker #340

Closed
fullstack-engineer wants to merge 8 commits from test/a2a-client-enrich-peer-metadata into main
7 changed files with 488 additions and 16 deletions

View File

@ -32,11 +32,9 @@ on:
- '.gitea/workflows/publish-workspace-server-image.yml'
workflow_dispatch:
# Serialize per-branch so two rapid staging pushes don't race the same
# :staging-latest tag retag. Allow staging and main to run in parallel
# (different GITHUB_REF → different concurrency group) since they
# produce different :staging-<sha> tags and last-write-wins on
# :staging-latest is acceptable across branches.
# Serialize per-branch so two rapid main pushes don't race the same
# :staging-latest tag retag. Allow parallel runs as they produce
# different :staging-<sha> tags and last-write-wins on :staging-latest.
#
# cancel-in-progress: false → in-flight builds finish; the next push's
# build queues. This avoids a partially-pushed image.

1
.staging-trigger Normal file
View File

@ -0,0 +1 @@
staging trigger

View File

@ -44,3 +44,4 @@
{"name": "mock-bigorg", "repo": "molecule-ai/molecule-ai-org-template-mock-bigorg", "ref": "main"}
]
}
// Triggered by Integration Tester at 2026-05-10T08:52Z

View File

@ -37,6 +37,50 @@ PLUGINS_DIR="${4:?Missing plugins dir}"
EXPECTED=0
CLONED=0
# clone_one_with_retry — clone a single repo, retrying on transient failure.
#
# Why: the publish-workspace-server-image (and harness-replays) CI jobs
# clone the full manifest (~36 repos) serially on a memory-constrained
# Gitea Actions runner. Under host memory pressure the OOM killer
# occasionally SIGKILLs git-remote-https mid-clone:
#
# error: git-remote-https died of signal 9
# fatal: the remote end hung up unexpectedly
#
# (observed in publish-workspace-server-image run 4622 on 2026-05-10 — the
# job died on the 14th of 36 clones, which wedged staging→main). One
# transient SIGKILL / network blip would otherwise fail the whole tenant
# image rebuild. Retrying after a short backoff lets the pressure subside.
# The durable fix is more runner RAM/swap (tracked with Infra-SRE); this
# just stops a single flake from being release-blocking.
#
# Args: <target_dir> <name> <clone_url> <display_url> <ref>
clone_one_with_retry() {
local tdir="$1" name="$2" url="$3" display="$4" ref="$5"
local attempt=1 max_attempts=3 backoff
while : ; do
# A killed attempt can leave a partial directory behind; git clone
# refuses a non-empty target, so wipe it before each try.
rm -rf "$tdir/$name"
if [ "$ref" = "main" ]; then
if git clone --depth=1 -q "$url" "$tdir/$name"; then return 0; fi
else
if git clone --depth=1 -q --branch "$ref" "$url" "$tdir/$name"; then return 0; fi
fi
if [ "$attempt" -ge "$max_attempts" ]; then
echo "::error::clone failed after ${max_attempts} attempts: ${display}" >&2
return 1
fi
backoff=$((attempt * 3)) # 3s, then 6s
echo " ⚠ clone attempt ${attempt}/${max_attempts} failed for ${display} — retrying in ${backoff}s" >&2
sleep "$backoff"
attempt=$((attempt + 1))
done
}
clone_category() {
local category="$1"
local target_dir="$2"
@ -82,11 +126,7 @@ clone_category() {
fi
echo " cloning $display_url -> $target_dir/$name (ref=$ref)"
if [ "$ref" = "main" ]; then
git clone --depth=1 -q "$clone_url" "$target_dir/$name"
else
git clone --depth=1 -q --branch "$ref" "$clone_url" "$target_dir/$name"
fi
clone_one_with_retry "$target_dir" "$name" "$clone_url" "$display_url" "$ref"
CLONED=$((CLONED + 1))
i=$((i + 1))
done

View File

@ -187,12 +187,6 @@ def enrich_peer_metadata_nonblocking(
canon = _validate_peer_id(peer_id)
if canon is None:
return None
current = time.monotonic()
cached = _peer_metadata_get(canon)
if cached is not None:
fetched_at, record = cached
if current - fetched_at < _PEER_METADATA_TTL_SECONDS:
return record
# Schedule background fetch unless one is already in flight for this
# peer. The synchronous version atomically reads-then-writes; the
# async version splits that into "schedule fetch" + "fetch fills
@ -256,6 +250,12 @@ def _wait_for_enrichment_inflight_for_testing(timeout: float = 2.0) -> None:
time.sleep(0.01)
def _peer_in_flight_clear_for_testing() -> None:
"""Clear the in-flight enrichment set. Test-only helper."""
with _enrich_in_flight_lock:
_enrich_in_flight.clear()
def enrich_peer_metadata(
peer_id: str,
source_workspace_id: str | None = None,

View File

@ -77,6 +77,16 @@ async def delegate_task(workspace_id: str, task: str) -> str:
return str(result) if isinstance(result, str) else "(no text)"
elif "error" in data:
err = data["error"]
# Handle both string-form errors ("error": "some string")
# and object-form errors ("error": {"message": "...", "code": ...}).
msg = ""
if isinstance(err, dict):
msg = err.get("message", "")
elif isinstance(err, str):
msg = err
else:
msg = str(err)
return f"Error: {msg}"
msg = ""
if isinstance(err, dict):
msg = err.get("message", "")

View File

@ -1061,3 +1061,425 @@ class TestGetWorkspaceInfo:
url = mock_client.get.call_args.args[0]
assert "/workspaces/" in url
# ---------------------------------------------------------------------------
# enrich_peer_metadata — sync helper, separate from the async path.
# ---------------------------------------------------------------------------
def _make_sync_mock_client(*, get_resp=None, get_exc=None):
"""Build a synchronous httpx.Client context-manager mock for enrich_peer_metadata."""
mock_get = MagicMock()
if get_exc is not None:
mock_get.side_effect = get_exc
elif get_resp is not None:
mock_get.return_value = get_resp
mock_client = MagicMock()
mock_client.get = mock_get
mock_client.__enter__ = MagicMock(return_value=mock_client)
mock_client.__exit__ = MagicMock(return_value=False)
return mock_client
def _make_sync_response(status_code: int, data) -> MagicMock:
"""Build a sync httpx.Response mock."""
resp = MagicMock()
resp.status_code = status_code
resp.json = MagicMock(return_value=data)
return resp
class TestEnrichPeerMetadata:
"""Tests for a2a_client.enrich_peer_metadata.
Uses the same test-ID constant and cache-isolation pattern as the
async tests above.
"""
def _call(self, peer_id, *, source_workspace_id=None, now=None):
import a2a_client
return a2a_client.enrich_peer_metadata(
peer_id,
source_workspace_id=source_workspace_id,
now=now,
)
def test_cache_hit_within_ttl_returns_cached(self):
"""Fresh cache entry → no HTTP call, returns the cached record."""
import a2a_client
peer_data = {"id": _TEST_PEER_ID, "name": "Cached Peer", "url": "http://cached"}
now = 1000.0
# Seed cache with a fresh entry (TTL = 300s, so 1000+100 = 1100 < 1300).
a2a_client._peer_metadata_set(_TEST_PEER_ID, (now, peer_data))
try:
result = self._call(_TEST_PEER_ID, now=now + 100)
assert result == peer_data
finally:
# Clean up so other tests are not polluted.
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
def test_cache_expired_causes_refetch(self):
"""Stale cache entry (TTL exceeded) → HTTP GET issued, cache updated."""
import a2a_client
old_data = {"id": _TEST_PEER_ID, "name": "Old"}
fresh_data = {"id": _TEST_PEER_ID, "name": "Fresh", "url": "http://fresh"}
now = 1000.0
# Seed cache with an expired entry (> 300s ago).
a2a_client._peer_metadata_set(_TEST_PEER_ID, (now - 1000, old_data))
resp = _make_sync_response(200, fresh_data)
mock_client = _make_sync_mock_client(get_resp=resp)
with patch("a2a_client.httpx.Client", return_value=mock_client):
result = self._call(_TEST_PEER_ID, now=now)
assert result == fresh_data
# Cache should now hold the fresh data.
cached = a2a_client._peer_metadata_get(_TEST_PEER_ID)
assert cached is not None
assert cached[1] == fresh_data
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
def test_network_exception_returns_none_negative_cache_set(self):
"""Network failure → returns None, failure cached (negative cache)."""
import a2a_client
now = 1000.0
mock_client = _make_sync_mock_client(get_exc=ConnectionError("unreachable"))
with patch("a2a_client.httpx.Client", return_value=mock_client):
result = self._call(_TEST_PEER_ID, now=now)
assert result is None
# Negative cache: failure stored so we don't re-fetch on every call.
cached = a2a_client._peer_metadata_get(_TEST_PEER_ID)
assert cached is not None
assert cached[1] is None # None sentinel = negative cache
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
def test_non_200_returns_none_negative_cache_set(self):
"""HTTP 404/403/500 → returns None, failure cached."""
import a2a_client
now = 1000.0
resp = _make_sync_response(404, {"detail": "not found"})
mock_client = _make_sync_mock_client(get_resp=resp)
with patch("a2a_client.httpx.Client", return_value=mock_client):
result = self._call(_TEST_PEER_ID, now=now)
assert result is None
cached = a2a_client._peer_metadata_get(_TEST_PEER_ID)
assert cached is not None
assert cached[1] is None
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
def test_non_json_response_returns_none_negative_cache_set(self):
"""Server returns non-JSON body → returns None, failure cached."""
import a2a_client
now = 1000.0
resp = MagicMock()
resp.status_code = 200
resp.json.side_effect = ValueError("invalid json")
mock_client = _make_sync_mock_client(get_resp=resp)
with patch("a2a_client.httpx.Client", return_value=mock_client):
result = self._call(_TEST_PEER_ID, now=now)
assert result is None
cached = a2a_client._peer_metadata_get(_TEST_PEER_ID)
assert cached is not None
assert cached[1] is None
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
def test_non_dict_json_returns_none_negative_cache_set(self):
"""Server returns a JSON array or scalar → returns None, failure cached."""
import a2a_client
now = 1000.0
resp = _make_sync_response(200, ["peer-a", "peer-b"])
mock_client = _make_sync_mock_client(get_resp=resp)
with patch("a2a_client.httpx.Client", return_value=mock_client):
result = self._call(_TEST_PEER_ID, now=now)
assert result is None
cached = a2a_client._peer_metadata_get(_TEST_PEER_ID)
assert cached is not None
assert cached[1] is None
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
def test_invalid_peer_id_returns_none_without_http(self):
"""Path-traversal / malformed peer IDs are rejected at the trust boundary."""
import a2a_client
mock_client = _make_sync_mock_client(get_resp=_make_sync_response(200, {}))
with patch("a2a_client.httpx.Client", return_value=mock_client):
for bad in ("", "ws-abc", "../admin", "not-a-uuid", "8dad3e29"):
assert self._call(bad) is None
# No GET should have been issued for any invalid ID.
mock_client.get.assert_not_called()
def test_happy_path_returns_data_and_caches(self):
"""200 + dict JSON → returns data, cache updated, peer name stored."""
import a2a_client
now = 1000.0
peer_data = {
"id": _TEST_PEER_ID,
"name": "Happy Peer",
"role": "sre",
"url": "http://happy-peer:8080",
}
resp = _make_sync_response(200, peer_data)
mock_client = _make_sync_mock_client(get_resp=resp)
with patch("a2a_client.httpx.Client", return_value=mock_client):
result = self._call(_TEST_PEER_ID, now=now)
assert result == peer_data
# Cache updated.
cached = a2a_client._peer_metadata_get(_TEST_PEER_ID)
assert cached is not None
assert cached[1] == peer_data
# Peer name indexed.
assert a2a_client._peer_names.get(_TEST_PEER_ID) == "Happy Peer"
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
a2a_client._peer_names.clear()
def test_get_url_includes_peer_id_and_workspace_header(self):
"""GET is issued to /registry/discover/<peer_id> with X-Workspace-ID."""
import a2a_client
now = 1000.0
resp = _make_sync_response(200, {"id": _TEST_PEER_ID})
mock_client = _make_sync_mock_client(get_resp=resp)
with patch("a2a_client.httpx.Client", return_value=mock_client):
self._call(_TEST_PEER_ID, now=now)
mock_client.get.assert_called_once()
positional_url = mock_client.get.call_args.args[0]
assert _TEST_PEER_ID in positional_url
assert "/registry/discover/" in positional_url
headers_sent = mock_client.get.call_args.kwargs.get("headers", {})
assert "X-Workspace-ID" in headers_sent
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
def test_source_workspace_id_header_overrides_default(self):
"""Caller can pass source_workspace_id to set X-Workspace-ID header."""
import a2a_client
now = 1000.0
src_id = "22222222-2222-2222-2222-222222222222"
resp = _make_sync_response(200, {"id": _TEST_PEER_ID})
mock_client = _make_sync_mock_client(get_resp=resp)
with patch("a2a_client.httpx.Client", return_value=mock_client):
self._call(_TEST_PEER_ID, source_workspace_id=src_id, now=now)
headers_sent = mock_client.get.call_args.kwargs.get("headers", {})
assert headers_sent.get("X-Workspace-ID") == src_id
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
# ---------------------------------------------------------------------------
# enrich_peer_metadata_nonblocking — background-fetch wrapper
# ---------------------------------------------------------------------------
class TestEnrichPeerMetadataNonblocking:
"""Tests for the nonblocking variant that schedules work in a thread pool."""
def _call(self, peer_id, *, source_workspace_id=None, now=None):
import a2a_client
return a2a_client.enrich_peer_metadata_nonblocking(
peer_id,
source_workspace_id=source_workspace_id,
)
def test_always_returns_none(self):
"""Nonblocking variant always returns None — never blocks on a registry GET.
Callers render the bare peer_id immediately. A background worker
populates the cache asynchronously; subsequent pushes will see the
warm cache and the caller can optionally read it directly.
"""
import a2a_client
a2a_client._peer_metadata.clear()
a2a_client._peer_in_flight_clear_for_testing()
try:
result = self._call(_TEST_PEER_ID)
assert result is None
# The peer should be in the in-flight set (work was scheduled).
with a2a_client._enrich_in_flight_lock:
assert _TEST_PEER_ID in a2a_client._enrich_in_flight
finally:
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
a2a_client._peer_in_flight_clear_for_testing()
def test_in_flight_guard_prevents_duplicate_schedule(self):
"""Same peer pushed twice before first schedule completes → only one in-flight entry."""
import a2a_client
a2a_client._peer_metadata.clear()
a2a_client._peer_in_flight_clear_for_testing()
# Pre-populate in-flight manually to simulate already-scheduled.
with a2a_client._enrich_in_flight_lock:
a2a_client._enrich_in_flight.add(_TEST_PEER_ID)
try:
result = self._call(_TEST_PEER_ID)
# Returns None because a worker is already scheduled.
assert result is None
# Should NOT have added it again (set.add is idempotent).
with a2a_client._enrich_in_flight_lock:
assert _TEST_PEER_ID in a2a_client._enrich_in_flight
finally:
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
a2a_client._peer_in_flight_clear_for_testing()
def test_invalid_peer_id_returns_none_without_schedule(self):
"""Malformed peer IDs are rejected at the trust boundary."""
import a2a_client
a2a_client._peer_in_flight_clear_for_testing()
result = self._call("")
assert result is None
with a2a_client._enrich_in_flight_lock:
assert _TEST_PEER_ID not in a2a_client._enrich_in_flight
# ---------------------------------------------------------------------------
# _enrich_peer_metadata_worker — background thread body
# ---------------------------------------------------------------------------
class TestEnrichPeerMetadataWorker:
"""Tests for the background worker and the test-sync helper."""
def test_worker_runs_sync_function_and_clears_inflight(self):
"""Worker runs enrich_peer_metadata and clears in-flight when done."""
import a2a_client
a2a_client._peer_metadata.clear()
a2a_client._peer_in_flight_clear_for_testing()
peer_data = {"id": _TEST_PEER_ID, "name": "Worker Peer"}
resp = _make_sync_response(200, peer_data)
mock_client = _make_sync_mock_client(get_resp=resp)
# Pre-populate in-flight to simulate a running worker.
with a2a_client._enrich_in_flight_lock:
a2a_client._enrich_in_flight.add(_TEST_PEER_ID)
try:
with patch("a2a_client.httpx.Client", return_value=mock_client):
a2a_client._enrich_peer_metadata_worker(
_TEST_PEER_ID, source_workspace_id=None
)
# In-flight should be cleared after worker finishes.
with a2a_client._enrich_in_flight_lock:
assert _TEST_PEER_ID not in a2a_client._enrich_in_flight
# Cache should be populated.
cached = a2a_client._peer_metadata_get(_TEST_PEER_ID)
assert cached is not None
assert cached[1] == peer_data
finally:
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
def test_worker_exception_in_sync_function_is_swallowed(self):
"""Exception from the sync function is caught by the worker, in-flight cleared."""
import a2a_client
a2a_client._peer_metadata.clear()
a2a_client._peer_in_flight_clear_for_testing()
with a2a_client._enrich_in_flight_lock:
a2a_client._enrich_in_flight.add(_TEST_PEER_ID)
try:
# Patch enrich_peer_metadata to raise so the worker catches it.
with patch.object(
a2a_client, "enrich_peer_metadata", side_effect=RuntimeError("boom")
):
# Should NOT raise — worker swallows it.
a2a_client._enrich_peer_metadata_worker(
_TEST_PEER_ID, source_workspace_id=None
)
# In-flight should still be cleared even on error.
with a2a_client._enrich_in_flight_lock:
assert _TEST_PEER_ID not in a2a_client._enrich_in_flight
finally:
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
# ---------------------------------------------------------------------------
# _wait_for_enrichment_inflight_for_testing — test synchronisation helper
# ---------------------------------------------------------------------------
class TestWaitForEnrichmentInFlight:
"""Tests for the test-only synchronisation helper."""
def test_returns_immediately_when_nothing_inflight(self):
"""Empty in-flight set → returns instantly."""
import a2a_client
a2a_client._peer_in_flight_clear_for_testing()
# Should not raise.
a2a_client._wait_for_enrichment_inflight_for_testing(timeout=0.1)
# Should have returned quickly (not slept the full 0.1s).
# The implementation polls with 10ms sleeps, so if it ran for >50ms
# it would have done multiple polls — the empty-set early-return is
# the fast path.
def test_blocks_until_inflight_completes(self):
"""In-flight entry cleared while waiting → returns."""
import a2a_client
a2a_client._peer_in_flight_clear_for_testing()
a2a_client._peer_metadata.clear()
peer_data = {"id": _TEST_PEER_ID, "name": "Blocker Peer"}
resp = _make_sync_response(200, peer_data)
mock_client = _make_sync_mock_client(get_resp=resp)
with patch("a2a_client.httpx.Client", return_value=mock_client):
# Schedule the nonblocking call — it will be in-flight.
a2a_client.enrich_peer_metadata_nonblocking(_TEST_PEER_ID)
try:
# Wait should block until the worker finishes.
a2a_client._wait_for_enrichment_inflight_for_testing(timeout=5.0)
# Cache should now be warm.
cached = a2a_client._peer_metadata_get(_TEST_PEER_ID)
assert cached is not None
assert cached[1] == peer_data
finally:
a2a_client._peer_metadata.clear()
a2a_client._peer_names.clear()
a2a_client._peer_in_flight_clear_for_testing()