* feat(security): add plugin content integrity verification (SHA256) SDK-side follow-up to molecule-core PR #1019 (pinned-ref supply-chain fix). Changes: - verify_plugin_sha256(plugin_dir, expected_sha) — content-addressed manifest hash over sorted (relpath, SHA256(content)) pairs; plugin.yaml excluded from its own hash to avoid circular dependency - _walk_files(root) / _sha256_file(path) — internal helpers - install_plugin() calls verify_sha256 after atomic rename; on mismatch deletes plugin dir and raises ValueError before setup.sh runs - PLUGIN_YAML_SCHEMA gains optional sha256 field (64-char lowercase hex) - validate_manifest() validates sha256 format when present Tests (12 new): - sha256_file correctness, walk_files ordering, verify_* (match/mismatch/invalid) - install_plugin sha256 verified: setup.sh runs - install_plugin sha256 mismatch: raises ValueError, setup.sh NOT run - install_plugin no sha256: backward-compat, skips verification - validate_manifest sha256: valid/invalid/non-hex/absent Pre-existing: 4 async tests in test_sdk.py fail without pytest-asyncio (not related to this change). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(tests): add pytest-asyncio markers to async adaptor tests The 4 tests using async def were failing because pytest-asyncio was not installed and pytest.ini set asyncio_mode=auto (which requires it). Add @pytest.mark.asyncio to each async test and add pytest-asyncio as a test optional dependency so CI gets the right extras when installing. Fixes: 4 FAILED tests in test_sdk.py --------- Co-authored-by: Molecule AI SDK-Dev <sdk-dev@agents.moleculesai.app> Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
252 lines
9.4 KiB
Python
252 lines
9.4 KiB
Python
"""Plugin + skill manifest schema and validators.
|
||
|
||
Two layers:

1. **Plugin-level** (`plugin.yaml`) — Molecule AI's superset: name, version,
   description, declared `runtimes:`, skill list, rule list. The spec has
   no concept of bundling; this is our own.
2. **Skill-level** (`skills/<skill>/SKILL.md`) — follows the
   `agentskills.io` open standard (name, description, optional license,
   compatibility, metadata, allowed-tools). Validated against the spec
   so our skills are installable in Claude Code, Cursor, Codex, and
   every other skills-compatible agent product.

A plugin that validates locally will also load cleanly in the Molecule AI
platform AND be installable as-is into any agentskills-compatible tool.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import re
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
import yaml
|
||
|
||
# JSON-Schema-style description of `plugin.yaml`. validate_manifest() checks
# manifests by hand instead of running this through a schema library, so any
# constraint recorded here has to be kept in step with that function manually.
PLUGIN_YAML_SCHEMA: dict[str, Any] = {
    "type": "object",
    "required": ["name"],
    "properties": {
        "name": {"type": "string"},
        "version": {"type": "string"},
        "description": {"type": "string"},
        "author": {"type": "string"},
        "tags": {"type": "array", "items": {"type": "string"}},
        "skills": {"type": "array", "items": {"type": "string"}},
        "rules": {"type": "array", "items": {"type": "string"}},
        "prompt_fragments": {"type": "array", "items": {"type": "string"}},
        "runtimes": {
            "type": "array",
            "items": {"type": "string"},
            "description": "Declared supported runtimes (e.g. claude_code, deepagents).",
        },
        "sha256": {
            "type": "string",
            # Machine-readable form of the format rule that validate_manifest()
            # enforces (64 lowercase hex characters), so schema-driven tooling
            # rejects the same values the hand-written validator does.
            "pattern": "^[0-9a-f]{64}$",
            "description": (
                "Optional content integrity hash (SHA256) of the plugin directory "
                "as a content-addressed manifest. If present, install_plugin() verifies "
                "the unpacked tarball matches before running setup.sh. "
                "Format: 64 lowercase hex characters. "
                "Generate with: python -m molecule_agent verify-sha256 <plugin-dir>"
            ),
        },
    },
}
|
||
|
||
|
||
def validate_manifest(path: str | Path) -> list[str]:
    """Return a list of validation error messages. Empty list = valid.

    Deliberately simple — no jsonschema dependency so SDK consumers don't
    pick up an extra transitive dep just to lint their plugin.

    Args:
        path: Location of the ``plugin.yaml`` file to check.

    Returns:
        Human-readable problems found in the manifest. A missing file, a file
        that cannot be read or decoded as UTF-8, a YAML syntax error, or a
        document whose root is not a mapping each short-circuit with a single
        message. Otherwise every field-level problem is collected.
    """
    path = Path(path)
    if not path.is_file():
        return [f"manifest not found: {path}"]

    # Read in a separate step, with an explicit encoding, so that I/O and
    # decoding failures come back as validation errors instead of escaping
    # as exceptions from a function documented to return a list.
    try:
        text = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        return [f"cannot read manifest: {exc}"]

    try:
        raw = yaml.safe_load(text)
    except yaml.YAMLError as exc:
        return [f"yaml parse error: {exc}"]

    if not isinstance(raw, dict):
        return ["manifest root must be a mapping"]

    errors: list[str] = []

    # A missing key yields None from .get(), which fails the isinstance test,
    # so absence and wrong type are both covered by the first clause.
    if not isinstance(raw.get("name"), str) or not raw["name"].strip():
        errors.append("`name` is required and must be a non-empty string")

    for field_name in ("tags", "skills", "rules", "prompt_fragments", "runtimes"):
        if field_name in raw and not isinstance(raw[field_name], list):
            errors.append(f"`{field_name}` must be a list")

    if "runtimes" in raw and isinstance(raw["runtimes"], list):
        known = {"claude_code", "deepagents", "langgraph", "crewai", "autogen", "openclaw"}
        for r in raw["runtimes"]:
            if not isinstance(r, str):
                errors.append(f"`runtimes` entry must be string, got {type(r).__name__}")
            # Hyphens are normalised before the lookup, so "claude-code" is
            # accepted as an alias of "claude_code".
            elif r.replace("-", "_") not in known:
                errors.append(
                    f"unknown runtime '{r}' — supported: {sorted(known)} "
                    f"(use underscore form, e.g. 'claude_code')"
                )

    # sha256 — must be a 64-char lowercase hex string if present
    sha256_val = raw.get("sha256")
    if sha256_val is not None:
        if not isinstance(sha256_val, str):
            errors.append("`sha256` must be a string (64 lowercase hex characters)")
        elif len(sha256_val) != 64:
            # Length is checked first so the message can report the actual size.
            errors.append(
                f"`sha256` must be exactly 64 hex characters, got {len(sha256_val)}"
            )
        elif not re.fullmatch(r"[0-9a-f]{64}", sha256_val):
            errors.append(
                "`sha256` must contain only lowercase hex characters (0–9, a–f)"
            )

    return errors
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# agentskills.io spec — SKILL.md validation
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Spec limits — public so tooling/tests/docs can import them rather than
# duplicate magic numbers. Source: https://agentskills.io/specification
# Skill name shape: one or more lowercase-alphanumeric segments joined by
# single hyphens (so no leading, trailing, or doubled hyphen).
SKILL_NAME_RE = re.compile(r"^[a-z0-9]+(-[a-z0-9]+)*$")
# Upper bound on the length of the `name` frontmatter field.
SKILL_NAME_MAX = 64
# Upper bound on the length of the `description` frontmatter field.
SKILL_DESC_MAX = 1024
# Upper bound on the length of the optional `compatibility` frontmatter field.
SKILL_COMPAT_MAX = 500
|
||
|
||
|
||
def parse_skill_md(path: str | Path) -> tuple[dict[str, Any], str, list[str]]:
    """Parse a SKILL.md into (frontmatter, body, errors).

    Args:
        path: Location of the ``SKILL.md`` file.

    Returns:
        ``(frontmatter, body, [])`` on success, where ``body`` is the text
        after the closing ``---`` with surrounding whitespace stripped.
        ``({}, "", [error])`` if the file is missing or can't be read or
        decoded as UTF-8. ``({}, <text>, [error])`` if the frontmatter is
        absent, unterminated, not valid YAML, or not a mapping.

    Missing files, read/decode failures and YAML errors are reported through
    the error list rather than raised.
    """
    path = Path(path)
    if not path.is_file():
        return {}, "", [f"SKILL.md not found: {path}"]

    # Explicit encoding plus a guarded read: an unreadable or non-UTF-8 file
    # becomes an error entry instead of an exception leaking to the caller.
    try:
        text = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        return {}, "", [f"cannot read SKILL.md: {exc}"]

    if not text.startswith("---"):
        return {}, text, ["SKILL.md must start with YAML frontmatter (---)"]

    # NOTE: this splits on the first two "---" occurrences anywhere in the
    # text, not only on delimiter lines, so a literal "---" inside a
    # frontmatter value ends the frontmatter early.
    parts = text.split("---", 2)
    if len(parts) < 3:
        return {}, text, ["malformed frontmatter — expected opening and closing '---'"]
    _, header, body = parts

    try:
        # An empty frontmatter block loads as None; treat it as an empty mapping.
        fm = yaml.safe_load(header) or {}
    except yaml.YAMLError as exc:
        return {}, body, [f"frontmatter yaml parse error: {exc}"]

    if not isinstance(fm, dict):
        return {}, body, ["frontmatter must be a YAML mapping"]

    return fm, body.strip(), []
|
||
|
||
|
||
def validate_skill(path: str | Path) -> list[str]:
    """Check one skill directory for agentskills.io/specification compliance.

    `path` is the directory that holds `SKILL.md`. The returned list names
    every problem found and is empty when the skill is spec-compliant. When
    `path` is not a directory, or `SKILL.md` fails to parse, only that
    failure is reported and no field-level checks run.
    """
    skill_dir = Path(path)
    if not skill_dir.is_dir():
        return [f"skill path is not a directory: {skill_dir}"]

    frontmatter, _, problems = parse_skill_md(skill_dir / "SKILL.md")
    if problems:
        return problems

    # name — required; length, shape and directory match are checked
    # independently so one bad name can yield several messages.
    name = frontmatter.get("name")
    if isinstance(name, str) and name:
        if len(name) > SKILL_NAME_MAX:
            problems.append(f"`name` length must be ≤{SKILL_NAME_MAX}, got {len(name)}")
        if SKILL_NAME_RE.match(name) is None:
            problems.append(
                f"`name` '{name}' must be lowercase alphanumeric with single hyphens, "
                f"no leading/trailing/consecutive hyphens"
            )
        if name != skill_dir.name:
            problems.append(
                f"`name` '{name}' must match directory name '{skill_dir.name}' "
                f"(agentskills.io spec)"
            )
    elif not name:
        # Absent, None, or any other falsy value counts as "missing".
        problems.append("`name` is required in SKILL.md frontmatter")
    else:
        problems.append(f"`name` must be a string, got {type(name).__name__}")

    # description — required, bounded length
    description = frontmatter.get("description")
    if isinstance(description, str) and description:
        if len(description) > SKILL_DESC_MAX:
            problems.append(
                f"`description` length must be ≤{SKILL_DESC_MAX}, got {len(description)}"
            )
    elif not description:
        problems.append("`description` is required in SKILL.md frontmatter")
    else:
        problems.append(
            f"`description` must be a string, got {type(description).__name__}"
        )

    # compatibility — optional, bounded length
    compatibility = frontmatter.get("compatibility")
    if isinstance(compatibility, str):
        if len(compatibility) > SKILL_COMPAT_MAX:
            problems.append(
                f"`compatibility` length must be ≤{SKILL_COMPAT_MAX}, got {len(compatibility)}"
            )
    elif compatibility is not None:
        problems.append(
            f"`compatibility` must be a string, got {type(compatibility).__name__}"
        )

    # metadata — optional mapping; only the keys are type-checked, values
    # are accepted as-is.
    metadata = frontmatter.get("metadata")
    if isinstance(metadata, dict):
        problems.extend(
            f"`metadata` keys must be strings, got {type(key).__name__}"
            for key in metadata
            if not isinstance(key, str)
        )
    elif metadata is not None:
        problems.append(f"`metadata` must be a mapping, got {type(metadata).__name__}")

    # allowed-tools — optional, space-separated string (experimental in spec)
    allowed_tools = frontmatter.get("allowed-tools")
    if not (allowed_tools is None or isinstance(allowed_tools, str)):
        problems.append(
            f"`allowed-tools` must be a space-separated string, got {type(allowed_tools).__name__}"
        )

    # license — optional, free-form string
    license_value = frontmatter.get("license")
    if not (license_value is None or isinstance(license_value, str)):
        problems.append(f"`license` must be a string, got {type(license_value).__name__}")

    return problems
|
||
|
||
|
||
def validate_plugin(path: str | Path) -> dict[str, list[str]]:
    """Run every validator over a Molecule AI plugin directory.

    Checks `plugin.yaml` and then each sub-directory of `skills/`. The result
    maps a source label (``"plugin.yaml"`` or ``"skills/<name>"``) to that
    source's error messages; sources without errors are omitted, so an empty
    dict means the plugin is fully valid.
    """
    plugin_root = Path(path)
    report: dict[str, list[str]] = {}

    manifest_problems = validate_manifest(plugin_root / "plugin.yaml")
    if manifest_problems:
        report["plugin.yaml"] = manifest_problems

    skills_root = plugin_root / "skills"
    if not skills_root.is_dir():
        # No skills directory: only the manifest result applies.
        return report

    # Plain files under skills/ are ignored; directories are visited in
    # sorted order so the report is deterministic.
    skill_dirs = sorted(child for child in skills_root.iterdir() if child.is_dir())
    for skill_dir in skill_dirs:
        skill_problems = validate_skill(skill_dir)
        if skill_problems:
            report[f"skills/{skill_dir.name}"] = skill_problems

    return report
|