molecule-core/workspace-template/skill_loader/loader.py
Dev Lead Agent bea0e96a86 fix(security): Cycle 5 — auth middleware, injection hardening, skill sandbox
Fix A — platform/internal/middleware/wsauth_middleware.go (NEW):
  WorkspaceAuth() gin middleware enforces per-workspace bearer-token auth on
  ALL /workspaces/:id/* sub-routes. Same lazy-bootstrap contract as
  secrets.Values: workspaces with no live token are grandfathered through.
  Blocks C2, C3, C4, C5, C7, C8, C9, C12, C13 simultaneously.

Fix A — platform/internal/router/router.go:
  Reorganised route registration: bare CRUD (/workspaces, /workspaces/:id)
  and /a2a remain on root router; all other /workspaces/:id/* sub-routes
  moved into wsAuth = r.Group("/workspaces/:id", middleware.WorkspaceAuth(db.DB)).
  CORS AllowHeaders updated to include Authorization so browser/agent callers
  can send the bearer token cross-origin.

Fix B — workspace-template/heartbeat.py:
  _check_delegations(): validate source_id == self.workspace_id before
  accepting a delegation result. Attacker-crafted records with a foreign
  source_id are skipped without being processed, and a WARNING is logged
  (possible injection attempt).
  trigger_msg no longer embeds raw response_preview text; references
  delegation_id + status only — removes the prompt-injection vector.

Fix C — workspace-template/skill_loader/loader.py:
  load_skill_tools(): before exec_module(), verify script is within
  scripts_dir (path traversal guard) and temporarily scrub sensitive env
  vars (CLAUDE_CODE_OAUTH_TOKEN, ANTHROPIC_API_KEY, OPENAI_API_KEY,
  WORKSPACE_AUTH_TOKEN, GITHUB_TOKEN, GH_TOKEN) from os.environ; restore
  in finally block. Defence-in-depth even if /plugins auth gate is bypassed.

Fix D — platform/internal/handlers/socket.go:
  HandleConnect(): agent connections (X-Workspace-ID present) validated via
  wsauth.HasAnyLiveToken + wsauth.ValidateToken before WebSocket upgrade.
  Canvas clients (no X-Workspace-ID) remain unauthenticated.

Fix D — workspace-template/events.py:
  PlatformEventSubscriber._connect(): include platform_auth bearer token in
  WebSocket upgrade headers alongside X-Workspace-ID.

Fix E — workspace-template/executor_helpers.py:
  recall_memories() and commit_memory() now pass platform_auth bearer token
  in Authorization header so WorkspaceAuth middleware allows access.

Fix F — workspace-template/a2a_client.py:
  send_a2a_message(): timeout=None → httpx.Timeout(connect=30, read=300,
  write=30, pool=30). Resolves H2 flagged across 5 consecutive audits.

Tests: 149/149 Python tests pass (test_heartbeat + test_events updated to
assert new source_id validation behaviour and allow Authorization header).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 04:44:42 +00:00

187 lines
6.2 KiB
Python

"""Load skill packages from the workspace config directory."""
import importlib.util
import logging
import os
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import yaml
logger = logging.getLogger(__name__)
# Optional dependency: try to import the skill security scanner. The flag
# below records whether the import succeeded; when it is False the names
# SkillSecurityError and scan_skill_dependencies are NOT defined, so every
# use of them must be guarded by _SECURITY_SCAN_AVAILABLE.
try:
    from builtin_tools.security_scan import SkillSecurityError, scan_skill_dependencies
    _SECURITY_SCAN_AVAILABLE = True
except ImportError:  # lightweight test environments without tools/ on sys.path
    _SECURITY_SCAN_AVAILABLE = False
@dataclass
class SkillMetadata:
    """Descriptive metadata for a single skill.

    The three string fields are required; ``tags`` and ``examples`` are
    optional and default to a fresh empty list per instance.
    """

    # Identifier of the skill.
    id: str
    # Display name of the skill.
    name: str
    # Free-text description of the skill.
    description: str
    # Optional labels attached to the skill.
    tags: list[str] = field(default_factory=list)
    # Optional usage examples for the skill.
    examples: list[str] = field(default_factory=list)
@dataclass
class LoadedSkill:
    """A skill after loading: its metadata, instruction text and tools.

    ``tools`` is optional and defaults to a fresh empty list per instance.
    """

    # Metadata record describing the skill.
    metadata: SkillMetadata
    # Instruction text associated with the skill.
    instructions: str
    # Tool objects belonging to the skill (element type is not constrained).
    tools: list[Any] = field(default_factory=list)
def parse_skill_frontmatter(skill_md_path: Path) -> tuple[dict, str]:
    """Parse YAML frontmatter from a SKILL.md file.

    Runtime-side: tolerant of malformed frontmatter (returns ``({}, body)``
    so the skill loads with empty metadata rather than crashing the
    workspace at startup). The SDK's :func:`molecule_plugin.parse_skill_md`
    is the authoring-time strict validator that surfaces the same errors.
    Keep behaviour aligned: if you change acceptance rules here, mirror
    them in the SDK's parser.

    Args:
        skill_md_path: Path to the SKILL.md file to read.

    Returns:
        A ``(frontmatter, body)`` tuple. ``frontmatter`` is the parsed YAML
        mapping, or ``{}`` when there is no frontmatter, it is malformed, or
        it is not a mapping. ``body`` is the text after the closing ``---``
        (stripped); when the file has no complete frontmatter block the
        whole file content is returned unmodified.

    Raises:
        OSError: If the file cannot be read.
    """
    # Decode explicitly as UTF-8 instead of the locale's default encoding, so
    # the result does not depend on the host's locale. Undecodable bytes are
    # replaced rather than raised on, in keeping with the tolerant contract
    # described above.
    content = skill_md_path.read_text(encoding="utf-8", errors="replace")

    # No opening delimiter: the whole file is the body.
    if not content.startswith("---"):
        return {}, content

    # Split into: text before the first "---" (empty), the frontmatter, and
    # the remainder. Fewer than three parts means no closing delimiter.
    parts = content.split("---", 2)
    if len(parts) < 3:
        return {}, content

    try:
        frontmatter = yaml.safe_load(parts[1]) or {}
    except yaml.YAMLError:
        logger.warning("SKILL.md at %s has malformed frontmatter; loading with empty metadata", skill_md_path)
        frontmatter = {}

    # Valid YAML that is not a mapping (e.g. a list or scalar) is unusable
    # as metadata; treat it the same as missing frontmatter.
    if not isinstance(frontmatter, dict):
        logger.warning("SKILL.md at %s frontmatter is not a mapping; ignoring", skill_md_path)
        frontmatter = {}

    body = parts[2].strip()
    return frontmatter, body
def load_skill_tools(scripts_dir: Path) -> list[Any]:
    """Dynamically load tool functions from a skill's scripts/ directory.

    Follows the agentskills.io spec layout: each skill's executable code
    lives under ``scripts/``. Every ``*.py`` file in the directory whose name
    does not start with ``_`` is executed as a module named
    ``skill_tool_<stem>``, and every module attribute that is a
    ``BaseTool`` instance is collected.

    NOTE: the sensitive environment variables are only hidden while the
    script's top-level code runs; they are restored immediately afterwards,
    so code that runs later (e.g. when a collected tool is invoked) sees the
    full environment again.

    Args:
        scripts_dir: The skill's ``scripts/`` directory.

    Returns:
        The collected tool objects, in sorted-filename order; an empty list
        if the directory doesn't exist.

    Raises:
        Exception: Whatever a script raises while being executed propagates
            to the caller. The failed module is removed from ``sys.modules``
            first, and the scrubbed environment variables are restored.
    """
    tools: list[Any] = []
    if not scripts_dir.exists():
        return tools

    # Import langchain only when we actually have scripts to process.
    # Keeps test environments (and empty skills) from needing langchain.
    from langchain_core.tools import BaseTool

    # Sensitive env vars that must not be readable by skill scripts.
    # Fix C (Cycle 5): scrub before exec_module() so a malicious skill cannot
    # exfiltrate credentials even if it somehow bypasses the POST /plugins
    # auth gate (defence in depth).
    _SCRUB_KEYS = (
        "CLAUDE_CODE_OAUTH_TOKEN",
        "ANTHROPIC_API_KEY",
        "OPENAI_API_KEY",
        "WORKSPACE_AUTH_TOKEN",
        "GITHUB_TOKEN",
        "GH_TOKEN",
    )

    # Resolve the trusted root once, before any skill code has run, instead
    # of re-resolving it on every iteration.
    scripts_root = scripts_dir.resolve()

    for py_file in sorted(scripts_dir.glob("*.py")):
        if py_file.name.startswith("_"):
            continue

        # Verify the script is actually inside the expected scripts directory
        # (path traversal guard — glob shouldn't produce outside paths, but
        # belt-and-suspenders for symlink attacks).
        try:
            py_file.resolve().relative_to(scripts_root)
        except ValueError:
            logger.warning("skill_loader: rejecting script outside scripts_dir: %s", py_file)
            continue

        module_name = f"skill_tool_{py_file.stem}"
        spec = importlib.util.spec_from_file_location(module_name, py_file)
        if spec is None or spec.loader is None:
            continue

        module = importlib.util.module_from_spec(spec)
        sys.modules[module_name] = module

        # Temporarily remove sensitive env vars before running skill code.
        saved_env = {k: os.environ.pop(k) for k in _SCRUB_KEYS if k in os.environ}
        try:
            spec.loader.exec_module(module)
        except BaseException:
            # Do not leave a half-initialised module registered: mirror the
            # import system, which drops a module whose loading failed.
            sys.modules.pop(module_name, None)
            raise
        finally:
            # Always restore so the rest of the agent process retains them.
            os.environ.update(saved_env)

        # Look for functions decorated with @tool (BaseTool instances)
        for attr_name in dir(module):
            attr = getattr(module, attr_name)
            if isinstance(attr, BaseTool):
                tools.append(attr)

    return tools
def _frontmatter_list(value: Any) -> list[Any]:
    """Coerce a frontmatter value to a list: None -> [], scalar -> [scalar]."""
    if value is None:
        return []
    if isinstance(value, (list, tuple)):
        return list(value)
    return [value]


def load_skills(config_path: str, skill_names: list[str]) -> list[LoadedSkill]:
    """Load all skills specified in the config.

    For each name, looks for ``<config_path>/skills/<name>/SKILL.md``. Skills
    without a SKILL.md, or blocked by the security scan, are skipped with a
    warning; the others are parsed and their ``scripts/`` tools loaded.

    Args:
        config_path: Directory containing the ``skills/`` folder; also passed
            to ``load_config`` to look up the security scan mode.
        skill_names: Names of the skill sub-directories to load.

    Returns:
        One ``LoadedSkill`` per successfully loaded skill, in the order given.
    """
    skills_dir = Path(config_path) / "skills"
    loaded = []

    # Resolve security scan mode once before the loop
    scan_mode = "warn"
    if _SECURITY_SCAN_AVAILABLE:
        try:
            from config import load_config
            _cfg = load_config(config_path)
            scan_mode = _cfg.security_scan.mode
        except Exception:
            # use default "warn" — never block on config error, but leave a
            # trace so a misconfiguration is diagnosable.
            logger.debug(
                "Could not resolve security scan mode from config; using %r",
                scan_mode,
                exc_info=True,
            )

    for skill_name in skill_names:
        skill_path = skills_dir / skill_name
        skill_md = skill_path / "SKILL.md"

        if not skill_md.exists():
            logger.warning("SKILL.md not found for %s, skipping", skill_name)
            continue

        # --- Security scan before loading any code from the skill ------------
        if _SECURITY_SCAN_AVAILABLE and scan_mode != "off":
            try:
                scan_skill_dependencies(skill_name, skill_path, scan_mode)
            except SkillSecurityError as exc:
                logger.warning("Skipping skill '%s': blocked by security scan — %s", skill_name, exc)
                continue

        frontmatter, instructions = parse_skill_frontmatter(skill_md)

        # A key that is present but empty in YAML (e.g. ``tags:``) parses to
        # None, which dict.get()'s default does not cover; fall back
        # explicitly so the metadata fields keep their declared types.
        metadata = SkillMetadata(
            id=skill_name,
            name=frontmatter.get("name") or skill_name,
            description=frontmatter.get("description") or "",
            tags=_frontmatter_list(frontmatter.get("tags")),
            examples=_frontmatter_list(frontmatter.get("examples")),
        )

        # Executables live under scripts/ per the agentskills.io spec.
        tools = load_skill_tools(skill_path / "scripts")

        loaded.append(LoadedSkill(
            metadata=metadata,
            instructions=instructions,
            tools=tools,
        ))

    return loaded