feat: initial Python SDK (extracted from molecule-monorepo/sdk/python)

Workspace, org, channel, memory, delegation client for Molecule AI.
Package renamed to molecule-ai-sdk for PyPI.
Hongming Wang · 2026-04-16 03:15:38 -07:00 · commit fefcc38e11
27 changed files with 4009 additions and 0 deletions

.DS_Store (binary file, not shown)

README.md (new file, 135 lines)
@@ -0,0 +1,135 @@
# molecule_plugin — Python SDK for building Molecule AI plugins
A Molecule AI plugin is a directory that bundles rules, skills, and per-runtime
install adaptors. Any plugin that conforms to this contract is installable
on any Molecule AI workspace whose runtime the plugin supports.
## Quick start
Copy `template/` to a new directory and edit:
```
my-plugin/
├── plugin.yaml # name, version, runtimes, description
├── rules/my-rule.md # optional — appended to CLAUDE.md at install
├── skills/my-skill/
│ ├── SKILL.md # instructions injected into the system prompt
│ └── tools/do_thing.py # optional LangChain @tool functions
└── adapters/
├── claude_code.py # one-liner: `from molecule_plugin import AgentskillsAdaptor as Adaptor`
└── deepagents.py # same
```
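For reference, a minimal `plugin.yaml` sketch — fields per the SDK's `PLUGIN_YAML_SCHEMA` (only `name` is required; the values here are illustrative):
```yaml
name: my-plugin
version: 0.1.0
description: Example plugin that ships one rule and one skill.
runtimes:
  - claude_code
  - deepagents
```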
Validate:
```python
from molecule_plugin import validate_manifest
errors = validate_manifest("my-plugin/plugin.yaml")
assert not errors, errors
```
## CLI
The SDK ships a CLI for validating Molecule AI artifacts before publishing:
```bash
python -m molecule_plugin validate plugin my-plugin/
python -m molecule_plugin validate workspace workspace-configs-templates/claude-code-default/
python -m molecule_plugin validate org org-templates/molecule-dev/
python -m molecule_plugin validate channel channels.yaml
python -m molecule_plugin validate my-plugin/ # kind defaults to 'plugin'
```
Exit code is 0 when valid, 1 when any errors are found — suitable for CI.
Add `-q` / `--quiet` to suppress success lines and emit only errors.
Programmatic equivalents:
```python
from molecule_plugin import (
validate_plugin,
validate_workspace_template,
validate_org_template,
validate_channel_file,
validate_channel_config,
)
```
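For example, a sketch of driving them from a script or test (paths are placeholders; `validate_plugin` returns a mapping of source file to error list, while the directory validators return `ValidationError` objects with `.file` / `.message`):
```python
from pathlib import Path

from molecule_plugin import validate_plugin, validate_workspace_template

# Empty results mean the artifact is valid (mirrors the CLI's exit-code logic).
for source, errs in validate_plugin(Path("my-plugin")).items():
    for err in errs:
        print(f"my-plugin/{source}: {err}")

for err in validate_workspace_template(Path("workspace-configs-templates/claude-code-default")):
    print(f"{err.file}: {err.message}")
```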
## Per-runtime adaptors — when to write a custom one
The default `AgentskillsAdaptor` handles the common shape: rules go into
the runtime's memory file (CLAUDE.md), skill dirs go into `/configs/skills/`.
That covers most plugins.
Write a custom adaptor when you need to:
- **Register runtime tools dynamically** — call `ctx.register_tool(name, fn)`.
- **Register DeepAgents sub-agents** — call `ctx.register_subagent(name, spec)`.
- **Write to a non-standard memory file** — call `ctx.append_to_memory(filename, content)`.
Minimum custom adaptor:
```python
# adapters/deepagents.py
from molecule_plugin import InstallContext, InstallResult
class Adaptor:
def __init__(self, plugin_name: str, runtime: str):
self.plugin_name, self.runtime = plugin_name, runtime
async def install(self, ctx: InstallContext) -> InstallResult:
ctx.register_subagent("my-agent", {"prompt": "...", "tools": [...]})
return InstallResult(plugin_name=self.plugin_name, runtime=self.runtime, source="plugin")
async def uninstall(self, ctx: InstallContext) -> None:
pass
```
## Resolution order (understood by the platform)
For `(plugin_name, runtime)`:
1. **Platform registry** — `workspace-template/plugins_registry/<plugin>/<runtime>.py`
(curated; set by the Molecule AI team for quality-assured plugins).
2. **Plugin-shipped** — `<plugin_root>/adapters/<runtime>.py` (what this SDK helps you build).
3. **Raw-drop fallback** — copies plugin files into `/configs/plugins/<name>/`
and surfaces a warning; no tools are wired.
You generally ship for path #2. If your plugin becomes popular enough to be
promoted to "default," the Molecule AI team PRs a copy of your adaptor into
the platform registry (path #1) so it survives upstream breakage.
## Testing locally
The SDK ships `AgentskillsAdaptor` as a standalone, unit-testable class:
```python
import asyncio
from pathlib import Path
from molecule_plugin import AgentskillsAdaptor, InstallContext
ctx = InstallContext(
configs_dir=Path("/tmp/configs"),
workspace_id="local",
runtime="claude_code",
plugin_root=Path("./my-plugin"),
)
asyncio.run(AgentskillsAdaptor("my-plugin", "claude_code").install(ctx))
# check /tmp/configs/CLAUDE.md, /tmp/configs/skills/
```
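A sketch of extending this into a round-trip check (continues the snippet above; assumes the `my-skill` directory from the template):
```python
# Verify the install landed, then round-trip uninstall.
assert (Path("/tmp/configs/skills") / "my-skill").is_dir()
asyncio.run(AgentskillsAdaptor("my-plugin", "claude_code").uninstall(ctx))
assert not (Path("/tmp/configs/skills") / "my-skill").exists()
```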
## Publishing
A plugin is just a directory. Push it to any Git host. Installation via
`POST /plugins/install {git_url}` is on the roadmap — see the platform's
`PLAN.md` under "Install-from-GitHub-URL flow." Until then, plugins are
bundled into the platform by dropping them into `plugins/` at deploy time.
## Supported runtimes
As of 2026-Q2: `claude_code`, `deepagents`, `langgraph`, `crewai`, `autogen`,
`openclaw`. See the live list with:
```bash
curl $PLATFORM_URL/plugins
```

examples/remote-agent/README.md (new file, 63 lines)
@@ -0,0 +1,63 @@
# Remote agent demo
A ~100-line Python script that registers with a Molecule AI platform from
outside its Docker network, pulls its secrets, and heartbeats — exercising
the Phase 30.1 / 30.2 / 30.4 endpoints end-to-end.
## Prerequisites
* A running Molecule AI platform (`./infra/scripts/setup.sh` + `go run
./cmd/server` from `platform/`)
* `pip install requests` in your Python environment
## Quick start
```bash
# 1. Create the workspace row on the platform. `external` runtime keeps
# the provisioner from trying to start a Docker container:
curl -s -X POST http://localhost:8080/workspaces \
-H 'Content-Type: application/json' \
-d '{"name":"remote-demo","tier":2,"runtime":"external"}'
# → {"id":"<UUID>", ...}
# 2. (Optional) seed a secret so `pull_secrets` has something to return:
curl -s -X POST http://localhost:8080/workspaces/<UUID>/secrets \
-H 'Content-Type: application/json' \
-d '{"key":"REMOTE_DEMO_KEY","value":"hello-from-remote"}'
# 3. Run the demo from any machine that can reach the platform:
WORKSPACE_ID=<UUID> PLATFORM_URL=http://localhost:8080 \
  python3 examples/remote-agent/run.py
```
You should see log lines for each of the three phases, and then
heartbeat lines every 5s. The workspace should appear online on the
canvas. Pause or delete it from the canvas / via API, and the script
exits cleanly.
## What this demonstrates
| Phase | Endpoint | Shown in the demo |
|---|---|---|
| 30.1 | `POST /registry/register` | Token issuance + on-disk caching |
| 30.1 | `POST /registry/heartbeat` | Bearer-authenticated liveness report |
| 30.2 | `GET /workspaces/:id/secrets/values` | Token-gated decrypted-secrets pull |
| 30.4 | `GET /workspaces/:id/state` | Token-gated pause/delete detection |
## What it doesn't do yet
* **No inbound A2A server.** Other agents can't initiate calls back to
this remote agent. Future 30.8b adds an optional HTTP server helper.
* **No sibling discovery.** Future 30.6 adds peer URL caching so this
agent can call siblings directly instead of going through the proxy.
## Troubleshooting
* `401 missing workspace auth token` on the secrets/state calls — your
cached token is stale (workspace was recreated). Delete
`~/.molecule/<workspace_id>/.auth_token` and re-run (see the snippet below).
* `connection refused` — double-check `PLATFORM_URL` and that the
platform is actually listening.
* Workspace never appears as online on the canvas — confirm it was
created with `runtime: external` (otherwise the provisioner will
try to start a local container and fail).
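For the stale-token case above, a sketch of clearing the cache (the path follows the SDK's token layout; `workspace_id` is your workspace UUID):
```python
from pathlib import Path

workspace_id = "<UUID>"  # the id returned when the workspace was created
(Path.home() / ".molecule" / workspace_id / ".auth_token").unlink(missing_ok=True)
```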

examples/remote-agent/run.py (new file, 100 lines)
@@ -0,0 +1,100 @@
#!/usr/bin/env python3
"""Minimal remote-agent demo — Phase 30.130.5 end-to-end from outside the
platform's Docker network.
What this does:
1. Registers the workspace with the platform (mints + saves a bearer token).
2. Pulls the merged decrypted secrets via the token-gated 30.2 endpoint.
3. Runs a heartbeat + state-poll loop; exits cleanly when the platform
reports the workspace paused or deleted.
What it doesn't do (future 30.8b work):
- Host an inbound A2A server. Platform-initiated calls to this agent
won't reach it unless you expose one yourself.
Usage:
# One-time setup on the platform side:
# 1) Create the workspace row (any template is fine — external runtime
# is cleanest if you don't want Docker to try to start a container):
curl -s -X POST http://localhost:8080/workspaces \\
-H 'Content-Type: application/json' \\
-d '{"name":"remote-demo","tier":2,"runtime":"external"}'
# 2) Grab the returned workspace id.
# 3) Optional — seed a secret:
curl -s -X POST http://localhost:8080/workspaces/<id>/secrets \\
-H 'Content-Type: application/json' \\
-d '{"key":"REMOTE_DEMO_KEY","value":"hello-from-remote"}'
# Now run this script from any machine that can reach the platform:
WORKSPACE_ID=<id> PLATFORM_URL=http://localhost:8080 python3 run.py
Environment variables:
WORKSPACE_ID (required)
PLATFORM_URL (required)
AGENT_NAME (optional; default derived from workspace id)
MAX_ITERATIONS (optional; caps loop length for demos)
"""
from __future__ import annotations
import logging
import os
import sys
# Local-dev import path — when installed via pip the molecule_agent package
# resolves normally; when running from a repo checkout we add the repo root
# (two levels up, where molecule_agent/ lives) to sys.path so you can run
# `python3 run.py` without a pip install.
_here = os.path.dirname(os.path.abspath(__file__))
_sdk = os.path.abspath(os.path.join(_here, "..", ".."))
if os.path.isdir(os.path.join(_sdk, "molecule_agent")) and _sdk not in sys.path:
    sys.path.insert(0, _sdk)
from molecule_agent import RemoteAgentClient # noqa: E402
def main() -> int:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
log = logging.getLogger("remote-agent-demo")
workspace_id = os.environ.get("WORKSPACE_ID", "").strip()
platform_url = os.environ.get("PLATFORM_URL", "").strip()
if not workspace_id or not platform_url:
log.error("set WORKSPACE_ID and PLATFORM_URL and re-run")
return 2
agent_name = os.environ.get("AGENT_NAME", f"remote-demo-{workspace_id[:8]}")
max_iter_env = os.environ.get("MAX_ITERATIONS", "").strip()
max_iter = int(max_iter_env) if max_iter_env else None
client = RemoteAgentClient(
workspace_id=workspace_id,
platform_url=platform_url,
agent_card={"name": agent_name, "skills": []},
# Shorter intervals for demo visibility; production would leave defaults.
heartbeat_interval=5.0,
)
log.info("phase 1 — registering workspace %s with %s", workspace_id, platform_url)
client.register()
log.info("phase 2 — pulling secrets via 30.2 token-gated endpoint")
try:
secrets = client.pull_secrets()
except Exception as exc:
log.error("pull_secrets failed: %s", exc)
return 1
log.info("received %d secret(s): keys=%s", len(secrets), sorted(secrets.keys()))
log.info("phase 3 — heartbeat + state-poll loop (will exit on pause/delete)")
terminal = client.run_heartbeat_loop(
max_iterations=max_iter,
task_supplier=lambda: {"current_task": "remote-agent demo idle", "active_tasks": 0},
)
log.info("loop exited: terminal_status=%s", terminal)
return 0
if __name__ == "__main__":
sys.exit(main())

molecule_agent/README.md (new file, 97 lines)
@@ -0,0 +1,97 @@
# molecule_agent — Remote-agent SDK for Molecule AI
Build a Python agent that runs **outside** a Molecule AI platform's Docker network
and registers as a first-class workspace. The agent gets bearer-token auth,
pulls its secrets, calls siblings, installs plugins from the platform's
registry, and reacts to platform-initiated lifecycle events (pause, delete) —
all over plain HTTP.
This is the client side of Phase 30 — see the platform repo's `PLAN.md`. The platform side
ships in the same release; this package is just the SDK an agent author
imports.
## Install
```bash
pip install molecule-ai-sdk  # ships molecule_plugin + molecule_agent
```
## 60-second example
```python
from molecule_agent import RemoteAgentClient
client = RemoteAgentClient(
workspace_id="<the-uuid-of-an-external-workspace-on-the-platform>",
platform_url="https://your-platform.example.com",
agent_card={"name": "my-remote-agent", "skills": []},
)
# 1. Register and mint a bearer token (cached at ~/.molecule/<id>/.auth_token).
client.register()
# 2. Pull secrets the platform was set to inject.
secrets = client.pull_secrets()
# → {"OPENAI_API_KEY": "...", ...}
# 3. (Optional) install a plugin locally — pulls a tarball, unpacks, runs setup.sh.
client.install_plugin("molecule-dev")
client.install_plugin("my-plugin", source="github://acme/my-plugin")
# 4. Run the heartbeat + state-poll loop until the platform pauses/deletes us.
terminal = client.run_heartbeat_loop()
print(f"loop exited: {terminal}")
```
A runnable demo with full setup walkthrough lives at
[`examples/remote-agent/`](../examples/remote-agent).
## What the SDK gives you
| Method | Phase | What it does |
|---|---|---|
| `register()` | 30.1 | Mint + cache the workspace's bearer token |
| `pull_secrets()` | 30.2 | Token-gated GET of merged secrets dict |
| `install_plugin(name, source=None)` | 30.3 | Stream plugin tarball, atomic extract, run setup.sh |
| `poll_state()` | 30.4 | Lightweight `{status, paused, deleted}` poll |
| `heartbeat(...)` | 30.1 | Single bearer-authed heartbeat |
| `get_peers()` / `discover_peer()` | 30.6 | Sibling URL discovery with TTL cache |
| `call_peer(target, message)` | 30.6 | Direct A2A with proxy fallback |
| `run_heartbeat_loop()` | combo | Drives heartbeat + state-poll on a timer; exits on pause/delete |
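For example, a sketch of the 30.6 methods (peer ids come from the platform; error handling elided):
```python
# Discover siblings/parent, then message the first peer: direct when a
# cached HTTP URL is available, platform-proxy fallback otherwise.
peers = client.get_peers()
for p in peers:
    print(p.id, p.name, p.status, p.url or "(proxy-only)")

if peers:
    reply = client.call_peer(peers[0].id, "hello from my remote agent")
    print(reply.get("result") or reply.get("error"))
```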
## What it doesn't do (yet)
- **No inbound A2A server.** Other agents can't initiate calls to your remote
agent unless you host an HTTP endpoint yourself. Future `start_a2a_server()`
helper will close this gap.
- **No automatic reconnect after token loss.** If `~/.molecule/<id>/.auth_token`
is deleted, you'll need to re-issue the token via the platform admin (since
`POST /registry/register` is idempotent — it won't mint a second token for
a workspace that already has one).
## Design choices
- **Blocking (`requests`), not async.** Drops into any runtime — script,
thread, asyncio loop. No framework lock-in.
- **Token cached on disk with 0600** so a restart of the agent doesn't
re-issue (the platform refuses anyway). Lives at
`~/.molecule/<workspace_id>/.auth_token`.
- **URL cache for siblings is process-memory only**, 5-minute TTL. Cleared
on graceful failures via `invalidate_peer_url`.
- **Tar extraction uses `_safe_extract_tar`** that rejects path-traversal
and skips symlinks — defense against tar-slip CVEs in case a plugin
source is compromised.
## Compatibility
Requires a Molecule AI platform with Phase 30 endpoints (PR #122 onwards).
The platform grandfathers pre-token workspaces through, so this SDK
also works against a transition-period deployment — but you won't get
the security benefits of bearer auth until both sides upgrade.
## Related
- [`molecule_plugin`](../molecule_plugin) — the *other* SDK in this
package, for plugin authors. Different audience.
- [`examples/remote-agent/run.py`](../examples/remote-agent/run.py)
— the runnable demo that proves all of the above end-to-end.

molecule_agent/__init__.py (new file, 40 lines)
@@ -0,0 +1,40 @@
"""Molecule AI remote-agent SDK — build agents that run outside the platform
network and register as first-class workspaces.
This is the Phase 30.8 companion to ``molecule_plugin`` (for plugin authors).
Where ``molecule_plugin`` helps you ship installable behavior for workspaces
that already exist, ``molecule_agent`` helps you *be* a workspace from the
other side of the wire: register, authenticate, pull secrets, heartbeat,
and detect pause/resume/delete — all via the Phase 30.1–30.5 HTTP contract.
Intended usage::
from molecule_agent import RemoteAgentClient
client = RemoteAgentClient(
workspace_id="550e8400-e29b-41d4-a716-446655440000",
platform_url="https://your-platform.example.com",
agent_card={"name": "my-remote-agent", "skills": []},
)
client.register() # mints + persists the auth token
env = client.pull_secrets() # decrypted secrets dict
client.run_heartbeat_loop() # background heartbeat + state-poll
See ``examples/remote-agent/`` for a runnable demo.
Design notes:
* **No async.** The SDK uses blocking ``requests`` so a remote agent author
can embed it in any event loop / thread / script without forcing anyio.
* **Token cached on disk** at ``~/.molecule/<workspace_id>/.auth_token``
with 0600 permissions, so a restart of the agent doesn't re-issue a
token (the platform refuses to issue a second token when one is on file).
* **Pause/delete detection is polling-based** because remote agents usually
can't expose an inbound WebSocket reachable from the platform.
"""
from __future__ import annotations
from .client import PeerInfo, RemoteAgentClient, WorkspaceState
__all__ = ["RemoteAgentClient", "WorkspaceState", "PeerInfo", "__version__"]
__version__ = "0.1.0"

molecule_agent/client.py (new file, 685 lines)
@@ -0,0 +1,685 @@
"""RemoteAgentClient — blocking HTTP client for the Phase 30 remote-agent flow.
The client is deliberately dependency-light (``requests`` only) so a remote
agent author can drop it into any runtime. All methods correspond 1:1 to
a Phase 30 endpoint:
* :py:meth:`register` ``POST /registry/register`` (30.1)
* :py:meth:`pull_secrets` ``GET /workspaces/:id/secrets/values`` (30.2)
* :py:meth:`poll_state` ``GET /workspaces/:id/state`` (30.4)
* :py:meth:`heartbeat` ``POST /registry/heartbeat`` (30.1)
* :py:meth:`run_heartbeat_loop` drives heartbeat + state-poll on a timer,
returns when the platform reports the workspace paused or deleted.
No inbound A2A server is bundled here yet — that would require hosting an
HTTP endpoint the platform's proxy can reach, which is network-dependent. A
future 30.8b iteration will add an optional ``start_a2a_server()`` helper.
"""
from __future__ import annotations
import logging
import os
import stat
import subprocess
import tarfile
import time
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable
import requests
logger = logging.getLogger(__name__)
# Polling cadence defaults. Chosen to align with the platform's 60-second
# Redis TTL — heartbeating every 30s (half the TTL) keeps the liveness key
# comfortably refreshed; state-polling at the same cadence is cheap (tiny
# GET) and gives ≤30s reaction time on pause / delete. Overridable via
# RemoteAgentClient constructor kwargs.
DEFAULT_HEARTBEAT_INTERVAL = 30.0 # seconds
DEFAULT_STATE_POLL_INTERVAL = 30.0 # seconds
# Phase 30.6 — sibling URL cache TTL. Cached URLs expire after this many
# seconds, forcing a re-discovery call. Short enough that a sibling that
# moved (restart with new port) is picked up quickly; long enough that
# we don't hit the discovery endpoint on every A2A call.
DEFAULT_URL_CACHE_TTL = 300.0 # 5 minutes
def _safe_extract_tar(tf: tarfile.TarFile, dest: Path) -> None:
"""Extract a tarfile, refusing entries that would escape `dest`
and silently skipping symlinks/hardlinks.
Tar archives can include ``..`` and absolute paths. Without explicit
rejection we'd risk the classic "tar slip" CVE — a malicious plugin
source could overwrite the agent's home files. We resolve every
entry's target to an absolute path, verify it lives inside dest, and
extract one-by-one so the symlink-skip actually takes effect (a bare
``extractall`` would still write the symlinks we marked as skipped).
"""
dest_abs = dest.resolve()
for member in tf.getmembers():
# Symlinks and hardlinks could point outside the staged tree;
# skip them entirely (matches the platform-side tar producer).
if member.issym() or member.islnk():
continue
target = (dest / member.name).resolve()
try:
target.relative_to(dest_abs)
except ValueError as exc:
raise ValueError(
f"refusing tar entry escaping dest: {member.name!r} -> {target}"
) from exc
tf.extract(member, dest)
def _rmtree_quiet(path: Path) -> None:
"""rm -rf <path> swallowing missing-file errors. Used for atomic
install rollback where we sometimes call this on a non-existent
staging dir."""
import shutil
try:
shutil.rmtree(path)
except FileNotFoundError:
pass
except Exception as exc:
logger.warning("rmtree(%s) failed: %s", path, exc)
@dataclass
class WorkspaceState:
"""Snapshot of a remote workspace's platform-side state."""
workspace_id: str
status: str # "online" / "paused" / "degraded" / "removed" / "offline" / ...
paused: bool
deleted: bool
@property
def should_stop(self) -> bool:
"""True when the agent should exit its run loop — platform has
paused or hard-deleted the workspace. The agent can be restarted
later; we just don't want to keep heartbeating against a dead row.
"""
return self.paused or self.deleted
@dataclass
class PeerInfo:
"""A sibling or parent workspace that this agent can communicate with."""
id: str
name: str
url: str
role: str = ""
tier: int = 2
status: str = "unknown"
agent_card: dict[str, Any] = field(default_factory=dict)
class RemoteAgentClient:
"""Blocking HTTP client for a Phase 30 remote agent.
Args:
workspace_id: UUID of the workspace this agent represents. The
workspace row must exist on the platform (created via
``POST /workspaces`` or ``POST /org/import``) — the agent
claims that identity when it registers.
platform_url: Base URL of the platform, e.g.
``https://molecule.example.com``. No trailing slash; the
client adds paths.
agent_card: A2A agent card payload. Minimal: ``{"name": "..."}``.
Full schema matches what an in-container agent would report
(skills list, capabilities, etc.).
reported_url: Optional externally-reachable URL at which siblings
can call this agent's A2A endpoint. If omitted, the agent is
reachable only via the platform's proxy (which won't be able
to initiate calls to the agent either that's a limitation
of remote agents today, resolved by 30.6/30.7 or by exposing
an inbound endpoint yourself).
token_dir: Where to cache the workspace auth token on disk.
Defaults to ``~/.molecule/<workspace_id>/``. Created with
0700 permissions if missing.
heartbeat_interval: Seconds between heartbeats in the run loop.
state_poll_interval: Seconds between state polls in the run loop.
"""
def __init__(
self,
workspace_id: str,
platform_url: str,
agent_card: dict[str, Any] | None = None,
reported_url: str = "",
token_dir: Path | None = None,
heartbeat_interval: float = DEFAULT_HEARTBEAT_INTERVAL,
state_poll_interval: float = DEFAULT_STATE_POLL_INTERVAL,
url_cache_ttl: float = DEFAULT_URL_CACHE_TTL,
session: requests.Session | None = None,
) -> None:
self.workspace_id = workspace_id
self.platform_url = platform_url.rstrip("/")
self.agent_card = agent_card or {"name": f"remote-agent-{workspace_id[:8]}"}
self.reported_url = reported_url
self.heartbeat_interval = heartbeat_interval
self.state_poll_interval = state_poll_interval
self.url_cache_ttl = url_cache_ttl
# Phase 30.6 — sibling URL cache keyed by workspace id. Values are
# (url, expires_at_unix_seconds). Process-memory only; we re-fetch
# on restart because agent lifetimes are short enough that
# persisting doesn't buy much.
self._url_cache: dict[str, tuple[str, float]] = {}
self._session = session or requests.Session()
self._token_dir = token_dir or (
Path.home() / ".molecule" / workspace_id
)
self._token: str | None = None
self._start_time = time.time()
# ------------------------------------------------------------------
# Token persistence
# ------------------------------------------------------------------
@property
def token_file(self) -> Path:
return self._token_dir / ".auth_token"
def load_token(self) -> str | None:
"""Load a cached token from disk if present. Populates the
in-memory cache on success."""
if self._token is not None:
return self._token
if not self.token_file.exists():
return None
try:
tok = self.token_file.read_text().strip()
except OSError as exc:
logger.warning("failed to read %s: %s", self.token_file, exc)
return None
if not tok:
return None
self._token = tok
return tok
def save_token(self, token: str) -> None:
"""Persist a freshly-issued token to disk. Creates the parent
directory with 0700 and the file with 0600 to keep the credential
away from other users' prying eyes."""
token = token.strip()
if not token:
raise ValueError("refusing to save empty token")
self._token_dir.mkdir(parents=True, exist_ok=True)
try:
os.chmod(self._token_dir, 0o700)
except OSError:
pass # non-fatal — best-effort on unusual filesystems
self.token_file.write_text(token)
try:
os.chmod(self.token_file, 0o600)
except OSError:
pass
self._token = token
def _auth_headers(self) -> dict[str, str]:
tok = self.load_token()
if not tok:
return {}
return {"Authorization": f"Bearer {tok}"}
# ------------------------------------------------------------------
# Endpoints
# ------------------------------------------------------------------
def register(self) -> str:
"""Register with the platform and cache the issued auth token.
Returns the token (also persisted to disk). If the platform has
already issued a token for this workspace (identified by the
cached file), register will still succeed but the response will
not include a new ``auth_token`` — the client keeps using the
on-disk copy.
Raises :class:`requests.HTTPError` on non-2xx responses.
"""
# The platform's RegisterPayload requires a non-empty url. A remote
# agent that doesn't expose inbound A2A yet still needs a placeholder
# — we use "remote://no-inbound" so the platform can distinguish it
# from a real HTTP URL and not try to reach the agent.
reported = self.reported_url or "remote://no-inbound"
resp = self._session.post(
f"{self.platform_url}/registry/register",
json={
"id": self.workspace_id,
"url": reported,
"agent_card": self.agent_card,
},
timeout=10.0,
)
resp.raise_for_status()
data = resp.json()
tok = data.get("auth_token", "")
if tok:
self.save_token(tok)
logger.info("registered and saved new token (prefix=%s…)", tok[:8])
else:
# Already-tokened workspace — keep using the cached one.
existing = self.load_token()
if not existing:
logger.warning(
"register returned no auth_token and no cached token exists — "
"authenticated calls will 401 until a token is minted"
)
return self._token or ""
def pull_secrets(self) -> dict[str, str]:
"""Fetch the merged decrypted secrets via the 30.2 endpoint.
Returns an empty dict when the platform has no secrets configured
for this workspace. Raises on network errors and on 401 (which
means the token is missing / invalid — call :py:meth:`register`
first).
"""
resp = self._session.get(
f"{self.platform_url}/workspaces/{self.workspace_id}/secrets/values",
headers=self._auth_headers(),
timeout=10.0,
)
resp.raise_for_status()
return resp.json() or {}
def poll_state(self) -> WorkspaceState | None:
"""Fetch the workspace's current state via the 30.4 endpoint.
Returns None if the platform returns 404 with ``{"deleted": true}``
(workspace hard-deleted) — callers typically exit their run loop
in that case. Raises on other HTTP errors.
"""
resp = self._session.get(
f"{self.platform_url}/workspaces/{self.workspace_id}/state",
headers=self._auth_headers(),
timeout=10.0,
)
if resp.status_code == 404:
# Platform signals hard-delete via 404 + deleted:true
return WorkspaceState(
workspace_id=self.workspace_id,
status="removed",
paused=False,
deleted=True,
)
resp.raise_for_status()
data = resp.json()
return WorkspaceState(
workspace_id=self.workspace_id,
status=str(data.get("status", "unknown")),
paused=bool(data.get("paused", False)),
deleted=bool(data.get("deleted", False)),
)
def heartbeat(
self,
current_task: str = "",
active_tasks: int = 0,
error_rate: float = 0.0,
sample_error: str = "",
) -> None:
"""Send a single heartbeat. Safe to call repeatedly — the platform
treats it as idempotent state-refresh. Raises on non-2xx."""
uptime = int(time.time() - self._start_time)
resp = self._session.post(
f"{self.platform_url}/registry/heartbeat",
headers=self._auth_headers(),
json={
"workspace_id": self.workspace_id,
"current_task": current_task,
"active_tasks": active_tasks,
"error_rate": error_rate,
"sample_error": sample_error,
"uptime_seconds": uptime,
},
timeout=10.0,
)
resp.raise_for_status()
# ------------------------------------------------------------------
# Peer discovery + cache (Phase 30.6)
# ------------------------------------------------------------------
def get_peers(self) -> list[PeerInfo]:
"""Fetch the list of peer workspaces this agent can communicate with.
Hits ``GET /registry/:id/peers`` with the bearer token. The returned
list includes siblings (same parent) and, if applicable, the parent.
Each peer's URL is seeded into the local cache so subsequent calls
to :py:meth:`discover_peer` short-circuit without hitting the
platform.
Raises on 401 (stale/missing token — call :py:meth:`register`) and
other non-2xx.
"""
resp = self._session.get(
f"{self.platform_url}/registry/{self.workspace_id}/peers",
headers={
**self._auth_headers(),
"X-Workspace-ID": self.workspace_id,
},
timeout=10.0,
)
resp.raise_for_status()
data = resp.json() or []
peers: list[PeerInfo] = []
now = time.time()
for row in data:
pid = str(row.get("id", ""))
url = str(row.get("url", ""))
if not pid:
continue
peer = PeerInfo(
id=pid,
name=str(row.get("name", "")),
url=url,
role=str(row.get("role", "")),
tier=int(row.get("tier", 2) or 2),
status=str(row.get("status", "unknown")),
agent_card=row.get("agent_card") or {},
)
peers.append(peer)
# Seed the cache so a subsequent call_peer doesn't need a
# discover round-trip. Only cache HTTP-shaped URLs; skip the
# "remote://no-inbound" placeholder and empty strings.
if url.startswith(("http://", "https://")):
self._url_cache[pid] = (url, now + self.url_cache_ttl)
return peers
def discover_peer(self, target_id: str) -> str | None:
"""Resolve a peer's URL, using the cache when fresh.
Returns the URL string, or None if the platform has no usable URL
for this target. On 401/403 the caller should re-authenticate or
verify the hierarchy rule; those are raised as ``HTTPError``.
Cache semantics: a cached entry is returned immediately if its TTL
hasn't expired; otherwise the platform is hit and the cache
refreshed. Call :py:meth:`invalidate_peer_url` to drop an entry
that was stale (connection error, 5xx) so the next discover
re-fetches instead of returning the dead URL again.
"""
cached = self._url_cache.get(target_id)
if cached is not None:
url, expires_at = cached
if time.time() < expires_at:
return url
# Expired — drop and fall through to refresh
self._url_cache.pop(target_id, None)
resp = self._session.get(
f"{self.platform_url}/registry/discover/{target_id}",
headers={
**self._auth_headers(),
"X-Workspace-ID": self.workspace_id,
},
timeout=10.0,
)
if resp.status_code == 404:
return None
resp.raise_for_status()
url = str((resp.json() or {}).get("url", ""))
if url.startswith(("http://", "https://")):
self._url_cache[target_id] = (url, time.time() + self.url_cache_ttl)
return url
return None
def invalidate_peer_url(self, target_id: str) -> None:
"""Drop a peer's cached URL. Call this after a direct-call failure
so the next call_peer performs a fresh discover."""
self._url_cache.pop(target_id, None)
def call_peer(
self,
target_id: str,
message: str,
prefer_direct: bool = True,
) -> dict[str, Any]:
"""Send an A2A ``message/send`` to a peer.
Preferred path (``prefer_direct=True``, default):
1. Resolve target URL via :py:meth:`discover_peer` (cache-hot
path when we've seen this peer before).
2. POST the JSON-RPC envelope directly to the peer's URL.
3. On connection error / 5xx, invalidate the cache and retry
via the platform proxy — a graceful fallback so a stale URL
doesn't brick inter-agent communication.
Proxy-only path (``prefer_direct=False``):
Always routes through ``POST /workspaces/:id/a2a`` useful
when both agents are behind NAT and can't reach each other
directly, but the platform can reach both.
Returns the full JSON-RPC response dict so callers can inspect
``result`` vs ``error`` without us flattening the envelope.
"""
body = {
"jsonrpc": "2.0",
"id": str(uuid.uuid4()),
"method": "message/send",
"params": {
"message": {
"role": "user",
"messageId": str(uuid.uuid4()),
"parts": [{"kind": "text", "text": message}],
}
},
}
headers = {
**self._auth_headers(),
"X-Workspace-ID": self.workspace_id,
"Content-Type": "application/json",
}
if prefer_direct:
url = self.discover_peer(target_id)
if url:
try:
resp = self._session.post(url, json=body, headers=headers, timeout=30.0)
resp.raise_for_status()
return resp.json()
except Exception as exc:
logger.warning(
"direct A2A to %s (%s) failed: %s — invalidating cache, falling back to proxy",
target_id, url, exc,
)
self.invalidate_peer_url(target_id)
# Proxy fallback (or prefer_direct=False)
resp = self._session.post(
f"{self.platform_url}/workspaces/{target_id}/a2a",
json=body, headers=headers, timeout=30.0,
)
resp.raise_for_status()
return resp.json()
# ------------------------------------------------------------------
# Plugin install (Phase 30.3)
# ------------------------------------------------------------------
@property
def plugins_dir(self) -> Path:
"""Where pulled plugins are unpacked. Lives under the same
per-workspace directory as the auth token (``~/.molecule/<id>/``)
so a single rm cleans the agent's local state."""
return self._token_dir / "plugins"
def install_plugin(
self,
name: str,
source: str | None = None,
run_setup_sh: bool = True,
report_to_platform: bool = True,
) -> Path:
"""Pull a plugin tarball from the platform, unpack it, optionally
run its ``setup.sh``, and report success.
Phase 30.3 contract:
1. Stream ``GET /workspaces/:id/plugins/:name/download[?source=]``
2. Atomically extract into ``~/.molecule/<workspace>/plugins/<name>/``
via a sibling-tempdir + rename so a partial extract never
leaves the directory in a half-installed state.
3. If ``setup.sh`` exists in the unpacked tree, run it (bash) so
pip/npm deps land on the agent's machine. Failures from
``setup.sh`` are logged but don't prevent the install record —
the agent author can re-run setup manually.
4. POST ``/workspaces/:id/plugins`` with the source string so the
platform's ``workspace_plugins`` table reflects the install.
Returns the path to the unpacked plugin directory.
Raises ``requests.HTTPError`` on download failure (401 / 404 / etc.).
"""
target = self.plugins_dir / name
staging = self.plugins_dir / f".staging-{name}-{uuid.uuid4().hex[:8]}"
self.plugins_dir.mkdir(parents=True, exist_ok=True)
url = f"{self.platform_url}/workspaces/{self.workspace_id}/plugins/{name}/download"
params: dict[str, str] = {}
if source:
params["source"] = source
# We pull the whole tarball into memory before extracting. The
# platform side (PLUGIN_INSTALL_MAX_DIR_BYTES) caps plugin size
# at 100 MiB by default, which is comfortable to hold in process
# memory and lets us use tarfile's seekable r:gz mode (the streaming
# r|gz mode is sequential-only, so _safe_extract_tar's getmembers()-
# then-extract pattern would fail against it).
# If the cap is ever raised above ~500 MiB, switch to a temp
# file: tarfile.open(fileobj=open(temp, "rb"), mode="r:gz").
with self._session.get(
url, headers=self._auth_headers(), params=params,
timeout=60.0,
) as resp:
resp.raise_for_status()
staging.mkdir(parents=True)
try:
import io as _io
with tarfile.open(fileobj=_io.BytesIO(resp.content), mode="r:gz") as tf:
_safe_extract_tar(tf, staging)
except Exception:
# Roll back the partial extract
_rmtree_quiet(staging)
raise
# Atomic swap: remove old dir if present, rename staging into place.
if target.exists():
_rmtree_quiet(target)
staging.rename(target)
logger.info("plugin %s unpacked to %s", name, target)
# 3. setup.sh — best-effort. We never raise on its failure because
# the plugin files are now correctly installed; setup is just for
# heavy deps that the agent author can rerun manually.
if run_setup_sh:
setup = target / "setup.sh"
if setup.is_file():
try:
proc = subprocess.run(
["bash", str(setup)],
cwd=str(target),
capture_output=True, text=True, timeout=120,
)
if proc.returncode == 0:
logger.info("plugin %s setup.sh ok", name)
else:
logger.warning(
"plugin %s setup.sh exit=%d stderr=%s",
name, proc.returncode, proc.stderr[:200],
)
except subprocess.TimeoutExpired:
logger.warning("plugin %s setup.sh timed out (120s)", name)
except FileNotFoundError:
logger.warning("plugin %s setup.sh present but bash not found", name)
# 4. Report to platform — write a workspace_plugins row so List
# reflects the install. Best-effort; the local files are correct
# regardless of whether this POST succeeds.
if report_to_platform:
try:
report_source = source or f"local://{name}"
self._session.post(
f"{self.platform_url}/workspaces/{self.workspace_id}/plugins",
headers=self._auth_headers(),
json={"source": report_source},
timeout=10.0,
)
except Exception as exc:
logger.warning("plugin %s install record POST failed: %s", name, exc)
return target
# ------------------------------------------------------------------
# Run loop
# ------------------------------------------------------------------
def run_heartbeat_loop(
self,
max_iterations: int | None = None,
task_supplier: Callable[[], dict[str, Any]] | None = None,
) -> str:
"""Drive heartbeat + state-poll on a timer. Returns the terminal
status when the loop exits (``"paused"``, ``"removed"``, or
``"max_iterations"``).
Args:
max_iterations: Stop after N loop iterations. None = run until
the workspace is paused / deleted. Useful for tests and
smoke scripts.
task_supplier: Optional zero-arg callable returning a dict
``{"current_task": str, "active_tasks": int}`` fetched
each iteration. Lets the agent report what it's doing.
The loop sends one heartbeat + one state poll per iteration; the
next iteration sleeps for ``heartbeat_interval`` seconds. Errors
from either call are logged and the loop continues — we deliberately
do NOT re-raise because a transient platform hiccup shouldn't take
a remote agent offline.
"""
i = 0
while True:
if max_iterations is not None and i >= max_iterations:
return "max_iterations"
i += 1
report: dict[str, Any] = {}
if task_supplier is not None:
try:
report = task_supplier() or {}
except Exception as exc:
logger.warning("task_supplier raised: %s", exc)
try:
self.heartbeat(
current_task=str(report.get("current_task", "")),
active_tasks=int(report.get("active_tasks", 0)),
)
except Exception as exc:
logger.warning("heartbeat failed: %s — continuing", exc)
try:
state = self.poll_state()
except Exception as exc:
logger.warning("state poll failed: %s — continuing", exc)
state = None
if state is not None and state.should_stop:
logger.info(
"platform reports workspace %s (paused=%s deleted=%s) — exiting",
state.status, state.paused, state.deleted,
)
return state.status
time.sleep(self.heartbeat_interval)
__all__ = [
"RemoteAgentClient",
"WorkspaceState",
"PeerInfo",
"DEFAULT_HEARTBEAT_INTERVAL",
"DEFAULT_STATE_POLL_INTERVAL",
"DEFAULT_URL_CACHE_TTL",
]

molecule_plugin/__init__.py (new file, 98 lines)
@@ -0,0 +1,98 @@
"""Molecule AI plugin SDK — build plugins installable on any Molecule AI workspace.
A plugin is a directory containing a ``plugin.yaml`` manifest and one or more
per-runtime adaptors under ``adapters/<runtime>.py``. The Molecule AI platform
resolves and installs the right adaptor at workspace startup.
This SDK exposes:
* :class:`PluginAdaptor` — the Protocol every adaptor must satisfy.
* :class:`InstallContext`, :class:`InstallResult` — data classes passed to
``install()`` and returned from it.
* :class:`AgentskillsAdaptor` — a drop-in adaptor for plugins that ship
agentskills.io-format skills + Molecule AI's optional rules (covers the
vast majority of cases).
* :data:`PLUGIN_YAML_SCHEMA` — the manifest schema for validation tooling.
Example: a minimal plugin that's installable on Claude Code and DeepAgents:
.. code-block:: text
my-plugin/
plugin.yaml
rules/my-rule.md
skills/my-skill/SKILL.md
adapters/
claude_code.py # `from molecule_plugin import AgentskillsAdaptor as Adaptor`
deepagents.py # same one-liner
Full docs + cookiecutter template: see the ``README.md`` at the repo root.
"""
from __future__ import annotations
# Re-export from the runtime registry so plugins have a single import path.
# The workspace-template package is not pip-installable yet; the SDK duplicates
# the Protocol definition so community authors can build against it without
# depending on the runtime. When a plugin is installed in a workspace, the
# runtime's own ``plugins_registry`` is what actually executes the adaptor —
# these types are structurally compatible (duck-typed via Protocol).
from .protocol import ( # noqa: F401
InstallContext,
InstallResult,
PluginAdaptor,
)
from .builtins import AgentskillsAdaptor, SKIP_ROOT_MD # noqa: F401
from .manifest import ( # noqa: F401
PLUGIN_YAML_SCHEMA,
SKILL_COMPAT_MAX,
SKILL_DESC_MAX,
SKILL_NAME_MAX,
SKILL_NAME_RE,
parse_skill_md,
validate_manifest,
validate_plugin,
validate_skill,
)
from .protocol import DEFAULT_MEMORY_FILENAME, SKILLS_SUBDIR # noqa: F401
from .workspace import ( # noqa: F401
SUPPORTED_RUNTIMES,
ValidationError,
validate_workspace_template,
)
from .org import validate_org_template # noqa: F401
from .channel import ( # noqa: F401
SUPPORTED_CHANNEL_TYPES,
validate_channel_config,
validate_channel_file,
)
__version__ = "0.1.0"
__all__ = [
"AgentskillsAdaptor",
"DEFAULT_MEMORY_FILENAME",
"InstallContext",
"InstallResult",
"PLUGIN_YAML_SCHEMA",
"PluginAdaptor",
"SKILLS_SUBDIR",
"SKILL_COMPAT_MAX",
"SKILL_DESC_MAX",
"SKILL_NAME_MAX",
"SKILL_NAME_RE",
"SKIP_ROOT_MD",
"SUPPORTED_CHANNEL_TYPES",
"SUPPORTED_RUNTIMES",
"ValidationError",
"parse_skill_md",
"validate_channel_config",
"validate_channel_file",
"validate_manifest",
"validate_org_template",
"validate_plugin",
"validate_skill",
"validate_workspace_template",
"__version__",
]

molecule_plugin/__main__.py (new file, 130 lines)
@@ -0,0 +1,130 @@
"""CLI: ``python -m molecule_plugin validate <kind> <path>...``.
Kinds:
* ``plugin`` — a plugin directory (plugin.yaml + skills/, adapters/)
* ``workspace`` — a workspace-configs-template directory (config.yaml)
* ``org`` — an org-template directory (org.yaml)
* ``channel`` — a channel config YAML/JSON file (standalone or list)
Exit 0 on valid, 1 when errors found. Intended for CI and local author
workflows before publishing. ``validate <path>`` (kind omitted) is kept as
a back-compat shortcut for plugin validation.
"""
from __future__ import annotations
import argparse
import sys
from pathlib import Path
from .channel import validate_channel_file
from .manifest import validate_plugin
from .org import validate_org_template
from .workspace import validate_workspace_template
def _validate_plugin(paths: list[str], quiet: bool) -> int:
total = 0
for raw in paths:
path = Path(raw)
if not path.exists():
print(f"{path}: does not exist", file=sys.stderr)
total += 1
continue
if not path.is_dir():
print(f"{path}: not a directory", file=sys.stderr)
total += 1
continue
results = validate_plugin(path)
if not results:
if not quiet:
print(f"{path}: valid (plugin.yaml + all skills pass agentskills.io spec)")
continue
for source, errors in results.items():
total += len(errors)
for err in errors:
print(f"{path}/{source}: {err}", file=sys.stderr)
return 0 if total == 0 else 1
def _validate_dir(
kind: str,
paths: list[str],
validator,
quiet: bool,
) -> int:
total = 0
for raw in paths:
path = Path(raw)
if not path.exists():
print(f"{path}: does not exist", file=sys.stderr)
total += 1
continue
if not path.is_dir():
print(f"{path}: not a directory", file=sys.stderr)
total += 1
continue
errors = validator(path)
if not errors:
if not quiet:
print(f"{path}: valid {kind}")
continue
total += len(errors)
for err in errors:
print(f"{err.file}: {err.message}", file=sys.stderr)
return 0 if total == 0 else 1
def _validate_channel(paths: list[str], quiet: bool) -> int:
total = 0
for raw in paths:
path = Path(raw)
if not path.exists():
print(f"{path}: does not exist", file=sys.stderr)
total += 1
continue
errors = validate_channel_file(path)
if not errors:
if not quiet:
print(f"{path}: valid channel config")
continue
total += len(errors)
for err in errors:
print(f"{err.file}: {err.message}", file=sys.stderr)
return 0 if total == 0 else 1
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(prog="molecule_plugin")
sub = parser.add_subparsers(dest="cmd", required=True)
v = sub.add_parser("validate", help="Validate Molecule AI artifacts")
v.add_argument("args", nargs="+", help="[kind] paths... — kind in {plugin,workspace,org,channel}; defaults to plugin")
v.add_argument("--quiet", "-q", action="store_true")
args = parser.parse_args(argv)
kinds = {"plugin", "workspace", "org", "channel"}
if args.args and args.args[0] in kinds:
args.kind = args.args[0]
args.paths = args.args[1:]
else:
args.kind = "plugin"
args.paths = args.args
if not args.paths:
parser.error("at least one path is required")
if args.kind == "plugin":
return _validate_plugin(args.paths, args.quiet)
if args.kind == "workspace":
return _validate_dir("workspace template", args.paths, validate_workspace_template, args.quiet)
if args.kind == "org":
return _validate_dir("org template", args.paths, validate_org_template, args.quiet)
if args.kind == "channel":
return _validate_channel(args.paths, args.quiet)
return 2 # pragma: no cover
if __name__ == "__main__": # pragma: no cover
raise SystemExit(main())

molecule_plugin/builtins.py (new file, 212 lines)
@@ -0,0 +1,212 @@
"""Built-in sub-type adapters for the SDK.
One class per agent shape. Currently ships :class:`AgentskillsAdaptor`
(the `agentskills.io <https://agentskills.io>`_-format default); more
will be added as new shapes emerge in the ecosystem
(``MCPServerAdaptor``, ``DeepAgentsSubagentAdaptor``, ``RAGPipelineAdaptor``,
etc.).
SDK authors pick a sub-type by import:
.. code-block:: python
# adapters/claude_code.py
from molecule_plugin import AgentskillsAdaptor as Adaptor
Plugins whose shape doesn't match any built-in ship a custom adapter
class in Python — unlimited expressiveness, no framework constraint.
"""
from __future__ import annotations
import json
import os
import shutil
import subprocess
from pathlib import Path
from .protocol import SKILLS_SUBDIR, InstallContext, InstallResult
# Files at the plugin root that are never treated as prompt fragments.
SKIP_ROOT_MD = frozenset({"readme.md", "changelog.md", "license.md", "contributing.md"})
class AgentskillsAdaptor:
"""Sub-type adaptor for `agentskills.io <https://agentskills.io>`_-format skills.
The default adapter for the "skills + rules" shape installs
``skills/<name>/SKILL.md`` into ``/configs/skills/`` (where native
agentskills runtimes like Claude Code activate them automatically)
and appends Molecule AI-level ``rules/*.md`` + root prompt fragments to
the runtime memory file.
Matches the behaviour of the workspace runtime's
``plugins_registry.builtins.AgentskillsAdaptor``. Kept as a separate
copy here so SDK users can unit-test their plugins without installing
the full workspace runtime.
"""
def __init__(self, plugin_name: str, runtime: str) -> None:
self.plugin_name = plugin_name
self.runtime = runtime
async def install(self, ctx: InstallContext) -> InstallResult:
result = InstallResult(plugin_name=self.plugin_name, runtime=self.runtime, source="plugin")
rules_dir = ctx.plugin_root / "rules"
blocks: list[str] = []
if rules_dir.is_dir():
for p in sorted(rules_dir.iterdir()):
if p.is_file() and p.suffix == ".md":
content = p.read_text().strip()
if content:
blocks.append(f"# Plugin: {self.plugin_name} / rule: {p.name}\n\n{content}")
if ctx.plugin_root.is_dir():
for p in sorted(ctx.plugin_root.iterdir()):
if p.is_file() and p.suffix == ".md" and p.name.lower() not in SKIP_ROOT_MD:
content = p.read_text().strip()
if content:
blocks.append(f"# Plugin: {self.plugin_name} / fragment: {p.name}\n\n{content}")
if blocks:
ctx.append_to_memory(ctx.memory_filename, "\n\n".join(blocks))
src_skills = ctx.plugin_root / "skills"
if src_skills.is_dir():
dst_root = ctx.configs_dir / SKILLS_SUBDIR
dst_root.mkdir(parents=True, exist_ok=True)
for entry in sorted(src_skills.iterdir()):
if not entry.is_dir():
continue
dst = dst_root / entry.name
if dst.exists():
continue
shutil.copytree(entry, dst)
for p in dst.rglob("*"):
if p.is_file():
result.files_written.append(str(p.relative_to(ctx.configs_dir)))
# 4. Setup script — run setup.sh if present (npm/pip dependencies).
# Mirrors workspace-template/plugins_registry/builtins.py — must stay
# in sync (drift guard: tests/test_plugins_builtins_drift.py).
setup_script = ctx.plugin_root / "setup.sh"
if setup_script.is_file():
ctx.logger.info("%s: running setup.sh", self.plugin_name)
try:
proc = subprocess.run(
["bash", str(setup_script)],
capture_output=True, text=True, timeout=120,
cwd=str(ctx.plugin_root),
env={**os.environ, "CONFIGS_DIR": str(ctx.configs_dir)},
)
if proc.returncode == 0:
ctx.logger.info("%s: setup.sh completed successfully", self.plugin_name)
else:
result.warnings.append(f"setup.sh exited {proc.returncode}: {proc.stderr[:200]}")
ctx.logger.warning("%s: setup.sh failed: %s", self.plugin_name, proc.stderr[:200])
except subprocess.TimeoutExpired:
result.warnings.append("setup.sh timed out (120s)")
ctx.logger.warning("%s: setup.sh timed out", self.plugin_name)
# Claude Code layer — hooks/, commands/, settings-fragment.json.
# Mirrors workspace-template/plugins_registry/builtins.py — drift
# guarded by tests/test_plugins_builtins_drift.py.
_install_claude_layer(ctx, result, self.plugin_name)
return result
async def uninstall(self, ctx: InstallContext) -> None:
src_skills = ctx.plugin_root / "skills"
if src_skills.is_dir():
for entry in src_skills.iterdir():
dst = ctx.configs_dir / SKILLS_SUBDIR / entry.name
if dst.exists() and dst.is_dir():
shutil.rmtree(dst)
memory_path = ctx.configs_dir / ctx.memory_filename
if memory_path.exists():
prefix = f"# Plugin: {self.plugin_name} / "
kept = [ln for ln in memory_path.read_text().splitlines(keepends=True) if not ln.startswith(prefix)]
memory_path.write_text("".join(kept))
# ----------------------------------------------------------------------
# Claude Code layer — mirrors workspace-template/plugins_registry/builtins.py.
# Drift-guarded by workspace-template/tests/test_plugins_builtins_drift.py.
# ----------------------------------------------------------------------
def _install_claude_layer(ctx: InstallContext, result: InstallResult, plugin_name: str) -> None:
claude_dir = ctx.configs_dir / ".claude"
claude_dir.mkdir(parents=True, exist_ok=True)
_copy_dir_files(ctx.plugin_root / "hooks", claude_dir / "hooks", result, executable_suffix=".sh")
_copy_dir_files(ctx.plugin_root / "commands", claude_dir / "commands", result, only_suffix=".md")
_merge_settings_fragment(ctx, claude_dir, result, plugin_name)
def _copy_dir_files(src: Path, dst: Path, result: InstallResult,
executable_suffix: str | None = None,
only_suffix: str | None = None) -> None:
if not src.is_dir():
return
dst.mkdir(parents=True, exist_ok=True)
for f in src.iterdir():
if not f.is_file():
continue
if only_suffix and f.suffix != only_suffix:
if not (executable_suffix and f.suffix == ".py"):
continue
target = dst / f.name
shutil.copy2(f, target)
if executable_suffix and f.suffix == executable_suffix:
target.chmod(0o755)
result.files_written.append(str(target.relative_to(target.parents[2])))
def _merge_settings_fragment(ctx: InstallContext, claude_dir: Path,
result: InstallResult, plugin_name: str) -> None:
fragment_path = ctx.plugin_root / "settings-fragment.json"
if not fragment_path.is_file():
return
try:
fragment = json.loads(fragment_path.read_text())
except Exception as e:
result.warnings.append(f"settings-fragment.json invalid: {e}")
return
settings_path = claude_dir / "settings.json"
if settings_path.is_file():
try:
existing = json.loads(settings_path.read_text())
except Exception:
existing = {}
else:
existing = {}
rewritten = _rewrite_hook_paths(fragment, claude_dir)
merged = _deep_merge_hooks(existing, rewritten)
settings_path.write_text(json.dumps(merged, indent=2) + "\n")
result.files_written.append(str(settings_path.relative_to(ctx.configs_dir)))
ctx.logger.info("%s: merged hook config into %s", plugin_name, settings_path)
def _rewrite_hook_paths(fragment: dict, claude_dir: Path) -> dict:
out = json.loads(json.dumps(fragment))
for handlers in out.get("hooks", {}).values():
for handler in handlers:
for h in handler.get("hooks", []):
h["command"] = h.get("command", "").replace("${CLAUDE_DIR}", str(claude_dir))
return out
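# Illustrative fragment shape consumed by _rewrite_hook_paths (above) and
# _deep_merge_hooks (below); keys beyond the ones these helpers touch are
# up to the plugin author:
#
#   {"hooks": {"PostToolUse": [
#       {"hooks": [{"command": "${CLAUDE_DIR}/hooks/lint.sh"}]}]}}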
def _deep_merge_hooks(existing: dict, fragment: dict) -> dict:
out = dict(existing)
out.setdefault("hooks", {})
for event, handlers in fragment.get("hooks", {}).items():
out["hooks"].setdefault(event, [])
out["hooks"][event].extend(handlers)
for key, val in fragment.items():
if key == "hooks":
continue
out.setdefault(key, val)
return out

molecule_plugin/channel.py (new file, 112 lines)
@@ -0,0 +1,112 @@
"""Validator for social-channel configurations embedded in org.yaml / direct API payloads.
The platform's Go channel adapters (``platform/internal/channels/``) are the
authoritative implementations (Telegram first, Slack/Discord/WhatsApp on the
roadmap). This module provides a Python-side schema check for the YAML /
JSON blob that users write so authors catch misspelled fields before the
platform rejects them.
Shape (matches ``platform/internal/handlers/channels.go``):
.. code-block:: yaml
type: telegram
config:
bot_token: ${TELEGRAM_BOT_TOKEN} # platform-resolved env var
chat_id: ${TELEGRAM_CHAT_ID}
enabled: true # default true
Supported types track what the platform knows about via the channel
adapter registry. Keep in sync with ``channels.ChannelAdapter.Type()``.
"""
from __future__ import annotations
from pathlib import Path
from typing import Any
import yaml
from .workspace import ValidationError
# Channel types the platform has adapters for, as of today. New adapters
# (slack, discord, whatsapp) are welcome additions — update this set when
# the corresponding Go adapter lands.
SUPPORTED_CHANNEL_TYPES = frozenset({"telegram"})
# Per-type required config keys. Empty tuple = no required keys (for
# adapters that accept zero config).
_REQUIRED_KEYS: dict[str, tuple[str, ...]] = {
"telegram": ("bot_token",),
}
def validate_channel_config(
cfg: dict[str, Any], file_ref: str = "<channel>"
) -> list[ValidationError]:
"""Validate a single channel config dict (not a file)."""
errors: list[ValidationError] = []
ch_type = cfg.get("type")
if not ch_type:
errors.append(ValidationError(file_ref, "missing required field: type"))
return errors
if ch_type not in SUPPORTED_CHANNEL_TYPES:
errors.append(
ValidationError(
file_ref,
f"type={ch_type!r} — must be one of {sorted(SUPPORTED_CHANNEL_TYPES)}",
)
)
return errors
config = cfg.get("config")
if config is not None and not isinstance(config, dict):
errors.append(ValidationError(file_ref, f"config must be an object; got {type(config).__name__}"))
return errors
required = _REQUIRED_KEYS.get(ch_type, ())
for key in required:
if not config or key not in config:
errors.append(
ValidationError(file_ref, f"config.{key} is required for type={ch_type!r}")
)
if "enabled" in cfg and not isinstance(cfg["enabled"], bool):
errors.append(ValidationError(file_ref, f"enabled must be a boolean; got {type(cfg['enabled']).__name__}"))
return errors
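# Example of a dict this function accepts (mirrors the YAML in the module
# docstring; ${...} placeholders are resolved platform-side):
#
#   {"type": "telegram",
#    "config": {"bot_token": "${TELEGRAM_BOT_TOKEN}"},
#    "enabled": True}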
def validate_channel_file(path: Path) -> list[ValidationError]:
"""Validate a YAML / JSON file containing a channel config or a list of them."""
if not path.exists():
return [ValidationError(str(path), "file does not exist")]
try:
doc = yaml.safe_load(path.read_text())
except yaml.YAMLError as exc:
return [ValidationError(str(path), f"invalid YAML / JSON: {exc}")]
if doc is None:
return [ValidationError(str(path), "file is empty")]
errors: list[ValidationError] = []
if isinstance(doc, list):
for i, entry in enumerate(doc):
if not isinstance(entry, dict):
errors.append(ValidationError(str(path), f"[{i}]: entry must be an object"))
continue
errors.extend(validate_channel_config(entry, f"{path}[{i}]"))
elif isinstance(doc, dict):
errors.extend(validate_channel_config(doc, str(path)))
else:
errors.append(ValidationError(str(path), f"top-level must be a channel object or list; got {type(doc).__name__}"))
return errors
__all__ = [
"SUPPORTED_CHANNEL_TYPES",
"validate_channel_config",
"validate_channel_file",
]

molecule_plugin/manifest.py (new file, 227 lines)
@@ -0,0 +1,227 @@
"""Plugin + skill manifest schema and validators.
Two layers:
1. **Plugin-level** (`plugin.yaml`) — Molecule AI's superset: name, version,
description, declared `runtimes:`, skill list, rule list. The spec has
no concept of bundling; this is our own.
2. **Skill-level** (`skills/<skill>/SKILL.md`) — follows the
`agentskills.io` open standard (name, description, optional license,
compatibility, metadata, allowed-tools). Validated against the spec
so our skills are installable in Claude Code, Cursor, Codex, and
every other skills-compatible agent product.
A plugin that validates locally will also load cleanly in the Molecule AI
platform AND be installable as-is into any agentskills-compatible tool.
"""
from __future__ import annotations
import re
from pathlib import Path
from typing import Any
import yaml
PLUGIN_YAML_SCHEMA: dict[str, Any] = {
"type": "object",
"required": ["name"],
"properties": {
"name": {"type": "string"},
"version": {"type": "string"},
"description": {"type": "string"},
"author": {"type": "string"},
"tags": {"type": "array", "items": {"type": "string"}},
"skills": {"type": "array", "items": {"type": "string"}},
"rules": {"type": "array", "items": {"type": "string"}},
"prompt_fragments": {"type": "array", "items": {"type": "string"}},
"runtimes": {
"type": "array",
"items": {"type": "string"},
"description": "Declared supported runtimes (e.g. claude_code, deepagents).",
},
},
}
def validate_manifest(path: str | Path) -> list[str]:
"""Return a list of validation error messages. Empty list = valid.
    Deliberately simple: no jsonschema dependency, so SDK consumers don't
    pick up an extra transitive dep just to lint their plugin.
"""
path = Path(path)
if not path.is_file():
return [f"manifest not found: {path}"]
try:
raw = yaml.safe_load(path.read_text())
except yaml.YAMLError as exc:
return [f"yaml parse error: {exc}"]
errors: list[str] = []
if not isinstance(raw, dict):
return ["manifest root must be a mapping"]
if "name" not in raw or not isinstance(raw.get("name"), str) or not raw["name"].strip():
errors.append("`name` is required and must be a non-empty string")
for field_name in ("tags", "skills", "rules", "prompt_fragments", "runtimes"):
if field_name in raw and not isinstance(raw[field_name], list):
errors.append(f"`{field_name}` must be a list")
if "runtimes" in raw and isinstance(raw["runtimes"], list):
known = {"claude_code", "deepagents", "langgraph", "crewai", "autogen", "openclaw"}
for r in raw["runtimes"]:
if not isinstance(r, str):
errors.append(f"`runtimes` entry must be string, got {type(r).__name__}")
elif r.replace("-", "_") not in known:
errors.append(
f"unknown runtime '{r}' — supported: {sorted(known)} "
f"(use underscore form, e.g. 'claude_code')"
)
return errors
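# Illustrative usage (a sketch; the path is hypothetical):
#
#     errs = validate_manifest("my-plugin/plugin.yaml")
#     if errs:
#         raise SystemExit("\n".join(errs))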
# ---------------------------------------------------------------------------
# agentskills.io spec — SKILL.md validation
# ---------------------------------------------------------------------------
# Spec limits — public so tooling/tests/docs can import them rather than
# duplicate magic numbers. Source: https://agentskills.io/specification
SKILL_NAME_RE = re.compile(r"^[a-z0-9]+(-[a-z0-9]+)*$")
SKILL_NAME_MAX = 64
SKILL_DESC_MAX = 1024
SKILL_COMPAT_MAX = 500
def parse_skill_md(path: str | Path) -> tuple[dict[str, Any], str, list[str]]:
"""Parse a SKILL.md into (frontmatter, body, errors).
    Returns empty frontmatter and a non-empty error list when the file can't
    be read or doesn't carry valid frontmatter. Never raises.
"""
path = Path(path)
if not path.is_file():
return {}, "", [f"SKILL.md not found: {path}"]
text = path.read_text()
if not text.startswith("---"):
return {}, text, ["SKILL.md must start with YAML frontmatter (---)"]
parts = text.split("---", 2)
if len(parts) < 3:
return {}, text, ["malformed frontmatter — expected opening and closing '---'"]
try:
fm = yaml.safe_load(parts[1]) or {}
except yaml.YAMLError as exc:
return {}, parts[2], [f"frontmatter yaml parse error: {exc}"]
if not isinstance(fm, dict):
return {}, parts[2], ["frontmatter must be a YAML mapping"]
return fm, parts[2].strip(), []
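# Illustrative round trip (a sketch; the path is hypothetical and the keys
# printed are the spec-required frontmatter fields):
#
#     fm, body, errs = parse_skill_md("skills/my-skill/SKILL.md")
#     if not errs:
#         print(fm.get("name"), fm.get("description"), len(body))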
def validate_skill(path: str | Path) -> list[str]:
"""Validate a single skill directory against agentskills.io/specification.
    `path` should be the skill directory (the directory containing
    `SKILL.md`). Returns an empty list when the skill is spec-compliant.
"""
path = Path(path)
if not path.is_dir():
return [f"skill path is not a directory: {path}"]
fm, _body, errors = parse_skill_md(path / "SKILL.md")
if errors:
return errors
# name — required
name = fm.get("name")
if not name:
errors.append("`name` is required in SKILL.md frontmatter")
elif not isinstance(name, str):
errors.append(f"`name` must be a string, got {type(name).__name__}")
else:
if len(name) > SKILL_NAME_MAX:
errors.append(f"`name` length must be ≤{SKILL_NAME_MAX}, got {len(name)}")
if not SKILL_NAME_RE.match(name):
errors.append(
f"`name` '{name}' must be lowercase alphanumeric with single hyphens, "
f"no leading/trailing/consecutive hyphens"
)
if name != path.name:
errors.append(
f"`name` '{name}' must match directory name '{path.name}' "
f"(agentskills.io spec)"
)
# description — required
desc = fm.get("description")
if not desc:
errors.append("`description` is required in SKILL.md frontmatter")
elif not isinstance(desc, str):
errors.append(f"`description` must be a string, got {type(desc).__name__}")
elif len(desc) > SKILL_DESC_MAX:
errors.append(f"`description` length must be ≤{SKILL_DESC_MAX}, got {len(desc)}")
# compatibility — optional, ≤500 chars
compat = fm.get("compatibility")
if compat is not None:
if not isinstance(compat, str):
errors.append(f"`compatibility` must be a string, got {type(compat).__name__}")
elif len(compat) > SKILL_COMPAT_MAX:
errors.append(
f"`compatibility` length must be ≤{SKILL_COMPAT_MAX}, got {len(compat)}"
)
# metadata — optional, string→string map
meta = fm.get("metadata")
if meta is not None:
if not isinstance(meta, dict):
errors.append(f"`metadata` must be a mapping, got {type(meta).__name__}")
else:
for k, v in meta.items():
if not isinstance(k, str):
errors.append(f"`metadata` keys must be strings, got {type(k).__name__}")
# values may be stringified — spec says "string-to-string" but is lenient
# allowed-tools — optional, space-separated string (experimental in spec)
allowed = fm.get("allowed-tools")
if allowed is not None and not isinstance(allowed, str):
errors.append(f"`allowed-tools` must be a space-separated string, got {type(allowed).__name__}")
# license — optional, free-form string
lic = fm.get("license")
if lic is not None and not isinstance(lic, str):
errors.append(f"`license` must be a string, got {type(lic).__name__}")
return errors
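# Illustrative usage (a sketch; the path is hypothetical — note it must end
# in the skill name, since the spec requires `name` to match the directory):
#
#     for err in validate_skill("my-plugin/skills/my-skill"):
#         print(err)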
def validate_plugin(path: str | Path) -> dict[str, list[str]]:
"""Validate an entire Molecule AI plugin: plugin.yaml + all skills.
Returns a dict mapping source (``"plugin.yaml"`` or ``"skills/<name>"``)
to a list of error messages. Empty dict means fully valid.
"""
path = Path(path)
results: dict[str, list[str]] = {}
manifest_errs = validate_manifest(path / "plugin.yaml")
if manifest_errs:
results["plugin.yaml"] = manifest_errs
skills_dir = path / "skills"
if skills_dir.is_dir():
for entry in sorted(skills_dir.iterdir()):
if not entry.is_dir():
continue
skill_errs = validate_skill(entry)
if skill_errs:
results[f"skills/{entry.name}"] = skill_errs
return results
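# Illustrative rendering of the result dict (a sketch; one way a caller might
# print the errors as a checklist, with a hypothetical plugin path):
#
#     for source, errs in validate_plugin("my-plugin/").items():
#         for err in errs:
#             print(f"{source}: {err}")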

205
molecule_plugin/org.py Normal file
View File

@ -0,0 +1,205 @@
"""Validator for org-templates/<name>/org.yaml.
An **org template** defines a hierarchical team of workspaces: typically a
PM with research + dev branches, each with their own children. The platform
instantiates the whole tree on ``POST /org/import``.
Schema (matches ``platform/internal/handlers/org.go::OrgWorkspace``):
.. code-block:: yaml
name: Molecule AI Dev Team
description: AI agent company for building Molecule AI
defaults: # inherited by every workspace unless overridden
runtime: claude-code
tier: 2
required_env: [CLAUDE_CODE_OAUTH_TOKEN]
initial_prompt: |
...
workspaces:
- name: PM
role: Project Manager
tier: 3
files_dir: pm
channels: # optional social channel configs
- type: telegram
config: {bot_token: ${TELEGRAM_BOT_TOKEN}}
enabled: true
workspace_access: read_only # #65: none | read_only | read_write
children:
- name: Research Lead
...
This module catches schema errors before ``POST /org/import`` so authors
don't burn platform cycles on typos.
"""
from __future__ import annotations
from pathlib import Path
from typing import Any
import yaml
from .channel import validate_channel_config
from .workspace import SUPPORTED_RUNTIMES, ValidationError
# Workspace-access values — mirrors the CHECK constraint in
# platform/migrations/019_workspace_access.up.sql. #65.
_WORKSPACE_ACCESS_VALUES = frozenset({"none", "read_only", "read_write"})
def _validate_workspace_node(
node: Any,
path: str,
file_ref: str,
errors: list[ValidationError],
) -> None:
"""Recursively validate a single workspace node (and its children)."""
if not isinstance(node, dict):
errors.append(ValidationError(file_ref, f"{path}: must be an object, got {type(node).__name__}"))
return
# Required
if not node.get("name"):
errors.append(ValidationError(file_ref, f"{path}: missing required field 'name'"))
# Tier (optional)
if "tier" in node and node["tier"] not in (1, 2, 3):
errors.append(
ValidationError(file_ref, f"{path}: tier must be 1, 2, or 3; got {node['tier']!r}")
)
# Runtime (optional — inherited from defaults)
runtime = node.get("runtime")
if runtime and runtime not in SUPPORTED_RUNTIMES:
errors.append(
ValidationError(
file_ref,
f"{path}: runtime={runtime!r} — must be one of {sorted(SUPPORTED_RUNTIMES)}",
)
)
# workspace_access (#65)
access = node.get("workspace_access")
if access is not None and access not in _WORKSPACE_ACCESS_VALUES:
errors.append(
ValidationError(
file_ref,
f"{path}: workspace_access={access!r} — must be one of {sorted(_WORKSPACE_ACCESS_VALUES)}",
)
)
if access in ("read_only", "read_write") and not node.get("workspace_dir"):
errors.append(
ValidationError(
file_ref,
f"{path}: workspace_access={access!r} requires workspace_dir to be set",
)
)
# Channels (optional list)
channels = node.get("channels")
if channels is not None:
if not isinstance(channels, list):
errors.append(ValidationError(file_ref, f"{path}.channels: must be a list"))
else:
for i, ch in enumerate(channels):
if not isinstance(ch, dict):
errors.append(
ValidationError(file_ref, f"{path}.channels[{i}]: must be an object")
)
continue
# Delegate to channel validator — single source of truth for channel schema.
ch_ref = f"{file_ref}:{path}.channels[{i}]"
errors.extend(validate_channel_config(ch, ch_ref))
# Schedules (optional list)
schedules = node.get("schedules")
if schedules is not None:
if not isinstance(schedules, list):
errors.append(ValidationError(file_ref, f"{path}.schedules: must be a list"))
else:
for i, sch in enumerate(schedules):
if not isinstance(sch, dict):
errors.append(
ValidationError(file_ref, f"{path}.schedules[{i}]: must be an object")
)
continue
if not sch.get("cron_expr"):
errors.append(
ValidationError(
file_ref, f"{path}.schedules[{i}]: missing 'cron_expr'"
)
)
if not sch.get("prompt"):
errors.append(
ValidationError(
file_ref, f"{path}.schedules[{i}]: missing 'prompt'"
)
)
# Plugins (optional list of strings)
plugins = node.get("plugins")
if plugins is not None:
if not isinstance(plugins, list) or not all(isinstance(p, str) for p in plugins):
errors.append(ValidationError(file_ref, f"{path}.plugins: must be a list of strings"))
# External workspaces must declare a URL
if node.get("external") and not node.get("url"):
errors.append(
ValidationError(file_ref, f"{path}: external=true requires url to be set")
)
# Recurse into children
children = node.get("children")
if children is not None:
if not isinstance(children, list):
errors.append(ValidationError(file_ref, f"{path}.children: must be a list"))
else:
for i, child in enumerate(children):
cname = child.get("name", "?") if isinstance(child, dict) else "?"
_validate_workspace_node(
child, f"{path}.children[{i}:{cname}]", file_ref, errors
)
def validate_org_template(path: Path) -> list[ValidationError]:
"""Validate an org-template directory (must contain org.yaml)."""
errors: list[ValidationError] = []
org_yaml = path / "org.yaml"
if not org_yaml.exists():
errors.append(ValidationError(str(org_yaml), "missing org.yaml"))
return errors
try:
org = yaml.safe_load(org_yaml.read_text()) or {}
except yaml.YAMLError as exc:
errors.append(ValidationError(str(org_yaml), f"invalid YAML: {exc}"))
return errors
if not isinstance(org, dict):
errors.append(ValidationError(str(org_yaml), "org.yaml must be a YAML object"))
return errors
if not org.get("name"):
errors.append(ValidationError(str(org_yaml), "missing required field: name"))
# defaults block (optional but common)
defaults = org.get("defaults")
if defaults is not None and not isinstance(defaults, dict):
errors.append(ValidationError(str(org_yaml), "defaults must be an object"))
workspaces = org.get("workspaces")
if not workspaces:
errors.append(ValidationError(str(org_yaml), "missing required field: workspaces (non-empty list)"))
elif not isinstance(workspaces, list):
errors.append(ValidationError(str(org_yaml), "workspaces must be a list"))
else:
for i, ws in enumerate(workspaces):
            label = ws.get("name", "?") if isinstance(ws, dict) else "?"
            _validate_workspace_node(ws, f"workspaces[{i}:{label}]", str(org_yaml), errors)
return errors
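# Illustrative usage (a sketch; the template directory is borrowed from the
# CLI examples in the README):
#
#     from pathlib import Path
#     for err in validate_org_template(Path("org-templates/molecule-dev")):
#         print(f"{err.file}: {err.message}")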
__all__ = ["validate_org_template"]

View File

@ -0,0 +1,84 @@
"""Adaptor protocol — kept in sync with workspace-template/plugins_registry/protocol.py.
SDK authors depend only on this module so their plugin repos don't need to
pull in the full workspace-template package. At runtime the platform's own
``plugins_registry`` loads the adaptor; the two ``InstallContext`` shapes are
structurally identical so the Protocol check passes.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Protocol, runtime_checkable
# Kept in sync with workspace-template/plugins_registry/protocol.py.
DEFAULT_MEMORY_FILENAME = "CLAUDE.md"
SKILLS_SUBDIR = "skills"
@dataclass
class InstallContext:
"""Hooks + state passed to every PluginAdaptor.install() call."""
configs_dir: Path
"""Workspace's /configs directory (where memory file, plugins/, skills/ live)."""
workspace_id: str
"""Workspace UUID — useful for per-workspace state or logging."""
runtime: str
"""Runtime identifier (``claude_code``, ``deepagents``, …)."""
plugin_root: Path
"""Path to the plugin's directory (where plugin.yaml + content lives)."""
memory_filename: str = DEFAULT_MEMORY_FILENAME
"""Runtime's long-lived memory file. Populated by the runtime's
:meth:`BaseAdapter.memory_filename`; adaptors pass this to
:attr:`append_to_memory` rather than hardcoding a filename."""
register_tool: Callable[[str, Callable[..., Any]], None] = field(
default=lambda name, fn: None
)
"""Register a callable as a runtime tool. No-op on runtimes without
a dynamic tool registry those runtimes pick tools up at startup
via filesystem scan instead."""
register_subagent: Callable[[str, dict[str, Any]], None] = field(
default=lambda name, spec: None
)
"""Register a sub-agent specification (DeepAgents-only). No-op elsewhere."""
append_to_memory: Callable[[str, str], None] = field(
default=lambda filename, content: None
)
"""Append text to a runtime memory file. The default no-op lets
adaptors run in test harnesses without a real workspace filesystem."""
logger: logging.Logger = field(default_factory=lambda: logging.getLogger(__name__))
@dataclass
class InstallResult:
plugin_name: str
runtime: str
source: str
files_written: list[str] = field(default_factory=list)
tools_registered: list[str] = field(default_factory=list)
subagents_registered: list[str] = field(default_factory=list)
warnings: list[str] = field(default_factory=list)
@runtime_checkable
class PluginAdaptor(Protocol):
plugin_name: str
runtime: str
async def install(self, ctx: InstallContext) -> InstallResult:
...
async def uninstall(self, ctx: InstallContext) -> None:
...
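# Illustrative adaptor exercised against the no-op InstallContext defaults
# (a sketch; "echo", the paths, and the appended rule text are made up):
#
#     import asyncio
#     from pathlib import Path
#
#     class EchoAdaptor:
#         def __init__(self, plugin_name: str, runtime: str):
#             self.plugin_name, self.runtime = plugin_name, runtime
#
#         async def install(self, ctx: InstallContext) -> InstallResult:
#             ctx.append_to_memory(ctx.memory_filename, "# Plugin: echo")
#             return InstallResult(self.plugin_name, self.runtime, source="plugin")
#
#         async def uninstall(self, ctx: InstallContext) -> None:
#             pass
#
#     ctx = InstallContext(
#         configs_dir=Path("/tmp/configs"), workspace_id="ws-1",
#         runtime="claude_code", plugin_root=Path("my-plugin"),
#     )
#     assert isinstance(EchoAdaptor("echo", "claude_code"), PluginAdaptor)
#     result = asyncio.run(EchoAdaptor("echo", "claude_code").install(ctx))
#     assert result.plugin_name == "echo"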

117
molecule_plugin/workspace.py Normal file
View File

@ -0,0 +1,117 @@
"""Validator for workspace-configs-templates/<name>/config.yaml.
A **workspace template** is a directory the platform copies into a new
workspace's /configs volume at provision time. It contains at minimum a
``config.yaml`` declaring the agent's runtime, model defaults, and env
requirements; optionally ``CLAUDE.md``, ``system-prompt.md``, ``skills/``,
etc.
This module validates the shape of a workspace-template directory so
authors can catch errors before publishing. Called from
``python -m molecule_plugin validate workspace <dir>``.
"""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import yaml
# Runtimes the platform knows how to provision. Stays aligned with
# provisioner.RuntimeImages in platform/internal/provisioner/provisioner.go.
SUPPORTED_RUNTIMES = frozenset(
{
"langgraph",
"claude-code",
"claude_code", # adapter dirs use underscores
"openclaw",
"deepagents",
"crewai",
"autogen",
}
)
@dataclass
class ValidationError:
"""Single problem found in a workspace template."""
file: str
message: str
def validate_workspace_template(path: Path) -> list[ValidationError]:
"""Validate a workspace-template directory.
Returns an empty list when the template is well-formed. Each element
    in the returned list is a distinct problem; callers render them as
    a checklist for the author.
"""
errors: list[ValidationError] = []
config_path = path / "config.yaml"
if not config_path.exists():
errors.append(ValidationError(str(config_path), "missing config.yaml"))
return errors
try:
config = yaml.safe_load(config_path.read_text()) or {}
except yaml.YAMLError as exc:
errors.append(ValidationError(str(config_path), f"invalid YAML: {exc}"))
return errors
if not isinstance(config, dict):
errors.append(ValidationError(str(config_path), "config.yaml must be a YAML object"))
return errors
# Required top-level fields
for field in ("name", "runtime"):
if field not in config or not config[field]:
errors.append(ValidationError(str(config_path), f"missing required field: {field}"))
# Runtime must be one the platform knows about
runtime = config.get("runtime")
if runtime and runtime not in SUPPORTED_RUNTIMES:
errors.append(
ValidationError(
str(config_path),
f"runtime={runtime!r} — must be one of: {sorted(SUPPORTED_RUNTIMES)}",
)
)
# Tier is optional but when present must be 1/2/3
if "tier" in config and config["tier"] not in (1, 2, 3):
errors.append(
ValidationError(str(config_path), f"tier must be 1, 2, or 3; got {config['tier']!r}")
)
# runtime_config (when present) should be a dict
rc = config.get("runtime_config")
if rc is not None and not isinstance(rc, dict):
errors.append(
ValidationError(str(config_path), "runtime_config must be an object")
)
elif isinstance(rc, dict):
        required_env = rc.get("required_env")
        if required_env is not None and not isinstance(required_env, list):
errors.append(
ValidationError(
str(config_path),
"runtime_config.required_env must be a list of env var names",
)
)
timeout = rc.get("timeout")
if timeout is not None and not isinstance(timeout, (int, float)):
errors.append(
ValidationError(
str(config_path),
f"runtime_config.timeout must be a number; got {type(timeout).__name__}",
)
)
return errors
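# Illustrative usage (a sketch; the directory is borrowed from the CLI
# examples in the README):
#
#     errs = validate_workspace_template(Path("workspace-configs-templates/claude-code-default"))
#     assert not errs, [f"{e.file}: {e.message}" for e in errs]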
# Re-exported for type hints in __init__.py
__all__ = ["ValidationError", "SUPPORTED_RUNTIMES", "validate_workspace_template"]

35
pyproject.toml Normal file
View File

@ -0,0 +1,35 @@
[build-system]
requires = ["setuptools>=68"]
build-backend = "setuptools.build_meta"
[project]
name = "molecule-ai-sdk"
version = "0.2.0"
description = "Molecule AI SDK — build plugins (molecule_plugin) AND remote agents that join a Molecule AI org from another machine (molecule_agent)."
readme = "README.md"
requires-python = ">=3.11"
license = { text = "MIT" }
authors = [{ name = "Molecule AI" }]
keywords = ["agents", "ai", "multi-agent", "a2a", "plugins", "saas", "remote-agent"]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
]
dependencies = [
"pyyaml>=6.0",
"requests>=2.31",
]
[project.urls]
Homepage = "https://github.com/hongmingw/molecule-monorepo"
Repository = "https://github.com/hongmingw/molecule-monorepo"
Documentation = "https://github.com/hongmingw/molecule-monorepo/tree/main/sdk/python"
[tool.setuptools.packages.find]
include = ["molecule_plugin*", "molecule_agent*"]
exclude = ["template*", "tests*"]

2
pytest.ini Normal file
View File

@ -0,0 +1,2 @@
[pytest]
asyncio_mode = auto

7
template/adapters/claude_code.py Normal file
View File

@ -0,0 +1,7 @@
"""Claude Code adaptor.
For most plugins the generic filesystem installer is enough: it copies
skill dirs to /configs/skills/ and appends rules to CLAUDE.md. Replace it
with a custom class if you need to register runtime tools or sub-agents.
"""
from molecule_plugin import AgentskillsAdaptor as Adaptor # noqa: F401

6
template/adapters/deepagents.py Normal file
View File

@ -0,0 +1,6 @@
"""DeepAgents adaptor.
If your plugin defines a sub-agent, swap the import for a custom class
that calls ``ctx.register_subagent(name, spec)`` inside ``install()``.
"""
from molecule_plugin import AgentskillsAdaptor as Adaptor # noqa: F401

21
template/plugin.yaml Normal file
View File

@ -0,0 +1,21 @@
name: my-plugin
version: 0.1.0
description: One-sentence description of what this plugin adds.
author: your-name
tags: [example]
# List every workspace runtime your plugin supports. Each entry must have a
# matching file under adapters/<runtime>.py. The Molecule AI platform resolves
# the right adaptor at workspace startup; unsupported runtimes fall through
# to a raw-drop (files copied, no tools wired — warning surfaced to user).
runtimes:
- claude_code
- deepagents
# Optional: list of skill directory names under skills/ (for documentation).
# skills:
# - my-skill
# Optional: list of rule file names under rules/ (for documentation).
# rules:
# - rules/my-rule.md

36
template/skills/example-skill/SKILL.md Normal file
View File

@ -0,0 +1,36 @@
---
name: example-skill
description: Short description — what this does and when to use it.
license: MIT
metadata:
author: your-name
version: "0.1.0"
---
# Example Skill
Write the skill instructions as plain Markdown below the frontmatter.
Agents load this entire file when the skill activates, so keep it focused
and under ~500 lines. Move deep-dive docs to `references/` and large
assets to `assets/` — they're loaded on demand.
## When to use
- Describe the triggering situation
- Describe what the agent should output
## Steps
1. First step
2. Second step
## Files under this skill
- `scripts/` — executable code the agent can run
- `references/REFERENCE.md` — detailed docs (loaded only when needed)
- `assets/` — templates, images, data files
## Notes
- This file is validated against the agentskills.io open standard.
- Run `python -m molecule_plugin validate <plugin-dir>` before publishing.

755
tests/test_remote_agent.py Normal file
View File

@ -0,0 +1,755 @@
"""Tests for the molecule_agent Phase 30.8 remote-agent client.
The client is pure HTTP; we mock the network via ``requests_mock``-style
monkey-patching of ``requests.Session.get`` / ``.post`` instead of pulling
in a third-party mock library.
"""
from __future__ import annotations
import stat
import time
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock
import pytest
from molecule_agent import PeerInfo, RemoteAgentClient, WorkspaceState
# ---------------------------------------------------------------------------
# FakeResponse / FakeSession — minimal stand-ins for requests
# ---------------------------------------------------------------------------
class FakeResponse:
def __init__(self, status_code: int = 200, json_body: Any = None, text: str = ""):
self.status_code = status_code
self._json = json_body
self.text = text
def json(self) -> Any:
return self._json
def raise_for_status(self) -> None:
if self.status_code >= 400:
import requests
raise requests.HTTPError(f"HTTP {self.status_code}")
@pytest.fixture
def tmp_token_dir(tmp_path: Path) -> Path:
return tmp_path / "molecule-token-cache"
@pytest.fixture
def client(tmp_token_dir: Path) -> RemoteAgentClient:
session = MagicMock()
return RemoteAgentClient(
workspace_id="ws-abc-123",
platform_url="http://platform.test",
agent_card={"name": "test-agent"},
token_dir=tmp_token_dir,
session=session,
)
# ---------------------------------------------------------------------------
# Token persistence
# ---------------------------------------------------------------------------
def test_save_and_load_token_roundtrip(client: RemoteAgentClient, tmp_token_dir: Path):
client.save_token("secret-token-abc")
assert client.token_file.exists()
# File must be 0600 so other local users can't read the credential.
mode = stat.S_IMODE(client.token_file.stat().st_mode)
assert mode == 0o600, f"expected 0600, got 0o{mode:o}"
assert client.load_token() == "secret-token-abc"
def test_save_empty_token_rejected(client: RemoteAgentClient):
with pytest.raises(ValueError):
client.save_token("")
with pytest.raises(ValueError):
client.save_token(" ")
def test_load_token_returns_none_when_absent(client: RemoteAgentClient):
assert client.load_token() is None
def test_load_token_returns_none_when_file_empty(client: RemoteAgentClient, tmp_token_dir: Path):
tmp_token_dir.mkdir(parents=True, exist_ok=True)
(tmp_token_dir / ".auth_token").write_text("")
assert client.load_token() is None
def test_token_dir_default_is_under_home(tmp_path: Path):
# Just verifies the default path shape — we don't want to actually
# write to $HOME during tests.
c = RemoteAgentClient(
workspace_id="ws-xyz",
platform_url="http://p",
)
assert "ws-xyz" in str(c.token_file)
assert ".molecule" in str(c.token_file)
# ---------------------------------------------------------------------------
# register()
# ---------------------------------------------------------------------------
def test_register_saves_token_when_issued(client: RemoteAgentClient):
client._session.post.return_value = FakeResponse(
200, {"status": "registered", "auth_token": "fresh-token-xyz"}
)
tok = client.register()
assert tok == "fresh-token-xyz"
assert client.load_token() == "fresh-token-xyz"
# Verify call shape
url, kwargs = client._session.post.call_args[0][0], client._session.post.call_args[1]
assert url == "http://platform.test/registry/register"
assert kwargs["json"]["id"] == "ws-abc-123"
assert kwargs["json"]["agent_card"] == {"name": "test-agent"}
def test_register_keeps_cached_token_when_platform_omits(client: RemoteAgentClient):
# Simulate re-register of an already-tokened workspace: platform returns
# no auth_token, SDK must keep using the cached one.
client.save_token("cached-from-earlier")
client._session.post.return_value = FakeResponse(200, {"status": "registered"})
tok = client.register()
assert tok == "cached-from-earlier"
def test_register_http_error_propagates(client: RemoteAgentClient):
client._session.post.return_value = FakeResponse(500, {"error": "boom"})
with pytest.raises(Exception):
client.register()
# ---------------------------------------------------------------------------
# pull_secrets()
# ---------------------------------------------------------------------------
def test_pull_secrets_sends_bearer_token(client: RemoteAgentClient):
client.save_token("tok-for-secrets")
client._session.get.return_value = FakeResponse(200, {"API_KEY": "v1", "DB_URL": "v2"})
out = client.pull_secrets()
assert out == {"API_KEY": "v1", "DB_URL": "v2"}
url, kwargs = client._session.get.call_args[0][0], client._session.get.call_args[1]
assert url == "http://platform.test/workspaces/ws-abc-123/secrets/values"
assert kwargs["headers"]["Authorization"] == "Bearer tok-for-secrets"
def test_pull_secrets_empty_body_yields_empty_dict(client: RemoteAgentClient):
client.save_token("t")
client._session.get.return_value = FakeResponse(200, None)
assert client.pull_secrets() == {}
def test_pull_secrets_401_raises(client: RemoteAgentClient):
client.save_token("t")
client._session.get.return_value = FakeResponse(401, {"error": "missing token"})
with pytest.raises(Exception):
client.pull_secrets()
# ---------------------------------------------------------------------------
# poll_state()
# ---------------------------------------------------------------------------
def test_poll_state_returns_normal_state(client: RemoteAgentClient):
client.save_token("t")
client._session.get.return_value = FakeResponse(
200, {"workspace_id": "ws-abc-123", "status": "online", "paused": False, "deleted": False}
)
state = client.poll_state()
assert state is not None
assert state.status == "online"
assert state.paused is False
assert state.deleted is False
assert state.should_stop is False
def test_poll_state_detects_paused(client: RemoteAgentClient):
client.save_token("t")
client._session.get.return_value = FakeResponse(
200, {"workspace_id": "ws-abc-123", "status": "paused", "paused": True, "deleted": False}
)
state = client.poll_state()
assert state.should_stop is True
def test_poll_state_404_means_deleted(client: RemoteAgentClient):
client.save_token("t")
client._session.get.return_value = FakeResponse(404, {"deleted": True})
state = client.poll_state()
assert state is not None
assert state.deleted is True
assert state.should_stop is True
def test_poll_state_server_error_raises(client: RemoteAgentClient):
client.save_token("t")
client._session.get.return_value = FakeResponse(500, {"error": "boom"})
with pytest.raises(Exception):
client.poll_state()
# ---------------------------------------------------------------------------
# heartbeat()
# ---------------------------------------------------------------------------
def test_heartbeat_sends_full_payload(client: RemoteAgentClient):
client.save_token("t")
client._session.post.return_value = FakeResponse(200, {"status": "ok"})
client.heartbeat(current_task="indexing", active_tasks=1, error_rate=0.1, sample_error="err")
url = client._session.post.call_args[0][0]
kwargs = client._session.post.call_args[1]
assert url == "http://platform.test/registry/heartbeat"
body = kwargs["json"]
assert body["workspace_id"] == "ws-abc-123"
assert body["current_task"] == "indexing"
assert body["active_tasks"] == 1
assert body["error_rate"] == 0.1
assert body["sample_error"] == "err"
assert "uptime_seconds" in body
assert kwargs["headers"]["Authorization"] == "Bearer t"
# ---------------------------------------------------------------------------
# run_heartbeat_loop()
# ---------------------------------------------------------------------------
def test_run_loop_exits_on_max_iterations(client: RemoteAgentClient, monkeypatch):
# Stub sleep so the test doesn't actually wait
import molecule_agent.client as mod
monkeypatch.setattr(mod.time, "sleep", lambda s: None)
client.save_token("t")
client._session.post.return_value = FakeResponse(200, {"status": "ok"})
client._session.get.return_value = FakeResponse(
200, {"status": "online", "paused": False, "deleted": False}
)
terminal = client.run_heartbeat_loop(max_iterations=3)
assert terminal == "max_iterations"
# 3 heartbeats + 3 state polls
assert client._session.post.call_count == 3
assert client._session.get.call_count == 3
def test_run_loop_exits_on_paused(client: RemoteAgentClient, monkeypatch):
import molecule_agent.client as mod
monkeypatch.setattr(mod.time, "sleep", lambda s: None)
client.save_token("t")
client._session.post.return_value = FakeResponse(200, {"status": "ok"})
# First iteration: online. Second: paused.
responses = [
FakeResponse(200, {"status": "online", "paused": False, "deleted": False}),
FakeResponse(200, {"status": "paused", "paused": True, "deleted": False}),
]
client._session.get.side_effect = responses
terminal = client.run_heartbeat_loop(max_iterations=10)
assert terminal == "paused"
assert client._session.post.call_count == 2
assert client._session.get.call_count == 2
def test_run_loop_exits_on_deleted_404(client: RemoteAgentClient, monkeypatch):
import molecule_agent.client as mod
monkeypatch.setattr(mod.time, "sleep", lambda s: None)
client.save_token("t")
client._session.post.return_value = FakeResponse(200, {"status": "ok"})
client._session.get.return_value = FakeResponse(404, {"deleted": True})
terminal = client.run_heartbeat_loop(max_iterations=10)
assert terminal == "removed"
assert client._session.get.call_count == 1
def test_run_loop_continues_through_transient_errors(client: RemoteAgentClient, monkeypatch):
"""Network hiccups must log-and-continue, never crash the loop."""
import molecule_agent.client as mod
monkeypatch.setattr(mod.time, "sleep", lambda s: None)
client.save_token("t")
# Heartbeat fails on iter 1, succeeds on iter 2
client._session.post.side_effect = [
ConnectionError("flaky net"),
FakeResponse(200, {"status": "ok"}),
]
# State poll returns online both times
client._session.get.return_value = FakeResponse(
200, {"status": "online", "paused": False, "deleted": False}
)
terminal = client.run_heartbeat_loop(max_iterations=2)
assert terminal == "max_iterations"
# Both iterations completed despite the first post failing
assert client._session.post.call_count == 2
def test_run_loop_task_supplier_reported(client: RemoteAgentClient, monkeypatch):
import molecule_agent.client as mod
monkeypatch.setattr(mod.time, "sleep", lambda s: None)
client.save_token("t")
client._session.post.return_value = FakeResponse(200, {"status": "ok"})
client._session.get.return_value = FakeResponse(
200, {"status": "online", "paused": False, "deleted": False}
)
reports = [{"current_task": "step-1", "active_tasks": 1}]
client.run_heartbeat_loop(max_iterations=1, task_supplier=lambda: reports[0])
body = client._session.post.call_args[1]["json"]
assert body["current_task"] == "step-1"
assert body["active_tasks"] == 1
# ---------------------------------------------------------------------------
# WorkspaceState dataclass
# ---------------------------------------------------------------------------
def test_workspace_state_should_stop_semantics():
assert WorkspaceState("w", "online", False, False).should_stop is False
assert WorkspaceState("w", "degraded", False, False).should_stop is False
assert WorkspaceState("w", "paused", True, False).should_stop is True
assert WorkspaceState("w", "removed", False, True).should_stop is True
# ---------------------------------------------------------------------------
# Phase 30.6 — sibling URL cache + call_peer
# ---------------------------------------------------------------------------
def test_get_peers_seeds_cache(client: RemoteAgentClient):
client.save_token("t")
client._session.get.return_value = FakeResponse(200, [
{"id": "sibling-1", "name": "Research", "url": "http://10.0.0.5:8000", "role": "researcher", "tier": 2, "status": "online"},
{"id": "sibling-2", "name": "Dev", "url": "http://10.0.0.6:8000", "role": "developer", "tier": 2, "status": "online"},
])
peers = client.get_peers()
assert len(peers) == 2
assert peers[0].id == "sibling-1"
assert peers[0].name == "Research"
assert peers[0].url == "http://10.0.0.5:8000"
# Cache seeded for both
assert client._url_cache["sibling-1"][0] == "http://10.0.0.5:8000"
assert client._url_cache["sibling-2"][0] == "http://10.0.0.6:8000"
# Request included bearer + X-Workspace-ID
headers = client._session.get.call_args[1]["headers"]
assert headers["Authorization"] == "Bearer t"
assert headers["X-Workspace-ID"] == "ws-abc-123"
def test_get_peers_skips_non_http_urls_in_cache(client: RemoteAgentClient):
"""Cache seed only accepts http(s); the 'remote://no-inbound' placeholder
for remote agents without inbound servers must not poison the cache."""
client.save_token("t")
client._session.get.return_value = FakeResponse(200, [
{"id": "sib-remote", "name": "Remote", "url": "remote://no-inbound"},
{"id": "sib-http", "name": "HTTP", "url": "http://192.168.1.7:8000"},
])
client.get_peers()
assert "sib-remote" not in client._url_cache
assert "sib-http" in client._url_cache
def test_discover_peer_cache_hit(client: RemoteAgentClient):
client._url_cache["sib-x"] = ("http://cached.url:8000", time.time() + 60)
url = client.discover_peer("sib-x")
assert url == "http://cached.url:8000"
# No network call
client._session.get.assert_not_called()
def test_discover_peer_cache_miss_hits_platform(client: RemoteAgentClient):
client.save_token("t")
client._session.get.return_value = FakeResponse(
200, {"id": "sib-y", "url": "http://fresh.url:8000", "name": "Y"}
)
url = client.discover_peer("sib-y")
assert url == "http://fresh.url:8000"
assert client._url_cache["sib-y"][0] == "http://fresh.url:8000"
# Request used discover endpoint
called_url = client._session.get.call_args[0][0]
assert "/registry/discover/sib-y" in called_url
def test_discover_peer_expired_cache_refreshes(client: RemoteAgentClient, monkeypatch):
# Cache entry already expired
client._url_cache["sib-stale"] = ("http://stale.url", time.time() - 10)
client.save_token("t")
client._session.get.return_value = FakeResponse(
200, {"url": "http://fresh.url:9000"}
)
url = client.discover_peer("sib-stale")
assert url == "http://fresh.url:9000"
# Cache replaced with fresh entry
assert client._url_cache["sib-stale"][0] == "http://fresh.url:9000"
def test_discover_peer_404_returns_none(client: RemoteAgentClient):
client.save_token("t")
client._session.get.return_value = FakeResponse(404, {"error": "not found"})
assert client.discover_peer("missing") is None
def test_invalidate_peer_url_drops_cache_entry(client: RemoteAgentClient):
client._url_cache["sib-x"] = ("http://x", time.time() + 100)
client.invalidate_peer_url("sib-x")
assert "sib-x" not in client._url_cache
# Idempotent — second call is safe
client.invalidate_peer_url("sib-x")
def test_call_peer_direct_path_on_cache_hit(client: RemoteAgentClient):
client.save_token("t")
client._url_cache["sib"] = ("http://direct.peer:8000", time.time() + 60)
client._session.post.return_value = FakeResponse(
200, {"jsonrpc": "2.0", "id": "x", "result": {"ok": True}}
)
out = client.call_peer("sib", "hello sibling")
assert out["result"]["ok"] is True
# Exactly ONE post: direct to the cached URL, not through proxy
assert client._session.post.call_count == 1
called_url = client._session.post.call_args[0][0]
assert called_url == "http://direct.peer:8000"
body = client._session.post.call_args[1]["json"]
assert body["method"] == "message/send"
assert body["params"]["message"]["parts"][0]["text"] == "hello sibling"
headers = client._session.post.call_args[1]["headers"]
assert headers["X-Workspace-ID"] == "ws-abc-123"
def test_call_peer_falls_back_to_proxy_on_direct_error(client: RemoteAgentClient):
client.save_token("t")
client._url_cache["sib"] = ("http://dead.peer:8000", time.time() + 60)
# First post (direct): connection error. Second post (proxy): success.
client._session.post.side_effect = [
ConnectionError("unreachable"),
FakeResponse(200, {"jsonrpc": "2.0", "result": {"via": "proxy"}}),
]
out = client.call_peer("sib", "hello")
assert out["result"]["via"] == "proxy"
assert client._session.post.call_count == 2
# Direct URL was invalidated so next call re-discovers
assert "sib" not in client._url_cache
# Second call went to /workspaces/sib/a2a
proxy_url = client._session.post.call_args_list[1][0][0]
assert "/workspaces/sib/a2a" in proxy_url
def test_call_peer_proxy_only_when_prefer_direct_false(client: RemoteAgentClient):
client.save_token("t")
client._url_cache["sib"] = ("http://direct.peer:8000", time.time() + 60)
client._session.post.return_value = FakeResponse(
200, {"jsonrpc": "2.0", "result": {"via": "proxy-only"}}
)
client.call_peer("sib", "hello", prefer_direct=False)
# Exactly one post — went straight to proxy despite cache hit
assert client._session.post.call_count == 1
assert "/workspaces/sib/a2a" in client._session.post.call_args[0][0]
def test_call_peer_no_cached_url_uses_discover_then_direct(client: RemoteAgentClient):
"""Fresh call: no cache entry → discover via GET, then direct POST to the
returned URL. Tests the full discover-then-call sequence in one shot."""
client.save_token("t")
# discover returns a URL
client._session.get.return_value = FakeResponse(
200, {"url": "http://newly-discovered:9000"}
)
# direct post succeeds
client._session.post.return_value = FakeResponse(
200, {"jsonrpc": "2.0", "result": {"ok": True}}
)
out = client.call_peer("new-sib", "hi")
assert out["result"]["ok"] is True
assert client._url_cache["new-sib"][0] == "http://newly-discovered:9000"
called_url = client._session.post.call_args[0][0]
assert called_url == "http://newly-discovered:9000"
def test_peer_info_dataclass_defaults():
p = PeerInfo(id="x", name="y", url="http://z")
assert p.role == ""
assert p.tier == 2
assert p.status == "unknown"
assert p.agent_card == {}
# ---------------------------------------------------------------------------
# Phase 30.3 — install_plugin
# ---------------------------------------------------------------------------
import io
import tarfile
from molecule_agent.client import _safe_extract_tar
def _make_tarball(files: dict[str, bytes]) -> bytes:
"""Build a gzipped tarball in memory from a {name: content} dict."""
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w:gz") as tf:
for name, content in files.items():
info = tarfile.TarInfo(name=name)
info.size = len(content)
info.mode = 0o644
tf.addfile(info, io.BytesIO(content))
return buf.getvalue()
class _StreamingResp:
"""requests-shaped response with .content + .iter_content + context-manager.
install_plugin switched from streaming reads to .content (we hold the
full <=100MiB tarball in memory before extracting; see the client.py comment),
but we keep iter_content available for any future test that wants to
exercise a streaming path.
"""
def __init__(self, status: int, body: bytes):
self.status_code = status
self._body = body
self.content = body # used by .content readers (install_plugin today)
def __enter__(self): return self
def __exit__(self, *a): return None
def raise_for_status(self):
if self.status_code >= 400:
import requests
raise requests.HTTPError(f"HTTP {self.status_code}")
    def iter_content(self, chunk_size=64 * 1024):
        i = 0
        while i < len(self._body):
            yield self._body[i:i + chunk_size]
            i += chunk_size
def test_install_plugin_unpacks_into_per_workspace_dir(client: RemoteAgentClient, tmp_path):
client.save_token("t")
tarball = _make_tarball({
"plugin.yaml": b"name: hello\nversion: 1.0.0\n",
"rules.md": b"some rules\n",
"skills/x/SKILL.md": b"---\nname: x\n---\n",
})
# Stub out the streaming GET (used inside `with`)
def fake_get(url, headers=None, params=None, stream=False, timeout=None):
assert "/plugins/hello/download" in url
assert headers["Authorization"] == "Bearer t"
return _StreamingResp(200, tarball)
client._session.get.side_effect = fake_get
# POST install record — also stubbed
client._session.post.return_value = FakeResponse(200, {"status": "installed"})
target = client.install_plugin("hello")
assert target.exists()
assert (target / "plugin.yaml").read_bytes() == b"name: hello\nversion: 1.0.0\n"
assert (target / "skills" / "x" / "SKILL.md").read_text().startswith("---\nname: x\n")
# Atomic-rename means no .staging-* leftover
assert not any(p.name.startswith(".staging-") for p in client.plugins_dir.iterdir())
# Reported the install
post_url = client._session.post.call_args[0][0]
assert post_url.endswith(f"/workspaces/{client.workspace_id}/plugins")
def test_install_plugin_passes_source_query_when_given(client: RemoteAgentClient):
client.save_token("t")
tarball = _make_tarball({"plugin.yaml": b"name: gh\nversion: 0.1.0\n"})
captured = {}
def fake_get(url, headers=None, params=None, stream=False, timeout=None):
captured["url"] = url
captured["params"] = params
return _StreamingResp(200, tarball)
client._session.get.side_effect = fake_get
client._session.post.return_value = FakeResponse(200, {})
client.install_plugin("gh", source="github://acme/my-plugin")
assert captured["params"] == {"source": "github://acme/my-plugin"}
def test_install_plugin_atomic_rollback_on_corrupt_tarball(client: RemoteAgentClient):
client.save_token("t")
# Truncated gzip — tarfile.open will raise
client._session.get.side_effect = lambda *a, **k: _StreamingResp(200, b"not a gzip")
client._session.post.return_value = FakeResponse(200, {})
    with pytest.raises(Exception):
client.install_plugin("broken")
# No .staging-* dir lingering, no half-installed plugin dir
    if client.plugins_dir.exists():
        assert not list(client.plugins_dir.iterdir())
def test_install_plugin_overwrites_existing(client: RemoteAgentClient):
client.save_token("t")
# Pre-populate an old version
old_dir = client.plugins_dir / "rotateme"
old_dir.mkdir(parents=True)
(old_dir / "old-marker").write_text("old")
new_tarball = _make_tarball({
"plugin.yaml": b"name: rotateme\nversion: 2.0.0\n",
"new-marker": b"new",
})
client._session.get.side_effect = lambda *a, **k: _StreamingResp(200, new_tarball)
client._session.post.return_value = FakeResponse(200, {})
client.install_plugin("rotateme")
assert not (client.plugins_dir / "rotateme" / "old-marker").exists()
assert (client.plugins_dir / "rotateme" / "new-marker").read_text() == "new"
def test_install_plugin_runs_setup_sh_when_present(client: RemoteAgentClient, tmp_path):
client.save_token("t")
# setup.sh that drops a sentinel file we can verify
sentinel = tmp_path / "ran"
setup_script = f"#!/bin/bash\nset -e\ntouch {sentinel}\n".encode()
tarball = _make_tarball({
"plugin.yaml": b"name: withsetup\n",
"setup.sh": setup_script,
})
client._session.get.side_effect = lambda *a, **k: _StreamingResp(200, tarball)
client._session.post.return_value = FakeResponse(200, {})
client.install_plugin("withsetup")
# setup.sh extracted with 0644 perms (tar default), so script execution
# depends on bash interpreting the file contents. The bash invocation
# runs without the +x bit because we call `bash <setup>` not `<setup>`.
assert sentinel.exists(), "setup.sh did not run"
def test_install_plugin_skips_setup_when_disabled(client: RemoteAgentClient, tmp_path):
client.save_token("t")
sentinel = tmp_path / "should-not-exist"
tarball = _make_tarball({
"setup.sh": f"#!/bin/bash\ntouch {sentinel}\n".encode(),
})
client._session.get.side_effect = lambda *a, **k: _StreamingResp(200, tarball)
client._session.post.return_value = FakeResponse(200, {})
client.install_plugin("nosetup", run_setup_sh=False)
assert not sentinel.exists()
def test_install_plugin_skips_platform_report_when_disabled(client: RemoteAgentClient):
client.save_token("t")
tarball = _make_tarball({"plugin.yaml": b"name: silent\n"})
client._session.get.side_effect = lambda *a, **k: _StreamingResp(200, tarball)
client.install_plugin("silent", report_to_platform=False)
# POST never called when report disabled
client._session.post.assert_not_called()
def test_install_plugin_404_raises_with_useful_url(client: RemoteAgentClient):
client.save_token("t")
client._session.get.side_effect = lambda *a, **k: _StreamingResp(404, b"")
    with pytest.raises(Exception):
client.install_plugin("missing")
# ---------------------------------------------------------------------------
# _safe_extract_tar
# ---------------------------------------------------------------------------
def test_safe_extract_rejects_path_traversal(tmp_path: Path):
"""Tar slip CVE: an entry named '../escape' must be rejected."""
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w") as tf:
info = tarfile.TarInfo(name="../escape.txt")
data = b"oops"
info.size = len(data)
tf.addfile(info, io.BytesIO(data))
buf.seek(0)
with tarfile.open(fileobj=buf, mode="r") as tf:
        with pytest.raises(ValueError, match="refusing tar entry escaping"):
_safe_extract_tar(tf, tmp_path)
def test_safe_extract_rejects_absolute_paths(tmp_path: Path):
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w") as tf:
info = tarfile.TarInfo(name="/etc/passwd")
data = b"oops"
info.size = len(data)
tf.addfile(info, io.BytesIO(data))
buf.seek(0)
with tarfile.open(fileobj=buf, mode="r") as tf:
        with pytest.raises(ValueError):
_safe_extract_tar(tf, tmp_path)
def test_safe_extract_skips_symlinks_silently(tmp_path: Path):
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w") as tf:
sym = tarfile.TarInfo(name="link.lnk")
sym.type = tarfile.SYMTYPE
sym.linkname = "/etc/passwd"
tf.addfile(sym)
# Plus a normal file alongside
info = tarfile.TarInfo(name="real.md")
data = b"ok"
info.size = len(data)
tf.addfile(info, io.BytesIO(data))
buf.seek(0)
with tarfile.open(fileobj=buf, mode="r") as tf:
_safe_extract_tar(tf, tmp_path)
assert (tmp_path / "real.md").exists()
assert not (tmp_path / "link.lnk").exists()

524
tests/test_sdk.py Normal file
View File

@ -0,0 +1,524 @@
"""Smoke tests for the molecule_plugin SDK.
Runs without the workspace runtime; SDK consumers should be able to
lint/unit-test their plugins with only `pip install molecule-ai-sdk`.
"""
from __future__ import annotations
import logging
import sys
from pathlib import Path
import pytest
_SDK_ROOT = Path(__file__).resolve().parents[1]
if str(_SDK_ROOT) not in sys.path:
sys.path.insert(0, str(_SDK_ROOT))
from molecule_plugin import ( # noqa: E402
AgentskillsAdaptor,
InstallContext,
PluginAdaptor,
parse_skill_md,
validate_manifest,
validate_plugin,
validate_skill,
)
def test_generic_adaptor_satisfies_protocol():
adaptor = AgentskillsAdaptor("p", "claude_code")
assert isinstance(adaptor, PluginAdaptor)
async def test_generic_adaptor_installs_skills_and_rules(tmp_path: Path):
plugin_root = tmp_path / "demo"
(plugin_root / "rules").mkdir(parents=True)
(plugin_root / "rules" / "r1.md").write_text("- be kind")
(plugin_root / "skills" / "s1").mkdir(parents=True)
(plugin_root / "skills" / "s1" / "SKILL.md").write_text("# skill")
configs = tmp_path / "configs"
configs.mkdir()
def _append(fn: str, content: str) -> None:
with open(configs / fn, "a") as f:
f.write(content + "\n")
ctx = InstallContext(
configs_dir=configs,
workspace_id="ws",
runtime="claude_code",
plugin_root=plugin_root,
append_to_memory=_append,
logger=logging.getLogger("test"),
)
result = await AgentskillsAdaptor("demo", "claude_code").install(ctx)
assert result.plugin_name == "demo"
assert (configs / "skills" / "s1" / "SKILL.md").exists()
assert "# Plugin: demo" in (configs / "CLAUDE.md").read_text()
def test_validate_manifest_accepts_minimal(tmp_path: Path):
p = tmp_path / "plugin.yaml"
p.write_text("name: demo\n")
assert validate_manifest(p) == []
def test_validate_manifest_rejects_unknown_runtime(tmp_path: Path):
p = tmp_path / "plugin.yaml"
p.write_text("name: demo\nruntimes: [martian]\n")
errors = validate_manifest(p)
assert any("unknown runtime" in e for e in errors)
def test_validate_manifest_accepts_hyphen_form(tmp_path: Path):
p = tmp_path / "plugin.yaml"
p.write_text("name: demo\nruntimes: [claude-code]\n")
assert validate_manifest(p) == []
def test_validate_manifest_requires_name(tmp_path: Path):
p = tmp_path / "plugin.yaml"
p.write_text("version: 1.0\n")
errors = validate_manifest(p)
assert any("name" in e for e in errors)
def test_validate_manifest_missing_file(tmp_path: Path):
errors = validate_manifest(tmp_path / "does-not-exist.yaml")
assert any("not found" in e for e in errors)
def test_validate_manifest_invalid_yaml(tmp_path: Path):
p = tmp_path / "plugin.yaml"
p.write_text("name: demo\n: bad\n")
errors = validate_manifest(p)
assert any("yaml parse error" in e for e in errors)
def test_validate_manifest_non_mapping_root(tmp_path: Path):
p = tmp_path / "plugin.yaml"
p.write_text("- just\n- a\n- list\n")
errors = validate_manifest(p)
assert any("mapping" in e for e in errors)
def test_validate_manifest_list_fields_must_be_lists(tmp_path: Path):
p = tmp_path / "plugin.yaml"
p.write_text("name: demo\ntags: not-a-list\n")
errors = validate_manifest(p)
assert any("tags" in e and "list" in e for e in errors)
def test_validate_manifest_runtime_entry_must_be_string(tmp_path: Path):
p = tmp_path / "plugin.yaml"
p.write_text("name: demo\nruntimes:\n - 42\n")
errors = validate_manifest(p)
assert any("string" in e for e in errors)
async def test_generic_adaptor_installs_rules_and_skills_both(tmp_path: Path):
"""Full shape: rules + root fragment + skills + skip-list files + empty rule file."""
plugin_root = tmp_path / "demo"
(plugin_root / "rules").mkdir(parents=True)
(plugin_root / "rules" / "good.md").write_text("- real content")
(plugin_root / "rules" / "empty.md").write_text(" \n") # empty after strip — ignored
(plugin_root / "skills" / "s1").mkdir(parents=True)
(plugin_root / "skills" / "s1" / "SKILL.md").write_text("# skill")
(plugin_root / "skills" / "loose").write_text("not a dir entry")
(plugin_root / "fragment.md").write_text("extra")
(plugin_root / "README.md").write_text("SKIPPED")
configs = tmp_path / "configs"
configs.mkdir()
def _append(fn: str, content: str) -> None:
with open(configs / fn, "a") as f:
f.write(content + "\n")
ctx = InstallContext(
configs_dir=configs, workspace_id="w", runtime="claude_code",
plugin_root=plugin_root, append_to_memory=_append,
logger=logging.getLogger("test"),
)
adaptor = AgentskillsAdaptor("demo", "claude_code")
result = await adaptor.install(ctx)
text = (configs / "CLAUDE.md").read_text()
assert "# Plugin: demo / rule: good.md" in text
assert "# Plugin: demo / rule: empty.md" not in text # empty skipped
assert "# Plugin: demo / fragment: fragment.md" in text
assert "# Plugin: demo / fragment: README.md" not in text # skip-listed
assert (configs / "skills" / "s1" / "SKILL.md").exists()
assert len(result.files_written) >= 1
# Uninstall — strips markers, removes skills.
await adaptor.uninstall(ctx)
assert not (configs / "skills" / "s1").exists()
assert "# Plugin: demo /" not in (configs / "CLAUDE.md").read_text()
async def test_generic_adaptor_skips_existing_skill_dir(tmp_path: Path):
"""Idempotency: a skill dir already at /configs/skills/<name>/ isn't clobbered."""
plugin_root = tmp_path / "demo"
(plugin_root / "skills" / "s1").mkdir(parents=True)
(plugin_root / "skills" / "s1" / "SKILL.md").write_text("# from plugin")
configs = tmp_path / "configs"
(configs / "skills" / "s1").mkdir(parents=True)
(configs / "skills" / "s1" / "SKILL.md").write_text("# user wrote this")
ctx = InstallContext(
configs_dir=configs, workspace_id="w", runtime="claude_code",
plugin_root=plugin_root,
)
await AgentskillsAdaptor("demo", "claude_code").install(ctx)
# Pre-existing content preserved.
assert (configs / "skills" / "s1" / "SKILL.md").read_text() == "# user wrote this"
async def test_generic_adaptor_uninstall_when_nothing_installed(tmp_path: Path):
configs = tmp_path / "configs"
configs.mkdir()
plugin_root = tmp_path / "bare"
plugin_root.mkdir()
ctx = InstallContext(
configs_dir=configs, workspace_id="w", runtime="claude_code",
plugin_root=plugin_root,
)
# Should not raise even with no CLAUDE.md and no skills/
await AgentskillsAdaptor("bare", "claude_code").uninstall(ctx)
# ---------------------------------------------------------------------------
# agentskills.io SKILL.md validation
# ---------------------------------------------------------------------------
def _write_skill(dir: Path, name: str, content: str) -> Path:
skill = dir / name
skill.mkdir(parents=True, exist_ok=True)
(skill / "SKILL.md").write_text(content)
return skill
def test_parse_skill_md_missing_file(tmp_path: Path):
fm, body, errs = parse_skill_md(tmp_path / "missing.md")
assert fm == {}
assert any("not found" in e for e in errs)
def test_parse_skill_md_missing_frontmatter(tmp_path: Path):
p = tmp_path / "SKILL.md"
p.write_text("no frontmatter at all")
fm, body, errs = parse_skill_md(p)
assert fm == {}
assert any("frontmatter" in e for e in errs)
def test_parse_skill_md_malformed_frontmatter(tmp_path: Path):
p = tmp_path / "SKILL.md"
p.write_text("---\nname: foo\n")
_, _, errs = parse_skill_md(p)
assert any("malformed" in e for e in errs)
def test_parse_skill_md_yaml_parse_error(tmp_path: Path):
p = tmp_path / "SKILL.md"
p.write_text("---\n: bad\nfoo: [unclosed\n---\nbody")
_, _, errs = parse_skill_md(p)
assert any("yaml parse error" in e for e in errs)
def test_parse_skill_md_non_mapping_frontmatter(tmp_path: Path):
p = tmp_path / "SKILL.md"
p.write_text("---\n- a\n- b\n---\nbody")
_, _, errs = parse_skill_md(p)
assert any("mapping" in e for e in errs)
def test_validate_skill_accepts_minimal(tmp_path: Path):
skill = _write_skill(tmp_path, "good-skill", "---\nname: good-skill\ndescription: Does something useful.\n---\nbody")
assert validate_skill(skill) == []
def test_validate_skill_requires_name(tmp_path: Path):
skill = _write_skill(tmp_path, "foo", "---\ndescription: x\n---\n")
errs = validate_skill(skill)
assert any("name" in e and "required" in e for e in errs)
def test_validate_skill_requires_description(tmp_path: Path):
skill = _write_skill(tmp_path, "foo", "---\nname: foo\n---\n")
errs = validate_skill(skill)
assert any("description" in e and "required" in e for e in errs)
def test_validate_skill_name_must_match_dir(tmp_path: Path):
skill = _write_skill(tmp_path, "dir-name", "---\nname: different\ndescription: x\n---\n")
errs = validate_skill(skill)
assert any("match directory name" in e for e in errs)
def test_validate_skill_name_uppercase_rejected(tmp_path: Path):
skill = _write_skill(tmp_path, "BadName", "---\nname: BadName\ndescription: x\n---\n")
errs = validate_skill(skill)
assert any("lowercase" in e for e in errs)
def test_validate_skill_name_leading_hyphen_rejected(tmp_path: Path):
skill = _write_skill(tmp_path, "-foo", "---\nname: -foo\ndescription: x\n---\n")
errs = validate_skill(skill)
assert any("hyphen" in e for e in errs)
def test_validate_skill_name_consecutive_hyphens_rejected(tmp_path: Path):
skill = _write_skill(tmp_path, "foo--bar", "---\nname: foo--bar\ndescription: x\n---\n")
errs = validate_skill(skill)
assert any("hyphen" in e for e in errs)
def test_validate_skill_name_too_long(tmp_path: Path):
long = "a" * 65
skill = _write_skill(tmp_path, long, f"---\nname: {long}\ndescription: x\n---\n")
errs = validate_skill(skill)
assert any("length" in e for e in errs)
def test_validate_skill_description_too_long(tmp_path: Path):
long_desc = "x" * 1025
skill = _write_skill(tmp_path, "foo", f"---\nname: foo\ndescription: {long_desc}\n---\n")
errs = validate_skill(skill)
assert any("1024" in e for e in errs)
def test_validate_skill_compatibility_too_long(tmp_path: Path):
long = "x" * 501
skill = _write_skill(tmp_path, "foo", f"---\nname: foo\ndescription: x\ncompatibility: {long}\n---\n")
errs = validate_skill(skill)
assert any("compatibility" in e.lower() and "500" in e for e in errs)
def test_validate_skill_accepts_all_optional_fields(tmp_path: Path):
content = """---
name: full-skill
description: Does everything.
license: MIT
compatibility: Requires Python 3.14+
metadata:
author: test
version: "1.0"
allowed-tools: Bash(git:*) Read
---
body
"""
skill = _write_skill(tmp_path, "full-skill", content)
assert validate_skill(skill) == []
def test_validate_skill_metadata_must_be_mapping(tmp_path: Path):
skill = _write_skill(tmp_path, "foo", "---\nname: foo\ndescription: x\nmetadata: str\n---\n")
errs = validate_skill(skill)
assert any("metadata" in e and "mapping" in e for e in errs)
def test_validate_skill_allowed_tools_must_be_string(tmp_path: Path):
skill = _write_skill(tmp_path, "foo", "---\nname: foo\ndescription: x\nallowed-tools:\n - Read\n---\n")
errs = validate_skill(skill)
assert any("allowed-tools" in e for e in errs)
def test_validate_skill_rejects_missing_dir(tmp_path: Path):
errs = validate_skill(tmp_path / "nonexistent")
assert any("not a directory" in e for e in errs)
def test_validate_plugin_walks_all_skills(tmp_path: Path):
plugin = tmp_path / "p"
plugin.mkdir()
(plugin / "plugin.yaml").write_text("name: p\n")
(plugin / "skills" / "good").mkdir(parents=True)
(plugin / "skills" / "good" / "SKILL.md").write_text("---\nname: good\ndescription: ok\n---\n")
(plugin / "skills" / "bad").mkdir()
(plugin / "skills" / "bad" / "SKILL.md").write_text("---\nname: wrong-name\ndescription: ok\n---\n")
results = validate_plugin(plugin)
assert "plugin.yaml" not in results
assert "skills/good" not in results
assert "skills/bad" in results
assert any("match" in e for e in results["skills/bad"])
def test_validate_plugin_empty_when_all_valid(tmp_path: Path):
plugin = tmp_path / "p"
plugin.mkdir()
(plugin / "plugin.yaml").write_text("name: p\n")
assert validate_plugin(plugin) == {}
def test_first_party_plugins_are_spec_compliant():
"""Every plugin in this repo must pass full agentskills.io validation."""
repo_root = Path(__file__).resolve().parents[3]
plugins_dir = repo_root / "plugins"
if not plugins_dir.is_dir():
import pytest
pytest.skip("not in a checkout with first-party plugins")
failures: dict = {}
for plugin in sorted(plugins_dir.iterdir()):
if not plugin.is_dir():
continue
results = validate_plugin(plugin)
if results:
failures[plugin.name] = results
assert not failures, f"Spec failures: {failures}"
# ---------------------------------------------------------------------------
# CLI (python -m molecule_plugin)
# ---------------------------------------------------------------------------
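# Contract exercised below: exit code 0 only when every target validates,
# 1 on any error or unusable path; per-target success lines go to stdout,
# error lines to stderr, and -q/--quiet silences only the success lines.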
def _write_valid_plugin(tmp_path: Path, name: str = "ok-plugin") -> Path:
p = tmp_path / name
p.mkdir()
(p / "plugin.yaml").write_text(f"name: {name}\nruntimes: [claude_code]\n")
(p / "skills" / "hello").mkdir(parents=True)
(p / "skills" / "hello" / "SKILL.md").write_text(
"---\nname: hello\ndescription: greet\n---\nbody"
)
return p
def test_cli_exits_zero_on_valid_plugin(tmp_path: Path, capsys):
from molecule_plugin.__main__ import main
plugin = _write_valid_plugin(tmp_path)
assert main(["validate", str(plugin)]) == 0
out = capsys.readouterr().out
    assert "✓" in out
assert "valid" in out
def test_cli_exits_nonzero_on_invalid_plugin(tmp_path: Path, capsys):
from molecule_plugin.__main__ import main
plugin = tmp_path / "bad"
plugin.mkdir()
(plugin / "plugin.yaml").write_text("name: bad\n")
(plugin / "skills" / "mismatched").mkdir(parents=True)
(plugin / "skills" / "mismatched" / "SKILL.md").write_text(
"---\nname: different\ndescription: d\n---\n"
)
assert main(["validate", str(plugin)]) == 1
err = capsys.readouterr().err
    assert "✗" in err
assert "match directory name" in err
def test_cli_quiet_suppresses_success_lines(tmp_path: Path, capsys):
from molecule_plugin.__main__ import main
plugin = _write_valid_plugin(tmp_path)
assert main(["validate", "--quiet", str(plugin)]) == 0
out = capsys.readouterr().out
    assert "✓" not in out  # success line suppressed
def test_cli_quiet_still_prints_errors(tmp_path: Path, capsys):
from molecule_plugin.__main__ import main
plugin = tmp_path / "bad"
plugin.mkdir()
(plugin / "plugin.yaml").write_text("")
assert main(["validate", "-q", str(plugin)]) != 0
err = capsys.readouterr().err
    assert "✗" in err
def test_cli_rejects_nonexistent_path(tmp_path: Path, capsys):
from molecule_plugin.__main__ import main
assert main(["validate", str(tmp_path / "nope")]) == 1
err = capsys.readouterr().err
assert "does not exist" in err
def test_cli_rejects_file_instead_of_dir(tmp_path: Path, capsys):
from molecule_plugin.__main__ import main
f = tmp_path / "plugin.yaml"
f.write_text("name: x\n")
assert main(["validate", str(f)]) == 1
err = capsys.readouterr().err
assert "not a directory" in err
def test_cli_validates_multiple_plugins(tmp_path: Path, capsys):
from molecule_plugin.__main__ import main
p1 = _write_valid_plugin(tmp_path, "one")
p2 = _write_valid_plugin(tmp_path, "two")
assert main(["validate", str(p1), str(p2)]) == 0
out = capsys.readouterr().out
    assert out.count("✓") == 2
# ---------------------------------------------------------------------------
# Type-error branches in validate_skill (non-string values for typed fields)
# ---------------------------------------------------------------------------
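# Typed-field rules probed here: name, description, license, and
# compatibility must be strings; metadata keys must be strings; and
# allowed-tools must be a single string rather than a YAML list.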
def test_validate_skill_parse_error_propagates(tmp_path: Path):
"""A malformed SKILL.md surfaces parse errors through validate_skill."""
skill = tmp_path / "bad"
skill.mkdir()
(skill / "SKILL.md").write_text("no frontmatter here")
errs = validate_skill(skill)
assert any("frontmatter" in e for e in errs)
def test_validate_skill_name_must_be_string(tmp_path: Path):
skill = _write_skill(tmp_path, "x", "---\nname: 42\ndescription: x\n---\n")
errs = validate_skill(skill)
assert any("name" in e and "string" in e for e in errs)
def test_validate_skill_description_must_be_string(tmp_path: Path):
skill = _write_skill(tmp_path, "x", "---\nname: x\ndescription: 42\n---\n")
errs = validate_skill(skill)
assert any("description" in e and "string" in e for e in errs)
def test_validate_skill_compatibility_must_be_string(tmp_path: Path):
skill = _write_skill(tmp_path, "x", "---\nname: x\ndescription: d\ncompatibility: 42\n---\n")
errs = validate_skill(skill)
assert any("compatibility" in e.lower() and "string" in e for e in errs)
def test_validate_skill_metadata_key_must_be_string(tmp_path: Path):
skill = _write_skill(tmp_path, "x", "---\nname: x\ndescription: d\nmetadata:\n 1: value\n---\n")
errs = validate_skill(skill)
assert any("metadata" in e and "string" in e for e in errs)
def test_validate_skill_license_must_be_string(tmp_path: Path):
skill = _write_skill(tmp_path, "x", "---\nname: x\ndescription: d\nlicense: 42\n---\n")
errs = validate_skill(skill)
assert any("license" in e and "string" in e for e in errs)
def test_validate_plugin_skips_file_entries_in_skills_dir(tmp_path: Path):
"""A stray file inside skills/ (not a dir) is not treated as a skill."""
plugin = tmp_path / "p"
plugin.mkdir()
(plugin / "plugin.yaml").write_text("name: p\n")
(plugin / "skills").mkdir()
(plugin / "skills" / "stray.txt").write_text("not a skill")
assert validate_plugin(plugin) == {}
318
tests/test_validators.py Normal file

@ -0,0 +1,318 @@
"""Tests for the SDK's workspace/org/channel validators + CLI dispatch."""
from __future__ import annotations
from pathlib import Path

import yaml
from molecule_plugin import (
SUPPORTED_CHANNEL_TYPES,
SUPPORTED_RUNTIMES,
validate_channel_config,
validate_channel_file,
validate_org_template,
validate_workspace_template,
)
from molecule_plugin.__main__ import main as cli_main
# ---------- workspace ----------
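# Workspace config.yaml shape as these tests exercise it (a sketch inferred
# from the assertions, not the authoritative schema):
#
#   name: x                 # required non-empty string
#   runtime: claude-code    # must be one of SUPPORTED_RUNTIMES
#   tier: 2                 # optional; must be 1, 2, or 3
#   runtime_config:         # optional mapping (None is also accepted)
#     required_env: [FOO]   # list of strings
#     timeout: 30           # number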
def _write_yaml(path: Path, data) -> None:
path.write_text(yaml.safe_dump(data))
def test_workspace_happy(tmp_path: Path):
_write_yaml(
tmp_path / "config.yaml",
{"name": "x", "runtime": "claude-code", "tier": 2,
"runtime_config": {"required_env": ["FOO"], "timeout": 30}},
)
assert validate_workspace_template(tmp_path) == []
def test_workspace_missing_file(tmp_path: Path):
errs = validate_workspace_template(tmp_path)
assert len(errs) == 1 and "missing config.yaml" in errs[0].message
def test_workspace_bad_yaml(tmp_path: Path):
(tmp_path / "config.yaml").write_text("foo: [bar\n")
errs = validate_workspace_template(tmp_path)
assert any("invalid YAML" in e.message for e in errs)
def test_workspace_not_object(tmp_path: Path):
(tmp_path / "config.yaml").write_text("- a\n- b\n")
errs = validate_workspace_template(tmp_path)
assert any("must be a YAML object" in e.message for e in errs)
def test_workspace_validation_errors(tmp_path: Path):
_write_yaml(
tmp_path / "config.yaml",
{"name": "", "runtime": "wat", "tier": 9,
"runtime_config": {"required_env": "nope", "timeout": "soon"}},
)
msgs = [e.message for e in validate_workspace_template(tmp_path)]
assert any("missing required field: name" in m for m in msgs)
assert any("runtime=" in m for m in msgs)
assert any("tier must be 1, 2, or 3" in m for m in msgs)
assert any("required_env" in m for m in msgs)
assert any("timeout" in m for m in msgs)
def test_workspace_runtime_config_not_dict(tmp_path: Path):
_write_yaml(
tmp_path / "config.yaml",
{"name": "x", "runtime": "langgraph", "runtime_config": "nope"},
)
msgs = [e.message for e in validate_workspace_template(tmp_path)]
assert any("runtime_config must be an object" in m for m in msgs)
def test_workspace_runtime_config_none_ok(tmp_path: Path):
_write_yaml(tmp_path / "config.yaml", {"name": "x", "runtime": "langgraph", "runtime_config": None})
assert validate_workspace_template(tmp_path) == []
def test_org_defaults_none_ok(tmp_path: Path):
_write_yaml(tmp_path / "org.yaml", {"name": "T", "defaults": None, "workspaces": [{"name": "a"}]})
assert validate_org_template(tmp_path) == []
def test_supported_runtimes_contains_known():
assert "claude-code" in SUPPORTED_RUNTIMES
assert "deepagents" in SUPPORTED_RUNTIMES
# ---------- org ----------
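# org.yaml shape covered below (inferred from test_org_happy and the error
# cases; a sketch, not the authoritative schema):
#
#   name: T                               # required
#   defaults: {runtime: claude-code}      # optional mapping (None accepted)
#   workspaces:                           # required list of objects
#     - name: PM                          # required per workspace
#       tier: 3                           # 1, 2, or 3
#       workspace_access: read_only       # read_write requires workspace_dir
#       channels: [{type: telegram}]      # each entry needs a 'type'
#       schedules: [{cron_expr: ..., prompt: ...}]
#       plugins: [molecule-dev]           # list of strings
#       children: [{name: Dev}]           # nested workspace objects
#       external: true                    # requires a url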
def test_org_happy(tmp_path: Path):
_write_yaml(
tmp_path / "org.yaml",
{
"name": "T",
"defaults": {"runtime": "claude-code"},
"workspaces": [
{
"name": "PM",
"tier": 3,
"runtime": "claude-code",
"workspace_access": "read_only",
"workspace_dir": "/repo",
"channels": [{"type": "telegram", "config": {"bot_token": "x"}}],
"schedules": [{"cron_expr": "* * * * *", "prompt": "hi"}],
"plugins": ["molecule-dev"],
"children": [{"name": "Dev"}],
}
],
},
)
assert validate_org_template(tmp_path) == []
def test_org_missing_file(tmp_path: Path):
errs = validate_org_template(tmp_path)
assert any("missing org.yaml" in e.message for e in errs)
def test_org_bad_yaml(tmp_path: Path):
(tmp_path / "org.yaml").write_text("foo: [bar\n")
errs = validate_org_template(tmp_path)
assert any("invalid YAML" in e.message for e in errs)
def test_org_not_object(tmp_path: Path):
(tmp_path / "org.yaml").write_text("- a\n")
errs = validate_org_template(tmp_path)
assert any("must be a YAML object" in e.message for e in errs)
def test_org_various_errors(tmp_path: Path):
_write_yaml(
tmp_path / "org.yaml",
{
"defaults": "nope",
"workspaces": [
"notadict",
{
"name": "",
"tier": 8,
"runtime": "wat",
"workspace_access": "invalid",
"channels": "nope",
"schedules": "nope",
"plugins": [1, 2],
"external": True,
},
{
"name": "y",
"workspace_access": "read_write", # but no workspace_dir
"channels": ["bad", {"config": "nope"}],
"schedules": ["bad", {}],
"children": "nope",
},
{
"name": "z",
"children": [{"name": "c"}, "bad"],
},
],
},
)
msgs = [e.message for e in validate_org_template(tmp_path)]
joined = "\n".join(msgs)
assert "missing required field: name" in joined
assert "defaults must be an object" in joined
assert "tier must be 1, 2, or 3" in joined
assert "runtime=" in joined
assert "workspace_access=" in joined
assert "requires workspace_dir" in joined
assert ".channels: must be a list" in joined
assert ".schedules: must be a list" in joined
assert "plugins: must be a list of strings" in joined
assert "external=true requires url" in joined
assert "missing required 'type'" in joined or "must be an object" in joined
assert "missing 'cron_expr'" in joined
assert "missing 'prompt'" in joined
assert ".children: must be a list" in joined
assert "must be an object" in joined
def test_org_missing_workspaces(tmp_path: Path):
_write_yaml(tmp_path / "org.yaml", {"name": "T"})
msgs = [e.message for e in validate_org_template(tmp_path)]
assert any("missing required field: workspaces" in m for m in msgs)
def test_org_workspaces_not_list(tmp_path: Path):
_write_yaml(tmp_path / "org.yaml", {"name": "T", "workspaces": "nope"})
msgs = [e.message for e in validate_org_template(tmp_path)]
assert any("workspaces must be a list" in m for m in msgs)
# ---------- channel ----------
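# Channel config shape (inferred from the cases below): a mapping with a
# required 'type' drawn from SUPPORTED_CHANNEL_TYPES, a per-type 'config'
# mapping (telegram requires bot_token), and an optional boolean 'enabled'.
# validate_channel_file accepts either a single mapping or a list of them.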
def test_channel_config_happy():
assert validate_channel_config({
"type": "telegram",
"config": {"bot_token": "x"},
"enabled": True,
}) == []
def test_channel_config_missing_type():
errs = validate_channel_config({})
assert any("missing required field: type" in e.message for e in errs)
def test_channel_config_unsupported_type():
errs = validate_channel_config({"type": "fax"})
assert any("must be one of" in e.message for e in errs)
def test_channel_config_bad_config_type():
errs = validate_channel_config({"type": "telegram", "config": "nope"})
assert any("config must be an object" in e.message for e in errs)
def test_channel_config_missing_required_key():
errs = validate_channel_config({"type": "telegram", "config": {}})
assert any("bot_token is required" in e.message for e in errs)
def test_channel_config_bad_enabled():
errs = validate_channel_config({"type": "telegram", "config": {"bot_token": "x"}, "enabled": "yes"})
assert any("enabled must be a boolean" in e.message for e in errs)
def test_channel_file_list(tmp_path: Path):
p = tmp_path / "channels.yaml"
p.write_text(yaml.safe_dump([
{"type": "telegram", "config": {"bot_token": "x"}},
"notadict",
]))
errs = validate_channel_file(p)
assert any("must be an object" in e.message for e in errs)
def test_channel_file_single_dict(tmp_path: Path):
p = tmp_path / "channel.yaml"
p.write_text(yaml.safe_dump({"type": "telegram", "config": {"bot_token": "x"}}))
assert validate_channel_file(p) == []
def test_channel_file_missing():
errs = validate_channel_file(Path("/nonexistent/channel.yaml"))
assert any("file does not exist" in e.message for e in errs)
def test_channel_file_empty(tmp_path: Path):
p = tmp_path / "c.yaml"
p.write_text("")
errs = validate_channel_file(p)
assert any("empty" in e.message for e in errs)
def test_channel_file_bad_yaml(tmp_path: Path):
p = tmp_path / "c.yaml"
p.write_text("foo: [bar\n")
errs = validate_channel_file(p)
assert any("invalid YAML" in e.message for e in errs)
def test_channel_file_wrong_toplevel(tmp_path: Path):
p = tmp_path / "c.yaml"
p.write_text("5\n")
errs = validate_channel_file(p)
assert any("top-level must be" in e.message for e in errs)
def test_channel_types_exports():
assert "telegram" in SUPPORTED_CHANNEL_TYPES
# ---------- CLI ----------
def test_cli_workspace_valid(tmp_path, capsys):
_write_yaml(tmp_path / "config.yaml", {"name": "x", "runtime": "langgraph"})
assert cli_main(["validate", "workspace", str(tmp_path)]) == 0
def test_cli_workspace_invalid(tmp_path, capsys):
_write_yaml(tmp_path / "config.yaml", {"name": "", "runtime": ""})
assert cli_main(["validate", "workspace", str(tmp_path)]) == 1
def test_cli_org_quiet(tmp_path, capsys):
_write_yaml(tmp_path / "org.yaml", {"name": "T", "workspaces": [{"name": "a"}]})
assert cli_main(["validate", "org", str(tmp_path), "-q"]) == 0
out = capsys.readouterr().out
assert out == ""
def test_cli_channel_valid(tmp_path):
p = tmp_path / "c.yaml"
p.write_text(yaml.safe_dump({"type": "telegram", "config": {"bot_token": "x"}}))
assert cli_main(["validate", "channel", str(p)]) == 0
def test_cli_channel_missing(tmp_path):
assert cli_main(["validate", "channel", str(tmp_path / "missing.yaml")]) == 1
def test_cli_missing_path(tmp_path):
assert cli_main(["validate", "workspace", str(tmp_path / "nope")]) == 1
def test_cli_path_not_dir(tmp_path):
p = tmp_path / "file.txt"
p.write_text("hi")
assert cli_main(["validate", "workspace", str(p)]) == 1
def test_cli_plugin_dispatch(tmp_path):
# Plugin dir missing plugin.yaml -> validator returns errors -> exit 1
assert cli_main(["validate", "plugin", str(tmp_path)]) == 1