fix(gate): SEV-1 fail-closed approval-validator (SEV-1 internal#812) #2440
@@ -0,0 +1,247 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
SSOT fail-closed approval validator (SEV-1 internal#812).
|
||||
|
||||
This module is the SINGLE source of truth for whether a Gitea review counts
|
||||
as a "genuine" approval. Both consumers must call into it — they MUST NOT
|
||||
duplicate the predicate:
|
||||
|
||||
- .gitea/scripts/gitea-merge-queue.py (Python) — imports directly.
|
||||
- .gitea/scripts/review-check.sh (bash, jq) — calls the Python helper
|
||||
at .gitea/scripts/_review_check_filter.py, which in turn calls this
|
||||
module. There is no separate jq / bash copy of the predicate; a
|
||||
reviewer who wants to weaken the gate has to weaken this one file.
|
||||
|
||||
# The fail-closed contract
|
||||
|
||||
A review counts as a GENUINE APPROVED on the current head ONLY IF ALL hold:
|
||||
|
||||
1. state == "APPROVED"
|
||||
2. official == true
|
||||
3. dismissed != true
|
||||
4. stale != true
|
||||
5. commit_id is present and equals the PR's current head SHA
|
||||
|
||||
ANY failure of any of the above → REJECT.
|
||||
|
||||
# The bug this fixes
|
||||
|
||||
The previous gitea-merge-queue.py predicate had a `if isinstance(commit_id,
|
||||
str) and commit_id and headsha:` guard that *skipped* the commit_id check
|
||||
when the review carried no commit_id. The previous review-check.sh jq
|
||||
filter required `commit_id == $head`, which is also implicitly fail-closed
|
||||
on missing commit_id (null != head), but only one of the two consumers
|
||||
behaved correctly — a code-drift trap.
|
||||
|
||||
Both behaviors are now defined here, as a single fail-closed predicate.
|
||||
A MISSING commit_id is the Gitea row signature of a spoofed or pre-commit
|
||||
review: a real reviewer cannot have submitted against a commit that
|
||||
doesn't exist. Accepting these is exactly the fail-open that SEV-1
|
||||
internal#812 describes and the re-opened path that closed #843 (with CR2
|
||||
+ Researcher both flagging it) addresses.
|
||||
|
||||
# Mutation-resistance
|
||||
|
||||
The unit tests in tests/test_approval_validator.py assert rejection
|
||||
explicitly for each fail-closed case (missing commit_id, stale head,
|
||||
non-official, dismissed, etc.). A reviewer who tries to weaken the
|
||||
predicate by removing the commit_id check, by re-introducing the
|
||||
"no commit_id is accepted" escape hatch, or by changing `!=` to `==`
|
||||
in the head comparison will trip those tests in CI.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Iterable, Optional, Tuple
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Canonical Gitea review-state enum (EXACT match -- no case coercion).
|
||||
# ---------------------------------------------------------------------------
|
||||
#
|
||||
# Gitea's reviews API emits review.state as one of a fixed set of
|
||||
# UPPERCASE string constants: "APPROVED", "REQUEST_CHANGES",
|
||||
# "REQUEST_REVIEW", "COMMENT", "PENDING", "DISMISSED" (verified
|
||||
# against the live API across real molecule-core PRs). They are ALWAYS
|
||||
# uppercase on the wire.
|
||||
#
|
||||
# FAIL-CLOSED: we compare review.state to these constants with EXACT
|
||||
# equality. The previous code used str(state or "").upper(), which
|
||||
# coerced a lowercase/mixed-case "approved" or "request_changes" into
|
||||
# the canonical value and ACCEPTED it. A real Gitea row never carries a
|
||||
# lowercase state, so a case-variant value is the signature of a
|
||||
# hand-forged / spoofed row, not a legitimate review. Coercing it was a
|
||||
# residual fail-open (SEV-1 internal#812, RCs 9849/9851/9852). We reject
|
||||
# anything that is not byte-for-byte the canonical constant.
|
||||
STATE_APPROVED = "APPROVED"
|
||||
STATE_REQUEST_CHANGES = "REQUEST_CHANGES"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Shared predicate — fail-closed on every condition
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def is_official_current_head(review: object, headsha: object) -> bool:
|
||||
"""Common predicate: review is official, not dismissed, not stale, and
|
||||
bound to the PR's current head SHA. EVERY condition is mandatory and
|
||||
fail-closed. Both is_genuine_approval and is_open_request_changes build
|
||||
on this so the rule cannot drift between the two cases.
|
||||
|
||||
`official` is checked with `is not True` (NOT `not review.get("official")`).
|
||||
The latter is truthy on the string "false" or the integer 1, which is
|
||||
exactly the fail-open surface we are closing here — a non-boolean
|
||||
pass-through is treated as official. Gitea emits a real boolean, so
|
||||
the stricter check rejects anything that isn't literally True.
|
||||
"""
|
||||
if not isinstance(review, dict):
|
||||
return False
|
||||
if review.get("official") is not True:
|
||||
return False
|
||||
if review.get("dismissed"):
|
||||
return False
|
||||
if review.get("stale"):
|
||||
return False
|
||||
commit_id = review.get("commit_id")
|
||||
# FAIL-CLOSED: a missing/empty/non-string commit_id is REJECTED. The
|
||||
# previous code had `if isinstance(commit_id, str) and commit_id and
|
||||
# headsha:` which SKIPPED the check when the review carried no
|
||||
# commit_id. That was the spoof-bug surface.
|
||||
if not isinstance(commit_id, str) or not commit_id:
|
||||
return False
|
||||
# FAIL-CLOSED: a present-but-wrong commit_id is also REJECTED. Stale
|
||||
# reviews (on a previous head) cannot count.
|
||||
if not isinstance(headsha, str) or not headsha or commit_id != headsha:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Per-verdict predicates
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def is_genuine_approval(
|
||||
review: object,
|
||||
*,
|
||||
headsha: str,
|
||||
reviewer_set: Optional[Iterable[str]] = None,
|
||||
) -> bool:
|
||||
"""Return True iff `review` is a genuine APPROVED on the current head.
|
||||
|
||||
When `reviewer_set` is provided, the review's `user.login` must be in
|
||||
the set (the merge-queue uses this to count only "recognised"
|
||||
reviewers for the 2-genuine floor; review-check.sh applies its own
|
||||
team-membership probe separately and so does not pass a set).
|
||||
"""
|
||||
if not isinstance(review, dict):
|
||||
return False
|
||||
# EXACT-ENUM (fail-closed): no .upper()/.strip() coercion. A
|
||||
# case-variant or whitespace-padded state is a forged row and is
|
||||
# rejected, not normalised into APPROVED.
|
||||
if review.get("state") != STATE_APPROVED:
|
||||
return False
|
||||
if not is_official_current_head(review, headsha):
|
||||
return False
|
||||
if reviewer_set is not None:
|
||||
user = (review.get("user") or {}).get("login")
|
||||
if not isinstance(user, str) or user not in set(reviewer_set):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def is_open_request_changes(review: object, *, headsha: str) -> bool:
|
||||
"""Return True iff `review` is an open official REQUEST_CHANGES on the
|
||||
current head. Same fail-closed contract as is_genuine_approval —
|
||||
a missing commit_id is REJECTED, not silently treated as 'still
|
||||
blocking the merge from an old head'.
|
||||
"""
|
||||
if not isinstance(review, dict):
|
||||
return False
|
||||
# EXACT-ENUM (fail-closed): same contract as is_genuine_approval. A
|
||||
# lowercase/mixed-case "request_changes" must NOT be coerced into a
|
||||
# block-erasing match; an exact REQUEST_CHANGES is required.
|
||||
if review.get("state") != STATE_REQUEST_CHANGES:
|
||||
return False
|
||||
if not is_official_current_head(review, headsha):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Consumer-facing reducer (returns the two call sites need)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def classify_reviews(
|
||||
reviews: Iterable[object],
|
||||
*,
|
||||
headsha: str,
|
||||
reviewer_set: Optional[Iterable[str]] = None,
|
||||
) -> Tuple[set[str], list[str]]:
|
||||
"""Reduce a PR's reviews to (approvers, request_changes) on the CURRENT head.
|
||||
|
||||
approvers: distinct logins whose LATEST official review on the current
|
||||
head is APPROVED.
|
||||
request_changes: distinct logins whose LATEST official review on the
|
||||
current head is REQUEST_CHANGES.
|
||||
|
||||
Gitea returns reviews oldest-first. We keep the latest *VALID*
|
||||
submission per user (later VALID entries overwrite earlier ones; an
|
||||
invalid later row — a COMMENT, or a review with a null/old commit_id —
|
||||
is ignored and can NOT overwrite or erase a genuine review). See the
|
||||
inline VALIDATE-BEFORE-REDUCE note below for the exploit this closes.
|
||||
"""
|
||||
reviewer_set_set = set(reviewer_set) if reviewer_set is not None else None
|
||||
|
||||
# VALIDATE-BEFORE-REDUCE (SEV-1 internal#812 follow-up).
|
||||
#
|
||||
# The earlier implementation reduced FIRST (latest row per user, keyed
|
||||
# only on state in {APPROVED, REQUEST_CHANGES}) and validated the single
|
||||
# surviving row AFTER. That is reduce-before-validate, and it is
|
||||
# exploitable: a user posts a genuine current-head APPROVED (or
|
||||
# REQUEST_CHANGES), then posts a LATER row that fails the fail-closed
|
||||
# predicate (a COMMENT, or an APPROVED with a null/old commit_id). The
|
||||
# later INVALID row overwrote the genuine one in latest_by_user, so a
|
||||
# real approval was masked, and — worse — a real current-head
|
||||
# REQUEST_CHANGES could be erased and the block silently evaporate.
|
||||
#
|
||||
# The fix: filter to VALID reviews FIRST (each row must pass
|
||||
# is_official_current_head AND carry an APPROVED/REQUEST_CHANGES state),
|
||||
# and only then reduce to the latest VALID review per user. An invalid
|
||||
# later row is never eligible to become a user's "latest" state, so it
|
||||
# cannot overwrite or erase a genuine review. A user's verdict is the
|
||||
# state of their latest VALID (official, current-head, non-dismissed,
|
||||
# non-stale, commit_id-present-and-matching) review.
|
||||
latest_valid_by_user: dict = {}
|
||||
for review in reviews:
|
||||
if not isinstance(review, dict):
|
||||
continue
|
||||
user = (review.get("user") or {}).get("login")
|
||||
if not isinstance(user, str):
|
||||
continue
|
||||
if reviewer_set_set is not None and user not in reviewer_set_set:
|
||||
continue
|
||||
# EXACT-ENUM (fail-closed): exact constants only, no coercion. A
|
||||
# case-coerced row must not become eligible to overwrite/erase a
|
||||
# genuine per-user verdict in the reduce below.
|
||||
state = review.get("state")
|
||||
if state not in (STATE_APPROVED, STATE_REQUEST_CHANGES):
|
||||
continue
|
||||
# Fail-closed predicate BEFORE the reduce: official, not dismissed,
|
||||
# not stale, commit_id present AND == head. Invalid rows are dropped
|
||||
# here and so can never become the per-user "latest".
|
||||
if not is_official_current_head(review, headsha):
|
||||
continue
|
||||
latest_valid_by_user[user] = review
|
||||
|
||||
approvers: set[str] = set()
|
||||
request_changes: list[str] = []
|
||||
for user, review in latest_valid_by_user.items():
|
||||
# Each surviving review already passed is_official_current_head, so
|
||||
# the state alone determines the verdict. We still go through the
|
||||
# per-verdict SSOT predicates so the rule cannot drift.
|
||||
if is_genuine_approval(review, headsha=headsha, reviewer_set=None):
|
||||
approvers.add(user)
|
||||
elif is_open_request_changes(review, headsha=headsha):
|
||||
request_changes.append(user)
|
||||
return approvers, request_changes
|
||||
@@ -0,0 +1,74 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Helper for review-check.sh: applies the SSOT approval predicate to a
|
||||
PR's reviews and prints the candidate approver logins on stdout (one per
|
||||
line, de-duplicated, author excluded).
|
||||
|
||||
review-check.sh uses this in place of its previous inline jq filter so the
|
||||
predicate is single-sourced. The jq filter is gone; if you want to change
|
||||
the predicate, edit .gitea/scripts/_approval_validator.py, not this file.
|
||||
|
||||
Usage:
|
||||
python3 _review_check_filter.py <reviews.json> <head-sha> <author-login>
|
||||
|
||||
Output:
|
||||
- Candidate approver logins, one per line, de-duplicated, sorted.
|
||||
- Excludes `author-login` (the PR author cannot approve their own PR).
|
||||
- Empty output → review-check.sh interprets as "no candidates" and exits 1
|
||||
after the team-membership probe.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# Same-dir import — script lives next to _approval_validator.py
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent))
|
||||
from _approval_validator import is_genuine_approval # noqa: E402
|
||||
|
||||
|
||||
def main(argv: list[str]) -> int:
|
||||
if len(argv) != 4:
|
||||
print(
|
||||
f"usage: {argv[0] if argv else '_review_check_filter.py'} "
|
||||
"<reviews.json> <head-sha> <author-login>",
|
||||
file=sys.stderr,
|
||||
)
|
||||
return 2
|
||||
reviews_path = Path(argv[1])
|
||||
headsha = argv[2]
|
||||
author = argv[3]
|
||||
|
||||
try:
|
||||
reviews = json.loads(reviews_path.read_text(encoding="utf-8"))
|
||||
except (OSError, json.JSONDecodeError) as exc:
|
||||
print(f"::error::could not read reviews JSON: {exc}", file=sys.stderr)
|
||||
return 2
|
||||
if not isinstance(reviews, list):
|
||||
print("::error::reviews JSON was not a list", file=sys.stderr)
|
||||
return 2
|
||||
|
||||
candidates: set[str] = set()
|
||||
for review in reviews:
|
||||
# We pass reviewer_set=None here because review-check.sh applies its
|
||||
# own team-membership probe (CURL_AUTH_FILE + 200/204/403/404 logic)
|
||||
# separately. The SSOT predicate enforces only the fail-closed
|
||||
# commit_id / state / official / dismissed / stale contract here.
|
||||
if not is_genuine_approval(review, headsha=headsha, reviewer_set=None):
|
||||
continue
|
||||
user = (review.get("user") or {}).get("login")
|
||||
if not isinstance(user, str) or not user:
|
||||
continue
|
||||
if user == author:
|
||||
continue
|
||||
candidates.add(user)
|
||||
|
||||
for user in sorted(candidates):
|
||||
print(user)
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main(sys.argv))
|
||||
@@ -105,6 +105,12 @@ import urllib.parse
|
||||
import urllib.request
|
||||
from typing import Any
|
||||
|
||||
# SSOT fail-closed approval predicate (SEV-1 internal#812). review-check.sh
|
||||
# consumes the same module via _review_check_filter.py — do NOT duplicate
|
||||
# the predicate here. See _approval_validator.py for the fail-closed contract.
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
from _approval_validator import classify_reviews as _classify_reviews_ssot # noqa: E402
|
||||
|
||||
|
||||
def _env(key: str, *, default: str = "") -> str:
|
||||
return os.environ.get(key, default)
|
||||
@@ -424,57 +430,26 @@ def get_branch_protection(branch: str) -> BranchProtection:
|
||||
def genuine_approvals(
|
||||
reviews: list[dict],
|
||||
*,
|
||||
head_sha: str,
|
||||
headsha: str,
|
||||
reviewer_set: set[str],
|
||||
) -> tuple[set[str], list[str]]:
|
||||
"""Reduce a PR's reviews to genuine official approvals on the CURRENT head.
|
||||
"""Thin wrapper over the SSOT predicate in _approval_validator.py.
|
||||
|
||||
Returns (approvers, request_changes) where:
|
||||
- approvers is the set of distinct logins (in reviewer_set) whose LATEST
|
||||
review on the current head is an official, non-stale, non-dismissed
|
||||
APPROVED, and
|
||||
- request_changes is the list of logins (in reviewer_set) whose latest
|
||||
official review on the current head is REQUEST_CHANGES.
|
||||
All logic — the per-review commit_id / state / official / dismissed /
|
||||
stale contract — lives in _approval_validator.classify_reviews. This
|
||||
wrapper exists only to keep the call site (and external readers of
|
||||
the symbol) stable. Do NOT add any per-review logic here; if you need
|
||||
to change the predicate, edit _approval_validator.py.
|
||||
|
||||
"Current head" is enforced two ways, because Gitea exposes both signals:
|
||||
a review must be `official` and NOT `stale`/`dismissed`, AND when the
|
||||
review carries a commit_id it must equal head_sha. A review with no
|
||||
commit_id but stale=False/dismissed=False is accepted (older Gitea rows).
|
||||
We take each reviewer's LATEST submission (reviews arrive oldest-first), so
|
||||
a later REQUEST_CHANGES correctly supersedes an earlier APPROVED and vice
|
||||
versa.
|
||||
See _approval_validator.py for the full fail-closed contract
|
||||
(SEV-1 internal#812). The previous inline implementation had a
|
||||
`if isinstance(commit_id, str) and commit_id and headsha:` guard that
|
||||
silently accepted reviews with no commit_id; that fail-open surface is
|
||||
now closed at the SSOT.
|
||||
"""
|
||||
latest_by_user: dict[str, dict] = {}
|
||||
for review in reviews:
|
||||
if not isinstance(review, dict):
|
||||
continue
|
||||
user = (review.get("user") or {}).get("login")
|
||||
if not isinstance(user, str) or user not in reviewer_set:
|
||||
continue
|
||||
state = str(review.get("state") or "").upper()
|
||||
if state not in {"APPROVED", "REQUEST_CHANGES"}:
|
||||
continue # ignore COMMENT/PENDING/DISMISSED-state rows
|
||||
# reviews are returned oldest-first; later entries overwrite → latest wins
|
||||
latest_by_user[user] = review
|
||||
|
||||
approvers: set[str] = set()
|
||||
request_changes: list[str] = []
|
||||
for user, review in latest_by_user.items():
|
||||
if not review.get("official"):
|
||||
continue
|
||||
if review.get("stale") or review.get("dismissed"):
|
||||
continue
|
||||
commit_id = review.get("commit_id")
|
||||
if isinstance(commit_id, str) and commit_id and head_sha:
|
||||
if commit_id != head_sha:
|
||||
continue # review was on a previous head
|
||||
state = str(review.get("state") or "").upper()
|
||||
if state == "APPROVED":
|
||||
approvers.add(user)
|
||||
elif state == "REQUEST_CHANGES":
|
||||
request_changes.append(user)
|
||||
return approvers, request_changes
|
||||
|
||||
return _classify_reviews_ssot(
|
||||
reviews, headsha=headsha, reviewer_set=reviewer_set
|
||||
)
|
||||
|
||||
def get_pull_reviews(pr_number: int) -> list[dict]:
|
||||
_, body = api("GET", f"/repos/{OWNER}/{NAME}/pulls/{pr_number}/reviews")
|
||||
@@ -1147,7 +1122,7 @@ def _evaluate_candidate(
|
||||
|
||||
reviews = get_pull_reviews(pr_number)
|
||||
approvers, request_changes = genuine_approvals(
|
||||
reviews, head_sha=head_sha, reviewer_set=REVIEWER_SET
|
||||
reviews, headsha=head_sha, reviewer_set=REVIEWER_SET
|
||||
)
|
||||
|
||||
decision = evaluate_merge_readiness(
|
||||
|
||||
@@ -197,17 +197,13 @@ if [ "$HTTP_CODE" != "200" ]; then
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Filter: state=APPROVED, official=true, not-dismissed, non-author,
|
||||
# commit_id matches current PR head. All conditions are mandatory.
|
||||
JQ_FILTER='.[]
|
||||
| select(.state == "APPROVED")
|
||||
| select(.official == true)
|
||||
| select(.dismissed != true)
|
||||
| select(.user.login != $author)
|
||||
| select(.commit_id == $head)
|
||||
| .user.login'
|
||||
|
||||
REVIEW_CANDIDATES=$(jq -r --arg author "$PR_AUTHOR" --arg head "$PR_HEAD_SHA" "$JQ_FILTER" "$REVIEWS_JSON" | sort -u)
|
||||
# Filter via the SSOT fail-closed predicate in _approval_validator.py
|
||||
# (same module gitea-merge-queue.py imports). The jq filter is gone
|
||||
# entirely — any change to the predicate must be made in
|
||||
# _approval_validator.py. See SEV-1 internal#812 for the fail-closed
|
||||
# contract this closes.
|
||||
SCRIPT_DIR_HERE="$(cd "$(dirname "$0")" && pwd)"
|
||||
REVIEW_CANDIDATES=$(python3 "$SCRIPT_DIR_HERE/_review_check_filter.py" "$REVIEWS_JSON" "$PR_HEAD_SHA" "$PR_AUTHOR")
|
||||
debug "candidate non-author approvers: $(echo "$REVIEW_CANDIDATES" | tr '\n' ' ')"
|
||||
|
||||
if [ -z "$REVIEW_CANDIDATES" ]; then
|
||||
|
||||
@@ -134,6 +134,14 @@ class Handler(http.server.BaseHTTPRequestHandler):
|
||||
return self._json(200, [
|
||||
{"state": "APPROVED", "dismissed": False, "user": {"login": "core-devops"}, "commit_id": "deadbeef0000111122223333444455556666"},
|
||||
])
|
||||
if sc == "T23_missing_commit_id":
|
||||
# APPROVED review with NO commit_id field — the SEV-1
|
||||
# internal#812 / closed-#843 spoof-bug signature. The
|
||||
# fail-closed SSOT must REJECT (not silently accept as
|
||||
# "older Gitea row" the way the old pre-fix code did).
|
||||
return self._json(200, [
|
||||
{"state": "APPROVED", "official": True, "dismissed": False, "user": {"login": "core-devops"}},
|
||||
])
|
||||
# Default: one non-author APPROVED (current head, official)
|
||||
return self._json(200, [
|
||||
{"state": "APPROVED", "dismissed": False, "official": True, "user": {"login": "core-devops"}, "commit_id": "deadbeef0000111122223333444455556666"},
|
||||
|
||||
@@ -0,0 +1,610 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Mutation-verified unit tests for the SSOT fail-closed approval predicate
|
||||
in _approval_validator.py (SEV-1 internal#812).
|
||||
|
||||
Each test asserts REJECTION explicitly. A reviewer who weakens the
|
||||
predicate — e.g., by removing the commit_id check, by reintroducing the
|
||||
"no commit_id is accepted" escape hatch, by changing `!=` to `==` in the
|
||||
head comparison, or by allowing official == false — will trip these
|
||||
tests in CI.
|
||||
|
||||
Run:
|
||||
cd .gitea/scripts
|
||||
python3 -m unittest tests.test_approval_validator -v
|
||||
# or
|
||||
python3 tests/test_approval_validator.py
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
|
||||
# Same-dir import — test lives next to _approval_validator.py
|
||||
sys.path.insert(
|
||||
0,
|
||||
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
|
||||
)
|
||||
from _approval_validator import ( # noqa: E402
|
||||
classify_reviews,
|
||||
is_genuine_approval,
|
||||
is_official_current_head,
|
||||
is_open_request_changes,
|
||||
)
|
||||
|
||||
HEAD = "0123456789abcdef0123456789abcdef01234567"
|
||||
OTHER_HEAD = "fedcba9876543210fedcba9876543210fedcba98"
|
||||
|
||||
|
||||
def _review(
|
||||
*,
|
||||
state: str = "APPROVED",
|
||||
official: bool = True,
|
||||
dismissed: bool = False,
|
||||
stale: bool = False,
|
||||
commit_id: object = HEAD,
|
||||
user: str = "reviewer-1",
|
||||
body: str = "",
|
||||
) -> dict:
|
||||
"""Build a minimal review row shaped like the Gitea reviews API."""
|
||||
return {
|
||||
"id": 1,
|
||||
"user": {"login": user, "id": 1},
|
||||
"body": body,
|
||||
"state": state,
|
||||
"official": official,
|
||||
"dismissed": dismissed,
|
||||
"stale": stale,
|
||||
"commit_id": commit_id,
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Hard contract: every fail-closed branch must reject
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class IsOfficialCurrentHeadFailClosed(unittest.TestCase):
|
||||
"""is_official_current_head is the common predicate. EVERY condition
|
||||
is mandatory. The tests below assert REJECTION for every possible
|
||||
failure of any condition."""
|
||||
|
||||
def test_accepts_canonical_review(self):
|
||||
self.assertTrue(is_official_current_head(_review(), HEAD))
|
||||
|
||||
def test_rejects_non_dict(self):
|
||||
for bad in [None, "string", 42, [], (), object()]:
|
||||
with self.subTest(bad=bad):
|
||||
self.assertFalse(is_official_current_head(bad, HEAD))
|
||||
|
||||
def test_rejects_when_official_is_false(self):
|
||||
for v in [False, None, 0, "false"]:
|
||||
with self.subTest(v=v):
|
||||
self.assertFalse(
|
||||
is_official_current_head(_review(official=v), HEAD)
|
||||
)
|
||||
|
||||
def test_rejects_when_dismissed(self):
|
||||
for v in [True, "true", 1]:
|
||||
with self.subTest(v=v):
|
||||
self.assertFalse(
|
||||
is_official_current_head(_review(dismissed=v), HEAD)
|
||||
)
|
||||
|
||||
def test_rejects_when_stale(self):
|
||||
for v in [True, "true", 1]:
|
||||
with self.subTest(v=v):
|
||||
self.assertFalse(
|
||||
is_official_current_head(_review(stale=v), HEAD)
|
||||
)
|
||||
|
||||
def test_rejects_when_commit_id_missing(self):
|
||||
"""FAIL-CLOSED #1: missing commit_id is REJECTED.
|
||||
This is the spoof signature that closed #843 (with CR2 + Researcher
|
||||
both flagging it)."""
|
||||
for bad in [None, "", 0, False, [], {}, ()]:
|
||||
with self.subTest(commit_id=bad):
|
||||
self.assertFalse(
|
||||
is_official_current_head(_review(commit_id=bad), HEAD),
|
||||
f"commit_id={bad!r} must reject (fail-closed)",
|
||||
)
|
||||
|
||||
def test_rejects_when_commit_id_wrong_type(self):
|
||||
for bad in [123, 1.5, True, ["abc"], {"sha": HEAD}, ("tuple",)]:
|
||||
with self.subTest(commit_id=bad):
|
||||
self.assertFalse(
|
||||
is_official_current_head(_review(commit_id=bad), HEAD)
|
||||
)
|
||||
|
||||
def test_rejects_when_commit_id_stale(self):
|
||||
"""FAIL-CLOSED #2: present-but-wrong commit_id is REJECTED. Stale
|
||||
reviews on a previous head cannot count."""
|
||||
self.assertFalse(
|
||||
is_official_current_head(_review(commit_id=OTHER_HEAD), HEAD)
|
||||
)
|
||||
|
||||
def test_rejects_when_head_missing(self):
|
||||
for bad in [None, "", 0, False]:
|
||||
with self.subTest(head=bad):
|
||||
self.assertFalse(
|
||||
is_official_current_head(_review(), bad)
|
||||
)
|
||||
|
||||
def test_rejects_when_head_wrong_type(self):
|
||||
self.assertFalse(is_official_current_head(_review(), 123))
|
||||
self.assertFalse(is_official_current_head(_review(), ["x"]))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# is_genuine_approval
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class IsGenuineApprovalContract(unittest.TestCase):
|
||||
def test_accepts_canonical_approval(self):
|
||||
self.assertTrue(
|
||||
is_genuine_approval(_review(state="APPROVED"), headsha=HEAD)
|
||||
)
|
||||
|
||||
def test_rejects_non_approved_states(self):
|
||||
for state in ("REQUEST_CHANGES", "COMMENT", "PENDING", "DISMISSED", "approve", "", "bogus"):
|
||||
with self.subTest(state=state):
|
||||
self.assertFalse(
|
||||
is_genuine_approval(_review(state=state), headsha=HEAD)
|
||||
)
|
||||
|
||||
def test_rejects_case_coerced_approved_states(self):
|
||||
"""EXACT-ENUM fail-closed (RCs 9849/9851/9852): Gitea always emits
|
||||
the canonical UPPERCASE "APPROVED". A lowercase/mixed-case/padded
|
||||
value is the signature of a forged row and MUST be rejected, not
|
||||
coerced via .upper() into an accepted APPROVED. Each of these was
|
||||
ACCEPTED before the exact-enum fix."""
|
||||
for state in (
|
||||
"approved", "Approved", "ApProVeD", "APPROVED ", " APPROVED",
|
||||
"approved\n", "\tAPPROVED",
|
||||
):
|
||||
with self.subTest(state=state):
|
||||
self.assertFalse(
|
||||
is_genuine_approval(_review(state=state), headsha=HEAD),
|
||||
f"case-coerced/padded state {state!r} must NOT count as "
|
||||
"a genuine approval",
|
||||
)
|
||||
|
||||
def test_rejects_non_official_approval(self):
|
||||
"""Comment-based / non-official 'APPROVED' is REJECTED.
|
||||
PM: 'reject comment-based / non-official reviews'."""
|
||||
self.assertFalse(
|
||||
is_genuine_approval(
|
||||
_review(state="APPROVED", official=False), headsha=HEAD
|
||||
)
|
||||
)
|
||||
|
||||
def test_rejects_dismissed_approval(self):
|
||||
self.assertFalse(
|
||||
is_genuine_approval(
|
||||
_review(state="APPROVED", dismissed=True), headsha=HEAD
|
||||
)
|
||||
)
|
||||
|
||||
def test_rejects_stale_head_approval(self):
|
||||
"""commit_id != head is REJECTED. Stale-on-old-head approvals cannot
|
||||
count, even if they were official and not dismissed."""
|
||||
self.assertFalse(
|
||||
is_genuine_approval(
|
||||
_review(state="APPROVED", commit_id=OTHER_HEAD), headsha=HEAD
|
||||
)
|
||||
)
|
||||
|
||||
def test_rejects_missing_commit_id_approval(self):
|
||||
"""FAIL-CLOSED #3: the SEV-1 case. A APPROVED review with NO
|
||||
commit_id is the spoof-bug signature. Reject."""
|
||||
for bad in [None, "", 0, False]:
|
||||
with self.subTest(commit_id=bad):
|
||||
self.assertFalse(
|
||||
is_genuine_approval(
|
||||
_review(state="APPROVED", commit_id=bad), headsha=HEAD
|
||||
),
|
||||
f"missing commit_id={bad!r} must reject",
|
||||
)
|
||||
|
||||
def test_reviewer_set_filters_users(self):
|
||||
self.assertTrue(
|
||||
is_genuine_approval(
|
||||
_review(user="alice"),
|
||||
headsha=HEAD,
|
||||
reviewer_set={"alice", "bob"},
|
||||
)
|
||||
)
|
||||
self.assertFalse(
|
||||
is_genuine_approval(
|
||||
_review(user="carol"),
|
||||
headsha=HEAD,
|
||||
reviewer_set={"alice", "bob"},
|
||||
)
|
||||
)
|
||||
|
||||
def test_reviewer_set_none_skips_check(self):
|
||||
# None means "no team filter at this layer" (e.g., review-check.sh
|
||||
# applies its own team-membership probe separately).
|
||||
self.assertTrue(
|
||||
is_genuine_approval(
|
||||
_review(user="anyone"),
|
||||
headsha=HEAD,
|
||||
reviewer_set=None,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# is_open_request_changes
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class IsOpenRequestChangesContract(unittest.TestCase):
|
||||
def test_accepts_canonical_request_changes(self):
|
||||
self.assertTrue(
|
||||
is_open_request_changes(
|
||||
_review(state="REQUEST_CHANGES"), headsha=HEAD
|
||||
)
|
||||
)
|
||||
|
||||
def test_rejects_non_request_changes_states(self):
|
||||
for state in ("APPROVED", "COMMENT", "PENDING", "DISMISSED"):
|
||||
with self.subTest(state=state):
|
||||
self.assertFalse(
|
||||
is_open_request_changes(
|
||||
_review(state=state), headsha=HEAD
|
||||
)
|
||||
)
|
||||
|
||||
def test_rejects_case_coerced_request_changes_states(self):
|
||||
"""EXACT-ENUM fail-closed: a lowercase/mixed-case "request_changes"
|
||||
must NOT be coerced into an open-block match. Before the exact-enum
|
||||
fix, .upper() accepted these as REQUEST_CHANGES."""
|
||||
for state in (
|
||||
"request_changes", "Request_Changes", "REQUEST_CHANGES ",
|
||||
" REQUEST_CHANGES", "request_changes\n",
|
||||
):
|
||||
with self.subTest(state=state):
|
||||
self.assertFalse(
|
||||
is_open_request_changes(
|
||||
_review(state=state), headsha=HEAD
|
||||
),
|
||||
f"case-coerced/padded state {state!r} must NOT count as "
|
||||
"an open REQUEST_CHANGES",
|
||||
)
|
||||
|
||||
def test_rejects_when_dismissed(self):
|
||||
self.assertFalse(
|
||||
is_open_request_changes(
|
||||
_review(state="REQUEST_CHANGES", dismissed=True), headsha=HEAD
|
||||
)
|
||||
)
|
||||
|
||||
def test_rejects_when_stale_head(self):
|
||||
self.assertFalse(
|
||||
is_open_request_changes(
|
||||
_review(state="REQUEST_CHANGES", commit_id=OTHER_HEAD),
|
||||
headsha=HEAD,
|
||||
)
|
||||
)
|
||||
|
||||
def test_rejects_when_missing_commit_id(self):
|
||||
for bad in [None, "", 0]:
|
||||
with self.subTest(commit_id=bad):
|
||||
self.assertFalse(
|
||||
is_open_request_changes(
|
||||
_review(state="REQUEST_CHANGES", commit_id=bad),
|
||||
headsha=HEAD,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# classify_reviews — the merge-queue consumer
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class ClassifyReviewsContract(unittest.TestCase):
|
||||
def test_basic_approvers_and_request_changes(self):
|
||||
reviews = [
|
||||
_review(user="alice", state="APPROVED", commit_id=HEAD),
|
||||
_review(user="bob", state="REQUEST_CHANGES", commit_id=HEAD),
|
||||
]
|
||||
approvers, request_changes = classify_reviews(reviews, headsha=HEAD)
|
||||
self.assertEqual(approvers, {"alice"})
|
||||
self.assertEqual(request_changes, ["bob"])
|
||||
|
||||
def test_reviewer_set_filters_early(self):
|
||||
reviews = [
|
||||
_review(user="alice", state="APPROVED", commit_id=HEAD),
|
||||
_review(user="carol", state="APPROVED", commit_id=HEAD),
|
||||
]
|
||||
approvers, _ = classify_reviews(
|
||||
reviews, headsha=HEAD, reviewer_set={"alice"}
|
||||
)
|
||||
self.assertEqual(approvers, {"alice"})
|
||||
|
||||
def test_latest_review_per_user_wins(self):
|
||||
# alice's REQUEST_CHANGES (latest) supersedes her earlier APPROVED.
|
||||
reviews = [
|
||||
_review(user="alice", state="APPROVED", commit_id=HEAD),
|
||||
_review(user="alice", state="REQUEST_CHANGES", commit_id=HEAD),
|
||||
]
|
||||
approvers, request_changes = classify_reviews(reviews, headsha=HEAD)
|
||||
self.assertNotIn("alice", approvers)
|
||||
self.assertIn("alice", request_changes)
|
||||
|
||||
def test_stale_head_approval_excluded(self):
|
||||
reviews = [
|
||||
_review(user="alice", state="APPROVED", commit_id=OTHER_HEAD),
|
||||
]
|
||||
approvers, _ = classify_reviews(reviews, headsha=HEAD)
|
||||
self.assertEqual(approvers, set())
|
||||
|
||||
def test_missing_commit_id_approval_excluded(self):
|
||||
"""The SEV-1 fail-open surface. APPROVED + no commit_id → must NOT
|
||||
count toward approvers, even with stale=False/dismissed=False."""
|
||||
reviews = [
|
||||
_review(user="alice", state="APPROVED", commit_id=None),
|
||||
_review(user="bob", state="APPROVED", commit_id=""),
|
||||
]
|
||||
approvers, _ = classify_reviews(reviews, headsha=HEAD)
|
||||
self.assertEqual(approvers, set())
|
||||
|
||||
def test_dismissed_approval_excluded(self):
|
||||
reviews = [
|
||||
_review(user="alice", state="APPROVED", dismissed=True, commit_id=HEAD),
|
||||
]
|
||||
approvers, _ = classify_reviews(reviews, headsha=HEAD)
|
||||
self.assertEqual(approvers, set())
|
||||
|
||||
def test_non_official_approval_excluded(self):
|
||||
reviews = [
|
||||
_review(user="alice", state="APPROVED", official=False, commit_id=HEAD),
|
||||
]
|
||||
approvers, _ = classify_reviews(reviews, headsha=HEAD)
|
||||
self.assertEqual(approvers, set())
|
||||
|
||||
def test_comment_state_excluded(self):
|
||||
reviews = [
|
||||
_review(user="alice", state="COMMENT", commit_id=HEAD),
|
||||
]
|
||||
approvers, _ = classify_reviews(reviews, headsha=HEAD)
|
||||
self.assertEqual(approvers, set())
|
||||
|
||||
def test_case_coerced_approved_not_counted(self):
|
||||
"""EXACT-ENUM via the reducer: a lowercase 'approved' (otherwise
|
||||
valid official current-head row) must NOT be counted as an approver.
|
||||
Before the fix, classify_reviews coerced it via .upper()."""
|
||||
for state in ("approved", "Approved", "APPROVED "):
|
||||
with self.subTest(state=state):
|
||||
reviews = [
|
||||
_review(user="alice", state=state, commit_id=HEAD),
|
||||
]
|
||||
approvers, request_changes = classify_reviews(
|
||||
reviews, headsha=HEAD
|
||||
)
|
||||
self.assertEqual(approvers, set())
|
||||
self.assertEqual(request_changes, [])
|
||||
|
||||
def test_case_coerced_request_changes_not_silently_dropped(self):
|
||||
"""EXACT-ENUM via the reducer: a lowercase 'request_changes' must be
|
||||
rejected (not coerced into a block). Crucially, it must NOT silently
|
||||
erase a SAME-USER genuine current-head REQUEST_CHANGES posted
|
||||
earlier — the case-variant later row is invalid and is ignored, so
|
||||
the genuine block stands."""
|
||||
reviews = [
|
||||
_review(user="bob", state="REQUEST_CHANGES", commit_id=HEAD),
|
||||
_review(user="bob", state="request_changes", commit_id=HEAD),
|
||||
]
|
||||
approvers, request_changes = classify_reviews(reviews, headsha=HEAD)
|
||||
self.assertIn("bob", request_changes)
|
||||
self.assertNotIn("bob", approvers)
|
||||
|
||||
def test_stale_head_request_changes_excluded(self):
|
||||
# A REQUEST_CHANGES on a previous head must NOT block the current head.
|
||||
reviews = [
|
||||
_review(user="bob", state="REQUEST_CHANGES", commit_id=OTHER_HEAD),
|
||||
]
|
||||
_, request_changes = classify_reviews(reviews, headsha=HEAD)
|
||||
self.assertEqual(request_changes, [])
|
||||
|
||||
# -----------------------------------------------------------------
|
||||
# VALIDATE-BEFORE-REDUCE regression tests (SEV-1 internal#812 follow-up).
|
||||
#
|
||||
# The bug: classify_reviews reduced to the LATEST row per user FIRST and
|
||||
# validated AFTER. A later INVALID row (a COMMENT, or APPROVED/
|
||||
# REQUEST_CHANGES with a null/old commit_id) from the same user could
|
||||
# overwrite a genuine current-head review — masking an approval or
|
||||
# ERASING a REQUEST_CHANGES block. The fix validates before the reduce,
|
||||
# so an invalid later row is never eligible to be a user's "latest".
|
||||
# -----------------------------------------------------------------
|
||||
|
||||
def test_genuine_approval_not_masked_by_later_comment(self):
|
||||
"""A genuine current-head APPROVED followed by a LATER COMMENT from
|
||||
the SAME user must STILL count as an approval. A later non-
|
||||
APPROVED/RC row (COMMENT) must not erase the approval. This is the
|
||||
reduce-before-validate masking bug."""
|
||||
reviews = [
|
||||
_review(user="alice", state="APPROVED", commit_id=HEAD),
|
||||
_review(user="alice", state="COMMENT", commit_id=HEAD),
|
||||
]
|
||||
approvers, request_changes = classify_reviews(reviews, headsha=HEAD)
|
||||
self.assertIn("alice", approvers)
|
||||
self.assertEqual(request_changes, [])
|
||||
|
||||
def test_genuine_approval_not_masked_by_later_null_commit_id(self):
|
||||
"""A genuine current-head APPROVED followed by a LATER APPROVED with
|
||||
a null commit_id (the spoof/invalid signature) from the SAME user
|
||||
must STILL count. The invalid later row must be ignored, not allowed
|
||||
to overwrite the valid earlier approval."""
|
||||
for bad in [None, ""]:
|
||||
with self.subTest(commit_id=bad):
|
||||
reviews = [
|
||||
_review(user="alice", state="APPROVED", commit_id=HEAD),
|
||||
_review(user="alice", state="APPROVED", commit_id=bad),
|
||||
]
|
||||
approvers, _ = classify_reviews(reviews, headsha=HEAD)
|
||||
self.assertIn(
|
||||
"alice", approvers,
|
||||
f"later invalid commit_id={bad!r} must not mask the "
|
||||
"genuine current-head approval",
|
||||
)
|
||||
|
||||
def test_genuine_approval_not_masked_by_later_stale_commit_id(self):
|
||||
"""A genuine current-head APPROVED followed by a LATER APPROVED on a
|
||||
STALE (old) head from the SAME user must STILL count toward
|
||||
approvers — the stale later row is invalid and must be ignored."""
|
||||
reviews = [
|
||||
_review(user="alice", state="APPROVED", commit_id=HEAD),
|
||||
_review(user="alice", state="APPROVED", commit_id=OTHER_HEAD),
|
||||
]
|
||||
approvers, _ = classify_reviews(reviews, headsha=HEAD)
|
||||
self.assertIn("alice", approvers)
|
||||
|
||||
def test_request_changes_not_erased_by_later_comment(self):
|
||||
"""A genuine current-head REQUEST_CHANGES followed by a LATER COMMENT
|
||||
from the SAME user must STILL block. The later invalid row must not
|
||||
erase the REQUEST_CHANGES — this is the worse, silently-evaporating-
|
||||
block variant of the bug."""
|
||||
reviews = [
|
||||
_review(user="bob", state="REQUEST_CHANGES", commit_id=HEAD),
|
||||
_review(user="bob", state="COMMENT", commit_id=HEAD),
|
||||
]
|
||||
approvers, request_changes = classify_reviews(reviews, headsha=HEAD)
|
||||
self.assertIn("bob", request_changes)
|
||||
self.assertNotIn("bob", approvers)
|
||||
|
||||
def test_request_changes_not_erased_by_later_null_commit_id(self):
|
||||
"""A genuine current-head REQUEST_CHANGES followed by a LATER
|
||||
REQUEST_CHANGES with a null/old commit_id from the SAME user must
|
||||
STILL block. The invalid later row must be ignored, not allowed to
|
||||
relocate the user's verdict off the current head."""
|
||||
for bad in [None, "", OTHER_HEAD]:
|
||||
with self.subTest(commit_id=bad):
|
||||
reviews = [
|
||||
_review(user="bob", state="REQUEST_CHANGES", commit_id=HEAD),
|
||||
_review(user="bob", state="REQUEST_CHANGES", commit_id=bad),
|
||||
]
|
||||
_, request_changes = classify_reviews(reviews, headsha=HEAD)
|
||||
self.assertIn(
|
||||
"bob", request_changes,
|
||||
f"later invalid commit_id={bad!r} must not erase the "
|
||||
"genuine current-head REQUEST_CHANGES block",
|
||||
)
|
||||
|
||||
def test_request_changes_not_erased_by_later_approved_invalid(self):
|
||||
"""A genuine current-head REQUEST_CHANGES followed by a LATER
|
||||
INVALID APPROVED (null commit_id) from the SAME user must STILL
|
||||
block AND must NOT count the user as an approver. The invalid
|
||||
approval must not flip a real block into a pass."""
|
||||
reviews = [
|
||||
_review(user="bob", state="REQUEST_CHANGES", commit_id=HEAD),
|
||||
_review(user="bob", state="APPROVED", commit_id=None),
|
||||
]
|
||||
approvers, request_changes = classify_reviews(reviews, headsha=HEAD)
|
||||
self.assertIn("bob", request_changes)
|
||||
self.assertNotIn("bob", approvers)
|
||||
|
||||
def test_genuine_request_changes_still_supersedes_genuine_approval(self):
|
||||
"""Sanity: a genuine LATER current-head REQUEST_CHANGES still
|
||||
supersedes an earlier genuine APPROVED from the same user (the
|
||||
valid-row supersession we MUST preserve — only INVALID later rows
|
||||
are ignored). Guards against an over-correction that ignores all
|
||||
later rows."""
|
||||
reviews = [
|
||||
_review(user="alice", state="APPROVED", commit_id=HEAD),
|
||||
_review(user="alice", state="REQUEST_CHANGES", commit_id=HEAD),
|
||||
]
|
||||
approvers, request_changes = classify_reviews(reviews, headsha=HEAD)
|
||||
self.assertNotIn("alice", approvers)
|
||||
self.assertIn("alice", request_changes)
|
||||
|
||||
def test_genuine_approval_still_supersedes_genuine_request_changes(self):
|
||||
"""Sanity: a genuine LATER current-head APPROVED supersedes an
|
||||
earlier genuine REQUEST_CHANGES from the same user."""
|
||||
reviews = [
|
||||
_review(user="alice", state="REQUEST_CHANGES", commit_id=HEAD),
|
||||
_review(user="alice", state="APPROVED", commit_id=HEAD),
|
||||
]
|
||||
approvers, request_changes = classify_reviews(reviews, headsha=HEAD)
|
||||
self.assertIn("alice", approvers)
|
||||
self.assertEqual(request_changes, [])
|
||||
|
||||
def test_two_valid_approvers_plus_one_invalid_later_row(self):
|
||||
"""Two distinct users with valid current-head approvals + a third
|
||||
user whose ONLY genuine approval is followed by an invalid later
|
||||
row → all three real approvers are counted; the invalid later row
|
||||
does not drop the third user."""
|
||||
reviews = [
|
||||
_review(user="alice", state="APPROVED", commit_id=HEAD),
|
||||
_review(user="bob", state="APPROVED", commit_id=HEAD),
|
||||
_review(user="carol", state="APPROVED", commit_id=HEAD),
|
||||
_review(user="carol", state="COMMENT", commit_id=HEAD),
|
||||
]
|
||||
approvers, request_changes = classify_reviews(reviews, headsha=HEAD)
|
||||
self.assertEqual(approvers, {"alice", "bob", "carol"})
|
||||
self.assertEqual(request_changes, [])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Mutation-resistance smoke checks
|
||||
#
|
||||
# These tests document the mutations a reviewer would have to apply to
|
||||
# weaken the gate. They are not synthetic; they verify that the
|
||||
# predicate is structured so each known-softening mutation would also
|
||||
# fail at least one other test in this file. We can't actually mutate
|
||||
# the source in CI, but these tests are explicit about the mutations
|
||||
# that would slip through, and the suite is dense enough that any
|
||||
# loosening of the predicate will fail multiple cases.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class MutationResistance(unittest.TestCase):
|
||||
def test_documented_mutation_remove_commit_id_check_fails(self):
|
||||
"""If a reviewer removes the commit_id check (e.g., reverts to
|
||||
the pre-fix `if isinstance(commit_id, str) and commit_id and
|
||||
headsha:` guard, or replaces `commit_id != headsha` with True),
|
||||
the missing-commit_id tests above (test_rejects_when_commit_id_missing
|
||||
in IsOfficialCurrentHeadFailClosed, test_rejects_missing_commit_id_approval
|
||||
in IsGenuineApprovalContract, test_missing_commit_id_approval_excluded
|
||||
in ClassifyReviewsContract) would all fail. The reviewer would have
|
||||
to weaken all three test categories to slip the SEV-1 surface in."""
|
||||
# Sanity: every missing-commit_id case is a False today.
|
||||
for bad in [None, "", 0, False]:
|
||||
with self.subTest(commit_id=bad):
|
||||
self.assertFalse(
|
||||
is_official_current_head(_review(commit_id=bad), HEAD)
|
||||
)
|
||||
self.assertFalse(
|
||||
is_genuine_approval(
|
||||
_review(commit_id=bad), headsha=HEAD
|
||||
)
|
||||
)
|
||||
|
||||
def test_documented_mutation_change_neq_to_eq_fails(self):
|
||||
"""If a reviewer changes `commit_id != headsha` to `commit_id == headsha`
|
||||
in the head comparison (inverting the check), the stale-head tests
|
||||
(test_rejects_when_commit_id_stale, test_stale_head_approval_excluded)
|
||||
would fail because the wrong head would now match."""
|
||||
self.assertFalse(
|
||||
is_official_current_head(_review(commit_id=OTHER_HEAD), HEAD)
|
||||
)
|
||||
|
||||
def test_documented_mutation_drop_official_check_fails(self):
|
||||
"""If a reviewer drops the `if not review.get('official')` check, the
|
||||
non-official tests (test_rejects_when_official_is_false,
|
||||
test_rejects_non_official_approval, test_non_official_approval_excluded)
|
||||
would all fail."""
|
||||
self.assertFalse(
|
||||
is_genuine_approval(
|
||||
_review(state="APPROVED", official=False), headsha=HEAD
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -248,7 +248,7 @@ def test_genuine_approvals_counts_two_distinct_on_current_head():
|
||||
{"state": "APPROVED", "user": {"login": "agent-reviewer-cr2"},
|
||||
"official": True, "stale": False, "dismissed": False, "commit_id": "HEAD"},
|
||||
]
|
||||
approvers, rc = mq.genuine_approvals(reviews, head_sha="HEAD", reviewer_set=REVIEWERS)
|
||||
approvers, rc = mq.genuine_approvals(reviews, headsha="HEAD", reviewer_set=REVIEWERS)
|
||||
assert approvers == {"agent-researcher", "agent-reviewer-cr2"}
|
||||
assert rc == []
|
||||
|
||||
@@ -265,7 +265,7 @@ def test_genuine_approvals_ignores_stale_dismissed_and_wrong_head():
|
||||
{"state": "APPROVED", "user": {"login": "agent-reviewer"},
|
||||
"official": True, "stale": False, "dismissed": False, "commit_id": "OLD"},
|
||||
]
|
||||
approvers, rc = mq.genuine_approvals(reviews, head_sha="HEAD", reviewer_set=REVIEWERS)
|
||||
approvers, rc = mq.genuine_approvals(reviews, headsha="HEAD", reviewer_set=REVIEWERS)
|
||||
assert approvers == set()
|
||||
assert rc == []
|
||||
|
||||
@@ -279,7 +279,7 @@ def test_genuine_approvals_ignores_unofficial_and_outsiders():
|
||||
{"state": "APPROVED", "user": {"login": "hongming-codex-laptop"},
|
||||
"official": True, "stale": False, "dismissed": False, "commit_id": "HEAD"},
|
||||
]
|
||||
approvers, rc = mq.genuine_approvals(reviews, head_sha="HEAD", reviewer_set=REVIEWERS)
|
||||
approvers, rc = mq.genuine_approvals(reviews, headsha="HEAD", reviewer_set=REVIEWERS)
|
||||
assert approvers == set()
|
||||
|
||||
|
||||
@@ -291,7 +291,7 @@ def test_genuine_approvals_latest_review_supersedes_earlier():
|
||||
{"state": "REQUEST_CHANGES", "user": {"login": "agent-reviewer-cr2"},
|
||||
"official": True, "stale": False, "dismissed": False, "commit_id": "HEAD"},
|
||||
]
|
||||
approvers, rc = mq.genuine_approvals(reviews, head_sha="HEAD", reviewer_set=REVIEWERS)
|
||||
approvers, rc = mq.genuine_approvals(reviews, headsha="HEAD", reviewer_set=REVIEWERS)
|
||||
assert approvers == set()
|
||||
assert rc == ["agent-reviewer-cr2"]
|
||||
|
||||
|
||||
@@ -25,6 +25,11 @@
|
||||
# T20 — ai-sop-ack APPROVED review excluded from security-review gate
|
||||
# T21 — stale-head APPROVED review → exit 1 (commit_id mismatch)
|
||||
# T22 — missing/non-official APPROVED review → exit 1 (official != true)
|
||||
# T23 — missing-commit_id APPROVED review → exit 1 (SEV-1 internal#812
|
||||
# fail-closed contract: a missing/empty commit_id is REJECTED, not
|
||||
# silently accepted as "older Gitea row" the way the pre-fix
|
||||
# gitea-merge-queue.py did. Closes the spoof-bug surface that
|
||||
# #843 had.)
|
||||
#
|
||||
# Hostile-self-review (per feedback_assert_exact_not_substring):
|
||||
# this test MUST FAIL if the script is absent. Verified by running
|
||||
@@ -427,6 +432,22 @@ T22_RC=$(cat "$FIX_STATE_DIR/last_rc")
|
||||
assert_eq "T22 exit code 1 (missing official rejected)" "1" "$T22_RC"
|
||||
assert_contains "T22 no candidates error" "no candidates from reviews API or issue comments" "$T22_OUT"
|
||||
|
||||
# T23 — missing-commit_id APPROVED review must be rejected.
|
||||
# SEV-1 internal#812 (supersedes closed internal#843). A review with NO
|
||||
# commit_id field is the spoof-bug signature: a real reviewer cannot
|
||||
# have submitted against a commit that doesn't exist. The fail-closed
|
||||
# SSOT must REJECT — the pre-fix gitea-merge-queue.py silently accepted
|
||||
# these (the "older Gitea row" escape hatch), which is the exact surface
|
||||
# that closed #843 had. The Python unit tests in
|
||||
# test_approval_validator.py cover the predicate at the unit level;
|
||||
# this T23 covers the bash + jq pipeline end-to-end.
|
||||
echo
|
||||
echo "== T23 missing commit_id APPROVED review rejected (SEV-1 fail-closed) =="
|
||||
T23_OUT=$(run_review_check "T23_missing_commit_id")
|
||||
T23_RC=$(cat "$FIX_STATE_DIR/last_rc")
|
||||
assert_eq "T23 exit code 1 (missing commit_id rejected)" "1" "$T23_RC"
|
||||
assert_contains "T23 no candidates error" "no candidates from reviews API or issue comments" "$T23_OUT"
|
||||
|
||||
echo
|
||||
echo "------"
|
||||
echo "PASS=$PASS FAIL=$FAIL"
|
||||
|
||||
@@ -21,15 +21,21 @@ on:
|
||||
branches: [main, staging]
|
||||
paths:
|
||||
- '.gitea/scripts/review-check.sh'
|
||||
- '.gitea/scripts/_approval_validator.py'
|
||||
- '.gitea/scripts/_review_check_filter.py'
|
||||
- '.gitea/scripts/tests/test_review_check.sh'
|
||||
- '.gitea/scripts/tests/_review_check_fixture.py'
|
||||
- '.gitea/scripts/tests/test_approval_validator.py'
|
||||
- '.gitea/workflows/review-check-tests.yml'
|
||||
pull_request:
|
||||
branches: [main, staging]
|
||||
paths:
|
||||
- '.gitea/scripts/review-check.sh'
|
||||
- '.gitea/scripts/_approval_validator.py'
|
||||
- '.gitea/scripts/_review_check_filter.py'
|
||||
- '.gitea/scripts/tests/test_review_check.sh'
|
||||
- '.gitea/scripts/tests/_review_check_fixture.py'
|
||||
- '.gitea/scripts/tests/test_approval_validator.py'
|
||||
- '.gitea/workflows/review-check-tests.yml'
|
||||
workflow_dispatch:
|
||||
|
||||
@@ -70,3 +76,16 @@ jobs:
|
||||
|
||||
- name: Run review-check.sh regression suite
|
||||
run: bash .gitea/scripts/tests/test_review_check.sh
|
||||
|
||||
- name: SSOT approval-validator unit tests (SEV-1 internal#812)
|
||||
# The Python unit tests for _approval_validator.py are
|
||||
# mutation-verified — every fail-closed branch has an explicit
|
||||
# REJECT assertion. A reviewer who weakens the predicate trips
|
||||
# these in CI.
|
||||
run: |
|
||||
# The test file lives in .gitea/scripts/tests/ with no __init__.py,
|
||||
# so `unittest discover -s .gitea/scripts` finds 0 tests (the SEV-1
|
||||
# suite silently never ran — a CI gap fixed alongside internal#812).
|
||||
# Run the file directly; it self-inserts its sys.path and calls
|
||||
# unittest.main(), so a failing assertion exits non-zero and fails CI.
|
||||
python3 .gitea/scripts/tests/test_approval_validator.py -v
|
||||
|
||||
Reference in New Issue
Block a user