fix(runtime#86): regression gate against token-in-URL in workflows + scripts #166

Closed
agent-dev-b wants to merge 2 commits from fix/86-git-askpass-no-token-in-argv into main
4 changed files with 319 additions and 24 deletions
+29 -12
View File
@@ -112,6 +112,32 @@ def find_runtime_drift(repo_name: str, repo_path: Path, runtime_root: Path | Non
return findings
def _git_clone_with_token(dest: Path, url: str, token: str) -> subprocess.CompletedProcess[str]:
"""Clone using GIT_ASKPASS so the token never appears in argv or remote URL."""
import shlex
with tempfile.NamedTemporaryFile(mode="w", suffix=".sh", delete=False) as f:
f.write("#!/bin/sh\n")
f.write('case "$1" in\n')
f.write(' *Username*) echo "x-access-token" ;;\n')
f.write(f' *Password*) echo {shlex.quote(token)} ;;\n')
f.write("esac\n")
askpass = f.name
os.chmod(askpass, 0o700)
try:
return subprocess.run(
["git", "clone", "--depth", "1", url, str(dest)],
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=30,
env={**os.environ, "GIT_ASKPASS": askpass},
)
finally:
os.unlink(askpass)
def clone_consumers(
workdir: Path,
repos: tuple[str, ...],
@@ -126,28 +152,19 @@ def clone_consumers(
parsed_url = urlsplit(gitea_url)
if parsed_url.scheme not in {"http", "https"} or not parsed_url.netloc:
raise RuntimeError(f"invalid Gitea URL: {gitea_url}")
safe_token = quote(token, safe="")
base_url = f"{parsed_url.scheme}://x-access-token:{safe_token}@{parsed_url.netloc}"
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
for repo in repos:
dest = workdir / repo
clone_url = f"{base_url}/molecule-ai/{repo}.git"
last_exc: Exception | None = None
for attempt in range(1, 4):
result = subprocess.run(
["git", "clone", "--depth", "1", clone_url, str(dest)],
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=30,
)
result = _git_clone_with_token(dest, clone_url, token)
if result.returncode == 0:
paths[repo] = dest
break
if attempt < 3:
time.sleep(2 ** (attempt - 1))
continue
stderr = result.stderr.replace(token, "<redacted>").replace(safe_token, "<redacted>")
stderr = result.stderr.replace(token, "<redacted>")
raise RuntimeError(f"failed to clone {repo} after 3 attempts: {stderr.strip()}")
return paths
+30 -12
View File
@@ -278,6 +278,32 @@ def find_platform_comm_drift(repo_name: str, repo_path: Path) -> list[ContractFi
return []
def _git_clone_with_token(dest: Path, url: str, token: str) -> subprocess.CompletedProcess[str]:
"""Clone using GIT_ASKPASS so the token never appears in argv or remote URL."""
import shlex
with tempfile.NamedTemporaryFile(mode="w", suffix=".sh", delete=False) as f:
f.write("#!/bin/sh\n")
f.write('case "$1" in\n')
f.write(' *Username*) echo "x-access-token" ;;\n')
f.write(f' *Password*) echo {shlex.quote(token)} ;;\n')
f.write("esac\n")
askpass = f.name
os.chmod(askpass, 0o700)
try:
return subprocess.run(
["git", "clone", "--depth", "1", url, str(dest)],
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=30,
env={**os.environ, "GIT_ASKPASS": askpass},
)
finally:
os.unlink(askpass)
def clone_repos(workdir: Path, repos: tuple[str, ...], *, gitea_url: str, token: str) -> dict[str, Path]:
if not token:
raise RuntimeError("GITEA_TOKEN is required when --root is not provided")
@@ -286,28 +312,20 @@ def clone_repos(workdir: Path, repos: tuple[str, ...], *, gitea_url: str, token:
if parsed_url.scheme not in {"http", "https"} or not parsed_url.netloc:
raise RuntimeError(f"invalid Gitea URL: {gitea_url}")
safe_token = quote(token, safe="")
base_url = f"{parsed_url.scheme}://x-access-token:{safe_token}@{parsed_url.netloc}"
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
paths: dict[str, Path] = {}
for repo in repos:
dest = workdir / repo
last_exc: Exception | None = None
clone_url = f"{base_url}/molecule-ai/{repo}.git"
for attempt in range(1, 4):
result = subprocess.run(
["git", "clone", "--depth", "1", f"{base_url}/molecule-ai/{repo}.git", str(dest)],
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=30,
)
result = _git_clone_with_token(dest, clone_url, token)
if result.returncode == 0:
paths[repo] = dest
break
if attempt < 3:
time.sleep(2 ** (attempt - 1))
continue
stderr = result.stderr.replace(token, "<redacted>").replace(safe_token, "<redacted>")
stderr = result.stderr.replace(token, "<redacted>")
raise RuntimeError(f"failed to clone {repo} after 3 attempts: {stderr.strip()}")
return paths
@@ -104,3 +104,28 @@ def test_clone_consumers_retries_on_transient_failure(monkeypatch: pytest.Monkey
import check_consumer_runtime_drift as guard
guard.clone_consumers(workdir, ("molecule-core",), gitea_url="https://git.moleculesai.app", token="fake-token")
assert call_count == 3, f"expected 3 attempts, got {call_count}"
def test_clone_consumers_never_puts_token_in_argv(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""GIT_ASKPASS path: token must not appear in git clone argv or remote URL (runtime#86)."""
import subprocess
captured: list[tuple[tuple[object, ...], dict[str, object]]] = []
def capture_run(*args: object, **kwargs: object) -> object:
captured.append((args, kwargs))
return type("Result", (), {"returncode": 0, "stdout": "", "stderr": ""})()
monkeypatch.setattr(subprocess, "run", capture_run)
workdir = tmp_path / "wd"
workdir.mkdir()
import check_consumer_runtime_drift as guard
guard.clone_consumers(workdir, ("molecule-core",), gitea_url="https://git.moleculesai.app", token="s3cr3t-t0k3n")
assert len(captured) == 1
cmd = captured[0][0][0]
env = captured[0][1].get("env") or {}
cmd_str = " ".join(str(c) for c in cmd)
assert "s3cr3t-t0k3n" not in cmd_str, "token leaked into subprocess argv"
assert "x-access-token" not in cmd_str, "username leaked into subprocess argv"
assert env.get("GIT_ASKPASS") is not None, "GIT_ASKPASS not set in clone env"
+235
View File
@@ -0,0 +1,235 @@
"""Regression gate (runtime#86): forbid token-in-URL patterns in workflow + scripts.
The 2026-05-28 family of bugs included ``x-access-token:${DISPATCH_TOKEN}``
embedded in a ``git clone`` URL — the token ended up in subprocess argv
(visible via ``ps``/``/proc/*/cmdline``), the git remote URL (visible via
``git remote -v``), and git diagnostics/logs.
The fix is GIT_ASKPASS / Gitea-API rather than URL-embedded tokens. This
gate makes the regression observable: a CI red on the next PR that
re-introduces the pattern.
Scope
-----
* Walks ``.gitea/workflows/*.yml`` — the CI surface.
* Walks ``scripts/*.py`` — the audit-script surface (clone helpers used by
workflows).
* Walks ``scripts/*.sh`` — shell helpers (none shipped today, but covered
for symmetry).
What the gate matches
---------------------
A *leak* is ``x-access-token:`` followed by something that *looks like* a
token or a variable that evaluates to a token — bash ``${...}``,
GitHub Actions ``${{ ... }}``, Python f-string ``{...}``, or a quoted
literal token.
What the gate does NOT match
----------------------------
* ``echo "x-access-token"`` — the GIT_ASKPASS script's *username* response.
This is the EXPECTED, safe occurrence (the token is fed via the
*password* prompt, never the URL).
* Comment text describing the pattern (e.g. ``# fix: x-access-token:${...}``).
These use a leading ``#`` so are skipped.
Failure shape
-------------
A red gate prints every offending (file, line-no, line) tuple so the
offending PR author can fix or justify each in review.
"""
from __future__ import annotations
import re
from pathlib import Path
import pytest
REPO_ROOT = Path(__file__).resolve().parents[1]
# Paths the gate scans. Resolved relative to REPO_ROOT and the existence
# of each is asserted at collection time so a misconfigured env surfaces
# a clear failure rather than a vacuous pass.
SCAN_TARGETS = (
REPO_ROOT / ".gitea" / "workflows",
REPO_ROOT / "scripts",
)
# Leak pattern: x-access-token: followed by a token-looking value.
# The non-capturing group below covers the four "looks-like-a-token"
# shapes we care about:
# 1. bash ${VAR}
# 2. GitHub Actions ${{ ... }}
# 3. Python f-string {var} (with optional !r/!s and format spec)
# 4. A bare quoted token literal, e.g. "ghs_xxx..." or "abc123..."
# Comment-line filtering is applied at the file-scan layer
# (see _COMMENT_LINE_RE) rather than in the regex — a negative-lookbehind
# for "#" only catches the char immediately before, which is whitespace
# (not "#") on a real comment line, so the regex alone cannot tell.
_LEAK_RE = re.compile(
r"""
x-access-token # the forbidden prefix
\s*:\s* # the colon separator
(?:
\$\{[^}]+\} # bash ${VAR}
| \$\{\{[^}]+\}\} # GitHub Actions ${{ expr }}
| \{[a-zA-Z_][a-zA-Z0-9_]*[!:>.<}]? # Python f-string {var}
| ["'][A-Za-z0-9_\-]{8,}["'] # quoted token literal (>=8 chars)
)
""",
re.VERBOSE,
)
# Comment-line detector: lines whose first non-whitespace is '#'.
_COMMENT_LINE_RE = re.compile(r"^\s*#")
def _iter_candidate_files() -> list[Path]:
"""Enumerate files the gate scans. Excludes the test file itself."""
files: list[Path] = []
for target in SCAN_TARGETS:
assert target.exists(), f"scan target missing: {target}"
if target.is_file():
files.append(target)
continue
for path in sorted(target.rglob("*")):
if not path.is_file():
continue
if path.suffix in {".yml", ".yaml", ".py", ".sh"}:
files.append(path)
# Exclude the test file itself (its docstring + tests reference the
# pattern; gating the gate would loop).
return [p for p in files if p != Path(__file__)]
def _scan_file(path: Path) -> list[tuple[Path, int, str]]:
"""Return list of (path, line-no, line) for every leak match in path."""
try:
text = path.read_text(encoding="utf-8")
except UnicodeDecodeError:
return []
return _scan_file_for_text(text, source_name=str(path))
def _scan_file_for_text(
text: str, source_name: str = "<text>"
) -> list[tuple[Path, int, str]]:
"""Return list of (path, line-no, line) for every leak match in text.
``source_name`` is just a label for the returned tuples; the line
number is 1-based. Split out from ``_scan_file`` so the
``test_leak_regex_does_not_match_comment_lines`` regression guard can
drive the same scan pipeline against inline text.
"""
findings: list[tuple[Path, int, str]] = []
label = Path(source_name)
for lineno, line in enumerate(text.splitlines(), start=1):
if _COMMENT_LINE_RE.match(line):
continue
if _LEAK_RE.search(line):
findings.append((label, lineno, line))
return findings
def test_scan_targets_exist() -> None:
"""The gate's scan targets are present (so a vacuous pass is impossible)."""
for target in SCAN_TARGETS:
assert target.exists(), (
f"scan target {target} must exist; otherwise the gate is a "
f"vacuous pass — fix the gate, not the test"
)
def test_no_workflow_or_script_embeds_token_in_clone_url() -> None:
"""No ``.gitea/workflows/*.yml`` or script may embed a token in a URL.
Runtime#86: the original bug was ``x-access-token:${DISPATCH_TOKEN}`` in
a ``git clone`` URL inside the publish-runtime workflow. The fix moved
to GIT_ASKPASS / Gitea-API. This regression gate makes the fix durable.
"""
files = _iter_candidate_files()
# Defence-in-depth: the gate must actually be scanning something.
assert files, "no candidate files found — gate is vacuous"
all_findings: list[tuple[Path, int, str]] = []
for path in files:
all_findings.extend(_scan_file(path))
if all_findings:
# Pretty-print all offenders so the failing PR author gets a clear
# diff-shaped report (rather than a single AssertionError blob).
report = "\n".join(
f" {p.relative_to(REPO_ROOT)}:{ln}: {line.strip()}"
for p, ln, line in all_findings
)
pytest.fail(
"Forbidden token-in-URL pattern detected (runtime#86 regression). "
"Use GIT_ASKPASS or the Gitea contents+pulls API instead — never "
"embed a token in a URL passed to git / curl. Offenders:\n"
f"{report}"
)
def test_askpass_username_response_is_the_only_allowed_occurrence() -> None:
"""The GIT_ASKPASS helper legitimately writes ``x-access-token`` as the
*username* response (the token rides the *password* prompt). This test
pins that pattern so any future re-introduction of the URL-embedding
pattern is *visible* in a diff but does not falsely trip the gate.
It does NOT relax the gate — it just guards the documented exception
shape so a refactor of the askpass helper can keep working.
"""
expected_substring = 'echo "x-access-token"'
matches: list[Path] = []
for path in _iter_candidate_files():
if path.suffix != ".py":
continue
try:
text = path.read_text(encoding="utf-8")
except UnicodeDecodeError:
continue
if expected_substring in text:
matches.append(path)
assert matches, (
"GIT_ASKPASS helpers no longer write the expected `x-access-token` "
"username response. If the askpass mechanism was intentionally "
"replaced, update this test to match the new shape — do not silently "
"let the gate pass on no occurrences (vacuous protection)."
)
# The expected set: both consumer-drift + platform-comm-contract scripts.
expected_paths = {
REPO_ROOT / "scripts" / "check_consumer_runtime_drift.py",
REPO_ROOT / "scripts" / "check_platform_comm_contract.py",
}
assert set(matches) == expected_paths, (
f"GIT_ASKPASS helper moved/added/removed: got {sorted(p.name for p in matches)}, "
f"expected {sorted(p.name for p in expected_paths)}"
)
def test_leak_regex_does_not_match_comment_lines() -> None:
"""The gate's scan pipeline must skip comment lines so documentation
referencing the pattern (commit messages, this file, etc.) does not
false-positive.
Regression guard for the gate itself: if someone refactors
``_scan_file`` and drops the ``_COMMENT_LINE_RE`` short-circuit, every
doc line referencing the pattern would trip the gate.
"""
sample = (
"# The prior bug was: x-access-token:${DISPATCH_TOKEN}@github.com/...\n"
'x-access-token:"sk_live_actual_leak"\n'
)
# Drive the full scan pipeline (per-line comment skip + regex) rather
# than the regex in isolation, so the test exercises the *real* guard
# contract — not just the regex.
findings = _scan_file_for_text(sample, source_name="<inline>")
assert len(findings) == 1, (
f"expected exactly 1 leak finding (the non-comment line), got "
f"{findings!r}"
)
_path, _lineno, line = findings[0]
assert "sk_live_actual_leak" in line, (
f"the leak match should be the actual token line, got {line!r}"
)