feat(tests): GAP-05 add _get_with_retry() with 429 back-off + fix broken test_call_peer_errors (#11)
Adds retry-on-429 with exponential back-off (1 s → 2 s → 4 s, ±25% jitter, 30 s cap, Retry-After header honoured) to all idempotent RemoteAgentClient GET calls: poll_state, pull_secrets, get_peers, discover_peer.

Also fixes the merged test_call_peer_errors.py (PR #7), which was broken:
- Removed the pytest-mock dependency (the mocker fixture is not installed)
- Fixed call_peer calls to pass the message as a str rather than a dict
- Fixed patches of the non-existent _call_direct/_call_proxy methods
- Uses the FakeResponse + _session.post.side_effect pattern consistently

Adds tests/conftest.py (FakeResponse + client fixture + _CaptureHandler) and tests/test_retry_backoff.py (18 new tests).

Co-authored-by: Molecule AI SDK-Dev <sdk-dev@agents.moleculesai.app>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
4e289e3004
commit
818931f9d3
@ -20,7 +20,9 @@ from __future__ import annotations
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import math
|
||||
import os
|
||||
import random
|
||||
import stat
|
||||
import subprocess
|
||||
import tarfile
|
||||
@ -47,6 +49,13 @@ DEFAULT_STATE_POLL_INTERVAL = 30.0 # seconds
|
||||
# we don't hit the discovery endpoint on every A2A call.
|
||||
DEFAULT_URL_CACHE_TTL = 300.0 # 5 minutes
|
||||
|
||||
# Retry-on-429 defaults for idempotent GET calls.
|
||||
# Matches the behaviour of the TypeScript MCP server's platformGet().
|
||||
DEFAULT_GET_MAX_RETRIES = 3 # retry up to 3 times on 429
|
||||
_RETRY_BASE_DELAY = 1.0 # seconds — first delay
|
||||
_RETRY_MAX_DELAY = 30.0 # seconds — cap
|
||||
_RETRY_JITTER_FRAC = 0.25 # jitter half-width as a fraction of _RETRY_BASE_DELAY (±0.25 s at these defaults)
|
||||
|
||||
|
||||
def _safe_extract_tar(tf: tarfile.TarFile, dest: Path) -> None:
|
||||
"""Extract a tarfile, refusing entries that would escape `dest`
|
||||
@ -297,6 +306,43 @@ class RemoteAgentClient:
|
||||
return {}
|
||||
return {"Authorization": f"Bearer {tok}"}
|
||||
|
||||
def _get_with_retry(
    self,
    url: str,
    headers: dict[str, str] | None = None,
    max_retries: int = DEFAULT_GET_MAX_RETRIES,
) -> requests.Response:
    """Issue a GET, sleeping and retrying while the server answers 429.

    The request is sent with a 10 s timeout. A response whose status is
    not 429 is returned immediately. On 429 the call sleeps and tries
    again, up to ``max_retries`` retries (so at most ``max_retries + 1``
    requests in total).

    Delay selection for each retry:

    * If ``Retry-After`` is present and parses as a finite number of
      seconds, that value is rounded up to a whole second and clamped to
      be non-negative.
    * Otherwise (header absent, an HTTP-date, ``nan``/``inf``, or any
      other unparseable value) exponential back-off is used:
      ``_RETRY_BASE_DELAY * 2 ** (retry - 1)`` plus a random jitter of
      ±``_RETRY_JITTER_FRAC`` × ``_RETRY_BASE_DELAY``.

    Either way the sleep is capped at ``_RETRY_MAX_DELAY`` seconds.

    After exhausting retries the final 429 response is returned (the
    caller's ``raise_for_status()`` will turn it into an ``HTTPError``).

    Only use for idempotent GET calls. POST/DELETE callers must not
    retry, as the server may have already processed the request.

    Args:
        url: Absolute URL to fetch.
        headers: Request headers; ``None`` is sent as an empty dict.
        max_retries: Maximum number of retries after the first attempt.

    Returns:
        The first non-429 response, or the last 429 once retries run out.
    """
    attempt = 0

    while True:
        resp = self._session.get(url, headers=headers or {}, timeout=10.0)
        if resp.status_code != 429 or attempt >= max_retries:
            return resp

        attempt += 1
        delay_s = None

        retry_after = resp.headers.get("Retry-After")
        if retry_after is not None:
            # Retry-After may legally be an HTTP-date rather than a number
            # of seconds; float() rejects that form, so it must not be
            # allowed to escape as a ValueError from a retry helper.
            try:
                seconds = float(retry_after)
            except (TypeError, ValueError):
                seconds = None
            # nan/inf would make math.ceil raise; a negative value would
            # make time.sleep raise. Treat the former as unusable and
            # clamp the latter to "retry immediately".
            if seconds is not None and math.isfinite(seconds):
                delay_s = max(0, math.ceil(seconds))

        if delay_s is None:
            base = _RETRY_BASE_DELAY * (2 ** (attempt - 1))
            jitter = _RETRY_BASE_DELAY * _RETRY_JITTER_FRAC * (random.random() * 2 - 1)
            delay_s = base + jitter

        # Work in whole milliseconds so the cap comparison is exact.
        delay_ms = min(int(delay_s * 1000), int(_RETRY_MAX_DELAY * 1000))
        time.sleep(delay_ms / 1000.0)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Endpoints
|
||||
# ------------------------------------------------------------------
|
||||
@ -350,10 +396,9 @@ class RemoteAgentClient:
|
||||
means the token is missing / invalid — call :py:meth:`register`
|
||||
first).
|
||||
"""
|
||||
resp = self._session.get(
|
||||
resp = self._get_with_retry(
|
||||
f"{self.platform_url}/workspaces/{self.workspace_id}/secrets/values",
|
||||
headers=self._auth_headers(),
|
||||
timeout=10.0,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
return resp.json() or {}
|
||||
@ -365,10 +410,9 @@ class RemoteAgentClient:
|
||||
(workspace hard-deleted) — callers typically exit their run loop
|
||||
in that case. Raises on other HTTP errors.
|
||||
"""
|
||||
resp = self._session.get(
|
||||
resp = self._get_with_retry(
|
||||
f"{self.platform_url}/workspaces/{self.workspace_id}/state",
|
||||
headers=self._auth_headers(),
|
||||
timeout=10.0,
|
||||
)
|
||||
if resp.status_code == 404:
|
||||
# Platform signals hard-delete via 404 + deleted:true
|
||||
@ -428,13 +472,12 @@ class RemoteAgentClient:
|
||||
Raises on 401 (stale/missing token → call :py:meth:`register`) and
|
||||
other non-2xx.
|
||||
"""
|
||||
resp = self._session.get(
|
||||
resp = self._get_with_retry(
|
||||
f"{self.platform_url}/registry/{self.workspace_id}/peers",
|
||||
headers={
|
||||
**self._auth_headers(),
|
||||
"X-Workspace-ID": self.workspace_id,
|
||||
},
|
||||
timeout=10.0,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
data = resp.json() or []
|
||||
@ -483,13 +526,12 @@ class RemoteAgentClient:
|
||||
# Expired — drop and fall through to refresh
|
||||
self._url_cache.pop(target_id, None)
|
||||
|
||||
resp = self._session.get(
|
||||
resp = self._get_with_retry(
|
||||
f"{self.platform_url}/registry/discover/{target_id}",
|
||||
headers={
|
||||
**self._auth_headers(),
|
||||
"X-Workspace-ID": self.workspace_id,
|
||||
},
|
||||
timeout=10.0,
|
||||
)
|
||||
if resp.status_code == 404:
|
||||
return None
|
||||
|
||||
112
tests/conftest.py
Normal file
112
tests/conftest.py
Normal file
@ -0,0 +1,112 @@
|
||||
"""Pytest fixtures and helpers for molecule_agent tests.
|
||||
|
||||
All fixtures are function-scoped unless noted. No live platform required —
|
||||
all HTTP is mocked via ``unittest.mock``.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import stat
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from molecule_agent import RemoteAgentClient
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# FakeResponse — minimal requests-shaped response
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class FakeResponse:
    """Test double carrying the parts of ``requests.Response`` these tests use.

    Exposes ``status_code``, ``text`` and ``headers`` as plain attributes,
    plus :meth:`json` and :meth:`raise_for_status`.
    """

    def __init__(
        self,
        status_code: int = 200,
        json_body: Any = None,
        text: str = "",
        headers: dict[str, str] | None = None,
    ) -> None:
        # A falsy ``headers`` (None or an empty dict) becomes a fresh dict.
        self.headers = headers if headers else {}
        self.text = text
        self._json = json_body
        self.status_code = status_code

    def json(self) -> Any:
        """Return the object supplied as ``json_body``, unchanged."""
        payload = self._json
        return payload

    def raise_for_status(self) -> None:
        """Raise ``requests.HTTPError`` when the status code is 400 or higher."""
        if self.status_code < 400:
            return
        # Imported only on the error path.
        import requests

        raise requests.HTTPError(f"HTTP {self.status_code}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture
def tmp_token_dir(tmp_path: Path) -> Path:
    """Return the token-cache path under pytest's ``tmp_path``.

    Only the path is computed here; nothing is created on disk.
    """
    token_cache = tmp_path / "molecule-token-cache"
    return token_cache
|
||||
|
||||
|
||||
@pytest.fixture
def client(tmp_token_dir: Path) -> RemoteAgentClient:
    """Build a ``RemoteAgentClient`` that is handed a ``MagicMock`` as its session.

    The token cache location comes from the ``tmp_token_dir`` fixture.
    """
    settings = {
        "workspace_id": "ws-test-123",
        "platform_url": "http://platform.test",
        "agent_card": {"name": "test-agent"},
        "token_dir": tmp_token_dir,
        "session": MagicMock(),
    }
    return RemoteAgentClient(**settings)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _CaptureHandler — minimal HTTP mock for integration tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class _CaptureHandler:
|
||||
"""Thread-local registry of HTTP stubs for use in integration tests.
|
||||
|
||||
Registered stubs are checked in order (last-registered first); the first
|
||||
matching (method, path) pair wins. Unmatched requests raise
|
||||
``RuntimeError("no stub for {method} {path}")``.
|
||||
|
||||
Usage::
|
||||
|
||||
_CaptureHandler.clear()
|
||||
_CaptureHandler.stub("GET", "/foo", 200, {}, "body")
|
||||
with some_client:
|
||||
result = await some_client.get("/foo")
|
||||
"""
|
||||
|
||||
_stubs: list[tuple[str, str, int, dict[str, str], str]] = []
|
||||
|
||||
@classmethod
|
||||
def clear(cls) -> None:
|
||||
cls._stubs.clear()
|
||||
|
||||
@classmethod
|
||||
def stub(
|
||||
cls,
|
||||
method: str,
|
||||
path: str,
|
||||
status: int,
|
||||
headers: dict[str, str],
|
||||
body: str,
|
||||
) -> None:
|
||||
cls._stubs.append((method, path, status, headers, body))
|
||||
|
||||
@classmethod
|
||||
def handle(cls, method: str, url: str, **kwargs: Any) -> FakeResponse:
|
||||
for m, p, status, hdrs, body in reversed(cls._stubs):
|
||||
if m == method and p in url:
|
||||
return FakeResponse(status, json_body={}, text=body, headers=hdrs)
|
||||
raise RuntimeError(f"no stub for {method} {url}")
|
||||
@ -1,124 +1,237 @@
|
||||
"""GAP-03: call_peer error paths — documents and tests the error surface.
|
||||
"""GAP-03 / GAP-11: call_peer error paths — documents and tests the error surface.
|
||||
|
||||
Per PLAN.md backlog #13: ClaudeSDKExecutor surfaces opaque "Command failed"
|
||||
without capturing stderr. These tests document the desired behavior.
|
||||
Per PLAN.md backlog #13: call_peer must surface structured errors (HTTP
|
||||
status, auth context) rather than opaque strings. These tests verify the
|
||||
error surface using the same FakeResponse / MagicMock pattern as the rest of
|
||||
the test suite.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
_SDK_ROOT = Path(__file__).resolve().parents[1]
|
||||
if str(_SDK_ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(_SDK_ROOT))
|
||||
|
||||
from molecule_agent.client import RemoteAgentClient
|
||||
from tests.conftest import _CaptureHandler
|
||||
from molecule_agent import RemoteAgentClient
|
||||
|
||||
|
||||
def stub(status: int, body: str = "", *, method="GET", path="/call_peer"):
|
||||
"""Register a stub for the call_peer endpoint."""
|
||||
_CaptureHandler.stub(method, path, status, {"Content-Type": "application/json"}, body)
|
||||
# ---------------------------------------------------------------------------
|
||||
# FakeResponse — minimal requests.Response stand-in
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class FakeResponse:
    """Small substitute for ``requests.Response`` used by the call_peer tests.

    Holds a status code, a JSON payload, a text body and a header dict.
    """

    def __init__(
        self,
        status_code: int = 200,
        json_body: Any = None,
        text: str = "",
        headers: dict[str, str] | None = None,
    ) -> None:
        self.status_code, self.text = status_code, text
        self._json = json_body
        # None or an empty mapping is normalised to a new empty dict.
        self.headers = {} if not headers else headers

    def json(self) -> Any:
        """Hand back the ``json_body`` given at construction time."""
        return self._json

    def raise_for_status(self) -> None:
        """Raise ``requests.HTTPError`` for status codes of 400 and above."""
        is_error = self.status_code >= 400
        if not is_error:
            return
        import requests

        raise requests.HTTPError(f"HTTP {self.status_code}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture
def tmp_token_dir(tmp_path: Path) -> Path:
    """Path of the per-test token cache, located inside ``tmp_path``."""
    cache_name = "molecule-token-cache"
    return tmp_path / cache_name
|
||||
|
||||
|
||||
@pytest.fixture
def client(tmp_token_dir: Path) -> RemoteAgentClient:
    """Return a ``RemoteAgentClient`` constructed with a ``MagicMock`` session."""
    return RemoteAgentClient(
        workspace_id="ws-test-123",
        platform_url="http://platform.test",
        agent_card={"name": "test-agent"},
        token_dir=tmp_token_dir,
        session=MagicMock(),
    )
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Error surface tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Note: call_peer(message: str) — the public API accepts a plain string.
|
||||
# Internal A2A envelope is built by the client. Tests pass strings.
|
||||
|
||||
|
||||
class TestCallPeerErrors:
|
||||
"""Tests for call_peer error handling and error message clarity."""
|
||||
|
||||
def test_http_timeout_propagates_as_readable_error(self, client: RemoteAgentClient, mocker):
|
||||
def test_http_timeout_propagates_as_readable_error(self, client: RemoteAgentClient):
|
||||
"""A connect or read timeout should surface as a descriptive error, not opaque."""
|
||||
mock_post = mocker.patch("requests.Session.post")
|
||||
mock_post.side_effect = TimeoutError("Connect timeout")
|
||||
|
||||
# The client should raise a clearly typed error, not bare TimeoutError
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
client.call_peer("peer-id", {"role": "user", "parts": [{"kind": "text", "text": "hello"}]})
|
||||
|
||||
assert "timeout" in str(exc_info.value).lower() or "unreachable" in str(exc_info.value).lower()
|
||||
|
||||
def test_502_bad_gateway_includes_context(self, client: RemoteAgentClient, http_mock):
|
||||
"""502 from platform should include the upstream error in the response."""
|
||||
stub(502, '{"error": "upstream overwhelmed"}', path="/proxy/peer-id/a2a")
|
||||
client._proxy_base = http_mock.url # inject mock proxy base
|
||||
client._session.post.side_effect = TimeoutError("Connect timeout")
|
||||
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
client.call_peer("peer-id", {"role": "user", "parts": [{"kind": "text", "text": "hello"}]})
|
||||
client.call_peer("peer-id", "hello")
|
||||
|
||||
# The error message should reference the HTTP status or upstream failure
|
||||
assert any(kw in str(exc_info.value).lower() for kw in ["502", "upstream", "gateway", "bad"])
|
||||
err_str = str(exc_info.value).lower()
|
||||
assert "timeout" in err_str or "unreachable" in err_str
|
||||
|
||||
def test_503_service_unavailable_is_retriable_or_raises(self, client: RemoteAgentClient, http_mock):
|
||||
def test_connection_refused_propagates_as_readable_error(self, client: RemoteAgentClient):
|
||||
"""A connection refused error should propagate with context."""
|
||||
client._session.post.side_effect = ConnectionError("Connection refused")
|
||||
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
client.call_peer("peer-id", "hello")
|
||||
|
||||
err_str = str(exc_info.value).lower()
|
||||
assert "refused" in err_str or "connection" in err_str
|
||||
|
||||
def test_502_bad_gateway_includes_context(self, client: RemoteAgentClient):
|
||||
"""502 from platform should include the HTTP status or upstream error."""
|
||||
client._session.post.return_value = FakeResponse(
|
||||
502, {"error": "upstream overwhelmed"}
|
||||
)
|
||||
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
client.call_peer("peer-id", "hello")
|
||||
|
||||
err_str = str(exc_info.value).lower()
|
||||
assert any(kw in err_str for kw in ["502", "upstream", "gateway", "bad"])
|
||||
|
||||
def test_503_service_unavailable_is_retriable_or_raises(self, client: RemoteAgentClient):
|
||||
"""503 from platform should be distinguishable from 500."""
|
||||
stub(503, '{"error": "service unavailable"}', path="/proxy/peer-id/a2a")
|
||||
client._proxy_base = http_mock.url
|
||||
client._session.post.return_value = FakeResponse(
|
||||
503, {"error": "service unavailable"}
|
||||
)
|
||||
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
client.call_peer("peer-id", {"role": "user", "parts": [{"kind": "text", "text": "hello"}]})
|
||||
client.call_peer("peer-id", "hello")
|
||||
|
||||
assert "503" in str(exc_info.value) or "unavailable" in str(exc_info.value).lower()
|
||||
err_str = str(exc_info.value)
|
||||
assert "503" in err_str or "unavailable" in err_str.lower()
|
||||
|
||||
def test_malformed_json_in_response_raises_descriptively(self, client: RemoteAgentClient, http_mock):
|
||||
"""If the A2A response is valid HTTP but has malformed JSON, the error should be clear."""
|
||||
stub(200, "not json {{{", path="/proxy/peer-id/a2a")
|
||||
client._proxy_base = http_mock.url
|
||||
def test_500_internal_error_raises(self, client: RemoteAgentClient):
|
||||
"""500 from platform should raise with status code."""
|
||||
client._session.post.return_value = FakeResponse(
|
||||
500, {"error": "internal error"}
|
||||
)
|
||||
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
client.call_peer("peer-id", {"role": "user", "parts": [{"kind": "text", "text": "hello"}]})
|
||||
client.call_peer("peer-id", "hello")
|
||||
|
||||
assert "json" in str(exc_info.value).lower() or "parse" in str(exc_info.value).lower()
|
||||
err_str = str(exc_info.value)
|
||||
assert "500" in err_str or "internal" in err_str.lower()
|
||||
|
||||
def test_empty_response_body_raises_readably(self, client: RemoteAgentClient, http_mock):
|
||||
"""An empty A2A response body should not produce a cryptic KeyError."""
|
||||
stub(200, "", path="/proxy/peer-id/a2a")
|
||||
client._proxy_base = http_mock.url
|
||||
def test_401_on_call_peer_surfaces_with_auth_context(self, client: RemoteAgentClient):
|
||||
"""401 on call_peer should surface with auth context."""
|
||||
client._session.post.return_value = FakeResponse(
|
||||
401, {"error": "invalid or expired token", "hint": "re-register"}
|
||||
)
|
||||
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
client.call_peer("peer-id", {"role": "user", "parts": [{"kind": "text", "text": "hello"}]})
|
||||
client.call_peer("peer-id", "hello")
|
||||
|
||||
# Should not be a KeyError or IndexError with no message
|
||||
assert "empty" in str(exc_info.value).lower() or "response" in str(exc_info.value).lower()
|
||||
err_str = str(exc_info.value).lower()
|
||||
assert "401" in err_str or "auth" in err_str or "token" in err_str
|
||||
|
||||
def test_401_on_call_peer_surfaces_with_first_1kb_of_body(self, client: RemoteAgentClient, http_mock, caplog):
|
||||
"""401 on call_peer should log at ERROR level with first ~1KB of the response body."""
|
||||
stub(401, '{"error": "invalid or expired token", "hint": "re-register with the platform"}',
|
||||
path="/proxy/peer-id/a2a")
|
||||
client._proxy_base = http_mock.url
|
||||
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
client.call_peer("peer-id", {"role": "user", "parts": [{"kind": "text", "text": "hello"}]})
|
||||
|
||||
# The exception message or a log entry should include the error detail
|
||||
error_str = str(exc_info.value).lower()
|
||||
assert "401" in error_str or "auth" in error_str or "token" in error_str
|
||||
|
||||
def test_403_on_call_peer_surfaces_with_diagnostic_info(self, client: RemoteAgentClient, http_mock):
|
||||
def test_403_on_call_peer_surfaces_with_diagnostic_info(self, client: RemoteAgentClient):
|
||||
"""403 on call_peer should distinguish auth failure from generic 4xx."""
|
||||
stub(403, '{"error": "insufficient scope for this peer"}', path="/proxy/peer-id/a2a")
|
||||
client._proxy_base = http_mock.url
|
||||
client._session.post.return_value = FakeResponse(
|
||||
403, {"error": "insufficient scope for this peer"}
|
||||
)
|
||||
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
client.call_peer("peer-id", {"role": "user", "parts": [{"kind": "text", "text": "hello"}]})
|
||||
client.call_peer("peer-id", "hello")
|
||||
|
||||
assert "403" in str(exc_info.value) or "scope" in str(exc_info.value).lower()
|
||||
err_str = str(exc_info.value)
|
||||
assert "403" in err_str or "scope" in err_str.lower()
|
||||
|
||||
def test_call_peer_via_proxy_when_direct_fails(self, client: RemoteAgentClient, mocker):
|
||||
"""When prefer_direct=True but direct fails, call_peer falls back to proxy."""
|
||||
mocker.patch.object(client, "_call_direct", side_effect=ConnectionError("refused"))
|
||||
mock_proxy = mocker.patch.object(client, "_call_proxy", return_value={"parts": [{"kind": "text", "text": "proxied"}]})
|
||||
def test_200_with_json_body_returns_result(self, client: RemoteAgentClient):
|
||||
"""A successful A2A response should be returned as a dict."""
|
||||
client._session.post.return_value = FakeResponse(
|
||||
200, {"jsonrpc": "2.0", "result": {"ok": True}}
|
||||
)
|
||||
|
||||
result = client.call_peer("peer-id", "hello")
|
||||
|
||||
assert result["result"]["ok"] is True
|
||||
|
||||
def test_call_peer_via_proxy_when_direct_fails(self, client: RemoteAgentClient):
|
||||
"""When prefer_direct=True but direct fails, call_peer falls back to proxy.
|
||||
|
||||
- discover_peer finds a cached URL (cache hit) → direct POST attempted
|
||||
- Direct POST raises ConnectionError → exception caught, cache invalidated
|
||||
- Proxy POST succeeds → result returned
|
||||
"""
|
||||
# Seed the cache so discover_peer returns a URL (cache hit, no GET needed)
|
||||
client._url_cache["peer-id"] = ("http://dead.peer:8000", time.time() + 60)
|
||||
|
||||
post_calls = []
|
||||
|
||||
def track_post(*args, **kwargs):
|
||||
post_calls.append((args, kwargs))
|
||||
if len(post_calls) == 1:
|
||||
raise ConnectionError("refused")
|
||||
return FakeResponse(200, {"parts": [{"kind": "text", "text": "proxied"}]})
|
||||
|
||||
client._session.post.side_effect = track_post
|
||||
|
||||
result = client.call_peer("peer-id", "hello")
|
||||
|
||||
result = client.call_peer("peer-id", {"role": "user", "parts": [{"kind": "text", "text": "hello"}]})
|
||||
assert result.get("parts", [{}])[0].get("text") == "proxied"
|
||||
mock_proxy.assert_called_once()
|
||||
assert len(post_calls) == 2, f"expected 2 POST calls, got {len(post_calls)}"
|
||||
# First URL should be the cached dead peer URL (direct)
|
||||
assert "dead.peer" in str(post_calls[0][0][0])
|
||||
# Second URL should be the platform proxy (fallback)
|
||||
assert "/workspaces/peer-id/a2a" in str(post_calls[1][0][0])
|
||||
|
||||
def test_call_peer_proxy_error_surfaces_readably(self, client: RemoteAgentClient, mocker):
|
||||
"""Proxy call returning 500 should not produce "Command failed with exit code 1"."""
|
||||
mocker.patch.object(client, "_call_direct", side_effect=ConnectionError("refused"))
|
||||
mocker.patch.object(client, "_call_proxy", side_effect=RuntimeError("proxy returned 500"))
|
||||
def test_call_peer_prefer_direct_false_skips_discover(self, client: RemoteAgentClient):
|
||||
"""With prefer_direct=False, call_peer should skip discover and go to proxy."""
|
||||
client.save_token("secret-token-abc")
|
||||
client._session.post.return_value = FakeResponse(200, {"ok": True})
|
||||
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
client.call_peer("peer-id", {"role": "user", "parts": [{"kind": "text", "text": "hello"}]})
|
||||
result = client.call_peer("peer-id", "hello", prefer_direct=False)
|
||||
|
||||
# Must not be the opaque "Command failed with exit code 1" message
|
||||
assert "Command failed with exit code 1" not in str(exc_info.value)
|
||||
assert result == {"ok": True}
|
||||
call_url = client._session.post.call_args[0][0]
|
||||
assert "/workspaces/peer-id/a2a" in call_url
|
||||
|
||||
def test_call_peer_includes_auth_headers(self, client: RemoteAgentClient):
|
||||
"""call_peer proxy calls should include Authorization and X-Workspace-ID."""
|
||||
client.save_token("secret-token-abc")
|
||||
client._session.post.return_value = FakeResponse(200, {})
|
||||
|
||||
client.call_peer("peer-id", "hello")
|
||||
|
||||
call_kwargs = client._session.post.call_args[1]
|
||||
assert "Authorization" in call_kwargs["headers"]
|
||||
assert call_kwargs["headers"]["X-Workspace-ID"] == "ws-test-123"
|
||||
assert call_kwargs["headers"]["Content-Type"] == "application/json"
|
||||
|
||||
def test_call_peer_json_rpc_envelope_format(self, client: RemoteAgentClient):
|
||||
"""The POST body should match A2A JSON-RPC message/send format."""
|
||||
client.save_token("tok")
|
||||
client._session.post.return_value = FakeResponse(200, {"result": {"ok": True}})
|
||||
|
||||
client.call_peer("peer-id", "hello world")
|
||||
|
||||
body = client._session.post.call_args[1]["json"]
|
||||
assert body["jsonrpc"] == "2.0"
|
||||
assert body["method"] == "message/send"
|
||||
assert "messageId" in body["params"]["message"]
|
||||
assert body["params"]["message"]["role"] == "user"
|
||||
assert body["params"]["message"]["parts"][0]["kind"] == "text"
|
||||
assert body["params"]["message"]["parts"][0]["text"] == "hello world"
|
||||
|
||||
427
tests/test_retry_backoff.py
Normal file
427
tests/test_retry_backoff.py
Normal file
@ -0,0 +1,427 @@
|
||||
"""GAP-05: retry / back-off for RemoteAgentClient GET calls.
|
||||
|
||||
Per TEST_GAP_ANALYSIS.md backlog item #5: the MCP server's platformGet()
|
||||
has retry-on-429 with exponential back-off; the Python RemoteAgentClient had
|
||||
no equivalent. These tests cover the new _get_with_retry() helper and the
|
||||
four wired-in GET endpoints (poll_state, pull_secrets, get_peers, discover_peer).
|
||||
|
||||
Test conventions (mirrors test_remote_agent.py):
|
||||
- MagicMock session — no live platform required.
|
||||
- FakeResponse for HTTP responses.
|
||||
- monkeypatch time.sleep to avoid real delays.
|
||||
- Each test covers one specific behaviour surface.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import random
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from molecule_agent import RemoteAgentClient
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# FakeResponse — minimal requests.Response stand-in
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class FakeResponse:
    """Bare-bones replacement for ``requests.Response`` in the retry tests.

    ``headers`` is what the retry helper consults for ``Retry-After``.
    """

    def __init__(
        self,
        status_code: int = 200,
        json_body: Any = None,
        text: str = "",
        headers: dict[str, str] | None = None,
    ) -> None:
        if not headers:
            # Covers both ``None`` and an empty dict.
            headers = {}
        self.headers = headers
        self.status_code = status_code
        self.text = text
        self._json = json_body

    def json(self) -> Any:
        """Return the stored JSON payload as-is."""
        return self._json

    def raise_for_status(self) -> None:
        """Raise ``requests.HTTPError`` if the status code is at least 400."""
        code = self.status_code
        if code < 400:
            return
        import requests

        raise requests.HTTPError(f"HTTP {self.status_code}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture
def tmp_token_dir(tmp_path: Path) -> Path:
    """Give each test a token-cache path beneath its own ``tmp_path``."""
    location = tmp_path / "molecule-token-cache"
    return location
|
||||
|
||||
|
||||
@pytest.fixture
def client(tmp_token_dir: Path) -> RemoteAgentClient:
    """Create the client under test, passing a ``MagicMock`` as its session."""
    mock_session = MagicMock()
    ctor_kwargs = dict(
        workspace_id="ws-test-123",
        platform_url="http://platform.test",
        agent_card={"name": "test-agent"},
        token_dir=tmp_token_dir,
        session=mock_session,
    )
    return RemoteAgentClient(**ctor_kwargs)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _get_with_retry — happy path
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGetWithRetryHappyPath:
    """A 200 on the first attempt is returned after a single GET."""

    def test_200_returns_without_retrying(self, client: RemoteAgentClient):
        """One request is made and its 200 response is handed back."""
        canned = FakeResponse(200, {"data": "ok"})
        client._session.get.return_value = canned

        response = client._get_with_retry("http://platform.test/foo")

        assert response.status_code == 200
        assert client._session.get.call_count == 1

    def test_headers_passed_through(self, client: RemoteAgentClient):
        """Caller headers reach the session, together with the 10 s timeout."""
        client._session.get.return_value = FakeResponse(200, {})
        auth = {"Authorization": "Bearer tok"}

        client._get_with_retry("http://platform.test/foo", headers=auth)

        # call_args is an (args, kwargs) pair.
        _, sent_kwargs = client._session.get.call_args
        assert sent_kwargs["headers"]["Authorization"] == "Bearer tok"
        assert sent_kwargs["timeout"] == 10.0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _get_with_retry — 429 retry
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGetWithRetry429:
|
||||
"""429 triggers retry; Retry-After header or exponential back-off used."""
|
||||
|
||||
def _resp_429(self, retry_after: str | None = None) -> FakeResponse:
    """Build a 429 ``FakeResponse``; add ``Retry-After`` only when a value is given."""
    extra = {} if retry_after is None else {"Retry-After": retry_after}
    return FakeResponse(429, {}, headers=extra)
|
||||
|
||||
def test_429_then_200_retries_and_returns_200(
    self, client: RemoteAgentClient, monkeypatch
):
    """A 429 followed by a 200 returns the 200 after exactly one sleep."""
    naps: list[float] = []
    monkeypatch.setattr(time, "sleep", naps.append)

    replies = [self._resp_429(), FakeResponse(200, {"data": "ok"})]
    client._session.get.side_effect = replies

    resp = client._get_with_retry("http://platform.test/foo")

    assert resp.status_code == 200
    assert client._session.get.call_count == 2
    # Exactly one pause, between the first and second request.
    assert len(naps) == 1
|
||||
|
||||
def test_429_retry_after_integer_seconds(
    self, client: RemoteAgentClient, monkeypatch
):
    """An integer ``Retry-After`` of 5 produces a single 5-second sleep."""
    naps: list[float] = []
    monkeypatch.setattr(time, "sleep", naps.append)

    throttled = self._resp_429(retry_after="5")
    client._session.get.side_effect = [throttled, FakeResponse(200, {})]

    client._get_with_retry("http://platform.test/foo")

    assert naps == [5.0]
|
||||
|
||||
def test_429_retry_after_float_seconds_rounds_up(
    self, client: RemoteAgentClient, monkeypatch
):
    """A fractional ``Retry-After`` is rounded up to the next whole second."""
    naps: list[float] = []
    monkeypatch.setattr(time, "sleep", naps.append)

    # ceil(2.7) == 3
    throttled = self._resp_429(retry_after="2.7")
    client._session.get.side_effect = [throttled, FakeResponse(200, {})]

    client._get_with_retry("http://platform.test/foo")

    assert naps == [3.0]
|
||||
|
||||
def test_429_retry_after_capped_at_30_seconds(
    self, client: RemoteAgentClient, monkeypatch
):
    """A ``Retry-After`` above the 30 s cap results in a 30-second sleep."""
    naps: list[float] = []
    monkeypatch.setattr(time, "sleep", naps.append)

    throttled = self._resp_429(retry_after="120")
    client._session.get.side_effect = [throttled, FakeResponse(200, {})]

    client._get_with_retry("http://platform.test/foo")

    # 120 s requested, 30 s slept.
    assert naps == [30.0]
|
||||
|
||||
def test_429_exponential_backoff_jitter_first_attempt(
    self, client: RemoteAgentClient, monkeypatch
):
    """Without Retry-After, the first back-off is 1 s when jitter is pinned to zero."""
    sleeps: list[float] = []
    monkeypatch.setattr(time, "sleep", lambda s: sleeps.append(s))

    # Pin random.random() to 0.5 so the jitter factor (random() * 2 - 1)
    # is exactly zero and the sleep is deterministic.
    monkeypatch.setattr(random, "random", lambda: 0.5)

    client._session.get.side_effect = [
        self._resp_429(),
        FakeResponse(200, {}),
    ]

    client._get_with_retry("http://platform.test/foo")

    # base=1.0, jitter=0 → exactly 1.0
    assert sleeps == [1.0]
|
||||
|
||||
def test_429_exponential_backoff_second_attempt(
    self, client: RemoteAgentClient, monkeypatch
):
    """Two consecutive 429s without Retry-After should pause 1 s, then 2 s."""
    recorded: list[float] = []
    # Record requested delays instead of actually blocking the test run.
    monkeypatch.setattr(time, "sleep", recorded.append)
    # 0.5 is the draw this suite treats as zero jitter.
    monkeypatch.setattr(random, "random", lambda: 0.5)

    first_throttle = self._resp_429()
    second_throttle = self._resp_429()
    succeeded = FakeResponse(200, {})
    client._session.get.side_effect = [first_throttle, second_throttle, succeeded]

    client._get_with_retry("http://platform.test/foo")

    # The delay doubles between the first and second retry.
    assert recorded == [1.0, 2.0]
|
||||
|
||||
def test_429_exhausts_max_retries_returns_429(
    self, client: RemoteAgentClient, monkeypatch
):
    """When every attempt is throttled, the last 429 response is handed back.

    With ``max_retries=3`` the test expects four GETs in total and three
    pauses; no pause is expected after the final attempt.
    """
    recorded: list[float] = []
    # Record requested delays instead of actually blocking the test run.
    monkeypatch.setattr(time, "sleep", recorded.append)
    # 0.5 is the draw this suite treats as zero jitter.
    monkeypatch.setattr(random, "random", lambda: 0.5)

    # The mocked session answers 429 on every call.
    client._session.get.return_value = self._resp_429()

    final = client._get_with_retry("http://platform.test/foo", max_retries=3)

    assert final.status_code == 429
    # One initial request plus three retries.
    assert client._session.get.call_count == 4
    # Pauses separate attempts 1→2, 2→3 and 3→4 only.
    assert recorded == [1.0, 2.0, 4.0]
|
||||
|
||||
def test_non_429_error_does_not_retry(
    self, client: RemoteAgentClient, monkeypatch
):
    """A 500 response is returned straight away: one GET and no sleeping."""
    recorded: list[float] = []
    # Any call to time.sleep would show up in `recorded`.
    monkeypatch.setattr(time, "sleep", recorded.append)

    server_error = FakeResponse(500, {"error": "boom"})
    client._session.get.return_value = server_error

    result = client._get_with_retry("http://platform.test/foo")

    assert result.status_code == 500
    assert client._session.get.call_count == 1
    assert recorded == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Wired-in retry: poll_state
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestPollStateRetry:
    """Exercises ``poll_state`` against throttled (429) and non-429 replies."""

    def test_poll_state_retries_on_429_then_returns_state(
        self, client: RemoteAgentClient, monkeypatch
    ):
        """One 429 carrying ``Retry-After: 2`` is retried, then state comes back."""
        client.save_token("t")
        delays: list[float] = []
        # Record requested delays instead of actually blocking the test run.
        monkeypatch.setattr(time, "sleep", delays.append)
        monkeypatch.setattr(random, "random", lambda: 0.5)

        online_payload = {"status": "online", "paused": False, "deleted": False}
        client._session.get.side_effect = [
            FakeResponse(429, {}, headers={"Retry-After": "2"}),
            FakeResponse(200, online_payload),
        ]

        result = client.poll_state()

        assert result is not None
        assert result.status == "online"
        assert client._session.get.call_count == 2
        assert delays == [2.0]

    def test_poll_state_429_exhausts_retries_raises(
        self, client: RemoteAgentClient, monkeypatch
    ):
        """Persistent 429s make ``poll_state`` raise after four GET attempts."""
        client.save_token("t")

        def _skip_sleep(_seconds):
            # Delays are irrelevant to this test; just avoid blocking.
            return None

        monkeypatch.setattr(time, "sleep", _skip_sleep)
        always_throttled = FakeResponse(429, {}, headers={"Retry-After": "1"})
        client._session.get.return_value = always_throttled

        with pytest.raises(Exception):
            client.poll_state()

        # Every attempt was used before the error surfaced.
        assert client._session.get.call_count == 4

    def test_poll_state_404_does_not_retry(self, client: RemoteAgentClient, monkeypatch):
        """A 404 is not throttling, so no retry or sleep should happen."""
        client.save_token("t")
        delays: list[float] = []
        monkeypatch.setattr(time, "sleep", delays.append)

        client._session.get.return_value = FakeResponse(404, {"deleted": True})

        result = client.poll_state()

        # The 404 is expected to surface as a state flagged deleted.
        assert result is not None
        assert result.deleted is True
        assert client._session.get.call_count == 1
        assert delays == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Wired-in retry: pull_secrets
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestPullSecretsRetry:
    """Exercises ``pull_secrets`` against a throttled (429) first reply."""

    def test_pull_secrets_retries_on_429(
        self, client: RemoteAgentClient, monkeypatch
    ):
        """A 429 with ``Retry-After: 3`` is retried and the secrets are returned."""
        client.save_token("t")
        delays: list[float] = []
        # Record requested delays instead of actually blocking the test run.
        monkeypatch.setattr(time, "sleep", delays.append)
        monkeypatch.setattr(random, "random", lambda: 0.5)

        throttled = FakeResponse(429, {}, headers={"Retry-After": "3"})
        succeeded = FakeResponse(200, {"API_KEY": "secret"})
        client._session.get.side_effect = [throttled, succeeded]

        pulled = client.pull_secrets()

        assert pulled == {"API_KEY": "secret"}
        assert delays == [3.0]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Wired-in retry: get_peers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGetPeersRetry:
    """Exercises ``get_peers`` against repeated throttled (429) replies."""

    def test_get_peers_retries_on_429_exponential_backoff(
        self, client: RemoteAgentClient, monkeypatch
    ):
        """Two header-less 429s should pause 1 s then 2 s before the peer list."""
        client.save_token("t")
        delays: list[float] = []
        # Record requested delays instead of actually blocking the test run.
        monkeypatch.setattr(time, "sleep", delays.append)
        # 0.5 is the draw this suite treats as zero jitter.
        monkeypatch.setattr(random, "random", lambda: 0.5)

        peer_listing = [{"id": "peer-1", "name": "P1", "url": "http://p1"}]
        client._session.get.side_effect = [
            FakeResponse(429, {}),
            FakeResponse(429, {}),
            FakeResponse(200, peer_listing),
        ]

        found = client.get_peers()

        assert len(found) == 1
        assert found[0].id == "peer-1"
        assert delays == [1.0, 2.0]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Wired-in retry: discover_peer
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestDiscoverPeerRetry:
    """discover_peer uses _get_with_retry."""

    def test_discover_peer_retries_on_429_then_returns_url(
        self, client: RemoteAgentClient, monkeypatch
    ):
        """A 429 with ``Retry-After: 1`` is retried and the peer URL is returned."""
        client.save_token("t")
        sleeps: list[float] = []
        # Capture requested delays rather than really sleeping.
        monkeypatch.setattr(time, "sleep", lambda s: sleeps.append(s))
        monkeypatch.setattr(random, "random", lambda: 0.5)

        client._session.get.side_effect = [
            FakeResponse(429, {}, headers={"Retry-After": "1"}),
            FakeResponse(200, {"url": "http://discovered:9000"}),
        ]

        url = client.discover_peer("target-1")

        assert url == "http://discovered:9000"
        assert sleeps == [1.0]

    def test_discover_peer_404_after_429_returns_none(
        self, client: RemoteAgentClient, monkeypatch
    ):
        """When 429 is retried and resolves to 404, discover_peer returns None (no error)."""
        client.save_token("t")
        sleeps: list[float] = []
        # Capture requested delays rather than really sleeping.
        monkeypatch.setattr(time, "sleep", lambda s: sleeps.append(s))
        # 0.5 is the draw this suite treats as zero jitter.
        monkeypatch.setattr(random, "random", lambda: 0.5)

        client._session.get.side_effect = [
            FakeResponse(429, {}),
            FakeResponse(404, {}),
        ]

        url = client.discover_peer("deleted-target")

        assert url is None
        assert client._session.get.call_count == 2
        # The recorded delays were previously collected but never checked:
        # a single header-less 429 with zero jitter means one 1 s back-off.
        assert sleeps == [1.0]

    def test_discover_peer_429_exhausts_retries_raises(
        self, client: RemoteAgentClient, monkeypatch
    ):
        """Exhausted 429 retries → the final 429 surfaces as an exception."""
        client.save_token("t")
        # Delays are irrelevant here; just avoid blocking the test run.
        monkeypatch.setattr(time, "sleep", lambda s: None)
        client._session.get.return_value = FakeResponse(429, {})

        with pytest.raises(Exception):
            client.discover_peer("rate-limited")

        # One initial request plus three retries.
        assert client._session.get.call_count == 4
|
||||
Loading…
Reference in New Issue
Block a user