molecule-ai-workspace-runtime/molecule_runtime/builtin_tools/approval.py
Molecule AI Infra-Runtime-BE d21f8c2064
Some checks failed
ci / mirror-guard (pull_request) Failing after 5s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 7s
fix(runtime): align PLATFORM_URL default to host.docker.internal across all modules
Unified the fallback default for PLATFORM_URL from `http://platform:8080`
(Docker Compose service name) to `http://host.docker.internal:8080`
across all 13 modules that declare it. This matches:
- The provisioner's default (buildContainerEnv injects PLATFORM_URL
  from cfg.PlatformURL, which defaults to host.docker.internal on the
  platform side — main.go:platformURL)
- The molecule-git-token-helper.sh script (already uses host.docker.internal)
- The MCP client (MOLECULE_URL injected by provisioner)

The provisioner always sets PLATFORM_URL in production containers, so
this is a development/Docker-only improvement: without this change,
a workspace started outside the Docker Compose network (e.g. via
`docker run` with `--network host`) would fail platform API calls
with "Connection refused" because `platform:8080` resolves nowhere.

13 modules updated: a2a_cli, a2a_client, a2a_mcp_server, adapters/base,
builtin_tools/a2a_tools, builtin_tools/approval, builtin_tools/delegation,
builtin_tools/hitl, builtin_tools/memory, consolidation, coordinator,
main, molecule_ai_status. All docstrings updated to match.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-11 03:34:24 +00:00

335 lines
12 KiB
Python

"""Approval tool for human-in-the-loop workflows.
When an agent encounters a destructive, expensive, or unauthorized action,
it calls request_approval() which creates a request and waits for a decision.
## Notification strategy
By default this module uses a **WebSocket subscription** (APPROVAL_USE_WEBSOCKET=true
or when the ``websockets`` package is installed). The platform pushes an
``APPROVAL_DECIDED`` event to the workspace WebSocket as soon as a human
clicks Approve / Deny on the canvas — no polling required, instant delivery.
If WebSocket is unavailable (env var opt-out or import error) the module
falls back to a **polling loop** so existing deployments without WebSocket
support continue to work without any config change.
RBAC enforcement
----------------
The calling workspace must hold a role that grants the ``"approve"`` action.
Roles are read from ``config.yaml`` under ``rbac.roles`` (default: operator).
Audit trail
-----------
Every approval lifecycle emits structured JSON Lines records:
1. ``approval / approve / requested`` — request submitted to platform
2. ``approval / approve / granted`` — human approved (actor = decided_by)
3. ``approval / approve / denied`` — human denied (actor = decided_by)
4. ``approval / approve / timeout`` — no decision within APPROVAL_TIMEOUT
RBAC denials emit an ``rbac / rbac.deny / denied`` event instead.
Environment variables
---------------------
PLATFORM_URL Platform base URL (default: http://host.docker.internal:8080)
WORKSPACE_ID This workspace's ID (validated at startup by platform_auth)
APPROVAL_TIMEOUT Max wait in seconds (default: 300)
APPROVAL_POLL_INTERVAL Polling interval in seconds (default: 5, polling path only)
APPROVAL_USE_WEBSOCKET "true" to force WS, "false"
to force polling (default: auto-detect)
AUDIT_LOG_PATH Path for JSON Lines audit log (default: /var/log/molecule/audit.jsonl)
"""
import asyncio
import json
import logging
import os
import uuid
import httpx
from langchain_core.tools import tool
from builtin_tools.audit import check_permission, get_workspace_roles, log_event
from builtin_tools.validation import WorkspaceIdValidationError, get_validated_workspace_id
logger = logging.getLogger(__name__)
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
from molecule_runtime.platform_auth import WORKSPACE_ID
APPROVAL_POLL_INTERVAL = float(os.environ.get("APPROVAL_POLL_INTERVAL", "5"))
APPROVAL_TIMEOUT = float(os.environ.get("APPROVAL_TIMEOUT", "300"))
# Auto-detect WebSocket support; can be overridden with env var
_ws_env = os.environ.get("APPROVAL_USE_WEBSOCKET", "").lower()
if _ws_env == "false":
_USE_WEBSOCKET_DEFAULT = False
elif _ws_env == "true":
_USE_WEBSOCKET_DEFAULT = True
else:
try:
import websockets as _ws_probe # noqa: F401
_USE_WEBSOCKET_DEFAULT = True
except ImportError:
_USE_WEBSOCKET_DEFAULT = False
# Module-level reference so tests can monkeypatch it
try:
import websockets
except ImportError:
websockets = None # type: ignore[assignment]
# Expose for test introspection
APPROVAL_USE_WEBSOCKET = _USE_WEBSOCKET_DEFAULT
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
async def _create_approval_request(action: str, reason: str) -> dict:
"""POST to the platform to create an approval request.
Returns {"approval_id": str} on success or {"error": str} on failure.
"""
# --- Workspace ID validation (CWE-20 / CWE-88) ----------------------------
try:
ws_id = get_validated_workspace_id(caller="approval._create_approval_request")
except WorkspaceIdValidationError as e:
return {"error": str(e)}
async with httpx.AsyncClient(timeout=10.0) as client:
try:
resp = await client.post(
f"{PLATFORM_URL}/workspaces/{ws_id}/approvals",
json={"action": action, "reason": reason},
)
if resp.status_code != 201:
return {"error": f"Failed to create request: {resp.status_code}"}
try:
approval_id = resp.json().get("approval_id")
except (ValueError, Exception):
return {"error": f"Platform returned invalid JSON (status {resp.status_code})"}
logger.info("Approval requested: %s (id=%s)", action, approval_id)
return {"approval_id": approval_id}
except Exception as e:
return {"error": f"Failed to request approval: {e}"}
async def _wait_websocket(approval_id: str, timeout: float) -> dict:
"""Subscribe to the platform WebSocket and wait for APPROVAL_DECIDED event.
Returns the decision dict or raises asyncio.TimeoutError on expiry.
"""
ws_url = (
PLATFORM_URL.replace("http://", "ws://").replace("https://", "wss://")
+ "/ws"
)
headers = {"X-Workspace-ID": WORKSPACE_ID}
logger.debug("Approval %s: waiting via WebSocket %s", approval_id, ws_url)
async with websockets.connect(ws_url, additional_headers=headers) as ws:
async for raw_message in ws:
try:
event = json.loads(raw_message)
except json.JSONDecodeError:
continue
if event.get("event") != "APPROVAL_DECIDED":
continue
if event.get("approval_id") != approval_id:
continue
status = event.get("status")
decided_by = event.get("decided_by", "")
logger.info("Approval %s decided via WebSocket: %s by %s",
approval_id, status, decided_by)
if status == "approved":
return {
"approved": True,
"approval_id": approval_id,
"decided_by": decided_by,
}
else:
return {
"approved": False,
"approval_id": approval_id,
"decided_by": decided_by,
"message": "Denied by human",
}
async def _wait_polling(approval_id: str, timeout: float) -> dict:
"""Legacy polling loop — checks platform REST endpoint every APPROVAL_POLL_INTERVAL seconds."""
# --- Workspace ID validation (CWE-20 / CWE-88) ----------------------------
try:
ws_id = get_validated_workspace_id(caller="approval._wait_polling")
except WorkspaceIdValidationError:
# Transient — propagate as timeout so the caller handles it gracefully
raise asyncio.TimeoutError("WORKSPACE_ID validation failed")
elapsed = 0.0
async with httpx.AsyncClient(timeout=10.0) as client:
while elapsed < timeout:
await asyncio.sleep(APPROVAL_POLL_INTERVAL)
elapsed += APPROVAL_POLL_INTERVAL
try:
resp = await client.get(
f"{PLATFORM_URL}/workspaces/{ws_id}/approvals",
)
if resp.status_code == 200:
for a in resp.json():
if a.get("id") == approval_id:
status = a.get("status")
if status == "approved":
logger.info("Approval granted (poll): %s", approval_id)
return {
"approved": True,
"approval_id": approval_id,
"decided_by": a.get("decided_by"),
}
elif status == "denied":
logger.info("Approval denied (poll): %s", approval_id)
return {
"approved": False,
"approval_id": approval_id,
"decided_by": a.get("decided_by"),
"message": "Denied by human",
}
except Exception:
pass # transient error — keep retrying
raise asyncio.TimeoutError()
# ---------------------------------------------------------------------------
# Public tool
# ---------------------------------------------------------------------------
@tool
async def request_approval(
action: str,
reason: str,
) -> dict:
"""Request human approval before proceeding with a sensitive action.
Use this when you're about to do something destructive, expensive,
or outside your normal authority. The request is sent to the canvas
where a human can approve or deny it.
Args:
action: Short description of what you want to do
reason: Why this action is necessary
"""
# One trace_id links every audit event for this approval lifecycle.
trace_id = str(uuid.uuid4())
# --- RBAC check -----------------------------------------------------------
roles, custom_perms = get_workspace_roles()
if not check_permission("approve", roles, custom_perms):
log_event(
event_type="rbac",
action="rbac.deny",
resource=action,
outcome="denied",
trace_id=trace_id,
attempted_action="approve",
roles=roles,
)
return {
"approved": False,
"error": (
"RBAC: this workspace does not have the 'approve' permission. "
f"Current roles: {roles}"
),
}
# Step 1: Create the approval request
creation = await _create_approval_request(action, reason)
if "error" in creation:
log_event(
event_type="approval",
action="approve",
resource=action,
outcome="failure",
trace_id=trace_id,
reason="submit_failed",
error=creation["error"],
)
return {"approved": False, "error": creation["error"]}
approval_id = creation["approval_id"]
log_event(
event_type="approval",
action="approve",
resource=action,
outcome="requested",
trace_id=trace_id,
approval_id=approval_id,
reason_text=reason,
)
timeout = float(os.environ.get("APPROVAL_TIMEOUT", str(APPROVAL_TIMEOUT)))
# Step 2: Wait for decision — WebSocket preferred, polling as fallback
use_ws = APPROVAL_USE_WEBSOCKET and websockets is not None
try:
if use_ws:
try:
result = await asyncio.wait_for(
_wait_websocket(approval_id, timeout),
timeout=timeout,
)
except Exception as ws_err:
# WebSocket failed (connection error, etc.) — fall through to polling
logger.warning(
"WebSocket approval wait failed (%s), falling back to polling",
ws_err,
)
result = await asyncio.wait_for(
_wait_polling(approval_id, timeout),
timeout=timeout + APPROVAL_POLL_INTERVAL,
)
else:
# Polling path (primary when WS disabled)
result = await asyncio.wait_for(
_wait_polling(approval_id, timeout),
timeout=timeout + APPROVAL_POLL_INTERVAL, # slight grace period
)
# Log the human decision
decided_by = result.get("decided_by")
outcome = "granted" if result.get("approved") else "denied"
log_event(
event_type="approval",
action="approve",
resource=action,
outcome=outcome,
# Record the human identity as actor when available
actor=decided_by or WORKSPACE_ID,
trace_id=trace_id,
approval_id=approval_id,
decided_by=decided_by,
)
return result
except asyncio.TimeoutError:
logger.warning("Approval timed out after %.0fs: %s", timeout, approval_id)
log_event(
event_type="approval",
action="approve",
resource=action,
outcome="timeout",
trace_id=trace_id,
approval_id=approval_id,
timeout_seconds=timeout,
)
return {
"approved": False,
"approval_id": approval_id,
"error": f"Timed out after {timeout}s waiting for human decision",
}