Implements EU AI Act Annex III compliance (Art. 12 record-keeping, Art. 13
transparency) via an append-only HMAC-SHA256-chained agent event log.
Python (workspace-template/molecule_audit/):
- ledger.py: SQLAlchemy 2.0 AuditEvent model + PBKDF2 key derivation +
append_event() with prev_hmac chain linkage + verify_chain() CLI helper.
- hooks.py: LedgerHooks — on_task_start/on_llm_call/on_tool_call/on_task_end
pipeline hooks; exception-safe (_safe_append); context manager support.
- verify.py: `python -m molecule_audit.verify --agent-id <id>` CLI;
exits 0=valid, 1=broken, 2=missing SALT, 3=DB error.
- tests/test_audit_ledger.py: 46 tests covering HMAC determinism, field
sensitivity, chain verification, LedgerHooks lifecycle, CLI.
Go (platform/):
- migrations/028_audit_events.up.sql: audit_events table with indexes.
- internal/handlers/audit.go: GET /workspaces/:id/audit — parameterized
queries, inline chain verification (chain_valid: bool|null), PBKDF2
key cached via sync.Once.
- internal/handlers/audit_test.go: 14 tests — HMAC, chain verify, handler
query/filter/pagination/cap/error paths.
- internal/router/router.go: wire wsAuth.GET("/audit", audh.Query).
- .env.example: document AUDIT_LEDGER_SALT.
- requirements.txt: add sqlalchemy>=2.0.0.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
136 lines
4.1 KiB
Python
136 lines
4.1 KiB
Python
"""molecule_audit.verify — CLI to verify an agent's HMAC chain integrity.
|
|
|
|
Usage
|
|
-----
|
|
python -m molecule_audit.verify --agent-id <id> [--db <url>]
|
|
|
|
Options
|
|
-------
|
|
--agent-id Agent ID whose chain to verify (required).
|
|
--db SQLAlchemy DB URL override.
|
|
Defaults to AUDIT_LEDGER_DB env var or /var/log/molecule/audit_ledger.db.
|
|
|
|
Exit codes
|
|
----------
|
|
0 Chain is valid (or no events found for this agent).
|
|
1 Chain is broken — tampered or corrupted row(s) detected.
|
|
2 Configuration error (e.g. AUDIT_LEDGER_SALT not set).
|
|
3 Database error (e.g. file not found, connection refused).
|
|
|
|
Example
|
|
-------
|
|
export AUDIT_LEDGER_SALT=<your-secret>
|
|
export AUDIT_LEDGER_DB=/var/log/molecule/audit_ledger.db
|
|
python -m molecule_audit.verify --agent-id my-workspace-id
|
|
# CHAIN VALID (42 events)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import sys
|
|
|
|
|
|
def main(argv=None) -> None:
|
|
parser = argparse.ArgumentParser(
|
|
prog="python -m molecule_audit.verify",
|
|
description=(
|
|
"Verify the HMAC chain integrity for a given agent's audit log. "
|
|
"Exit 0 = valid, 1 = broken, 2 = config error, 3 = DB error."
|
|
),
|
|
)
|
|
parser.add_argument(
|
|
"--agent-id",
|
|
required=True,
|
|
metavar="AGENT_ID",
|
|
help="Agent workspace ID to verify.",
|
|
)
|
|
parser.add_argument(
|
|
"--db",
|
|
default=None,
|
|
metavar="URL",
|
|
help=(
|
|
"SQLAlchemy DB URL (e.g. sqlite:///path.db or "
|
|
"postgresql://user:pass@host/db). "
|
|
"Defaults to AUDIT_LEDGER_DB env var."
|
|
),
|
|
)
|
|
args = parser.parse_args(argv)
|
|
|
|
# Defer imports so errors in configuration (missing SALT) produce clean output.
|
|
try:
|
|
from molecule_audit.ledger import (
|
|
AuditEvent,
|
|
_compute_event_hmac,
|
|
get_session_factory,
|
|
verify_chain,
|
|
)
|
|
except RuntimeError as exc:
|
|
print(f"ERROR: {exc}", file=sys.stderr)
|
|
sys.exit(2)
|
|
|
|
try:
|
|
factory = get_session_factory(args.db)
|
|
session = factory()
|
|
except Exception as exc:
|
|
print(f"ERROR: could not open database: {exc}", file=sys.stderr)
|
|
sys.exit(3)
|
|
|
|
try:
|
|
from sqlalchemy import asc
|
|
|
|
n_events = (
|
|
session.query(AuditEvent)
|
|
.filter(AuditEvent.agent_id == args.agent_id)
|
|
.count()
|
|
)
|
|
|
|
if n_events == 0:
|
|
print(f"No audit events found for agent_id={args.agent_id!r}")
|
|
sys.exit(0)
|
|
|
|
valid = verify_chain(args.agent_id, session)
|
|
|
|
if valid:
|
|
print(f"CHAIN VALID ({n_events} events)")
|
|
sys.exit(0)
|
|
else:
|
|
# Walk the chain manually to report the exact broken event.
|
|
events = (
|
|
session.query(AuditEvent)
|
|
.filter(AuditEvent.agent_id == args.agent_id)
|
|
.order_by(asc(AuditEvent.timestamp), asc(AuditEvent.id))
|
|
.all()
|
|
)
|
|
expected_prev = None
|
|
for ev in events:
|
|
expected_hmac = _compute_event_hmac(ev)
|
|
if ev.hmac != expected_hmac:
|
|
print(
|
|
f"CHAIN BROKEN at event {ev.id} "
|
|
f"(HMAC mismatch: stored={ev.hmac[:12]}... "
|
|
f"computed={expected_hmac[:12]}...)"
|
|
)
|
|
sys.exit(1)
|
|
if ev.prev_hmac != expected_prev:
|
|
print(
|
|
f"CHAIN BROKEN at event {ev.id} "
|
|
f"(prev_hmac mismatch: stored={ev.prev_hmac} "
|
|
f"expected={expected_prev})"
|
|
)
|
|
sys.exit(1)
|
|
expected_prev = ev.hmac
|
|
# verify_chain said broken but we couldn't find the exact event
|
|
print(f"CHAIN BROKEN (position unknown; run with DEBUG logging)")
|
|
sys.exit(1)
|
|
|
|
except Exception as exc:
|
|
print(f"ERROR: verification failed: {exc}", file=sys.stderr)
|
|
sys.exit(3)
|
|
finally:
|
|
session.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|