molecule-core/workspace-template/tests/test_audit.py
Hongming Wang 24fec62d7f initial commit — Molecule AI platform
Forked clean from public hackathon repo (Starfire-AgentTeam, BSL 1.1)
with full rebrand to Molecule AI under github.com/Molecule-AI/molecule-monorepo.

Brand: Starfire → Molecule AI.
Slug: starfire / agent-molecule → molecule.
Env vars: STARFIRE_* → MOLECULE_*.
Go module: github.com/agent-molecule/platform → github.com/Molecule-AI/molecule-monorepo/platform.
Python packages: starfire_plugin → molecule_plugin, starfire_agent → molecule_agent.
DB: agentmolecule → molecule.

History truncated; see public repo for prior commits and contributor
attribution. Verified green: go test -race ./... (platform), pytest
(workspace-template 1129 + sdk 132), vitest (canvas 352), build (mcp).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-13 11:55:37 -07:00

307 lines
13 KiB
Python

"""Tests for tools/audit.py — RBAC, audit logging, and workspace roles.
Loads the *real* module via importlib to bypass the conftest mock for
tools.audit, so every test exercises the actual implementation.
"""
from __future__ import annotations
import os
import importlib.util
import os
import json
import os
import sys
from types import ModuleType
from unittest.mock import MagicMock, patch
import os
import pytest
# ---------------------------------------------------------------------------
# Fixture — load the real tools.audit module
# ---------------------------------------------------------------------------
@pytest.fixture
def real_audit(monkeypatch, tmp_path):
"""Load the real tools/audit.py, bypassing the conftest mock."""
# Remove mocks so the real module is loaded fresh
monkeypatch.delitem(sys.modules, "builtin_tools.audit", raising=False)
monkeypatch.delitem(sys.modules, "builtin_tools.compliance", raising=False)
# Point audit log at a temp file so tests don't hit the filesystem
monkeypatch.setenv("AUDIT_LOG_PATH", str(tmp_path / "audit.jsonl"))
monkeypatch.setenv("WORKSPACE_ID", "test-ws")
spec = importlib.util.spec_from_file_location(
"builtin_tools.audit",
os.path.join(os.path.dirname(__file__), "..", "builtin_tools/audit.py"),
)
mod = importlib.util.module_from_spec(spec)
monkeypatch.setitem(sys.modules, "builtin_tools.audit", mod)
spec.loader.exec_module(mod)
# Re-read env vars into the module-level constants (they are read at import)
mod.AUDIT_LOG_PATH = str(tmp_path / "audit.jsonl")
mod.WORKSPACE_ID = "test-ws"
return mod
# ---------------------------------------------------------------------------
# check_permission — built-in roles
# ---------------------------------------------------------------------------
class TestCheckPermissionBuiltinRoles:
def test_check_permission_admin(self, real_audit):
"""admin shortcircuits and returns True for any action."""
mod = real_audit
assert mod.check_permission("delegate", ["admin"]) is True
assert mod.check_permission("approve", ["admin"]) is True
assert mod.check_permission("memory.read", ["admin"]) is True
assert mod.check_permission("memory.write", ["admin"]) is True
assert mod.check_permission("totally_unknown_action", ["admin"]) is True
def test_check_permission_operator(self, real_audit):
"""operator has delegate, approve, memory.read, memory.write."""
mod = real_audit
assert mod.check_permission("delegate", ["operator"]) is True
assert mod.check_permission("approve", ["operator"]) is True
assert mod.check_permission("memory.read", ["operator"]) is True
assert mod.check_permission("memory.write", ["operator"]) is True
assert mod.check_permission("rbac.deny", ["operator"]) is False
def test_check_permission_read_only(self, real_audit):
"""read-only has only memory.read; no delegation or approval."""
mod = real_audit
assert mod.check_permission("memory.read", ["read-only"]) is True
assert mod.check_permission("delegate", ["read-only"]) is False
assert mod.check_permission("approve", ["read-only"]) is False
assert mod.check_permission("memory.write", ["read-only"]) is False
def test_check_permission_no_delegation(self, real_audit):
"""no-delegation cannot delegate, but can approve and write memory."""
mod = real_audit
assert mod.check_permission("delegate", ["no-delegation"]) is False
assert mod.check_permission("approve", ["no-delegation"]) is True
assert mod.check_permission("memory.read", ["no-delegation"]) is True
assert mod.check_permission("memory.write", ["no-delegation"]) is True
def test_check_permission_no_approval(self, real_audit):
"""no-approval cannot approve, but can delegate and write memory."""
mod = real_audit
assert mod.check_permission("approve", ["no-approval"]) is False
assert mod.check_permission("delegate", ["no-approval"]) is True
assert mod.check_permission("memory.read", ["no-approval"]) is True
assert mod.check_permission("memory.write", ["no-approval"]) is True
def test_check_permission_memory_readonly(self, real_audit):
"""memory-readonly can only read memory."""
mod = real_audit
assert mod.check_permission("memory.read", ["memory-readonly"]) is True
assert mod.check_permission("memory.write", ["memory-readonly"]) is False
assert mod.check_permission("delegate", ["memory-readonly"]) is False
assert mod.check_permission("approve", ["memory-readonly"]) is False
# ---------------------------------------------------------------------------
# check_permission — custom roles
# ---------------------------------------------------------------------------
class TestCheckPermissionCustomRoles:
def test_check_permission_custom_roles(self, real_audit):
"""A role defined in custom_permissions is respected."""
mod = real_audit
custom = {"developer": ["deploy", "memory.read"]}
assert mod.check_permission("deploy", ["developer"], custom) is True
assert mod.check_permission("memory.read", ["developer"], custom) is True
def test_check_permission_custom_role_no_builtin_fallthrough(self, real_audit):
"""Custom role with custom_permissions does NOT fall through to built-ins.
'operator' is also a built-in role, but if it appears in custom_permissions
with a restricted list, the custom list is the complete permission set.
"""
mod = real_audit
# Override 'operator' to only allow memory.read via custom_permissions
custom = {"operator": ["memory.read"]}
# memory.read is in the custom list — allowed
assert mod.check_permission("memory.read", ["operator"], custom) is True
# delegate is in the built-in operator set but NOT in the custom list
# — must be denied because custom entry is definitive
assert mod.check_permission("delegate", ["operator"], custom) is False
def test_check_permission_unknown_role(self, real_audit):
"""A role that exists neither in built-ins nor custom_permissions returns False."""
mod = real_audit
assert mod.check_permission("delegate", ["ghost-role"]) is False
assert mod.check_permission("approve", ["phantom", "specter"]) is False
def test_check_permission_empty_roles(self, real_audit):
"""An empty roles list always returns False."""
mod = real_audit
assert mod.check_permission("delegate", []) is False
assert mod.check_permission("memory.read", []) is False
# ---------------------------------------------------------------------------
# log_event
# ---------------------------------------------------------------------------
class TestLogEvent:
def test_log_event_writes_json_line(self, real_audit, tmp_path):
"""log_event appends a valid JSON line to the audit file."""
mod = real_audit
mod.log_event(
event_type="delegation",
action="delegate",
resource="billing-agent",
outcome="success",
)
log_file = tmp_path / "audit.jsonl"
assert log_file.exists(), "audit file was not created"
lines = log_file.read_text(encoding="utf-8").strip().splitlines()
assert len(lines) == 1
event = json.loads(lines[0])
assert event["event_type"] == "delegation"
assert event["action"] == "delegate"
assert event["resource"] == "billing-agent"
assert event["outcome"] == "success"
assert "timestamp" in event
assert "trace_id" in event
assert "workspace_id" in event
def test_log_event_returns_trace_id(self, real_audit):
"""log_event returns the trace_id string."""
mod = real_audit
result = mod.log_event(
event_type="rbac",
action="rbac.deny",
resource="memory-scope",
outcome="denied",
)
assert isinstance(result, str)
assert len(result) > 0
def test_log_event_custom_trace_id(self, real_audit, tmp_path):
"""log_event uses the caller-supplied trace_id."""
mod = real_audit
supplied_id = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
returned_id = mod.log_event(
event_type="approval",
action="approve",
resource="deploy",
outcome="granted",
trace_id=supplied_id,
)
assert returned_id == supplied_id
log_file = tmp_path / "audit.jsonl"
event = json.loads(log_file.read_text().strip())
assert event["trace_id"] == supplied_id
def test_log_event_actor_default(self, real_audit, tmp_path):
"""actor defaults to WORKSPACE_ID when not supplied."""
mod = real_audit
mod.WORKSPACE_ID = "test-ws"
mod.log_event(
event_type="memory",
action="memory.read",
resource="global-scope",
outcome="success",
)
log_file = tmp_path / "audit.jsonl"
event = json.loads(log_file.read_text().strip())
assert event["actor"] == "test-ws"
def test_log_event_extra_fields(self, real_audit, tmp_path):
"""Extra kwargs are written to the JSON; built-in keys cannot be overridden.
The built-in key 'workspace_id' is set automatically by the module
(not a function parameter), so passing it via **extra exercises the
"built-in keys are not overridable" guard in log_event.
"""
mod = real_audit
mod.WORKSPACE_ID = "real-ws"
# 'workspace_id' is a built-in event key — must not be overwritten by extra
mod.log_event(
event_type="delegation",
action="delegate",
resource="target-ws",
outcome="success",
attempt=3,
target_workspace_id="target-ws",
workspace_id="SHOULD-NOT-APPEAR", # built-in key override attempt
)
log_file = tmp_path / "audit.jsonl"
event = json.loads(log_file.read_text().strip())
# Extra fields present
assert event["attempt"] == 3
assert event["target_workspace_id"] == "target-ws"
# Built-in 'workspace_id' is NOT overridden by the extra kwarg
assert event["workspace_id"] == "real-ws"
def test_log_event_write_failure_does_not_raise(self, real_audit, tmp_path, monkeypatch):
"""If the file write fails (e.g. fsync raises), only a WARNING is logged; no exception."""
mod = real_audit
import os as _os
monkeypatch.setattr(_os, "fsync", lambda fd: (_ for _ in ()).throw(OSError("disk full")))
# Must not raise
mod.log_event(
event_type="memory",
action="memory.write",
resource="scope",
outcome="failure",
)
# ---------------------------------------------------------------------------
# get_workspace_roles
# ---------------------------------------------------------------------------
class TestGetWorkspaceRoles:
def test_get_workspace_roles_config_available(self, real_audit, monkeypatch):
"""Returns roles and allowed_actions from the workspace config."""
mod = real_audit
# Build a minimal config mock
mock_rbac = MagicMock()
mock_rbac.roles = ["operator", "read-only"]
mock_rbac.allowed_actions = {"developer": ["deploy"]}
mock_cfg = MagicMock()
mock_cfg.rbac = mock_rbac
mock_config_mod = ModuleType("config")
mock_config_mod.load_config = MagicMock(return_value=mock_cfg)
monkeypatch.setitem(sys.modules, "config", mock_config_mod)
# Clear the lru_cache so our new mock is used
mod._load_workspace_config.cache_clear()
try:
roles, allowed_actions = mod.get_workspace_roles()
assert roles == ["operator", "read-only"]
assert allowed_actions == {"developer": ["deploy"]}
finally:
mod._load_workspace_config.cache_clear()
def test_get_workspace_roles_config_unavailable(self, real_audit, monkeypatch):
"""Falls back to (['operator'], {}) when config cannot be loaded."""
mod = real_audit
# Make load_config raise
mock_config_mod = ModuleType("config")
mock_config_mod.load_config = MagicMock(side_effect=RuntimeError("config missing"))
monkeypatch.setitem(sys.modules, "config", mock_config_mod)
mod._load_workspace_config.cache_clear()
try:
roles, allowed_actions = mod.get_workspace_roles()
assert roles == ["operator"]
assert allowed_actions == {}
finally:
mod._load_workspace_config.cache_clear()