molecule-core/workspace-template/builtin_tools/audit.py
Hongming Wang 24fec62d7f initial commit — Molecule AI platform
Forked clean from public hackathon repo (Starfire-AgentTeam, BSL 1.1)
with full rebrand to Molecule AI under github.com/Molecule-AI/molecule-monorepo.

Brand: Starfire → Molecule AI.
Slug: starfire / agent-molecule → molecule.
Env vars: STARFIRE_* → MOLECULE_*.
Go module: github.com/agent-molecule/platform → github.com/Molecule-AI/molecule-monorepo/platform.
Python packages: starfire_plugin → molecule_plugin, starfire_agent → molecule_agent.
DB: agentmolecule → molecule.

History truncated; see public repo for prior commits and contributor
attribution. Verified green: go test -race ./... (platform), pytest
(workspace-template 1129 + sdk 132), vitest (canvas 352), build (mcp).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-13 11:55:37 -07:00

275 lines
10 KiB
Python

"""Immutable append-only audit log for EU AI Act compliance.
Fulfils Article 12 (record-keeping), Article 13 (transparency), and
Article 17 (quality-management system) requirements for high-risk AI systems.
Log format: JSON Lines (one UTF-8 JSON object per line), suitable for direct
ingestion by any SIEM (Splunk, Elastic, Datadog, etc.).
Required event fields
---------------------
timestamp ISO 8601 UTC datetime with timezone offset
event_type Coarse category: "delegation", "approval", "memory", "rbac"
workspace_id Workspace that generated this event
actor Entity that triggered the action; defaults to workspace_id for
automated events, or the human identity for approval decisions
action Verb describing what was attempted:
delegate | approve | memory.read | memory.write | rbac.deny
resource Object of the action: target workspace ID, memory scope,
approval action string, etc.
outcome One of: allowed | denied | success | failure | timeout |
requested | granted
trace_id UUID v4 correlating related events across workspaces
The log file is opened in append mode ("a") on every write — it is NEVER
truncated, rewritten, or deleted by this module. Rotate externally using
logrotate (with ``copytruncate`` disabled) or ship to a SIEM before rotating.
Configuration
-------------
AUDIT_LOG_PATH env var — full path to the JSONL file
default: /var/log/molecule/audit.jsonl
"""
from __future__ import annotations
import functools
import json
import logging
import os
import threading
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
pass # avoid circular import at runtime
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
AUDIT_LOG_PATH: str = os.environ.get(
"AUDIT_LOG_PATH", "/var/log/molecule/audit.jsonl"
)
WORKSPACE_ID: str = os.environ.get("WORKSPACE_ID", "")
# Protects the open() + write() sequence; prevents interleaved JSON lines
# when multiple async tasks run in the same event-loop thread.
_write_lock = threading.Lock()
# ---------------------------------------------------------------------------
# Built-in role → permitted-action mappings
# ---------------------------------------------------------------------------
#: Maps each built-in role name to the set of actions it grants.
#: Custom roles can be added in config.yaml under ``rbac.allowed_actions``.
ROLE_PERMISSIONS: dict[str, set[str]] = {
# Full access — shortcircuits all other checks
"admin": {"delegate", "approve", "memory.read", "memory.write"},
# Standard agent role
"operator": {"delegate", "approve", "memory.read", "memory.write"},
# Read-only observer — no writes, no delegation, no approvals
"read-only": {"memory.read"},
# Can approve and write memory, but cannot delegate
"no-delegation": {"approve", "memory.read", "memory.write"},
# Can delegate and write memory, but cannot invoke approval gate
"no-approval": {"delegate", "memory.read", "memory.write"},
# Memory reads only (useful for analytic sidecars)
"memory-readonly": {"memory.read"},
}
# ---------------------------------------------------------------------------
# Config loader (lazy, cached per process)
# ---------------------------------------------------------------------------
@functools.lru_cache(maxsize=1)
def _load_workspace_config():
"""Return the WorkspaceConfig or None if it cannot be loaded."""
try:
from config import load_config # local import avoids circular deps
return load_config()
except Exception as exc:
logger.warning("audit: could not load workspace config for RBAC: %s", exc)
return None
def get_workspace_roles() -> tuple[list[str], dict[str, list[str]]]:
"""Return ``(roles, custom_permissions)`` from the workspace config.
Falls back to ``["operator"]`` / ``{}`` when the config is unavailable so
that agents remain functional in degraded environments.
"""
cfg = _load_workspace_config()
if cfg is None:
return ["operator"], {}
return list(cfg.rbac.roles), dict(cfg.rbac.allowed_actions)
# ---------------------------------------------------------------------------
# RBAC helpers
# ---------------------------------------------------------------------------
def check_permission(
action: str,
roles: list[str],
custom_permissions: dict[str, list[str]] | None = None,
) -> bool:
"""Return True if *any* of ``roles`` grants ``action``.
Evaluation order
~~~~~~~~~~~~~~~~
1. ``"admin"`` shortcircuits — always grants everything.
2. Custom role definitions (from ``rbac.allowed_actions`` in config.yaml).
3. Built-in :data:`ROLE_PERMISSIONS` table.
When a role appears in *custom_permissions* its built-in definition is
**ignored** — the custom list is the complete permission set for that role.
Args:
action: Action to authorise, e.g. ``"delegate"``.
roles: Roles assigned to the calling workspace.
custom_permissions: Optional ``{role: [action, ...]}`` mapping loaded
from ``WorkspaceConfig.rbac.allowed_actions``.
Returns:
``True`` if the action is permitted, ``False`` otherwise.
Examples::
>>> check_permission("delegate", ["operator"])
True
>>> check_permission("delegate", ["read-only"])
False
>>> check_permission("deploy", ["developer"], {"developer": ["deploy"]})
True
"""
for role in roles:
if role == "admin":
return True
if custom_permissions and role in custom_permissions:
# Custom entry is definitive for this role
if action in custom_permissions[role]:
return True
continue # Don't fall through to built-ins for custom roles
if role in ROLE_PERMISSIONS and action in ROLE_PERMISSIONS[role]:
return True
return False
# ---------------------------------------------------------------------------
# Public audit API
# ---------------------------------------------------------------------------
def log_event(
event_type: str,
action: str,
resource: str,
outcome: str,
actor: str | None = None,
trace_id: str | None = None,
**extra: Any,
) -> str:
"""Append one audit event to the immutable JSON Lines log.
Args:
event_type: Coarse category — ``"delegation"``, ``"approval"``,
``"memory"``, or ``"rbac"``.
action: Verb — ``"delegate"``, ``"approve"``, ``"memory.write"``,
``"memory.read"``, ``"rbac.deny"``.
resource: Object of the action — target workspace ID, memory scope,
approval action string, etc.
outcome: Terminal state — one of ``"allowed"``, ``"denied"``,
``"success"``, ``"failure"``, ``"timeout"``,
``"requested"``, ``"granted"``.
actor: Identity that triggered the event. Defaults to
``WORKSPACE_ID`` (the running workspace) for automated
events. Pass ``decided_by`` for human approval decisions.
trace_id: Caller-supplied UUID v4 for cross-event correlation.
A fresh UUID is generated when omitted.
**extra: Additional key-value pairs appended verbatim to the JSON
object (e.g. ``target_workspace_id``, ``memory_scope``,
``attempt``). Built-in keys cannot be overridden.
Returns:
The ``trace_id`` used for this event, enabling callers to chain
related events under a single correlation identifier.
Example::
trace = log_event(
event_type="delegation",
action="delegate",
resource="billing-agent",
outcome="success",
target_workspace_id="billing-agent",
attempt=1,
)
"""
if trace_id is None:
trace_id = str(uuid.uuid4())
event: dict[str, Any] = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"event_type": event_type,
"workspace_id": WORKSPACE_ID,
"actor": actor if actor is not None else WORKSPACE_ID,
"action": action,
"resource": resource,
"outcome": outcome,
"trace_id": trace_id,
}
# Merge extra fields — built-in keys are not overridable
for key, value in extra.items():
if key not in event:
event[key] = value
_write_event(event)
return trace_id
# ---------------------------------------------------------------------------
# Internal writer
# ---------------------------------------------------------------------------
def _ensure_log_dir(path: str) -> None:
"""Create the parent directory for *path* if it does not already exist."""
Path(path).parent.mkdir(parents=True, exist_ok=True)
def _write_event(event: dict[str, Any]) -> None:
"""Serialise *event* as a JSON line and fsync-append it to the log file.
The write is atomic with respect to other threads in this process: the
lock ensures that no two JSON objects are interleaved on the same line.
Failures are emitted to the standard Python logger at WARNING level but
are **never** re-raised — the application must not crash because audit
logging is temporarily unavailable (e.g. disk full, permission error).
In production, consider wiring an alert on WARNING messages from this
module so that missing audit records are detected quickly.
"""
try:
log_path = AUDIT_LOG_PATH
_ensure_log_dir(log_path)
line = json.dumps(event, default=str, ensure_ascii=False) + "\n"
with _write_lock:
with open(log_path, "a", encoding="utf-8") as fh:
fh.write(line)
fh.flush()
os.fsync(fh.fileno())
except Exception as exc: # pylint: disable=broad-except
logger.warning(
"Audit log write failed — event NOT persisted "
"(trace_id=%s, action=%s): %s",
event.get("trace_id", "?"),
event.get("action", "?"),
exc,
)