fix(merge-control): harden ci-required-drift.py + gitea-merge-queue.py #2399

Merged
devops-engineer merged 1 commits from fix/merge-control-script-hardening into main 2026-06-07 23:04:33 +00:00
4 changed files with 290 additions and 1 deletions
+27 -1
View File
@@ -317,7 +317,33 @@ def required_checks_env(audit_doc: dict, branch: str) -> set[str]:
f"::error::REQUIRED_CHECKS_JSON['{branch}'] is {type(branch_checks).__name__}, expected list\n"
)
sys.exit(3)
return {str(item).strip() for item in branch_checks if str(item).strip()}
# Fail-closed validation: every entry must be a non-empty string.
# Reject null, int, dict, or empty/whitespace strings silently —
# they indicate a malformed manifest that drift-detect must not
# normalize away (that would hide config errors).
validated: set[str] = set()
for idx, item in enumerate(branch_checks):
if not isinstance(item, str):
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON['{branch}'][{idx}] is "
f"{type(item).__name__} (value={item!r}), expected str\n"
)
sys.exit(3)
stripped = item.strip()
if not stripped:
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON['{branch}'][{idx}] is "
f"empty/whitespace string\n"
)
sys.exit(3)
if stripped in validated:
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON['{branch}'] contains "
f"duplicate context '{stripped}' at index {idx}\n"
)
sys.exit(3)
validated.add(stripped)
return validated
# Legacy variant fallback.
if found_legacy:
+110
View File
@@ -1166,12 +1166,122 @@ def _evaluate_candidate(
return decision, ctx
@dataclasses.dataclass(frozen=True)
class ReadinessEntry:
"""One candidate's readiness state."""
pr_number: int
decision: MergeDecision | None
reason: str
def enumerate_readiness(*, dry_run: bool = False) -> list[ReadinessEntry]:
"""Evaluate ALL candidates and return their readiness states.
Fail-closed: if branch protection cannot be fetched, raise
BranchProtectionUnavailable (caller must handle). Unlike
process_once, this does NOT stop at the first actionable candidate;
it evaluates every eligible PR and returns the full list so a
post-batch summary can be printed.
"""
bp = get_branch_protection(WATCH_BRANCH)
contexts = bp.required_contexts
required_approvals = bp.required_approvals
main_sha = get_branch_head(WATCH_BRANCH)
main_status = get_combined_status(main_sha)
main_latest = latest_statuses_by_context(main_status.get("statuses") or [])
main_ok, main_bad = required_contexts_green(main_latest, push_required_contexts())
candidates = choose_candidate_issues(
list_candidate_issues(auto_discover=AUTO_DISCOVER),
queue_label=QUEUE_LABEL,
opt_out_labels=OPT_OUT_LABELS,
auto_discover=AUTO_DISCOVER,
)
entries: list[ReadinessEntry] = []
for issue in candidates:
pr_number = int(issue["number"])
try:
decision, ctx = _evaluate_candidate(
issue,
main_sha=main_sha,
main_status=main_status,
required_contexts=contexts,
required_approvals=required_approvals,
dry_run=dry_run,
)
except ApiError as exc:
# Fail-closed per candidate: an unreadable PR is recorded as
# unverifiable, not skipped silently.
entries.append(
ReadinessEntry(
pr_number=pr_number,
decision=None,
reason=f"unverifiable (API error: {exc})",
)
)
continue
if decision is None:
entries.append(
ReadinessEntry(
pr_number=pr_number,
decision=None,
reason="not merge-eligible (opt-out/draft/fork/wrong-base)",
)
)
continue
entries.append(
ReadinessEntry(
pr_number=pr_number,
decision=decision,
reason=decision.reason,
)
)
return entries
def print_post_batch_summary(entries: list[ReadinessEntry]) -> None:
"""Print a structured summary of all candidates' readiness.
Emits ::notice:: lines for machine parsing and a human-readable
block for operator visibility.
"""
ready = [e for e in entries if e.decision and e.decision.ready]
waiting = [e for e in entries if e.decision and not e.decision.ready]
ineligible = [e for e in entries if e.decision is None]
print("::group::merge-queue readiness summary")
print(f"total_candidates={len(entries)}")
print(f"ready={len(ready)}")
print(f"waiting={len(waiting)}")
print(f"ineligible/unverifiable={len(ineligible)}")
print("")
for e in entries:
state = "ready" if e.decision and e.decision.ready else (
"waiting" if e.decision else "ineligible"
)
action = e.decision.action if e.decision else "n/a"
print(f"PR #{e.pr_number}: state={state} action={action} reason={e.reason}")
print("::endgroup::")
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--dry-run", action="store_true")
parser.add_argument(
"--enumerate",
action="store_true",
help="Evaluate all candidates and print a readiness summary without merging.",
)
args = parser.parse_args()
_require_runtime_env()
try:
if args.enumerate:
entries = enumerate_readiness(dry_run=args.dry_run)
print_post_batch_summary(entries)
return 0
return process_once(dry_run=args.dry_run)
except ApiError as exc:
# FAIL-CLOSED: API errors are not "transient success" — they mean
@@ -107,6 +107,36 @@ def test_required_checks_env_json_malformed_fails():
raise AssertionError("expected SystemExit(3)")
def test_required_checks_env_json_non_string_item_fails():
doc = _make_audit_doc_json({"main": ["ctx-a", 123, "ctx-b"]})
try:
drift.required_checks_env(doc, "main")
except SystemExit as exc:
assert exc.code == 3
else:
raise AssertionError("expected SystemExit(3)")
def test_required_checks_env_json_empty_string_item_fails():
doc = _make_audit_doc_json({"main": ["ctx-a", " ", "ctx-b"]})
try:
drift.required_checks_env(doc, "main")
except SystemExit as exc:
assert exc.code == 3
else:
raise AssertionError("expected SystemExit(3)")
def test_required_checks_env_json_duplicate_context_fails():
doc = _make_audit_doc_json({"main": ["ctx-a", "ctx-b", "ctx-a"]})
try:
drift.required_checks_env(doc, "main")
except SystemExit as exc:
assert exc.code == 3
else:
raise AssertionError("expected SystemExit(3)")
# ---------------------------------------------------------------------------
# sentinel_needs
# ---------------------------------------------------------------------------
@@ -1563,3 +1563,126 @@ def test_process_once_defensive_skip_when_pull_payload_opted_out(monkeypatch):
assert rc == 0
assert calls["merged"] is None
# ---------------------------------------------------------------------------
# readiness-enumeration + post-batch summary
# ---------------------------------------------------------------------------
def test_enumerate_readiness_evaluates_all_candidates(monkeypatch):
"""enumerate_readiness returns every candidate's state, not stopping at
the first actionable one."""
old_head, new_head = "a" * 40, "c" * 40
_wire_multi_candidate_process_once(
monkeypatch,
issues=[
_issue(500, labels=[], created="2026-06-01T01:00:00Z"),
_issue(501, labels=[], created="2026-06-01T02:00:00Z"),
],
pulls={
500: {"state": "open", "mergeable": False, "draft": False,
"base": {"ref": "main", "repo_id": 1},
"head": {"sha": old_head, "repo_id": 1}, "labels": []},
501: {"state": "open", "mergeable": True, "draft": False,
"base": {"ref": "main", "repo_id": 1},
"head": {"sha": new_head, "repo_id": 1}, "labels": []},
},
reviews={500: _two_approvals(old_head), 501: _two_approvals(new_head)},
calls={},
)
entries = mq.enumerate_readiness(dry_run=False)
assert len(entries) == 2
by_num = {e.pr_number: e for e in entries}
assert by_num[500].decision is not None
assert by_num[500].decision.ready is False
assert by_num[501].decision is not None
assert by_num[501].decision.ready is True
def test_enumerate_readiness_includes_ineligible_pr(monkeypatch):
"""enumerate_readiness marks fork / wrong-base PRs as ineligible
(decision=None) while still evaluating the rest."""
head = "a" * 40
_wire_multi_candidate_process_once(
monkeypatch,
issues=[
_issue(600, labels=[], created="2026-06-01T01:00:00Z"),
_issue(601, labels=[], created="2026-06-01T02:00:00Z"),
],
pulls={
600: {"state": "open", "mergeable": True, "draft": False,
"base": {"ref": "main", "repo_id": 1},
"head": {"sha": head, "repo_id": 2}, "labels": []}, # fork
601: {"state": "open", "mergeable": True, "draft": False,
"base": {"ref": "main", "repo_id": 1},
"head": {"sha": head, "repo_id": 1}, "labels": []},
},
reviews={600: _two_approvals(head), 601: _two_approvals(head)},
calls={},
)
entries = mq.enumerate_readiness(dry_run=False)
by_num = {e.pr_number: e for e in entries}
assert by_num[600].decision is None
assert "not merge-eligible" in by_num[600].reason
assert by_num[601].decision is not None
assert by_num[601].decision.ready is True
def test_enumerate_readiness_fail_closed_on_api_error(monkeypatch):
"""If get_pull raises for one candidate, that candidate is recorded as
unverifiable; other candidates are still evaluated."""
head = "a" * 40
_wire_multi_candidate_process_once(
monkeypatch,
issues=[
_issue(700, labels=[], created="2026-06-01T01:00:00Z"),
_issue(701, labels=[], created="2026-06-01T02:00:00Z"),
],
pulls={
700: {"state": "open", "mergeable": True, "draft": False,
"base": {"ref": "main", "repo_id": 1},
"head": {"sha": head, "repo_id": 1}, "labels": []},
701: {"state": "open", "mergeable": True, "draft": False,
"base": {"ref": "main", "repo_id": 1},
"head": {"sha": head, "repo_id": 1}, "labels": []},
},
reviews={700: _two_approvals(head), 701: _two_approvals(head)},
calls={},
)
original_get_pull = mq.get_pull
def failing_get_pull(n):
if n == 700:
raise mq.ApiError("simulated API failure")
return original_get_pull(n)
monkeypatch.setattr(mq, "get_pull", failing_get_pull)
entries = mq.enumerate_readiness(dry_run=False)
by_num = {e.pr_number: e for e in entries}
assert by_num[700].decision is None
assert "unverifiable" in by_num[700].reason
assert by_num[701].decision is not None
assert by_num[701].decision.ready is True
def test_print_post_batch_summary_counts_correctly(capsys):
entries = [
mq.ReadinessEntry(pr_number=1, decision=mq.MergeDecision(True, "merge", "ready"), reason="ready"),
mq.ReadinessEntry(pr_number=2, decision=mq.MergeDecision(False, "wait", "CI red"), reason="CI red"),
mq.ReadinessEntry(pr_number=3, decision=None, reason="draft"),
]
mq.print_post_batch_summary(entries)
captured = capsys.readouterr()
out = captured.out
assert "total_candidates=3" in out
assert "ready=1" in out
assert "waiting=1" in out
assert "ineligible/unverifiable=1" in out
assert "PR #1: state=ready" in out
assert "PR #2: state=waiting" in out
assert "PR #3: state=ineligible" in out