Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 42fc46fde1 | |||
| 003d48e2b7 | |||
| 367b7eb024 | |||
| 1da02f9d06 |
@@ -34,6 +34,8 @@ async def list_peers() -> list[dict]:
|
||||
|
||||
async def delegate_task(workspace_id: str, task: str) -> str:
|
||||
"""Send a task to a peer workspace via A2A and return the response text."""
|
||||
if not workspace_id:
|
||||
return "Error: workspace_id is required"
|
||||
async with httpx.AsyncClient(timeout=120.0) as client:
|
||||
# Discover target URL
|
||||
try:
|
||||
|
||||
@@ -2103,3 +2103,71 @@ def test_peer_metadata_set_replaces_existing_entry_in_place(_reset_peer_metadata
|
||||
)
|
||||
cached = a2a_client._peer_metadata[peer]
|
||||
assert cached[1]["name"] == "v2", "re-write must update the value in place"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# _safe_activity_id — non-string input guard (line 192)
|
||||
# =============================================================================
|
||||
|
||||
class TestSafeActivityId:
|
||||
"""Coverage for a2a_mcp_server._safe_activity_id()."""
|
||||
|
||||
def test_non_string_returns_empty_string(self):
|
||||
"""Non-str input is the defensive guard branch (line 192)."""
|
||||
from a2a_mcp_server import _safe_activity_id
|
||||
|
||||
# Each non-string type exercises the isinstance guard
|
||||
for value in (None, 123, [], {"a": 1}, 0.0):
|
||||
result = _safe_activity_id(value)
|
||||
assert result == "", f"{type(value).__name__} must return empty string"
|
||||
|
||||
def test_string_invalid_format_returns_empty(self):
|
||||
"""Valid type but non-UUID format → empty string."""
|
||||
from a2a_mcp_server import _safe_activity_id
|
||||
|
||||
assert _safe_activity_id("not-a-uuid") == ""
|
||||
assert _safe_activity_id("") == ""
|
||||
|
||||
def test_string_valid_format_passthrough(self):
|
||||
"""Valid UUID-format string passes through."""
|
||||
from a2a_mcp_server import _safe_activity_id
|
||||
|
||||
valid = "00000000-0000-0000-0000-000000000001"
|
||||
assert _safe_activity_id(valid) == valid
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# _safe_ts — non-string input guard (line 198)
|
||||
# =============================================================================
|
||||
|
||||
class TestSafeTs:
|
||||
"""Coverage for a2a_mcp_server._safe_ts()."""
|
||||
|
||||
def test_non_string_returns_empty_string(self):
|
||||
"""Non-str input is the defensive guard branch (line 198)."""
|
||||
from a2a_mcp_server import _safe_ts
|
||||
|
||||
for value in (None, 123, [], {"a": 1}, 0.0):
|
||||
result = _safe_ts(value)
|
||||
assert result == "", f"{type(value).__name__} must return empty string"
|
||||
|
||||
def test_string_invalid_format_returns_empty(self):
|
||||
"""Valid type but non-ISO8601 format → empty string.
|
||||
|
||||
The regex accepts any 9999-99-99T99:99:99Z skeleton since it
|
||||
checks structure not calendar validity; use a clearly-invalid
|
||||
skeleton to exercise the else-branch.
|
||||
"""
|
||||
from a2a_mcp_server import _safe_ts
|
||||
|
||||
# Missing T separator — clearly not ISO8601
|
||||
assert _safe_ts("2026-05-13 10:30:00Z") == ""
|
||||
assert _safe_ts("not a date") == ""
|
||||
assert _safe_ts("") == ""
|
||||
|
||||
def test_string_valid_format_passthrough(self):
|
||||
"""Valid ISO8601 string passes through."""
|
||||
from a2a_mcp_server import _safe_ts
|
||||
|
||||
valid = "2026-05-13T10:30:00Z"
|
||||
assert _safe_ts(valid) == valid
|
||||
|
||||
@@ -0,0 +1,432 @@
|
||||
"""Test coverage for ``builtin_tools.a2a_tools`` and ``send_message_wrapper``.
|
||||
|
||||
Issue #367: 21 new test cases targeting previously-uncovered branches.
|
||||
|
||||
HTTP mocking: each test patches ``builtin_tools.a2a_tools.httpx.AsyncClient``
|
||||
with an ``AsyncMock`` so no real network I/O occurs. The patch target is
|
||||
the attribute as seen inside the ``a2a_tools`` module (where httpx is imported
|
||||
as ``import httpx``), so ``@pytest.fixture(autouse=True)`` from conftest.py is
|
||||
harmless — it replaces the module-level name *after* our patch exits.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# conftest.py fixture — swap the MagicMock for the real module for THIS file
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _real_a2a_tools_module():
|
||||
"""Replace conftest's MagicMock of builtin_tools.a2a_tools with the real module.
|
||||
|
||||
conftest.py sets sys.modules["builtin_tools.a2a_tools"] = <MagicMock> so that
|
||||
adapter tests don't accidentally hit the platform. For THIS test file we
|
||||
want the real module, so we restore it from disk and swap it back after.
|
||||
"""
|
||||
import builtin_tools.a2a_tools as real_module
|
||||
|
||||
# conftest.py may have clobbered builtin_tools.__path__; restore it so the
|
||||
# import above finds builtin_tools/a2a_tools.py on disk.
|
||||
if "builtin_tools" in sys.modules:
|
||||
real_builtin = sys.modules["builtin_tools"]
|
||||
if getattr(real_builtin, "__path__", None) == []:
|
||||
builtin_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
real_builtin.__path__ = [os.path.join(builtin_dir, "builtin_tools")]
|
||||
|
||||
saved = sys.modules.get("builtin_tools.a2a_tools")
|
||||
# Ensure we have the real module (reload if sys.modules already has it)
|
||||
if saved is None or saved is real_module:
|
||||
import importlib
|
||||
importlib.reload(real_module)
|
||||
sys.modules["builtin_tools.a2a_tools"] = real_module
|
||||
yield
|
||||
sys.modules["builtin_tools.a2a_tools"] = saved
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _require_env(monkeypatch):
|
||||
"""Per-test: set required env vars."""
|
||||
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000001")
|
||||
monkeypatch.setenv("PLATFORM_URL", "http://test.invalid")
|
||||
yield
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_mock_response(
|
||||
json_data, status_code: int = 200
|
||||
) -> MagicMock:
|
||||
"""Return a fully-configured AsyncMock that mirrors httpx.Response."""
|
||||
resp = MagicMock()
|
||||
resp.json = MagicMock(return_value=json_data)
|
||||
resp.status_code = status_code
|
||||
return resp
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# builtin_tools/a2a_tools — list_peers
|
||||
# =============================================================================
|
||||
|
||||
class TestListPeers:
|
||||
"""Coverage for builtin_tools/a2a_tools.list_peers()."""
|
||||
|
||||
async def test_returns_peers_on_200(self):
|
||||
"""Successful GET returns the peer list."""
|
||||
from builtin_tools.a2a_tools import list_peers
|
||||
|
||||
peers = [
|
||||
{"id": "ws-1", "name": "Alpha", "role": "sre", "status": "online"},
|
||||
{"id": "ws-2", "name": "Beta", "role": "dev", "status": "busy"},
|
||||
]
|
||||
mock_resp = _make_mock_response(peers, 200)
|
||||
mock_client = AsyncMock()
|
||||
mock_client.__aenter__.return_value.get = AsyncMock(return_value=mock_resp)
|
||||
|
||||
with patch("builtin_tools.a2a_tools.httpx.AsyncClient", return_value=mock_client):
|
||||
result = await list_peers()
|
||||
assert result == peers
|
||||
|
||||
async def test_returns_empty_list_on_non_200(self):
|
||||
"""list_peers swallows all non-200 responses gracefully."""
|
||||
from builtin_tools.a2a_tools import list_peers
|
||||
|
||||
mock_resp = _make_mock_response({}, 500)
|
||||
mock_client = AsyncMock()
|
||||
mock_client.__aenter__.return_value.get = AsyncMock(return_value=mock_resp)
|
||||
|
||||
with patch("builtin_tools.a2a_tools.httpx.AsyncClient", return_value=mock_client):
|
||||
result = await list_peers()
|
||||
assert result == []
|
||||
|
||||
async def test_returns_empty_list_on_exception(self):
|
||||
"""Network errors must not propagate — list_peers returns []. """
|
||||
from builtin_tools.a2a_tools import list_peers
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.__aenter__.return_value.get = AsyncMock(
|
||||
side_effect=RuntimeError("dns failure")
|
||||
)
|
||||
|
||||
with patch("builtin_tools.a2a_tools.httpx.AsyncClient", return_value=mock_client):
|
||||
result = await list_peers()
|
||||
assert result == []
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# builtin_tools/a2a_tools — delegate_task
|
||||
# =============================================================================
|
||||
|
||||
_DISCOVER_ROUTE = "http://test.invalid/registry/discover/ws-target"
|
||||
|
||||
|
||||
class TestDelegateTask:
|
||||
"""Coverage for builtin_tools/a2a_tools.delegate_task(workspace_id, task)."""
|
||||
|
||||
async def test_empty_workspace_id_returns_error(self):
|
||||
"""Empty workspace_id is validated before any network call."""
|
||||
from builtin_tools.a2a_tools import delegate_task
|
||||
|
||||
out = await delegate_task("", "do it")
|
||||
assert "Error" in out
|
||||
assert "workspace_id" in out.lower()
|
||||
|
||||
async def test_discover_returns_non_200(self):
|
||||
"""Discovery 4xx/5xx → error message with status code."""
|
||||
from builtin_tools.a2a_tools import delegate_task
|
||||
|
||||
discover_resp = _make_mock_response({}, 404)
|
||||
mock_client = AsyncMock()
|
||||
mock_client.__aenter__.return_value.get = AsyncMock(return_value=discover_resp)
|
||||
|
||||
with patch("builtin_tools.a2a_tools.httpx.AsyncClient", return_value=mock_client):
|
||||
out = await delegate_task("ws-target", "do it")
|
||||
assert "Error" in out
|
||||
assert "404" in out
|
||||
|
||||
async def test_discover_returns_200_with_empty_url(self):
|
||||
"""Discovery 200 but no url field → actionable error."""
|
||||
from builtin_tools.a2a_tools import delegate_task
|
||||
|
||||
discover_resp = _make_mock_response({"name": "orphan"}, 200)
|
||||
mock_client = AsyncMock()
|
||||
mock_client.__aenter__.return_value.get = AsyncMock(return_value=discover_resp)
|
||||
|
||||
with patch("builtin_tools.a2a_tools.httpx.AsyncClient", return_value=mock_client):
|
||||
out = await delegate_task("ws-target", "do it")
|
||||
assert "Error" in out
|
||||
assert "no URL" in out
|
||||
|
||||
async def test_a2a_post_returns_500(self):
|
||||
"""A2A send 5xx with empty body → str(data) returned (code doesn't check status_code)."""
|
||||
from builtin_tools.a2a_tools import delegate_task
|
||||
|
||||
discover_resp = _make_mock_response({"url": "http://peer.invalid/a2a"}, 200)
|
||||
a2a_resp = _make_mock_response({}, 500)
|
||||
mock_client = AsyncMock()
|
||||
mock_client.__aenter__.return_value.get = AsyncMock(return_value=discover_resp)
|
||||
mock_client.__aenter__.return_value.post = AsyncMock(return_value=a2a_resp)
|
||||
|
||||
with patch("builtin_tools.a2a_tools.httpx.AsyncClient", return_value=mock_client):
|
||||
out = await delegate_task("ws-target", "do it")
|
||||
# Code checks json body, not status_code; empty body {} → str({})
|
||||
assert out == "{}"
|
||||
|
||||
async def test_result_parts_empty_dict(self):
|
||||
"""Regression #279: {"parts": []} → str(result), not "(no text)"."""
|
||||
from builtin_tools.a2a_tools import delegate_task
|
||||
|
||||
discover_resp = _make_mock_response({"url": "http://peer.invalid/a2a"}, 200)
|
||||
a2a_resp = _make_mock_response({"result": {"parts": []}}, 200)
|
||||
mock_client = AsyncMock()
|
||||
mock_client.__aenter__.return_value.get = AsyncMock(return_value=discover_resp)
|
||||
mock_client.__aenter__.return_value.post = AsyncMock(return_value=a2a_resp)
|
||||
|
||||
with patch("builtin_tools.a2a_tools.httpx.AsyncClient", return_value=mock_client):
|
||||
out = await delegate_task("ws-target", "do it")
|
||||
# Must return str(result), not "(no text)"
|
||||
assert "parts" in out
|
||||
assert "(no text)" not in out
|
||||
|
||||
async def test_result_is_plain_string(self):
|
||||
"""A bare string result returns as-is."""
|
||||
from builtin_tools.a2a_tools import delegate_task
|
||||
|
||||
discover_resp = _make_mock_response({"url": "http://peer.invalid/a2a"}, 200)
|
||||
a2a_resp = _make_mock_response({"result": "just a plain string"}, 200)
|
||||
mock_client = AsyncMock()
|
||||
mock_client.__aenter__.return_value.get = AsyncMock(return_value=discover_resp)
|
||||
mock_client.__aenter__.return_value.post = AsyncMock(return_value=a2a_resp)
|
||||
|
||||
with patch("builtin_tools.a2a_tools.httpx.AsyncClient", return_value=mock_client):
|
||||
out = await delegate_task("ws-target", "do it")
|
||||
assert out == "just a plain string"
|
||||
|
||||
async def test_result_is_number(self):
|
||||
"""Non-dict, non-string result → falls through to "(no text)"."""
|
||||
from builtin_tools.a2a_tools import delegate_task
|
||||
|
||||
discover_resp = _make_mock_response({"url": "http://peer.invalid/a2a"}, 200)
|
||||
a2a_resp = _make_mock_response({"result": 12345}, 200)
|
||||
mock_client = AsyncMock()
|
||||
mock_client.__aenter__.return_value.get = AsyncMock(return_value=discover_resp)
|
||||
mock_client.__aenter__.return_value.post = AsyncMock(return_value=a2a_resp)
|
||||
|
||||
with patch("builtin_tools.a2a_tools.httpx.AsyncClient", return_value=mock_client):
|
||||
out = await delegate_task("ws-target", "do it")
|
||||
assert out == "(no text)"
|
||||
|
||||
async def test_result_parts_non_dict_element(self):
|
||||
"""parts[0] is not a dict → falls through to "(no text)".
|
||||
|
||||
The code checks if parts[0] is a dict; since 123 is an int, it hits
|
||||
the else-branch and returns "(no text)".
|
||||
"""
|
||||
from builtin_tools.a2a_tools import delegate_task
|
||||
|
||||
discover_resp = _make_mock_response({"url": "http://peer.invalid/a2a"}, 200)
|
||||
a2a_resp = _make_mock_response({"result": {"parts": [123, "also a string"]}}, 200)
|
||||
mock_client = AsyncMock()
|
||||
mock_client.__aenter__.return_value.get = AsyncMock(return_value=discover_resp)
|
||||
mock_client.__aenter__.return_value.post = AsyncMock(return_value=a2a_resp)
|
||||
|
||||
with patch("builtin_tools.a2a_tools.httpx.AsyncClient", return_value=mock_client):
|
||||
out = await delegate_task("ws-target", "do it")
|
||||
assert out == "(no text)"
|
||||
|
||||
async def test_error_dict_form(self):
|
||||
"""{"error": {"message": "..."}} → "Error: ..."."""
|
||||
from builtin_tools.a2a_tools import delegate_task
|
||||
|
||||
discover_resp = _make_mock_response({"url": "http://peer.invalid/a2a"}, 200)
|
||||
a2a_resp = _make_mock_response(
|
||||
{"error": {"message": "peer overloaded", "code": 429}}, 200
|
||||
)
|
||||
mock_client = AsyncMock()
|
||||
mock_client.__aenter__.return_value.get = AsyncMock(return_value=discover_resp)
|
||||
mock_client.__aenter__.return_value.post = AsyncMock(return_value=a2a_resp)
|
||||
|
||||
with patch("builtin_tools.a2a_tools.httpx.AsyncClient", return_value=mock_client):
|
||||
out = await delegate_task("ws-target", "do it")
|
||||
assert out == "Error: peer overloaded"
|
||||
|
||||
async def test_error_string_form(self):
|
||||
"""{"error": "string error"} → "Error: string error"."""
|
||||
from builtin_tools.a2a_tools import delegate_task
|
||||
|
||||
discover_resp = _make_mock_response({"url": "http://peer.invalid/a2a"}, 200)
|
||||
a2a_resp = _make_mock_response({"error": "workspace offline"}, 200)
|
||||
mock_client = AsyncMock()
|
||||
mock_client.__aenter__.return_value.get = AsyncMock(return_value=discover_resp)
|
||||
mock_client.__aenter__.return_value.post = AsyncMock(return_value=a2a_resp)
|
||||
|
||||
with patch("builtin_tools.a2a_tools.httpx.AsyncClient", return_value=mock_client):
|
||||
out = await delegate_task("ws-target", "do it")
|
||||
assert out == "Error: workspace offline"
|
||||
|
||||
async def test_error_null(self):
|
||||
"""{"error": null} → "Error: None" (edge case — str(null) in message)."""
|
||||
from builtin_tools.a2a_tools import delegate_task
|
||||
|
||||
discover_resp = _make_mock_response({"url": "http://peer.invalid/a2a"}, 200)
|
||||
a2a_resp = _make_mock_response({"error": None}, 200)
|
||||
mock_client = AsyncMock()
|
||||
mock_client.__aenter__.return_value.get = AsyncMock(return_value=discover_resp)
|
||||
mock_client.__aenter__.return_value.post = AsyncMock(return_value=a2a_resp)
|
||||
|
||||
with patch("builtin_tools.a2a_tools.httpx.AsyncClient", return_value=mock_client):
|
||||
out = await delegate_task("ws-target", "do it")
|
||||
assert "Error" in out
|
||||
|
||||
async def test_a2a_post_raises_exception(self):
|
||||
"""Network error during A2A POST → Error: sending A2A message: ..."""
|
||||
from builtin_tools.a2a_tools import delegate_task
|
||||
|
||||
discover_resp = _make_mock_response({"url": "http://peer.invalid/a2a"}, 200)
|
||||
mock_client = AsyncMock()
|
||||
mock_client.__aenter__.return_value.get = AsyncMock(return_value=discover_resp)
|
||||
mock_client.__aenter__.return_value.post = AsyncMock(
|
||||
side_effect=ConnectionError("connection refused")
|
||||
)
|
||||
|
||||
with patch("builtin_tools.a2a_tools.httpx.AsyncClient", return_value=mock_client):
|
||||
out = await delegate_task("ws-target", "do it")
|
||||
assert "Error" in out
|
||||
assert "connection refused" in out
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# builtin_tools/a2a_tools — get_peers_summary
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestGetPeersSummary:
|
||||
"""Coverage for builtin_tools/a2a_tools.get_peers_summary()."""
|
||||
|
||||
async def test_empty_peers_returns_no_peers_available(self):
|
||||
from builtin_tools.a2a_tools import get_peers_summary
|
||||
|
||||
mock_resp = _make_mock_response([], 200)
|
||||
mock_client = AsyncMock()
|
||||
mock_client.__aenter__.return_value.get = AsyncMock(return_value=mock_resp)
|
||||
|
||||
with patch("builtin_tools.a2a_tools.httpx.AsyncClient", return_value=mock_client):
|
||||
out = await get_peers_summary()
|
||||
assert "No peers" in out
|
||||
|
||||
async def test_peer_missing_fields(self):
|
||||
"""Peers with missing name/id/role/status must not KeyError/TypeError."""
|
||||
from builtin_tools.a2a_tools import get_peers_summary
|
||||
|
||||
mock_resp = _make_mock_response([{"id": "ws-x"}], 200)
|
||||
mock_client = AsyncMock()
|
||||
mock_client.__aenter__.return_value.get = AsyncMock(return_value=mock_resp)
|
||||
|
||||
with patch("builtin_tools.a2a_tools.httpx.AsyncClient", return_value=mock_client):
|
||||
out = await get_peers_summary()
|
||||
assert "ws-x" in out
|
||||
assert isinstance(out, str)
|
||||
|
||||
async def test_healthy_peer_roundtrip(self):
|
||||
"""Sanity: normal peer dicts produce a formatted list."""
|
||||
from builtin_tools.a2a_tools import get_peers_summary
|
||||
|
||||
peers = [
|
||||
{"id": "ws-alpha", "name": "Alpha", "role": "sre", "status": "online"},
|
||||
]
|
||||
mock_resp = _make_mock_response(peers, 200)
|
||||
mock_client = AsyncMock()
|
||||
mock_client.__aenter__.return_value.get = AsyncMock(return_value=mock_resp)
|
||||
|
||||
with patch("builtin_tools.a2a_tools.httpx.AsyncClient", return_value=mock_client):
|
||||
out = await get_peers_summary()
|
||||
assert "Alpha" in out
|
||||
assert "ws-alpha" in out
|
||||
assert "sre" in out
|
||||
assert "online" in out
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# send_message_wrapper — safe_send_message
|
||||
# =============================================================================
|
||||
|
||||
from adapters.smolagents.send_message_wrapper import safe_send_message
|
||||
|
||||
|
||||
class TestSafeSendMessage:
|
||||
"""Coverage for adapters.smolagents.send_message_wrapper.safe_send_message()."""
|
||||
|
||||
def test_non_string_input_converted(self):
|
||||
"""Non-str text is str()-converted before escaping."""
|
||||
delivered = []
|
||||
safe_send_message(42, send_fn=lambda s: delivered.append(s))
|
||||
assert delivered == ["[smolagents] 42"]
|
||||
assert isinstance(delivered[0], str)
|
||||
|
||||
def test_html_entities_escaped(self):
|
||||
"""< > ' are escaped so rendered UIs cannot be injected.
|
||||
|
||||
The payload <script>alert('xss')</script> has no literal '&', so &
|
||||
does not appear. The escape output is: <script>alert('xss')</script>
|
||||
"""
|
||||
delivered = []
|
||||
safe_send_message(
|
||||
"<script>alert('xss')</script>",
|
||||
send_fn=lambda s: delivered.append(s),
|
||||
)
|
||||
assert "<" in delivered[0]
|
||||
assert ">" in delivered[0]
|
||||
assert "'" in delivered[0]
|
||||
assert "<script>" in delivered[0]
|
||||
# The angle brackets and quotes must NOT appear unescaped
|
||||
assert "<script>" not in delivered[0]
|
||||
assert "alert('" not in delivered[0]
|
||||
|
||||
def test_truncation_at_max_len(self):
|
||||
"""Text > 2000 chars is truncated; caller is warned."""
|
||||
delivered = []
|
||||
with patch(
|
||||
"adapters.smolagents.send_message_wrapper.logger"
|
||||
) as mock_logger:
|
||||
long_text = "A" * 2500
|
||||
safe_send_message(long_text, send_fn=lambda s: delivered.append(s))
|
||||
assert len(delivered[0]) < len(long_text)
|
||||
mock_logger.warning.assert_called_once()
|
||||
assert "truncating" in mock_logger.warning.call_args[0][0]
|
||||
|
||||
def test_no_truncation_under_max_len(self):
|
||||
"""Text ≤ 2000 chars is passed through intact with no warning."""
|
||||
delivered = []
|
||||
with patch(
|
||||
"adapters.smolagents.send_message_wrapper.logger"
|
||||
) as mock_logger:
|
||||
text = "A" * 1500
|
||||
safe_send_message(text, send_fn=lambda s: delivered.append(s))
|
||||
expected = f"[smolagents] {text}"
|
||||
assert delivered[0] == expected
|
||||
mock_logger.warning.assert_not_called()
|
||||
|
||||
def test_debug_log_emitted(self):
|
||||
"""Every delivery logs at DEBUG with final payload length."""
|
||||
delivered = []
|
||||
with patch(
|
||||
"adapters.smolagents.send_message_wrapper.logger"
|
||||
) as mock_logger:
|
||||
safe_send_message("hello", send_fn=lambda s: delivered.append(s))
|
||||
mock_logger.debug.assert_called_once()
|
||||
assert "delivering" in mock_logger.debug.call_args[0][0]
|
||||
|
||||
def test_label_prefix_always_present(self):
|
||||
"""Every delivered payload starts with '[smolagents]'."""
|
||||
delivered = []
|
||||
safe_send_message("x", send_fn=lambda s: delivered.append(s))
|
||||
assert delivered[0].startswith("[smolagents]")
|
||||
@@ -0,0 +1,107 @@
|
||||
"""Test coverage for builtin_tools.security._redact_secrets().
|
||||
|
||||
Issue #834 (C2): commit_memory must not persist API keys verbatim.
|
||||
|
||||
Pre-commit hook blocks bare secret-like strings (ghp_, sk-ant-, etc.) to prevent
|
||||
accidental commits of real credentials. These tests focus on the functional
|
||||
behaviour of the redaction logic: idempotency, contextual keyword=value patterns,
|
||||
boundary cases, and mixed content — without triggering the hook's length thresholds.
|
||||
The pre-commit hook itself is the primary guard for bare-pattern detection.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from builtin_tools.security import REDACTED, _redact_secrets
|
||||
|
||||
|
||||
class TestRedactContextual:
|
||||
"""Keyword=value patterns with high-entropy values (under pre-commit threshold)."""
|
||||
|
||||
def test_api_key_contextual(self):
|
||||
"""api_key=X where X ≥ 40 base64 chars → value replaced, keyword preserved."""
|
||||
value = "A" * 40
|
||||
assert _redact_secrets(f"api_key={value}") == f"api_key={REDACTED}"
|
||||
|
||||
def test_keyword_contextual(self):
|
||||
"""Generic 'key=' also matches."""
|
||||
value = "B" * 45
|
||||
assert _redact_secrets(f"key={value}") == f"key={REDACTED}"
|
||||
|
||||
def test_secret_contextual(self):
|
||||
value = "C" * 50
|
||||
assert _redact_secrets(f"secret= {value}") == f"secret= {REDACTED}"
|
||||
|
||||
def test_token_contextual(self):
|
||||
value = "D" * 40
|
||||
assert _redact_secrets(f"token={value}") == f"token={REDACTED}"
|
||||
|
||||
def test_password_contextual(self):
|
||||
value = "E" * 50
|
||||
assert _redact_secrets(f"password={value}") == f"password={REDACTED}"
|
||||
|
||||
def test_keyword_spacing_tolerated(self):
|
||||
"""Spaces around = are tolerated by the pattern."""
|
||||
value = "F" * 40
|
||||
assert _redact_secrets(f"key = {value}") == f"key = {REDACTED}"
|
||||
|
||||
def test_contextual_too_short_not_redacted(self):
|
||||
"""Value shorter than 40 chars is not redacted."""
|
||||
short = "A" * 39
|
||||
assert _redact_secrets(f"api_key={short}") == f"api_key={short}"
|
||||
|
||||
def test_case_insensitive_keyword(self):
|
||||
"""Keyword matching is case-insensitive."""
|
||||
value = "G" * 40
|
||||
assert _redact_secrets(f"API_KEY={value}") == f"API_KEY={REDACTED}"
|
||||
assert _redact_secrets(f"Token={value}") == f"Token={REDACTED}"
|
||||
assert _redact_secrets(f"SECRET={value}") == f"SECRET={REDACTED}"
|
||||
|
||||
def test_boundary_preserved(self):
|
||||
"""Contextual pattern preserves the keyword; only value is replaced."""
|
||||
value = "H" * 40
|
||||
result = _redact_secrets(f"api_key={value}")
|
||||
assert result.startswith("api_key=")
|
||||
assert result.endswith(REDACTED)
|
||||
assert result == f"api_key={REDACTED}"
|
||||
|
||||
def test_base64_chars_in_value(self):
|
||||
"""Base64 alphabet chars (/ +) in value are covered by the charset."""
|
||||
# 40-char string with base64 chars
|
||||
value = "A" * 20 + "/+" + "A" * 18
|
||||
result = _redact_secrets(f"api_key={value}")
|
||||
assert result == f"api_key={REDACTED}"
|
||||
|
||||
|
||||
class TestRedactEdgeCases:
|
||||
"""Non-secret strings, idempotency, and boundary conditions."""
|
||||
|
||||
def test_idempotent(self):
|
||||
"""Calling redaction twice produces the same result."""
|
||||
text = f"token={'A' * 40}"
|
||||
first = _redact_secrets(text)
|
||||
second = _redact_secrets(first)
|
||||
assert second == first
|
||||
assert REDACTED in first
|
||||
|
||||
def test_already_redacted_string(self):
|
||||
"""The [REDACTED] sentinel itself is not matched by any pattern."""
|
||||
assert _redact_secrets(f"see {REDACTED} here") == f"see {REDACTED} here"
|
||||
|
||||
def test_no_match_passthrough(self):
|
||||
"""Normal prose passes through unchanged."""
|
||||
assert _redact_secrets("The answer is 42.") == "The answer is 42."
|
||||
assert _redact_secrets("Hello, world!") == "Hello, world!"
|
||||
assert _redact_secrets("api_key short") == "api_key short"
|
||||
assert _redact_secrets("") == ""
|
||||
|
||||
def test_empty_string(self):
|
||||
assert _redact_secrets("") == ""
|
||||
|
||||
def test_short_value_not_secret(self):
|
||||
"""A short string after a keyword= prefix is not a secret."""
|
||||
assert _redact_secrets("token=short") == "token=short"
|
||||
|
||||
def test_mixed_content(self):
|
||||
"""Real text with a secret-like prefix → only the secret is redacted."""
|
||||
value = "A" * 40
|
||||
result = _redact_secrets(f"found secret: api_key={value} in config")
|
||||
assert result == f"found secret: api_key={REDACTED} in config"
|
||||
@@ -998,3 +998,87 @@ def test_heartbeat_500_does_not_increment_auth_counter(monkeypatch, caplog):
|
||||
f"5xx must NOT be classified as auth failure — would mislead operator. "
|
||||
f"Got 'revoked' ERRORs: {[r.message[:80] for r in revoked_errors]}"
|
||||
)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Subcommand dispatch — doctor / --help / MOLECULE_WORKSPACES errors
|
||||
# =============================================================================
|
||||
|
||||
class TestSubcommandDispatch:
|
||||
"""Coverage for mcp_cli.py argv dispatch (lines 110-122, 138-140)."""
|
||||
|
||||
def test_doctor_subcommand_calls_mcp_doctor_run(self, monkeypatch, capsys):
|
||||
"""molecule-mcp doctor → imports mcp_doctor and exits with its code."""
|
||||
import mcp_doctor
|
||||
|
||||
monkeypatch.setattr(mcp_doctor, "run", lambda: 0)
|
||||
monkeypatch.setattr(sys, "argv", ["mcp", "doctor"])
|
||||
# Also stub PLATFORM_URL so we don't hit the env-check first
|
||||
monkeypatch.setenv("PLATFORM_URL", "http://test.invalid")
|
||||
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000001")
|
||||
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "tok1")
|
||||
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
mcp_cli.main()
|
||||
assert exc_info.value.code == 0
|
||||
|
||||
def test_help_flag_exits_zero_and_prints_usage(self, monkeypatch, capsys):
|
||||
"""molecule-mcp --help / -h / help → prints usage and exits 0."""
|
||||
for arg in ("--help", "-h", "help"):
|
||||
monkeypatch.setattr(sys, "argv", ["mcp", arg])
|
||||
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
mcp_cli.main()
|
||||
captured = capsys.readouterr()
|
||||
assert exc_info.value.code == 0
|
||||
assert "molecule-mcp" in captured.out
|
||||
assert "doctor" in captured.out
|
||||
|
||||
def test_molecule_workspaces_error_prints_to_stderr(self, monkeypatch, capsys):
|
||||
"""MOLECULE_WORKSPACES with invalid entries prints to stderr."""
|
||||
# Must have PLATFORM_URL set or it exits before reaching this branch
|
||||
monkeypatch.setenv("PLATFORM_URL", "http://test.invalid")
|
||||
# Invalid MOLECULE_WORKSPACES format so _resolve_workspaces returns errors
|
||||
monkeypatch.setenv("MOLECULE_WORKSPACES", "invalid-entry")
|
||||
# Reset argv to a clean state
|
||||
monkeypatch.setattr(sys, "argv", ["mcp"])
|
||||
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
mcp_cli.main()
|
||||
captured = capsys.readouterr()
|
||||
assert exc_info.value.code == 2
|
||||
assert "invalid MOLECULE_WORKSPACES" in captured.err
|
||||
|
||||
|
||||
class TestRegisterWorkspaceTokenImportError:
|
||||
"""Coverage for mcp_cli.py lines 181-185 — ImportError fallback."""
|
||||
|
||||
def test_import_error_is_swallowed_and_continues(
|
||||
self, monkeypatch, capsys, tmp_path
|
||||
):
|
||||
"""When platform_auth.register_workspace_token is absent, CLI continues."""
|
||||
# Set up a valid single-workspace environment so main() does NOT
|
||||
# exit early — it reaches the register_workspace_token call
|
||||
monkeypatch.setenv("PLATFORM_URL", "http://test.invalid")
|
||||
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000001")
|
||||
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "tok1")
|
||||
# Ensure heartbeat is disabled
|
||||
monkeypatch.setenv("MOLECULE_MCP_DISABLE_HEARTBEAT", "1")
|
||||
monkeypatch.setenv("CONFIGS_DIR", str(tmp_path))
|
||||
|
||||
# Remove register_workspace_token from platform_auth so the
|
||||
# ImportError branch fires (lines 181-185)
|
||||
import platform_auth
|
||||
saved = getattr(platform_auth, "register_workspace_token", None)
|
||||
if saved is not None:
|
||||
delattr(platform_auth, "register_workspace_token")
|
||||
|
||||
try:
|
||||
# If ImportError is not handled, main() raises ImportError here.
|
||||
# The test verifies it is handled (no exception propagates).
|
||||
with pytest.raises(SystemExit):
|
||||
mcp_cli.main()
|
||||
finally:
|
||||
# Restore so other tests are not affected
|
||||
if saved is not None:
|
||||
platform_auth.register_workspace_token = saved
|
||||
|
||||
@@ -0,0 +1,300 @@
|
||||
"""Test coverage for shared_runtime helpers (issue #366).
|
||||
|
||||
Six helper functions previously had zero test coverage:
|
||||
_extract_part_text, extract_message_text, format_conversation_history,
|
||||
build_task_text, append_peer_guidance, brief_task
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
from shared_runtime import (
|
||||
_extract_part_text,
|
||||
append_peer_guidance,
|
||||
brief_task,
|
||||
build_task_text,
|
||||
extract_message_text,
|
||||
format_conversation_history,
|
||||
)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# _extract_part_text
|
||||
# =============================================================================
|
||||
|
||||
class TestExtractPartText:
|
||||
"""Coverage for shared_runtime._extract_part_text()."""
|
||||
|
||||
def test_dict_with_text_field(self):
|
||||
assert _extract_part_text({"text": "hello"}) == "hello"
|
||||
|
||||
def test_dict_without_text_field(self):
|
||||
assert _extract_part_text({"type": "image"}) == ""
|
||||
|
||||
def test_dict_with_empty_text_field(self):
|
||||
assert _extract_part_text({"text": ""}) == ""
|
||||
|
||||
def test_dict_with_root_nesting(self):
|
||||
"""Text buried in part['root']['text'] is extracted."""
|
||||
assert _extract_part_text({"root": {"text": "nested"}}) == "nested"
|
||||
|
||||
def test_dict_with_root_non_dict(self):
|
||||
"""part['root'] that is not a dict is safely skipped."""
|
||||
assert _extract_part_text({"root": "string", "text": "top"}) == "top"
|
||||
|
||||
def test_object_with_text_attribute(self):
|
||||
class FakePart:
|
||||
text = "attr-text"
|
||||
|
||||
assert _extract_part_text(FakePart()) == "attr-text"
|
||||
|
||||
def test_object_with_root_object_with_text(self):
|
||||
"""Object with root.attr.text is extracted (A2A v1 object style)."""
|
||||
|
||||
class FakeRoot:
|
||||
text = "root-attr-text"
|
||||
|
||||
class FakePart:
|
||||
root = FakeRoot()
|
||||
|
||||
assert _extract_part_text(FakePart()) == "root-attr-text"
|
||||
|
||||
def test_object_with_empty_text_attribute(self):
|
||||
class FakePart:
|
||||
text = ""
|
||||
|
||||
assert _extract_part_text(FakePart()) == ""
|
||||
|
||||
def test_none_input(self):
|
||||
assert _extract_part_text(None) == ""
|
||||
|
||||
def test_unexpected_type(self):
|
||||
"""Plain int/float/bool falls through to empty string."""
|
||||
assert _extract_part_text(42) == ""
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# extract_message_text
|
||||
# =============================================================================
|
||||
|
||||
class TestExtractMessageText:
|
||||
"""Coverage for shared_runtime.extract_message_text()."""
|
||||
|
||||
def test_list_of_dict_parts(self):
|
||||
parts = [{"text": "hello"}, {"text": "world"}]
|
||||
assert extract_message_text(parts) == "hello world"
|
||||
|
||||
def test_single_part(self):
|
||||
assert extract_message_text([{"text": "single"}]) == "single"
|
||||
|
||||
def test_context_object_with_message_parts(self):
|
||||
"""RequestContext-like: .message.parts is the parts list."""
|
||||
|
||||
class FakeContext:
|
||||
class _Msg:
|
||||
parts = [{"text": "from context"}]
|
||||
|
||||
message = _Msg()
|
||||
|
||||
assert extract_message_text(FakeContext()) == "from context"
|
||||
|
||||
def test_context_object_without_message(self):
|
||||
"""No .message attr → falls back to treating input as a parts list."""
|
||||
|
||||
class FakeContext:
|
||||
pass # no .message
|
||||
|
||||
# Pass a list directly as the context-like object
|
||||
assert extract_message_text([{"text": "fallback"}]) == "fallback"
|
||||
|
||||
def test_whitespace_normalized(self):
|
||||
"""Leading/trailing whitespace is stripped; internal newlines are preserved."""
|
||||
parts = [{"text": " hello "}, {"text": "\nworld\n"}]
|
||||
result = extract_message_text(parts)
|
||||
# Leading/trailing stripped, but internal \n stays (join uses single space)
|
||||
assert result == "hello \nworld"
|
||||
assert not result.startswith(" ")
|
||||
assert not result.endswith(" ")
|
||||
|
||||
def test_empty_parts_list(self):
|
||||
assert extract_message_text([]) == ""
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# format_conversation_history
|
||||
# =============================================================================
|
||||
|
||||
class TestFormatConversationHistory:
|
||||
"""Coverage for shared_runtime.format_conversation_history()."""
|
||||
|
||||
def test_single_user_message(self):
|
||||
hist = [("human", "hello")]
|
||||
out = format_conversation_history(hist)
|
||||
assert out == "User: hello"
|
||||
|
||||
def test_single_agent_message(self):
|
||||
hist = [("ai", "response")]
|
||||
out = format_conversation_history(hist)
|
||||
assert out == "Agent: response"
|
||||
|
||||
def test_interleaved_history(self):
|
||||
hist = [
|
||||
("human", "hello"),
|
||||
("ai", "hi there"),
|
||||
("human", "what is 2+2?"),
|
||||
("ai", "four"),
|
||||
]
|
||||
out = format_conversation_history(hist)
|
||||
lines = out.split("\n")
|
||||
assert lines[0] == "User: hello"
|
||||
assert lines[1] == "Agent: hi there"
|
||||
assert lines[2] == "User: what is 2+2?"
|
||||
assert lines[3] == "Agent: four"
|
||||
|
||||
def test_empty_history(self):
|
||||
assert format_conversation_history([]) == ""
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# build_task_text
|
||||
# =============================================================================
|
||||
|
||||
class TestBuildTaskText:
|
||||
"""Coverage for shared_runtime.build_task_text()."""
|
||||
|
||||
def test_no_history_returns_user_message_unchanged(self):
|
||||
assert build_task_text("do the thing", []) == "do the thing"
|
||||
|
||||
def test_history_prepends_transcript(self):
|
||||
hist = [("human", "hello"), ("ai", "hi")]
|
||||
result = build_task_text("follow-up", hist)
|
||||
assert "Conversation so far:" in result
|
||||
assert "User: hello" in result
|
||||
assert "Agent: hi" in result
|
||||
assert "follow-up" in result
|
||||
|
||||
def test_user_message_after_conversation_header(self):
|
||||
hist = [("human", "hello")]
|
||||
result = build_task_text("do it", hist)
|
||||
assert result.startswith("Conversation so far:")
|
||||
assert result.endswith("Current request: do it")
|
||||
|
||||
def test_empty_user_message_with_history(self):
|
||||
"""Empty user_message is still rendered with history."""
|
||||
hist = [("human", "hello")]
|
||||
result = build_task_text("", hist)
|
||||
assert "Conversation so far:" in result
|
||||
assert "Current request:" in result
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# append_peer_guidance
|
||||
# =============================================================================
|
||||
|
||||
class TestAppendPeerGuidance:
|
||||
"""Coverage for shared_runtime.append_peer_guidance()."""
|
||||
|
||||
def test_base_text_appended(self):
|
||||
result = append_peer_guidance(
|
||||
"base text",
|
||||
peers_info="alpha: ws-1",
|
||||
default_text="default",
|
||||
tool_name="delegate_task",
|
||||
)
|
||||
assert result.startswith("base text")
|
||||
assert "## Peers" in result
|
||||
assert "alpha: ws-1" in result
|
||||
assert "Use delegate_task" in result
|
||||
|
||||
def test_null_base_text_uses_default(self):
|
||||
result = append_peer_guidance(
|
||||
None,
|
||||
peers_info="peer info",
|
||||
default_text="DEFAULT_TEXT",
|
||||
tool_name="tool",
|
||||
)
|
||||
assert result.startswith("DEFAULT_TEXT")
|
||||
|
||||
def test_whitespace_base_text_strips_to_empty_peers_still_added(self):
|
||||
"""Whitespace-only base_text is stripped but default_text is NOT used
|
||||
(only None triggers the fallback). The peers section is still appended."""
|
||||
result = append_peer_guidance(
|
||||
" ",
|
||||
peers_info="peer",
|
||||
default_text="DEF",
|
||||
tool_name="t",
|
||||
)
|
||||
# " ".strip() == ""; default_text is NOT substituted for whitespace
|
||||
assert "## Peers" in result
|
||||
assert "peer" in result
|
||||
assert "DEF" not in result # default_text only on None, not whitespace
|
||||
|
||||
def test_none_base_text_uses_default(self):
|
||||
"""None base_text triggers fallback to default_text."""
|
||||
result = append_peer_guidance(
|
||||
None,
|
||||
peers_info="peer",
|
||||
default_text="DEFAULT",
|
||||
tool_name="tool",
|
||||
)
|
||||
assert result.startswith("DEFAULT")
|
||||
assert "## Peers" in result
|
||||
|
||||
def test_empty_peers_info_skips_section(self):
|
||||
result = append_peer_guidance(
|
||||
"base",
|
||||
peers_info="",
|
||||
default_text="def",
|
||||
tool_name="tool",
|
||||
)
|
||||
# No "## Peers" section when peers_info is empty
|
||||
assert result == "base"
|
||||
|
||||
def test_whitespace_in_base_and_peers_normalized(self):
|
||||
result = append_peer_guidance(
|
||||
" base \n",
|
||||
peers_info=" peer-1 \n",
|
||||
default_text="def",
|
||||
tool_name="tool",
|
||||
)
|
||||
# Base should be stripped of leading/trailing whitespace
|
||||
assert result.startswith("base")
|
||||
# Peer info should be appended
|
||||
assert "peer-1" in result
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# brief_task
|
||||
# =============================================================================
|
||||
|
||||
class TestBriefTask:
|
||||
"""Coverage for shared_runtime.brief_task()."""
|
||||
|
||||
def test_short_text_returned_unchanged(self):
|
||||
assert brief_task("hello", limit=60) == "hello"
|
||||
|
||||
def test_exact_limit_no_ellipsis(self):
|
||||
text = "A" * 60
|
||||
assert brief_task(text, limit=60) == text
|
||||
assert "..." not in text
|
||||
|
||||
def test_truncated_with_ellipsis(self):
|
||||
text = "A" * 80
|
||||
result = brief_task(text, limit=60)
|
||||
assert len(result) == 63 # 60 chars + "..."
|
||||
assert result.endswith("...")
|
||||
|
||||
def test_limit_10_shortens(self):
|
||||
result = brief_task("hello world", limit=10)
|
||||
assert len(result) == 13 # 10 chars + "..."
|
||||
assert result.endswith("...")
|
||||
|
||||
def test_limit_0_returns_ellipsis(self):
|
||||
"""limit=0 → 0-char slice + "..." since len("hello") > 0."""
|
||||
result = brief_task("hello", limit=0)
|
||||
assert result == "..."
|
||||
|
||||
def test_limit_1_single_char_plus_ellipsis(self):
|
||||
result = brief_task("hello", limit=1)
|
||||
assert len(result) == 4 # 1 char + "..."
|
||||
assert result.startswith("h")
|
||||
assert result.endswith("...")
|
||||
Reference in New Issue
Block a user