forked from molecule-ai/molecule-core
Renames: - platform/ → workspace-server/ (Go module path stays as "platform" for external dep compat — will update after plugin module republish) - workspace-template/ → workspace/ Removed (moved to separate repos or deleted): - PLAN.md — internal roadmap (move to private project board) - HANDOFF.md, AGENTS.md — one-time internal session docs - .claude/ — gitignored entirely (local agent config) - infra/cloudflare-worker/ → Molecule-AI/molecule-tenant-proxy - org-templates/molecule-dev/ → standalone template repo - .mcp-eval/ → molecule-mcp-server repo - test-results/ — ephemeral, gitignored Security scrubbing: - Cloudflare account/zone/KV IDs → placeholders - Real EC2 IPs → <EC2_IP> in all docs - CF token prefix, Neon project ID, Fly app names → redacted - Langfuse dev credentials → parameterized - Personal runner username/machine name → generic Community files: - CONTRIBUTING.md — build, test, branch conventions - CODE_OF_CONDUCT.md — Contributor Covenant 2.1 All Dockerfiles, CI workflows, docker-compose, railway.toml, render.yaml, README, CLAUDE.md updated for new directory names. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
899 lines
35 KiB
Python
899 lines
35 KiB
Python
"""Tests for tools/governance.py — GovernanceAdapter and module-level functions.
|
|
|
|
Loads the real module via importlib to bypass the conftest mock for
|
|
tools.governance, exercising actual implementation logic including
|
|
graceful degradation when agent-os-kernel is not installed.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import importlib.util
|
|
import os
|
|
import sys
|
|
from unittest.mock import MagicMock, AsyncMock
|
|
|
|
import os
|
|
import pytest
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_config(
|
|
policy_mode="audit",
|
|
enabled=True,
|
|
toolkit="microsoft",
|
|
policy_endpoint="",
|
|
policy_file="",
|
|
blocked_patterns=None,
|
|
max_tool_calls_per_task=50,
|
|
):
|
|
cfg = MagicMock()
|
|
cfg.enabled = enabled
|
|
cfg.toolkit = toolkit
|
|
cfg.policy_mode = policy_mode
|
|
cfg.policy_endpoint = policy_endpoint
|
|
cfg.policy_file = policy_file
|
|
cfg.blocked_patterns = blocked_patterns or []
|
|
cfg.max_tool_calls_per_task = max_tool_calls_per_task
|
|
return cfg
|
|
|
|
|
|
def _load_governance_module(monkeypatch, mock_audit, mock_telemetry, with_agent_os=False):
|
|
"""Load tools/governance.py fresh, injecting mock dependencies."""
|
|
# Provide mock tools.audit
|
|
tools_mod = MagicMock()
|
|
tools_mod.audit = mock_audit
|
|
monkeypatch.setitem(sys.modules, "tools", tools_mod)
|
|
monkeypatch.setitem(sys.modules, "builtin_tools.audit", mock_audit)
|
|
monkeypatch.setitem(sys.modules, "builtin_tools.telemetry", mock_telemetry)
|
|
|
|
if not with_agent_os:
|
|
# Ensure agent_os is NOT installed (graceful degradation)
|
|
monkeypatch.setitem(sys.modules, "agent_os", None)
|
|
monkeypatch.setitem(sys.modules, "agent_os.policies", None)
|
|
|
|
monkeypatch.delitem(sys.modules, "builtin_tools.governance", raising=False)
|
|
spec = importlib.util.spec_from_file_location(
|
|
"builtin_tools.governance",
|
|
os.path.join(os.path.dirname(__file__), "..", "builtin_tools", "governance.py"),
|
|
)
|
|
mod = importlib.util.module_from_spec(spec)
|
|
monkeypatch.setitem(sys.modules, "builtin_tools.governance", mod)
|
|
spec.loader.exec_module(mod)
|
|
# Reset global singleton
|
|
mod._adapter = None
|
|
return mod
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Base fixture (no agent_os toolkit)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture
|
|
def real_governance(monkeypatch):
|
|
"""Load real governance module with no agent_os toolkit available."""
|
|
mock_audit = MagicMock()
|
|
mock_audit.check_permission = MagicMock(return_value=True)
|
|
mock_audit.log_event = MagicMock(return_value="trace-abc")
|
|
|
|
mock_telemetry = MagicMock()
|
|
mock_telemetry.get_current_traceparent = MagicMock(return_value="00-abc-def-01")
|
|
|
|
mod = _load_governance_module(monkeypatch, mock_audit, mock_telemetry, with_agent_os=False)
|
|
return mod, mock_audit, mock_telemetry
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Toolkit fixture helper
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_toolkit_mocks():
|
|
"""Return (mock_decision, mock_evaluator_instance, MockPolicyEvaluator, mock_agent_os_policies)."""
|
|
mock_decision = MagicMock()
|
|
mock_decision.allowed = True
|
|
mock_decision.reason = "policy_ok"
|
|
mock_decision.evaluator_name = "test-evaluator"
|
|
|
|
mock_evaluator_instance = MagicMock()
|
|
mock_evaluator_instance.evaluate = MagicMock(return_value=mock_decision)
|
|
|
|
MockPolicyEvaluator = MagicMock(return_value=mock_evaluator_instance)
|
|
|
|
mock_agent_os_policies = MagicMock()
|
|
mock_agent_os_policies.PolicyEvaluator = MockPolicyEvaluator
|
|
|
|
return mock_decision, mock_evaluator_instance, MockPolicyEvaluator, mock_agent_os_policies
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Test 1: GovernanceAdapter constructor
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestGovernanceAdapterInit:
|
|
|
|
def test_governance_adapter_init(self, real_governance):
|
|
"""GovernanceAdapter(config) creates adapter with _toolkit_available=False."""
|
|
mod, mock_audit, mock_telemetry = real_governance
|
|
cfg = _make_config()
|
|
adapter = mod.GovernanceAdapter(cfg)
|
|
assert adapter._config is cfg
|
|
assert adapter._evaluator is None
|
|
assert adapter._toolkit_available is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Test 2: _init_evaluator — no toolkit
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestInitEvaluatorNoToolkit:
|
|
|
|
def test_init_evaluator_no_toolkit(self, real_governance):
|
|
"""_init_evaluator() with agent_os not installed logs a warning; _toolkit_available stays False."""
|
|
mod, mock_audit, mock_telemetry = real_governance
|
|
cfg = _make_config()
|
|
adapter = mod.GovernanceAdapter(cfg)
|
|
|
|
# Call _init_evaluator — agent_os is None in sys.modules → ImportError
|
|
# Must not raise any exception
|
|
adapter._init_evaluator()
|
|
|
|
assert adapter._toolkit_available is False
|
|
assert adapter._evaluator is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Test 3: _init_evaluator — with toolkit
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestInitEvaluatorWithToolkit:
|
|
|
|
def test_init_evaluator_with_toolkit(self, monkeypatch):
|
|
"""_init_evaluator() with agent_os available sets _toolkit_available=True."""
|
|
mock_audit = MagicMock()
|
|
mock_audit.check_permission = MagicMock(return_value=True)
|
|
mock_audit.log_event = MagicMock(return_value="trace-abc")
|
|
mock_telemetry = MagicMock()
|
|
mock_telemetry.get_current_traceparent = MagicMock(return_value="00-abc-def-01")
|
|
|
|
mock_decision, mock_evaluator_instance, MockPolicyEvaluator, mock_agent_os_policies = (
|
|
_make_toolkit_mocks()
|
|
)
|
|
monkeypatch.setitem(sys.modules, "agent_os", MagicMock())
|
|
monkeypatch.setitem(sys.modules, "agent_os.policies", mock_agent_os_policies)
|
|
|
|
mod = _load_governance_module(
|
|
monkeypatch, mock_audit, mock_telemetry, with_agent_os=True
|
|
)
|
|
|
|
cfg = _make_config(policy_mode="strict")
|
|
adapter = mod.GovernanceAdapter(cfg)
|
|
adapter._init_evaluator()
|
|
|
|
assert adapter._toolkit_available is True
|
|
assert adapter._evaluator is mock_evaluator_instance
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Test 4: initialize() — no toolkit → RBAC-only warning
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestInitializeRbacOnly:
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_initialize_sets_toolkit_available_false(self, real_governance):
|
|
"""await adapter.initialize() with no toolkit logs 'RBAC-only mode' warning."""
|
|
mod, mock_audit, mock_telemetry = real_governance
|
|
cfg = _make_config()
|
|
adapter = mod.GovernanceAdapter(cfg)
|
|
|
|
import logging
|
|
with patch_logger_warning(mod) as warn_calls:
|
|
await adapter.initialize()
|
|
|
|
assert adapter._toolkit_available is False
|
|
# At least one warning about RBAC-only mode
|
|
messages = [str(c) for c in warn_calls]
|
|
assert any("RBAC" in m or "rbac" in m.lower() or "agent-os-kernel" in m for m in messages)
|
|
|
|
|
|
def patch_logger_warning(mod):
|
|
"""Context manager that collects logger.warning calls for the module's logger."""
|
|
from unittest.mock import patch as _patch
|
|
recorded = []
|
|
original = mod.logger.warning
|
|
|
|
class Collector:
|
|
def __enter__(self):
|
|
mod.logger.warning = lambda msg, *a, **kw: recorded.append(msg % a if a else msg)
|
|
return recorded
|
|
|
|
def __exit__(self, *exc):
|
|
mod.logger.warning = original
|
|
|
|
return Collector()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests 5-11: check_permission scenarios
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestCheckPermission:
|
|
|
|
def test_check_permission_rbac_deny(self, real_governance):
|
|
"""audit.check_permission returns False → (False, 'RBAC denied ...')."""
|
|
mod, mock_audit, mock_telemetry = real_governance
|
|
mock_audit.check_permission.return_value = False
|
|
|
|
cfg = _make_config()
|
|
adapter = mod.GovernanceAdapter(cfg)
|
|
|
|
allowed, reason = adapter.check_permission("memory.write", ["read-only"])
|
|
assert allowed is False
|
|
assert "RBAC denied" in reason
|
|
assert "memory.write" in reason
|
|
|
|
def test_check_permission_rbac_allow_no_toolkit(self, real_governance):
|
|
"""RBAC allows, toolkit unavailable → (True, 'rbac_allowed')."""
|
|
mod, mock_audit, mock_telemetry = real_governance
|
|
mock_audit.check_permission.return_value = True
|
|
|
|
cfg = _make_config(policy_mode="strict")
|
|
adapter = mod.GovernanceAdapter(cfg)
|
|
adapter._toolkit_available = False
|
|
|
|
allowed, reason = adapter.check_permission("memory.read", ["operator"])
|
|
assert allowed is True
|
|
assert reason == "rbac_allowed"
|
|
|
|
def test_check_permission_audit_mode(self, real_governance):
|
|
"""RBAC allows, toolkit available but policy_mode='audit' → (True, 'rbac_allowed')."""
|
|
mod, mock_audit, mock_telemetry = real_governance
|
|
mock_audit.check_permission.return_value = True
|
|
|
|
cfg = _make_config(policy_mode="audit")
|
|
adapter = mod.GovernanceAdapter(cfg)
|
|
# Even if we pretend toolkit is available, audit mode bypasses it
|
|
adapter._toolkit_available = True
|
|
mock_evaluator = MagicMock()
|
|
adapter._evaluator = mock_evaluator
|
|
|
|
allowed, reason = adapter.check_permission("memory.read", ["operator"])
|
|
assert allowed is True
|
|
assert reason == "rbac_allowed"
|
|
# Evaluator should NOT be called in audit mode
|
|
mock_evaluator.evaluate.assert_not_called()
|
|
|
|
def test_check_permission_strict_mode_toolkit_deny(self, monkeypatch):
|
|
"""Toolkit denies in strict mode → (False, reason)."""
|
|
mock_audit = MagicMock()
|
|
mock_audit.check_permission = MagicMock(return_value=True)
|
|
mock_audit.log_event = MagicMock(return_value="trace-abc")
|
|
mock_telemetry = MagicMock()
|
|
mock_telemetry.get_current_traceparent = MagicMock(return_value="00-abc-def-01")
|
|
|
|
mock_decision, mock_evaluator_instance, MockPolicyEvaluator, mock_agent_os_policies = (
|
|
_make_toolkit_mocks()
|
|
)
|
|
mock_decision.allowed = False
|
|
mock_decision.reason = "policy_denied"
|
|
|
|
monkeypatch.setitem(sys.modules, "agent_os", MagicMock())
|
|
monkeypatch.setitem(sys.modules, "agent_os.policies", mock_agent_os_policies)
|
|
|
|
mod = _load_governance_module(
|
|
monkeypatch, mock_audit, mock_telemetry, with_agent_os=True
|
|
)
|
|
|
|
cfg = _make_config(policy_mode="strict")
|
|
adapter = mod.GovernanceAdapter(cfg)
|
|
adapter._init_evaluator()
|
|
|
|
allowed, reason = adapter.check_permission("memory.write", ["operator"])
|
|
assert allowed is False
|
|
assert reason == "policy_denied"
|
|
|
|
def test_check_permission_strict_mode_toolkit_allow(self, monkeypatch):
|
|
"""Toolkit allows in strict mode → (True, reason)."""
|
|
mock_audit = MagicMock()
|
|
mock_audit.check_permission = MagicMock(return_value=True)
|
|
mock_audit.log_event = MagicMock(return_value="trace-abc")
|
|
mock_telemetry = MagicMock()
|
|
mock_telemetry.get_current_traceparent = MagicMock(return_value="00-abc-def-01")
|
|
|
|
mock_decision, mock_evaluator_instance, MockPolicyEvaluator, mock_agent_os_policies = (
|
|
_make_toolkit_mocks()
|
|
)
|
|
mock_decision.allowed = True
|
|
mock_decision.reason = "policy_ok"
|
|
|
|
monkeypatch.setitem(sys.modules, "agent_os", MagicMock())
|
|
monkeypatch.setitem(sys.modules, "agent_os.policies", mock_agent_os_policies)
|
|
|
|
mod = _load_governance_module(
|
|
monkeypatch, mock_audit, mock_telemetry, with_agent_os=True
|
|
)
|
|
|
|
cfg = _make_config(policy_mode="strict")
|
|
adapter = mod.GovernanceAdapter(cfg)
|
|
adapter._init_evaluator()
|
|
|
|
allowed, reason = adapter.check_permission("memory.read", ["operator"])
|
|
assert allowed is True
|
|
assert reason == "policy_ok"
|
|
|
|
def test_check_permission_permissive_mode_toolkit_deny(self, monkeypatch):
|
|
"""Toolkit denies but permissive mode → (True, ...) logs warning."""
|
|
mock_audit = MagicMock()
|
|
mock_audit.check_permission = MagicMock(return_value=True)
|
|
mock_audit.log_event = MagicMock(return_value="trace-abc")
|
|
mock_telemetry = MagicMock()
|
|
mock_telemetry.get_current_traceparent = MagicMock(return_value="00-abc-def-01")
|
|
|
|
mock_decision, mock_evaluator_instance, MockPolicyEvaluator, mock_agent_os_policies = (
|
|
_make_toolkit_mocks()
|
|
)
|
|
mock_decision.allowed = False
|
|
mock_decision.reason = "advisory_deny"
|
|
|
|
monkeypatch.setitem(sys.modules, "agent_os", MagicMock())
|
|
monkeypatch.setitem(sys.modules, "agent_os.policies", mock_agent_os_policies)
|
|
|
|
mod = _load_governance_module(
|
|
monkeypatch, mock_audit, mock_telemetry, with_agent_os=True
|
|
)
|
|
|
|
cfg = _make_config(policy_mode="permissive")
|
|
adapter = mod.GovernanceAdapter(cfg)
|
|
adapter._init_evaluator()
|
|
|
|
warnings_logged = []
|
|
original_warn = mod.logger.warning
|
|
mod.logger.warning = lambda msg, *a, **kw: warnings_logged.append(msg % a if a else msg)
|
|
try:
|
|
allowed, reason = adapter.check_permission("memory.write", ["operator"])
|
|
finally:
|
|
mod.logger.warning = original_warn
|
|
|
|
# In permissive mode, toolkit denial is advisory — action is still allowed
|
|
assert allowed is True
|
|
# A warning was logged about the advisory denial
|
|
assert any("permissive" in w or "advisory" in w or "denied" in w for w in warnings_logged)
|
|
|
|
def test_check_permission_toolkit_exception(self, monkeypatch):
|
|
"""evaluator.evaluate raises exception → falls back to RBAC result."""
|
|
mock_audit = MagicMock()
|
|
mock_audit.check_permission = MagicMock(return_value=True)
|
|
mock_audit.log_event = MagicMock(return_value="trace-abc")
|
|
mock_telemetry = MagicMock()
|
|
mock_telemetry.get_current_traceparent = MagicMock(return_value="00-abc-def-01")
|
|
|
|
mock_decision, mock_evaluator_instance, MockPolicyEvaluator, mock_agent_os_policies = (
|
|
_make_toolkit_mocks()
|
|
)
|
|
mock_evaluator_instance.evaluate.side_effect = RuntimeError("toolkit error")
|
|
|
|
monkeypatch.setitem(sys.modules, "agent_os", MagicMock())
|
|
monkeypatch.setitem(sys.modules, "agent_os.policies", mock_agent_os_policies)
|
|
|
|
mod = _load_governance_module(
|
|
monkeypatch, mock_audit, mock_telemetry, with_agent_os=True
|
|
)
|
|
|
|
cfg = _make_config(policy_mode="strict")
|
|
adapter = mod.GovernanceAdapter(cfg)
|
|
adapter._init_evaluator()
|
|
|
|
# Should NOT raise; falls back to RBAC result
|
|
allowed, reason = adapter.check_permission("memory.read", ["operator"])
|
|
assert allowed is True # RBAC allowed, exception fallback keeps RBAC result
|
|
assert reason == "toolkit_evaluation_error"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests 12-13: emit()
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestEmit:
|
|
|
|
def test_emit_calls_audit_log_event(self, real_governance):
|
|
"""emit() calls audit.log_event with governance_toolkit and traceparent."""
|
|
mod, mock_audit, mock_telemetry = real_governance
|
|
mock_audit.log_event.return_value = "trace-123"
|
|
mock_telemetry.get_current_traceparent.return_value = "00-trace-parent-01"
|
|
|
|
cfg = _make_config(toolkit="microsoft")
|
|
adapter = mod.GovernanceAdapter(cfg)
|
|
adapter._toolkit_available = True
|
|
|
|
result = adapter.emit(
|
|
event_type="permission_check",
|
|
action="memory.write",
|
|
resource="scope",
|
|
outcome="allowed",
|
|
actor="test-actor",
|
|
)
|
|
|
|
assert result == "trace-123"
|
|
mock_audit.log_event.assert_called_once()
|
|
call_kwargs = mock_audit.log_event.call_args
|
|
# Check traceparent and governance_toolkit are passed
|
|
kwargs = call_kwargs.kwargs if call_kwargs.kwargs else {}
|
|
all_args = {**kwargs}
|
|
# Also check positional → keyword mapping
|
|
if call_kwargs.args:
|
|
# log_event(event_type, action, resource, outcome, **kwargs)
|
|
pass
|
|
assert "governance_toolkit" in all_args or "microsoft" in str(call_kwargs)
|
|
assert "traceparent" in all_args or "00-trace-parent-01" in str(call_kwargs)
|
|
|
|
def test_emit_disabled_toolkit_label(self, real_governance):
|
|
"""When _toolkit_available=False, governance_toolkit='disabled'."""
|
|
mod, mock_audit, mock_telemetry = real_governance
|
|
mock_audit.log_event.return_value = "trace-456"
|
|
|
|
cfg = _make_config(toolkit="microsoft")
|
|
adapter = mod.GovernanceAdapter(cfg)
|
|
adapter._toolkit_available = False # explicitly disabled
|
|
|
|
adapter.emit(
|
|
event_type="permission_check",
|
|
action="memory.read",
|
|
resource="scope",
|
|
outcome="allowed",
|
|
)
|
|
|
|
mock_audit.log_event.assert_called_once()
|
|
call_args_str = str(mock_audit.log_event.call_args)
|
|
assert "disabled" in call_args_str
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests 14-15: initialize_governance()
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestInitializeGovernance:
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_initialize_governance_success(self, real_governance):
|
|
"""initialize_governance() sets module _adapter singleton on success."""
|
|
mod, mock_audit, mock_telemetry = real_governance
|
|
assert mod._adapter is None
|
|
|
|
cfg = _make_config()
|
|
adapter = await mod.initialize_governance(cfg)
|
|
|
|
assert adapter is not None
|
|
assert mod._adapter is adapter
|
|
assert isinstance(adapter, mod.GovernanceAdapter)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_initialize_governance_failure(self, real_governance):
|
|
"""initialize_governance() returns None and _adapter stays None on failure."""
|
|
mod, mock_audit, mock_telemetry = real_governance
|
|
assert mod._adapter is None
|
|
|
|
cfg = _make_config()
|
|
# Make GovernanceAdapter.initialize raise
|
|
original_init = mod.GovernanceAdapter.initialize
|
|
|
|
async def bad_initialize(self):
|
|
raise RuntimeError("init failed")
|
|
|
|
mod.GovernanceAdapter.initialize = bad_initialize
|
|
try:
|
|
result = await mod.initialize_governance(cfg)
|
|
finally:
|
|
mod.GovernanceAdapter.initialize = original_init
|
|
|
|
assert result is None
|
|
assert mod._adapter is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Test 16: get_governance_adapter()
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestGetGovernanceAdapter:
|
|
|
|
def test_get_governance_adapter_none_initially(self, real_governance):
|
|
"""get_governance_adapter() returns None when _adapter is not set."""
|
|
mod, mock_audit, mock_telemetry = real_governance
|
|
assert mod._adapter is None
|
|
assert mod.get_governance_adapter() is None
|
|
|
|
def test_get_governance_adapter_returns_set_adapter(self, real_governance):
|
|
"""get_governance_adapter() returns the _adapter after it is set."""
|
|
mod, mock_audit, mock_telemetry = real_governance
|
|
fake_adapter = MagicMock()
|
|
mod._adapter = fake_adapter
|
|
assert mod.get_governance_adapter() is fake_adapter
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests 17-18: check_permission_with_governance()
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestCheckPermissionWithGovernance:
|
|
|
|
def test_check_permission_with_governance_no_adapter(self, real_governance):
|
|
"""_adapter=None → falls through to audit.check_permission."""
|
|
mod, mock_audit, mock_telemetry = real_governance
|
|
mod._adapter = None
|
|
mock_audit.check_permission.return_value = True
|
|
|
|
allowed, reason = mod.check_permission_with_governance("memory.read", ["operator"])
|
|
assert allowed is True
|
|
assert reason == "rbac_only"
|
|
mock_audit.check_permission.assert_called_once_with("memory.read", ["operator"], None)
|
|
|
|
def test_check_permission_with_governance_with_adapter(self, real_governance):
|
|
"""_adapter set → calls adapter.check_permission."""
|
|
mod, mock_audit, mock_telemetry = real_governance
|
|
mock_adapter = MagicMock()
|
|
mock_adapter.check_permission.return_value = (True, "adapter_allowed")
|
|
mod._adapter = mock_adapter
|
|
|
|
allowed, reason = mod.check_permission_with_governance(
|
|
"memory.write", ["admin"], None, {"resource": "scope"}
|
|
)
|
|
assert allowed is True
|
|
assert reason == "adapter_allowed"
|
|
mock_adapter.check_permission.assert_called_once_with(
|
|
"memory.write", ["admin"], None, {"resource": "scope"}
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests 19-20: _emit_governance_event()
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestEmitGovernanceEvent:
|
|
|
|
def test_emit_governance_event_no_adapter(self, real_governance):
|
|
"""_adapter=None → _emit_governance_event returns None."""
|
|
mod, mock_audit, mock_telemetry = real_governance
|
|
mod._adapter = None
|
|
result = mod._emit_governance_event(
|
|
event_type="permission_check",
|
|
action="memory.read",
|
|
resource="scope",
|
|
outcome="allowed",
|
|
)
|
|
assert result is None
|
|
|
|
def test_emit_governance_event_with_adapter(self, real_governance):
|
|
"""_adapter set → calls adapter.emit and returns its result."""
|
|
mod, mock_audit, mock_telemetry = real_governance
|
|
mock_adapter = MagicMock()
|
|
mock_adapter.emit.return_value = "trace-emit-xyz"
|
|
mod._adapter = mock_adapter
|
|
|
|
result = mod._emit_governance_event(
|
|
event_type="permission_check",
|
|
action="memory.write",
|
|
resource="scope",
|
|
outcome="denied",
|
|
actor="test-actor",
|
|
trace_id="explicit-trace",
|
|
extra_key="extra_val",
|
|
)
|
|
assert result == "trace-emit-xyz"
|
|
mock_adapter.emit.assert_called_once_with(
|
|
"permission_check",
|
|
"memory.write",
|
|
"scope",
|
|
"denied",
|
|
actor="test-actor",
|
|
trace_id="explicit-trace",
|
|
extra_key="extra_val",
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests for policy_file loading (exercises _init_evaluator branches)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestInitEvaluatorPolicyFile:
|
|
|
|
def _setup_with_toolkit(self, monkeypatch):
|
|
mock_audit = MagicMock()
|
|
mock_audit.check_permission = MagicMock(return_value=True)
|
|
mock_audit.log_event = MagicMock(return_value="trace-abc")
|
|
mock_telemetry = MagicMock()
|
|
mock_telemetry.get_current_traceparent = MagicMock(return_value="00-abc-def-01")
|
|
|
|
mock_decision, mock_evaluator_instance, MockPolicyEvaluator, mock_agent_os_policies = (
|
|
_make_toolkit_mocks()
|
|
)
|
|
monkeypatch.setitem(sys.modules, "agent_os", MagicMock())
|
|
monkeypatch.setitem(sys.modules, "agent_os.policies", mock_agent_os_policies)
|
|
|
|
mod = _load_governance_module(
|
|
monkeypatch, mock_audit, mock_telemetry, with_agent_os=True
|
|
)
|
|
return mod, mock_evaluator_instance, MockPolicyEvaluator
|
|
|
|
def test_policy_file_rego_loaded(self, monkeypatch, tmp_path):
|
|
"""When policy_file is a .rego file that exists, evaluator.load_rego is called."""
|
|
mod, mock_evaluator_instance, MockPolicyEvaluator = self._setup_with_toolkit(monkeypatch)
|
|
|
|
policy_path = tmp_path / "policy.rego"
|
|
policy_path.write_text("package main\ndefault allow = false\n")
|
|
|
|
cfg = _make_config(policy_mode="strict", policy_file=str(policy_path))
|
|
adapter = mod.GovernanceAdapter(cfg)
|
|
adapter._init_evaluator()
|
|
|
|
assert adapter._toolkit_available is True
|
|
mock_evaluator_instance.load_rego.assert_called_once_with(path=str(policy_path))
|
|
|
|
def test_policy_file_nonexistent_logs_warning(self, monkeypatch, tmp_path):
|
|
"""Non-existent policy_file logs a warning but does not crash."""
|
|
mod, mock_evaluator_instance, MockPolicyEvaluator = self._setup_with_toolkit(monkeypatch)
|
|
|
|
cfg = _make_config(
|
|
policy_mode="strict",
|
|
policy_file=str(tmp_path / "missing.rego"),
|
|
)
|
|
adapter = mod.GovernanceAdapter(cfg)
|
|
|
|
warnings = []
|
|
original_warn = mod.logger.warning
|
|
mod.logger.warning = lambda msg, *a, **kw: warnings.append(msg % a if a else msg)
|
|
try:
|
|
adapter._init_evaluator()
|
|
finally:
|
|
mod.logger.warning = original_warn
|
|
|
|
# Toolkit still initialised (file load skipped, not a hard failure)
|
|
assert adapter._toolkit_available is True
|
|
assert any("does not exist" in w or "skipping" in w for w in warnings)
|
|
mock_evaluator_instance.load_rego.assert_not_called()
|
|
|
|
def test_policy_file_unknown_extension_logs_warning(self, monkeypatch, tmp_path):
|
|
"""Unknown policy file extension logs a warning and skips load."""
|
|
mod, mock_evaluator_instance, MockPolicyEvaluator = self._setup_with_toolkit(monkeypatch)
|
|
|
|
policy_path = tmp_path / "policy.unknown"
|
|
policy_path.write_text("not a real policy format")
|
|
|
|
cfg = _make_config(policy_mode="strict", policy_file=str(policy_path))
|
|
adapter = mod.GovernanceAdapter(cfg)
|
|
|
|
warnings = []
|
|
original_warn = mod.logger.warning
|
|
mod.logger.warning = lambda msg, *a, **kw: warnings.append(msg % a if a else msg)
|
|
try:
|
|
adapter._init_evaluator()
|
|
finally:
|
|
mod.logger.warning = original_warn
|
|
|
|
assert adapter._toolkit_available is True
|
|
assert any("Unrecognised" in w or "extension" in w for w in warnings)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Gap 1: New targeted coverage tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestGap1InitializeToolkitAvailable:
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_initialize_logs_info_when_toolkit_available(self, monkeypatch):
|
|
"""Line 72-75: initialize() logs info (not warning) when _toolkit_available=True."""
|
|
mock_audit = MagicMock()
|
|
mock_audit.check_permission = MagicMock(return_value=True)
|
|
mock_audit.log_event = MagicMock(return_value="trace-abc")
|
|
mock_telemetry = MagicMock()
|
|
mock_telemetry.get_current_traceparent = MagicMock(return_value="00-abc-def-01")
|
|
|
|
mock_decision, mock_evaluator_instance, MockPolicyEvaluator, mock_agent_os_policies = (
|
|
_make_toolkit_mocks()
|
|
)
|
|
monkeypatch.setitem(sys.modules, "agent_os", MagicMock())
|
|
monkeypatch.setitem(sys.modules, "agent_os.policies", mock_agent_os_policies)
|
|
|
|
mod = _load_governance_module(
|
|
monkeypatch, mock_audit, mock_telemetry, with_agent_os=True
|
|
)
|
|
|
|
cfg = _make_config(policy_mode="strict")
|
|
adapter = mod.GovernanceAdapter(cfg)
|
|
|
|
info_messages = []
|
|
original_info = mod.logger.info
|
|
mod.logger.info = lambda msg, *a, **kw: info_messages.append(msg % a if a else msg)
|
|
try:
|
|
await adapter.initialize()
|
|
finally:
|
|
mod.logger.info = original_info
|
|
|
|
assert adapter._toolkit_available is True
|
|
assert any("GovernanceAdapter initialised" in m or "toolkit=" in m for m in info_messages)
|
|
|
|
|
|
class TestGap1PolicyEndpoint:
|
|
|
|
def test_policy_endpoint_added_to_kwargs(self, monkeypatch):
|
|
"""Line 107: policy_endpoint non-empty → kwargs['endpoint'] set."""
|
|
mock_audit = MagicMock()
|
|
mock_audit.check_permission = MagicMock(return_value=True)
|
|
mock_audit.log_event = MagicMock(return_value="trace-abc")
|
|
mock_telemetry = MagicMock()
|
|
mock_telemetry.get_current_traceparent = MagicMock(return_value="00-abc-def-01")
|
|
|
|
mock_decision, mock_evaluator_instance, MockPolicyEvaluator, mock_agent_os_policies = (
|
|
_make_toolkit_mocks()
|
|
)
|
|
monkeypatch.setitem(sys.modules, "agent_os", MagicMock())
|
|
monkeypatch.setitem(sys.modules, "agent_os.policies", mock_agent_os_policies)
|
|
|
|
mod = _load_governance_module(
|
|
monkeypatch, mock_audit, mock_telemetry, with_agent_os=True
|
|
)
|
|
|
|
cfg = _make_config(policy_mode="strict", policy_endpoint="https://policy.example.com/v1")
|
|
adapter = mod.GovernanceAdapter(cfg)
|
|
adapter._init_evaluator()
|
|
|
|
assert adapter._toolkit_available is True
|
|
call_kwargs = MockPolicyEvaluator.call_args.kwargs
|
|
assert call_kwargs.get("endpoint") == "https://policy.example.com/v1"
|
|
|
|
|
|
class TestGap1PolicyFileYamlCedar:
|
|
|
|
def _setup_with_toolkit(self, monkeypatch):
|
|
mock_audit = MagicMock()
|
|
mock_audit.check_permission = MagicMock(return_value=True)
|
|
mock_audit.log_event = MagicMock(return_value="trace-abc")
|
|
mock_telemetry = MagicMock()
|
|
mock_telemetry.get_current_traceparent = MagicMock(return_value="00-abc-def-01")
|
|
|
|
mock_decision, mock_evaluator_instance, MockPolicyEvaluator, mock_agent_os_policies = (
|
|
_make_toolkit_mocks()
|
|
)
|
|
monkeypatch.setitem(sys.modules, "agent_os", MagicMock())
|
|
monkeypatch.setitem(sys.modules, "agent_os.policies", mock_agent_os_policies)
|
|
|
|
mod = _load_governance_module(
|
|
monkeypatch, mock_audit, mock_telemetry, with_agent_os=True
|
|
)
|
|
return mod, mock_evaluator_instance
|
|
|
|
def test_policy_file_yaml_loaded(self, monkeypatch, tmp_path):
|
|
"""Lines 120-121: .yaml policy file → evaluator.load_yaml called."""
|
|
mod, mock_evaluator_instance = self._setup_with_toolkit(monkeypatch)
|
|
|
|
policy_path = tmp_path / "policy.yaml"
|
|
policy_path.write_text("version: 1\n")
|
|
|
|
cfg = _make_config(policy_mode="strict", policy_file=str(policy_path))
|
|
adapter = mod.GovernanceAdapter(cfg)
|
|
adapter._init_evaluator()
|
|
|
|
assert adapter._toolkit_available is True
|
|
mock_evaluator_instance.load_yaml.assert_called_once_with(path=str(policy_path))
|
|
|
|
def test_policy_file_yml_loaded(self, monkeypatch, tmp_path):
|
|
"""Lines 120-121: .yml extension also calls load_yaml."""
|
|
mod, mock_evaluator_instance = self._setup_with_toolkit(monkeypatch)
|
|
|
|
policy_path = tmp_path / "policy.yml"
|
|
policy_path.write_text("version: 1\n")
|
|
|
|
cfg = _make_config(policy_mode="strict", policy_file=str(policy_path))
|
|
adapter = mod.GovernanceAdapter(cfg)
|
|
adapter._init_evaluator()
|
|
|
|
assert adapter._toolkit_available is True
|
|
mock_evaluator_instance.load_yaml.assert_called_once_with(path=str(policy_path))
|
|
|
|
def test_policy_file_cedar_loaded(self, monkeypatch, tmp_path):
|
|
"""Lines 123-124: .cedar policy file → evaluator.load_cedar called."""
|
|
mod, mock_evaluator_instance = self._setup_with_toolkit(monkeypatch)
|
|
|
|
policy_path = tmp_path / "policy.cedar"
|
|
policy_path.write_text("permit(principal, action, resource);\n")
|
|
|
|
cfg = _make_config(policy_mode="strict", policy_file=str(policy_path))
|
|
adapter = mod.GovernanceAdapter(cfg)
|
|
adapter._init_evaluator()
|
|
|
|
assert adapter._toolkit_available is True
|
|
mock_evaluator_instance.load_cedar.assert_called_once_with(path=str(policy_path))
|
|
|
|
|
|
class TestGap1InitEvaluatorGenericException:
|
|
|
|
def test_init_evaluator_non_import_error_swallowed(self, monkeypatch):
|
|
"""Lines 142-143: PolicyEvaluator() itself raises non-ImportError → logged, toolkit_available=False."""
|
|
mock_audit = MagicMock()
|
|
mock_audit.check_permission = MagicMock(return_value=True)
|
|
mock_audit.log_event = MagicMock(return_value="trace-abc")
|
|
mock_telemetry = MagicMock()
|
|
mock_telemetry.get_current_traceparent = MagicMock(return_value="00-abc-def-01")
|
|
|
|
# PolicyEvaluator() raises RuntimeError (not ImportError)
|
|
MockPolicyEvaluator = MagicMock(side_effect=RuntimeError("toolkit init failed"))
|
|
mock_agent_os_policies = MagicMock()
|
|
mock_agent_os_policies.PolicyEvaluator = MockPolicyEvaluator
|
|
|
|
monkeypatch.setitem(sys.modules, "agent_os", MagicMock())
|
|
monkeypatch.setitem(sys.modules, "agent_os.policies", mock_agent_os_policies)
|
|
|
|
mod = _load_governance_module(
|
|
monkeypatch, mock_audit, mock_telemetry, with_agent_os=True
|
|
)
|
|
|
|
cfg = _make_config(policy_mode="strict")
|
|
adapter = mod.GovernanceAdapter(cfg)
|
|
|
|
warnings = []
|
|
original_warn = mod.logger.warning
|
|
mod.logger.warning = lambda msg, *a, **kw: warnings.append(msg % a if a else msg)
|
|
try:
|
|
adapter._init_evaluator()
|
|
finally:
|
|
mod.logger.warning = original_warn
|
|
|
|
assert adapter._toolkit_available is False
|
|
assert adapter._evaluator is None
|
|
assert any("Failed" in w or "toolkit init failed" in w for w in warnings)
|
|
|
|
|
|
class TestGap1ExtraContextKeys:
|
|
|
|
def test_check_permission_extra_context_keys_merged(self, monkeypatch):
|
|
"""Lines 206-207: extra context keys beyond base eval_context are merged in."""
|
|
mock_audit = MagicMock()
|
|
mock_audit.check_permission = MagicMock(return_value=True)
|
|
mock_audit.log_event = MagicMock(return_value="trace-abc")
|
|
mock_telemetry = MagicMock()
|
|
mock_telemetry.get_current_traceparent = MagicMock(return_value="00-abc-def-01")
|
|
|
|
mock_decision, mock_evaluator_instance, MockPolicyEvaluator, mock_agent_os_policies = (
|
|
_make_toolkit_mocks()
|
|
)
|
|
mock_decision.allowed = True
|
|
mock_decision.reason = "policy_ok"
|
|
|
|
monkeypatch.setitem(sys.modules, "agent_os", MagicMock())
|
|
monkeypatch.setitem(sys.modules, "agent_os.policies", mock_agent_os_policies)
|
|
|
|
mod = _load_governance_module(
|
|
monkeypatch, mock_audit, mock_telemetry, with_agent_os=True
|
|
)
|
|
|
|
cfg = _make_config(policy_mode="strict")
|
|
adapter = mod.GovernanceAdapter(cfg)
|
|
adapter._init_evaluator()
|
|
|
|
# Pass context with extra_key not in the base eval_context dict
|
|
context = {"resource": "my-resource", "actor": "user-1", "extra_key": "extra_value"}
|
|
allowed, reason = adapter.check_permission("memory.read", ["operator"], context=context)
|
|
|
|
assert allowed is True
|
|
# Verify evaluator.evaluate was called with eval_context containing extra_key
|
|
call_args = mock_evaluator_instance.evaluate.call_args
|
|
eval_ctx = call_args.args[0] if call_args.args else call_args.kwargs.get("eval_context", {})
|
|
assert eval_ctx.get("extra_key") == "extra_value"
|