molecule-core/workspace-template/skill_loader/watcher.py
Dev Lead Agent 7f3274391e fix(security): H1 — replace MD5 with SHA-256 in config/skill watchers
Both watcher.py (ConfigWatcher) and skill_loader/watcher.py
(SkillsWatcher) used hashlib.md5() for file-integrity change detection.
MD5 is collision-prone: a crafted config file could produce the same
hash as a benign one, silently suppressing the hot-reload callback and
preventing agents from picking up legitimate config changes.

Replace hashlib.md5 → hashlib.sha256 in both _hash_file() methods.
Update docstrings, comments, and the type-annotation comment
(rel_path → md5 hex → sha256 hex).

Test update: test_skills_watcher.py — rename helper _md5 → _sha256,
update the hash-length assertion from 32 (MD5) to 64 (SHA-256), and
rename the test from test_hash_file_returns_md5_for_existing_file to
test_hash_file_returns_sha256_for_existing_file. All 25 watcher tests
pass.

Note: H2 (a2a_client.py timeout=None) was already fixed in Cycle 5
(timeout=httpx.Timeout(connect=30.0, read=300.0, ...)) — confirmed by
code review before opening this PR.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 07:52:07 +00:00

228 lines
8.2 KiB
Python

"""Skills hot-reload watcher.
Monitors the workspace's ``skills/`` directory for file changes and reloads
affected skill modules in-place — no coordinator restart required.
Architecture
------------
``SkillsWatcher`` runs as a background asyncio task alongside the agent. It
polls the skill directories every ``POLL_INTERVAL`` seconds (default 3 s),
computes SHA-256 hashes of every file, and fires ``_reload_skill()`` when any
file inside a skill's folder changes.
``_reload_skill()`` calls ``load_skills()`` from ``skill_loader.loader`` for the
changed skill and passes the fresh ``LoadedSkill`` to every registered
``on_reload`` callback. Adapters register a callback that rebuilds the
LangGraph agent with the updated tool set, so the change takes effect on
the very next incoming A2A task — zero downtime.
Audit event
-----------
Every successful reload emits::
event_type : "skill_reload"
action : "reload"
resource : "<skill_name>"
outcome : "success" | "failure"
changed_files : [list of relative paths that triggered the reload]
Usage::
watcher = SkillsWatcher(
config_path="/configs",
skill_names=["web_search", "code_review"],
on_reload=lambda skill: rebuild_agent_with_skill(skill),
)
asyncio.create_task(watcher.start())
"""
from __future__ import annotations
import asyncio
import hashlib
import logging
import sys
from pathlib import Path
from typing import Callable
logger = logging.getLogger(__name__)
POLL_INTERVAL = 3.0 # seconds between filesystem polls
DEBOUNCE_SECS = 1.5 # wait for writes to settle before reloading
class SkillsWatcher:
"""Watches skill directories and reloads changed skills without restarting.
Args:
config_path: Path to the workspace config directory (contains ``skills/``).
skill_names: List of skill IDs to watch (subfolder names under ``skills/``).
on_reload: Async or sync callable invoked with a fresh ``LoadedSkill``
every time a skill is reloaded. May be called concurrently
for multiple skills if several change at once.
"""
def __init__(
self,
config_path: str,
skill_names: list[str],
on_reload: Callable | None = None,
) -> None:
self.config_path = config_path
self.skill_names = list(skill_names)
self.on_reload = on_reload
self._hashes: dict[str, str] = {} # rel_path → sha256 hex
self._running = False
# ------------------------------------------------------------------
# Public interface
# ------------------------------------------------------------------
async def start(self) -> None:
    """Run the polling loop in the current event loop until ``stop()``.

    An initial hash snapshot is taken first, so files that already exist
    when the watcher starts do not trigger a spurious reload.
    """
    self._running = True
    self._hashes = self._scan()
    logger.info(
        "SkillsWatcher: monitoring %d skill(s) in %s",
        len(self.skill_names),
        self.config_path,
    )
    while True:
        if not self._running:
            break
        await asyncio.sleep(POLL_INTERVAL)
        await self._tick()
def stop(self) -> None:
    """Signal the ``start()`` poll loop to exit after its current iteration."""
    self._running = False
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _skills_root(self) -> Path:
return Path(self.config_path) / "skills"
def _hash_file(self, path: Path) -> str:
try:
# H1: SHA-256 replaces MD5 for file-integrity change detection.
return hashlib.sha256(path.read_bytes()).hexdigest()
except OSError:
return ""
def _scan(self) -> dict[str, str]:
"""Return {relative_path: sha256} for every file in watched skill dirs."""
hashes: dict[str, str] = {}
root = self._skills_root()
for skill_name in self.skill_names:
skill_dir = root / skill_name
if not skill_dir.is_dir():
continue
for fpath in skill_dir.rglob("*"):
if fpath.is_file() and not fpath.name.startswith("."):
rel = str(fpath.relative_to(root))
hashes[rel] = self._hash_file(fpath)
return hashes
def _changed_skills(self, new_hashes: dict[str, str]) -> dict[str, list[str]]:
"""Return {skill_name: [changed_file, …]} for skills with file changes."""
changed: dict[str, list[str]] = {}
all_paths = set(new_hashes) | set(self._hashes)
for rel_path in all_paths:
old = self._hashes.get(rel_path, "")
new = new_hashes.get(rel_path, "")
if old != new:
# rel_path is like "web_search/SKILL.md" or "web_search/tools/foo.py"
skill_name = rel_path.split("/")[0]
if skill_name in self.skill_names:
changed.setdefault(skill_name, []).append(rel_path)
return changed
async def _tick(self) -> None:
    """One poll cycle: detect changes, debounce, then reload affected skills."""
    snapshot = self._scan()
    dirty = self._changed_skills(snapshot)
    if not dirty:
        return
    logger.info("SkillsWatcher: changes detected in %s", list(dirty.keys()))
    # Give in-flight writes time to settle, then take a second snapshot so
    # half-written files are not hashed mid-save.
    await asyncio.sleep(DEBOUNCE_SECS)
    snapshot = self._scan()
    dirty = self._changed_skills(snapshot)
    # Commit the post-debounce snapshot as the new baseline.
    self._hashes = snapshot
    for name, files in dirty.items():
        await self._reload_skill(name, files)
async def _reload_skill(self, skill_name: str, changed_files: list[str]) -> None:
    """Reload *skill_name*'s modules and notify the ``on_reload`` callback.

    Emits a ``skill_reload`` audit event with outcome "success" or
    "failure". Never propagates exceptions: loader and callback failures
    are logged and audited instead, so the poll loop keeps running.

    Args:
        skill_name: Skill folder name under the skills root.
        changed_files: Relative paths that triggered this reload
            (forwarded into the audit event).
    """
    logger.info("SkillsWatcher: reloading skill '%s' (changed: %s)", skill_name, changed_files)
    # Evict stale module entries so importlib loads fresh copies.
    # Fixed: the prefix was a pointless f-string with no placeholders (F541).
    # NOTE(review): this evicts every "skill_tool_*" module, not only this
    # skill's — presumably safe because load_skills() re-imports what it
    # needs, but confirm against the loader's module-naming scheme.
    stale = [k for k in sys.modules if k.startswith("skill_tool_")]
    for key in stale:
        del sys.modules[key]
    try:
        from skill_loader.loader import load_skills
        loaded = load_skills(self.config_path, [skill_name])
        if loaded:
            skill = loaded[0]
            logger.info(
                "SkillsWatcher: skill '%s' reloaded — %d tool(s)",
                skill_name, len(skill.tools),
            )
            # Best-effort audit: auditing must never break the reload path.
            try:
                from builtin_tools.audit import log_event
                log_event(
                    event_type="skill_reload",
                    action="reload",
                    resource=skill_name,
                    outcome="success",
                    changed_files=changed_files,
                    tool_count=len(skill.tools),
                )
            except Exception:
                pass
            # Notify adapter callback (sync or async both supported).
            if self.on_reload is not None:
                try:
                    result = self.on_reload(skill)
                    if asyncio.iscoroutine(result):
                        await result
                except Exception as exc:
                    logger.error(
                        "SkillsWatcher: on_reload callback failed for '%s': %s",
                        skill_name, exc,
                    )
        else:
            logger.warning("SkillsWatcher: no LoadedSkill returned for '%s'", skill_name)
            self._audit_failure(skill_name, changed_files, "no_skill_returned")
    except Exception as exc:
        logger.error("SkillsWatcher: reload failed for '%s': %s", skill_name, exc)
        self._audit_failure(skill_name, changed_files, str(exc))
@staticmethod
def _audit_failure(skill_name: str, changed_files: list[str], error: str) -> None:
try:
from builtin_tools.audit import log_event
log_event(
event_type="skill_reload",
action="reload",
resource=skill_name,
outcome="failure",
changed_files=changed_files,
error=error,
)
except Exception:
pass