molecule-sdk-python/tests/test_safe_extract.py
molecule-ai[bot] 4e289e3004
tests: add GAP-01 tar security + GAP-02 SHA256 verification suites (#8)
* tests: add GAP-01 tar security and GAP-02 SHA256 verification test suites

GAP-01 (test_safe_extract.py):
- CWE-22 traversal via ../ in tar header names (3 cases)
- Absolute path rejection in tar entries (2 cases)
- Symlink skip (2 cases)
- Hardlink skip
- Deep traversal rejection
- Deep valid path extraction
- Empty tar noop
- Normal operation smoke test
- zipfile placeholder (documents no zip hardening yet)

GAP-02 (test_sha256_verification.py):
- _is_hex validation (4 cases)
- _sha256_file empty/small/large/binary/not-found (5 cases)
- _walk_files excludes dirs/deterministic/set equality (3 cases)
- verify_plugin_sha256 empty plugin/excludes plugin.yaml/invalid format (3 cases)
- compute_plugin_sha256 stable/deterministic order/content changes exclusion (4 cases)
- CLI verify-sha256 exit zero/nonzero/file-not-dir/error message (4 cases)
- Round-trip compute→verify (1 case)
- Mismatch returns False (1 case)

Total: 37 new test cases, all passing.
180 passed / 1 skipped across full suite (excluding broken conftest import in test_call_peer_errors.py).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* docs: add KI-007 (_is_hex TypeError gap) and KI-008 (test_call_peer_errors conftest)

KI-007: _is_hex raises TypeError on non-strings instead of returning False;
guard with isinstance(value, str) check.

KI-008: test_call_peer_errors.py imports tests.conftest which doesn't exist;
fix import or create conftest.py stub.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Molecule AI SDK Lead <sdk-lead@agents.moleculesai.app>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 06:17:42 +00:00

320 lines
11 KiB
Python

"""Security tests for _safe_extract_tar and related tar-extraction helpers.
Covers GAP-01 from TEST_GAP_ANALYSIS.md — CWE-22 / CVE-2007-4559 "tar slip"
family: directory traversal, absolute paths, symlink and hardlink escapes.
Zip archives get only a placeholder test; zip bombs are not exercised here.
These are unit tests with no external dependencies.
"""
from __future__ import annotations
import io
import tarfile
import zipfile
from pathlib import Path
import pytest
import sys
from pathlib import Path as _Path
# Resolve the parent of the directory containing this file and put it first on
# sys.path, unless it is already listed. Presumably this lets the
# molecule_agent import below resolve from a source checkout without the
# package being installed — confirm against the repository layout.
_SDK_ROOT = _Path(__file__).resolve().parents[1]
if str(_SDK_ROOT) not in sys.path:
    sys.path.insert(0, str(_SDK_ROOT))
from molecule_agent.client import _safe_extract_tar
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_tar(entries: list[tuple[str, str | bytes, bool]]) -> io.BytesIO:
    """Assemble an uncompressed tar archive entirely in memory.

    Args:
        entries: ``(filename, content, is_dir)`` tuples, written in order.
            For a directory entry ``content`` is ignored. For a file entry a
            ``str`` is encoded with ``str.encode()`` and any other value is
            written as-is.

    Returns:
        The archive in a ``BytesIO`` rewound to offset 0.
    """
    archive = io.BytesIO()
    with tarfile.open(fileobj=archive, mode="w") as writer:
        for name, content, is_dir in entries:
            member = tarfile.TarInfo(name=name)
            if is_dir:
                # Directory members carry no payload.
                member.type = tarfile.DIRTYPE
                member.mode = 0o755
                member.size = 0
                writer.addfile(member)
                continue
            payload = content.encode() if isinstance(content, str) else content
            member.size = len(payload)
            writer.addfile(member, io.BytesIO(payload))
    archive.seek(0)
    return archive
def _make_tar_with_symlink(
    name: str,
    link_target: str,
    normal_file: tuple[str, str | bytes] | None = None,
) -> io.BytesIO:
    """Build an in-memory tar with one symlink entry and an optional normal file.

    Args:
        name: archive path of the symlink entry.
        link_target: value stored as the symlink's ``linkname``; it is written
            verbatim and not validated.
        normal_file: optional ``(filename, content)`` pair. When given, it is
            written as a regular file *before* the symlink entry. ``str``
            content is encoded with ``str.encode()``.

    Returns:
        The archive in a ``BytesIO`` rewound to offset 0.
    """
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tf:
        if normal_file is not None:
            file_name, content = normal_file
            data = content.encode() if isinstance(content, str) else content
            file_info = tarfile.TarInfo(name=file_name)
            file_info.size = len(data)
            tf.addfile(file_info, io.BytesIO(data))
        info = tarfile.TarInfo(name=name)
        info.type = tarfile.SYMTYPE
        info.linkname = link_target
        # A symlink member has size 0, so no file object is needed.
        tf.addfile(info)
    buf.seek(0)
    return buf
# ---------------------------------------------------------------------------
# Test: directory traversal via ../ in filename
# ---------------------------------------------------------------------------
def test_traversal_dotdot_in_name(tmp_path: Path):
    """CWE-22: a ``../`` entry name must raise instead of being silently stripped."""
    dest = tmp_path / "dest"
    dest.mkdir()

    # Baseline: a well-formed entry extracts into dest.
    benign = _make_tar([("sub/normal.txt", "hello", False)])
    with tarfile.open(fileobj=benign) as archive:
        _safe_extract_tar(archive, dest)
    extracted = dest / "sub" / "normal.txt"
    assert extracted.read_text() == "hello"

    # Hostile entry: extraction must raise and leave nothing beside dest.
    hostile = _make_tar([("../escape.txt", "pwned", False)])
    with tarfile.open(fileobj=hostile) as archive, pytest.raises(
        ValueError, match="escaping dest"
    ):
        _safe_extract_tar(archive, dest)
    assert not (dest.parent / "escape.txt").exists()
def test_traversal_dotdot_in_deep_path(tmp_path: Path):
    """``../`` components at the start and in the middle of a long path must be rejected."""
    dest = tmp_path / "dest"
    dest.mkdir()
    hostile_entry = ("../a/../../../etc/passwd", "root:x:0:0", False)
    archive_bytes = _make_tar([hostile_entry])
    with tarfile.open(fileobj=archive_bytes) as archive, pytest.raises(
        ValueError, match="escaping dest"
    ):
        _safe_extract_tar(archive, dest)
# ---------------------------------------------------------------------------
# Test: absolute paths in tar entries
# ---------------------------------------------------------------------------
def test_absolute_path_rejected(tmp_path: Path):
    """A tar entry named with an absolute path must raise ValueError."""
    dest = tmp_path / "dest"
    dest.mkdir()
    absolute_entry = ("/etc/passwd", "root:x:0:0", False)
    archive_bytes = _make_tar([absolute_entry])
    with tarfile.open(fileobj=archive_bytes) as archive, pytest.raises(
        ValueError, match="escaping dest"
    ):
        _safe_extract_tar(archive, dest)
def test_absolute_path_in_subdirectory(tmp_path: Path):
    """A path that starts in a normal directory but climbs out via ``../`` must be rejected.

    The entry name here is relative ("subdir/../../../usr/..."), not absolute;
    it targets a system location by backing out of ``subdir``.
    """
    dest = tmp_path / "dest"
    dest.mkdir()
    hostile_name = "subdir/../../../usr/local/bin/malware.sh"
    archive_bytes = _make_tar([(hostile_name, "#!/bin/sh", False)])
    with tarfile.open(fileobj=archive_bytes) as archive, pytest.raises(
        ValueError, match="escaping dest"
    ):
        _safe_extract_tar(archive, dest)
# ---------------------------------------------------------------------------
# Test: symlink escape (symlink → outside dest)
# ---------------------------------------------------------------------------
def test_symlink_to_parent_skipped(tmp_path: Path):
    """A symlink entry whose target uses ``../`` must not be written to disk.

    _safe_extract_tar skips symlinks silently (matches platform tar producer).
    """
    dest = tmp_path / "dest"
    dest.mkdir()
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tf:
        normal_info = tarfile.TarInfo(name="sub/normal.txt")
        normal_info.size = 5
        tf.addfile(normal_info, io.BytesIO(b"hello"))
        link_info = tarfile.TarInfo(name="sub/link_to_escape")
        link_info.type = tarfile.SYMTYPE
        link_info.linkname = "../escape.txt"
        tf.addfile(link_info, io.BytesIO(b""))
    buf.seek(0)
    with tarfile.open(fileobj=buf) as tf:
        # Must not raise — symlinks are silently skipped.
        _safe_extract_tar(tf, dest)
    assert (dest / "sub" / "normal.txt").read_text() == "hello"
    link = dest / "sub" / "link_to_escape"
    # Path.exists() follows symlinks. The link's target is never created, so a
    # wrongly written link would be dangling and exists() alone would still
    # report False. is_symlink() inspects the directory entry itself.
    assert not link.is_symlink()
    assert not link.exists()
def test_symlink_to_absolute_path_skipped(tmp_path: Path):
    """A symlink entry whose target is an absolute path must not be written to disk."""
    dest = tmp_path / "dest"
    dest.mkdir()
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tf:
        normal_info = tarfile.TarInfo(name="sub/normal.txt")
        normal_info.size = 5
        tf.addfile(normal_info, io.BytesIO(b"hello"))
        link_info = tarfile.TarInfo(name="sub/abs_link")
        link_info.type = tarfile.SYMTYPE
        link_info.linkname = "/etc/passwd"
        tf.addfile(link_info, io.BytesIO(b""))
    buf.seek(0)
    with tarfile.open(fileobj=buf) as tf:
        # Must not raise; the symlink entry is expected to be skipped.
        _safe_extract_tar(tf, dest)
    assert (dest / "sub" / "normal.txt").read_text() == "hello"
    link = dest / "sub" / "abs_link"
    # Path.exists() follows symlinks, so on its own the result would depend on
    # whether /etc/passwd exists on the host. is_symlink() checks the link
    # entry itself and is host-independent.
    assert not link.is_symlink()
    assert not link.exists()
# ---------------------------------------------------------------------------
# Test: hardlink escape
# ---------------------------------------------------------------------------
def test_hardlink_skipped(tmp_path: Path):
    """A hardlink entry must be skipped without raising and without being created."""
    dest = tmp_path / "dest"
    dest.mkdir()

    regular = tarfile.TarInfo(name="sub/normal.txt")
    regular.size = 5
    hardlink = tarfile.TarInfo(name="sub/hardlink")
    hardlink.type = tarfile.LNKTYPE
    hardlink.linkname = "sub/normal.txt"

    archive = io.BytesIO()
    with tarfile.open(fileobj=archive, mode="w") as writer:
        writer.addfile(regular, io.BytesIO(b"hello"))
        writer.addfile(hardlink, io.BytesIO(b""))
    archive.seek(0)

    with tarfile.open(fileobj=archive) as reader:
        _safe_extract_tar(reader, dest)

    extracted_dir = dest / "sub"
    assert (extracted_dir / "normal.txt").read_text() == "hello"
    assert not (extracted_dir / "hardlink").exists()
# ---------------------------------------------------------------------------
# Test: deeply nested traversal
# ---------------------------------------------------------------------------
def test_deeply_nested_traversal_rejected(tmp_path: Path):
    """A name made of twenty ``..`` components followed by a target must be rejected."""
    dest = tmp_path / "dest"
    dest.mkdir()
    climb = "/".join([".."] * 20)
    hostile_name = climb + "/etc/passwd"
    archive_bytes = _make_tar([(hostile_name, "root:x:0:0", False)])
    with tarfile.open(fileobj=archive_bytes) as archive, pytest.raises(
        ValueError, match="escaping dest"
    ):
        _safe_extract_tar(archive, dest)
# ---------------------------------------------------------------------------
# Test: deeply nested valid paths
# ---------------------------------------------------------------------------
def test_deeply_nested_valid_path_extracted(tmp_path: Path):
    """A file twenty directories deep, with no traversal, must extract intact."""
    dest = tmp_path / "dest"
    dest.mkdir()
    # Twenty "a" directories followed by the file name.
    segments = ["a"] * 20 + ["file.txt"]
    archive_bytes = _make_tar([("/".join(segments), "content", False)])
    with tarfile.open(fileobj=archive_bytes) as archive:
        _safe_extract_tar(archive, dest)
    assert dest.joinpath(*segments).read_text() == "content"
# ---------------------------------------------------------------------------
# Test: zipfile extraction (separate code path)
# ---------------------------------------------------------------------------
def test_zipfile_with_dotdot_entries(tmp_path: Path):
    """Placeholder: document that ZIP archives can carry ``../`` entry names.

    The SDK currently uses _safe_extract_tar for tar archives only, so nothing
    is extracted here. The test only confirms that zipfile.ZipFile stores and
    lists a ``../`` name unchanged, which is why equivalent protection is
    needed if .zip plugin support is added.
    """
    dest = tmp_path / "dest"
    dest.mkdir()
    payload = io.BytesIO()
    with zipfile.ZipFile(payload, mode="w") as writer:
        writer.writestr("sub/normal.txt", "hello")
        writer.writestr("../escape.txt", "pwned")
    payload.seek(0)
    with zipfile.ZipFile(payload) as reader:
        stored = reader.namelist()
    for expected in ("../escape.txt", "sub/normal.txt"):
        assert expected in stored
    # SDK does not currently extract zip archives for plugin install.
    # This assertion will need updating when zip safety is implemented.
# ---------------------------------------------------------------------------
# Test: empty tar archive
# ---------------------------------------------------------------------------
def test_empty_tar_noops(tmp_path: Path):
    """Extracting an archive with no members must complete without raising."""
    dest = tmp_path / "dest"
    dest.mkdir()
    archive = io.BytesIO()
    # Opening for write and closing immediately yields a valid, empty archive.
    tarfile.open(fileobj=archive, mode="w").close()
    archive.seek(0)
    with tarfile.open(fileobj=archive) as reader:
        _safe_extract_tar(reader, dest)  # must not raise
# ---------------------------------------------------------------------------
# Test: normal operation
# ---------------------------------------------------------------------------
def test_normal_files_extracted_correctly(tmp_path: Path):
    """Smoke test: ordinary files and a directory entry extract with their content."""
    dest = tmp_path / "dest"
    dest.mkdir()
    archive_bytes = _make_tar([
        ("a.txt", "alpha", False),
        ("sub/b.txt", "beta", False),
        ("sub/c.txt", "gamma", False),
        ("rules/", "", True),
        ("rules/foo.md", "- be kind", False),
    ])
    with tarfile.open(fileobj=archive_bytes) as archive:
        _safe_extract_tar(archive, dest)
    # Relative path inside dest -> expected text content.
    expected = {
        "a.txt": "alpha",
        "sub/b.txt": "beta",
        "sub/c.txt": "gamma",
        "rules/foo.md": "- be kind",
    }
    for relative, text in expected.items():
        assert (dest / relative).read_text() == text