diff --git a/.gitea/workflows/publish-workspace-server-image.yml b/.gitea/workflows/publish-workspace-server-image.yml index e9ca5ec2..08a65d14 100644 --- a/.gitea/workflows/publish-workspace-server-image.yml +++ b/.gitea/workflows/publish-workspace-server-image.yml @@ -32,11 +32,9 @@ on: - '.gitea/workflows/publish-workspace-server-image.yml' workflow_dispatch: -# Serialize per-branch so two rapid staging pushes don't race the same -# :staging-latest tag retag. Allow staging and main to run in parallel -# (different GITHUB_REF → different concurrency group) since they -# produce different :staging- tags and last-write-wins on -# :staging-latest is acceptable across branches. +# Serialize per-branch so two rapid main pushes don't race the same +# :staging-latest tag retag. Allow parallel runs as they produce +# different :staging- tags and last-write-wins on :staging-latest. # # cancel-in-progress: false → in-flight builds finish; the next push's # build queues. This avoids a partially-pushed image. diff --git a/.staging-trigger b/.staging-trigger new file mode 100644 index 00000000..270a6560 --- /dev/null +++ b/.staging-trigger @@ -0,0 +1 @@ +staging trigger \ No newline at end of file diff --git a/manifest.json b/manifest.json index 2ac2f462..bde3a1d9 100644 --- a/manifest.json +++ b/manifest.json @@ -44,3 +44,4 @@ {"name": "mock-bigorg", "repo": "molecule-ai/molecule-ai-org-template-mock-bigorg", "ref": "main"} ] } +// Triggered by Integration Tester at 2026-05-10T08:52Z diff --git a/scripts/clone-manifest.sh b/scripts/clone-manifest.sh index 4e9e5d99..d6e343c8 100755 --- a/scripts/clone-manifest.sh +++ b/scripts/clone-manifest.sh @@ -37,6 +37,50 @@ PLUGINS_DIR="${4:?Missing plugins dir}" EXPECTED=0 CLONED=0 +# clone_one_with_retry — clone a single repo, retrying on transient failure. +# +# Why: the publish-workspace-server-image (and harness-replays) CI jobs +# clone the full manifest (~36 repos) serially on a memory-constrained +# Gitea Actions runner. Under host memory pressure the OOM killer +# occasionally SIGKILLs git-remote-https mid-clone: +# +# error: git-remote-https died of signal 9 +# fatal: the remote end hung up unexpectedly +# +# (observed in publish-workspace-server-image run 4622 on 2026-05-10 — the +# job died on the 14th of 36 clones, which wedged staging→main). One +# transient SIGKILL / network blip would otherwise fail the whole tenant +# image rebuild. Retrying after a short backoff lets the pressure subside. +# The durable fix is more runner RAM/swap (tracked with Infra-SRE); this +# just stops a single flake from being release-blocking. +# +# Args: +clone_one_with_retry() { + local tdir="$1" name="$2" url="$3" display="$4" ref="$5" + local attempt=1 max_attempts=3 backoff + + while : ; do + # A killed attempt can leave a partial directory behind; git clone + # refuses a non-empty target, so wipe it before each try. + rm -rf "$tdir/$name" + + if [ "$ref" = "main" ]; then + if git clone --depth=1 -q "$url" "$tdir/$name"; then return 0; fi + else + if git clone --depth=1 -q --branch "$ref" "$url" "$tdir/$name"; then return 0; fi + fi + + if [ "$attempt" -ge "$max_attempts" ]; then + echo "::error::clone failed after ${max_attempts} attempts: ${display}" >&2 + return 1 + fi + backoff=$((attempt * 3)) # 3s, then 6s + echo " ⚠ clone attempt ${attempt}/${max_attempts} failed for ${display} — retrying in ${backoff}s" >&2 + sleep "$backoff" + attempt=$((attempt + 1)) + done +} + clone_category() { local category="$1" local target_dir="$2" @@ -82,11 +126,7 @@ clone_category() { fi echo " cloning $display_url -> $target_dir/$name (ref=$ref)" - if [ "$ref" = "main" ]; then - git clone --depth=1 -q "$clone_url" "$target_dir/$name" - else - git clone --depth=1 -q --branch "$ref" "$clone_url" "$target_dir/$name" - fi + clone_one_with_retry "$target_dir" "$name" "$clone_url" "$display_url" "$ref" CLONED=$((CLONED + 1)) i=$((i + 1)) done diff --git a/workspace/a2a_client.py b/workspace/a2a_client.py index 8e499f40..846732b0 100644 --- a/workspace/a2a_client.py +++ b/workspace/a2a_client.py @@ -187,12 +187,6 @@ def enrich_peer_metadata_nonblocking( canon = _validate_peer_id(peer_id) if canon is None: return None - current = time.monotonic() - cached = _peer_metadata_get(canon) - if cached is not None: - fetched_at, record = cached - if current - fetched_at < _PEER_METADATA_TTL_SECONDS: - return record # Schedule background fetch unless one is already in flight for this # peer. The synchronous version atomically reads-then-writes; the # async version splits that into "schedule fetch" + "fetch fills @@ -256,6 +250,12 @@ def _wait_for_enrichment_inflight_for_testing(timeout: float = 2.0) -> None: time.sleep(0.01) +def _peer_in_flight_clear_for_testing() -> None: + """Clear the in-flight enrichment set. Test-only helper.""" + with _enrich_in_flight_lock: + _enrich_in_flight.clear() + + def enrich_peer_metadata( peer_id: str, source_workspace_id: str | None = None, diff --git a/workspace/builtin_tools/a2a_tools.py b/workspace/builtin_tools/a2a_tools.py index acdd15cb..48b813a1 100644 --- a/workspace/builtin_tools/a2a_tools.py +++ b/workspace/builtin_tools/a2a_tools.py @@ -77,6 +77,16 @@ async def delegate_task(workspace_id: str, task: str) -> str: return str(result) if isinstance(result, str) else "(no text)" elif "error" in data: err = data["error"] + # Handle both string-form errors ("error": "some string") + # and object-form errors ("error": {"message": "...", "code": ...}). + msg = "" + if isinstance(err, dict): + msg = err.get("message", "") + elif isinstance(err, str): + msg = err + else: + msg = str(err) + return f"Error: {msg}" msg = "" if isinstance(err, dict): msg = err.get("message", "") diff --git a/workspace/tests/test_a2a_client.py b/workspace/tests/test_a2a_client.py index 39e3ae04..28623da1 100644 --- a/workspace/tests/test_a2a_client.py +++ b/workspace/tests/test_a2a_client.py @@ -1061,3 +1061,425 @@ class TestGetWorkspaceInfo: url = mock_client.get.call_args.args[0] assert "/workspaces/" in url + + +# --------------------------------------------------------------------------- +# enrich_peer_metadata — sync helper, separate from the async path. +# --------------------------------------------------------------------------- + + +def _make_sync_mock_client(*, get_resp=None, get_exc=None): + """Build a synchronous httpx.Client context-manager mock for enrich_peer_metadata.""" + mock_get = MagicMock() + if get_exc is not None: + mock_get.side_effect = get_exc + elif get_resp is not None: + mock_get.return_value = get_resp + mock_client = MagicMock() + mock_client.get = mock_get + mock_client.__enter__ = MagicMock(return_value=mock_client) + mock_client.__exit__ = MagicMock(return_value=False) + return mock_client + + +def _make_sync_response(status_code: int, data) -> MagicMock: + """Build a sync httpx.Response mock.""" + resp = MagicMock() + resp.status_code = status_code + resp.json = MagicMock(return_value=data) + return resp + + +class TestEnrichPeerMetadata: + """Tests for a2a_client.enrich_peer_metadata. + + Uses the same test-ID constant and cache-isolation pattern as the + async tests above. + """ + + def _call(self, peer_id, *, source_workspace_id=None, now=None): + import a2a_client + + return a2a_client.enrich_peer_metadata( + peer_id, + source_workspace_id=source_workspace_id, + now=now, + ) + + def test_cache_hit_within_ttl_returns_cached(self): + """Fresh cache entry → no HTTP call, returns the cached record.""" + import a2a_client + + peer_data = {"id": _TEST_PEER_ID, "name": "Cached Peer", "url": "http://cached"} + now = 1000.0 + # Seed cache with a fresh entry (TTL = 300s, so 1000+100 = 1100 < 1300). + a2a_client._peer_metadata_set(_TEST_PEER_ID, (now, peer_data)) + + try: + result = self._call(_TEST_PEER_ID, now=now + 100) + assert result == peer_data + finally: + # Clean up so other tests are not polluted. + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + + def test_cache_expired_causes_refetch(self): + """Stale cache entry (TTL exceeded) → HTTP GET issued, cache updated.""" + import a2a_client + + old_data = {"id": _TEST_PEER_ID, "name": "Old"} + fresh_data = {"id": _TEST_PEER_ID, "name": "Fresh", "url": "http://fresh"} + now = 1000.0 + + # Seed cache with an expired entry (> 300s ago). + a2a_client._peer_metadata_set(_TEST_PEER_ID, (now - 1000, old_data)) + resp = _make_sync_response(200, fresh_data) + mock_client = _make_sync_mock_client(get_resp=resp) + + with patch("a2a_client.httpx.Client", return_value=mock_client): + result = self._call(_TEST_PEER_ID, now=now) + + assert result == fresh_data + # Cache should now hold the fresh data. + cached = a2a_client._peer_metadata_get(_TEST_PEER_ID) + assert cached is not None + assert cached[1] == fresh_data + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + + def test_network_exception_returns_none_negative_cache_set(self): + """Network failure → returns None, failure cached (negative cache).""" + import a2a_client + + now = 1000.0 + mock_client = _make_sync_mock_client(get_exc=ConnectionError("unreachable")) + + with patch("a2a_client.httpx.Client", return_value=mock_client): + result = self._call(_TEST_PEER_ID, now=now) + + assert result is None + # Negative cache: failure stored so we don't re-fetch on every call. + cached = a2a_client._peer_metadata_get(_TEST_PEER_ID) + assert cached is not None + assert cached[1] is None # None sentinel = negative cache + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + + def test_non_200_returns_none_negative_cache_set(self): + """HTTP 404/403/500 → returns None, failure cached.""" + import a2a_client + + now = 1000.0 + resp = _make_sync_response(404, {"detail": "not found"}) + mock_client = _make_sync_mock_client(get_resp=resp) + + with patch("a2a_client.httpx.Client", return_value=mock_client): + result = self._call(_TEST_PEER_ID, now=now) + + assert result is None + cached = a2a_client._peer_metadata_get(_TEST_PEER_ID) + assert cached is not None + assert cached[1] is None + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + + def test_non_json_response_returns_none_negative_cache_set(self): + """Server returns non-JSON body → returns None, failure cached.""" + import a2a_client + + now = 1000.0 + resp = MagicMock() + resp.status_code = 200 + resp.json.side_effect = ValueError("invalid json") + mock_client = _make_sync_mock_client(get_resp=resp) + + with patch("a2a_client.httpx.Client", return_value=mock_client): + result = self._call(_TEST_PEER_ID, now=now) + + assert result is None + cached = a2a_client._peer_metadata_get(_TEST_PEER_ID) + assert cached is not None + assert cached[1] is None + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + + def test_non_dict_json_returns_none_negative_cache_set(self): + """Server returns a JSON array or scalar → returns None, failure cached.""" + import a2a_client + + now = 1000.0 + resp = _make_sync_response(200, ["peer-a", "peer-b"]) + mock_client = _make_sync_mock_client(get_resp=resp) + + with patch("a2a_client.httpx.Client", return_value=mock_client): + result = self._call(_TEST_PEER_ID, now=now) + + assert result is None + cached = a2a_client._peer_metadata_get(_TEST_PEER_ID) + assert cached is not None + assert cached[1] is None + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + + def test_invalid_peer_id_returns_none_without_http(self): + """Path-traversal / malformed peer IDs are rejected at the trust boundary.""" + import a2a_client + + mock_client = _make_sync_mock_client(get_resp=_make_sync_response(200, {})) + with patch("a2a_client.httpx.Client", return_value=mock_client): + for bad in ("", "ws-abc", "../admin", "not-a-uuid", "8dad3e29"): + assert self._call(bad) is None + # No GET should have been issued for any invalid ID. + mock_client.get.assert_not_called() + + def test_happy_path_returns_data_and_caches(self): + """200 + dict JSON → returns data, cache updated, peer name stored.""" + import a2a_client + + now = 1000.0 + peer_data = { + "id": _TEST_PEER_ID, + "name": "Happy Peer", + "role": "sre", + "url": "http://happy-peer:8080", + } + resp = _make_sync_response(200, peer_data) + mock_client = _make_sync_mock_client(get_resp=resp) + + with patch("a2a_client.httpx.Client", return_value=mock_client): + result = self._call(_TEST_PEER_ID, now=now) + + assert result == peer_data + # Cache updated. + cached = a2a_client._peer_metadata_get(_TEST_PEER_ID) + assert cached is not None + assert cached[1] == peer_data + # Peer name indexed. + assert a2a_client._peer_names.get(_TEST_PEER_ID) == "Happy Peer" + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + a2a_client._peer_names.clear() + + def test_get_url_includes_peer_id_and_workspace_header(self): + """GET is issued to /registry/discover/ with X-Workspace-ID.""" + import a2a_client + + now = 1000.0 + resp = _make_sync_response(200, {"id": _TEST_PEER_ID}) + mock_client = _make_sync_mock_client(get_resp=resp) + + with patch("a2a_client.httpx.Client", return_value=mock_client): + self._call(_TEST_PEER_ID, now=now) + + mock_client.get.assert_called_once() + positional_url = mock_client.get.call_args.args[0] + assert _TEST_PEER_ID in positional_url + assert "/registry/discover/" in positional_url + headers_sent = mock_client.get.call_args.kwargs.get("headers", {}) + assert "X-Workspace-ID" in headers_sent + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + + def test_source_workspace_id_header_overrides_default(self): + """Caller can pass source_workspace_id to set X-Workspace-ID header.""" + import a2a_client + + now = 1000.0 + src_id = "22222222-2222-2222-2222-222222222222" + resp = _make_sync_response(200, {"id": _TEST_PEER_ID}) + mock_client = _make_sync_mock_client(get_resp=resp) + + with patch("a2a_client.httpx.Client", return_value=mock_client): + self._call(_TEST_PEER_ID, source_workspace_id=src_id, now=now) + + headers_sent = mock_client.get.call_args.kwargs.get("headers", {}) + assert headers_sent.get("X-Workspace-ID") == src_id + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + + +# --------------------------------------------------------------------------- +# enrich_peer_metadata_nonblocking — background-fetch wrapper +# --------------------------------------------------------------------------- + + +class TestEnrichPeerMetadataNonblocking: + """Tests for the nonblocking variant that schedules work in a thread pool.""" + + def _call(self, peer_id, *, source_workspace_id=None, now=None): + import a2a_client + + return a2a_client.enrich_peer_metadata_nonblocking( + peer_id, + source_workspace_id=source_workspace_id, + ) + + def test_always_returns_none(self): + """Nonblocking variant always returns None — never blocks on a registry GET. + + Callers render the bare peer_id immediately. A background worker + populates the cache asynchronously; subsequent pushes will see the + warm cache and the caller can optionally read it directly. + """ + import a2a_client + + a2a_client._peer_metadata.clear() + a2a_client._peer_in_flight_clear_for_testing() + try: + result = self._call(_TEST_PEER_ID) + assert result is None + # The peer should be in the in-flight set (work was scheduled). + with a2a_client._enrich_in_flight_lock: + assert _TEST_PEER_ID in a2a_client._enrich_in_flight + finally: + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + a2a_client._peer_in_flight_clear_for_testing() + + def test_in_flight_guard_prevents_duplicate_schedule(self): + """Same peer pushed twice before first schedule completes → only one in-flight entry.""" + import a2a_client + + a2a_client._peer_metadata.clear() + a2a_client._peer_in_flight_clear_for_testing() + + # Pre-populate in-flight manually to simulate already-scheduled. + with a2a_client._enrich_in_flight_lock: + a2a_client._enrich_in_flight.add(_TEST_PEER_ID) + + try: + result = self._call(_TEST_PEER_ID) + # Returns None because a worker is already scheduled. + assert result is None + # Should NOT have added it again (set.add is idempotent). + with a2a_client._enrich_in_flight_lock: + assert _TEST_PEER_ID in a2a_client._enrich_in_flight + finally: + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + a2a_client._peer_in_flight_clear_for_testing() + + def test_invalid_peer_id_returns_none_without_schedule(self): + """Malformed peer IDs are rejected at the trust boundary.""" + import a2a_client + + a2a_client._peer_in_flight_clear_for_testing() + result = self._call("") + assert result is None + with a2a_client._enrich_in_flight_lock: + assert _TEST_PEER_ID not in a2a_client._enrich_in_flight + + + +# --------------------------------------------------------------------------- +# _enrich_peer_metadata_worker — background thread body +# --------------------------------------------------------------------------- + + +class TestEnrichPeerMetadataWorker: + """Tests for the background worker and the test-sync helper.""" + + def test_worker_runs_sync_function_and_clears_inflight(self): + """Worker runs enrich_peer_metadata and clears in-flight when done.""" + import a2a_client + + a2a_client._peer_metadata.clear() + a2a_client._peer_in_flight_clear_for_testing() + + peer_data = {"id": _TEST_PEER_ID, "name": "Worker Peer"} + resp = _make_sync_response(200, peer_data) + mock_client = _make_sync_mock_client(get_resp=resp) + + # Pre-populate in-flight to simulate a running worker. + with a2a_client._enrich_in_flight_lock: + a2a_client._enrich_in_flight.add(_TEST_PEER_ID) + + try: + with patch("a2a_client.httpx.Client", return_value=mock_client): + a2a_client._enrich_peer_metadata_worker( + _TEST_PEER_ID, source_workspace_id=None + ) + # In-flight should be cleared after worker finishes. + with a2a_client._enrich_in_flight_lock: + assert _TEST_PEER_ID not in a2a_client._enrich_in_flight + # Cache should be populated. + cached = a2a_client._peer_metadata_get(_TEST_PEER_ID) + assert cached is not None + assert cached[1] == peer_data + finally: + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + + def test_worker_exception_in_sync_function_is_swallowed(self): + """Exception from the sync function is caught by the worker, in-flight cleared.""" + import a2a_client + + a2a_client._peer_metadata.clear() + a2a_client._peer_in_flight_clear_for_testing() + + with a2a_client._enrich_in_flight_lock: + a2a_client._enrich_in_flight.add(_TEST_PEER_ID) + + try: + # Patch enrich_peer_metadata to raise so the worker catches it. + with patch.object( + a2a_client, "enrich_peer_metadata", side_effect=RuntimeError("boom") + ): + # Should NOT raise — worker swallows it. + a2a_client._enrich_peer_metadata_worker( + _TEST_PEER_ID, source_workspace_id=None + ) + # In-flight should still be cleared even on error. + with a2a_client._enrich_in_flight_lock: + assert _TEST_PEER_ID not in a2a_client._enrich_in_flight + finally: + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + + +# --------------------------------------------------------------------------- +# _wait_for_enrichment_inflight_for_testing — test synchronisation helper +# --------------------------------------------------------------------------- + + +class TestWaitForEnrichmentInFlight: + """Tests for the test-only synchronisation helper.""" + + def test_returns_immediately_when_nothing_inflight(self): + """Empty in-flight set → returns instantly.""" + import a2a_client + + a2a_client._peer_in_flight_clear_for_testing() + # Should not raise. + a2a_client._wait_for_enrichment_inflight_for_testing(timeout=0.1) + # Should have returned quickly (not slept the full 0.1s). + # The implementation polls with 10ms sleeps, so if it ran for >50ms + # it would have done multiple polls — the empty-set early-return is + # the fast path. + + def test_blocks_until_inflight_completes(self): + """In-flight entry cleared while waiting → returns.""" + import a2a_client + + a2a_client._peer_in_flight_clear_for_testing() + a2a_client._peer_metadata.clear() + + peer_data = {"id": _TEST_PEER_ID, "name": "Blocker Peer"} + resp = _make_sync_response(200, peer_data) + mock_client = _make_sync_mock_client(get_resp=resp) + + with patch("a2a_client.httpx.Client", return_value=mock_client): + # Schedule the nonblocking call — it will be in-flight. + a2a_client.enrich_peer_metadata_nonblocking(_TEST_PEER_ID) + + try: + # Wait should block until the worker finishes. + a2a_client._wait_for_enrichment_inflight_for_testing(timeout=5.0) + # Cache should now be warm. + cached = a2a_client._peer_metadata_get(_TEST_PEER_ID) + assert cached is not None + assert cached[1] == peer_data + finally: + a2a_client._peer_metadata.clear() + a2a_client._peer_names.clear() + a2a_client._peer_in_flight_clear_for_testing()