Fix A — platform/internal/middleware/wsauth_middleware.go (NEW):
WorkspaceAuth() gin middleware enforces per-workspace bearer-token auth on
ALL /workspaces/:id/* sub-routes. Same lazy-bootstrap contract as
secrets.Values: workspaces with no live token are grandfathered through.
Blocks C2, C3, C4, C5, C7, C8, C9, C12, C13 simultaneously.
Fix A — platform/internal/router/router.go:
Reorganised route registration: bare CRUD (/workspaces, /workspaces/:id)
and /a2a remain on root router; all other /workspaces/:id/* sub-routes
moved into wsAuth = r.Group("/workspaces/:id", middleware.WorkspaceAuth(db.DB)).
CORS AllowHeaders updated to include Authorization so browser/agent callers
can send the bearer token cross-origin.
Fix B — workspace-template/heartbeat.py:
_check_delegations(): validate source_id == self.workspace_id before
accepting a delegation result. Attacker-crafted records with a foreign
source_id are skipped and logged at WARNING level (possible injection attempt).
trigger_msg no longer embeds raw response_preview text; references
delegation_id + status only — removes the prompt-injection vector.
Fix C — workspace-template/skill_loader/loader.py:
load_skill_tools(): before exec_module(), verify script is within
scripts_dir (path traversal guard) and temporarily scrub sensitive env
vars (CLAUDE_CODE_OAUTH_TOKEN, ANTHROPIC_API_KEY, OPENAI_API_KEY,
WORKSPACE_AUTH_TOKEN, GITHUB_TOKEN, GH_TOKEN) from os.environ; restore
in finally block. Defence-in-depth even if /plugins auth gate is bypassed.
Fix D — platform/internal/handlers/socket.go:
HandleConnect(): agent connections (X-Workspace-ID present) validated via
wsauth.HasAnyLiveToken + wsauth.ValidateToken before WebSocket upgrade.
Canvas clients (no X-Workspace-ID) remain unauthenticated.
Fix D — workspace-template/events.py:
PlatformEventSubscriber._connect(): include platform_auth bearer token in
WebSocket upgrade headers alongside X-Workspace-ID.
Fix E — workspace-template/executor_helpers.py:
recall_memories() and commit_memory() now pass platform_auth bearer token
in Authorization header so WorkspaceAuth middleware allows access.
Fix F — workspace-template/a2a_client.py:
send_a2a_message(): timeout=None → httpx.Timeout(connect=30, read=300,
write=30, pool=30). Resolves H2 flagged across 5 consecutive audits.
Tests: 149/149 Python tests pass (test_heartbeat + test_events updated to
assert new source_id validation behaviour and allow Authorization header).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
187 lines
6.2 KiB
Python
187 lines
6.2 KiB
Python
"""Load skill packages from the workspace config directory."""
|
|
|
|
import importlib.util
|
|
import logging
|
|
import os
|
|
import sys
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import yaml
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
try:
|
|
from builtin_tools.security_scan import SkillSecurityError, scan_skill_dependencies
|
|
_SECURITY_SCAN_AVAILABLE = True
|
|
except ImportError: # lightweight test environments without tools/ on sys.path
|
|
_SECURITY_SCAN_AVAILABLE = False
|
|
|
|
|
|
@dataclass
class SkillMetadata:
    """Descriptive metadata for a single skill.

    ``load_skills`` populates ``id`` from the skill's directory name and the
    remaining fields from the SKILL.md frontmatter.
    """

    # Identifier of the skill (the skill directory name when built by load_skills).
    id: str
    # Display name; load_skills falls back to the id when frontmatter has none.
    name: str
    # Free-text description taken from the frontmatter.
    description: str
    # Optional tag strings; each instance gets its own fresh list.
    tags: list[str] = field(default_factory=list)
    # Optional example strings; each instance gets its own fresh list.
    examples: list[str] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
class LoadedSkill:
    """A skill after loading: its metadata, instruction text and tools."""

    # Metadata describing the skill.
    metadata: SkillMetadata
    # Instruction text (the SKILL.md body when built by load_skills).
    instructions: str
    # Tool objects collected from the skill's scripts; a fresh list per instance.
    tools: list[Any] = field(default_factory=list)
|
|
|
|
|
|
def parse_skill_frontmatter(skill_md_path: Path) -> tuple[dict, str]:
    """Parse YAML frontmatter from a SKILL.md file.

    Runtime-side: tolerant of malformed frontmatter (returns ``({}, body)``
    so the skill loads with empty metadata rather than crashing the
    workspace at startup). The SDK's :func:`molecule_plugin.parse_skill_md`
    is the authoring-time strict validator that surfaces the same errors.
    Keep behaviour aligned: if you change acceptance rules here, mirror
    them in the SDK's parser.

    Args:
        skill_md_path: Path of the SKILL.md file. It is decoded as UTF-8
            regardless of the host locale.

    Returns:
        A ``(frontmatter, body)`` tuple. When the file does not start with
        ``---`` or has no closing ``---`` delimiter, ``frontmatter`` is
        ``{}`` and ``body`` is the whole file content, unstripped. Otherwise
        ``body`` is the text after the closing delimiter with surrounding
        whitespace stripped, and ``frontmatter`` is the parsed mapping (or
        ``{}`` when the YAML is malformed or is not a mapping).
    """
    # Pin the encoding: the locale default is platform-dependent and would
    # mis-decode (or fail on) UTF-8 skill files on non-UTF-8 hosts.
    content = skill_md_path.read_text(encoding="utf-8")

    if not content.startswith("---"):
        return {}, content

    # maxsplit=2 -> ["", <frontmatter>, <body>]; later "---" stay in the body.
    parts = content.split("---", 2)
    if len(parts) < 3:
        return {}, content

    try:
        frontmatter = yaml.safe_load(parts[1]) or {}
    except yaml.YAMLError:
        logger.warning("SKILL.md at %s has malformed frontmatter; loading with empty metadata", skill_md_path)
        frontmatter = {}
    if not isinstance(frontmatter, dict):
        # Valid YAML that is a scalar or list cannot supply named fields.
        logger.warning("SKILL.md at %s frontmatter is not a mapping; ignoring", skill_md_path)
        frontmatter = {}

    body = parts[2].strip()
    return frontmatter, body
|
|
|
|
|
|
def load_skill_tools(scripts_dir: Path) -> list[Any]:
    """Dynamically load tool functions from a skill's scripts/ directory.

    Follows the agentskills.io spec layout: each skill's executable code
    lives under ``scripts/``. Every ``*.py`` file whose name does not start
    with ``_`` is executed as a module named ``skill_tool_<stem>``, and each
    module attribute that is a langchain ``BaseTool`` instance is collected.

    Args:
        scripts_dir: The skill's ``scripts`` directory.

    Returns:
        The collected tool objects, in sorted-filename order. Empty when the
        directory does not exist or contains no loadable scripts; in that
        case langchain is never imported.

    Raises:
        Exception: Whatever a script raises while being executed propagates
            to the caller. The failed module is removed from ``sys.modules``
            first, and the scrubbed environment variables are restored.
    """
    tools: list[Any] = []
    if not scripts_dir.exists():
        return tools

    # Underscore-prefixed files are skipped; they are never executed here.
    script_files = [
        path
        for path in sorted(scripts_dir.glob("*.py"))
        if not path.name.startswith("_")
    ]
    if not script_files:
        return tools

    # Import langchain only when we actually have scripts to process.
    # Keeps test environments (and empty skills) from needing langchain.
    from langchain_core.tools import BaseTool

    # Sensitive env vars that must not be readable by skill scripts.
    # Fix C (Cycle 5): scrub before exec_module() so a malicious skill cannot
    # exfiltrate credentials even if it somehow bypasses the POST /plugins
    # auth gate (defence in depth).
    # NOTE(review): the scrub only covers module execution; tools invoked
    # later presumably see the restored environment — confirm this is intended.
    _SCRUB_KEYS = (
        "CLAUDE_CODE_OAUTH_TOKEN",
        "ANTHROPIC_API_KEY",
        "OPENAI_API_KEY",
        "WORKSPACE_AUTH_TOKEN",
        "GITHUB_TOKEN",
        "GH_TOKEN",
    )

    # Loop-invariant: resolve the allowed root once.
    scripts_root = scripts_dir.resolve()

    for py_file in script_files:
        # Verify the script is actually inside the expected scripts directory
        # (path traversal guard — glob shouldn't produce outside paths, but
        # belt-and-suspenders for symlink attacks).
        try:
            py_file.resolve().relative_to(scripts_root)
        except ValueError:
            logger.warning("skill_loader: rejecting script outside scripts_dir: %s", py_file)
            continue

        module_name = f"skill_tool_{py_file.stem}"
        spec = importlib.util.spec_from_file_location(module_name, py_file)
        if spec is None or spec.loader is None:
            continue

        module = importlib.util.module_from_spec(spec)
        # Remember any earlier module of the same name so a failed load can
        # be rolled back instead of leaving a half-initialised module behind.
        previous_module = sys.modules.get(module_name)
        sys.modules[module_name] = module

        saved_env: dict[str, str] = {}
        try:
            # Temporarily remove sensitive env vars before running skill code.
            # Done inside the try so a partial scrub is still restored.
            for key in _SCRUB_KEYS:
                if key in os.environ:
                    saved_env[key] = os.environ.pop(key)
            spec.loader.exec_module(module)
        except BaseException:
            # Mirror the import system: a module whose execution failed must
            # not stay registered in sys.modules.
            if previous_module is None:
                sys.modules.pop(module_name, None)
            else:
                sys.modules[module_name] = previous_module
            raise
        finally:
            # Always restore so the rest of the agent process retains them.
            os.environ.update(saved_env)

        # Look for functions decorated with @tool (BaseTool instances)
        for attr_name in dir(module):
            attr = getattr(module, attr_name)
            if isinstance(attr, BaseTool):
                tools.append(attr)

    return tools
|
|
|
|
|
|
def load_skills(config_path: str, skill_names: list[str]) -> list[LoadedSkill]:
    """Load all skills specified in the config.

    Each skill lives in ``<config_path>/skills/<skill_name>/`` and must
    contain a ``SKILL.md``; its tools are loaded from the ``scripts/``
    subdirectory.

    Args:
        config_path: Workspace config directory containing ``skills/``.
        skill_names: Names of the skill directories to load, in order.

    Returns:
        One ``LoadedSkill`` per skill that was found and not blocked by the
        security scan. Skills without a ``SKILL.md`` or rejected by the scan
        are skipped with a warning.
    """
    skills_dir = Path(config_path) / "skills"
    loaded: list[LoadedSkill] = []

    # Resolve security scan mode once before the loop
    scan_mode = "warn"
    if _SECURITY_SCAN_AVAILABLE:
        try:
            from config import load_config

            scan_mode = load_config(config_path).security_scan.mode
        except Exception:
            # Best-effort: never block on a config error, but make the
            # fallback visible because it affects how scans are enforced.
            logger.warning(
                "Could not resolve security scan mode from %s; defaulting to 'warn'",
                config_path,
                exc_info=True,
            )

    for skill_name in skill_names:
        skill_path = skills_dir / skill_name
        skill_md = skill_path / "SKILL.md"

        if not skill_md.exists():
            logger.warning("SKILL.md not found for %s, skipping", skill_name)
            continue

        # --- Security scan before loading any code from the skill ------------
        if _SECURITY_SCAN_AVAILABLE and scan_mode != "off":
            try:
                scan_skill_dependencies(skill_name, skill_path, scan_mode)
            except SkillSecurityError as exc:
                logger.warning("Skipping skill '%s': blocked by security scan — %s", skill_name, exc)
                continue

        frontmatter, instructions = parse_skill_frontmatter(skill_md)

        # ``or`` fallbacks: a key present with a null/empty value (e.g. a bare
        # ``tags:``) yields None from YAML, which would otherwise leak into
        # fields typed as str / list[str].
        metadata = SkillMetadata(
            id=skill_name,
            name=frontmatter.get("name") or skill_name,
            description=frontmatter.get("description") or "",
            tags=frontmatter.get("tags") or [],
            examples=frontmatter.get("examples") or [],
        )

        # Executables live under scripts/ per the agentskills.io spec.
        tools = load_skill_tools(skill_path / "scripts")

        loaded.append(LoadedSkill(
            metadata=metadata,
            instructions=instructions,
            tools=tools,
        ))

    return loaded
|