harden(merge-control): REQUIRED_CHECKS_JSON as single source of truth, fail-closed everywhere #2401

Closed
agent-dev-a wants to merge 3 commits from harden/merge-control-required-checks-json into main
6 changed files with 464 additions and 230 deletions
+69 -89
View File
@@ -8,8 +8,7 @@ pair diverges.
Sources:
A. `.gitea/workflows/ci.yml` jobs (CI source — the actual job set)
B. `status_check_contexts` in branch_protections (the merge gate)
C. `REQUIRED_CHECKS_JSON` (preferred) or `REQUIRED_CHECKS` (legacy)
env in audit-force-merge.yml (the audit env)
C. `REQUIRED_CHECKS_JSON` env var (the authoritative required-checks list)
Three failure classes:
F1 Job in (A) is not under the sentinel's `needs:` — sentinel
@@ -21,7 +20,7 @@ Three failure classes:
A stale required-check name is silent: protection demands a
green it never receives, but Gitea treats absent-as-pending,
not absent-as-red. The gate degrades to advisory.
F3 (B) and (C) are not set-equal. Audit env wider than protection
F3 (B) and (C) are not set-equal. REQUIRED_CHECKS_JSON wider than protection
→ audit flags non-force-merges as force; narrower → real
force-merges are missed.
@@ -33,8 +32,9 @@ Idempotency:
Behavior-based AST gate per `feedback_behavior_based_ast_gates`:
- Job set comes from PyYAML parse of jobs:* keys
- Sentinel needs from PyYAML parse of jobs[sentinel].needs (a list)
- Audit env from PyYAML parse, NOT grep — so reformatting the YAML
(block-scalar `|` vs flow-style list) does not break the gate
- Required-checks set comes from the REQUIRED_CHECKS_JSON env var
(parsed at runtime), NOT from a hardcoded list or YAML-grep.
Reformatting the audit workflow does not affect this script.
"""
from __future__ import annotations
@@ -66,7 +66,6 @@ GITEA_HOST = env("GITEA_HOST", required=False)
REPO = env("REPO", required=False)
BRANCHES = env("BRANCHES", required=False).split()
SENTINEL_JOB = env("SENTINEL_JOB", required=False)
AUDIT_WORKFLOW_PATH = env("AUDIT_WORKFLOW_PATH", required=False)
CI_WORKFLOW_PATH = env("CI_WORKFLOW_PATH", required=False)
DRIFT_LABEL = env("DRIFT_LABEL", required=False)
@@ -83,7 +82,6 @@ def _require_runtime_env() -> None:
"REPO",
"BRANCHES",
"SENTINEL_JOB",
"AUDIT_WORKFLOW_PATH",
"CI_WORKFLOW_PATH",
"DRIFT_LABEL",
):
@@ -251,93 +249,77 @@ def sentinel_needs(ci_doc: dict) -> set[str]:
return set(needs)
def required_checks_env(audit_doc: dict, branch: str) -> set[str]:
"""Pull the required-checks env value from audit-force-merge.yml.
def required_checks_from_env(branch: str) -> set[str]:
"""Parse REQUIRED_CHECKS_JSON from the environment variable.
Walks the YAML AST per `feedback_behavior_based_ast_gates`: we do
NOT grep for env keys — that breaks under reformatting,
multi-job workflows, or a future move of the env to a different
step. Instead, look inside every job's every step's `env:` map.
Supports two variants:
- REQUIRED_CHECKS_JSON (preferred): JSON dict keyed by branch name.
We extract the array for the target branch.
- REQUIRED_CHECKS (legacy): newline-separated list of context names.
Fail-closed: exits non-zero if the env var is missing, empty,
unparseable, not a dict, missing the branch key, the branch value
is not a list, or the list is empty (treating "empty required" as
a configuration error, not as "nothing required").
"""
found_json: list[str] = []
found_legacy: list[str] = []
jobs = audit_doc.get("jobs", {})
if not isinstance(jobs, dict):
sys.stderr.write(f"::warning::{AUDIT_WORKFLOW_PATH} has no jobs: mapping\n")
return set()
for job in jobs.values():
if not isinstance(job, dict):
continue
for step in job.get("steps", []) or []:
if not isinstance(step, dict):
continue
step_env = step.get("env") or {}
if isinstance(step_env, dict):
if "REQUIRED_CHECKS_JSON" in step_env:
v = step_env["REQUIRED_CHECKS_JSON"]
if isinstance(v, str):
found_json.append(v)
if "REQUIRED_CHECKS" in step_env:
v = step_env["REQUIRED_CHECKS"]
if isinstance(v, str):
found_legacy.append(v)
# JSON variant takes precedence.
if found_json:
if len(found_json) > 1:
raw = os.environ.get("REQUIRED_CHECKS_JSON")
if not raw or not raw.strip():
sys.stderr.write(
"::error::REQUIRED_CHECKS_JSON env var is missing or empty; "
"cannot determine required checks (fail-closed)\n"
)
sys.exit(3)
try:
parsed = json.loads(raw)
except json.JSONDecodeError as e:
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON is not valid JSON: {e}\n"
)
sys.exit(3)
if not isinstance(parsed, dict):
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON parsed to {type(parsed).__name__}, expected dict\n"
)
sys.exit(3)
branch_checks = parsed.get(branch)
if branch_checks is None:
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON has no entry for branch '{branch}'\n"
)
sys.exit(3)
if not isinstance(branch_checks, list):
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON['{branch}'] is {type(branch_checks).__name__}, expected list\n"
)
sys.exit(3)
seen: set[str] = set()
result: set[str] = set()
for idx, item in enumerate(branch_checks):
if not isinstance(item, str):
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON env present in {len(found_json)} steps; ambiguous\n"
f"::error::REQUIRED_CHECKS_JSON['{branch}'][{idx}] is "
f"{type(item).__name__} (value={item!r}), expected str "
"(fail-closed)\n"
)
sys.exit(3)
try:
parsed = json.loads(found_json[0])
except json.JSONDecodeError as e:
stripped = item.strip()
if not stripped:
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON is not valid JSON: {e}\n"
f"::error::REQUIRED_CHECKS_JSON['{branch}'][{idx}] is "
"whitespace-only (fail-closed)\n"
)
sys.exit(3)
if not isinstance(parsed, dict):
if stripped in seen:
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON parsed to {type(parsed).__name__}, expected dict\n"
f"::error::REQUIRED_CHECKS_JSON['{branch}'] contains "
f"duplicate entry '{stripped}' at index {idx} "
"(fail-closed)\n"
)
sys.exit(3)
branch_checks = parsed.get(branch)
if branch_checks is None:
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON has no entry for branch '{branch}'\n"
)
sys.exit(3)
if not isinstance(branch_checks, list):
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON['{branch}'] is {type(branch_checks).__name__}, expected list\n"
)
sys.exit(3)
return {str(item).strip() for item in branch_checks if str(item).strip()}
# Legacy variant fallback.
if found_legacy:
if len(found_legacy) > 1:
# Defensive: refuse to guess which one is canonical.
sys.stderr.write(
f"::error::REQUIRED_CHECKS env present in {len(found_legacy)} steps; ambiguous\n"
)
sys.exit(3)
raw = found_legacy[0]
# YAML block-scalars (`|`) leave a trailing newline + blanks; trim
# consistently with audit-force-merge.sh's parser so both sides
# produce identical sets.
return {line.strip() for line in raw.splitlines() if line.strip()}
sys.stderr.write(
f"::error::Neither REQUIRED_CHECKS_JSON nor REQUIRED_CHECKS env found in any step of "
f"{AUDIT_WORKFLOW_PATH}\n"
)
sys.exit(3)
seen.add(stripped)
result.add(stripped)
if not result:
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON['{branch}'] is empty after stripping; "
"treating as unparseable (fail-closed)\n"
)
sys.exit(3)
return result
# --------------------------------------------------------------------------
@@ -376,12 +358,11 @@ def detect_drift(branch: str) -> tuple[list[str], dict]:
findings: list[str] = []
ci_doc = load_yaml(CI_WORKFLOW_PATH)
audit_doc = load_yaml(AUDIT_WORKFLOW_PATH)
jobs = ci_job_names(ci_doc)
jobs_all = ci_jobs_all(ci_doc)
needs = sentinel_needs(ci_doc)
env_set = required_checks_env(audit_doc, branch)
env_set = required_checks_from_env(branch)
# Protection
# api() raises ApiError on non-2xx. Transient 5xx should fail loud.
@@ -510,7 +491,7 @@ def detect_drift(branch: str) -> tuple[list[str], dict]:
only_in_protection = sorted(contexts - env_set)
if only_in_env:
findings.append(
"F3a — audit-force-merge.yml `REQUIRED_CHECKS` env has contexts NOT in "
"F3a — REQUIRED_CHECKS_JSON env var has contexts NOT in "
f"branch_protections/{branch}.status_check_contexts (audit would flag "
"non-force-merges as force):\n"
+ "\n".join(f" - {c}" for c in only_in_env)
@@ -518,7 +499,7 @@ def detect_drift(branch: str) -> tuple[list[str], dict]:
if only_in_protection:
findings.append(
"F3b — branch_protections/{br}.status_check_contexts has contexts NOT in "
"audit-force-merge.yml `REQUIRED_CHECKS` env (real force-merges would be "
"REQUIRED_CHECKS_JSON env var (real force-merges would be "
"missed):\n".format(br=branch)
+ "\n".join(f" - {c}" for c in only_in_protection)
)
@@ -607,8 +588,7 @@ def render_body(branch: str, findings: list[str], debug: dict) -> str:
"- **F2**: rename the protection context to match an emitter, "
"or remove it from `status_check_contexts` "
"(PATCH `/api/v1/repos/{owner}/{repo}/branch_protections/{branch}`).",
"- **F3a / F3b**: bring `REQUIRED_CHECKS_JSON` (or `REQUIRED_CHECKS` legacy) env in "
"`.gitea/workflows/audit-force-merge.yml` into set-equality with "
"- **F3a / F3b**: bring `REQUIRED_CHECKS_JSON` env var into set-equality with "
"`status_check_contexts` (single PR, both files).",
"",
"## Debug",
+149 -31
View File
@@ -144,22 +144,6 @@ OPT_OUT_LABELS = {
).split(",")
if name.strip()
} | ({HOLD_LABEL} if HOLD_LABEL else set())
REQUIRED_CONTEXTS_RAW = _env(
"REQUIRED_CONTEXTS",
default=(
"CI / all-required (pull_request),"
"sop-checklist / all-items-acked (pull_request)"
),
)
# Required contexts for push (main/staging) runs. The push CI uses the same
# aggregator names with " (push)" suffix. Checking these explicitly instead of
# the combined state avoids false-pause when non-blocking jobs (e.g. Platform
# Go with continue-on-error: true due to mc#774) have failed — their failures
# pollute the combined state but do not block merges.
PUSH_REQUIRED_CONTEXTS_RAW = _env(
"PUSH_REQUIRED_CONTEXTS",
default="CI / all-required (push)",
)
# Recognised official-reviewer set. A merge requires this many DISTINCT genuine
# approvals (not stale/dismissed, on the current head sha) from accounts in
@@ -312,13 +296,99 @@ def api_paginated(
return results
def required_contexts(raw: str) -> list[str]:
return [part.strip() for part in raw.split(",") if part.strip()]
def required_checks_from_env(branch: str) -> list[str]:
"""Parse REQUIRED_CHECKS_JSON env var and return the required checks
for the given branch.
Fail-closed: exits non-zero if missing, empty, unparseable,
not a dict, missing branch key, not a list, or empty list.
"""
raw = os.environ.get("REQUIRED_CHECKS_JSON")
if not raw or not raw.strip():
sys.stderr.write(
"::error::REQUIRED_CHECKS_JSON env var is missing or empty; "
"cannot determine required checks (fail-closed)\n"
)
sys.exit(2)
try:
parsed = json.loads(raw)
except json.JSONDecodeError as e:
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON is not valid JSON: {e}\n"
)
sys.exit(2)
if not isinstance(parsed, dict):
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON parsed to {type(parsed).__name__}, expected dict\n"
)
sys.exit(2)
branch_checks = parsed.get(branch)
if branch_checks is None:
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON has no entry for branch '{branch}'\n"
)
sys.exit(2)
if not isinstance(branch_checks, list):
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON['{branch}'] is {type(branch_checks).__name__}, expected list\n"
)
sys.exit(2)
result: list[str] = []
seen: set[str] = set()
for idx, item in enumerate(branch_checks):
if not isinstance(item, str):
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON['{branch}'][{idx}] is "
f"{type(item).__name__} (value={item!r}), expected str "
"(fail-closed)\n"
)
sys.exit(2)
stripped = item.strip()
if not stripped:
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON['{branch}'][{idx}] is "
"whitespace-only (fail-closed)\n"
)
sys.exit(2)
if stripped in seen:
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON['{branch}'] contains "
f"duplicate entry '{stripped}' at index {idx} "
"(fail-closed)\n"
)
sys.exit(2)
seen.add(stripped)
result.append(stripped)
if not result:
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON['{branch}'] is empty after stripping; "
"treating as unparseable (fail-closed)\n"
)
sys.exit(2)
return result
def push_required_contexts() -> list[str]:
"""Required contexts for push (branch) CI runs. See PUSH_REQUIRED_CONTEXTS_RAW."""
return required_contexts(PUSH_REQUIRED_CONTEXTS_RAW)
def push_required_contexts_for_branch(branch: str) -> list[str]:
"""Derive push-required contexts from REQUIRED_CHECKS_JSON by
replacing the `` (pull_request)`` suffix with `` (push)``.
"""
pr_contexts = required_checks_from_env(branch)
push_contexts: list[str] = []
for ctx in pr_contexts:
if ctx.endswith(" (pull_request)"):
push_contexts.append(ctx[: -len(" (pull_request)")] + " (push)")
else:
# Context does not follow the standard naming convention;
# we cannot derive its push equivalent. Include it as-is
# (it will likely never match a push status, which is
# fail-closed: main will appear red and the queue will pause).
sys.stderr.write(
f"::warning::required context '{ctx}' does not end with "
"' (pull_request)'; cannot derive push equivalent, "
"including as-is\n"
)
push_contexts.append(ctx)
return push_contexts
def status_state(status: dict) -> str:
@@ -634,6 +704,7 @@ def evaluate_merge_readiness(
main_status: dict,
pr_status: dict,
required_contexts: list[str],
push_required_contexts: list[str],
required_approvals: int,
approvers: set[str],
request_changes: list[str],
@@ -652,7 +723,7 @@ def evaluate_merge_readiness(
# main), main's required contexts go red and this gate PAUSES the queue —
# no further merge piles onto an unverified/red main until it is green.
main_latest = latest_statuses_by_context(main_status.get("statuses") or [])
main_ok, main_bad = required_contexts_green(main_latest, push_required_contexts())
main_ok, main_bad = required_contexts_green(main_latest, push_required_contexts)
if not main_ok:
return MergeDecision(False, "pause", "main required contexts not green: " + ", ".join(main_bad))
@@ -934,9 +1005,10 @@ def merge_pull(pr_number: int, *, dry_run: bool, force: bool = False) -> None:
def process_once(*, dry_run: bool = False) -> int:
# Required status contexts come from BRANCH PROTECTION, not a hand-kept env
# list. Fail-closed: if BP cannot be enumerated, HOLD the whole tick rather
# than merge against an unverified required set.
# Required status contexts come from REQUIRED_CHECKS_JSON env var (the
# authoritative required-checks list), not a hand-kept env list or branch
# protection alone. Fail-closed: if REQUIRED_CHECKS_JSON cannot be parsed,
# the queue exits non-zero so the workflow goes red.
try:
bp = get_branch_protection(WATCH_BRANCH)
except BranchProtectionUnavailable as exc:
@@ -945,20 +1017,44 @@ def process_once(*, dry_run: bool = False) -> int:
f"unavailable (fail-closed): {exc}\n"
)
return 0
contexts = bp.required_contexts
# Authoritative required contexts from REQUIRED_CHECKS_JSON.
pr_required_contexts = required_checks_from_env(WATCH_BRANCH)
push_required = push_required_contexts_for_branch(WATCH_BRANCH)
required_approvals = bp.required_approvals
print(
f"::notice::queue policy from branch protection: "
f"required_approvals={required_approvals} "
f"required_contexts={contexts or '[none]'}"
f"::notice::queue policy: required_approvals={required_approvals} "
f"pr_required={pr_required_contexts} push_required={push_required}"
)
# Gate-source drift guard (mc#1982): REQUIRED_CHECKS_JSON must be
# set-equal to branch protection status_check_contexts (modulo event
# suffix). A mismatch means a required context could be misclassified
# as non-required (or vice versa), weakening the fail-closed contract.
def _normalize(ctx: str) -> str:
for suffix in (" (pull_request)", " (push)"):
if ctx.endswith(suffix):
return ctx[: -len(suffix)]
return ctx
bp_contexts = {_normalize(c) for c in bp.required_contexts}
json_contexts = {_normalize(c) for c in pr_required_contexts}
if bp_contexts != json_contexts:
only_in_json = sorted(json_contexts - bp_contexts)
only_in_bp = sorted(bp_contexts - json_contexts)
sys.stderr.write(
f"::error::queue held: REQUIRED_CHECKS_JSON drift against "
f"branch protection for {WATCH_BRANCH}. "
f"JSON only: {only_in_json}. BP only: {only_in_bp}. "
"Fix the drift before merging.\n"
)
return 0
main_sha = get_branch_head(WATCH_BRANCH)
main_status = get_combined_status(main_sha)
# Check push-required contexts explicitly instead of combined state.
# See evaluate_merge_readiness for rationale.
main_latest = latest_statuses_by_context(main_status.get("statuses") or [])
main_ok, main_bad = required_contexts_green(main_latest, push_required_contexts())
main_ok, main_bad = required_contexts_green(main_latest, push_required)
if not main_ok:
print(f"::notice::queue paused: {WATCH_BRANCH}@{main_sha[:8]} required contexts not green: {', '.join(main_bad)}")
return 0
@@ -988,12 +1084,14 @@ def process_once(*, dry_run: bool = False) -> int:
# lacks current main is in a legitimate in-progress state (updating it +
# rerunning CI moves it toward ready), unlike a PR that can never become
# ready without a human (RC / conflict), which is a `wait` and gets skipped.
readiness_summaries: list[str] = []
for issue in candidates:
decision, ctx = _evaluate_candidate(
issue,
main_sha=main_sha,
main_status=main_status,
required_contexts=contexts,
required_contexts=pr_required_contexts,
push_required_contexts=push_required,
required_approvals=required_approvals,
dry_run=dry_run,
)
@@ -1003,6 +1101,8 @@ def process_once(*, dry_run: bool = False) -> int:
print(f"::notice::PR #{pr_number} decision={decision.action}: {decision.reason}")
if decision.action == "wait":
# Non-ready: skip to the next candidate (no HOL block, no merge).
if ctx.get("readiness_summary"):
readiness_summaries.append(ctx["readiness_summary"])
continue
if decision.action == "update":
try:
@@ -1076,6 +1176,10 @@ def process_once(*, dry_run: bool = False) -> int:
post_comment(pr_number, hold_note, dry_run=dry_run)
continue # held — keep scanning for a mergeable candidate
return 0
if readiness_summaries:
print("::notice::Post-batch readiness summary:")
for summary in readiness_summaries:
print(f" {summary}")
return 0
@@ -1085,6 +1189,7 @@ def _evaluate_candidate(
main_sha: str,
main_status: dict,
required_contexts: list[str],
push_required_contexts: list[str],
required_approvals: int,
dry_run: bool,
) -> tuple[MergeDecision | None, dict]:
@@ -1152,10 +1257,23 @@ def _evaluate_candidate(
reviews, head_sha=head_sha, reviewer_set=REVIEWER_SET
)
# Build per-PR readiness summary (required checks green vs missing/pending).
latest = latest_statuses_by_context(pr_status.get("statuses") or [])
summary_parts: list[str] = []
for ctx_name in required_contexts:
status = latest.get(ctx_name)
state = status_state(status or {})
if state == "success":
summary_parts.append(f"{ctx_name}=green")
else:
summary_parts.append(f"{ctx_name}={state or 'missing'}")
ctx["readiness_summary"] = f"PR #{pr_number}: " + ", ".join(summary_parts)
decision = evaluate_merge_readiness(
main_status=main_status,
pr_status=pr_status,
required_contexts=required_contexts,
push_required_contexts=push_required_contexts,
required_approvals=required_approvals,
approvers=approvers,
request_changes=request_changes,
+90 -88
View File
@@ -4,6 +4,8 @@ import sys
from pathlib import Path
from unittest.mock import patch
import pytest
SCRIPT = Path(__file__).resolve().parents[1] / "ci-required-drift.py"
spec = importlib.util.spec_from_file_location("ci_required_drift", SCRIPT)
drift = importlib.util.module_from_spec(spec)
@@ -14,9 +16,10 @@ spec.loader.exec_module(drift)
# explicitly so unit tests can import without the full env contract.
drift.SENTINEL_JOB = "all-required"
drift.CI_WORKFLOW_PATH = ".gitea/workflows/ci.yml"
drift.AUDIT_WORKFLOW_PATH = ".gitea/workflows/audit-force-merge.yml"
import os
# ---------------------------------------------------------------------------
# Helper fixtures
# ---------------------------------------------------------------------------
@@ -25,82 +28,22 @@ def _make_ci_doc(jobs: dict) -> dict:
return {"jobs": jobs}
def _make_audit_doc(required_checks: list[str]) -> dict:
return {
"jobs": {
"audit": {
"steps": [
{"env": {"REQUIRED_CHECKS": "\n".join(required_checks)}}
]
}
}
}
def _make_audit_doc_json(required_checks_json: dict) -> dict:
return {
"jobs": {
"audit": {
"steps": [
{"env": {"REQUIRED_CHECKS_JSON": json.dumps(required_checks_json)}}
]
}
}
}
# ---------------------------------------------------------------------------
# required_checks_env — dual-variant parsing
# required_checks_from_env — REQUIRED_CHECKS_JSON parsing
# ---------------------------------------------------------------------------
def test_required_checks_env_prefers_json_over_legacy():
doc = {
"jobs": {
"audit": {
"steps": [
{
"env": {
"REQUIRED_CHECKS_JSON": json.dumps(
{"main": ["ctx-a"], "staging": ["ctx-b"]}
),
"REQUIRED_CHECKS": "ctx-legacy\nctx-old",
}
}
]
}
}
}
assert drift.required_checks_env(doc, "main") == {"ctx-a"}
assert drift.required_checks_env(doc, "staging") == {"ctx-b"}
def test_required_checks_from_env_parses_json():
os.environ["REQUIRED_CHECKS_JSON"] = json.dumps(
{"main": ["ctx-a"], "staging": ["ctx-b"]}
)
assert drift.required_checks_from_env("main") == {"ctx-a"}
assert drift.required_checks_from_env("staging") == {"ctx-b"}
def test_required_checks_env_falls_back_to_legacy():
doc = _make_audit_doc(["legacy-ctx"])
assert drift.required_checks_env(doc, "main") == {"legacy-ctx"}
def test_required_checks_env_json_missing_branch_fails():
doc = _make_audit_doc_json({"staging": ["ctx-b"]})
def test_required_checks_from_env_missing_branch_fails():
os.environ["REQUIRED_CHECKS_JSON"] = json.dumps({"staging": ["ctx-b"]})
try:
drift.required_checks_env(doc, "main")
except SystemExit as exc:
assert exc.code == 3
else:
raise AssertionError("expected SystemExit(3)")
def test_required_checks_env_json_malformed_fails():
doc = {
"jobs": {
"audit": {
"steps": [
{"env": {"REQUIRED_CHECKS_JSON": "not-json"}}
]
}
}
}
try:
drift.required_checks_env(doc, "main")
drift.required_checks_from_env("main")
except SystemExit as exc:
assert exc.code == 3
else:
@@ -176,16 +119,18 @@ def test_detect_drift_no_needs_sentinel_skips_f1():
"all-required": {},
}
)
audit = _make_audit_doc(
[
"CI / all-required (pull_request)",
"Secret scan / Scan diff for credential-shaped strings (pull_request)",
]
)
with patch.object(drift, "load_yaml", side_effect=[ci, audit]):
with patch.object(drift, "load_yaml", return_value=ci):
with patch.object(drift, "api", return_value=(200, SAMPLE_PROTECTION)):
findings, debug = drift.detect_drift("main")
with patch.object(
drift,
"required_checks_from_env",
return_value={
"CI / all-required (pull_request)",
"Secret scan / Scan diff for credential-shaped strings (pull_request)",
},
):
findings, debug = drift.detect_drift("main")
assert findings == []
assert debug["sentinel_needs"] == []
@@ -199,11 +144,15 @@ def test_detect_drift_typo_in_needs_triggers_f1b():
"all-required": {"needs": ["platfom-build"]}, # typo
}
)
audit = _make_audit_doc(["CI / all-required (pull_request)"])
with patch.object(drift, "load_yaml", side_effect=[ci, audit]):
with patch.object(drift, "load_yaml", return_value=ci):
with patch.object(drift, "api", return_value=(200, SAMPLE_PROTECTION)):
findings, _ = drift.detect_drift("main")
with patch.object(
drift,
"required_checks_from_env",
return_value={"CI / all-required (pull_request)"},
):
findings, _ = drift.detect_drift("main")
assert any("F1b" in f for f in findings)
assert any("platfom-build" in f for f in findings)
@@ -218,11 +167,15 @@ def test_detect_drift_missing_job_in_needs_triggers_f1():
"all-required": {"needs": ["platform-build"]},
}
)
audit = _make_audit_doc(["CI / all-required (pull_request)"])
with patch.object(drift, "load_yaml", side_effect=[ci, audit]):
with patch.object(drift, "load_yaml", return_value=ci):
with patch.object(drift, "api", return_value=(200, SAMPLE_PROTECTION)):
findings, _ = drift.detect_drift("main")
with patch.object(
drift,
"required_checks_from_env",
return_value={"CI / all-required (pull_request)"},
):
findings, _ = drift.detect_drift("main")
assert any("F1 —" in f for f in findings)
assert any("canvas-build" in f for f in findings)
@@ -238,10 +191,59 @@ def test_detect_drift_no_f1_when_needs_empty_even_with_jobs():
"all-required": {"needs": []},
}
)
audit = _make_audit_doc(["CI / all-required (pull_request)"])
with patch.object(drift, "load_yaml", side_effect=[ci, audit]):
with patch.object(drift, "load_yaml", return_value=ci):
with patch.object(drift, "api", return_value=(200, SAMPLE_PROTECTION)):
findings, _ = drift.detect_drift("main")
with patch.object(
drift,
"required_checks_from_env",
return_value={"CI / all-required (pull_request)"},
):
findings, _ = drift.detect_drift("main")
assert not any("F1 —" in f for f in findings)
def test_required_checks_from_env_rejects_non_string_item():
os.environ["REQUIRED_CHECKS_JSON"] = json.dumps(
{"main": ["ctx-a", 123]}
)
with pytest.raises(SystemExit) as exc_info:
drift.required_checks_from_env("main")
assert exc_info.value.code == 3
def test_required_checks_from_env_rejects_bool_item():
os.environ["REQUIRED_CHECKS_JSON"] = json.dumps(
{"main": ["ctx-a", True]}
)
with pytest.raises(SystemExit) as exc_info:
drift.required_checks_from_env("main")
assert exc_info.value.code == 3
def test_required_checks_from_env_rejects_null_item():
os.environ["REQUIRED_CHECKS_JSON"] = json.dumps(
{"main": ["ctx-a", None]}
)
with pytest.raises(SystemExit) as exc_info:
drift.required_checks_from_env("main")
assert exc_info.value.code == 3
def test_required_checks_from_env_rejects_whitespace_only_item():
os.environ["REQUIRED_CHECKS_JSON"] = json.dumps(
{"main": ["ctx-a", " "]}
)
with pytest.raises(SystemExit) as exc_info:
drift.required_checks_from_env("main")
assert exc_info.value.code == 3
def test_required_checks_from_env_rejects_duplicate_item():
os.environ["REQUIRED_CHECKS_JSON"] = json.dumps(
{"main": ["ctx-a", "ctx-b", "ctx-a"]}
)
with pytest.raises(SystemExit) as exc_info:
drift.required_checks_from_env("main")
assert exc_info.value.code == 3
+127 -2
View File
@@ -1,4 +1,5 @@
import importlib.util
import json
import sys
from pathlib import Path
@@ -125,6 +126,7 @@ def _ready_kwargs(**overrides):
"statuses": [{"context": "CI / all-required (pull_request)", "status": "success"}],
},
required_contexts=["CI / all-required (pull_request)"],
push_required_contexts=["CI / all-required (push)"],
required_approvals=2,
approvers={"agent-reviewer-cr2", "agent-researcher"},
request_changes=[],
@@ -312,7 +314,9 @@ def test_non_required_red_does_not_block_merge():
{"context": "Staging SaaS / e2e (pull_request)", "status": "failure"},
],
}
decision = mq.evaluate_merge_readiness(**_ready_kwargs(pr_status=pr_status))
decision = mq.evaluate_merge_readiness(
**_ready_kwargs(pr_status=pr_status, push_required_contexts=["CI / all-required (push)"])
)
assert decision.ready is True
assert decision.action == "merge"
assert decision.force is True
@@ -325,7 +329,9 @@ def test_failing_required_context_blocks_even_with_approvals():
{"context": "CI / all-required (pull_request)", "status": "failure"},
],
}
decision = mq.evaluate_merge_readiness(**_ready_kwargs(pr_status=pr_status))
decision = mq.evaluate_merge_readiness(
**_ready_kwargs(pr_status=pr_status, push_required_contexts=["CI / all-required (push)"])
)
assert decision.ready is False
assert decision.action == "wait"
assert decision.force is False
@@ -407,6 +413,11 @@ def test_process_once_holds_pr_on_permanent_merge_error(monkeypatch):
required_approvals=2,
block_on_rejected_reviews=True,
))
monkeypatch.setattr(mq, "required_checks_from_env", lambda branch: ["CI / all-required (pull_request)"])
monkeypatch.setattr(mq, "required_checks_from_env", lambda branch: ["CI / all-required (pull_request)"])
monkeypatch.setattr(mq, "required_checks_from_env", lambda branch: ["CI / all-required (pull_request)"])
monkeypatch.setattr(mq, "required_checks_from_env", lambda branch: ["CI / all-required (pull_request)"])
monkeypatch.setattr(mq, "required_checks_from_env", lambda branch: ["CI / all-required (pull_request)"])
main_sha = "b" * 40
head_sha = "a" * 40
monkeypatch.setattr(mq, "get_branch_head", lambda branch: main_sha)
@@ -474,6 +485,11 @@ def _fully_ready_process_once_monkeypatch(monkeypatch, mergeable, calls):
required_approvals=2,
block_on_rejected_reviews=True,
))
monkeypatch.setattr(mq, "required_checks_from_env", lambda branch: ["CI / all-required (pull_request)"])
monkeypatch.setattr(mq, "required_checks_from_env", lambda branch: ["CI / all-required (pull_request)"])
monkeypatch.setattr(mq, "required_checks_from_env", lambda branch: ["CI / all-required (pull_request)"])
monkeypatch.setattr(mq, "required_checks_from_env", lambda branch: ["CI / all-required (pull_request)"])
monkeypatch.setattr(mq, "required_checks_from_env", lambda branch: ["CI / all-required (pull_request)"])
main_sha = "b" * 40
head_sha = "a" * 40
monkeypatch.setattr(mq, "get_branch_head", lambda branch: main_sha)
@@ -710,6 +726,9 @@ def test_status_fetch_failure_is_fail_closed(monkeypatch):
required_contexts=["CI / all-required (pull_request)"],
required_approvals=2, block_on_rejected_reviews=True,
))
monkeypatch.setattr(mq, "required_checks_from_env", lambda branch: ["CI / all-required (pull_request)"])
monkeypatch.setattr(mq, "required_checks_from_env", lambda branch: ["CI / all-required (pull_request)"])
monkeypatch.setattr(mq, "required_checks_from_env", lambda branch: ["CI / all-required (pull_request)"])
main_sha = "b" * 40
head_sha = "a" * 40
monkeypatch.setattr(mq, "get_branch_head", lambda branch: main_sha)
@@ -879,6 +898,11 @@ def _stale_pr_update_409_monkeypatch(monkeypatch, queued_issues, calls):
required_approvals=2,
block_on_rejected_reviews=True,
))
monkeypatch.setattr(mq, "required_checks_from_env", lambda branch: ["CI / all-required (pull_request)"])
monkeypatch.setattr(mq, "required_checks_from_env", lambda branch: ["CI / all-required (pull_request)"])
monkeypatch.setattr(mq, "required_checks_from_env", lambda branch: ["CI / all-required (pull_request)"])
monkeypatch.setattr(mq, "required_checks_from_env", lambda branch: ["CI / all-required (pull_request)"])
monkeypatch.setattr(mq, "required_checks_from_env", lambda branch: ["CI / all-required (pull_request)"])
main_sha = "b" * 40
head_sha = "a" * 40
monkeypatch.setattr(mq, "get_branch_head", lambda branch: main_sha)
@@ -1148,6 +1172,9 @@ def _wire_ready_process_once(monkeypatch, *, issues, pr_payload, calls):
required_contexts=["CI / all-required (pull_request)"],
required_approvals=2, block_on_rejected_reviews=True,
))
monkeypatch.setattr(mq, "required_checks_from_env", lambda branch: ["CI / all-required (pull_request)"])
monkeypatch.setattr(mq, "required_checks_from_env", lambda branch: ["CI / all-required (pull_request)"])
monkeypatch.setattr(mq, "required_checks_from_env", lambda branch: ["CI / all-required (pull_request)"])
main_sha = "b" * 40
head_sha = "a" * 40
monkeypatch.setattr(mq, "get_branch_head", lambda branch: main_sha)
@@ -1332,6 +1359,9 @@ def _wire_multi_candidate_process_once(monkeypatch, *, issues, pulls, reviews, c
required_contexts=["CI / all-required (pull_request)"],
required_approvals=2, block_on_rejected_reviews=True,
))
monkeypatch.setattr(mq, "required_checks_from_env", lambda branch: ["CI / all-required (pull_request)"])
monkeypatch.setattr(mq, "required_checks_from_env", lambda branch: ["CI / all-required (pull_request)"])
monkeypatch.setattr(mq, "required_checks_from_env", lambda branch: ["CI / all-required (pull_request)"])
monkeypatch.setattr(mq, "get_branch_head", lambda branch: MAIN_SHA)
def fake_combined(sha):
@@ -1563,3 +1593,98 @@ def test_process_once_defensive_skip_when_pull_payload_opted_out(monkeypatch):
assert rc == 0
assert calls["merged"] is None
# ---------------------------------------------------------------------------
# required_checks_from_env — REQUIRED_CHECKS_JSON parsing (fail-closed)
# ---------------------------------------------------------------------------
def test_required_checks_from_env_parses_json():
import os
os.environ["REQUIRED_CHECKS_JSON"] = json.dumps(
{"main": ["ctx-a"], "staging": ["ctx-b"]}
)
assert mq.required_checks_from_env("main") == ["ctx-a"]
assert mq.required_checks_from_env("staging") == ["ctx-b"]
def test_required_checks_from_env_rejects_non_string_item():
import os
os.environ["REQUIRED_CHECKS_JSON"] = json.dumps(
{"main": ["ctx-a", 123]}
)
with pytest.raises(SystemExit) as exc_info:
mq.required_checks_from_env("main")
assert exc_info.value.code == 2
def test_required_checks_from_env_rejects_bool_item():
import os
os.environ["REQUIRED_CHECKS_JSON"] = json.dumps(
{"main": ["ctx-a", True]}
)
with pytest.raises(SystemExit) as exc_info:
mq.required_checks_from_env("main")
assert exc_info.value.code == 2
def test_required_checks_from_env_rejects_null_item():
import os
os.environ["REQUIRED_CHECKS_JSON"] = json.dumps(
{"main": ["ctx-a", None]}
)
with pytest.raises(SystemExit) as exc_info:
mq.required_checks_from_env("main")
assert exc_info.value.code == 2
def test_required_checks_from_env_rejects_whitespace_only_item():
import os
os.environ["REQUIRED_CHECKS_JSON"] = json.dumps(
{"main": ["ctx-a", " "]}
)
with pytest.raises(SystemExit) as exc_info:
mq.required_checks_from_env("main")
assert exc_info.value.code == 2
def test_required_checks_from_env_rejects_duplicate_item():
import os
os.environ["REQUIRED_CHECKS_JSON"] = json.dumps(
{"main": ["ctx-a", "ctx-b", "ctx-a"]}
)
with pytest.raises(SystemExit) as exc_info:
mq.required_checks_from_env("main")
assert exc_info.value.code == 2
def test_process_once_holds_when_json_drifts_from_branch_protection(monkeypatch):
"""REQUIRED_CHECKS_JSON missing a BP-required context → HOLD (no merge).
Regression for CR2 review on #2401: if JSON omits a context that BP
requires, the queue must not merge (force_merge could bypass it).
"""
merged = {"called": False}
monkeypatch.setattr(mq, "WATCH_BRANCH", "main")
monkeypatch.setattr(
mq,
"get_branch_protection",
lambda branch: mq.BranchProtection(
required_contexts=[
"CI / all-required (pull_request)",
"qa-review / approved (pull_request)",
],
required_approvals=2,
block_on_rejected_reviews=True,
),
)
monkeypatch.setattr(
mq,
"required_checks_from_env",
lambda branch: ["CI / all-required (pull_request)"],
)
monkeypatch.setattr(mq, "merge_pull", lambda *a, **k: merged.__setitem__("called", True))
rc = mq.process_once(dry_run=False)
assert rc == 0
assert merged["called"] is False
+15 -3
View File
@@ -101,11 +101,23 @@ jobs:
# currently treats `all-required` as the source of "what
# the sentinel claims to require").
SENTINEL_JOB: 'all-required'
# Path to the audit workflow whose REQUIRED_CHECKS env we
# cross-check against protection (RFC §6).
AUDIT_WORKFLOW_PATH: '.gitea/workflows/audit-force-merge.yml'
# Path to the CI workflow with the sentinel + the jobs.
CI_WORKFLOW_PATH: '.gitea/workflows/ci.yml'
# Authoritative required-checks list (JSON dict keyed by branch).
# MUST match branch protection status_check_contexts for each branch.
# If missing/empty/unparseable, ci-required-drift.py fails closed.
REQUIRED_CHECKS_JSON: |
{
"main": [
"CI / all-required (pull_request)",
"E2E API Smoke Test / E2E API Smoke Test (pull_request)",
"Handlers Postgres Integration / Handlers Postgres Integration (pull_request)"
],
"staging": [
"CI / all-required (pull_request)",
"sop-checklist / all-items-acked (pull_request)"
]
}
# Issue label applied on file/update. `tier:high` exists in
# the molecule-core label set (verified 2026-05-11, label id 9).
DRIFT_LABEL: 'tier:high'
+14 -17
View File
@@ -70,22 +70,19 @@ jobs:
# CURRENT head sha (not stale/dismissed). The required_approvals count
# itself is read from branch protection at runtime.
REVIEWER_SET: agent-reviewer,agent-researcher,agent-reviewer-cr2
# NOTE: REQUIRED_CONTEXTS is no longer the authoritative PR gate. The
# queue now reads the required status contexts from BRANCH PROTECTION
# (status_check_contexts) so non-required governance reds (qa-review,
# security-review, sop-tier, sop-checklist when not branch-required,
# E2E Chat, Staging SaaS, ci-arm64-advisory) cannot block a merge.
# If branch protection cannot be enumerated the queue HOLDS
# (fail-closed). REQUIRED_APPROVALS below is only a fallback used when
# branch protection does not specify required_approvals.
# Authoritative required-checks list (JSON dict keyed by branch).
# The queue parses this env var to determine which checks are required
# for readiness evaluation. If missing/empty/unparseable, the queue
# fails closed (exits non-zero). Push contexts are derived automatically
# by replacing (pull_request) with (push).
REQUIRED_CHECKS_JSON: |
{
"main": [
"CI / all-required (pull_request)",
"E2E API Smoke Test / E2E API Smoke Test (pull_request)",
"Handlers Postgres Integration / Handlers Postgres Integration (pull_request)"
]
}
# Fallback required approvals when branch protection does not specify one.
REQUIRED_APPROVALS: "2"
# Push-side required contexts. Checking CI / all-required (push)
# explicitly instead of the combined state avoids false-pause when
# non-blocking jobs (continue-on-error: true) have failed — those
# failures pollute combined state but do not gate merges.
# NOTE: the event-suffixed context name is intentional — branch protection
# MUST require `CI / all-required (pull_request)` (with suffix), NOT the
# bare `CI / all-required`. Gitea treats absent contexts as pending, not
# skipped; requiring the bare name silently blocks all merges (issue #1473).
PUSH_REQUIRED_CONTEXTS: CI / all-required (push)
run: python3 .gitea/scripts/gitea-merge-queue.py