molecule-core/workspace/builtin_tools/governance.py
Hongming Wang d8026347e5 chore: open-source restructure — rename dirs, remove internal files, scrub secrets
Renames:
- platform/ → workspace-server/ (Go module path stays as "platform" for
  external dep compat — will update after plugin module republish)
- workspace-template/ → workspace/

Removed (moved to separate repos or deleted):
- PLAN.md — internal roadmap (move to private project board)
- HANDOFF.md, AGENTS.md — one-time internal session docs
- .claude/ — gitignored entirely (local agent config)
- infra/cloudflare-worker/ → Molecule-AI/molecule-tenant-proxy
- org-templates/molecule-dev/ → standalone template repo
- .mcp-eval/ → molecule-mcp-server repo
- test-results/ — ephemeral, gitignored

Security scrubbing:
- Cloudflare account/zone/KV IDs → placeholders
- Real EC2 IPs → <EC2_IP> in all docs
- CF token prefix, Neon project ID, Fly app names → redacted
- Langfuse dev credentials → parameterized
- Personal runner username/machine name → generic

Community files:
- CONTRIBUTING.md — build, test, branch conventions
- CODE_OF_CONDUCT.md — Contributor Covenant 2.1

All Dockerfiles, CI workflows, docker-compose, railway.toml, render.yaml,
README, CLAUDE.md updated for new directory names.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-18 00:24:44 -07:00

404 lines
14 KiB
Python

"""Bridge between Molecule AI's RBAC + audit subsystem and the Microsoft Agent
Governance Toolkit (agent-os-kernel, released April 2, 2026).
Integration points
------------------
* ``check_permission`` → ``PolicyEvaluator.evaluate()``
Molecule AI's RBAC gate runs first; if RBAC allows the action the toolkit
evaluator is consulted according to ``policy_mode``.
* ``log_event`` → governance audit sink
Every permission decision (allow or deny) is written via
``tools.audit.log_event`` with extra governance metadata so the full
decision trail lands in Molecule AI's existing audit stream.
* OTEL traceparent flows through
``tools.telemetry.get_current_traceparent()`` is called inside ``emit()``
and the W3C traceparent string is attached to every audit record, giving
end-to-end distributed tracing across agent boundaries.
Graceful degradation
--------------------
If ``agent-os-kernel`` is not installed the module falls back to Molecule AI
RBAC alone. No exception propagates to the agent — governance is a
best-effort overlay, never a hard dependency.
Install::
pip install agent-os-kernel
Minimal config.yaml snippet::
governance:
enabled: true
toolkit: microsoft
policy_mode: strict # strict | permissive | audit
policy_endpoint: https://your-tenant.governance.azure.com
policy_file: policies/workspace.rego
blocked_patterns:
- ".*\\.exec$"
- "shell\\."
max_tool_calls_per_task: 50
NOTE: The agent-os-kernel package was released April 2, 2026 and is in
community preview. The API bindings in this module target v3.0.x of the
package (agent_os.policies.PolicyEvaluator). If the package API changes,
update _init_evaluator() accordingly.
"""
import logging
import os
from typing import Any, Optional
logger = logging.getLogger(__name__)
WORKSPACE_ID: str = os.environ.get("WORKSPACE_ID", "")
# Module-level singleton — set by initialize_governance() at startup
_adapter: Optional["GovernanceAdapter"] = None
class GovernanceAdapter:
"""Bridges Molecule AI RBAC + audit trail to the Microsoft Agent Governance Toolkit."""
def __init__(self, config: Any) -> None:
self._config = config
self._evaluator = None
self._toolkit_available: bool = False
async def initialize(self) -> None:
"""Async entry point: initialise evaluator and log outcome."""
self._init_evaluator()
if self._toolkit_available:
logger.info(
"GovernanceAdapter initialised — toolkit=%s mode=%s",
self._config.toolkit,
self._config.policy_mode,
)
else:
logger.warning(
"GovernanceAdapter initialised in RBAC-only mode "
"(agent-os-kernel not available or failed to load)."
)
def _init_evaluator(self) -> None:
"""Lazy-import and configure the PolicyEvaluator from agent-os-kernel.
All failures are caught and logged; the adapter simply runs without
the toolkit rather than crashing the workspace.
"""
try:
try:
from agent_os.policies import PolicyEvaluator # type: ignore[import]
except ImportError:
logger.warning(
"agent-os-kernel is not installed — graceful degradation active. "
"Governance will use Molecule AI RBAC only. "
"To enable the Microsoft Agent Governance Toolkit run: "
"pip install agent-os-kernel"
)
return
kwargs: dict[str, Any] = {
"policy_mode": self._config.policy_mode,
"max_tool_calls_per_task": self._config.max_tool_calls_per_task,
"blocked_patterns": self._config.blocked_patterns,
}
if self._config.policy_endpoint:
kwargs["endpoint"] = self._config.policy_endpoint
self._evaluator = PolicyEvaluator(**kwargs)
# Load a policy file if one is configured and exists on disk.
if self._config.policy_file:
policy_file = self._config.policy_file
if os.path.exists(policy_file):
ext = os.path.splitext(policy_file)[1].lower()
if ext == ".rego":
self._evaluator.load_rego(path=policy_file)
logger.info("Loaded Rego policy file: %s", policy_file)
elif ext in (".yaml", ".yml"):
self._evaluator.load_yaml(path=policy_file)
logger.info("Loaded YAML policy file: %s", policy_file)
elif ext == ".cedar":
self._evaluator.load_cedar(path=policy_file)
logger.info("Loaded Cedar policy file: %s", policy_file)
else:
logger.warning(
"Unrecognised policy file extension '%s' — skipping load.",
ext,
)
else:
logger.warning(
"policy_file '%s' does not exist — skipping load.",
policy_file,
)
self._toolkit_available = True
logger.info(
"agent-os-kernel PolicyEvaluator ready — policy_mode=%s",
self._config.policy_mode,
)
except Exception as exc: # noqa: BLE001
logger.warning(
"Failed to initialise agent-os-kernel PolicyEvaluator: %s"
"graceful degradation active (RBAC only).",
exc,
)
def check_permission(
self,
action: str,
roles: list[str],
custom_permissions: dict | None = None,
context: dict | None = None,
) -> tuple[bool, str]:
"""Evaluate an action against Molecule AI RBAC and (optionally) the toolkit.
Returns
-------
tuple[bool, str]
``(allowed, reason)`` — reason is a short human-readable string
explaining the decision.
"""
from builtin_tools import audit # inline import to avoid circular dependencies
context = context or {}
# --- Step 1: Molecule AI RBAC gate (always runs) ---
rbac_allowed: bool = audit.check_permission(action, roles, custom_permissions)
if not rbac_allowed:
self.emit(
event_type="permission_check",
action=action,
resource=context.get("resource", ""),
outcome="denied",
actor=context.get("actor"),
policy_decision="rbac_deny",
roles=roles,
)
return False, f"RBAC denied action '{action}' for roles {roles}"
# --- Step 2: If toolkit unavailable or audit-only mode, return RBAC result ---
if not self._toolkit_available or self._config.policy_mode == "audit":
self.emit(
event_type="permission_check",
action=action,
resource=context.get("resource", ""),
outcome="allowed",
actor=context.get("actor"),
policy_decision="rbac_allowed",
roles=roles,
toolkit_mode=self._config.policy_mode,
)
return rbac_allowed, "rbac_allowed"
# --- Step 3: Toolkit evaluation ---
eval_context: dict[str, Any] = {
"action": action,
"resource": context.get("resource", ""),
"roles": roles,
"workspace_id": WORKSPACE_ID,
}
# Merge any extra context keys the caller supplied.
for key, value in context.items():
if key not in eval_context:
eval_context[key] = value
toolkit_allowed: bool = True
reason: str = ""
evaluator_name: str = "agent-os-kernel"
try:
decision = self._evaluator.evaluate(eval_context)
toolkit_allowed = getattr(decision, "allowed", True)
reason = getattr(decision, "reason", "")
evaluator_name = getattr(decision, "evaluator_name", "agent-os-kernel")
except Exception as exc: # noqa: BLE001
logger.warning(
"agent-os-kernel evaluation raised an exception: %s"
"falling back to RBAC result to avoid blocking the agent.",
exc,
)
self.emit(
event_type="permission_check",
action=action,
resource=context.get("resource", ""),
outcome="allowed",
actor=context.get("actor"),
policy_decision="toolkit_evaluation_error",
toolkit_mode=self._config.policy_mode,
roles=roles,
)
return rbac_allowed, "toolkit_evaluation_error"
# --- Step 4: Combine results according to policy_mode ---
if self._config.policy_mode == "permissive":
# Toolkit denial is advisory only in permissive mode.
if not toolkit_allowed:
logger.warning(
"Governance toolkit denied action '%s' (reason=%s) but policy_mode "
"is 'permissive' — allowing and logging advisory denial.",
action,
reason,
)
final_allowed = rbac_allowed
else:
# strict: both gates must allow.
final_allowed = rbac_allowed and toolkit_allowed
outcome = "allowed" if final_allowed else "denied"
self.emit(
event_type="permission_check",
action=action,
resource=context.get("resource", ""),
outcome=outcome,
actor=context.get("actor"),
policy_decision=reason or outcome,
evaluator=evaluator_name,
toolkit_mode=self._config.policy_mode,
roles=roles,
)
return final_allowed, reason or "allowed"
def emit(
self,
event_type: str,
action: str,
resource: str,
outcome: str,
actor: str | None = None,
trace_id: str | None = None,
**extra: Any,
) -> str:
"""Write a governance-annotated audit event.
Pulls the current W3C traceparent from the active OTEL span so that
governance decisions are traceable across service boundaries.
Returns
-------
str
The ``trace_id`` produced by ``audit.log_event``.
"""
from builtin_tools import audit # inline import to avoid circular dependencies
from builtin_tools.telemetry import get_current_traceparent # inline import
traceparent: str | None = get_current_traceparent()
recorded_trace_id: str = audit.log_event(
event_type,
action,
resource,
outcome,
actor=actor,
trace_id=trace_id,
governance_toolkit=(
self._config.toolkit if self._toolkit_available else "disabled"
),
traceparent=traceparent or "",
**extra,
)
return recorded_trace_id
# ---------------------------------------------------------------------------
# Module-level functions
# ---------------------------------------------------------------------------
async def initialize_governance(config: Any) -> Optional[GovernanceAdapter]:
"""Initialize the module-level GovernanceAdapter singleton.
Called once at startup by main.py when governance.enabled is True.
Returns the adapter, or None if initialization fails.
"""
global _adapter
try:
adapter = GovernanceAdapter(config)
await adapter.initialize()
_adapter = adapter
logger.info(
"Governance singleton initialised — toolkit=%s mode=%s",
config.toolkit,
config.policy_mode,
)
return adapter
except Exception as exc: # noqa: BLE001
logger.warning(
"initialize_governance() failed: %s — governance disabled for this session.",
exc,
)
return None
def get_governance_adapter() -> Optional[GovernanceAdapter]:
"""Return the module-level GovernanceAdapter singleton (may be None)."""
return _adapter
def check_permission_with_governance(
action: str,
roles: list[str],
custom_permissions: dict | None = None,
context: dict | None = None,
) -> tuple[bool, str]:
"""Convenience wrapper: use GovernanceAdapter when available, else RBAC only.
Parameters
----------
action:
The action name to evaluate (e.g. ``"memory.write"``).
roles:
The list of role names held by the requesting actor.
custom_permissions:
Optional custom role→action mapping to overlay on built-in roles.
context:
Optional extra context forwarded to the PolicyEvaluator.
Returns
-------
tuple[bool, str]
``(allowed, reason)``
"""
if _adapter is None:
from builtin_tools import audit # inline import to avoid circular dependencies
result: bool = audit.check_permission(action, roles, custom_permissions)
return result, "rbac_only"
return _adapter.check_permission(action, roles, custom_permissions, context)
# ---------------------------------------------------------------------------
# Private helper
# ---------------------------------------------------------------------------
def _emit_governance_event(
event_type: str,
action: str,
resource: str,
outcome: str,
actor: str | None = None,
trace_id: str | None = None,
**extra: Any,
) -> Optional[str]:
"""Emit a governance audit event via the singleton adapter if one is set.
Returns the trace_id produced by log_event, or None if no adapter is set.
"""
if _adapter is None:
return None
return _adapter.emit(
event_type,
action,
resource,
outcome,
actor=actor,
trace_id=trace_id,
**extra,
)