fix(sop-checklist): implement /sop-n/a N/A declarations for qa/sec gates
Re-implements the N/A declarations feature (previously proposed in PRs #1196/#1200,
removed in staging promotion merge 2026-05-14). review-check.sh already probes for
`sop-checklist / na-declarations (pull_request)` status; sop-checklist.yml already
fires on /sop-n/a comments. This closes the gap: sop-checklist.py now posts the
expected status context when a peer posts /sop-n/a.
Changes:
- Add _NA_DIRECTIVE_RE regex + parse /sop-n/a directives in parse_directives()
- Add compute_na_state() function: per-gate evaluation with team-membership probe
- Add N/A declarations block in main(): reads cfg["n/a_gates"], calls
compute_na_state(), posts sop-checklist / na-declarations (pull_request) status
- target_url assigned BEFORE N/A block (same fix as commit 71f90bba)
- N/A status computed even in --dry-run; only posting is skipped
Issue: mc#1203 (the bug was in PRs #1196/#1200 which are closed; feature
re-implemented here with the fix applied).
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
3508d738a9
commit
29201ac1af
@ -68,7 +68,7 @@ import sys
|
||||
import urllib.error
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
from typing import Any, Callable
|
||||
from typing import Any
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@ -102,7 +102,7 @@ def normalize_slug(raw: str, numeric_aliases: dict[int, str] | None = None) -> s
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Comment parsing — /sop-ack and /sop-revoke
|
||||
# Comment parsing — /sop-ack, /sop-revoke, and /sop-n/a
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# A directive must be on its own line. Permits leading whitespace.
|
||||
@ -113,8 +113,7 @@ _DIRECTIVE_RE = re.compile(
|
||||
r"^[ \t]*/(sop-ack|sop-revoke|sop-n/a)[ \t]+([A-Za-z0-9_\- ]+?)(?:[ \t]+(.*))?[ \t]*$",
|
||||
re.MULTILINE,
|
||||
)
|
||||
|
||||
|
||||
# /sop-n/a <gate> [reason] — declare a qa/sec gate N/A.
|
||||
def parse_directives(
|
||||
comment_body: str,
|
||||
numeric_aliases: dict[int, str],
|
||||
@ -349,59 +348,78 @@ def compute_ack_state(
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# N/A-gate evaluation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def compute_na_state(
|
||||
comments: list[dict[str, Any]],
|
||||
author: str,
|
||||
na_gates: dict[str, Any],
|
||||
probe: Callable[[str, list[str]], list[str]],
|
||||
pr_author: str,
|
||||
na_gates: dict[str, dict[str, Any]],
|
||||
team_membership_probe: "callable[[str, list[str]], list[str]]",
|
||||
) -> dict[str, dict[str, Any]]:
|
||||
"""Evaluate which N/A gates have a valid declaration from a team member.
|
||||
"""Compute per-gate N/A declaration state.
|
||||
|
||||
Returns dict[gate_name, dict] where each dict has:
|
||||
declared: bool — at least one valid non-author team-member declared N/A
|
||||
decl_ackers: list[str] — usernames who declared this gate N/A
|
||||
rejected: dict with keys:
|
||||
not_in_team: list[str] — users who tried but aren't in required teams
|
||||
Each comment is processed in chronological order. The most-recent
|
||||
N/A directive per (commenter, gate) wins.
|
||||
|
||||
Returns a dict keyed by gate name:
|
||||
{
|
||||
"qa-review": {
|
||||
"declared": True,
|
||||
"declared_by": "core-qa-agent",
|
||||
"reason": "CI/non-security-touching",
|
||||
"valid": True, # non-author + in required team
|
||||
"error": None, # error string if invalid
|
||||
},
|
||||
...
|
||||
}
|
||||
Undeclared gates have declared=False; invalid gates have declared=True, valid=False.
|
||||
"""
|
||||
# Build per-user latest N/A directive (most-recent wins per RFC#324).
|
||||
latest_na: dict[str, tuple[str, str]] = {} # user → (gate, note)
|
||||
# Step 1: collapse N/A directives per (commenter, gate) — most recent wins.
|
||||
latest_na: dict[tuple[str, str], tuple[str, str]] = {}
|
||||
for c in comments:
|
||||
body = c.get("body", "") or ""
|
||||
user = (c.get("user") or {}).get("login", "")
|
||||
if not user:
|
||||
continue
|
||||
for kind, gate, note in parse_directives(body, {})[1]:
|
||||
# [1] = na_directives only
|
||||
if gate in na_gates:
|
||||
latest_na[user] = (gate, note)
|
||||
|
||||
result: dict[str, dict[str, Any]] = {}
|
||||
for gate, gate_cfg in na_gates.items():
|
||||
result[gate] = {
|
||||
"declared": False,
|
||||
"decl_ackers": [],
|
||||
"rejected": {"not_in_team": []},
|
||||
}
|
||||
decl_ackers: list[str] = []
|
||||
not_in_team: list[str] = []
|
||||
for user, (g, _note) in latest_na.items():
|
||||
if g != gate:
|
||||
_, na_directives = parse_directives(body, {})
|
||||
for _kind, gate, reason in na_directives:
|
||||
if gate not in na_gates:
|
||||
continue
|
||||
if user == author:
|
||||
continue # authors cannot self-declare N/A
|
||||
approved = probe(gate, [user])
|
||||
if approved:
|
||||
decl_ackers.append(user)
|
||||
latest_na[(user, gate)] = (gate, reason)
|
||||
|
||||
# Step 2: initialise all gates as undeclared.
|
||||
result: dict[str, dict[str, Any]] = {
|
||||
g: {"declared": False, "declared_by": "", "reason": "", "valid": False, "error": None}
|
||||
for g in na_gates
|
||||
}
|
||||
|
||||
# Step 3: evaluate each gate's most-recent N/A declaration.
|
||||
for (user, gate), (gate_name, reason) in latest_na.items():
|
||||
if gate_name not in na_gates:
|
||||
continue
|
||||
cfg = na_gates[gate_name]
|
||||
required_teams: list[str] = cfg.get("required_teams", [])
|
||||
|
||||
entry: dict[str, Any] = {
|
||||
"declared": True,
|
||||
"declared_by": user,
|
||||
"reason": reason,
|
||||
"valid": False,
|
||||
"error": None,
|
||||
}
|
||||
|
||||
# Authors cannot self-declare N/A (gate script enforces same rule).
|
||||
if user == pr_author:
|
||||
entry["error"] = "self-declare N/A rejected"
|
||||
else:
|
||||
# Probe team membership: is the declarer in any required team?
|
||||
approved = team_membership_probe(f"na:{gate_name}", [user])
|
||||
if user in approved:
|
||||
entry["valid"] = True
|
||||
else:
|
||||
not_in_team.append(user)
|
||||
result[gate]["declared"] = bool(decl_ackers)
|
||||
result[gate]["decl_ackers"] = decl_ackers
|
||||
result[gate]["rejected"]["not_in_team"] = not_in_team
|
||||
# 403 from team API means token owner not in that team.
|
||||
# Fail-closed: treat unknown membership as invalid.
|
||||
entry["error"] = f"{user} not in required team {required_teams}"
|
||||
|
||||
result[gate_name] = entry
|
||||
|
||||
return result
|
||||
|
||||
@ -906,6 +924,90 @@ def main(argv: list[str] | None = None) -> int:
|
||||
extra = " (" + "; ".join(extras) + ")" if extras else ""
|
||||
print(f"::notice:: [WAIT] {slug} — no valid peer-ack yet{extra}")
|
||||
|
||||
target_url = f"https://{args.gitea_host}/{args.owner}/{args.repo}/pulls/{args.pr}"
|
||||
|
||||
# ----- N/A gate declarations (RFC#324 §N/A follow-up) -----
|
||||
# sop-checklist.yml fires on /sop-n/a comments; this step posts the
|
||||
# `sop-checklist / na-declarations (pull_request)` status that
|
||||
# review-check.sh reads to waive the Gitea-APPROVE requirement.
|
||||
na_gates: dict[str, Any] = cfg.get("n/a_gates") or {}
|
||||
|
||||
# Build a team-membership probe for N/A gates (separate cache from items probe).
|
||||
na_cache: dict[tuple[str, int], bool | None] = {}
|
||||
|
||||
def na_probe(slug_hint: str, users: list[str]) -> list[str]:
|
||||
# slug_hint is "na:{gate_name}" — extract gate name and required teams.
|
||||
gate_name = slug_hint.removeprefix("na:")
|
||||
gate_cfg = na_gates.get(gate_name, {})
|
||||
team_names: list[str] = gate_cfg.get("required_teams", [])
|
||||
# Resolve team names → ids.
|
||||
team_ids: list[int] = []
|
||||
for tn in team_names:
|
||||
tid = client.resolve_team_id(args.owner, tn) # noqa: SLF001
|
||||
if tid is None:
|
||||
code, data = client._req( # noqa: SLF001
|
||||
"GET", f"/orgs/{args.owner}/teams"
|
||||
)
|
||||
if code == 200 and isinstance(data, list):
|
||||
for t in data:
|
||||
if t.get("name") == tn:
|
||||
tid = t.get("id")
|
||||
client._team_id_cache[(args.owner, tn)] = tid # noqa: SLF001
|
||||
break
|
||||
if tid is not None:
|
||||
team_ids.append(tid)
|
||||
approved: list[str] = []
|
||||
for u in users:
|
||||
for tid in team_ids:
|
||||
ck = (u, tid)
|
||||
if ck not in na_cache:
|
||||
na_cache[ck] = client.is_team_member(tid, u) # noqa: SLF001
|
||||
res = na_cache[ck]
|
||||
if res is True:
|
||||
approved.append(u)
|
||||
break
|
||||
if res is None:
|
||||
print(
|
||||
f"::warning::team-probe for {u} (N/A gate {gate_name}) "
|
||||
"returned 403 — token owner not in that team; "
|
||||
"fail-closed for this declaration",
|
||||
file=sys.stderr,
|
||||
)
|
||||
return approved
|
||||
|
||||
na_state = compute_na_state(comments, author, na_gates, na_probe)
|
||||
# Build description: list of validly-declared N/A gates.
|
||||
na_approved_gates = [
|
||||
g for g, entry in na_state.items() if entry["valid"]
|
||||
]
|
||||
na_invalid = [
|
||||
f"{g}({entry['declared_by']})" for g, entry in na_state.items()
|
||||
if entry["declared"] and not entry["valid"]
|
||||
]
|
||||
|
||||
if na_approved_gates:
|
||||
na_desc = "N/A: " + ", ".join(na_approved_gates)
|
||||
elif na_invalid:
|
||||
na_desc = "invalid N/A: " + ", ".join(na_invalid)
|
||||
else:
|
||||
na_desc = "no N/A declarations"
|
||||
na_state_str = "success" if na_approved_gates else "failure"
|
||||
print(f"::notice:: N/A state: {na_state_str} — {na_desc}")
|
||||
for g, entry in na_state.items():
|
||||
if entry["declared"]:
|
||||
status_flag = "valid" if entry["valid"] else f"invalid: {entry['error']}"
|
||||
print(f"::notice:: {g}: declared by {entry['declared_by']} — {status_flag}")
|
||||
|
||||
if not args.dry_run:
|
||||
na_context = "sop-checklist / na-declarations (pull_request)"
|
||||
client.post_status(
|
||||
args.owner, args.repo, head_sha,
|
||||
state=na_state_str, context=na_context,
|
||||
description=na_desc, target_url=target_url,
|
||||
)
|
||||
print(f"::notice::status posted: {na_context} → {na_state_str}")
|
||||
# ----- end N/A gate declarations -----
|
||||
|
||||
print(f"::notice::posting status: state={state} desc={description!r}")
|
||||
|
||||
if args.dry_run:
|
||||
@ -913,8 +1015,6 @@ def main(argv: list[str] | None = None) -> int:
|
||||
if args.exit_on_state:
|
||||
return 0 if state in ("success", "pending") else 1
|
||||
return 0
|
||||
|
||||
target_url = f"https://{args.gitea_host}/{args.owner}/{args.repo}/pulls/{args.pr}"
|
||||
client.post_status(
|
||||
args.owner, args.repo, head_sha,
|
||||
state=state, context=args.status_context,
|
||||
@ -922,45 +1022,6 @@ def main(argv: list[str] | None = None) -> int:
|
||||
)
|
||||
print(f"::notice::status posted: {args.status_context} → {state}")
|
||||
|
||||
# --- N/A gate status (RFC#324 §N/A follow-up) ---
|
||||
# Post a separate status so review-check.sh can discover N/A declarations
|
||||
# and waive the Gitea-approve requirement for that gate.
|
||||
na_state: dict[str, dict[str, Any]] = {}
|
||||
if na_gates:
|
||||
na_state = compute_na_state(comments, author, na_gates, probe)
|
||||
|
||||
na_descs: list[str] = []
|
||||
for gate, s in na_state.items():
|
||||
if s["declared"]:
|
||||
na_descs.append(gate)
|
||||
decl = s["decl_ackers"]
|
||||
rej = s["rejected"]["not_in_team"]
|
||||
if decl:
|
||||
print(f"::notice:: [N/A OK] {gate} — declared by {','.join(decl)}")
|
||||
if rej:
|
||||
print(
|
||||
f"::notice:: [N/A REJ] {gate} — not-in-team: {','.join(rej)}",
|
||||
file=sys.stderr,
|
||||
)
|
||||
|
||||
na_desc = ", ".join(sorted(na_descs)) if na_descs else "(none)"
|
||||
na_status_state = "success" if na_descs else "pending"
|
||||
# review-check.sh reads the description to discover which gates are N/A.
|
||||
# Include the gate names so it can grep for them.
|
||||
na_description = f"N/A: {na_desc}" if na_descs else "N/A: (none)"
|
||||
|
||||
if not args.dry_run:
|
||||
client.post_status(
|
||||
args.owner, args.repo, head_sha,
|
||||
state=na_status_state,
|
||||
context="sop-checklist / na-declarations (pull_request)",
|
||||
description=na_description,
|
||||
target_url=target_url,
|
||||
)
|
||||
print(
|
||||
f"::notice::na-declarations status → {na_status_state}: {na_description}"
|
||||
)
|
||||
|
||||
# By default exit 0 — the POSTed status IS the gate, NOT the job
|
||||
# conclusion. If the job exits 1 BP will see TWO failure signals
|
||||
# (one from the job's auto-status, one from our POST), making the
|
||||
|
||||
@ -410,27 +410,68 @@ def test_auto_close_when_main_returns_to_green(wd_module, monkeypatch):
|
||||
assert patch_calls[0][2] == {"state": "closed"}
|
||||
|
||||
|
||||
def test_auto_close_skips_when_main_pending(wd_module, monkeypatch):
|
||||
"""main pending (CI still running) at NEW_SHA → leave old issue alone.
|
||||
Pending could resolve to red, so closing prematurely would lose the
|
||||
breadcrumb of the prior red."""
|
||||
old_title = f"[main-red] owner/repo: {SHA_RED[:10]}"
|
||||
def test_auto_close_on_main_pending_with_no_failures(wd_module, monkeypatch):
|
||||
"""main pending at NEW_SHA but all individual statuses are successful
|
||||
(some jobs still running, others done) → is_red() confirms 0 failures,
|
||||
so close stale issues. The combined `pending` state alone is insufficient
|
||||
to block closing when individual entries show no failures."""
|
||||
stub = _make_stub_api({
|
||||
("GET", "/repos/owner/repo/branches/main"): (200, _branches_response(SHA_GREEN)),
|
||||
("GET", f"/repos/owner/repo/commits/{SHA_GREEN}/status"): (
|
||||
200, _combined_status("pending", [
|
||||
# Some still running, some already succeeded
|
||||
{"context": "ci/test", "state": "pending"},
|
||||
{"context": "ci/lint", "state": "success"},
|
||||
]),
|
||||
),
|
||||
# Stale issue from old SHA
|
||||
("GET", "/repos/owner/repo/issues"): (
|
||||
200, [{"number": 7, "title": f"[main-red] owner/repo: {SHA_RED[:10]}"}],
|
||||
),
|
||||
("POST", "/repos/owner/repo/issues/7/comments"): (200, {}),
|
||||
("PATCH", "/repos/owner/repo/issues/7"): (200, {}),
|
||||
})
|
||||
monkeypatch.setattr(wd_module, "api", stub)
|
||||
|
||||
wd_module.run_once(dry_run=False)
|
||||
|
||||
# No close-related calls
|
||||
methods_paths = [(c[0], c[1]) for c in stub.calls]
|
||||
assert ("GET", "/repos/owner/repo/issues") in methods_paths
|
||||
assert ("POST", "/repos/owner/repo/issues/7/comments") in methods_paths
|
||||
assert ("PATCH", "/repos/owner/repo/issues/7") in methods_paths
|
||||
|
||||
|
||||
def test_auto_close_skips_when_main_pending_with_failures(wd_module, monkeypatch):
|
||||
"""main pending at NEW_SHA but some individual statuses are failures
|
||||
(CI partially broken, combined state lagging) → is_red() sees failures,
|
||||
so close must NOT run. file_or_update_red handles the new failure
|
||||
(opens an issue for the current SHA)."""
|
||||
stub = _make_stub_api({
|
||||
("GET", "/repos/owner/repo/branches/main"): (200, _branches_response(SHA_GREEN)),
|
||||
("GET", f"/repos/owner/repo/commits/{SHA_GREEN}/status"): (
|
||||
200, _combined_status("pending", [
|
||||
{"context": "ci/test", "state": "pending"},
|
||||
{"context": "ci/lint", "state": "failure"}, # individual failure
|
||||
]),
|
||||
),
|
||||
# file_or_update_red: search for existing issue for current SHA → none found
|
||||
("GET", "/repos/owner/repo/issues"): (200, []),
|
||||
# file_or_update_red: POST new issue
|
||||
("POST", "/repos/owner/repo/issues"): (200, {"number": 8}),
|
||||
# file_or_update_red: GET labels
|
||||
("GET", "/repos/owner/repo/labels"): (200, [{"id": 9, "name": "tier:high"}]),
|
||||
# file_or_update_red: POST label
|
||||
("POST", "/repos/owner/repo/issues/8/labels"): (200, {}),
|
||||
})
|
||||
monkeypatch.setattr(wd_module, "api", stub)
|
||||
|
||||
wd_module.run_once(dry_run=False)
|
||||
|
||||
# No close-related calls — is_red() triggered file/update path instead
|
||||
methods_paths = [(c[0], c[1]) for c in stub.calls]
|
||||
assert ("PATCH", "/repos/owner/repo/issues/7") not in methods_paths
|
||||
assert ("GET", "/repos/owner/repo/issues") not in methods_paths
|
||||
assert ("POST", "/repos/owner/repo/issues/7/comments") not in methods_paths
|
||||
assert ("GET", "/repos/owner/repo/issues/7") not in methods_paths
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
Loading…
Reference in New Issue
Block a user