forked from molecule-ai/molecule-core
Sub-issue of #799, security condition C4. Standalone module in workspace/lib/snapshot_scrub.py with three public functions: - scrub_content(str) → str: regex-based redaction of secret patterns - is_sandbox_content(str) → bool: detect run_code tool output markers - scrub_snapshot(dict) → dict: walk memories, scrub each, drop sandbox entries Patterns covered: sk-ant-/sk-proj-, ghp_/ghs_/github_pat_, AKIA, cfut_, mol_pk_, ctx7_, Bearer, env-var assignments, base64 blobs ≥33 chars. 21 unit tests, 100% coverage on new code. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
126 lines
4.8 KiB
Python
126 lines
4.8 KiB
Python
"""Snapshot scrubbing — strip secrets and internal details from hibernation snapshots.
|
|
|
|
Issue #823 (sub of #799). Before the workspace runtime serializes a memory
|
|
snapshot for hibernation, every memory entry's content must pass through
|
|
this scrubber so an attacker who obtains a snapshot blob cannot recover:
|
|
|
|
- API keys (sk-ant-, sk-proj-, ghp_, etc.)
|
|
- Auth tokens (Bearer headers, OAuth tokens)
|
|
- Env-var assignments (ANTHROPIC_API_KEY=..., OPENAI_API_KEY=...)
|
|
- Arbitrary subprocess output from the sandbox tool (can be anything)
|
|
|
|
The scrubber is a pure function so it can be unit-tested independently.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from typing import Any
|
|
|
|
|
|
# Value side of an env-var assignment. A quoted value is consumed through its
# closing quote (plus any non-space characters glued to it) so a secret that
# contains spaces is not left half-redacted; an unquoted or unterminated value
# falls back to a run of non-whitespace.
_ENV_VALUE = r"""(?:"[^"\n]*"\S*|'[^'\n]*'\S*|\S+)"""

# Redaction table, compiled once at import time. Each entry is
# (pattern, label); the label is what ends up inside the redaction marker.
# Order matters — most-specific patterns first so that env-var assignments
# are redacted as a whole NAME=value span before the generic sk-* or base64
# sweeps swallow only part of the match.
_SECRET_PATTERNS: list[tuple[re.Pattern[str], str]] = [
    # Env-var assignments: ANTHROPIC_API_KEY=sk-ant-... GITHUB_TOKEN=ghp_...
    (
        re.compile(r"(?i)\b[A-Z][A-Z0-9_]*_API_KEY\s*=\s*" + _ENV_VALUE),
        "API_KEY",
    ),
    (
        re.compile(r"(?i)\b[A-Z][A-Z0-9_]*_TOKEN\s*=\s*" + _ENV_VALUE),
        "TOKEN",
    ),
    (
        re.compile(r"(?i)\b[A-Z][A-Z0-9_]*_SECRET\s*=\s*" + _ENV_VALUE),
        "SECRET",
    ),
    # HTTP Bearer header values. The auth scheme name is case-insensitive,
    # so "bearer <token>" and "BEARER <token>" must be redacted as well.
    (re.compile(r"(?i)Bearer\s+\S+"), "BEARER_TOKEN"),
    # OpenAI / Anthropic sk-... / sk-ant-... / sk-proj-... key format.
    (re.compile(r"sk-[A-Za-z0-9\-_]{16,}"), "SK_TOKEN"),
    # GitHub personal access tokens and installation tokens.
    (re.compile(r"ghp_[A-Za-z0-9]{20,}"), "GITHUB_PAT"),
    (re.compile(r"ghs_[A-Za-z0-9]{20,}"), "GITHUB_SERVER_TOKEN"),
    (re.compile(r"github_pat_[A-Za-z0-9_]{60,}"), "GITHUB_PAT_V2"),
    # AWS access key IDs.
    (re.compile(r"\bAKIA[A-Z0-9]{16}\b"), "AWS_ACCESS_KEY"),
    # Cloudflare API tokens.
    (re.compile(r"\bcfut_[A-Za-z0-9]{32,}"), "CF_TOKEN"),
    # Molecule partner API keys (Phase 34).
    (re.compile(r"\bmol_pk_[A-Za-z0-9]{20,}"), "MOL_PK"),
    # context7 tokens.
    (re.compile(r"\bctx7_[A-Za-z0-9]+"), "CTX7_TOKEN"),
    # High-entropy base64 blobs 33+ chars. Catches long opaque tokens that
    # don't match any structured pattern above.
    (re.compile(r"[A-Za-z0-9+/]{33,}={0,2}"), "BASE64_BLOB"),
]
|
|
|
|
|
|
# Lower-case substrings that flag a memory entry as output of the run_code
# sandbox tool. Entries carrying any of them are dropped from the snapshot
# outright instead of being scrubbed: arbitrary subprocess output cannot be
# made safe by pattern matching alone (an attacker could print
# `echo "innocent"` while hiding secrets in stderr or file handles).
_SANDBOX_TOOL_MARKERS = ("source=sandbox", "tool=run_code", "[sandbox_output]")
|
|
|
|
|
|
def scrub_content(content: str) -> str:
    """Redact every known secret pattern in `content`.

    Each entry of `_SECRET_PATTERNS` is applied in list order, and every
    match is replaced by a `[REDACTED:LABEL]` marker carrying that entry's
    label. Falsy input (empty string, None) is handed back untouched.

    Re-running the function on its own output is intended to be a no-op,
    since the `[REDACTED:...]` markers are not meant to match any pattern.
    """
    redacted = content
    if redacted:
        # Sequential passes: a later pattern sees the output of earlier ones.
        for regex, tag in _SECRET_PATTERNS:
            redacted = regex.sub(f"[REDACTED:{tag}]", redacted)
    return redacted
|
|
|
|
|
|
def is_sandbox_content(content: str) -> bool:
    """Tell whether `content` carries a run_code sandbox marker.

    The check is a case-insensitive substring search for any entry of
    `_SANDBOX_TOOL_MARKERS`. Falsy input is never sandbox content.

    Sandbox output is arbitrary subprocess stdout/stderr and may hold
    secrets in formats the scrubber does not recognize, so callers should
    drop matching entries from the snapshot rather than scrub them.
    """
    if content:
        # Markers are stored lower-case; fold the input once to match them.
        haystack = content.lower()
        for marker in _SANDBOX_TOOL_MARKERS:
            if marker in haystack:
                return True
    return False
|
|
|
|
|
|
def scrub_memory_entry(entry: dict[str, Any]) -> dict[str, Any] | None:
    """Prepare one memory entry for inclusion in a snapshot.

    Only the entry's "content" value (default "") is inspected. If it is
    sandbox-sourced the entry is rejected and None is returned; otherwise
    a shallow copy of the entry is returned with "content" replaced by its
    scrubbed form.

    `entry` itself is never modified — use the returned dict.
    """
    raw = entry.get("content", "")
    if is_sandbox_content(raw):
        # Sandbox output is excluded wholesale, not scrubbed.
        return None
    return {**entry, "content": scrub_content(raw)}
|
|
|
|
|
|
def scrub_snapshot(snapshot: dict[str, Any]) -> dict[str, Any]:
    """Produce a scrubbed copy of a snapshot payload, ready to serialize.

    Every entry under "memories" is run through `scrub_memory_entry`;
    entries it rejects (sandbox-sourced) are left out. A missing or falsy
    "memories" value yields an empty list. All other top-level fields
    (workspace metadata, config, etc.) are copied through as-is, on the
    expectation that they hold no user-supplied secret-bearing content.

    The input dict is not mutated; a new dict is returned.
    """
    result = dict(snapshot)
    processed = (
        scrub_memory_entry(item) for item in snapshot.get("memories") or []
    )
    # None marks an entry that must not appear in the snapshot at all.
    result["memories"] = [item for item in processed if item is not None]
    return result
|