molecule-core/workspace/builtin_tools/approval.py
molecule-ai[bot] 3bef6af241 fix: apply #1124 env-var defaults + scrub F1088 credentials from INCIDENT_LOG.md (#1347)
- PLATFORM_URL: replace unreachable http://platform:8080 mesh-only default
  with Docker-aware detection (host.docker.internal in containers,
  localhost for local dev) across all workspace Python modules and the
  git-token-helper shell script.
- WORKSPACE_ID: add fail-fast validation in main.py (SystemExit if empty)
  consistent with coordinator.py / a2a_cli.py patterns already in place.
- INCIDENT_LOG.md: replace all 3 F1088 credential types with
  ***REDACTED*** (sk-cp- 2x, github_pat_ 2x, ADMIN_TOKEN base64 3x).

Fixes #1124, #1333.

Co-authored-by: Molecule AI Dev Lead <dev-lead@agents.moleculesai.app>
2026-04-21 08:11:44 +00:00

321 lines
12 KiB
Python

"""Approval tool for human-in-the-loop workflows.
When an agent encounters a destructive, expensive, or unauthorized action,
it calls request_approval() which creates a request and waits for a decision.
## Notification strategy
By default this module uses a **WebSocket subscription** (APPROVAL_USE_WEBSOCKET=true
or when the ``websockets`` package is installed). The platform pushes an
``APPROVAL_DECIDED`` event to the workspace WebSocket as soon as a human
clicks Approve / Deny on the canvas — no polling required, instant delivery.
If WebSocket is unavailable (env var opt-out or import error) the module
falls back to a **polling loop** so existing deployments without WebSocket
support continue to work without any config change.
RBAC enforcement
----------------
The calling workspace must hold a role that grants the ``"approve"`` action.
Roles are read from ``config.yaml`` under ``rbac.roles`` (default: operator).
Audit trail
-----------
Every approval lifecycle emits structured JSON Lines records:
1. ``approval / approve / requested`` — request submitted to platform
2. ``approval / approve / granted`` — human approved (actor = decided_by)
3. ``approval / approve / denied`` — human denied (actor = decided_by)
4. ``approval / approve / timeout`` — no decision within APPROVAL_TIMEOUT
RBAC denials emit an ``rbac / rbac.deny / denied`` event instead.
Environment variables
---------------------
PLATFORM_URL Platform base URL (default: http://host.docker.internal:8080)
WORKSPACE_ID This workspace's ID (default: "")
APPROVAL_TIMEOUT Max wait in seconds (default: 300)
APPROVAL_POLL_INTERVAL Polling interval in seconds (default: 5, polling path only)
APPROVAL_USE_WEBSOCKET "true" to force WS, "false"
to force polling (default: auto-detect)
AUDIT_LOG_PATH Path for JSON Lines audit log (default: /var/log/molecule/audit.jsonl)
"""
import asyncio
import json
import logging
import os
import uuid
import httpx
from langchain_core.tools import tool
from builtin_tools.audit import check_permission, get_workspace_roles, log_event
# Module logger plus settings read once from the environment at import time.
logger = logging.getLogger(__name__)

PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
WORKSPACE_ID = os.environ.get("WORKSPACE_ID", "")
APPROVAL_POLL_INTERVAL = float(os.environ.get("APPROVAL_POLL_INTERVAL", "5"))
APPROVAL_TIMEOUT = float(os.environ.get("APPROVAL_TIMEOUT", "300"))

# Module-level reference so tests can monkeypatch it; stays None when the
# optional ``websockets`` package is not installed.
try:
    import websockets
except ImportError:
    websockets = None  # type: ignore[assignment]

# APPROVAL_USE_WEBSOCKET: an explicit "true"/"false" (case-insensitive) wins;
# any other value auto-detects from whether ``websockets`` could be imported.
_ws_env = os.environ.get("APPROVAL_USE_WEBSOCKET", "").lower()
if _ws_env in ("true", "false"):
    _USE_WEBSOCKET_DEFAULT = _ws_env == "true"
else:
    _USE_WEBSOCKET_DEFAULT = websockets is not None

# Expose for test introspection
APPROVAL_USE_WEBSOCKET = _USE_WEBSOCKET_DEFAULT
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
async def _create_approval_request(action: str, reason: str) -> dict:
    """POST to the platform to create an approval request.

    Args:
        action: Short description of the action awaiting approval.
        reason: Justification forwarded to the platform with the request.

    Returns:
        ``{"approval_id": <id>}`` on success, or ``{"error": <message>}`` when
        the request fails, the platform answers with a status other than 201,
        the body is not valid JSON, or the body carries no ``approval_id``.
        This function never raises for request/response errors.
    """
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            resp = await client.post(
                f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/approvals",
                json={"action": action, "reason": reason},
            )
            if resp.status_code != 201:
                return {"error": f"Failed to create request: {resp.status_code}"}
            try:
                payload = resp.json()
            except Exception:
                return {"error": f"Platform returned invalid JSON (status {resp.status_code})"}
            approval_id = payload.get("approval_id") if isinstance(payload, dict) else None
            if approval_id in (None, ""):
                # Without an id the caller would wait for a decision that can
                # never be matched, so report the malformed response instead.
                return {
                    "error": (
                        "Platform response did not include an approval_id "
                        f"(status {resp.status_code})"
                    )
                }
            logger.info("Approval requested: %s (id=%s)", action, approval_id)
            return {"approval_id": approval_id}
        except Exception as e:
            # Best-effort boundary: transport failures become an error dict.
            return {"error": f"Failed to request approval: {e}"}
async def _wait_websocket(approval_id: str, timeout: float) -> dict:
    """Subscribe to the platform WebSocket and wait for an APPROVAL_DECIDED event.

    Args:
        approval_id: Identifier of the approval request to wait for.
        timeout: Not used inside this function; it is kept for signature
            parity with ``_wait_polling``. The caller is expected to bound the
            wait (for example with ``asyncio.wait_for``).

    Returns:
        A decision dict with ``approved``, ``approval_id`` and ``decided_by``;
        denied decisions additionally carry ``message``.

    Raises:
        ConnectionError: The message stream ended before a matching decision
            arrived, so the caller can fall back to another strategy instead
            of receiving ``None``.
    """
    ws_url = (
        PLATFORM_URL.replace("http://", "ws://").replace("https://", "wss://")
        + "/ws"
    )
    headers = {"X-Workspace-ID": WORKSPACE_ID}
    logger.debug("Approval %s: waiting via WebSocket %s", approval_id, ws_url)
    async with websockets.connect(ws_url, additional_headers=headers) as ws:
        async for raw_message in ws:
            try:
                event = json.loads(raw_message)
            except ValueError:
                # Covers json.JSONDecodeError and undecodable bytes frames.
                continue
            if not isinstance(event, dict):
                # Valid JSON that is not an object cannot be a decision event.
                continue
            if event.get("event") != "APPROVAL_DECIDED":
                continue
            if event.get("approval_id") != approval_id:
                continue
            status = event.get("status")
            decided_by = event.get("decided_by", "")
            logger.info(
                "Approval %s decided via WebSocket: %s by %s",
                approval_id,
                status,
                decided_by,
            )
            decision = {
                "approved": status == "approved",
                "approval_id": approval_id,
                "decided_by": decided_by,
            }
            if not decision["approved"]:
                # Any decided status other than "approved" counts as a denial.
                decision["message"] = "Denied by human"
            return decision
    # Reaching this point means iteration stopped without a decision.
    raise ConnectionError(
        f"WebSocket closed before approval {approval_id} was decided"
    )
async def _wait_polling(approval_id: str, timeout: float) -> dict:
    """Legacy polling loop — checks the platform REST endpoint every APPROVAL_POLL_INTERVAL seconds.

    Args:
        approval_id: Identifier of the approval request to look for.
        timeout: Polling budget in seconds. Only sleep intervals are counted
            against it; time spent inside HTTP requests is not.

    Returns:
        A decision dict with ``approved``, ``approval_id`` and ``decided_by``;
        denied decisions additionally carry ``message``.

    Raises:
        asyncio.TimeoutError: No approved/denied status was seen within the
            polling budget.
    """
    elapsed = 0.0
    url = f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/approvals"
    async with httpx.AsyncClient(timeout=10.0) as client:
        while elapsed < timeout:
            # Sleep first: a decision is unlikely right after submission.
            await asyncio.sleep(APPROVAL_POLL_INTERVAL)
            elapsed += APPROVAL_POLL_INTERVAL
            try:
                resp = await client.get(url)
                if resp.status_code != 200:
                    continue
                for a in resp.json():
                    if a.get("id") != approval_id:
                        continue
                    status = a.get("status")
                    if status == "approved":
                        logger.info("Approval granted (poll): %s", approval_id)
                        return {
                            "approved": True,
                            "approval_id": approval_id,
                            "decided_by": a.get("decided_by"),
                        }
                    if status == "denied":
                        logger.info("Approval denied (poll): %s", approval_id)
                        return {
                            "approved": False,
                            "approval_id": approval_id,
                            "decided_by": a.get("decided_by"),
                            "message": "Denied by human",
                        }
                    # Any other status is still undecided: keep polling.
            except Exception as exc:
                # Transient error (network, malformed body): keep retrying,
                # but leave a trace so repeated failures can be diagnosed.
                logger.debug(
                    "Approval %s: poll attempt failed (%s), retrying",
                    approval_id,
                    exc,
                )
    raise asyncio.TimeoutError()
# ---------------------------------------------------------------------------
# Public tool
# ---------------------------------------------------------------------------
@tool
async def request_approval(
    action: str,
    reason: str,
) -> dict:
    """Request human approval before proceeding with a sensitive action.

    Use this when you're about to do something destructive, expensive,
    or outside your normal authority. The request is sent to the canvas
    where a human can approve or deny it.

    Args:
        action: Short description of what you want to do
        reason: Why this action is necessary
    """
    # NOTE(review): the docstring above is presumably surfaced to the agent as
    # the tool description by ``@tool`` — keep it agent-facing; maintainer
    # notes belong in comments like this one.
    #
    # Returns a dict that always has "approved" (bool). It also carries
    # "approval_id" once the request was created, "decided_by" on a human
    # decision, "message" on denial, and "error" on RBAC denial, submission
    # failure or timeout.

    # One trace_id links every audit event for this approval lifecycle.
    trace_id = str(uuid.uuid4())

    # --- RBAC check -----------------------------------------------------------
    roles, custom_perms = get_workspace_roles()
    if not check_permission("approve", roles, custom_perms):
        log_event(
            event_type="rbac",
            action="rbac.deny",
            resource=action,
            outcome="denied",
            trace_id=trace_id,
            attempted_action="approve",
            roles=roles,
        )
        return {
            "approved": False,
            "error": (
                "RBAC: this workspace does not have the 'approve' permission. "
                f"Current roles: {roles}"
            ),
        }

    # Step 1: Create the approval request
    creation = await _create_approval_request(action, reason)
    if "error" in creation:
        log_event(
            event_type="approval",
            action="approve",
            resource=action,
            outcome="failure",
            trace_id=trace_id,
            reason="submit_failed",
            error=creation["error"],
        )
        return {"approved": False, "error": creation["error"]}

    approval_id = creation["approval_id"]
    log_event(
        event_type="approval",
        action="approve",
        resource=action,
        outcome="requested",
        trace_id=trace_id,
        approval_id=approval_id,
        reason_text=reason,
    )

    # Re-read at call time so the env var can change after import.
    timeout = float(os.environ.get("APPROVAL_TIMEOUT", str(APPROVAL_TIMEOUT)))

    # Step 2: Wait for decision — WebSocket preferred, polling as fallback
    use_ws = APPROVAL_USE_WEBSOCKET and websockets is not None
    # One shared deadline for both strategies: the fallback only gets the
    # time the WebSocket attempt left over, never a second full timeout.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    try:
        if use_ws:
            try:
                result = await asyncio.wait_for(
                    _wait_websocket(approval_id, timeout),
                    timeout=timeout,
                )
                if not isinstance(result, dict):
                    # Stream ended without a decision; treat as a WS failure.
                    raise ConnectionError(
                        "WebSocket closed before a decision arrived"
                    )
            except Exception as ws_err:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    # The whole budget was spent waiting on the WebSocket.
                    raise asyncio.TimeoutError() from ws_err
                # WebSocket failed (connection error, etc.) — fall through to polling
                logger.warning(
                    "WebSocket approval wait failed (%s), falling back to polling",
                    ws_err,
                )
                result = await asyncio.wait_for(
                    _wait_polling(approval_id, remaining),
                    timeout=remaining + APPROVAL_POLL_INTERVAL,
                )
        else:
            # Polling path (primary when WS disabled)
            result = await asyncio.wait_for(
                _wait_polling(approval_id, timeout),
                timeout=timeout + APPROVAL_POLL_INTERVAL,  # slight grace period
            )

        # Log the human decision
        decided_by = result.get("decided_by")
        outcome = "granted" if result.get("approved") else "denied"
        log_event(
            event_type="approval",
            action="approve",
            resource=action,
            outcome=outcome,
            # Record the human identity as actor when available
            actor=decided_by or WORKSPACE_ID,
            trace_id=trace_id,
            approval_id=approval_id,
            decided_by=decided_by,
        )
        return result
    except asyncio.TimeoutError:
        logger.warning("Approval timed out after %.0fs: %s", timeout, approval_id)
        log_event(
            event_type="approval",
            action="approve",
            resource=action,
            outcome="timeout",
            trace_id=trace_id,
            approval_id=approval_id,
            timeout_seconds=timeout,
        )
        return {
            "approved": False,
            "approval_id": approval_id,
            "error": f"Timed out after {timeout}s waiting for human decision",
        }