molecule-sdk-python/tests/test_safe_extract.py
Molecule AI SDK-Dev 66502e669a fix(tests): add conftest.py + fix test_call_peer_errors.py broken imports
- tests/conftest.py: added FakeResponse, client fixture, tmp_token_dir fixture,
  and _CaptureHandler (stubs for integration tests)
- tests/test_call_peer_errors.py: rewrote all tests to use existing client.py
  patterns (MagicMock session) instead of non-existent httpx fixtures.
  Removed mocker/http_mock/conftest fixtures that don't exist in this repo.
  12 tests now cover: timeout, connection error, 4xx/5xx errors, empty body,
  JSON-RPC envelope format, auth headers, direct→proxy fallback path.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-23 18:18:10 +00:00

422 lines
19 KiB
Python

"""Security tests for ``_safe_extract_tar`` — tar-slip and archive-bomb mitigation.
The function guards against escape via ``target.relative_to(dest_abs)``. This
rejects:
• Entries whose resolved path is outside ``dest`` (absolute paths, paths that
start above ``dest``, paths with more leading ``..`` components than the
depth of ``dest``).
• Symlinks and hardlinks entirely (silently skipped, no file written).
Paths that contain ``..`` but still resolve inside ``dest`` are ACCEPTED.
For example ``foo/../bar.txt`` resolves to ``dest/bar.txt`` which is inside
``dest``, so it is accepted.
Covers:
1. **Paths that start above dest** — ``../``, ``../../`` at name start.
2. **Absolute paths** — entries with a leading ``/``.
3. **Depth-exceeding traversal** — ``a/../../../file`` exits dest.
4. **Symlink / hardlink skip** — no exception, no file written.
5. **Valid paths accepted** — relative paths with or without embedded ``..``
that still resolve inside ``dest``.
GAP-01.
"""
from __future__ import annotations
import io
import tarfile
from pathlib import Path
import pytest
from molecule_agent.client import _safe_extract_tar
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_tar_entry(name: str, content: bytes) -> tarfile.TarInfo:
info = tarfile.TarInfo(name=name)
info.size = len(content)
info.mode = 0o644
return info
def _build_tar(names_and_contents: list[tuple[str, bytes]]) -> io.BytesIO:
"""Return a BytesIO gzipped-tar containing the given (name, content) pairs."""
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w:gz") as tf:
for name, content in names_and_contents:
info = _make_tar_entry(name, content)
tf.addfile(info, io.BytesIO(content))
buf.seek(0)
return buf
def _open_tar(buf: io.BytesIO) -> tarfile.TarFile:
buf.seek(0)
return tarfile.open(fileobj=buf, mode="r")
# ---------------------------------------------------------------------------
# 1. Paths that start above dest — always rejected
# ---------------------------------------------------------------------------
class TestTraversalFromRoot:
"""Entries whose name begins with ``../`` escape dest regardless of how
many intermediate directories are traversed."""
def test_single_parent_component_at_start_rejected(self, tmp_path: Path):
"""``../escape.txt`` starts above dest — must be rejected."""
buf = _build_tar([("../escape.txt", b"overwrite")])
with _open_tar(buf) as tf:
with pytest.raises(ValueError, match="refusing tar entry escaping"):
_safe_extract_tar(tf, tmp_path)
def test_two_parent_components_at_start_rejected(self, tmp_path: Path):
"""``../../file`` starts two levels above dest — must be rejected."""
buf = _build_tar([("../../file", b"exfil")])
with _open_tar(buf) as tf:
with pytest.raises(ValueError, match="refusing tar entry escaping"):
_safe_extract_tar(tf, tmp_path)
def test_traversal_into_sibling_directory_rejected(self, tmp_path: Path):
"""``../sibling/marker.txt`` — verify we cannot write into an adjacent dir."""
sibling = tmp_path.parent / (tmp_path.name + "-sibling")
sibling.mkdir()
(sibling / "marker.txt").write_text("original")
buf = _build_tar([(f"../{tmp_path.name}-sibling/marker.txt", b"tampered")])
with _open_tar(buf) as tf:
with pytest.raises(ValueError, match="refusing tar entry escaping"):
_safe_extract_tar(tf, tmp_path)
assert (sibling / "marker.txt").read_text() == "original"
# ---------------------------------------------------------------------------
# 2. Absolute paths — always rejected
# ---------------------------------------------------------------------------
class TestAbsolutePaths:
"""Entries with an absolute path (leading ``/``) resolve outside any
relative dest and must be rejected."""
def test_absolute_etc_passwd_rejected(self, tmp_path: Path):
buf = _build_tar([("/etc/passwd", b"root::0:0")])
with _open_tar(buf) as tf:
with pytest.raises(ValueError, match="refusing tar entry escaping"):
_safe_extract_tar(tf, tmp_path)
def test_absolute_usr_local_rejected(self, tmp_path: Path):
buf = _build_tar([("/usr/local/anything", b"data")])
with _open_tar(buf) as tf:
with pytest.raises(ValueError, match="refusing tar entry escaping"):
_safe_extract_tar(tf, tmp_path)
def test_absolute_tmp_rejected(self, tmp_path: Path):
buf = _build_tar([("/tmp/staged/foo.txt", b"danger")])
with _open_tar(buf) as tf:
with pytest.raises(ValueError, match="refusing tar entry escaping"):
_safe_extract_tar(tf, tmp_path)
def test_pure_relative_accepted(self, tmp_path: Path):
"""``foo/bar.txt`` (no leading /) is fine."""
buf = _build_tar([("foo/bar.txt", b"ok")])
with _open_tar(buf) as tf:
_safe_extract_tar(tf, tmp_path)
assert (tmp_path / "foo" / "bar.txt").read_bytes() == b"ok"
# ---------------------------------------------------------------------------
# 3. Depth-exceeding traversal — more leading ``..`` than dest depth
# ---------------------------------------------------------------------------
class TestDepthExceedingTraversal:
"""An entry that has more ``..`` components than the depth of its path
within ``dest`` will resolve outside ``dest`` and must be rejected."""
def test_single_dir_then_four_parents_rejected(self, tmp_path: Path):
"""``a/../../../b.txt`` — one dir + four parents = exits dest."""
buf = _build_tar([("a/../../../b.txt", b"escaped")])
with _open_tar(buf) as tf:
with pytest.raises(ValueError, match="refusing tar entry escaping"):
_safe_extract_tar(tf, tmp_path)
def test_unicode_traversal_exits_dest_rejected(self, tmp_path: Path):
"""``日本語/../../file.txt`` — non-ASCII traversal that exits dest."""
buf = _build_tar([("日本語/../../file.txt", b"unicode bomb")])
with _open_tar(buf) as tf:
with pytest.raises(ValueError, match="refusing tar entry escaping"):
_safe_extract_tar(tf, tmp_path)
# Note: paths like ``a/b/c/../../d.txt`` or ``subdir/../outdir/file.txt``
# resolve INSIDE dest (they cancel out within the path) and are tested in
# TestEmbeddedDotdotAccepted below.
# ---------------------------------------------------------------------------
# 4. Embedded ``..`` that still resolves inside dest — accepted
# ---------------------------------------------------------------------------
class TestEmbeddedDotdotAccepted:
"""Paths that contain ``..`` but whose resolved target is still inside
``dest`` are accepted. Not all such paths can be extracted without error —
Python's ``tarfile`` module raises ``FileExistsError`` for some path shapes
(e.g., ``foo/../bar.txt`` where ``foo`` doesn't pre-exist: tarfile's
``makedirs`` tries to create ``foo/..`` as a directory, but ``..`` is not a
valid directory name). We test the paths that extract cleanly.
The key security guarantee is: any path that escapes ``dest`` raises
``ValueError`` before any file is written. Paths that don't escape but also
can't be extracted cleanly are a tarfile implementation detail — the function
accepts them or raises a non-ValueError error. We only assert on the
security-relevant behavior (escape rejection) and on paths that work."""
def test_subdir_parent_outdir_file_accepted(self, tmp_path: Path):
buf = _build_tar([("subdir/../outdir/file.txt", b"escaped")])
with _open_tar(buf) as tf:
_safe_extract_tar(tf, tmp_path)
assert (tmp_path / "outdir" / "file.txt").read_bytes() == b"escaped"
def test_subdir_parent_file_accepted(self, tmp_path: Path):
"""``subdir/../file.txt`` — the intermediate dir ``subdir`` must pre-exist
(or be created by a prior entry) for this path to extract without error."""
(tmp_path / "subdir").mkdir()
buf = _build_tar([("subdir/../another.txt", b"data")])
with _open_tar(buf) as tf:
_safe_extract_tar(tf, tmp_path)
assert (tmp_path / "another.txt").read_bytes() == b"data"
def test_foo_parent_bar_accepted(self, tmp_path: Path):
"""``foo/../bar.txt`` — the intermediate dir ``foo`` must pre-exist."""
(tmp_path / "foo").mkdir()
buf = _build_tar([("foo/../bar.txt", b"dangerous")])
with _open_tar(buf) as tf:
_safe_extract_tar(tf, tmp_path)
assert (tmp_path / "bar.txt").read_bytes() == b"dangerous"
def test_a_b_c_up_up_file_accepted(self, tmp_path: Path):
"""``a/b/c/../../d.txt`` — pre-create the full directory tree down to the
deepest non-dotdot segment (``a/b/c``) so that makedirs doesn't try to
create ``a/b/c/..`` as a directory name (which would fail with
FileExistsError since .. is not a valid directory name on POSIX)."""
(tmp_path / "a" / "b" / "c").mkdir(parents=True)
buf = _build_tar([("a/b/c/../../d.txt", b"escaped")])
with _open_tar(buf) as tf:
_safe_extract_tar(tf, tmp_path)
assert (tmp_path / "a" / "d.txt").read_bytes() == b"escaped"
def test_three_deep_three_up_accepted(self, tmp_path: Path):
"""``a/b/c/../../../file.txt`` — pre-create ``a/b/c``."""
(tmp_path / "a" / "b" / "c").mkdir(parents=True)
buf = _build_tar([("a/b/c/../../../file.txt", b"deep")])
with _open_tar(buf) as tf:
_safe_extract_tar(tf, tmp_path)
assert (tmp_path / "file.txt").read_bytes() == b"deep"
def test_dot_dot_slash_dot_bar_dot_dot_baz_accepted(self, tmp_path: Path):
"""``foo/./bar/../baz.txt`` — pre-create ``foo/bar``."""
(tmp_path / "foo" / "bar").mkdir(parents=True)
buf = _build_tar([("foo/./bar/../baz.txt", b"danger")])
with _open_tar(buf) as tf:
_safe_extract_tar(tf, tmp_path)
assert (tmp_path / "foo" / "baz.txt").read_bytes() == b"danger"
def test_valid_nested_path_accepted(self, tmp_path: Path):
"""``foo/bar/baz.txt`` (no ..) must be extracted normally."""
buf = _build_tar([("foo/bar/baz.txt", b"deep content")])
with _open_tar(buf) as tf:
_safe_extract_tar(tf, tmp_path)
assert (tmp_path / "foo" / "bar" / "baz.txt").read_bytes() == b"deep content"
def test_rules_file_accepted(self, tmp_path: Path):
buf = _build_tar([("rules/x.md", b"# rule")])
with _open_tar(buf) as tf:
_safe_extract_tar(tf, tmp_path)
assert (tmp_path / "rules" / "x.md").read_text() == "# rule"
# ---------------------------------------------------------------------------
# 5. Symlink / hardlink skip
# ---------------------------------------------------------------------------
class TestSymlinkHardlinkSkip:
"""Symlinks and hardlinks are skipped entirely — no exception, no file
created, real files extracted normally."""
def test_symlink_to_absolute_path_skipped(self, tmp_path: Path):
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w:gz") as tf:
sym = tarfile.TarInfo(name="evil.link")
sym.type = tarfile.SYMTYPE
sym.linkname = "/etc/passwd"
sym.size = 0
tf.addfile(sym)
buf.seek(0)
with tarfile.open(fileobj=buf, mode="r") as tf:
_safe_extract_tar(tf, tmp_path)
assert not (tmp_path / "evil.link").exists()
def test_symlink_to_parent_directory_skipped(self, tmp_path: Path):
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w:gz") as tf:
sym = tarfile.TarInfo(name="parent.link")
sym.type = tarfile.SYMTYPE
sym.linkname = ".."
sym.size = 0
tf.addfile(sym)
buf.seek(0)
with tarfile.open(fileobj=buf, mode="r") as tf:
_safe_extract_tar(tf, tmp_path)
assert not (tmp_path / "parent.link").exists()
def test_symlink_within_dest_skipped_but_real_file_intact(self, tmp_path: Path):
buf = _build_tar([("real.txt", b"content")])
with _open_tar(buf) as tf:
_safe_extract_tar(tf, tmp_path)
assert (tmp_path / "real.txt").read_text() == "content"
buf2 = io.BytesIO()
with tarfile.open(fileobj=buf2, mode="w:gz") as tf:
sym = tarfile.TarInfo(name="link-to-real")
sym.type = tarfile.SYMTYPE
sym.linkname = "real.txt"
sym.size = 0
tf.addfile(sym)
buf2.seek(0)
with tarfile.open(fileobj=buf2, mode="r") as tf:
_safe_extract_tar(tf, tmp_path)
assert not (tmp_path / "link-to-real").exists()
assert (tmp_path / "real.txt").read_text() == "content"
def test_hardlink_to_absolute_path_skipped(self, tmp_path: Path):
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w:gz") as tf:
hl = tarfile.TarInfo(name="hard.link")
hl.type = tarfile.LNKTYPE
hl.linkname = "/etc/passwd"
hl.size = 0
tf.addfile(hl)
buf.seek(0)
with tarfile.open(fileobj=buf, mode="r") as tf:
_safe_extract_tar(tf, tmp_path)
assert not (tmp_path / "hard.link").exists()
def test_hardlink_within_dest_skipped_original_intact(self, tmp_path: Path):
buf = _build_tar([("original.txt", b"data")])
with _open_tar(buf) as tf:
_safe_extract_tar(tf, tmp_path)
buf2 = io.BytesIO()
with tarfile.open(fileobj=buf2, mode="w:gz") as tf:
hl = tarfile.TarInfo(name="link-to-original")
hl.type = tarfile.LNKTYPE
hl.linkname = "original.txt"
hl.size = 0
tf.addfile(hl)
buf2.seek(0)
with tarfile.open(fileobj=buf2, mode="r") as tf:
_safe_extract_tar(tf, tmp_path)
assert not (tmp_path / "link-to-original").exists()
assert (tmp_path / "original.txt").read_text() == "data"
def test_mixed_valid_and_symlink_entries(self, tmp_path: Path):
"""Valid file extracted, symlink silently skipped — no exception."""
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w:gz") as tf:
info = _make_tar_entry("valid/file.txt", b"ok")
tf.addfile(info, io.BytesIO(b"ok"))
sym = tarfile.TarInfo(name="bad.link")
sym.type = tarfile.SYMTYPE
sym.linkname = "/etc/passwd"
sym.size = 0
tf.addfile(sym)
buf.seek(0)
with tarfile.open(fileobj=buf, mode="r") as tf:
_safe_extract_tar(tf, tmp_path)
assert (tmp_path / "valid" / "file.txt").read_bytes() == b"ok"
assert not (tmp_path / "bad.link").exists()
def test_symlink_then_valid_file_in_same_archive(self, tmp_path: Path):
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w:gz") as tf:
sym = tarfile.TarInfo(name="dangling.link")
sym.type = tarfile.SYMTYPE
sym.linkname = "../nonexistent"
sym.size = 0
tf.addfile(sym)
info = _make_tar_entry("doc.txt", b"important")
tf.addfile(info, io.BytesIO(b"important"))
buf.seek(0)
with tarfile.open(fileobj=buf, mode="r") as tf:
_safe_extract_tar(tf, tmp_path)
assert (tmp_path / "doc.txt").read_bytes() == b"important"
assert not (tmp_path / "dangling.link").exists()
# ---------------------------------------------------------------------------
# Edge cases
# ---------------------------------------------------------------------------
class TestEdgeCases:
"""Boundary conditions for _safe_extract_tar."""
def test_empty_archive_accepted(self, tmp_path: Path):
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w:gz") as tf:
pass
buf.seek(0)
with tarfile.open(fileobj=buf, mode="r") as tf:
_safe_extract_tar(tf, tmp_path)
assert list(tmp_path.iterdir()) == []
def test_dot_slash_file_accepted(self, tmp_path: Path):
"""``./file.txt`` — tarfile normalises the leading ``./`` so the file
lands as ``file.txt`` inside dest."""
buf = _build_tar([("./file.txt", b"dot")])
with _open_tar(buf) as tf:
_safe_extract_tar(tf, tmp_path)
assert (tmp_path / "file.txt").read_bytes() == b"dot"
def test_unicode_normal_path_accepted(self, tmp_path: Path):
"""Non-ASCII path without traversal must be accepted."""
buf = _build_tar([("日本語/文件.txt", b"native text")])
with _open_tar(buf) as tf:
_safe_extract_tar(tf, tmp_path)
assert any(p.name.endswith(".txt") for p in tmp_path.rglob("*.txt"))
def test_extraction_rejects_before_writing_traversal_entry(self, tmp_path: Path):
"""When the first entry is a traversal, no files are extracted."""
buf = _build_tar([("a/../../../b.txt", b"first")])
with _open_tar(buf) as tf:
with pytest.raises(ValueError, match="refusing tar entry escaping"):
_safe_extract_tar(tf, tmp_path)
assert not any(tmp_path.iterdir())
def test_traversal_entry_rejected_no_partial_state(self, tmp_path: Path):
"""After a traversal entry is rejected, dest must be clean."""
buf = _build_tar([("a/../../../b.txt", b"first")])
with _open_tar(buf) as tf:
with pytest.raises(ValueError):
_safe_extract_tar(tf, tmp_path)
assert list(tmp_path.iterdir()) == []
def test_many_levels_traversal_exits_dest(self, tmp_path: Path):
"""A depth-10 path ``a/.../a`` needs 11 or more ``..`` components to exit
dest (ups ≥ depth+1 → net ≤ -1). With 11 ``..``, net depth = -1 = outside."""
long = "/".join(["a"] * 10) + "/../" * 11 + "file.txt"
long = long.rstrip("/")
buf = _build_tar([(long, b"escaped")])
with _open_tar(buf) as tf:
with pytest.raises(ValueError, match="refusing tar entry escaping"):
_safe_extract_tar(tf, tmp_path)
def test_many_levels_traversal_stays_inside(self, tmp_path: Path):
"""``subdir/../outdir/file.txt`` — intermediate dir exists after ..,
final segment is a new directory so no FileExistsError on makedirs."""
buf = _build_tar([("subdir/../outdir/file.txt", b"ok")])
with _open_tar(buf) as tf:
_safe_extract_tar(tf, tmp_path)
assert (tmp_path / "outdir" / "file.txt").read_bytes() == b"ok"