ci-required-drift: fail-closed on 403/404 (port cp#544) #2330

Closed
core-be wants to merge 1 commits from fix/port-cp544-fail-closed into main
2 changed files with 117 additions and 37 deletions
+41 -37
View File
@@ -394,44 +394,48 @@ def detect_drift(branch: str) -> tuple[list[str], dict]:
try:
_, protection = api("GET", protection_path)
except ApiError as e:
# Isolate the HTTP status from the error message.
http_status: int | None = None
msg = str(e)
# ApiError message format: "{method} {path} → HTTP {status}: {body}"
import re as _re
m = _re.search(r"HTTP (\d{3})", msg)
if m:
http_status = int(m.group(1))
if http_status in (403, 404):
# Token lacks scope OR branch has no protection. Cannot
# determine drift — skip this branch. Do NOT exit non-zero;
# the issue IS the alarm, not a red workflow.
sys.stderr.write(
f"::error::GET {protection_path} returned HTTP {http_status}"
f"DRIFT_BOT_TOKEN lacks repo-admin scope (Gitea 1.22.6 "
f"requires it for this endpoint) OR branch has no protection "
f"configured. Cannot determine drift for {branch}; "
f"skipping. Fix: grant repo-admin to mc-drift-bot or "
f"configure protection on {branch}.\n"
# Parse the HTTP status out of the ApiError message so we can
# distinguish 404 (missing protection) and 403 (authz gap) from
# transient 5xx / unparseable statuses. Only 403/404 are safe to
# convert to findings; everything else must fail loud so the cron
# retries rather than silently masking an outage.
status_str = str(e).split("HTTP ", 1)[-1].split(":", 1)[0]
try:
status_code = int(status_str)
except ValueError:
status_code = 0
if status_code == 404:
findings.append(
f"BRANCH_PROTECTION_MISSING — {branch} has no protection rule; "
"the merge gate is open to anyone with write access."
)
debug = {
"branch": branch,
"ci_jobs": sorted(jobs),
"sentinel_needs": sorted(needs),
"protection_contexts_skipped": True,
"protection_http_status": http_status,
"audit_env_checks": sorted(env_set),
}
return [], debug
# 5xx — propagate (transient outage, fail loud per design).
raise
if not isinstance(protection, dict):
sys.stderr.write(
f"::error::protection response for {branch} not a JSON object\n"
)
sys.exit(4)
contexts = set(protection.get("status_check_contexts") or [])
elif status_code == 403:
findings.append(
f"BRANCH_PROTECTION_UNREADABLE — Gitea API returned HTTP {status_code} "
f"for branch_protections/{branch}. "
f"Token may lack repo-ADMIN or the endpoint is unreachable: {e}"
)
else:
raise
protection = None
if protection is not None:
if not isinstance(protection, dict):
sys.stderr.write(
f"::error::protection response for {branch} not a JSON object\n"
)
sys.exit(4)
raw_contexts = protection.get("status_check_contexts")
if isinstance(raw_contexts, list):
contexts = {str(c) for c in raw_contexts}
elif raw_contexts is not None:
# Defensive: Gitea once returned a string here during a schema
# migration (internal#772). Treat malformed as unreadable.
findings.append(
f"BRANCH_PROTECTION_UNREADABLE — status_check_contexts is not a list "
f"(got {type(raw_contexts).__name__}: {raw_contexts!r})"
)
contexts = set()
# ----- F1: job exists in CI but not under sentinel.needs -----
# Post-#1766 contract: the sentinel may deliberately have no `needs:`
+76
View File
@@ -679,3 +679,79 @@ def test_api_allows_raw_when_expect_json_false(drift_module, monkeypatch):
)
assert status == 201
assert "_raw" in body
# --------------------------------------------------------------------------
# cp#544: 403/404 must produce findings (fail-closed), 5xx must propagate
# --------------------------------------------------------------------------
def test_branch_protection_404_produces_findings(drift_module, tmp_path, monkeypatch):
"""404 (no protection rule) must produce BRANCH_PROTECTION_MISSING,
NOT return empty findings (fail-open)."""
ci = _write_ci_yaml(
tmp_path,
jobs={"build": {"runs-on": "ubuntu-latest"}},
sentinel_needs=["build"],
)
audit = _write_audit_yaml(tmp_path, ["ci / build (pull_request)"])
_patch_paths(drift_module, monkeypatch, ci, audit)
stub = _make_stub_api({
("GET", "/repos/owner/repo/branch_protections/main"): (
drift_module.ApiError(
"GET /repos/owner/repo/branch_protections/main → HTTP 404: not found"
)
),
})
monkeypatch.setattr(drift_module, "api", stub)
findings, debug = drift_module.detect_drift("main")
assert any("BRANCH_PROTECTION_MISSING" in f for f in findings), findings
assert debug["protection_contexts"] == []
def test_branch_protection_403_produces_findings(drift_module, tmp_path, monkeypatch):
"""403 (unreadable) must produce BRANCH_PROTECTION_UNREADABLE,
NOT return empty findings (fail-open)."""
ci = _write_ci_yaml(
tmp_path,
jobs={"build": {"runs-on": "ubuntu-latest"}},
sentinel_needs=["build"],
)
audit = _write_audit_yaml(tmp_path, ["ci / build (pull_request)"])
_patch_paths(drift_module, monkeypatch, ci, audit)
stub = _make_stub_api({
("GET", "/repos/owner/repo/branch_protections/main"): (
drift_module.ApiError(
"GET /repos/owner/repo/branch_protections/main → HTTP 403: forbidden"
)
),
})
monkeypatch.setattr(drift_module, "api", stub)
findings, debug = drift_module.detect_drift("main")
assert any("BRANCH_PROTECTION_UNREADABLE" in f for f in findings), findings
assert debug["protection_contexts"] == []
def test_branch_protection_500_propagates_loud(drift_module, tmp_path, monkeypatch):
"""5xx must propagate as ApiError (fail loud), NOT be swallowed."""
ci = _write_ci_yaml(
tmp_path,
jobs={"build": {"runs-on": "ubuntu-latest"}},
sentinel_needs=["build"],
)
audit = _write_audit_yaml(tmp_path, ["ci / build (pull_request)"])
_patch_paths(drift_module, monkeypatch, ci, audit)
stub = _make_stub_api({
("GET", "/repos/owner/repo/branch_protections/main"): (
drift_module.ApiError(
"GET /repos/owner/repo/branch_protections/main → HTTP 500: oops"
)
),
})
monkeypatch.setattr(drift_module, "api", stub)
with pytest.raises(drift_module.ApiError):
drift_module.detect_drift("main")