molecule-sdk-python/molecule_plugin/manifest.py
Molecule AI SDK-Dev beca7db42a fix(sdk): resolve KI-005 and KI-007 — secrets scan + _is_hex guard
KI-007 (High): Add isinstance(value, str) guard to _is_hex() so
non-string arguments return False cleanly instead of raising TypeError.
Updated test_is_hex_non_string to assert False instead of expecting
pytest.raises(TypeError).

KI-005 (High): Add _scan_for_secrets() to manifest.py that walks all
string values in plugin.yaml and reports common credential patterns
(sk-, ghp_, AKIA, bearer tokens, long hex strings, password/api_key
assignments). Call it from validate_manifest(). Skips the sha256
field since it's a content-addressed hash, not a secret.

Run: pytest → 210 passed, 1 skipped.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 08:09:44 +00:00

311 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Plugin + skill manifest schema and validators.
Two layers:
1. **Plugin-level** (`plugin.yaml`) — Molecule AI's superset: name, version,
description, declared `runtimes:`, skill list, rule list. The spec has
no concept of bundling; this is our own.
2. **Skill-level** (`skills/<skill>/SKILL.md`) — follows the
`agentskills.io` open standard (name, description, optional license,
compatibility, metadata, allowed-tools). Validated against the spec
so our skills are installable in Claude Code, Cursor, Codex, and
every other skills-compatible agent product.
A plugin that validates locally will also load cleanly in the Molecule AI
platform AND be installable as-is into any agentskills-compatible tool.
"""
from __future__ import annotations
import re
from pathlib import Path
from typing import Any
import yaml
PLUGIN_YAML_SCHEMA: dict[str, Any] = {
"type": "object",
"required": ["name"],
"properties": {
"name": {"type": "string"},
"version": {"type": "string"},
"description": {"type": "string"},
"author": {"type": "string"},
"tags": {"type": "array", "items": {"type": "string"}},
"skills": {"type": "array", "items": {"type": "string"}},
"rules": {"type": "array", "items": {"type": "string"}},
"prompt_fragments": {"type": "array", "items": {"type": "string"}},
"runtimes": {
"type": "array",
"items": {"type": "string"},
"description": "Declared supported runtimes (e.g. claude_code, deepagents).",
},
"sha256": {
"type": "string",
"description": (
"Optional content integrity hash (SHA256) of the plugin directory "
"as a content-addressed manifest. If present, install_plugin() verifies "
"the unpacked tarball matches before running setup.sh. "
"Format: 64 lowercase hex characters. "
"Generate with: python -m molecule_agent verify-sha256 <plugin-dir>"
),
},
},
}
def validate_manifest(path: str | Path) -> list[str]:
"""Return a list of validation error messages. Empty list = valid.
Deliberately simple — no jsonschema dependency so SDK consumers don't
pick up an extra transitive dep just to lint their plugin.
"""
path = Path(path)
if not path.is_file():
return [f"manifest not found: {path}"]
try:
raw = yaml.safe_load(path.read_text())
except yaml.YAMLError as exc:
return [f"yaml parse error: {exc}"]
errors: list[str] = []
if not isinstance(raw, dict):
return ["manifest root must be a mapping"]
if "name" not in raw or not isinstance(raw.get("name"), str) or not raw["name"].strip():
errors.append("`name` is required and must be a non-empty string")
for field_name in ("tags", "skills", "rules", "prompt_fragments", "runtimes"):
if field_name in raw and not isinstance(raw[field_name], list):
errors.append(f"`{field_name}` must be a list")
if "runtimes" in raw and isinstance(raw["runtimes"], list):
known = {"claude_code", "deepagents", "langgraph", "crewai", "autogen", "openclaw"}
for r in raw["runtimes"]:
if not isinstance(r, str):
errors.append(f"`runtimes` entry must be string, got {type(r).__name__}")
elif r.replace("-", "_") not in known:
errors.append(
f"unknown runtime '{r}' — supported: {sorted(known)} "
f"(use underscore form, e.g. 'claude_code')"
)
# sha256 — must be a 64-char lowercase hex string if present
sha256_val = raw.get("sha256")
if sha256_val is not None:
if not isinstance(sha256_val, str):
errors.append("`sha256` must be a string (64 lowercase hex characters)")
elif len(sha256_val) != 64:
errors.append(
f"`sha256` must be exactly 64 hex characters, got {len(sha256_val)}"
)
elif not re.fullmatch(r"[0-9a-f]{64}", sha256_val):
errors.append(
"`sha256` must contain only lowercase hex characters (09, af)"
)
# Secrets scan — no credentials may appear in any string value
for field_name, field_value in raw.items():
_scan_for_secrets(field_name, field_value, errors)
return errors
# ---------------------------------------------------------------------------
# Secrets scanning
# ---------------------------------------------------------------------------
# Patterns matching common credential formats.
# Anchored with word boundaries where possible to avoid false positives
# in legitimate content (e.g. "sk" inside "ask").
_SECRET_PATTERNS: list[tuple[str, re.Pattern[str]]] = [
# AWS / GCP / Azure keys
("AWS key pattern", re.compile(r"AKIA[0-9A-Z]{16}")),
# GitHub fine-grained / classic tokens
("GitHub token", re.compile(r"gh[pousr]_[A-Za-z0-9_]{36,}")),
# GitHub app token
("GitHub app token", re.compile(r"gho_[A-Za-z0-9_]{36}")),
# OpenAI / Anthropic / generic sk- keys
("OpenAI/Anthropic key", re.compile(r"\bsk-[A-Za-z0-9_-]{20,}\b")),
# Bearer tokens in JSON/YAML values
("Bearer token", re.compile(r'"[Bb]earer\s+[A-Za-z0-9_.-]+"')),
# Long hex strings that look like cryptographic keys (32+ bytes)
("long hex secret", re.compile(r"\b[0-9a-fA-F]{64,}\b")),
# Password assignment patterns in YAML values
("password assignment", re.compile(r"(?i)password\s*[=:]\s*['\"]?[A-Za-z0-9_/+=.-]{8,}")),
# API key assignment patterns
("api_key assignment", re.compile(r"(?i)api[_-]?key\s*[=:]\s*['\"]?[A-Za-z0-9_/+=.-]{16,}")),
# Secret / token assignment patterns
("secret assignment", re.compile(r"(?i)secret\s*[=:]\s*['\"]?[A-Za-z0-9_/+=.-]{16,}")),
# Generic bearer string in raw text
("raw bearer", re.compile(r"Bearer [A-Za-z0-9_.-]{20,}")),
]
def _scan_for_secrets(key: str, value: Any, errors: list[str]) -> None:
"""Recursively scan `value` (and all nested values) for secret patterns.
Appends error messages to `errors` when a match is found.
Skips `sha256` field since it's a content-addressed hash, not a secret.
"""
if key in ("sha256",):
return
if isinstance(value, str):
for description, pattern in _SECRET_PATTERNS:
if pattern.search(value):
errors.append(
f"possible secret detected in `{key}`: {description}"
"bundles must not contain credentials; use platform secrets instead"
)
break # report first match only to keep noise minimal
elif isinstance(value, dict):
for k, v in value.items():
_scan_for_secrets(k, v, errors)
elif isinstance(value, list):
for i, item in enumerate(value):
_scan_for_secrets(f"{key}[{i}]", item, errors)
# ---------------------------------------------------------------------------
# agentskills.io spec — SKILL.md validation
# ---------------------------------------------------------------------------
# Spec limits — public so tooling/tests/docs can import them rather than
# duplicate magic numbers. Source: https://agentskills.io/specification
SKILL_NAME_RE = re.compile(r"^[a-z0-9]+(-[a-z0-9]+)*$")
SKILL_NAME_MAX = 64
SKILL_DESC_MAX = 1024
SKILL_COMPAT_MAX = 500
def parse_skill_md(path: str | Path) -> tuple[dict[str, Any], str, list[str]]:
"""Parse a SKILL.md into (frontmatter, body, errors).
Returns ``({}, "", [error])`` if the file can't be read or doesn't have
valid frontmatter. Never raises.
"""
path = Path(path)
if not path.is_file():
return {}, "", [f"SKILL.md not found: {path}"]
text = path.read_text()
if not text.startswith("---"):
return {}, text, ["SKILL.md must start with YAML frontmatter (---)"]
parts = text.split("---", 2)
if len(parts) < 3:
return {}, text, ["malformed frontmatter — expected opening and closing '---'"]
try:
fm = yaml.safe_load(parts[1]) or {}
except yaml.YAMLError as exc:
return {}, parts[2], [f"frontmatter yaml parse error: {exc}"]
if not isinstance(fm, dict):
return {}, parts[2], ["frontmatter must be a YAML mapping"]
return fm, parts[2].strip(), []
def validate_skill(path: str | Path) -> list[str]:
"""Validate a single skill directory against agentskills.io/specification.
`path` should be the skill directory (its parent of `SKILL.md`). Returns
an empty list when the skill is spec-compliant.
"""
path = Path(path)
if not path.is_dir():
return [f"skill path is not a directory: {path}"]
fm, _body, errors = parse_skill_md(path / "SKILL.md")
if errors:
return errors
# name — required
name = fm.get("name")
if not name:
errors.append("`name` is required in SKILL.md frontmatter")
elif not isinstance(name, str):
errors.append(f"`name` must be a string, got {type(name).__name__}")
else:
if len(name) > SKILL_NAME_MAX:
errors.append(f"`name` length must be ≤{SKILL_NAME_MAX}, got {len(name)}")
if not SKILL_NAME_RE.match(name):
errors.append(
f"`name` '{name}' must be lowercase alphanumeric with single hyphens, "
f"no leading/trailing/consecutive hyphens"
)
if name != path.name:
errors.append(
f"`name` '{name}' must match directory name '{path.name}' "
f"(agentskills.io spec)"
)
# description — required
desc = fm.get("description")
if not desc:
errors.append("`description` is required in SKILL.md frontmatter")
elif not isinstance(desc, str):
errors.append(f"`description` must be a string, got {type(desc).__name__}")
elif len(desc) > SKILL_DESC_MAX:
errors.append(f"`description` length must be ≤{SKILL_DESC_MAX}, got {len(desc)}")
# compatibility — optional, ≤500 chars
compat = fm.get("compatibility")
if compat is not None:
if not isinstance(compat, str):
errors.append(f"`compatibility` must be a string, got {type(compat).__name__}")
elif len(compat) > SKILL_COMPAT_MAX:
errors.append(
f"`compatibility` length must be ≤{SKILL_COMPAT_MAX}, got {len(compat)}"
)
# metadata — optional, string→string map
meta = fm.get("metadata")
if meta is not None:
if not isinstance(meta, dict):
errors.append(f"`metadata` must be a mapping, got {type(meta).__name__}")
else:
for k, v in meta.items():
if not isinstance(k, str):
errors.append(f"`metadata` keys must be strings, got {type(k).__name__}")
# values may be stringified — spec says "string-to-string" but is lenient
# allowed-tools — optional, space-separated string (experimental in spec)
allowed = fm.get("allowed-tools")
if allowed is not None and not isinstance(allowed, str):
errors.append(f"`allowed-tools` must be a space-separated string, got {type(allowed).__name__}")
# license — optional, free-form string
lic = fm.get("license")
if lic is not None and not isinstance(lic, str):
errors.append(f"`license` must be a string, got {type(lic).__name__}")
return errors
def validate_plugin(path: str | Path) -> dict[str, list[str]]:
"""Validate an entire Molecule AI plugin: plugin.yaml + all skills.
Returns a dict mapping source (``"plugin.yaml"`` or ``"skills/<name>"``)
to a list of error messages. Empty dict means fully valid.
"""
path = Path(path)
results: dict[str, list[str]] = {}
manifest_errs = validate_manifest(path / "plugin.yaml")
if manifest_errs:
results["plugin.yaml"] = manifest_errs
skills_dir = path / "skills"
if skills_dir.is_dir():
for entry in sorted(skills_dir.iterdir()):
if not entry.is_dir():
continue
skill_errs = validate_skill(entry)
if skill_errs:
results[f"skills/{entry.name}"] = skill_errs
return results