fix(gate): SEV-1 fail-closed approval-validator (SEV-1 internal#812) #2440

Merged
devops-engineer merged 3 commits from fix/sev1-812-approval-validator into main 2026-06-08 22:26:13 +00:00
9 changed files with 1012 additions and 62 deletions
+247
View File
@@ -0,0 +1,247 @@
#!/usr/bin/env python3
"""
SSOT fail-closed approval validator (SEV-1 internal#812).
This module is the SINGLE source of truth for whether a Gitea review counts
as a "genuine" approval. Both consumers must call into it — they MUST NOT
duplicate the predicate:
- .gitea/scripts/gitea-merge-queue.py (Python) — imports directly.
- .gitea/scripts/review-check.sh (bash, jq) — calls the Python helper
at .gitea/scripts/_review_check_filter.py, which in turn calls this
module. There is no separate jq / bash copy of the predicate; a
reviewer who wants to weaken the gate has to weaken this one file.
# The fail-closed contract
A review counts as a GENUINE APPROVED on the current head ONLY IF ALL hold:
1. state == "APPROVED"
2. official == true
3. dismissed != true
4. stale != true
5. commit_id is present and equals the PR's current head SHA
ANY failure of any of the above → REJECT.
# The bug this fixes
The previous gitea-merge-queue.py predicate had a `if isinstance(commit_id,
str) and commit_id and headsha:` guard that *skipped* the commit_id check
when the review carried no commit_id. The previous review-check.sh jq
filter required `commit_id == $head`, which is also implicitly fail-closed
on missing commit_id (null != head), but only one of the two consumers
behaved correctly — a code-drift trap.
Both behaviors are now defined here, as a single fail-closed predicate.
A MISSING commit_id is the Gitea row signature of a spoofed or pre-commit
review: a real reviewer cannot have submitted against a commit that
doesn't exist. Accepting these is exactly the fail-open that SEV-1
internal#812 describes and the re-opened path that closed #843 (with CR2
+ Researcher both flagging it) addresses.
# Mutation-resistance
The unit tests in tests/test_approval_validator.py assert rejection
explicitly for each fail-closed case (missing commit_id, stale head,
non-official, dismissed, etc.). A reviewer who tries to weaken the
predicate by removing the commit_id check, by re-introducing the
"no commit_id is accepted" escape hatch, or by changing `!=` to `==`
in the head comparison will trip those tests in CI.
"""
from __future__ import annotations
from typing import Iterable, Optional, Tuple
# ---------------------------------------------------------------------------
# Canonical Gitea review-state enum (EXACT match -- no case coercion).
# ---------------------------------------------------------------------------
#
# Gitea's reviews API emits review.state as one of a fixed set of
# UPPERCASE string constants: "APPROVED", "REQUEST_CHANGES",
# "REQUEST_REVIEW", "COMMENT", "PENDING", "DISMISSED" (verified
# against the live API across real molecule-core PRs). They are ALWAYS
# uppercase on the wire.
#
# FAIL-CLOSED: we compare review.state to these constants with EXACT
# equality. The previous code used str(state or "").upper(), which
# coerced a lowercase/mixed-case "approved" or "request_changes" into
# the canonical value and ACCEPTED it. A real Gitea row never carries a
# lowercase state, so a case-variant value is the signature of a
# hand-forged / spoofed row, not a legitimate review. Coercing it was a
# residual fail-open (SEV-1 internal#812, RCs 9849/9851/9852). We reject
# anything that is not byte-for-byte the canonical constant.
STATE_APPROVED = "APPROVED"
STATE_REQUEST_CHANGES = "REQUEST_CHANGES"
# ---------------------------------------------------------------------------
# Shared predicate — fail-closed on every condition
# ---------------------------------------------------------------------------
def is_official_current_head(review: object, headsha: object) -> bool:
"""Common predicate: review is official, not dismissed, not stale, and
bound to the PR's current head SHA. EVERY condition is mandatory and
fail-closed. Both is_genuine_approval and is_open_request_changes build
on this so the rule cannot drift between the two cases.
`official` is checked with `is not True` (NOT `not review.get("official")`).
The latter is truthy on the string "false" or the integer 1, which is
exactly the fail-open surface we are closing here — a non-boolean
pass-through is treated as official. Gitea emits a real boolean, so
the stricter check rejects anything that isn't literally True.
"""
if not isinstance(review, dict):
return False
if review.get("official") is not True:
return False
if review.get("dismissed"):
return False
if review.get("stale"):
return False
commit_id = review.get("commit_id")
# FAIL-CLOSED: a missing/empty/non-string commit_id is REJECTED. The
# previous code had `if isinstance(commit_id, str) and commit_id and
# headsha:` which SKIPPED the check when the review carried no
# commit_id. That was the spoof-bug surface.
if not isinstance(commit_id, str) or not commit_id:
return False
# FAIL-CLOSED: a present-but-wrong commit_id is also REJECTED. Stale
# reviews (on a previous head) cannot count.
if not isinstance(headsha, str) or not headsha or commit_id != headsha:
return False
return True
# ---------------------------------------------------------------------------
# Per-verdict predicates
# ---------------------------------------------------------------------------
def is_genuine_approval(
review: object,
*,
headsha: str,
reviewer_set: Optional[Iterable[str]] = None,
) -> bool:
"""Return True iff `review` is a genuine APPROVED on the current head.
When `reviewer_set` is provided, the review's `user.login` must be in
the set (the merge-queue uses this to count only "recognised"
reviewers for the 2-genuine floor; review-check.sh applies its own
team-membership probe separately and so does not pass a set).
"""
if not isinstance(review, dict):
return False
# EXACT-ENUM (fail-closed): no .upper()/.strip() coercion. A
# case-variant or whitespace-padded state is a forged row and is
# rejected, not normalised into APPROVED.
if review.get("state") != STATE_APPROVED:
return False
if not is_official_current_head(review, headsha):
return False
if reviewer_set is not None:
user = (review.get("user") or {}).get("login")
if not isinstance(user, str) or user not in set(reviewer_set):
return False
return True
def is_open_request_changes(review: object, *, headsha: str) -> bool:
"""Return True iff `review` is an open official REQUEST_CHANGES on the
current head. Same fail-closed contract as is_genuine_approval —
a missing commit_id is REJECTED, not silently treated as 'still
blocking the merge from an old head'.
"""
if not isinstance(review, dict):
return False
# EXACT-ENUM (fail-closed): same contract as is_genuine_approval. A
# lowercase/mixed-case "request_changes" must NOT be coerced into a
# block-erasing match; an exact REQUEST_CHANGES is required.
if review.get("state") != STATE_REQUEST_CHANGES:
return False
if not is_official_current_head(review, headsha):
return False
return True
# ---------------------------------------------------------------------------
# Consumer-facing reducer (returns the two call sites need)
# ---------------------------------------------------------------------------
def classify_reviews(
reviews: Iterable[object],
*,
headsha: str,
reviewer_set: Optional[Iterable[str]] = None,
) -> Tuple[set[str], list[str]]:
"""Reduce a PR's reviews to (approvers, request_changes) on the CURRENT head.
approvers: distinct logins whose LATEST official review on the current
head is APPROVED.
request_changes: distinct logins whose LATEST official review on the
current head is REQUEST_CHANGES.
Gitea returns reviews oldest-first. We keep the latest *VALID*
submission per user (later VALID entries overwrite earlier ones; an
invalid later row — a COMMENT, or a review with a null/old commit_id —
is ignored and can NOT overwrite or erase a genuine review). See the
inline VALIDATE-BEFORE-REDUCE note below for the exploit this closes.
"""
reviewer_set_set = set(reviewer_set) if reviewer_set is not None else None
# VALIDATE-BEFORE-REDUCE (SEV-1 internal#812 follow-up).
#
# The earlier implementation reduced FIRST (latest row per user, keyed
# only on state in {APPROVED, REQUEST_CHANGES}) and validated the single
# surviving row AFTER. That is reduce-before-validate, and it is
# exploitable: a user posts a genuine current-head APPROVED (or
# REQUEST_CHANGES), then posts a LATER row that fails the fail-closed
# predicate (a COMMENT, or an APPROVED with a null/old commit_id). The
# later INVALID row overwrote the genuine one in latest_by_user, so a
# real approval was masked, and — worse — a real current-head
# REQUEST_CHANGES could be erased and the block silently evaporate.
#
# The fix: filter to VALID reviews FIRST (each row must pass
# is_official_current_head AND carry an APPROVED/REQUEST_CHANGES state),
# and only then reduce to the latest VALID review per user. An invalid
# later row is never eligible to become a user's "latest" state, so it
# cannot overwrite or erase a genuine review. A user's verdict is the
# state of their latest VALID (official, current-head, non-dismissed,
# non-stale, commit_id-present-and-matching) review.
latest_valid_by_user: dict = {}
for review in reviews:
if not isinstance(review, dict):
continue
user = (review.get("user") or {}).get("login")
if not isinstance(user, str):
continue
if reviewer_set_set is not None and user not in reviewer_set_set:
continue
# EXACT-ENUM (fail-closed): exact constants only, no coercion. A
# case-coerced row must not become eligible to overwrite/erase a
# genuine per-user verdict in the reduce below.
state = review.get("state")
if state not in (STATE_APPROVED, STATE_REQUEST_CHANGES):
continue
# Fail-closed predicate BEFORE the reduce: official, not dismissed,
# not stale, commit_id present AND == head. Invalid rows are dropped
# here and so can never become the per-user "latest".
if not is_official_current_head(review, headsha):
continue
latest_valid_by_user[user] = review
approvers: set[str] = set()
request_changes: list[str] = []
for user, review in latest_valid_by_user.items():
# Each surviving review already passed is_official_current_head, so
# the state alone determines the verdict. We still go through the
# per-verdict SSOT predicates so the rule cannot drift.
if is_genuine_approval(review, headsha=headsha, reviewer_set=None):
approvers.add(user)
elif is_open_request_changes(review, headsha=headsha):
request_changes.append(user)
return approvers, request_changes
+74
View File
@@ -0,0 +1,74 @@
#!/usr/bin/env python3
"""
Helper for review-check.sh: applies the SSOT approval predicate to a
PR's reviews and prints the candidate approver logins on stdout (one per
line, de-duplicated, author excluded).
review-check.sh uses this in place of its previous inline jq filter so the
predicate is single-sourced. The jq filter is gone; if you want to change
the predicate, edit .gitea/scripts/_approval_validator.py, not this file.
Usage:
python3 _review_check_filter.py <reviews.json> <head-sha> <author-login>
Output:
- Candidate approver logins, one per line, de-duplicated, sorted.
- Excludes `author-login` (the PR author cannot approve their own PR).
- Empty output → review-check.sh interprets as "no candidates" and exits 1
after the team-membership probe.
"""
from __future__ import annotations
import json
import sys
from pathlib import Path
# Same-dir import — script lives next to _approval_validator.py
sys.path.insert(0, str(Path(__file__).resolve().parent))
from _approval_validator import is_genuine_approval # noqa: E402
def main(argv: list[str]) -> int:
if len(argv) != 4:
print(
f"usage: {argv[0] if argv else '_review_check_filter.py'} "
"<reviews.json> <head-sha> <author-login>",
file=sys.stderr,
)
return 2
reviews_path = Path(argv[1])
headsha = argv[2]
author = argv[3]
try:
reviews = json.loads(reviews_path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError) as exc:
print(f"::error::could not read reviews JSON: {exc}", file=sys.stderr)
return 2
if not isinstance(reviews, list):
print("::error::reviews JSON was not a list", file=sys.stderr)
return 2
candidates: set[str] = set()
for review in reviews:
# We pass reviewer_set=None here because review-check.sh applies its
# own team-membership probe (CURL_AUTH_FILE + 200/204/403/404 logic)
# separately. The SSOT predicate enforces only the fail-closed
# commit_id / state / official / dismissed / stale contract here.
if not is_genuine_approval(review, headsha=headsha, reviewer_set=None):
continue
user = (review.get("user") or {}).get("login")
if not isinstance(user, str) or not user:
continue
if user == author:
continue
candidates.add(user)
for user in sorted(candidates):
print(user)
return 0
if __name__ == "__main__":
sys.exit(main(sys.argv))
+22 -47
View File
@@ -105,6 +105,12 @@ import urllib.parse
import urllib.request
from typing import Any
# SSOT fail-closed approval predicate (SEV-1 internal#812). review-check.sh
# consumes the same module via _review_check_filter.py — do NOT duplicate
# the predicate here. See _approval_validator.py for the fail-closed contract.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from _approval_validator import classify_reviews as _classify_reviews_ssot # noqa: E402
def _env(key: str, *, default: str = "") -> str:
return os.environ.get(key, default)
@@ -424,57 +430,26 @@ def get_branch_protection(branch: str) -> BranchProtection:
def genuine_approvals(
reviews: list[dict],
*,
head_sha: str,
headsha: str,
reviewer_set: set[str],
) -> tuple[set[str], list[str]]:
"""Reduce a PR's reviews to genuine official approvals on the CURRENT head.
"""Thin wrapper over the SSOT predicate in _approval_validator.py.
Returns (approvers, request_changes) where:
- approvers is the set of distinct logins (in reviewer_set) whose LATEST
review on the current head is an official, non-stale, non-dismissed
APPROVED, and
- request_changes is the list of logins (in reviewer_set) whose latest
official review on the current head is REQUEST_CHANGES.
All logic — the per-review commit_id / state / official / dismissed /
stale contract — lives in _approval_validator.classify_reviews. This
wrapper exists only to keep the call site (and external readers of
the symbol) stable. Do NOT add any per-review logic here; if you need
to change the predicate, edit _approval_validator.py.
"Current head" is enforced two ways, because Gitea exposes both signals:
a review must be `official` and NOT `stale`/`dismissed`, AND when the
review carries a commit_id it must equal head_sha. A review with no
commit_id but stale=False/dismissed=False is accepted (older Gitea rows).
We take each reviewer's LATEST submission (reviews arrive oldest-first), so
a later REQUEST_CHANGES correctly supersedes an earlier APPROVED and vice
versa.
See _approval_validator.py for the full fail-closed contract
(SEV-1 internal#812). The previous inline implementation had a
`if isinstance(commit_id, str) and commit_id and headsha:` guard that
silently accepted reviews with no commit_id; that fail-open surface is
now closed at the SSOT.
"""
latest_by_user: dict[str, dict] = {}
for review in reviews:
if not isinstance(review, dict):
continue
user = (review.get("user") or {}).get("login")
if not isinstance(user, str) or user not in reviewer_set:
continue
state = str(review.get("state") or "").upper()
if state not in {"APPROVED", "REQUEST_CHANGES"}:
continue # ignore COMMENT/PENDING/DISMISSED-state rows
# reviews are returned oldest-first; later entries overwrite → latest wins
latest_by_user[user] = review
approvers: set[str] = set()
request_changes: list[str] = []
for user, review in latest_by_user.items():
if not review.get("official"):
continue
if review.get("stale") or review.get("dismissed"):
continue
commit_id = review.get("commit_id")
if isinstance(commit_id, str) and commit_id and head_sha:
if commit_id != head_sha:
continue # review was on a previous head
state = str(review.get("state") or "").upper()
if state == "APPROVED":
approvers.add(user)
elif state == "REQUEST_CHANGES":
request_changes.append(user)
return approvers, request_changes
return _classify_reviews_ssot(
reviews, headsha=headsha, reviewer_set=reviewer_set
)
def get_pull_reviews(pr_number: int) -> list[dict]:
_, body = api("GET", f"/repos/{OWNER}/{NAME}/pulls/{pr_number}/reviews")
@@ -1147,7 +1122,7 @@ def _evaluate_candidate(
reviews = get_pull_reviews(pr_number)
approvers, request_changes = genuine_approvals(
reviews, head_sha=head_sha, reviewer_set=REVIEWER_SET
reviews, headsha=head_sha, reviewer_set=REVIEWER_SET
)
decision = evaluate_merge_readiness(
+7 -11
View File
@@ -197,17 +197,13 @@ if [ "$HTTP_CODE" != "200" ]; then
exit 1
fi
# Filter: state=APPROVED, official=true, not-dismissed, non-author,
# commit_id matches current PR head. All conditions are mandatory.
JQ_FILTER='.[]
| select(.state == "APPROVED")
| select(.official == true)
| select(.dismissed != true)
| select(.user.login != $author)
| select(.commit_id == $head)
| .user.login'
REVIEW_CANDIDATES=$(jq -r --arg author "$PR_AUTHOR" --arg head "$PR_HEAD_SHA" "$JQ_FILTER" "$REVIEWS_JSON" | sort -u)
# Filter via the SSOT fail-closed predicate in _approval_validator.py
# (same module gitea-merge-queue.py imports). The jq filter is gone
# entirely — any change to the predicate must be made in
# _approval_validator.py. See SEV-1 internal#812 for the fail-closed
# contract this closes.
SCRIPT_DIR_HERE="$(cd "$(dirname "$0")" && pwd)"
REVIEW_CANDIDATES=$(python3 "$SCRIPT_DIR_HERE/_review_check_filter.py" "$REVIEWS_JSON" "$PR_HEAD_SHA" "$PR_AUTHOR")
debug "candidate non-author approvers: $(echo "$REVIEW_CANDIDATES" | tr '\n' ' ')"
if [ -z "$REVIEW_CANDIDATES" ]; then
@@ -134,6 +134,14 @@ class Handler(http.server.BaseHTTPRequestHandler):
return self._json(200, [
{"state": "APPROVED", "dismissed": False, "user": {"login": "core-devops"}, "commit_id": "deadbeef0000111122223333444455556666"},
])
if sc == "T23_missing_commit_id":
# APPROVED review with NO commit_id field — the SEV-1
# internal#812 / closed-#843 spoof-bug signature. The
# fail-closed SSOT must REJECT (not silently accept as
# "older Gitea row" the way the old pre-fix code did).
return self._json(200, [
{"state": "APPROVED", "official": True, "dismissed": False, "user": {"login": "core-devops"}},
])
# Default: one non-author APPROVED (current head, official)
return self._json(200, [
{"state": "APPROVED", "dismissed": False, "official": True, "user": {"login": "core-devops"}, "commit_id": "deadbeef0000111122223333444455556666"},
@@ -0,0 +1,610 @@
#!/usr/bin/env python3
"""
Mutation-verified unit tests for the SSOT fail-closed approval predicate
in _approval_validator.py (SEV-1 internal#812).
Each test asserts REJECTION explicitly. A reviewer who weakens the
predicate — e.g., by removing the commit_id check, by reintroducing the
"no commit_id is accepted" escape hatch, by changing `!=` to `==` in the
head comparison, or by allowing official == false — will trip these
tests in CI.
Run:
cd .gitea/scripts
python3 -m unittest tests.test_approval_validator -v
# or
python3 tests/test_approval_validator.py
"""
from __future__ import annotations
import os
import sys
import unittest
# Same-dir import — test lives next to _approval_validator.py
sys.path.insert(
0,
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
)
from _approval_validator import ( # noqa: E402
classify_reviews,
is_genuine_approval,
is_official_current_head,
is_open_request_changes,
)
HEAD = "0123456789abcdef0123456789abcdef01234567"
OTHER_HEAD = "fedcba9876543210fedcba9876543210fedcba98"
def _review(
*,
state: str = "APPROVED",
official: bool = True,
dismissed: bool = False,
stale: bool = False,
commit_id: object = HEAD,
user: str = "reviewer-1",
body: str = "",
) -> dict:
"""Build a minimal review row shaped like the Gitea reviews API."""
return {
"id": 1,
"user": {"login": user, "id": 1},
"body": body,
"state": state,
"official": official,
"dismissed": dismissed,
"stale": stale,
"commit_id": commit_id,
}
# ---------------------------------------------------------------------------
# Hard contract: every fail-closed branch must reject
# ---------------------------------------------------------------------------
class IsOfficialCurrentHeadFailClosed(unittest.TestCase):
"""is_official_current_head is the common predicate. EVERY condition
is mandatory. The tests below assert REJECTION for every possible
failure of any condition."""
def test_accepts_canonical_review(self):
self.assertTrue(is_official_current_head(_review(), HEAD))
def test_rejects_non_dict(self):
for bad in [None, "string", 42, [], (), object()]:
with self.subTest(bad=bad):
self.assertFalse(is_official_current_head(bad, HEAD))
def test_rejects_when_official_is_false(self):
for v in [False, None, 0, "false"]:
with self.subTest(v=v):
self.assertFalse(
is_official_current_head(_review(official=v), HEAD)
)
def test_rejects_when_dismissed(self):
for v in [True, "true", 1]:
with self.subTest(v=v):
self.assertFalse(
is_official_current_head(_review(dismissed=v), HEAD)
)
def test_rejects_when_stale(self):
for v in [True, "true", 1]:
with self.subTest(v=v):
self.assertFalse(
is_official_current_head(_review(stale=v), HEAD)
)
def test_rejects_when_commit_id_missing(self):
"""FAIL-CLOSED #1: missing commit_id is REJECTED.
This is the spoof signature that closed #843 (with CR2 + Researcher
both flagging it)."""
for bad in [None, "", 0, False, [], {}, ()]:
with self.subTest(commit_id=bad):
self.assertFalse(
is_official_current_head(_review(commit_id=bad), HEAD),
f"commit_id={bad!r} must reject (fail-closed)",
)
def test_rejects_when_commit_id_wrong_type(self):
for bad in [123, 1.5, True, ["abc"], {"sha": HEAD}, ("tuple",)]:
with self.subTest(commit_id=bad):
self.assertFalse(
is_official_current_head(_review(commit_id=bad), HEAD)
)
def test_rejects_when_commit_id_stale(self):
"""FAIL-CLOSED #2: present-but-wrong commit_id is REJECTED. Stale
reviews on a previous head cannot count."""
self.assertFalse(
is_official_current_head(_review(commit_id=OTHER_HEAD), HEAD)
)
def test_rejects_when_head_missing(self):
for bad in [None, "", 0, False]:
with self.subTest(head=bad):
self.assertFalse(
is_official_current_head(_review(), bad)
)
def test_rejects_when_head_wrong_type(self):
self.assertFalse(is_official_current_head(_review(), 123))
self.assertFalse(is_official_current_head(_review(), ["x"]))
# ---------------------------------------------------------------------------
# is_genuine_approval
# ---------------------------------------------------------------------------
class IsGenuineApprovalContract(unittest.TestCase):
def test_accepts_canonical_approval(self):
self.assertTrue(
is_genuine_approval(_review(state="APPROVED"), headsha=HEAD)
)
def test_rejects_non_approved_states(self):
for state in ("REQUEST_CHANGES", "COMMENT", "PENDING", "DISMISSED", "approve", "", "bogus"):
with self.subTest(state=state):
self.assertFalse(
is_genuine_approval(_review(state=state), headsha=HEAD)
)
def test_rejects_case_coerced_approved_states(self):
"""EXACT-ENUM fail-closed (RCs 9849/9851/9852): Gitea always emits
the canonical UPPERCASE "APPROVED". A lowercase/mixed-case/padded
value is the signature of a forged row and MUST be rejected, not
coerced via .upper() into an accepted APPROVED. Each of these was
ACCEPTED before the exact-enum fix."""
for state in (
"approved", "Approved", "ApProVeD", "APPROVED ", " APPROVED",
"approved\n", "\tAPPROVED",
):
with self.subTest(state=state):
self.assertFalse(
is_genuine_approval(_review(state=state), headsha=HEAD),
f"case-coerced/padded state {state!r} must NOT count as "
"a genuine approval",
)
def test_rejects_non_official_approval(self):
"""Comment-based / non-official 'APPROVED' is REJECTED.
PM: 'reject comment-based / non-official reviews'."""
self.assertFalse(
is_genuine_approval(
_review(state="APPROVED", official=False), headsha=HEAD
)
)
def test_rejects_dismissed_approval(self):
self.assertFalse(
is_genuine_approval(
_review(state="APPROVED", dismissed=True), headsha=HEAD
)
)
def test_rejects_stale_head_approval(self):
"""commit_id != head is REJECTED. Stale-on-old-head approvals cannot
count, even if they were official and not dismissed."""
self.assertFalse(
is_genuine_approval(
_review(state="APPROVED", commit_id=OTHER_HEAD), headsha=HEAD
)
)
def test_rejects_missing_commit_id_approval(self):
"""FAIL-CLOSED #3: the SEV-1 case. A APPROVED review with NO
commit_id is the spoof-bug signature. Reject."""
for bad in [None, "", 0, False]:
with self.subTest(commit_id=bad):
self.assertFalse(
is_genuine_approval(
_review(state="APPROVED", commit_id=bad), headsha=HEAD
),
f"missing commit_id={bad!r} must reject",
)
def test_reviewer_set_filters_users(self):
self.assertTrue(
is_genuine_approval(
_review(user="alice"),
headsha=HEAD,
reviewer_set={"alice", "bob"},
)
)
self.assertFalse(
is_genuine_approval(
_review(user="carol"),
headsha=HEAD,
reviewer_set={"alice", "bob"},
)
)
def test_reviewer_set_none_skips_check(self):
# None means "no team filter at this layer" (e.g., review-check.sh
# applies its own team-membership probe separately).
self.assertTrue(
is_genuine_approval(
_review(user="anyone"),
headsha=HEAD,
reviewer_set=None,
)
)
# ---------------------------------------------------------------------------
# is_open_request_changes
# ---------------------------------------------------------------------------
class IsOpenRequestChangesContract(unittest.TestCase):
def test_accepts_canonical_request_changes(self):
self.assertTrue(
is_open_request_changes(
_review(state="REQUEST_CHANGES"), headsha=HEAD
)
)
def test_rejects_non_request_changes_states(self):
for state in ("APPROVED", "COMMENT", "PENDING", "DISMISSED"):
with self.subTest(state=state):
self.assertFalse(
is_open_request_changes(
_review(state=state), headsha=HEAD
)
)
def test_rejects_case_coerced_request_changes_states(self):
"""EXACT-ENUM fail-closed: a lowercase/mixed-case "request_changes"
must NOT be coerced into an open-block match. Before the exact-enum
fix, .upper() accepted these as REQUEST_CHANGES."""
for state in (
"request_changes", "Request_Changes", "REQUEST_CHANGES ",
" REQUEST_CHANGES", "request_changes\n",
):
with self.subTest(state=state):
self.assertFalse(
is_open_request_changes(
_review(state=state), headsha=HEAD
),
f"case-coerced/padded state {state!r} must NOT count as "
"an open REQUEST_CHANGES",
)
def test_rejects_when_dismissed(self):
self.assertFalse(
is_open_request_changes(
_review(state="REQUEST_CHANGES", dismissed=True), headsha=HEAD
)
)
def test_rejects_when_stale_head(self):
self.assertFalse(
is_open_request_changes(
_review(state="REQUEST_CHANGES", commit_id=OTHER_HEAD),
headsha=HEAD,
)
)
def test_rejects_when_missing_commit_id(self):
for bad in [None, "", 0]:
with self.subTest(commit_id=bad):
self.assertFalse(
is_open_request_changes(
_review(state="REQUEST_CHANGES", commit_id=bad),
headsha=HEAD,
)
)
# ---------------------------------------------------------------------------
# classify_reviews — the merge-queue consumer
# ---------------------------------------------------------------------------
class ClassifyReviewsContract(unittest.TestCase):
def test_basic_approvers_and_request_changes(self):
reviews = [
_review(user="alice", state="APPROVED", commit_id=HEAD),
_review(user="bob", state="REQUEST_CHANGES", commit_id=HEAD),
]
approvers, request_changes = classify_reviews(reviews, headsha=HEAD)
self.assertEqual(approvers, {"alice"})
self.assertEqual(request_changes, ["bob"])
def test_reviewer_set_filters_early(self):
reviews = [
_review(user="alice", state="APPROVED", commit_id=HEAD),
_review(user="carol", state="APPROVED", commit_id=HEAD),
]
approvers, _ = classify_reviews(
reviews, headsha=HEAD, reviewer_set={"alice"}
)
self.assertEqual(approvers, {"alice"})
def test_latest_review_per_user_wins(self):
# alice's REQUEST_CHANGES (latest) supersedes her earlier APPROVED.
reviews = [
_review(user="alice", state="APPROVED", commit_id=HEAD),
_review(user="alice", state="REQUEST_CHANGES", commit_id=HEAD),
]
approvers, request_changes = classify_reviews(reviews, headsha=HEAD)
self.assertNotIn("alice", approvers)
self.assertIn("alice", request_changes)
def test_stale_head_approval_excluded(self):
reviews = [
_review(user="alice", state="APPROVED", commit_id=OTHER_HEAD),
]
approvers, _ = classify_reviews(reviews, headsha=HEAD)
self.assertEqual(approvers, set())
def test_missing_commit_id_approval_excluded(self):
"""The SEV-1 fail-open surface. APPROVED + no commit_id → must NOT
count toward approvers, even with stale=False/dismissed=False."""
reviews = [
_review(user="alice", state="APPROVED", commit_id=None),
_review(user="bob", state="APPROVED", commit_id=""),
]
approvers, _ = classify_reviews(reviews, headsha=HEAD)
self.assertEqual(approvers, set())
def test_dismissed_approval_excluded(self):
reviews = [
_review(user="alice", state="APPROVED", dismissed=True, commit_id=HEAD),
]
approvers, _ = classify_reviews(reviews, headsha=HEAD)
self.assertEqual(approvers, set())
def test_non_official_approval_excluded(self):
reviews = [
_review(user="alice", state="APPROVED", official=False, commit_id=HEAD),
]
approvers, _ = classify_reviews(reviews, headsha=HEAD)
self.assertEqual(approvers, set())
def test_comment_state_excluded(self):
reviews = [
_review(user="alice", state="COMMENT", commit_id=HEAD),
]
approvers, _ = classify_reviews(reviews, headsha=HEAD)
self.assertEqual(approvers, set())
def test_case_coerced_approved_not_counted(self):
"""EXACT-ENUM via the reducer: a lowercase 'approved' (otherwise
valid official current-head row) must NOT be counted as an approver.
Before the fix, classify_reviews coerced it via .upper()."""
for state in ("approved", "Approved", "APPROVED "):
with self.subTest(state=state):
reviews = [
_review(user="alice", state=state, commit_id=HEAD),
]
approvers, request_changes = classify_reviews(
reviews, headsha=HEAD
)
self.assertEqual(approvers, set())
self.assertEqual(request_changes, [])
def test_case_coerced_request_changes_not_silently_dropped(self):
"""EXACT-ENUM via the reducer: a lowercase 'request_changes' must be
rejected (not coerced into a block). Crucially, it must NOT silently
erase a SAME-USER genuine current-head REQUEST_CHANGES posted
earlier — the case-variant later row is invalid and is ignored, so
the genuine block stands."""
reviews = [
_review(user="bob", state="REQUEST_CHANGES", commit_id=HEAD),
_review(user="bob", state="request_changes", commit_id=HEAD),
]
approvers, request_changes = classify_reviews(reviews, headsha=HEAD)
self.assertIn("bob", request_changes)
self.assertNotIn("bob", approvers)
def test_stale_head_request_changes_excluded(self):
# A REQUEST_CHANGES on a previous head must NOT block the current head.
reviews = [
_review(user="bob", state="REQUEST_CHANGES", commit_id=OTHER_HEAD),
]
_, request_changes = classify_reviews(reviews, headsha=HEAD)
self.assertEqual(request_changes, [])
# -----------------------------------------------------------------
# VALIDATE-BEFORE-REDUCE regression tests (SEV-1 internal#812 follow-up).
#
# The bug: classify_reviews reduced to the LATEST row per user FIRST and
# validated AFTER. A later INVALID row (a COMMENT, or APPROVED/
# REQUEST_CHANGES with a null/old commit_id) from the same user could
# overwrite a genuine current-head review — masking an approval or
# ERASING a REQUEST_CHANGES block. The fix validates before the reduce,
# so an invalid later row is never eligible to be a user's "latest".
# -----------------------------------------------------------------
def test_genuine_approval_not_masked_by_later_comment(self):
"""A genuine current-head APPROVED followed by a LATER COMMENT from
the SAME user must STILL count as an approval. A later non-
APPROVED/RC row (COMMENT) must not erase the approval. This is the
reduce-before-validate masking bug."""
reviews = [
_review(user="alice", state="APPROVED", commit_id=HEAD),
_review(user="alice", state="COMMENT", commit_id=HEAD),
]
approvers, request_changes = classify_reviews(reviews, headsha=HEAD)
self.assertIn("alice", approvers)
self.assertEqual(request_changes, [])
def test_genuine_approval_not_masked_by_later_null_commit_id(self):
"""A genuine current-head APPROVED followed by a LATER APPROVED with
a null commit_id (the spoof/invalid signature) from the SAME user
must STILL count. The invalid later row must be ignored, not allowed
to overwrite the valid earlier approval."""
for bad in [None, ""]:
with self.subTest(commit_id=bad):
reviews = [
_review(user="alice", state="APPROVED", commit_id=HEAD),
_review(user="alice", state="APPROVED", commit_id=bad),
]
approvers, _ = classify_reviews(reviews, headsha=HEAD)
self.assertIn(
"alice", approvers,
f"later invalid commit_id={bad!r} must not mask the "
"genuine current-head approval",
)
def test_genuine_approval_not_masked_by_later_stale_commit_id(self):
"""A genuine current-head APPROVED followed by a LATER APPROVED on a
STALE (old) head from the SAME user must STILL count toward
approvers — the stale later row is invalid and must be ignored."""
reviews = [
_review(user="alice", state="APPROVED", commit_id=HEAD),
_review(user="alice", state="APPROVED", commit_id=OTHER_HEAD),
]
approvers, _ = classify_reviews(reviews, headsha=HEAD)
self.assertIn("alice", approvers)
def test_request_changes_not_erased_by_later_comment(self):
"""A genuine current-head REQUEST_CHANGES followed by a LATER COMMENT
from the SAME user must STILL block. The later invalid row must not
erase the REQUEST_CHANGES — this is the worse, silently-evaporating-
block variant of the bug."""
reviews = [
_review(user="bob", state="REQUEST_CHANGES", commit_id=HEAD),
_review(user="bob", state="COMMENT", commit_id=HEAD),
]
approvers, request_changes = classify_reviews(reviews, headsha=HEAD)
self.assertIn("bob", request_changes)
self.assertNotIn("bob", approvers)
def test_request_changes_not_erased_by_later_null_commit_id(self):
"""A genuine current-head REQUEST_CHANGES followed by a LATER
REQUEST_CHANGES with a null/old commit_id from the SAME user must
STILL block. The invalid later row must be ignored, not allowed to
relocate the user's verdict off the current head."""
for bad in [None, "", OTHER_HEAD]:
with self.subTest(commit_id=bad):
reviews = [
_review(user="bob", state="REQUEST_CHANGES", commit_id=HEAD),
_review(user="bob", state="REQUEST_CHANGES", commit_id=bad),
]
_, request_changes = classify_reviews(reviews, headsha=HEAD)
self.assertIn(
"bob", request_changes,
f"later invalid commit_id={bad!r} must not erase the "
"genuine current-head REQUEST_CHANGES block",
)
def test_request_changes_not_erased_by_later_approved_invalid(self):
"""A genuine current-head REQUEST_CHANGES followed by a LATER
INVALID APPROVED (null commit_id) from the SAME user must STILL
block AND must NOT count the user as an approver. The invalid
approval must not flip a real block into a pass."""
reviews = [
_review(user="bob", state="REQUEST_CHANGES", commit_id=HEAD),
_review(user="bob", state="APPROVED", commit_id=None),
]
approvers, request_changes = classify_reviews(reviews, headsha=HEAD)
self.assertIn("bob", request_changes)
self.assertNotIn("bob", approvers)
def test_genuine_request_changes_still_supersedes_genuine_approval(self):
"""Sanity: a genuine LATER current-head REQUEST_CHANGES still
supersedes an earlier genuine APPROVED from the same user (the
valid-row supersession we MUST preserve — only INVALID later rows
are ignored). Guards against an over-correction that ignores all
later rows."""
reviews = [
_review(user="alice", state="APPROVED", commit_id=HEAD),
_review(user="alice", state="REQUEST_CHANGES", commit_id=HEAD),
]
approvers, request_changes = classify_reviews(reviews, headsha=HEAD)
self.assertNotIn("alice", approvers)
self.assertIn("alice", request_changes)
def test_genuine_approval_still_supersedes_genuine_request_changes(self):
"""Sanity: a genuine LATER current-head APPROVED supersedes an
earlier genuine REQUEST_CHANGES from the same user."""
reviews = [
_review(user="alice", state="REQUEST_CHANGES", commit_id=HEAD),
_review(user="alice", state="APPROVED", commit_id=HEAD),
]
approvers, request_changes = classify_reviews(reviews, headsha=HEAD)
self.assertIn("alice", approvers)
self.assertEqual(request_changes, [])
def test_two_valid_approvers_plus_one_invalid_later_row(self):
"""Two distinct users with valid current-head approvals + a third
user whose ONLY genuine approval is followed by an invalid later
row → all three real approvers are counted; the invalid later row
does not drop the third user."""
reviews = [
_review(user="alice", state="APPROVED", commit_id=HEAD),
_review(user="bob", state="APPROVED", commit_id=HEAD),
_review(user="carol", state="APPROVED", commit_id=HEAD),
_review(user="carol", state="COMMENT", commit_id=HEAD),
]
approvers, request_changes = classify_reviews(reviews, headsha=HEAD)
self.assertEqual(approvers, {"alice", "bob", "carol"})
self.assertEqual(request_changes, [])
# ---------------------------------------------------------------------------
# Mutation-resistance smoke checks
#
# These tests document the mutations a reviewer would have to apply to
# weaken the gate. They are not synthetic; they verify that the
# predicate is structured so each known-softening mutation would also
# fail at least one other test in this file. We can't actually mutate
# the source in CI, but these tests are explicit about the mutations
# that would slip through, and the suite is dense enough that any
# loosening of the predicate will fail multiple cases.
# ---------------------------------------------------------------------------
class MutationResistance(unittest.TestCase):
def test_documented_mutation_remove_commit_id_check_fails(self):
"""If a reviewer removes the commit_id check (e.g., reverts to
the pre-fix `if isinstance(commit_id, str) and commit_id and
headsha:` guard, or replaces `commit_id != headsha` with True),
the missing-commit_id tests above (test_rejects_when_commit_id_missing
in IsOfficialCurrentHeadFailClosed, test_rejects_missing_commit_id_approval
in IsGenuineApprovalContract, test_missing_commit_id_approval_excluded
in ClassifyReviewsContract) would all fail. The reviewer would have
to weaken all three test categories to slip the SEV-1 surface in."""
# Sanity: every missing-commit_id case is a False today.
for bad in [None, "", 0, False]:
with self.subTest(commit_id=bad):
self.assertFalse(
is_official_current_head(_review(commit_id=bad), HEAD)
)
self.assertFalse(
is_genuine_approval(
_review(commit_id=bad), headsha=HEAD
)
)
def test_documented_mutation_change_neq_to_eq_fails(self):
"""If a reviewer changes `commit_id != headsha` to `commit_id == headsha`
in the head comparison (inverting the check), the stale-head tests
(test_rejects_when_commit_id_stale, test_stale_head_approval_excluded)
would fail because the wrong head would now match."""
self.assertFalse(
is_official_current_head(_review(commit_id=OTHER_HEAD), HEAD)
)
def test_documented_mutation_drop_official_check_fails(self):
"""If a reviewer drops the `if not review.get('official')` check, the
non-official tests (test_rejects_when_official_is_false,
test_rejects_non_official_approval, test_non_official_approval_excluded)
would all fail."""
self.assertFalse(
is_genuine_approval(
_review(state="APPROVED", official=False), headsha=HEAD
)
)
if __name__ == "__main__":
unittest.main()
@@ -248,7 +248,7 @@ def test_genuine_approvals_counts_two_distinct_on_current_head():
{"state": "APPROVED", "user": {"login": "agent-reviewer-cr2"},
"official": True, "stale": False, "dismissed": False, "commit_id": "HEAD"},
]
approvers, rc = mq.genuine_approvals(reviews, head_sha="HEAD", reviewer_set=REVIEWERS)
approvers, rc = mq.genuine_approvals(reviews, headsha="HEAD", reviewer_set=REVIEWERS)
assert approvers == {"agent-researcher", "agent-reviewer-cr2"}
assert rc == []
@@ -265,7 +265,7 @@ def test_genuine_approvals_ignores_stale_dismissed_and_wrong_head():
{"state": "APPROVED", "user": {"login": "agent-reviewer"},
"official": True, "stale": False, "dismissed": False, "commit_id": "OLD"},
]
approvers, rc = mq.genuine_approvals(reviews, head_sha="HEAD", reviewer_set=REVIEWERS)
approvers, rc = mq.genuine_approvals(reviews, headsha="HEAD", reviewer_set=REVIEWERS)
assert approvers == set()
assert rc == []
@@ -279,7 +279,7 @@ def test_genuine_approvals_ignores_unofficial_and_outsiders():
{"state": "APPROVED", "user": {"login": "hongming-codex-laptop"},
"official": True, "stale": False, "dismissed": False, "commit_id": "HEAD"},
]
approvers, rc = mq.genuine_approvals(reviews, head_sha="HEAD", reviewer_set=REVIEWERS)
approvers, rc = mq.genuine_approvals(reviews, headsha="HEAD", reviewer_set=REVIEWERS)
assert approvers == set()
@@ -291,7 +291,7 @@ def test_genuine_approvals_latest_review_supersedes_earlier():
{"state": "REQUEST_CHANGES", "user": {"login": "agent-reviewer-cr2"},
"official": True, "stale": False, "dismissed": False, "commit_id": "HEAD"},
]
approvers, rc = mq.genuine_approvals(reviews, head_sha="HEAD", reviewer_set=REVIEWERS)
approvers, rc = mq.genuine_approvals(reviews, headsha="HEAD", reviewer_set=REVIEWERS)
assert approvers == set()
assert rc == ["agent-reviewer-cr2"]
+21
View File
@@ -25,6 +25,11 @@
# T20 — ai-sop-ack APPROVED review excluded from security-review gate
# T21 — stale-head APPROVED review → exit 1 (commit_id mismatch)
# T22 — missing/non-official APPROVED review → exit 1 (official != true)
# T23 — missing-commit_id APPROVED review → exit 1 (SEV-1 internal#812
# fail-closed contract: a missing/empty commit_id is REJECTED, not
# silently accepted as "older Gitea row" the way the pre-fix
# gitea-merge-queue.py did. Closes the spoof-bug surface that
# #843 had.)
#
# Hostile-self-review (per feedback_assert_exact_not_substring):
# this test MUST FAIL if the script is absent. Verified by running
@@ -427,6 +432,22 @@ T22_RC=$(cat "$FIX_STATE_DIR/last_rc")
assert_eq "T22 exit code 1 (missing official rejected)" "1" "$T22_RC"
assert_contains "T22 no candidates error" "no candidates from reviews API or issue comments" "$T22_OUT"
# T23 — missing-commit_id APPROVED review must be rejected.
# SEV-1 internal#812 (supersedes closed internal#843). A review with NO
# commit_id field is the spoof-bug signature: a real reviewer cannot
# have submitted against a commit that doesn't exist. The fail-closed
# SSOT must REJECT — the pre-fix gitea-merge-queue.py silently accepted
# these (the "older Gitea row" escape hatch), which is the exact surface
# that closed #843 had. The Python unit tests in
# test_approval_validator.py cover the predicate at the unit level;
# this T23 covers the bash + jq pipeline end-to-end.
echo
echo "== T23 missing commit_id APPROVED review rejected (SEV-1 fail-closed) =="
T23_OUT=$(run_review_check "T23_missing_commit_id")
T23_RC=$(cat "$FIX_STATE_DIR/last_rc")
assert_eq "T23 exit code 1 (missing commit_id rejected)" "1" "$T23_RC"
assert_contains "T23 no candidates error" "no candidates from reviews API or issue comments" "$T23_OUT"
echo
echo "------"
echo "PASS=$PASS FAIL=$FAIL"
+19
View File
@@ -21,15 +21,21 @@ on:
branches: [main, staging]
paths:
- '.gitea/scripts/review-check.sh'
- '.gitea/scripts/_approval_validator.py'
- '.gitea/scripts/_review_check_filter.py'
- '.gitea/scripts/tests/test_review_check.sh'
- '.gitea/scripts/tests/_review_check_fixture.py'
- '.gitea/scripts/tests/test_approval_validator.py'
- '.gitea/workflows/review-check-tests.yml'
pull_request:
branches: [main, staging]
paths:
- '.gitea/scripts/review-check.sh'
- '.gitea/scripts/_approval_validator.py'
- '.gitea/scripts/_review_check_filter.py'
- '.gitea/scripts/tests/test_review_check.sh'
- '.gitea/scripts/tests/_review_check_fixture.py'
- '.gitea/scripts/tests/test_approval_validator.py'
- '.gitea/workflows/review-check-tests.yml'
workflow_dispatch:
@@ -70,3 +76,16 @@ jobs:
- name: Run review-check.sh regression suite
run: bash .gitea/scripts/tests/test_review_check.sh
- name: SSOT approval-validator unit tests (SEV-1 internal#812)
# The Python unit tests for _approval_validator.py are
# mutation-verified — every fail-closed branch has an explicit
# REJECT assertion. A reviewer who weakens the predicate trips
# these in CI.
run: |
# The test file lives in .gitea/scripts/tests/ with no __init__.py,
# so `unittest discover -s .gitea/scripts` finds 0 tests (the SEV-1
# suite silently never ran — a CI gap fixed alongside internal#812).
# Run the file directly; it self-inserts its sys.path and calls
# unittest.main(), so a failing assertion exits non-zero and fails CI.
python3 .gitea/scripts/tests/test_approval_validator.py -v