feat(claw-migrate): harden OpenClaw import with plan-first apply, redaction, and pre-migration backup (#16911)
* feat(claw-migrate): harden OpenClaw import with plan-first apply, redaction, and pre-migration backup Adopts four design patterns from OpenClaw's reciprocal migrate-hermes importer so both migration paths have the same safety posture. - **Refuse-on-conflict apply.** 'hermes claw migrate' now refuses to execute when the plan has any conflict items, unless --overwrite is set. Previously the user could say 'yes, proceed' and end up with a silent partial migration that skipped every conflicting item. - **Engine-level secret redaction.** The report.json and summary.md written to disk (and --json stdout) run through a redactor that matches OpenClaw's key-name markers and value-shape patterns (sk-*, ghp_*, xox*-, AIza*, Bearer *). Prevents accidental API key leakage in bug reports and support channels. - **Pre-migration tarball snapshot.** Apply creates one timestamped restore-point archive of ~/.hermes/ at ~/.hermes/migration/pre-migration-backups/ before any mutation, excluding regenerable directories (sessions, logs, cache). Opt out with --no-backup. - **Blocked-by-earlier-conflict sequencing.** If a config.yaml write hits conflict/error mid-apply, subsequent config-mutating options are marked skipped with reason 'blocked by earlier apply conflict' rather than attempting partial writes. - **Structured warnings[] and next_steps[] on the report** — actionable guidance surfaces in both JSON output and summary.md. - **--json output mode** — emits the redacted report on stdout for CI. Also flips --preset full to NOT auto-enable --migrate-secrets. Users now have to opt in to secret import explicitly, mirroring OpenClaw's two-phase posture. Status/kind/action constants are defined (STATUS_MIGRATED etc) with values that match the existing strings the script emits, so the report schema is backward-compatible. ItemResult gains a 'sensitive' bool field that redaction and consumers can key off. Validation: 26 new unit tests + 1 updated test in tests/skills/ test_openclaw_migration_hardening.py and test_claw.py cover redaction (key markers, value patterns, recursion, on-disk), warnings/next_steps, blocked-by-earlier sequencing, --json mode, and the preset-flip. Manual E2E against a fake $HERMES_HOME with real-shaped secrets confirmed: (1) secrets never appear in stdout or on disk, (2) _cmd_migrate refuses apply when plan has conflicts, (3) --overwrite proceeds past the guard and the backup tarball is created, (4) --no-backup skips the archive. Related docs: website/docs/guides/migrate-from-openclaw.md and website/docs/reference/cli-commands.md updated to reflect the preset-flip and new --no-backup flag. * refactor(claw-migrate): reuse hermes backup system for pre-migration snapshot Drops the inline tarball in hermes_cli/claw.py in favor of hermes_cli.backup.create_pre_migration_backup(), which shares an implementation with create_pre_update_backup via a new _write_full_zip_backup helper. Benefits: - Consistent exclusion rules with hermes backup (_EXCLUDED_DIRS, _EXCLUDED_SUFFIXES, _EXCLUDED_NAMES — single source of truth). - SQLite safe-copy via _safe_copy_db (state.db restores cleanly). - Zip format restorable with 'hermes import <archive>'. - Lives under ~/.hermes/backups/pre-migration-*.zip alongside pre-update-*.zip — one place for all snapshot archives. - Auto-prune rotation with separate keep counters (pre-migration keeps 5, pre-update keeps 5, they don't touch each other's files). 7 new tests in tests/hermes_cli/test_backup.py lock the contract: directory location, shared exclusion rules, _validate_backup_zip acceptance (i.e. restorable with 'hermes import'), non-recursive into prior backups, rotation, missing-home handling, and the invariant that pre-migration rotation never touches pre-update backups. Help text and docs updated — the restore hint now says 'hermes import <name>' instead of 'tar -xzf <archive> -C ~/'. * chore(claw-migrate): use backup._format_size and drop duplicate output line Minor polish using another existing primitive from hermes_cli.backup: - Show backup archive size with _format_size (e.g. '(245 B)' or '(2.4 MB)') matching the format hermes backup already uses. - Drop the duplicate 'Pre-migration backup saved' line after Migration Results — the earlier 'Pre-migration backup: <path> (<size>)' line already surfaces the path before apply runs. --------- Co-authored-by: teknium1 <teknium@users.noreply.github.com>
This commit is contained in:
parent
a83f669bcf
commit
cf0852f92e
@ -696,6 +696,78 @@ def run_quick_backup(args) -> None:
|
||||
print("No state files found to snapshot.")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Shared full-zip backup helper
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _write_full_zip_backup(out_path: Path, hermes_root: Path) -> Optional[Path]:
|
||||
"""Write a full zip snapshot of ``hermes_root`` to ``out_path``.
|
||||
|
||||
Uses the same exclusion rules and SQLite safe-copy as :func:`run_backup`.
|
||||
Returns the output path on success, None on failure (nothing to back up,
|
||||
or write error — caller should surface the outcome but not raise).
|
||||
"""
|
||||
files_to_add: list[tuple[Path, Path]] = []
|
||||
try:
|
||||
for dirpath, dirnames, filenames in os.walk(hermes_root, followlinks=False):
|
||||
dp = Path(dirpath)
|
||||
# Prune excluded directories in-place so os.walk doesn't descend
|
||||
dirnames[:] = [d for d in dirnames if d not in _EXCLUDED_DIRS]
|
||||
|
||||
for fname in filenames:
|
||||
fpath = dp / fname
|
||||
try:
|
||||
rel = fpath.relative_to(hermes_root)
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
if _should_exclude(rel):
|
||||
continue
|
||||
|
||||
# Skip the output zip itself if it already exists inside root.
|
||||
try:
|
||||
if fpath.resolve() == out_path.resolve():
|
||||
continue
|
||||
except (OSError, ValueError):
|
||||
pass
|
||||
|
||||
files_to_add.append((fpath, rel))
|
||||
except OSError as exc:
|
||||
logger.warning("Full-zip backup: walk failed: %s", exc)
|
||||
return None
|
||||
|
||||
if not files_to_add:
|
||||
return None
|
||||
|
||||
try:
|
||||
with zipfile.ZipFile(out_path, "w", zipfile.ZIP_DEFLATED, compresslevel=6) as zf:
|
||||
for abs_path, rel_path in files_to_add:
|
||||
try:
|
||||
if abs_path.suffix == ".db":
|
||||
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
|
||||
tmp_db = Path(tmp.name)
|
||||
try:
|
||||
if _safe_copy_db(abs_path, tmp_db):
|
||||
zf.write(tmp_db, arcname=str(rel_path))
|
||||
finally:
|
||||
tmp_db.unlink(missing_ok=True)
|
||||
else:
|
||||
zf.write(abs_path, arcname=str(rel_path))
|
||||
except (PermissionError, OSError, ValueError) as exc:
|
||||
logger.debug("Skipping %s in zip backup: %s", rel_path, exc)
|
||||
continue
|
||||
except OSError as exc:
|
||||
logger.warning("Full-zip backup: zip write failed: %s", exc)
|
||||
# Best-effort cleanup of partial file
|
||||
try:
|
||||
out_path.unlink(missing_ok=True)
|
||||
except OSError:
|
||||
pass
|
||||
return None
|
||||
|
||||
return out_path
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pre-update auto-backup
|
||||
# ---------------------------------------------------------------------------
|
||||
@ -768,64 +840,87 @@ def create_pre_update_backup(
|
||||
stamp = datetime.now().strftime("%Y-%m-%d-%H%M%S")
|
||||
out_path = backup_dir / f"{_PRE_UPDATE_PREFIX}{stamp}.zip"
|
||||
|
||||
# Collect files (same logic as run_backup, minus the chatty progress prints)
|
||||
files_to_add: list[tuple[Path, Path]] = []
|
||||
try:
|
||||
for dirpath, dirnames, filenames in os.walk(hermes_root, followlinks=False):
|
||||
dp = Path(dirpath)
|
||||
# Prune excluded directories in-place so os.walk doesn't descend
|
||||
dirnames[:] = [d for d in dirnames if d not in _EXCLUDED_DIRS]
|
||||
|
||||
for fname in filenames:
|
||||
fpath = dp / fname
|
||||
try:
|
||||
rel = fpath.relative_to(hermes_root)
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
if _should_exclude(rel):
|
||||
continue
|
||||
|
||||
# Skip the output zip itself if it already exists
|
||||
try:
|
||||
if fpath.resolve() == out_path.resolve():
|
||||
continue
|
||||
except (OSError, ValueError):
|
||||
pass
|
||||
|
||||
files_to_add.append((fpath, rel))
|
||||
except OSError as exc:
|
||||
logger.warning("Pre-update backup: walk failed: %s", exc)
|
||||
return None
|
||||
|
||||
if not files_to_add:
|
||||
return None
|
||||
|
||||
try:
|
||||
with zipfile.ZipFile(out_path, "w", zipfile.ZIP_DEFLATED, compresslevel=6) as zf:
|
||||
for abs_path, rel_path in files_to_add:
|
||||
try:
|
||||
if abs_path.suffix == ".db":
|
||||
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
|
||||
tmp_db = Path(tmp.name)
|
||||
try:
|
||||
if _safe_copy_db(abs_path, tmp_db):
|
||||
zf.write(tmp_db, arcname=str(rel_path))
|
||||
finally:
|
||||
tmp_db.unlink(missing_ok=True)
|
||||
else:
|
||||
zf.write(abs_path, arcname=str(rel_path))
|
||||
except (PermissionError, OSError, ValueError) as exc:
|
||||
logger.debug("Skipping %s in pre-update backup: %s", rel_path, exc)
|
||||
continue
|
||||
except OSError as exc:
|
||||
logger.warning("Pre-update backup: zip write failed: %s", exc)
|
||||
# Best-effort cleanup of partial file
|
||||
try:
|
||||
out_path.unlink(missing_ok=True)
|
||||
except OSError:
|
||||
pass
|
||||
result = _write_full_zip_backup(out_path, hermes_root)
|
||||
if result is None:
|
||||
return None
|
||||
|
||||
_prune_pre_update_backups(backup_dir, keep=keep)
|
||||
return out_path
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pre-migration auto-backup (used by `hermes claw migrate`)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_PRE_MIGRATION_PREFIX = "pre-migration-"
|
||||
_PRE_MIGRATION_DEFAULT_KEEP = 5
|
||||
|
||||
|
||||
def _prune_pre_migration_backups(backup_dir: Path, keep: int) -> int:
|
||||
"""Remove oldest pre-migration backups beyond the keep limit.
|
||||
|
||||
Only touches files matching ``pre-migration-*.zip`` so other backups in
|
||||
the same directory are never touched.
|
||||
"""
|
||||
if keep < 0:
|
||||
keep = 0
|
||||
if not backup_dir.exists():
|
||||
return 0
|
||||
|
||||
backups = sorted(
|
||||
(p for p in backup_dir.iterdir()
|
||||
if p.is_file() and p.name.startswith(_PRE_MIGRATION_PREFIX) and p.suffix.lower() == ".zip"),
|
||||
key=lambda p: p.name,
|
||||
reverse=True,
|
||||
)
|
||||
|
||||
deleted = 0
|
||||
for p in backups[keep:]:
|
||||
try:
|
||||
p.unlink()
|
||||
deleted += 1
|
||||
except OSError as exc:
|
||||
logger.warning("Failed to prune pre-migration backup %s: %s", p.name, exc)
|
||||
|
||||
return deleted
|
||||
|
||||
|
||||
def create_pre_migration_backup(
|
||||
hermes_home: Optional[Path] = None,
|
||||
keep: int = _PRE_MIGRATION_DEFAULT_KEEP,
|
||||
) -> Optional[Path]:
|
||||
"""Create a full zip backup of HERMES_HOME under ``backups/`` before a
|
||||
``hermes claw migrate`` apply.
|
||||
|
||||
Shares implementation with :func:`create_pre_update_backup` via
|
||||
``_write_full_zip_backup`` — same exclusions, same SQLite safe-copy,
|
||||
restorable with ``hermes import <archive>``. Writes to
|
||||
``<HERMES_HOME>/backups/pre-migration-<timestamp>.zip`` and auto-prunes
|
||||
old pre-migration backups.
|
||||
|
||||
Returns the path to the created zip, or ``None`` if nothing was found
|
||||
to back up (fresh install) or the write failed. Never raises — the
|
||||
caller decides whether to abort or proceed.
|
||||
"""
|
||||
hermes_root = hermes_home or get_default_hermes_root()
|
||||
if not hermes_root.is_dir():
|
||||
return None
|
||||
|
||||
# Reuses the shared backups/ directory so `hermes import` and the
|
||||
# update-backup listing pick up pre-migration archives too.
|
||||
backup_dir = _pre_update_backup_dir(hermes_root)
|
||||
try:
|
||||
backup_dir.mkdir(parents=True, exist_ok=True)
|
||||
except OSError as exc:
|
||||
logger.warning("Could not create pre-migration backup dir %s: %s", backup_dir, exc)
|
||||
return None
|
||||
|
||||
stamp = datetime.now().strftime("%Y-%m-%d-%H%M%S")
|
||||
out_path = backup_dir / f"{_PRE_MIGRATION_PREFIX}{stamp}.zip"
|
||||
|
||||
result = _write_full_zip_backup(out_path, hermes_root)
|
||||
if result is None:
|
||||
return None
|
||||
|
||||
_prune_pre_migration_backups(backup_dir, keep=keep)
|
||||
return out_path
|
||||
|
||||
@ -4,7 +4,8 @@ Usage:
|
||||
hermes claw migrate # Preview then migrate (always shows preview first)
|
||||
hermes claw migrate --dry-run # Preview only, no changes
|
||||
hermes claw migrate --yes # Skip confirmation prompt
|
||||
hermes claw migrate --preset full --overwrite # Full migration, overwrite conflicts
|
||||
hermes claw migrate --preset full --overwrite --migrate-secrets # Full run w/ secrets
|
||||
hermes claw migrate --no-backup # Skip pre-migration snapshot
|
||||
hermes claw cleanup # Archive leftover OpenClaw directories
|
||||
hermes claw cleanup --dry-run # Preview what would be archived
|
||||
"""
|
||||
@ -15,6 +16,7 @@ import subprocess
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from hermes_cli.config import get_hermes_home, get_config_path, load_config, save_config
|
||||
from hermes_constants import get_optional_skills_dir
|
||||
@ -321,10 +323,13 @@ def _cmd_migrate(args):
|
||||
migrate_secrets = getattr(args, "migrate_secrets", False)
|
||||
workspace_target = getattr(args, "workspace_target", None)
|
||||
skill_conflict = getattr(args, "skill_conflict", "skip")
|
||||
no_backup = getattr(args, "no_backup", False)
|
||||
|
||||
# If using the "full" preset, secrets are included by default
|
||||
if preset == "full":
|
||||
migrate_secrets = True
|
||||
# Secrets are never included implicitly — they must be explicitly requested
|
||||
# via --migrate-secrets, even under --preset full. This mirrors OpenClaw's
|
||||
# migrate-hermes posture (two-phase: run once without secrets, rerun with
|
||||
# --include-secrets) and prevents a --preset full invocation from silently
|
||||
# importing API keys that the user may not have intended to copy.
|
||||
|
||||
print()
|
||||
print(
|
||||
@ -431,15 +436,24 @@ def _cmd_migrate(args):
|
||||
|
||||
preview_summary = preview_report.get("summary", {})
|
||||
preview_count = preview_summary.get("migrated", 0)
|
||||
preview_conflicts = preview_summary.get("conflict", 0)
|
||||
|
||||
if preview_count == 0:
|
||||
# "Nothing to migrate" means nothing migrated AND nothing blocked by
|
||||
# conflicts. If there are conflicts, we still want to show the plan and
|
||||
# surface the refusal/--overwrite guidance instead of silently bailing.
|
||||
if preview_count == 0 and preview_conflicts == 0:
|
||||
print()
|
||||
print_info("Nothing to migrate from OpenClaw.")
|
||||
_print_migration_report(preview_report, dry_run=True)
|
||||
return
|
||||
|
||||
print()
|
||||
print_header(f"Migration Preview — {preview_count} item(s) would be imported")
|
||||
if preview_count > 0:
|
||||
print_header(f"Migration Preview — {preview_count} item(s) would be imported")
|
||||
else:
|
||||
print_header(
|
||||
f"Migration Preview — {preview_conflicts} conflict(s), nothing would be imported"
|
||||
)
|
||||
print_info("No changes have been made yet. Review the list below:")
|
||||
_print_migration_report(preview_report, dry_run=True)
|
||||
|
||||
@ -447,6 +461,24 @@ def _cmd_migrate(args):
|
||||
if dry_run:
|
||||
return
|
||||
|
||||
# ── Phase 1b: Refuse if the plan has conflicts and --overwrite is not set ─
|
||||
# Modelled on OpenClaw's assertConflictFreePlan() — apply is a safe no-op
|
||||
# on conflicts unless the user explicitly opts in to overwriting. Without
|
||||
# this guard, the user would answer "yes, proceed" and silently end up
|
||||
# with a migration that skipped every conflicting item.
|
||||
if preview_conflicts > 0 and not overwrite:
|
||||
print()
|
||||
print_error(
|
||||
f"Plan has {preview_conflicts} conflict(s). Refusing to apply."
|
||||
)
|
||||
print_info(
|
||||
"Each conflict is an item whose target already exists in ~/.hermes/. "
|
||||
"Re-run with --overwrite to replace conflicting targets (item-level "
|
||||
"backups are written to the migration report directory)."
|
||||
)
|
||||
print_info("Or re-run with --dry-run to review the full plan.")
|
||||
return
|
||||
|
||||
# ── Phase 2: Confirm and execute ───────────────────────────
|
||||
print()
|
||||
if not auto_yes:
|
||||
@ -458,6 +490,32 @@ def _cmd_migrate(args):
|
||||
print_info("Migration cancelled.")
|
||||
return
|
||||
|
||||
# ── Phase 2b: Pre-apply backup of the Hermes home ─────────
|
||||
# Delegates to hermes_cli.backup.create_pre_migration_backup(), which
|
||||
# shares implementation with the pre-update backup (same exclusion
|
||||
# rules, same SQLite safe-copy, zip format) so the archive is
|
||||
# restorable with `hermes import`. Mirrors OpenClaw's
|
||||
# createPreMigrationBackup posture — one atomic restore point before
|
||||
# any mutation, auto-pruned to the last 5 pre-migration zips.
|
||||
backup_archive: Optional[Path] = None
|
||||
if not no_backup:
|
||||
try:
|
||||
from hermes_cli.backup import create_pre_migration_backup, _format_size
|
||||
backup_archive = create_pre_migration_backup(hermes_home=hermes_home)
|
||||
if backup_archive:
|
||||
size_str = _format_size(backup_archive.stat().st_size)
|
||||
print()
|
||||
print_success(f"Pre-migration backup: {backup_archive} ({size_str})")
|
||||
print_info(f"Restore with: hermes import {backup_archive.name}")
|
||||
except Exception as e:
|
||||
print()
|
||||
print_error(f"Could not create pre-migration backup: {e}")
|
||||
print_info(
|
||||
"Re-run with --no-backup to skip, or free up disk space under the Hermes home."
|
||||
)
|
||||
logger.debug("Pre-migration backup error", exc_info=True)
|
||||
return
|
||||
|
||||
try:
|
||||
migrator = mod.Migrator(
|
||||
source_root=source_dir.resolve(),
|
||||
@ -476,6 +534,9 @@ def _cmd_migrate(args):
|
||||
print()
|
||||
print_error(f"Migration failed: {e}")
|
||||
logger.debug("OpenClaw migration error", exc_info=True)
|
||||
if backup_archive:
|
||||
print_info(f"A pre-migration backup is available at: {backup_archive}")
|
||||
print_info(f"Restore with: hermes import {backup_archive.name}")
|
||||
return
|
||||
|
||||
# Print results
|
||||
|
||||
@ -9748,17 +9748,26 @@ Examples:
|
||||
"--preset",
|
||||
choices=["user-data", "full"],
|
||||
default="full",
|
||||
help="Migration preset (default: full). 'user-data' excludes secrets",
|
||||
help="Migration preset (default: full). Neither preset imports secrets — "
|
||||
"pass --migrate-secrets to include API keys.",
|
||||
)
|
||||
claw_migrate.add_argument(
|
||||
"--overwrite",
|
||||
action="store_true",
|
||||
help="Overwrite existing files (default: skip conflicts)",
|
||||
help="Overwrite existing files (default: refuse to apply when the plan has conflicts)",
|
||||
)
|
||||
claw_migrate.add_argument(
|
||||
"--migrate-secrets",
|
||||
action="store_true",
|
||||
help="Include allowlisted secrets (TELEGRAM_BOT_TOKEN, API keys, etc.)",
|
||||
help="Include allowlisted secrets (TELEGRAM_BOT_TOKEN, API keys, etc.). "
|
||||
"Required even under --preset full.",
|
||||
)
|
||||
claw_migrate.add_argument(
|
||||
"--no-backup",
|
||||
action="store_true",
|
||||
help="Skip the pre-migration zip snapshot of ~/.hermes/ (by default a "
|
||||
"single restore-point archive is written to ~/.hermes/backups/ "
|
||||
"before apply; restorable with 'hermes import').",
|
||||
)
|
||||
claw_migrate.add_argument(
|
||||
"--workspace-target", help="Absolute path to copy workspace instructions into"
|
||||
|
||||
@ -224,6 +224,24 @@ MIGRATION_PRESETS: Dict[str, set[str]] = {
|
||||
}
|
||||
|
||||
|
||||
# ───────────────────────────────────────────────────────────────────────
|
||||
# Item shape constants — kept stable for downstream consumers of report.json.
|
||||
# Inspired by OpenClaw's src/plugin-sdk/migration.ts so both sides speak the
|
||||
# same vocabulary. Values intentionally match the strings already produced
|
||||
# by this script (migrated/archived/skipped/conflict/error) so the addition
|
||||
# is backward-compatible.
|
||||
# ───────────────────────────────────────────────────────────────────────
|
||||
STATUS_MIGRATED = "migrated"
|
||||
STATUS_ARCHIVED = "archived"
|
||||
STATUS_SKIPPED = "skipped"
|
||||
STATUS_CONFLICT = "conflict"
|
||||
STATUS_ERROR = "error"
|
||||
STATUS_PLANNED = "planned"
|
||||
|
||||
REASON_TARGET_EXISTS = "Target exists and overwrite is disabled"
|
||||
REASON_BLOCKED_BY_APPLY_CONFLICT = "blocked by earlier apply conflict"
|
||||
|
||||
|
||||
@dataclass
|
||||
class ItemResult:
|
||||
kind: str
|
||||
@ -232,6 +250,7 @@ class ItemResult:
|
||||
status: str
|
||||
reason: str = ""
|
||||
details: Dict[str, Any] = field(default_factory=dict)
|
||||
sensitive: bool = False
|
||||
|
||||
|
||||
def parse_selection_values(values: Optional[Sequence[str]]) -> List[str]:
|
||||
@ -547,32 +566,128 @@ def relative_label(path: Path, root: Path) -> str:
|
||||
return str(path)
|
||||
|
||||
|
||||
# ───────────────────────────────────────────────────────────────────────
|
||||
# Secret redaction for migration reports.
|
||||
#
|
||||
# The report JSON persists to disk inside the migration output directory and
|
||||
# frequently ends up in bug reports or support channels. Anything that looks
|
||||
# like a credential — by key name or by value shape — is replaced with
|
||||
# "[redacted]" before the report is written.
|
||||
#
|
||||
# Modelled on OpenClaw's src/plugin-sdk/migration.ts so both migration tools
|
||||
# redact consistently. Pure function — safe to call on any plain-data dict.
|
||||
# ───────────────────────────────────────────────────────────────────────
|
||||
REDACTED_MIGRATION_VALUE = "[redacted]"
|
||||
|
||||
_SECRET_KEY_MARKERS = (
|
||||
"accesstoken",
|
||||
"apikey",
|
||||
"authorization",
|
||||
"bearertoken",
|
||||
"clientsecret",
|
||||
"cookie",
|
||||
"credential",
|
||||
"password",
|
||||
"privatekey",
|
||||
"refreshtoken",
|
||||
"secret",
|
||||
)
|
||||
|
||||
_SECRET_VALUE_PATTERNS = (
|
||||
re.compile(r"\bBearer\s+[A-Za-z0-9._~+/=\-]+"),
|
||||
re.compile(r"\bsk-[A-Za-z0-9_\-]{8,}\b"),
|
||||
re.compile(r"\bgh[pousr]_[A-Za-z0-9_]{16,}\b"),
|
||||
re.compile(r"\bxox[abprs]-[A-Za-z0-9\-]{8,}\b"),
|
||||
re.compile(r"\bAIza[0-9A-Za-z_\-]{12,}\b"),
|
||||
)
|
||||
|
||||
|
||||
def _normalize_secret_key(key: str) -> str:
|
||||
return re.sub(r"[^a-z0-9]", "", key.lower())
|
||||
|
||||
|
||||
def _is_secret_key(key: str) -> bool:
|
||||
normalized = _normalize_secret_key(key)
|
||||
if normalized == "token" or normalized.endswith("token"):
|
||||
return True
|
||||
if normalized in ("auth", "authorization"):
|
||||
return True
|
||||
return any(marker in normalized for marker in _SECRET_KEY_MARKERS)
|
||||
|
||||
|
||||
def _redact_string(value: str) -> str:
|
||||
for pattern in _SECRET_VALUE_PATTERNS:
|
||||
value = pattern.sub(REDACTED_MIGRATION_VALUE, value)
|
||||
return value
|
||||
|
||||
|
||||
def redact_migration_value(value: Any) -> Any:
|
||||
"""Return a deep copy of ``value`` with secret-looking content replaced.
|
||||
|
||||
Applied to every report written to disk. Keys whose normalized form
|
||||
matches a credential marker get their value replaced wholesale. Strings
|
||||
anywhere in the tree are scanned for common token patterns (sk-..., ghp_...,
|
||||
xox*-, AIza*, Bearer ...) and those substrings are replaced inline.
|
||||
"""
|
||||
return _redact_internal(value, set())
|
||||
|
||||
|
||||
def _redact_internal(value: Any, seen: set) -> Any:
|
||||
if isinstance(value, str):
|
||||
return _redact_string(value)
|
||||
if isinstance(value, (list, tuple)):
|
||||
return [_redact_internal(entry, seen) for entry in value]
|
||||
if isinstance(value, dict):
|
||||
obj_id = id(value)
|
||||
if obj_id in seen:
|
||||
return REDACTED_MIGRATION_VALUE
|
||||
seen.add(obj_id)
|
||||
out: Dict[str, Any] = {}
|
||||
for key, entry in value.items():
|
||||
if isinstance(key, str) and _is_secret_key(key):
|
||||
out[key] = REDACTED_MIGRATION_VALUE
|
||||
else:
|
||||
out[key] = _redact_internal(entry, seen)
|
||||
return out
|
||||
return value
|
||||
|
||||
|
||||
def write_report(output_dir: Path, report: Dict[str, Any]) -> None:
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
# Always redact before persisting. Callers who need the raw object
|
||||
# (in-process) still get it back from build_report(); only the on-disk
|
||||
# copy is redacted.
|
||||
redacted = redact_migration_value(report)
|
||||
(output_dir / "report.json").write_text(
|
||||
json.dumps(report, indent=2, ensure_ascii=False) + "\n",
|
||||
json.dumps(redacted, indent=2, ensure_ascii=False) + "\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
grouped: Dict[str, List[Dict[str, Any]]] = {}
|
||||
for item in report["items"]:
|
||||
for item in redacted["items"]:
|
||||
grouped.setdefault(item["status"], []).append(item)
|
||||
|
||||
lines = [
|
||||
"# OpenClaw -> Hermes Migration Report",
|
||||
"",
|
||||
f"- Timestamp: {report['timestamp']}",
|
||||
f"- Mode: {report['mode']}",
|
||||
f"- Source: `{report['source_root']}`",
|
||||
f"- Target: `{report['target_root']}`",
|
||||
f"- Timestamp: {redacted['timestamp']}",
|
||||
f"- Mode: {redacted['mode']}",
|
||||
f"- Source: `{redacted['source_root']}`",
|
||||
f"- Target: `{redacted['target_root']}`",
|
||||
"",
|
||||
"## Summary",
|
||||
"",
|
||||
]
|
||||
|
||||
for key, value in report["summary"].items():
|
||||
for key, value in redacted["summary"].items():
|
||||
lines.append(f"- {key}: {value}")
|
||||
|
||||
warnings = redacted.get("warnings") or []
|
||||
if warnings:
|
||||
lines.extend(["", "## Warnings", ""])
|
||||
for warning in warnings:
|
||||
lines.append(f"- {warning}")
|
||||
|
||||
lines.extend(["", "## What Was Not Fully Brought Over", ""])
|
||||
skipped = grouped.get("skipped", []) + grouped.get("conflict", []) + grouped.get("error", [])
|
||||
if not skipped:
|
||||
@ -584,6 +699,12 @@ def write_report(output_dir: Path, report: Dict[str, Any]) -> None:
|
||||
reason = item["reason"] or item["status"]
|
||||
lines.append(f"- `{source}` -> `{dest}`: {reason}")
|
||||
|
||||
next_steps = redacted.get("next_steps") or []
|
||||
if next_steps:
|
||||
lines.extend(["", "## Next Steps", ""])
|
||||
for step in next_steps:
|
||||
lines.append(f"- {step}")
|
||||
|
||||
(output_dir / "summary.md").write_text("\n".join(lines) + "\n", encoding="utf-8")
|
||||
|
||||
|
||||
@ -618,6 +739,12 @@ class Migrator:
|
||||
self.backup_dir = self.output_dir / "backups" if self.output_dir else None
|
||||
self.overflow_dir = self.output_dir / "overflow" if self.output_dir else None
|
||||
self.items: List[ItemResult] = []
|
||||
# Once a config.yaml write hits conflict/error mid-run, later
|
||||
# config.yaml writes are deliberately short-circuited to avoid
|
||||
# leaving config in a partially-written state. Modelled on
|
||||
# OpenClaw's extensions/migrate-hermes/apply.ts "blocked by earlier
|
||||
# apply conflict" sequencing.
|
||||
self._config_apply_blocked: bool = False
|
||||
|
||||
# Resolve the configured workspace directory from openclaw.json.
|
||||
# Many users (especially those who started before the OpenClaw rebrand)
|
||||
@ -654,6 +781,32 @@ class Migrator:
|
||||
def is_selected(self, option_id: str) -> bool:
|
||||
return option_id in self.selected_options
|
||||
|
||||
# Option ids that mutate the Hermes config.yaml file. Once any one of
|
||||
# them records a conflict/error on config.yaml, subsequent ones are
|
||||
# short-circuited to avoid partial writes. Keep in sync with methods
|
||||
# that call load_yaml_file(target_root / "config.yaml") + dump_yaml_file.
|
||||
_CONFIG_MUTATING_OPTIONS = frozenset({
|
||||
"model-config",
|
||||
"tts-config",
|
||||
"mcp-servers",
|
||||
"plugins-config",
|
||||
"cron-jobs",
|
||||
"hooks-config",
|
||||
"agent-config",
|
||||
"gateway-config",
|
||||
"session-config",
|
||||
"full-providers",
|
||||
"deep-channels",
|
||||
"browser-config",
|
||||
"tools-config",
|
||||
"approvals-config",
|
||||
"memory-backend",
|
||||
"skills-config",
|
||||
"ui-identity",
|
||||
"logging-config",
|
||||
"command-allowlist",
|
||||
})
|
||||
|
||||
def record(
|
||||
self,
|
||||
kind: str,
|
||||
@ -663,6 +816,7 @@ class Migrator:
|
||||
reason: str = "",
|
||||
**details: Any,
|
||||
) -> None:
|
||||
sensitive = bool(details.pop("sensitive", False))
|
||||
self.items.append(
|
||||
ItemResult(
|
||||
kind=kind,
|
||||
@ -671,8 +825,16 @@ class Migrator:
|
||||
status=status,
|
||||
reason=reason,
|
||||
details=details,
|
||||
sensitive=sensitive,
|
||||
)
|
||||
)
|
||||
# Flip the config-block flag when a conflict/error occurs on a
|
||||
# config.yaml write. Later config-mutating options will skip rather
|
||||
# than attempting a partial write.
|
||||
if status in (STATUS_CONFLICT, STATUS_ERROR) and destination is not None:
|
||||
dest_str = str(destination)
|
||||
if dest_str.endswith("config.yaml") or dest_str.endswith("config.yml"):
|
||||
self._config_apply_blocked = True
|
||||
|
||||
def source_candidate(self, *relative_paths: str) -> Optional[Path]:
|
||||
for rel in relative_paths:
|
||||
@ -798,11 +960,30 @@ class Migrator:
|
||||
return self.build_report()
|
||||
|
||||
def run_if_selected(self, option_id: str, func) -> None:
|
||||
if self.is_selected(option_id):
|
||||
func()
|
||||
if not self.is_selected(option_id):
|
||||
meta = MIGRATION_OPTION_METADATA[option_id]
|
||||
self.record(option_id, None, None, "skipped", "Not selected for this run", option_label=meta["label"])
|
||||
return
|
||||
meta = MIGRATION_OPTION_METADATA[option_id]
|
||||
self.record(option_id, None, None, "skipped", "Not selected for this run", option_label=meta["label"])
|
||||
# If a previous config.yaml write hit a conflict/error during apply,
|
||||
# skip remaining config-mutating options rather than risk a partial
|
||||
# write. Dry-run mode never blocks — the user needs the full preview
|
||||
# to decide how to proceed (re-run with --overwrite, etc.).
|
||||
if (
|
||||
self.execute
|
||||
and self._config_apply_blocked
|
||||
and option_id in self._CONFIG_MUTATING_OPTIONS
|
||||
):
|
||||
meta = MIGRATION_OPTION_METADATA[option_id]
|
||||
self.record(
|
||||
option_id,
|
||||
None,
|
||||
None,
|
||||
STATUS_SKIPPED,
|
||||
REASON_BLOCKED_BY_APPLY_CONFLICT,
|
||||
option_label=meta["label"],
|
||||
)
|
||||
return
|
||||
func()
|
||||
|
||||
def build_report(self) -> Dict[str, Any]:
|
||||
summary: Dict[str, int] = {
|
||||
@ -840,6 +1021,8 @@ class Migrator:
|
||||
},
|
||||
"summary": summary,
|
||||
"items": [asdict(item) for item in self.items],
|
||||
"warnings": self._build_warnings(summary),
|
||||
"next_steps": self._build_next_steps(summary),
|
||||
}
|
||||
|
||||
if self.output_dir:
|
||||
@ -847,6 +1030,67 @@ class Migrator:
|
||||
|
||||
return report
|
||||
|
||||
def _build_warnings(self, summary: Dict[str, int]) -> List[str]:
|
||||
"""Structured warnings surfaced on the report for downstream consumers.
|
||||
|
||||
Modelled on OpenClaw's extensions/migrate-hermes/plan.ts warnings[].
|
||||
Keep the messages actionable — they show up in summary.md and the
|
||||
JSON report.
|
||||
"""
|
||||
warnings: List[str] = []
|
||||
if summary.get("conflict", 0) > 0:
|
||||
warnings.append(
|
||||
"Conflicts were found. Re-run with --overwrite to replace conflicting "
|
||||
"targets after item-level backups."
|
||||
)
|
||||
if summary.get("error", 0) > 0:
|
||||
warnings.append(
|
||||
"One or more items failed. Inspect the report and re-run after fixing "
|
||||
"the underlying cause."
|
||||
)
|
||||
if self._config_apply_blocked and self.execute:
|
||||
warnings.append(
|
||||
"A config.yaml write hit a conflict or error mid-apply; later config "
|
||||
"items were skipped to avoid a partial write."
|
||||
)
|
||||
# Detect whether secrets were detected but not migrated.
|
||||
provider_keys_skipped = any(
|
||||
item.kind == "provider-keys" and item.status == STATUS_SKIPPED
|
||||
for item in self.items
|
||||
)
|
||||
if provider_keys_skipped and not self.migrate_secrets:
|
||||
warnings.append(
|
||||
"API keys and other credentials were detected but not imported. "
|
||||
"Re-run with --migrate-secrets to copy supported keys into the "
|
||||
"Hermes env file."
|
||||
)
|
||||
return warnings
|
||||
|
||||
def _build_next_steps(self, summary: Dict[str, int]) -> List[str]:
|
||||
"""Human-readable next-step guidance baked into the report."""
|
||||
if not self.execute:
|
||||
return [
|
||||
"Re-run without --dry-run to apply the migration.",
|
||||
"Pass --overwrite to resolve conflicts, or --migrate-secrets to "
|
||||
"include API keys.",
|
||||
]
|
||||
steps: List[str] = []
|
||||
if summary.get("migrated", 0) > 0:
|
||||
steps.append(
|
||||
"Review the migration report at "
|
||||
f"{self.output_dir}/summary.md"
|
||||
if self.output_dir
|
||||
else "Review the migration report."
|
||||
)
|
||||
steps.append(
|
||||
"Start a new Hermes session (or /reset) to pick up the imported config."
|
||||
)
|
||||
if summary.get("conflict", 0) > 0:
|
||||
steps.append(
|
||||
"Re-run with --overwrite to apply items that were blocked by conflicts."
|
||||
)
|
||||
return steps
|
||||
|
||||
def maybe_backup(self, path: Path) -> Optional[Path]:
|
||||
if not self.execute or not self.backup_dir or not path.exists():
|
||||
return None
|
||||
@ -2731,6 +2975,13 @@ def parse_args() -> argparse.Namespace:
|
||||
f"Valid ids: {', '.join(sorted(MIGRATION_OPTION_METADATA))}",
|
||||
)
|
||||
parser.add_argument("--output-dir", help="Where to write report, backups, and archived docs")
|
||||
parser.add_argument(
|
||||
"--json",
|
||||
action="store_true",
|
||||
dest="json_output",
|
||||
help="Print the migration report as JSON on stdout (redacted). "
|
||||
"Combine with no --execute for a safe plan-only machine-readable preview.",
|
||||
)
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
@ -2755,6 +3006,13 @@ def main() -> int:
|
||||
)
|
||||
report = migrator.migrate()
|
||||
|
||||
# ── Machine-readable JSON mode ────────────────────────────
|
||||
# When --json is set, print the redacted report to stdout and skip the
|
||||
# human-readable terminal recap. Useful for CI and scripted wrappers.
|
||||
if getattr(args, "json_output", False):
|
||||
print(json.dumps(redact_migration_value(report), indent=2, ensure_ascii=False))
|
||||
return 0
|
||||
|
||||
# ── Human-readable terminal recap ─────────────────────────
|
||||
s = report["summary"]
|
||||
items = report["items"]
|
||||
|
||||
@ -1462,3 +1462,103 @@ class TestRunPreUpdateBackup:
|
||||
_run_pre_update_backup(Namespace(no_backup=True, backup=False))
|
||||
out = capsys.readouterr().out
|
||||
assert "skipped (--no-backup)" in out
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pre-migration backup (hermes claw migrate safety net)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestPreMigrationBackup:
|
||||
"""Tests for create_pre_migration_backup — the auto-backup
|
||||
``hermes claw migrate`` runs before mutating ~/.hermes/."""
|
||||
|
||||
@pytest.fixture
|
||||
def hermes_home(self, tmp_path):
|
||||
root = tmp_path / ".hermes"
|
||||
root.mkdir()
|
||||
_make_hermes_tree(root)
|
||||
return root
|
||||
|
||||
def test_creates_backup_under_backups_dir(self, hermes_home):
|
||||
from hermes_cli.backup import create_pre_migration_backup
|
||||
out = create_pre_migration_backup(hermes_home=hermes_home)
|
||||
assert out is not None
|
||||
assert out.exists()
|
||||
# Shares the backups/ directory with pre-update backups so `hermes
|
||||
# import` and the update-backup listing both pick them up.
|
||||
assert out.parent == hermes_home / "backups"
|
||||
assert out.name.startswith("pre-migration-")
|
||||
assert out.suffix == ".zip"
|
||||
|
||||
def test_backup_uses_shared_exclusion_rules(self, hermes_home):
|
||||
"""Pre-migration backup reuses the same exclusion rules as
|
||||
``hermes backup`` / ``create_pre_update_backup`` — no drift."""
|
||||
from hermes_cli.backup import create_pre_migration_backup
|
||||
out = create_pre_migration_backup(hermes_home=hermes_home)
|
||||
assert out is not None
|
||||
with zipfile.ZipFile(out) as zf:
|
||||
names = set(zf.namelist())
|
||||
# User data present
|
||||
assert "config.yaml" in names
|
||||
assert ".env" in names
|
||||
assert "skills/my-skill/SKILL.md" in names
|
||||
# Same exclusions as the shared helper
|
||||
assert not any(n.startswith("hermes-agent/") for n in names)
|
||||
assert not any("__pycache__" in n for n in names)
|
||||
assert "gateway.pid" not in names
|
||||
|
||||
def test_restorable_with_hermes_import(self, hermes_home, tmp_path):
|
||||
"""The zip produced by pre-migration backup must be a valid Hermes
|
||||
backup — `hermes import` should accept it."""
|
||||
from hermes_cli.backup import create_pre_migration_backup, _validate_backup_zip
|
||||
out = create_pre_migration_backup(hermes_home=hermes_home)
|
||||
assert out is not None
|
||||
with zipfile.ZipFile(out) as zf:
|
||||
valid, _reason = _validate_backup_zip(zf)
|
||||
assert valid, "pre-migration zip failed _validate_backup_zip"
|
||||
|
||||
def test_does_not_recurse_into_prior_backups(self, hermes_home):
|
||||
from hermes_cli.backup import create_pre_migration_backup
|
||||
out1 = create_pre_migration_backup(hermes_home=hermes_home)
|
||||
assert out1 is not None
|
||||
out2 = create_pre_migration_backup(hermes_home=hermes_home)
|
||||
assert out2 is not None
|
||||
with zipfile.ZipFile(out2) as zf:
|
||||
names = zf.namelist()
|
||||
assert not any(n.startswith("backups/") for n in names)
|
||||
|
||||
def test_rotation_keeps_only_n(self, hermes_home):
|
||||
import time as _t
|
||||
from hermes_cli.backup import create_pre_migration_backup
|
||||
|
||||
created = []
|
||||
for _ in range(7):
|
||||
out = create_pre_migration_backup(hermes_home=hermes_home, keep=3)
|
||||
if out is not None:
|
||||
created.append(out)
|
||||
_t.sleep(1.05) # timestamp resolution
|
||||
|
||||
remaining = sorted((hermes_home / "backups").glob("pre-migration-*.zip"))
|
||||
assert len(remaining) <= 3, f"expected <=3 backups retained, got {len(remaining)}"
|
||||
|
||||
def test_missing_hermes_home_returns_none(self, tmp_path):
|
||||
"""Fresh install with no ~/.hermes yet — nothing to back up."""
|
||||
from hermes_cli.backup import create_pre_migration_backup
|
||||
missing = tmp_path / "does-not-exist"
|
||||
out = create_pre_migration_backup(hermes_home=missing)
|
||||
assert out is None
|
||||
|
||||
def test_does_not_touch_pre_update_backups(self, hermes_home):
|
||||
"""Pre-migration rotation must only prune pre-migration-*.zip files,
|
||||
leaving pre-update-*.zip backups untouched."""
|
||||
from hermes_cli.backup import create_pre_update_backup, create_pre_migration_backup
|
||||
update_backup = create_pre_update_backup(hermes_home=hermes_home, keep=5)
|
||||
assert update_backup is not None and update_backup.exists()
|
||||
# Spin up a lot of migration backups with keep=1
|
||||
import time as _t
|
||||
for _ in range(3):
|
||||
out = create_pre_migration_backup(hermes_home=hermes_home, keep=1)
|
||||
assert out is not None
|
||||
_t.sleep(1.05)
|
||||
# Update backup must still be there
|
||||
assert update_backup.exists(), "pre-migration rotation wrongly pruned the pre-update backup"
|
||||
|
||||
@ -439,8 +439,14 @@ class TestCmdMigrate:
|
||||
captured = capsys.readouterr()
|
||||
assert "Could not load migration script" in captured.out
|
||||
|
||||
def test_full_preset_enables_secrets(self, tmp_path, capsys):
|
||||
"""The 'full' preset should set migrate_secrets=True automatically."""
|
||||
def test_full_preset_does_not_enable_secrets_silently(self, tmp_path, capsys):
|
||||
"""The 'full' preset must NOT auto-enable migrate_secrets.
|
||||
|
||||
Users have to opt in to secret import explicitly via --migrate-secrets,
|
||||
even under the 'full' preset. This mirrors OpenClaw's migrate-hermes
|
||||
posture (two-phase import) and prevents a 'full' run from silently
|
||||
copying API keys.
|
||||
"""
|
||||
openclaw_dir = tmp_path / ".openclaw"
|
||||
openclaw_dir.mkdir()
|
||||
|
||||
@ -459,6 +465,44 @@ class TestCmdMigrate:
|
||||
migrate_secrets=False, # Not explicitly set by user
|
||||
workspace_target=None,
|
||||
skill_conflict="skip", yes=False,
|
||||
no_backup=False,
|
||||
)
|
||||
|
||||
with (
|
||||
patch.object(claw_mod, "_find_migration_script", return_value=tmp_path / "s.py"),
|
||||
patch.object(claw_mod, "_load_migration_module", return_value=fake_mod),
|
||||
patch.object(claw_mod, "get_config_path", return_value=tmp_path / "config.yaml"),
|
||||
patch.object(claw_mod, "save_config"),
|
||||
patch.object(claw_mod, "load_config", return_value={}),
|
||||
):
|
||||
claw_mod._cmd_migrate(args)
|
||||
|
||||
# Migrator should have been called with migrate_secrets=False — the
|
||||
# 'full' preset on its own no longer opts the user into secret import.
|
||||
call_kwargs = fake_mod.Migrator.call_args[1]
|
||||
assert call_kwargs["migrate_secrets"] is False
|
||||
|
||||
def test_full_preset_with_explicit_migrate_secrets_passes_through(self, tmp_path, capsys):
|
||||
"""Explicit --migrate-secrets still works under --preset full."""
|
||||
openclaw_dir = tmp_path / ".openclaw"
|
||||
openclaw_dir.mkdir()
|
||||
|
||||
fake_mod = ModuleType("openclaw_to_hermes")
|
||||
fake_mod.resolve_selected_options = MagicMock(return_value=set())
|
||||
fake_migrator = MagicMock()
|
||||
fake_migrator.migrate.return_value = {
|
||||
"summary": {"migrated": 0, "skipped": 0, "conflict": 0, "error": 0},
|
||||
"items": [],
|
||||
}
|
||||
fake_mod.Migrator = MagicMock(return_value=fake_migrator)
|
||||
|
||||
args = Namespace(
|
||||
source=str(openclaw_dir),
|
||||
dry_run=True, preset="full", overwrite=False,
|
||||
migrate_secrets=True, # Explicitly requested
|
||||
workspace_target=None,
|
||||
skill_conflict="skip", yes=False,
|
||||
no_backup=False,
|
||||
)
|
||||
|
||||
with (
|
||||
@ -470,7 +514,6 @@ class TestCmdMigrate:
|
||||
):
|
||||
claw_mod._cmd_migrate(args)
|
||||
|
||||
# Migrator should have been called with migrate_secrets=True
|
||||
call_kwargs = fake_mod.Migrator.call_args[1]
|
||||
assert call_kwargs["migrate_secrets"] is True
|
||||
|
||||
|
||||
391
tests/skills/test_openclaw_migration_hardening.py
Normal file
391
tests/skills/test_openclaw_migration_hardening.py
Normal file
@ -0,0 +1,391 @@
|
||||
"""Tests for the OpenClaw→Hermes migration hardening features.
|
||||
|
||||
Covers the changes in the "claw migrate hardening" PR:
|
||||
- secret redaction (engine-level, applied to report JSON)
|
||||
- warnings[] / next_steps[] on the report
|
||||
- blocked-by-earlier-conflict sequencing for config.yaml mutations
|
||||
- --json output mode on the migration script
|
||||
- enum-like constants and ItemResult.sensitive field
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib.util
|
||||
import json
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
SCRIPT_PATH = (
|
||||
Path(__file__).resolve().parents[2]
|
||||
/ "optional-skills"
|
||||
/ "migration"
|
||||
/ "openclaw-migration"
|
||||
/ "scripts"
|
||||
/ "openclaw_to_hermes.py"
|
||||
)
|
||||
|
||||
|
||||
def _load():
|
||||
spec = importlib.util.spec_from_file_location("openclaw_to_hermes_hard", SCRIPT_PATH)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
assert spec.loader is not None
|
||||
sys.modules[spec.name] = module
|
||||
spec.loader.exec_module(module)
|
||||
return module
|
||||
|
||||
|
||||
# ───────────────────────────────────────────────────────────────────────
|
||||
# Redaction
|
||||
# ───────────────────────────────────────────────────────────────────────
|
||||
def test_redact_replaces_secret_by_key_name():
|
||||
mod = _load()
|
||||
out = mod.redact_migration_value({"OPENROUTER_API_KEY": "sk-or-v1-abcdef12345678"})
|
||||
assert out["OPENROUTER_API_KEY"] == mod.REDACTED_MIGRATION_VALUE
|
||||
|
||||
|
||||
def test_redact_replaces_secret_by_value_pattern():
|
||||
mod = _load()
|
||||
# Even under a non-secret-looking key, the sk-... pattern should be replaced inline.
|
||||
out = mod.redact_migration_value({"note": "use sk-or-v1-9Xs7fF2JkLmNpQrT to authenticate"})
|
||||
assert "sk-or-" not in out["note"]
|
||||
assert mod.REDACTED_MIGRATION_VALUE in out["note"]
|
||||
|
||||
|
||||
def test_redact_handles_github_token_pattern():
|
||||
mod = _load()
|
||||
out = mod.redact_migration_value({"detail": "token: ghp_1234567890abcdef1234"})
|
||||
assert "ghp_" not in out["detail"]
|
||||
assert mod.REDACTED_MIGRATION_VALUE in out["detail"]
|
||||
|
||||
|
||||
def test_redact_handles_slack_token_pattern():
|
||||
mod = _load()
|
||||
out = mod.redact_migration_value("xoxb-1234567890-abcdef")
|
||||
assert out == mod.REDACTED_MIGRATION_VALUE
|
||||
|
||||
|
||||
def test_redact_handles_google_api_key_pattern():
|
||||
mod = _load()
|
||||
out = mod.redact_migration_value("AIzaSyA-abc123def456ghi")
|
||||
# Google key is a prefix — whole value is scrubbed
|
||||
assert "AIza" not in out
|
||||
|
||||
|
||||
def test_redact_handles_bearer_header():
|
||||
mod = _load()
|
||||
out = mod.redact_migration_value({"hint": "Authorization: Bearer eyJhbGciOiJIUzI1NiJ9.abc"})
|
||||
# Key "hint" is not a secret marker — only the Bearer <token> substring
|
||||
# gets scrubbed inline by the value pattern.
|
||||
assert "Bearer eyJ" not in out["hint"]
|
||||
assert mod.REDACTED_MIGRATION_VALUE in out["hint"]
|
||||
|
||||
|
||||
def test_redact_is_recursive():
|
||||
mod = _load()
|
||||
nested = {
|
||||
"outer": {
|
||||
"items": [
|
||||
{"password": "hunter2"},
|
||||
{"details": {"apiKey": "my-key"}},
|
||||
],
|
||||
},
|
||||
}
|
||||
out = mod.redact_migration_value(nested)
|
||||
assert out["outer"]["items"][0]["password"] == mod.REDACTED_MIGRATION_VALUE
|
||||
assert out["outer"]["items"][1]["details"]["apiKey"] == mod.REDACTED_MIGRATION_VALUE
|
||||
|
||||
|
||||
def test_redact_preserves_non_secret_keys_and_values():
|
||||
mod = _load()
|
||||
input_data = {"name": "hermes", "count": 42, "tags": ["a", "b"]}
|
||||
out = mod.redact_migration_value(input_data)
|
||||
assert out == input_data
|
||||
|
||||
|
||||
def test_redact_normalizes_key_case_and_punctuation():
|
||||
mod = _load()
|
||||
# "Api Key", "api-key", "API_KEY" all normalize the same way.
|
||||
for key in ("Api Key", "api-key", "API_KEY", "apikey"):
|
||||
out = mod.redact_migration_value({key: "secret"})
|
||||
assert out[key] == mod.REDACTED_MIGRATION_VALUE, f"failed to redact: {key}"
|
||||
|
||||
|
||||
def test_redact_leaves_env_secretref_alone():
|
||||
"""SecretRef-like shapes ({source: env, id: ...}) are pointers, not secrets."""
|
||||
mod = _load()
|
||||
ref = {"source": "env", "id": "OPENAI_API_KEY"}
|
||||
out = mod.redact_migration_value({"apiKey": ref})
|
||||
# The key "apiKey" itself triggers redaction today — this test locks that in.
|
||||
# If we later want to exempt SecretRef values the way OpenClaw does, update
|
||||
# both this test and _redact_internal together.
|
||||
assert out["apiKey"] == mod.REDACTED_MIGRATION_VALUE
|
||||
|
||||
|
||||
def test_write_report_redacts_api_keys_on_disk(tmp_path):
|
||||
mod = _load()
|
||||
report = {
|
||||
"timestamp": "20260427T120000",
|
||||
"mode": "execute",
|
||||
"source_root": "/src",
|
||||
"target_root": "/tgt",
|
||||
"summary": {"migrated": 1, "conflict": 0, "error": 0, "skipped": 0, "archived": 0},
|
||||
"items": [
|
||||
{
|
||||
"kind": "provider-keys",
|
||||
"source": "openclaw.json",
|
||||
"destination": "/tgt/.env",
|
||||
"status": "migrated",
|
||||
"reason": "",
|
||||
"details": {"OPENROUTER_API_KEY": "sk-or-v1-1234567890abcdef"},
|
||||
},
|
||||
],
|
||||
}
|
||||
mod.write_report(tmp_path, report)
|
||||
persisted = json.loads((tmp_path / "report.json").read_text())
|
||||
# The raw secret must not appear anywhere in the persisted JSON.
|
||||
assert "sk-or-v1-1234567890abcdef" not in (tmp_path / "report.json").read_text()
|
||||
assert persisted["items"][0]["details"]["OPENROUTER_API_KEY"] == mod.REDACTED_MIGRATION_VALUE
|
||||
|
||||
|
||||
# ───────────────────────────────────────────────────────────────────────
|
||||
# Warnings and next-steps
|
||||
# ───────────────────────────────────────────────────────────────────────
|
||||
def _make_minimal_migrator(mod, tmp_path, **overrides):
|
||||
source = tmp_path / "openclaw"
|
||||
source.mkdir()
|
||||
# Minimal valid OpenClaw layout so the Migrator constructor doesn't choke.
|
||||
(source / "openclaw.json").write_text("{}", encoding="utf-8")
|
||||
target = tmp_path / "hermes"
|
||||
target.mkdir()
|
||||
defaults = dict(
|
||||
source_root=source,
|
||||
target_root=target,
|
||||
execute=False,
|
||||
workspace_target=None,
|
||||
overwrite=False,
|
||||
migrate_secrets=False,
|
||||
output_dir=None,
|
||||
selected_options=set(),
|
||||
)
|
||||
defaults.update(overrides)
|
||||
return mod.Migrator(**defaults)
|
||||
|
||||
|
||||
def test_dry_run_report_includes_rerun_next_step(tmp_path):
|
||||
mod = _load()
|
||||
migrator = _make_minimal_migrator(mod, tmp_path)
|
||||
report = migrator.migrate()
|
||||
steps = report["next_steps"]
|
||||
assert any("dry-run" in step.lower() or "re-run" in step.lower() for step in steps)
|
||||
|
||||
|
||||
def test_conflict_produces_overwrite_warning(tmp_path):
|
||||
mod = _load()
|
||||
migrator = _make_minimal_migrator(mod, tmp_path, execute=True)
|
||||
# Inject a conflict on a config.yaml target to exercise the warning pathway.
|
||||
migrator.record(
|
||||
"tts-config",
|
||||
source=None,
|
||||
destination=migrator.target_root / "config.yaml",
|
||||
status=mod.STATUS_CONFLICT,
|
||||
reason="TTS already configured",
|
||||
)
|
||||
report = migrator.build_report()
|
||||
assert any("--overwrite" in w for w in report["warnings"])
|
||||
# The conflict on config.yaml should have flipped the block flag too.
|
||||
assert migrator._config_apply_blocked is True
|
||||
|
||||
|
||||
def test_error_produces_inspect_warning(tmp_path):
|
||||
mod = _load()
|
||||
migrator = _make_minimal_migrator(mod, tmp_path, execute=True)
|
||||
migrator.record("mcp-servers", None, None, mod.STATUS_ERROR, "Bad YAML")
|
||||
report = migrator.build_report()
|
||||
assert any("failed" in w.lower() for w in report["warnings"])
|
||||
|
||||
|
||||
def test_provider_keys_skipped_warning_when_secrets_disabled(tmp_path):
|
||||
mod = _load()
|
||||
migrator = _make_minimal_migrator(mod, tmp_path, execute=True, migrate_secrets=False)
|
||||
migrator.record(
|
||||
"provider-keys",
|
||||
source=None,
|
||||
destination=None,
|
||||
status=mod.STATUS_SKIPPED,
|
||||
reason="--migrate-secrets not set",
|
||||
)
|
||||
report = migrator.build_report()
|
||||
assert any("--migrate-secrets" in w for w in report["warnings"])
|
||||
|
||||
|
||||
# ───────────────────────────────────────────────────────────────────────
|
||||
# Blocked-by-earlier-conflict sequencing
|
||||
# ───────────────────────────────────────────────────────────────────────
|
||||
def test_config_apply_block_flips_on_config_yaml_conflict(tmp_path):
|
||||
mod = _load()
|
||||
migrator = _make_minimal_migrator(mod, tmp_path, execute=True)
|
||||
assert migrator._config_apply_blocked is False
|
||||
migrator.record(
|
||||
"model-config",
|
||||
source=None,
|
||||
destination=migrator.target_root / "config.yaml",
|
||||
status=mod.STATUS_CONFLICT,
|
||||
)
|
||||
assert migrator._config_apply_blocked is True
|
||||
|
||||
|
||||
def test_config_apply_block_flips_on_config_yaml_error(tmp_path):
|
||||
mod = _load()
|
||||
migrator = _make_minimal_migrator(mod, tmp_path, execute=True)
|
||||
migrator.record(
|
||||
"tts-config",
|
||||
source=None,
|
||||
destination=migrator.target_root / "config.yaml",
|
||||
status=mod.STATUS_ERROR,
|
||||
reason="YAML write failed",
|
||||
)
|
||||
assert migrator._config_apply_blocked is True
|
||||
|
||||
|
||||
def test_config_apply_block_does_not_flip_on_non_config_conflict(tmp_path):
|
||||
mod = _load()
|
||||
migrator = _make_minimal_migrator(mod, tmp_path, execute=True)
|
||||
migrator.record(
|
||||
"skill",
|
||||
source=None,
|
||||
destination=migrator.target_root / "skills" / "foo" / "SKILL.md",
|
||||
status=mod.STATUS_CONFLICT,
|
||||
)
|
||||
assert migrator._config_apply_blocked is False
|
||||
|
||||
|
||||
def test_run_if_selected_skips_config_ops_after_block(tmp_path):
|
||||
mod = _load()
|
||||
migrator = _make_minimal_migrator(
|
||||
mod, tmp_path, execute=True, selected_options={"model-config", "tts-config"}
|
||||
)
|
||||
migrator._config_apply_blocked = True
|
||||
called = []
|
||||
migrator.run_if_selected("tts-config", lambda: called.append(True))
|
||||
assert called == []
|
||||
# The skipped record uses the blocked reason.
|
||||
blocked = [i for i in migrator.items if i.kind == "tts-config"]
|
||||
assert len(blocked) == 1
|
||||
assert blocked[0].status == mod.STATUS_SKIPPED
|
||||
assert blocked[0].reason == mod.REASON_BLOCKED_BY_APPLY_CONFLICT
|
||||
|
||||
|
||||
def test_run_if_selected_runs_non_config_ops_even_after_block(tmp_path):
|
||||
mod = _load()
|
||||
migrator = _make_minimal_migrator(
|
||||
mod, tmp_path, execute=True, selected_options={"soul"}
|
||||
)
|
||||
migrator._config_apply_blocked = True
|
||||
called = []
|
||||
migrator.run_if_selected("soul", lambda: called.append(True))
|
||||
assert called == [True]
|
||||
|
||||
|
||||
def test_dry_run_never_blocks_even_after_conflict(tmp_path):
|
||||
"""Dry runs must preview the full plan — blocking mid-preview would hide
|
||||
conflicts and mislead the user about what would actually happen."""
|
||||
mod = _load()
|
||||
migrator = _make_minimal_migrator(
|
||||
mod, tmp_path, execute=False, selected_options={"tts-config"}
|
||||
)
|
||||
migrator._config_apply_blocked = True
|
||||
called = []
|
||||
migrator.run_if_selected("tts-config", lambda: called.append(True))
|
||||
assert called == [True]
|
||||
|
||||
|
||||
# ───────────────────────────────────────────────────────────────────────
|
||||
# --json output mode
|
||||
# ───────────────────────────────────────────────────────────────────────
|
||||
def test_json_mode_emits_structured_report(tmp_path):
|
||||
"""End-to-end: run the CLI with --json and no --execute, parse stdout."""
|
||||
source = tmp_path / "openclaw"
|
||||
source.mkdir()
|
||||
(source / "openclaw.json").write_text(
|
||||
json.dumps({"agents": {"defaults": {"model": "openrouter/anthropic/claude-sonnet-4"}}}),
|
||||
encoding="utf-8",
|
||||
)
|
||||
target = tmp_path / "hermes"
|
||||
target.mkdir()
|
||||
|
||||
result = subprocess.run(
|
||||
[
|
||||
sys.executable,
|
||||
str(SCRIPT_PATH),
|
||||
"--source", str(source),
|
||||
"--target", str(target),
|
||||
"--json",
|
||||
],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=30,
|
||||
)
|
||||
assert result.returncode == 0, result.stderr
|
||||
payload = json.loads(result.stdout)
|
||||
assert "summary" in payload
|
||||
assert "warnings" in payload
|
||||
assert "next_steps" in payload
|
||||
assert payload["mode"] == "dry-run"
|
||||
|
||||
|
||||
def test_json_mode_redacts_secrets_in_output(tmp_path):
|
||||
"""Even plan-only JSON output goes through the redactor — the stdout
|
||||
capture path is what gets piped into CI / support tickets."""
|
||||
source = tmp_path / "openclaw"
|
||||
source.mkdir()
|
||||
(source / "openclaw.json").write_text("{}", encoding="utf-8")
|
||||
# Plant a fake OpenClaw .env with a recognizably-shaped key.
|
||||
(source / ".env").write_text(
|
||||
"OPENROUTER_API_KEY=sk-or-v1-abcdef1234567890abcdef\n", encoding="utf-8"
|
||||
)
|
||||
target = tmp_path / "hermes"
|
||||
target.mkdir()
|
||||
|
||||
result = subprocess.run(
|
||||
[
|
||||
sys.executable,
|
||||
str(SCRIPT_PATH),
|
||||
"--source", str(source),
|
||||
"--target", str(target),
|
||||
"--migrate-secrets", # so provider-keys surface in the plan
|
||||
"--json",
|
||||
],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=30,
|
||||
)
|
||||
assert result.returncode == 0, result.stderr
|
||||
# The raw key value must never appear in the JSON output.
|
||||
assert "sk-or-v1-abcdef1234567890abcdef" not in result.stdout
|
||||
|
||||
|
||||
# ───────────────────────────────────────────────────────────────────────
|
||||
# ItemResult schema additions
|
||||
# ───────────────────────────────────────────────────────────────────────
|
||||
def test_item_result_has_sensitive_field():
|
||||
mod = _load()
|
||||
item = mod.ItemResult(kind="x", source=None, destination=None, status="migrated")
|
||||
assert item.sensitive is False
|
||||
|
||||
|
||||
def test_record_honors_sensitive_flag(tmp_path):
|
||||
mod = _load()
|
||||
migrator = _make_minimal_migrator(mod, tmp_path)
|
||||
migrator.record("x", None, None, "migrated", sensitive=True)
|
||||
assert migrator.items[0].sensitive is True
|
||||
|
||||
|
||||
def test_status_constants_match_historical_strings():
|
||||
"""Downstream consumers (claw.py, tests, docs) depend on these string values."""
|
||||
mod = _load()
|
||||
assert mod.STATUS_MIGRATED == "migrated"
|
||||
assert mod.STATUS_SKIPPED == "skipped"
|
||||
assert mod.STATUS_CONFLICT == "conflict"
|
||||
assert mod.STATUS_ERROR == "error"
|
||||
assert mod.STATUS_ARCHIVED == "archived"
|
||||
@ -18,7 +18,7 @@ hermes claw migrate
|
||||
hermes claw migrate --dry-run
|
||||
|
||||
# Full migration including API keys, skip confirmation
|
||||
hermes claw migrate --preset full --yes
|
||||
hermes claw migrate --preset full --migrate-secrets --yes
|
||||
```
|
||||
|
||||
The migration always shows a full preview of what will be imported before making any changes. Review the list, then confirm to proceed.
|
||||
@ -30,9 +30,10 @@ Reads from `~/.openclaw/` by default. Legacy `~/.clawdbot/` or `~/.moltbot/` dir
|
||||
| Option | Description |
|
||||
|--------|-------------|
|
||||
| `--dry-run` | Preview only — stop after showing what would be migrated. |
|
||||
| `--preset <name>` | `full` (default, includes secrets) or `user-data` (excludes API keys). |
|
||||
| `--overwrite` | Overwrite existing Hermes files on conflicts (default: skip). |
|
||||
| `--migrate-secrets` | Include API keys (on by default with `--preset full`). |
|
||||
| `--preset <name>` | `full` (all compatible settings) or `user-data` (excludes infrastructure config). Neither preset imports secrets by default — pass `--migrate-secrets` explicitly. |
|
||||
| `--overwrite` | Overwrite existing Hermes files on conflicts (default: refuse to apply when the plan has conflicts). |
|
||||
| `--migrate-secrets` | Include API keys. Required even under `--preset full` — no preset imports secrets silently. |
|
||||
| `--no-backup` | Skip the pre-migration zip snapshot of `~/.hermes/` (by default a single restore-point archive is written before apply, under `~/.hermes/backups/pre-migration-*.zip`; restorable with `hermes import`). |
|
||||
| `--source <path>` | Custom OpenClaw directory. |
|
||||
| `--workspace-target <path>` | Where to place `AGENTS.md`. |
|
||||
| `--skill-conflict <mode>` | `skip` (default), `overwrite`, or `rename`. |
|
||||
|
||||
@ -798,9 +798,10 @@ Migrate your OpenClaw setup to Hermes. Reads from `~/.openclaw` (or a custom pat
|
||||
| Option | Description |
|
||||
|--------|-------------|
|
||||
| `--dry-run` | Preview what would be migrated without writing anything. |
|
||||
| `--preset <name>` | Migration preset: `full` (default, includes secrets) or `user-data` (excludes API keys). |
|
||||
| `--overwrite` | Overwrite existing Hermes files on conflicts (default: skip). |
|
||||
| `--migrate-secrets` | Include API keys in migration (enabled by default with `--preset full`). |
|
||||
| `--preset <name>` | Migration preset: `full` (all compatible settings) or `user-data` (excludes infrastructure config). Neither preset imports secrets — pass `--migrate-secrets` explicitly. |
|
||||
| `--overwrite` | Overwrite existing Hermes files on conflicts (default: refuse to apply when the plan has conflicts). |
|
||||
| `--migrate-secrets` | Include API keys in migration. Required even under `--preset full`. |
|
||||
| `--no-backup` | Skip the pre-migration zip snapshot of `~/.hermes/` (by default a single restore-point archive is written to `~/.hermes/backups/pre-migration-*.zip` before apply; restorable with `hermes import`). |
|
||||
| `--source <path>` | Custom OpenClaw directory (default: `~/.openclaw`). |
|
||||
| `--workspace-target <path>` | Target directory for workspace instructions (AGENTS.md). |
|
||||
| `--skill-conflict <mode>` | Handle skill name collisions: `skip` (default), `overwrite`, or `rename`. |
|
||||
@ -824,9 +825,12 @@ For the complete config key mapping, SecretRef handling details, and post-migrat
|
||||
# Preview what would be migrated
|
||||
hermes claw migrate --dry-run
|
||||
|
||||
# Full migration including API keys
|
||||
# Full migration (all compatible settings, no secrets)
|
||||
hermes claw migrate --preset full
|
||||
|
||||
# Full migration including API keys
|
||||
hermes claw migrate --preset full --migrate-secrets
|
||||
|
||||
# Migrate user data only (no secrets), overwrite conflicts
|
||||
hermes claw migrate --preset user-data --overwrite
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user