Added TestLedgerHooksExtended with 26 new test cases covering all previously-uncovered branches in molecule_audit.hooks:

- _to_bytes: None, bytes passthrough, str→utf8, dict→JSON (sort_keys), list→JSON
- _DEFAULT_AGENT_ID: env var default, explicit override
- Session lifecycle: lazy open, session reuse, close when None, __exit__ releases on exception
- on_task_start: None input, risk_flag=True, oversight override
- on_llm_call: None input+output, risk_flag=True
- on_tool_call: bytes input (hash matches), None i/o, risk_flag=True
- on_task_end: None output, risk_flag=True, oversight override
- _safe_append: exception swallowed and logged as warning

All 69 tests in test_audit_ledger.py pass (was 43, +26).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
926 lines
35 KiB
Python
926 lines
35 KiB
Python
"""Tests for molecule_audit — HMAC-chained audit ledger.
|
|
|
|
Coverage
|
|
--------
|
|
ledger.py:
|
|
- _get_hmac_key() missing SALT raises RuntimeError; repeated calls return same key
|
|
- _ts_to_canonical() UTC datetime, naive datetime, None
|
|
- _to_canonical_dict() excludes hmac field, timestamp is Z-suffixed
|
|
- _compute_event_hmac() deterministic; changes when any field changes
|
|
- hash_content() str, bytes, None
|
|
- AuditEvent.to_dict() all fields present, ISO timestamp
|
|
- append_event() single event, chain linkage, error rollback
|
|
- verify_chain() valid chain, tampered hmac, broken prev_hmac, empty chain
|
|
|
|
hooks.py:
|
|
- LedgerHooks.on_task_start() hashes input, writes task_start event
|
|
- LedgerHooks.on_llm_call() hashes i/o, stores model name
|
|
- LedgerHooks.on_tool_call() hashes serialised i/o, stores tool name in model_used
|
|
- LedgerHooks.on_task_end() hashes output, writes task_end event
|
|
- LedgerHooks context manager close() releases session
|
|
- Exception swallowing missing SALT → warning, no raise
|
|
|
|
verify.py CLI:
|
|
- valid chain → exit 0, prints "CHAIN VALID"
|
|
- no events → exit 0, prints "No audit events"
|
|
- broken chain → exit 1, prints "CHAIN BROKEN"
|
|
- missing SALT → exit 2
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import hmac as _hmac_mod
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.orm import sessionmaker
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures — isolated in-memory SQLite DB per test
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.fixture(autouse=True)
def _reset_ledger_caches(monkeypatch):
    """Run every test with a fixed salt and empty ledger module caches.

    Before the test: sets ``AUDIT_LEDGER_SALT`` and blanks the cached HMAC
    key, engine and session factory on ``molecule_audit.ledger``.
    After the test: calls the ledger's own cache-reset helpers.
    """
    import molecule_audit.ledger as ledger_mod

    monkeypatch.setenv("AUDIT_LEDGER_SALT", "test-salt-for-pytest")
    for cached_attr in ("_hmac_key", "_engine", "_SessionFactory"):
        monkeypatch.setattr(ledger_mod, cached_attr, None)

    yield

    # Teardown: drop anything the test itself may have cached.
    ledger_mod.reset_hmac_key_cache()
    ledger_mod.reset_engine_cache()
|
|
|
|
|
|
@pytest.fixture
def mem_session():
    """Yield a session bound to a brand-new in-memory SQLite ledger.

    The schema is created up front, and the engine/factory are placed into
    the ledger module's caches so ``append_event`` picks them up.  On
    teardown the session is closed, the schema dropped and the engine
    cache reset.
    """
    import molecule_audit.ledger as ledger_mod

    memory_engine = create_engine(
        "sqlite:///:memory:",
        connect_args={"check_same_thread": False},
    )
    ledger_mod.Base.metadata.create_all(memory_engine)
    make_session = sessionmaker(bind=memory_engine)
    db = make_session()

    # Module-level caches: make the ledger use this engine, not a real DB.
    ledger_mod._engine = memory_engine
    ledger_mod._SessionFactory = make_session

    yield db

    db.close()
    ledger_mod.Base.metadata.drop_all(memory_engine)
    ledger_mod.reset_engine_cache()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ledger._get_hmac_key
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetHmacKey:
    """Key derivation in ``ledger._get_hmac_key``: salt requirement and caching."""

    def test_raises_when_salt_missing(self, monkeypatch):
        """Without AUDIT_LEDGER_SALT the derivation raises RuntimeError."""
        import molecule_audit.ledger as ledger_mod

        monkeypatch.delenv("AUDIT_LEDGER_SALT", raising=False)
        ledger_mod._hmac_key = None  # make sure no cached key masks the error

        with pytest.raises(RuntimeError, match="AUDIT_LEDGER_SALT"):
            ledger_mod._get_hmac_key()

    def test_same_key_returned_on_repeated_calls(self):
        """Two consecutive calls hand back the identical cached 32-byte object."""
        import molecule_audit.ledger as ledger_mod

        first = ledger_mod._get_hmac_key()
        second = ledger_mod._get_hmac_key()

        assert first is second
        assert len(first) == 32

    def test_key_changes_with_different_salt(self, monkeypatch):
        """After a cache reset, a different salt yields a different key."""
        import molecule_audit.ledger as ledger_mod

        original_key = ledger_mod._get_hmac_key()

        ledger_mod.reset_hmac_key_cache()
        monkeypatch.setenv("AUDIT_LEDGER_SALT", "different-salt")
        rekeyed = ledger_mod._get_hmac_key()

        assert original_key != rekeyed
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ledger._ts_to_canonical
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestTsToCanonical:
    """``ledger._ts_to_canonical`` renders timestamps as second-precision ``...Z``."""

    def test_utc_aware_datetime(self):
        """A UTC-aware datetime loses its microseconds and gains a Z suffix."""
        from molecule_audit.ledger import _ts_to_canonical

        aware = datetime(2026, 4, 17, 12, 34, 56, 789000, tzinfo=timezone.utc)
        assert _ts_to_canonical(aware) == "2026-04-17T12:34:56Z"

    def test_naive_datetime(self):
        """A naive datetime is rendered with the same Z-suffixed format."""
        from molecule_audit.ledger import _ts_to_canonical

        naive = datetime(2026, 4, 17, 12, 34, 56)
        assert _ts_to_canonical(naive) == "2026-04-17T12:34:56Z"

    def test_none_returns_none(self):
        """None passes straight through."""
        from molecule_audit.ledger import _ts_to_canonical

        assert _ts_to_canonical(None) is None

    def test_microseconds_stripped(self):
        """No fractional-seconds separator survives in the output."""
        from molecule_audit.ledger import _ts_to_canonical

        almost_next_second = datetime(2026, 1, 1, 0, 0, 0, 999999, tzinfo=timezone.utc)
        rendered = _ts_to_canonical(almost_next_second)

        assert "." not in rendered
        assert rendered.endswith("Z")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ledger.hash_content
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestHashContent:
    """``ledger.hash_content`` returns a SHA-256 hex digest, or None for None."""

    def test_none_returns_none(self):
        """None input produces no hash."""
        from molecule_audit.ledger import hash_content

        assert hash_content(None) is None

    def test_str_returns_sha256_hex(self):
        """A str is hashed to the 64-char SHA-256 hex of its bytes."""
        from molecule_audit.ledger import hash_content

        digest = hash_content("hello")

        assert digest == hashlib.sha256(b"hello").hexdigest()
        assert len(digest) == 64

    def test_bytes_returns_sha256_hex(self):
        """Bytes are hashed directly."""
        from molecule_audit.ledger import hash_content

        digest = hash_content(b"hello")

        assert digest == hashlib.sha256(b"hello").hexdigest()

    def test_str_and_bytes_same_result_for_utf8(self):
        """A str and its UTF-8 encoding hash to the same digest."""
        from molecule_audit.ledger import hash_content

        text = "café"
        assert hash_content(text) == hash_content(text.encode("utf-8"))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ledger._compute_event_hmac
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestComputeEventHmac:
    """``ledger._compute_event_hmac`` and the canonical JSON it is computed over."""

    def _make_event(self, **kwargs):
        """Return an unsaved ``AuditEvent`` with fixed defaults, overridden by *kwargs*."""
        from molecule_audit.ledger import AuditEvent

        defaults = {
            "id": "evt-1",
            "timestamp": datetime(2026, 4, 17, 0, 0, 0, tzinfo=timezone.utc),
            "agent_id": "agent-1",
            "session_id": "sess-1",
            "operation": "task_start",
            "input_hash": None,
            "output_hash": None,
            "model_used": None,
            "human_oversight_flag": False,
            "risk_flag": False,
            "prev_hmac": None,
            "hmac": "placeholder",
        }
        return AuditEvent(**{**defaults, **kwargs})

    def _canonical_json(self, event):
        """Serialise *event*'s canonical dict with sorted keys and compact separators."""
        from molecule_audit.ledger import _to_canonical_dict

        return json.dumps(
            _to_canonical_dict(event), sort_keys=True, separators=(",", ":")
        )

    def test_deterministic(self):
        """Hashing the same event twice gives the same value."""
        from molecule_audit.ledger import _compute_event_hmac

        ev = self._make_event()
        assert _compute_event_hmac(ev) == _compute_event_hmac(ev)

    def test_different_agent_id_changes_hmac(self):
        """agent_id is part of the signed payload."""
        from molecule_audit.ledger import _compute_event_hmac

        ev1 = self._make_event(agent_id="agent-A")
        ev2 = self._make_event(agent_id="agent-B")
        assert _compute_event_hmac(ev1) != _compute_event_hmac(ev2)

    def test_different_operation_changes_hmac(self):
        """operation is part of the signed payload."""
        from molecule_audit.ledger import _compute_event_hmac

        ev1 = self._make_event(operation="task_start")
        ev2 = self._make_event(operation="task_end")
        assert _compute_event_hmac(ev1) != _compute_event_hmac(ev2)

    def test_prev_hmac_included_in_computation(self):
        """prev_hmac is part of the signed payload (this is what chains events)."""
        from molecule_audit.ledger import _compute_event_hmac

        ev1 = self._make_event(prev_hmac=None)
        ev2 = self._make_event(prev_hmac="abc123")
        assert _compute_event_hmac(ev1) != _compute_event_hmac(ev2)

    def test_hmac_field_excluded_from_canonical(self):
        """The stored hmac field itself must not affect the computation."""
        from molecule_audit.ledger import _compute_event_hmac

        ev1 = self._make_event(hmac="value-a")
        ev2 = self._make_event(hmac="value-b")
        assert _compute_event_hmac(ev1) == _compute_event_hmac(ev2)

    def test_canonical_json_uses_compact_separators(self):
        """Canonical JSON must have no spaces (compact separators)."""
        payload = self._canonical_json(self._make_event())
        assert " " not in payload

    def test_canonical_json_sort_order_is_alphabetical(self):
        """Keys must be alphabetically sorted (Python sort_keys=True / Go map order)."""
        payload = self._canonical_json(self._make_event())

        # json.loads keeps the document's key order, so this checks the
        # whole top-level key sequence rather than only the first key.
        keys = list(json.loads(payload))
        assert keys == sorted(keys)
        assert keys[0] == "agent_id"  # alphabetically first

    def test_result_is_hex_string(self):
        """The HMAC is a 64-character hexadecimal string."""
        from molecule_audit.ledger import _compute_event_hmac

        h = _compute_event_hmac(self._make_event())
        assert isinstance(h, str)
        assert len(h) == 64
        int(h, 16)  # raises ValueError if not valid hex
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ledger.append_event + verify_chain
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestAppendEvent:
    """``ledger.append_event`` stores rows and links each agent's chain by HMAC."""

    def test_single_event_written(self, mem_session):
        """The first event has no predecessor and is retrievable from the DB."""
        from molecule_audit.ledger import AuditEvent, append_event

        written = append_event(
            agent_id="agent-1",
            session_id="sess-1",
            operation="task_start",
            db_session=mem_session,
        )

        assert written.id is not None
        assert written.operation == "task_start"
        assert written.prev_hmac is None  # nothing precedes the first row
        assert len(written.hmac) == 64

        row = mem_session.query(AuditEvent).first()
        assert row.id == written.id

    def test_chain_linkage_across_two_events(self, mem_session):
        """The second event's prev_hmac is the first event's hmac."""
        from molecule_audit.ledger import append_event

        head = append_event("a", "s", "task_start", db_session=mem_session)
        tail = append_event("a", "s", "task_end", db_session=mem_session)

        assert tail.prev_hmac == head.hmac
        assert tail.hmac != head.hmac

    def test_different_agents_independent_chains(self, mem_session):
        """Events from different agents do NOT link to each other."""
        from molecule_audit.ledger import append_event

        a_first = append_event("agent-A", "s", "task_start", db_session=mem_session)
        b_first = append_event("agent-B", "s", "task_start", db_session=mem_session)
        a_second = append_event("agent-A", "s", "task_end", db_session=mem_session)

        assert b_first.prev_hmac is None  # agent-B starts its own chain
        assert a_second.prev_hmac == a_first.hmac  # agent-A's chain skips B's row

    def test_input_hash_stored(self, mem_session):
        """A supplied input hash is persisted unchanged."""
        from molecule_audit.ledger import append_event, hash_content

        prompt = "user prompt"
        written = append_event(
            "a",
            "s",
            "llm_call",
            input_hash=hash_content(prompt),
            db_session=mem_session,
        )

        assert written.input_hash == hashlib.sha256(prompt.encode()).hexdigest()

    def test_model_used_stored(self, mem_session):
        """model_used is persisted unchanged."""
        from molecule_audit.ledger import append_event

        written = append_event(
            "a", "s", "llm_call", model_used="hermes-4", db_session=mem_session
        )
        assert written.model_used == "hermes-4"

    def test_to_dict_includes_all_fields(self, mem_session):
        """to_dict() exposes exactly the twelve ledger columns."""
        from molecule_audit.ledger import append_event

        as_dict = append_event("a", "s", "task_start", db_session=mem_session).to_dict()

        expected_keys = {
            "id", "timestamp", "agent_id", "session_id", "operation",
            "input_hash", "output_hash", "model_used",
            "human_oversight_flag", "risk_flag", "prev_hmac", "hmac",
        }
        assert expected_keys == set(as_dict.keys())

    def test_risk_and_oversight_flags(self, mem_session):
        """Both boolean flags are stored as real booleans."""
        from molecule_audit.ledger import append_event

        written = append_event(
            "a",
            "s",
            "task_start",
            human_oversight_flag=True,
            risk_flag=True,
            db_session=mem_session,
        )

        assert written.human_oversight_flag is True
        assert written.risk_flag is True
|
|
|
|
|
|
class TestVerifyChain:
    """``ledger.verify_chain`` accepts intact chains and rejects tampered ones."""

    @staticmethod
    def _corrupt_row(db, event_id, changes):
        """Overwrite columns of one stored event directly, bypassing the ledger API.

        The update is committed and the session's loaded state is expired so
        that the following ``verify_chain`` call re-reads the row from the
        database.  Every tamper test goes through this one path so they all
        behave the same way.
        """
        from molecule_audit.ledger import AuditEvent

        db.query(AuditEvent).filter(AuditEvent.id == event_id).update(changes)
        db.commit()
        db.expire_all()

    def test_empty_chain_returns_true(self, mem_session):
        """An agent with no events verifies as valid."""
        from molecule_audit.ledger import verify_chain

        assert verify_chain("non-existent-agent", mem_session) is True

    def test_single_event_valid(self, mem_session):
        """A one-event chain verifies."""
        from molecule_audit.ledger import append_event, verify_chain

        append_event("a", "s", "task_start", db_session=mem_session)
        assert verify_chain("a", mem_session) is True

    def test_multi_event_chain_valid(self, mem_session):
        """A four-event chain written through append_event verifies."""
        from molecule_audit.ledger import append_event, verify_chain

        for op in ("task_start", "llm_call", "tool_call", "task_end"):
            append_event("a", "s", op, db_session=mem_session)
        assert verify_chain("a", mem_session) is True

    def test_tampered_hmac_detected(self, mem_session):
        """Replacing a stored hmac makes verification fail."""
        from molecule_audit.ledger import append_event, verify_chain

        ev = append_event("a", "s", "task_start", db_session=mem_session)
        self._corrupt_row(mem_session, ev.id, {"hmac": "deadbeef" + "0" * 56})

        assert verify_chain("a", mem_session) is False

    def test_broken_prev_hmac_detected(self, mem_session):
        """Replacing the second event's prev_hmac link makes verification fail."""
        from molecule_audit.ledger import append_event, verify_chain

        append_event("a", "s", "task_start", db_session=mem_session)
        ev2 = append_event("a", "s", "task_end", db_session=mem_session)
        self._corrupt_row(mem_session, ev2.id, {"prev_hmac": "wrong-prev-hmac"})

        assert verify_chain("a", mem_session) is False

    def test_verify_only_checks_specified_agent(self, mem_session):
        """Corruption in one agent's chain does not affect another agent's result."""
        from molecule_audit.ledger import append_event, verify_chain

        append_event("agent-good", "s", "task_start", db_session=mem_session)
        ev_bad = append_event("agent-bad", "s", "task_start", db_session=mem_session)
        self._corrupt_row(mem_session, ev_bad.id, {"hmac": "a" * 64})

        assert verify_chain("agent-good", mem_session) is True
        assert verify_chain("agent-bad", mem_session) is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# hooks.LedgerHooks
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestLedgerHooks:
    """Golden-path behaviour of ``hooks.LedgerHooks`` against the in-memory ledger."""

    @staticmethod
    def _bound_hooks(db, **extra):
        """Create hooks for session "s1" / agent "ag1" and attach *db* as their session."""
        from molecule_audit.hooks import LedgerHooks

        instance = LedgerHooks(session_id="s1", agent_id="ag1", **extra)
        instance._session = db
        return instance

    @staticmethod
    def _first_event(db, operation=None):
        """Return the first stored event, optionally restricted to one operation."""
        from molecule_audit.ledger import AuditEvent

        query = db.query(AuditEvent)
        if operation is not None:
            query = query.filter(AuditEvent.operation == operation)
        return query.first()

    def test_on_task_start_writes_event(self, mem_session):
        """on_task_start stores ids and the input hash, and no output hash."""
        from molecule_audit.hooks import LedgerHooks

        with LedgerHooks(session_id="s1", agent_id="ag1") as hooks:
            hooks._session = mem_session
            hooks.on_task_start(input_text="hello world")

        stored = self._first_event(mem_session, "task_start")
        assert stored is not None
        assert stored.agent_id == "ag1"
        assert stored.session_id == "s1"
        assert stored.input_hash == hashlib.sha256(b"hello world").hexdigest()
        assert stored.output_hash is None

    def test_on_llm_call_stores_model_name(self, mem_session):
        """on_llm_call records the model plus hashes of prompt and reply."""
        hooks = self._bound_hooks(mem_session)
        hooks.on_llm_call(model="hermes-4-405b", input_text="prompt", output_text="reply")
        hooks.close()

        stored = self._first_event(mem_session, "llm_call")
        assert stored.model_used == "hermes-4-405b"
        assert stored.input_hash == hashlib.sha256(b"prompt").hexdigest()
        assert stored.output_hash == hashlib.sha256(b"reply").hexdigest()

    def test_on_tool_call_stores_tool_name_in_model_used(self, mem_session):
        """For tool calls the tool name lands in the model_used column."""
        hooks = self._bound_hooks(mem_session)
        hooks.on_tool_call("web_search", input_data={"query": "test"}, output_data="result")
        hooks.close()

        stored = self._first_event(mem_session, "tool_call")
        assert stored.model_used == "web_search"

    def test_on_tool_call_dict_input_is_hashed(self, mem_session):
        """A dict input is hashed via its ``_to_bytes`` serialisation."""
        from molecule_audit.hooks import _to_bytes
        from molecule_audit.ledger import hash_content

        payload = {"query": "molecule AI"}
        hooks = self._bound_hooks(mem_session)
        hooks.on_tool_call("search", input_data=payload)
        hooks.close()

        stored = self._first_event(mem_session, "tool_call")
        assert stored.input_hash == hash_content(_to_bytes(payload))

    def test_on_task_end_writes_event(self, mem_session):
        """on_task_end stores the output hash."""
        hooks = self._bound_hooks(mem_session)
        hooks.on_task_end(output_text="done")
        hooks.close()

        stored = self._first_event(mem_session, "task_end")
        assert stored is not None
        assert stored.output_hash == hashlib.sha256(b"done").hexdigest()

    def test_full_task_lifecycle_writes_four_events(self, mem_session):
        """A start / llm / tool / end sequence produces those four operations."""
        from molecule_audit.hooks import LedgerHooks
        from molecule_audit.ledger import AuditEvent

        with LedgerHooks(session_id="s1", agent_id="ag1") as hooks:
            hooks._session = mem_session
            hooks.on_task_start(input_text="go")
            hooks.on_llm_call(model="m", input_text="q", output_text="a")
            hooks.on_tool_call("t", input_data="x", output_data="y")
            hooks.on_task_end(output_text="done")

        # NOTE(review): no ORDER BY here — the assertion relies on the
        # database returning rows in insertion order; confirm that is intended.
        rows = mem_session.query(AuditEvent).filter(AuditEvent.agent_id == "ag1").all()
        recorded_ops = [row.operation for row in rows]
        assert recorded_ops == ["task_start", "llm_call", "tool_call", "task_end"]

    def test_context_manager_closes_session(self):
        """Leaving the ``with`` block clears an already-open session."""
        from molecule_audit.hooks import LedgerHooks

        hooks = LedgerHooks(session_id="s1", agent_id="ag1", db_url="sqlite:///:memory:")
        hooks._open_session()  # force the session into existence
        assert hooks._session is not None

        with hooks:
            pass

        assert hooks._session is None

    def test_exception_in_append_is_swallowed(self, mem_session, caplog, monkeypatch):
        """Audit failures must never raise — they log a WARNING instead."""
        import molecule_audit.ledger as ledger_mod

        # With no cached key and no salt, key derivation (and so the append) fails.
        ledger_mod.reset_hmac_key_cache()
        monkeypatch.delenv("AUDIT_LEDGER_SALT", raising=False)

        hooks = self._bound_hooks(mem_session)

        with caplog.at_level(logging.WARNING, logger="molecule_audit.hooks"):
            hooks.on_task_start(input_text="test")  # must return normally

        logged = [record.message for record in caplog.records]
        assert any("failed to append event" in message for message in logged)

    def test_human_oversight_flag_default(self, mem_session):
        """The constructor-level oversight flag is applied to written events."""
        hooks = self._bound_hooks(mem_session, human_oversight_flag=True)
        hooks.on_task_start()
        hooks.close()

        assert self._first_event(mem_session).human_oversight_flag is True

    def test_risk_flag_propagated(self, mem_session):
        """A per-call risk_flag reaches the stored event."""
        hooks = self._bound_hooks(mem_session)
        hooks.on_llm_call(model="m", risk_flag=True)
        hooks.close()

        assert self._first_event(mem_session).risk_flag is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# hooks — extended coverage (edge cases and branches not exercised by TestLedgerHooks)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestLedgerHooksExtended:
    """Extended coverage for molecule_audit.hooks — fills all uncovered branches.

    Existing TestLedgerHooks covers the golden-path cases.
    This class covers: _to_bytes, session lifecycle, agent_id defaults,
    None/empty inputs, override flags, risk propagation, and edge cases.
    """

    @staticmethod
    def _bound_hooks(db, **extra):
        """Create hooks for session "s1" / agent "ag1" and attach *db* as their session."""
        from molecule_audit.hooks import LedgerHooks

        instance = LedgerHooks(session_id="s1", agent_id="ag1", **extra)
        instance._session = db
        return instance

    @staticmethod
    def _first_event(db):
        """Return the first stored audit event (each test here writes at most a few)."""
        from molecule_audit.ledger import AuditEvent

        return db.query(AuditEvent).first()

    # ── _to_bytes ──────────────────────────────────────────────────────────────

    def test_to_bytes_none(self):
        """None stays None."""
        from molecule_audit.hooks import _to_bytes

        assert _to_bytes(None) is None

    def test_to_bytes_bytes_returns_same(self):
        """Bytes come back unchanged, including non-UTF-8 byte values."""
        from molecule_audit.hooks import _to_bytes

        data = b"\x00\xff"
        assert _to_bytes(data) == data

    def test_to_bytes_str_returns_utf8(self):
        """A str is encoded as UTF-8."""
        from molecule_audit.hooks import _to_bytes

        assert _to_bytes("café") == "café".encode("utf-8")

    def test_to_bytes_dict_is_json_deterministic(self):
        """A dict becomes key-sorted UTF-8 JSON, independent of insertion order."""
        from molecule_audit.hooks import _to_bytes

        d = {"b": 2, "a": 1}
        result = _to_bytes(d)

        # Must be valid UTF-8 JSON with the same content.
        parsed = json.loads(result.decode("utf-8"))
        assert parsed == {"a": 1, "b": 2}
        # Dict equality ignores order, so check the serialised key order
        # explicitly (json.loads keeps the document's order) ...
        assert list(parsed) == ["a", "b"]  # sort_keys=True
        # ... and that a differently-ordered but equal dict gives the same bytes.
        assert _to_bytes({"a": 1, "b": 2}) == result
        # Same dict produces same bytes (deterministic).
        assert _to_bytes(d) == result

    def test_to_bytes_list_is_json(self):
        """A list becomes UTF-8 JSON that round-trips."""
        from molecule_audit.hooks import _to_bytes

        result = _to_bytes([1, "two", {"three": 3}])
        assert json.loads(result.decode("utf-8")) == [1, "two", {"three": 3}]

    # ── _DEFAULT_AGENT_ID ─────────────────────────────────────────────────────

    def test_agent_id_defaults_to_workspace_id_env(self, monkeypatch):
        """Without an explicit agent_id the module default (from WORKSPACE_ID) is used."""
        import molecule_audit.hooks as hooks_mod

        monkeypatch.setenv("WORKSPACE_ID", "env-workspace-42")
        # Recompute the module default from the patched environment.  Going
        # through monkeypatch means the original value is restored afterwards
        # instead of leaking into later tests.
        monkeypatch.setattr(
            hooks_mod,
            "_DEFAULT_AGENT_ID",
            os.environ.get("WORKSPACE_ID", "unknown-agent"),
        )

        h = hooks_mod.LedgerHooks(session_id="s")
        assert h.agent_id == "env-workspace-42"

    def test_agent_id_overrides_env(self):
        """An explicit agent_id wins."""
        from molecule_audit.hooks import LedgerHooks

        h = LedgerHooks(session_id="s", agent_id="explicit-agent")
        assert h.agent_id == "explicit-agent"

    # ── Session lifecycle ─────────────────────────────────────────────────────

    def test_session_is_lazy(self, mem_session):
        """Constructing LedgerHooks does not open a session."""
        from molecule_audit.hooks import LedgerHooks

        hooks = LedgerHooks(session_id="s1", agent_id="ag1")
        assert hooks._session is None

    def test_session_reused_across_calls(self, mem_session):
        """Multiple on_* calls share the same SQLAlchemy session."""
        from molecule_audit.ledger import AuditEvent

        hooks = self._bound_hooks(mem_session)
        hooks.on_task_start(input_text="start")
        hooks.on_task_end(output_text="end")

        # Both events are visible through the one attached session.
        assert mem_session.query(AuditEvent).count() == 2

    def test_close_when_session_is_none(self):
        """close() is safe to call when no session was ever opened."""
        from molecule_audit.hooks import LedgerHooks

        hooks = LedgerHooks(session_id="s1", agent_id="ag1")
        hooks.close()  # must not raise
        assert hooks._session is None

    def test_context_manager_releases_on_exception(self, mem_session):
        """__exit__ closes session even when an exception propagates."""
        hooks = self._bound_hooks(mem_session)

        with pytest.raises(ZeroDivisionError):
            with hooks:
                hooks.on_task_start(input_text="start")
                raise ZeroDivisionError("boom")

        assert hooks._session is None

    # ── on_task_start None/empty inputs ───────────────────────────────────────

    def test_on_task_start_none_input(self, mem_session):
        """A None input yields no input hash."""
        hooks = self._bound_hooks(mem_session)
        hooks.on_task_start(input_text=None)
        hooks.close()

        ev = self._first_event(mem_session)
        assert ev.input_hash is None
        assert ev.operation == "task_start"

    def test_on_task_start_risk_flag_true(self, mem_session):
        """risk_flag=True is stored on the task_start event."""
        hooks = self._bound_hooks(mem_session)
        hooks.on_task_start(risk_flag=True)
        hooks.close()

        assert self._first_event(mem_session).risk_flag is True

    def test_on_task_start_oversight_flag_override(self, mem_session):
        """A per-call oversight flag overrides the constructor default."""
        hooks = self._bound_hooks(mem_session, human_oversight_flag=False)
        hooks.on_task_start(human_oversight_flag=True)
        hooks.close()

        assert self._first_event(mem_session).human_oversight_flag is True

    # ── on_llm_call None/empty inputs ─────────────────────────────────────────

    def test_on_llm_call_none_input_and_output(self, mem_session):
        """None prompt and reply yield no hashes; the model is still recorded."""
        hooks = self._bound_hooks(mem_session)
        hooks.on_llm_call(model="m", input_text=None, output_text=None)
        hooks.close()

        ev = self._first_event(mem_session)
        assert ev.input_hash is None
        assert ev.output_hash is None
        assert ev.model_used == "m"

    def test_on_llm_call_risk_flag_true(self, mem_session):
        """risk_flag=True is stored on the llm_call event."""
        hooks = self._bound_hooks(mem_session)
        hooks.on_llm_call(model="m", risk_flag=True)
        hooks.close()

        assert self._first_event(mem_session).risk_flag is True

    # ── on_tool_call None/empty inputs ────────────────────────────────────────

    def test_on_tool_call_bytes_input(self, mem_session):
        """Raw bytes input is hashed as-is."""
        from molecule_audit.ledger import hash_content

        binary = b"binary data \x00\xff"
        hooks = self._bound_hooks(mem_session)
        hooks.on_tool_call("read_file", input_data=binary)
        hooks.close()

        ev = self._first_event(mem_session)
        assert ev.input_hash == hash_content(binary)
        assert ev.model_used == "read_file"

    def test_on_tool_call_none_input_and_output(self, mem_session):
        """None input and output yield no hashes; the tool name is still recorded."""
        hooks = self._bound_hooks(mem_session)
        hooks.on_tool_call("echo", input_data=None, output_data=None)
        hooks.close()

        ev = self._first_event(mem_session)
        assert ev.input_hash is None
        assert ev.output_hash is None
        assert ev.model_used == "echo"

    def test_on_tool_call_risk_flag_true(self, mem_session):
        """risk_flag=True is stored on the tool_call event."""
        hooks = self._bound_hooks(mem_session)
        hooks.on_tool_call("write_file", risk_flag=True)
        hooks.close()

        assert self._first_event(mem_session).risk_flag is True

    # ── on_task_end None/empty inputs ─────────────────────────────────────────

    def test_on_task_end_none_output(self, mem_session):
        """A None output yields no output hash."""
        hooks = self._bound_hooks(mem_session)
        hooks.on_task_end(output_text=None)
        hooks.close()

        ev = self._first_event(mem_session)
        assert ev.output_hash is None
        assert ev.operation == "task_end"

    def test_on_task_end_risk_flag_true(self, mem_session):
        """risk_flag=True is stored on the task_end event."""
        hooks = self._bound_hooks(mem_session)
        hooks.on_task_end(risk_flag=True)
        hooks.close()

        assert self._first_event(mem_session).risk_flag is True

    def test_on_task_end_oversight_flag_override(self, mem_session):
        """A per-call oversight flag overrides the constructor default."""
        hooks = self._bound_hooks(mem_session, human_oversight_flag=False)
        hooks.on_task_end(human_oversight_flag=True)
        hooks.close()

        assert self._first_event(mem_session).human_oversight_flag is True

    # ── _safe_append exception swallowing ─────────────────────────────────────

    def test_safe_append_swallows_session_error(self, mem_session, caplog):
        """_safe_append logs a warning when append_event raises."""
        hooks = self._bound_hooks(mem_session)

        # Make commit() raise for the duration of the call only; patch.object
        # restores the real method even if the call below fails unexpectedly.
        failing_commit = patch.object(
            mem_session, "commit", side_effect=RuntimeError("simulated DB error")
        )
        with failing_commit, caplog.at_level(
            logging.WARNING, logger="molecule_audit.hooks"
        ):
            hooks.on_task_start(input_text="test")

        assert any("failed to append event" in r.message for r in caplog.records)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# verify.py CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestVerifyCLI:
    """Exit codes and output of the ``molecule_audit.verify`` command-line entry point."""

    @staticmethod
    def _patch_session_factory(monkeypatch, session):
        """Make ``ledger.get_session_factory`` hand out a factory that returns *session*."""
        factory_mock = MagicMock(return_value=session)
        monkeypatch.setattr(
            "molecule_audit.ledger.get_session_factory",
            lambda db_url: factory_mock,
        )

    def test_valid_chain_exits_zero(self, mem_session, monkeypatch, capsys):
        """An intact three-event chain exits 0 and reports the event count."""
        from molecule_audit.ledger import append_event
        from molecule_audit.verify import main

        # Write a short chain
        for op in ("task_start", "llm_call", "task_end"):
            append_event("cli-agent", "s", op, db_session=mem_session)

        self._patch_session_factory(monkeypatch, mem_session)

        with pytest.raises(SystemExit) as exc_info:
            main(["--agent-id", "cli-agent"])

        assert exc_info.value.code == 0
        captured = capsys.readouterr()
        assert "CHAIN VALID" in captured.out
        assert "3 events" in captured.out

    def test_no_events_exits_zero(self, mem_session, monkeypatch, capsys):
        """An agent with no events exits 0 with an explanatory message."""
        from molecule_audit.verify import main

        self._patch_session_factory(monkeypatch, mem_session)

        with pytest.raises(SystemExit) as exc_info:
            main(["--agent-id", "ghost-agent"])

        assert exc_info.value.code == 0
        captured = capsys.readouterr()
        assert "No audit events" in captured.out

    def test_broken_chain_exits_one(self, mem_session, monkeypatch, capsys):
        """A corrupted stored hmac exits 1 and reports the break."""
        from molecule_audit.ledger import AuditEvent, append_event
        from molecule_audit.verify import main

        ev = append_event("broken-agent", "s", "task_start", db_session=mem_session)
        # Corrupt the HMAC directly in the table
        mem_session.query(AuditEvent).filter(AuditEvent.id == ev.id).update(
            {"hmac": "b" * 64}
        )
        mem_session.commit()
        mem_session.expire_all()

        self._patch_session_factory(monkeypatch, mem_session)

        with pytest.raises(SystemExit) as exc_info:
            main(["--agent-id", "broken-agent"])

        assert exc_info.value.code == 1
        captured = capsys.readouterr()
        assert "CHAIN BROKEN" in captured.out

    def test_missing_salt_exits_two(self, monkeypatch, capsys):
        """A RuntimeError while obtaining the session factory ends with an error exit code."""
        import molecule_audit.ledger as ledger
        from molecule_audit.verify import main

        ledger.reset_hmac_key_cache()
        monkeypatch.delenv("AUDIT_LEDGER_SALT", raising=False)

        # Patch get_session_factory to raise RuntimeError (simulates SALT check)
        def _raise(*a, **kw):
            raise RuntimeError("AUDIT_LEDGER_SALT environment variable is required but not set.")

        monkeypatch.setattr("molecule_audit.ledger.get_session_factory", _raise)

        with pytest.raises(SystemExit) as exc_info:
            main(["--agent-id", "any"])

        # The RuntimeError should be caught and cause exit(2) or exit(3).
        # NOTE(review): the test name and the module docstring both say exit 2;
        # confirm against verify.py whether 3 should still be accepted here.
        assert exc_info.value.code in (2, 3)
|