molecule-core/workspace/lib/snapshot_scrub.py
Hongming Wang e7b9b7df71 feat(workspace): snapshot secret scrubber (closes #823)
Sub-issue of #799, security condition C4. Standalone module in
workspace/lib/snapshot_scrub.py with three public functions:

- scrub_content(str) → str: regex-based redaction of secret patterns
- is_sandbox_content(str) → bool: detect run_code tool output markers
- scrub_snapshot(dict) → dict: walk memories, scrub each, drop sandbox entries

Patterns covered: sk-ant-/sk-proj-, ghp_/ghs_/github_pat_, AKIA,
cfut_, mol_pk_, ctx7_, Bearer, env-var assignments, base64 blobs ≥33 chars.

21 unit tests, 100% coverage on new code.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-19 00:32:42 -07:00

126 lines
4.8 KiB
Python

"""Snapshot scrubbing — strip secrets and internal details from hibernation snapshots.
Issue #823 (sub of #799). Before the workspace runtime serializes a memory
snapshot for hibernation, every memory entry's content must pass through
this scrubber so an attacker who obtains a snapshot blob cannot recover:
- API keys (sk-ant-, sk-proj-, ghp_, etc.)
- Auth tokens (Bearer headers, OAuth tokens)
- Env-var assignments (ANTHROPIC_API_KEY=..., OPENAI_API_KEY=...)
- Arbitrary subprocess output from the sandbox tool (can be anything)
The scrubber is a pure function so it can be unit-tested independently.
"""
from __future__ import annotations
import re
from typing import Any
# Ordered (compiled pattern, label) pairs, built once at import time.
# Order matters: the env-var assignment patterns run first so a whole
# `NAME=value` pair is replaced before the narrower key-format or base64
# sweeps can consume only part of the value.
_SECRET_PATTERNS: list[tuple[re.Pattern[str], str]] = [
    # Env-var assignments: ANTHROPIC_API_KEY=sk-ant-... GITHUB_TOKEN=ghp_...
    # (?i) also catches lower-case spellings; the match spans name and value.
    (re.compile(r"(?i)\b[A-Z][A-Z0-9_]*_API_KEY\s*=\s*\S+"), "API_KEY"),
    (re.compile(r"(?i)\b[A-Z][A-Z0-9_]*_TOKEN\s*=\s*\S+"), "TOKEN"),
    (re.compile(r"(?i)\b[A-Z][A-Z0-9_]*_SECRET\s*=\s*\S+"), "SECRET"),
    # HTTP Bearer credentials. Authentication scheme names are
    # case-insensitive, so `bearer <tok>` / `BEARER <tok>` must be redacted
    # exactly like `Bearer <tok>`; short tokens would otherwise slip past
    # every later pattern.
    (re.compile(r"(?i)Bearer\s+\S+"), "BEARER_TOKEN"),
    # OpenAI / Anthropic sk-... / sk-ant-... / sk-proj-... key format.
    (re.compile(r"sk-[A-Za-z0-9\-_]{16,}"), "SK_TOKEN"),
    # GitHub personal access tokens and installation tokens.
    (re.compile(r"ghp_[A-Za-z0-9]{20,}"), "GITHUB_PAT"),
    (re.compile(r"ghs_[A-Za-z0-9]{20,}"), "GITHUB_SERVER_TOKEN"),
    (re.compile(r"github_pat_[A-Za-z0-9_]{60,}"), "GITHUB_PAT_V2"),
    # AWS access key IDs.
    (re.compile(r"\bAKIA[A-Z0-9]{16}\b"), "AWS_ACCESS_KEY"),
    # Cloudflare API tokens.
    (re.compile(r"\bcfut_[A-Za-z0-9]{32,}"), "CF_TOKEN"),
    # Molecule partner API keys (Phase 34).
    (re.compile(r"\bmol_pk_[A-Za-z0-9]{20,}"), "MOL_PK"),
    # context7 tokens.
    (re.compile(r"\bctx7_[A-Za-z0-9]+"), "CTX7_TOKEN"),
    # Catch-all for long opaque tokens that match no structured pattern
    # above. No entropy check is performed: any run of 33+ characters from
    # the standard base64 alphabet (plus optional `=` padding) is redacted.
    (re.compile(r"[A-Za-z0-9+/]{33,}={0,2}"), "BASE64_BLOB"),
]
# Substrings that flag a memory entry as output of the run_code sandbox tool.
# Such entries are dropped from the snapshot outright instead of being
# scrubbed: arbitrary subprocess output can carry secrets in shapes no regex
# here anticipates. The markers are spelled in lower case because
# is_sandbox_content() compares them against lower-cased content.
_SANDBOX_TOOL_MARKERS: tuple[str, ...] = ("source=sandbox", "tool=run_code", "[sandbox_output]")
def scrub_content(content: str) -> str:
    """Redact every known secret pattern found in `content`.

    Each entry of ``_SECRET_PATTERNS`` is applied in list order and every
    match is replaced by a ``[REDACTED:LABEL]`` marker carrying that entry's
    label. Falsy input (such as the empty string) is handed back untouched.

    The markers are designed not to match any of the patterns themselves, so
    scrubbing already-scrubbed text is intended to be a no-op.
    """
    if content:
        redacted = content
        # Sequential passes: later patterns see the output of earlier ones.
        for regex, tag in _SECRET_PATTERNS:
            redacted = regex.sub(f"[REDACTED:{tag}]", redacted)
        return redacted
    return content
def is_sandbox_content(content: str) -> bool:
    """Tell whether `content` carries a run_code sandbox marker.

    The check is a case-insensitive substring search for any entry of
    ``_SANDBOX_TOOL_MARKERS``. Falsy input is never treated as sandbox
    content.

    Sandbox output is arbitrary subprocess stdout/stderr and may hold secrets
    in formats the regex scrubber cannot recognise, so callers should drop
    matching entries from the snapshot rather than try to scrub them.
    """
    if content:
        # Markers are lower-case; fold the text once and scan for each.
        haystack = content.lower()
        for marker in _SANDBOX_TOOL_MARKERS:
            if marker in haystack:
                return True
    return False
def scrub_memory_entry(entry: dict[str, Any]) -> dict[str, Any] | None:
    """Prepare one memory entry for inclusion in a snapshot.

    Only the ``"content"`` field is inspected (a missing key is treated as
    the empty string). If it is recognised as sandbox output the entry is
    rejected and ``None`` is returned; otherwise a shallow copy of the entry
    is returned with ``"content"`` replaced by its scrubbed form.

    `entry` itself is never modified — use the returned dict.
    """
    raw = entry.get("content", "")
    if not is_sandbox_content(raw):
        # Shallow copy with the content field overridden by the scrubbed text.
        return {**entry, "content": scrub_content(raw)}
    return None
def scrub_snapshot(snapshot: dict[str, Any]) -> dict[str, Any]:
    """Produce a scrubbed copy of a snapshot payload ready for serialization.

    Every item of ``snapshot["memories"]`` is passed through
    ``scrub_memory_entry``; items it rejects (returns ``None`` for) are left
    out. All other top-level fields are copied over as-is, on the assumption
    that they hold no user-supplied secret-bearing content.

    The result is a new dict whose ``"memories"`` key is always present — an
    empty list when the input has no memories or a falsy value there. The
    input dict is not mutated.
    """
    result = dict(snapshot)
    processed = (scrub_memory_entry(item) for item in snapshot.get("memories") or [])
    result["memories"] = [kept for kept in processed if kept is not None]
    return result