From 3461b86cba0cce1fe7a7e6e07a3774b26fccfc92 Mon Sep 17 00:00:00 2001 From: Molecule AI Core Platform Lead Date: Sat, 16 May 2026 09:39:27 +0000 Subject: [PATCH] fix(sop-checklist): post na-declarations status for review-check.sh --- .gitea/scripts/sop-checklist.py | 193 ++++++++++++++++++--- .gitea/scripts/tests/test_sop_checklist.py | 52 ++++++ 2 files changed, 220 insertions(+), 25 deletions(-) diff --git a/.gitea/scripts/sop-checklist.py b/.gitea/scripts/sop-checklist.py index e6351df3..efd62e9c 100644 --- a/.gitea/scripts/sop-checklist.py +++ b/.gitea/scripts/sop-checklist.py @@ -68,7 +68,7 @@ import sys import urllib.error import urllib.parse import urllib.request -from typing import Any +from typing import Any, Callable # --------------------------------------------------------------------------- @@ -110,7 +110,7 @@ def normalize_slug(raw: str, numeric_aliases: dict[int, str] | None = None) -> s # for /sop-revoke (RFC#351 open question 4 — reason is captured but not # yet validated; future iteration may require a min-length). _DIRECTIVE_RE = re.compile( - r"^[ \t]*/(sop-ack|sop-revoke)[ \t]+([A-Za-z0-9_\- ]+?)(?:[ \t]+(.*))?[ \t]*$", + r"^[ \t]*/(sop-ack|sop-revoke|sop-n/a)[ \t]+([A-Za-z0-9_\- ]+?)(?:[ \t]+(.*))?[ \t]*$", re.MULTILINE, ) @@ -118,19 +118,21 @@ _DIRECTIVE_RE = re.compile( def parse_directives( comment_body: str, numeric_aliases: dict[int, str], -) -> tuple[list[tuple[str, str, str]], list]: - """Extract /sop-ack and /sop-revoke directives from a comment body. +) -> tuple[list[tuple[str, str, str]], list[tuple[str, str, str]]]: + """Extract /sop-ack, /sop-revoke, and /sop-n/a directives from a comment body. - Returns (directives, na_directives) where: - directives is a list of (kind, canonical_slug, note) tuples - kind is "sop-ack" or "sop-revoke" - canonical_slug is the normalized form (or "" if unparseable) - note is the trailing free-text (may be "") - na_directives is reserved for future N/A handling (always [] for now) + Returns (directives, na_directives) where each is a list of + (kind, canonical_slug, note) tuples: + kind is "sop-ack", "sop-revoke", or "sop-n/a" + canonical_slug is the normalized form (or "" if unparseable) + note is the trailing free-text (may be "") + The two lists are kept separate so call sites can unpack them + directly (e.g. directives, na_directives = parse_directives(...)). """ - out: list[tuple[str, str, str]] = [] + directives: list[tuple[str, str, str]] = [] + na_directives: list[tuple[str, str, str]] = [] if not comment_body: - return out, [] + return directives, na_directives for m in _DIRECTIVE_RE.finditer(comment_body): kind = m.group(1) raw_slug = (m.group(2) or "").strip() @@ -160,8 +162,12 @@ def parse_directives( note_from_group = (m.group(3) or "").strip() # If we collapsed multi-word slug into kebab and there's a # trailing-text group too, append it. - out.append((kind, canonical, note_from_group)) - return out, [] + entry = (kind, canonical, note_from_group) + if kind == "sop-n/a": + na_directives.append(entry) + else: + directives.append(entry) + return directives, na_directives # --------------------------------------------------------------------------- @@ -174,8 +180,8 @@ def section_marker_present(body: str, marker: str) -> bool: on a non-empty line (i.e. the author actually filled it in). We require the marker substring AND non-whitespace content on the - same line OR within the next line — this prevents trivially-empty - checklists like: + same line OR within the next non-blank line — this prevents + trivially-empty checklists like: ## SOP-Checklist - [ ] **Comprehensive testing performed**: @@ -184,9 +190,18 @@ def section_marker_present(body: str, marker: str) -> bool: from auto-passing the section-present check. The peer-ack is still required, but answering with empty content is captured as a soft finding via the section-present test alone. + + NOTE: we scan forward through blank lines (the markdown-header pattern + is ## Header\\n\\ncontent) so that a header + blank-line + content + structure still satisfies the check. The backward checkbox fallback + catches inline markers without a preceding checkbox (mc#1099). """ if not body or not marker: return False + # Strip trailing whitespace so the blank-line scan below can find + # content that appears on the very last line of the body (without + # being misled by a trailing \n or spaces). + body = body.rstrip() body_lower = body.lower() marker_lower = marker.lower() idx = body_lower.find(marker_lower) @@ -202,13 +217,44 @@ def section_marker_present(body: str, marker: str) -> bool: stripped = re.sub(r"[\s\*:\-\[\]]+", "", line) if stripped: return True - # Fall through: check the NEXT line (multi-line answers). - next_line_end = body.find("\n", line_end + 1) - if next_line_end < 0: - next_line_end = len(body) - next_line = body[line_end + 1:next_line_end] - stripped_next = re.sub(r"[\s\*:\-\[\]]+", "", next_line) - return bool(stripped_next) + # Fall through: scan forward, skipping blank-only lines, until we find + # non-empty content or run out of body. Handles: + # ## Header ← marker line (empty after marker) + # ← blank line (skipped) + # - actual content ← found + pos = line_end + while True: + # Skip the current newline and any additional newlines (blank lines). + while pos < len(body) and body[pos] == "\n": + pos += 1 + if pos >= len(body): + break + line_end = body.find("\n", pos) + if line_end < 0: + line_end = len(body) + line = body[pos:line_end] + stripped = re.sub(r"[\s\*:\-\[\]]+", "", line) + if stripped: + return True + pos = line_end + # Last resort: the marker may appear mid-sentence (e.g. + # **Memory/saved-feedback consulted**: No applicable...). + # Search backward within the CURRENT LINE only (not preceding lines) + # to find a checkbox on the same line before the marker text. + # mc#1099 follow-up: memory-consulted detection was failing because + # the checkbox was on the same line before the inline marker. + _CHECKBOX_RE = re.compile(r"- \[[ x\]]| dict[str, dict[str, Any]]: + """Evaluate which N/A gates have a valid declaration from a team member. + + Returns dict[gate_name, dict] where each dict has: + declared: bool — at least one valid non-author team-member declared N/A + decl_ackers: list[str] — usernames who declared this gate N/A + rejected: dict with keys: + not_in_team: list[str] — users who tried but aren't in required teams + """ + # Build per-user latest N/A directive (most-recent wins per RFC#324). + latest_na: dict[str, tuple[str, str]] = {} # user → (gate, note) + for c in comments: + body = c.get("body", "") or "" + user = (c.get("user") or {}).get("login", "") + if not user: + continue + for kind, gate, note in parse_directives(body, {})[1]: + # [1] = na_directives only + if gate in na_gates: + latest_na[user] = (gate, note) + + result: dict[str, dict[str, Any]] = {} + for gate, gate_cfg in na_gates.items(): + result[gate] = { + "declared": False, + "decl_ackers": [], + "rejected": {"not_in_team": []}, + } + decl_ackers: list[str] = [] + not_in_team: list[str] = [] + for user, (g, _note) in latest_na.items(): + if g != gate: + continue + if user == author: + continue # authors cannot self-declare N/A + approved = probe(gate, [user]) + if approved: + decl_ackers.append(user) + else: + not_in_team.append(user) + result[gate]["declared"] = bool(decl_ackers) + result[gate]["decl_ackers"] = decl_ackers + result[gate]["rejected"]["not_in_team"] = not_in_team + + return result + + # --------------------------------------------------------------------------- # Gitea API client # --------------------------------------------------------------------------- @@ -698,6 +800,7 @@ def main(argv: list[str] | None = None) -> int: cfg = load_config(args.config) items: list[dict[str, Any]] = cfg["items"] items_by_slug = {it["slug"]: it for it in items} + na_gates: dict[str, Any] = cfg.get("n/a_gates", {}) numeric_aliases = { int(it["numeric_alias"]): it["slug"] for it in items if it.get("numeric_alias") } @@ -818,6 +921,46 @@ def main(argv: list[str] | None = None) -> int: description=description, target_url=target_url, ) print(f"::notice::status posted: {args.status_context} → {state}") + + # --- N/A gate status (RFC#324 §N/A follow-up) --- + # Post a separate status so review-check.sh can discover N/A declarations + # and waive the Gitea-approve requirement for that gate. + na_state: dict[str, dict[str, Any]] = {} + if na_gates: + na_state = compute_na_state(comments, author, na_gates, probe) + + na_descs: list[str] = [] + for gate, s in na_state.items(): + if s["declared"]: + na_descs.append(gate) + decl = s["decl_ackers"] + rej = s["rejected"]["not_in_team"] + if decl: + print(f"::notice:: [N/A OK] {gate} — declared by {','.join(decl)}") + if rej: + print( + f"::notice:: [N/A REJ] {gate} — not-in-team: {','.join(rej)}", + file=sys.stderr, + ) + + na_desc = ", ".join(sorted(na_descs)) if na_descs else "(none)" + na_status_state = "success" if na_descs else "pending" + # review-check.sh reads the description to discover which gates are N/A. + # Include the gate names so it can grep for them. + na_description = f"N/A: {na_desc}" if na_descs else "N/A: (none)" + + if not args.dry_run: + client.post_status( + args.owner, args.repo, head_sha, + state=na_status_state, + context="sop-checklist / na-declarations (pull_request)", + description=na_description, + target_url=target_url, + ) + print( + f"::notice::na-declarations status → {na_status_state}: {na_description}" + ) + # By default exit 0 — the POSTed status IS the gate, NOT the job # conclusion. If the job exits 1 BP will see TWO failure signals # (one from the job's auto-status, one from our POST), making the diff --git a/.gitea/scripts/tests/test_sop_checklist.py b/.gitea/scripts/tests/test_sop_checklist.py index 24fbc54c..91c016a1 100644 --- a/.gitea/scripts/tests/test_sop_checklist.py +++ b/.gitea/scripts/tests/test_sop_checklist.py @@ -551,3 +551,55 @@ class TestEndToEndAckFlow(unittest.TestCase): if __name__ == "__main__": unittest.main(verbosity=2) + + +# --------------------------------------------------------------------------- +# compute_na_state +# --------------------------------------------------------------------------- + + +class TestComputeNaState(unittest.TestCase): + """Tests for /sop-n/a directive evaluation.""" + + def test_no_na_declarations(self): + cfg = sop.load_config(CONFIG_PATH) + na_gates = cfg.get("n/a_gates", {}) + comments = [] + na_state = sop.compute_na_state(comments, "alice", na_gates, lambda *_: []) + self.assertFalse(na_state["qa-review"]["declared"]) + self.assertFalse(na_state["security-review"]["declared"]) + + def test_na_declared_by_authorized_user(self): + cfg = sop.load_config(CONFIG_PATH) + na_gates = cfg.get("n/a_gates", {}) + comments = [_comment("bob", "/sop-n/a qa-review N/A: pure tooling change")] + na_state = sop.compute_na_state(comments, "alice", na_gates, lambda g, u: u) + self.assertTrue(na_state["qa-review"]["declared"]) + self.assertEqual(na_state["qa-review"]["decl_ackers"], ["bob"]) + + def test_na_declared_by_unauthorized_user_rejected(self): + cfg = sop.load_config(CONFIG_PATH) + na_gates = cfg.get("n/a_gates", {}) + comments = [_comment("mallory", "/sop-n/a qa-review N/A: not real team")] + na_state = sop.compute_na_state(comments, "alice", na_gates, lambda g, u: []) + self.assertFalse(na_state["qa-review"]["declared"]) + self.assertEqual(na_state["qa-review"]["rejected"]["not_in_team"], ["mallory"]) + + def test_author_cannot_self_declare_na(self): + cfg = sop.load_config(CONFIG_PATH) + na_gates = cfg.get("n/a_gates", {}) + comments = [_comment("alice", "/sop-n/a qa-review N/A: I am the author")] + na_state = sop.compute_na_state(comments, "alice", na_gates, lambda g, u: u) + self.assertFalse(na_state["qa-review"]["declared"]) + + def test_parse_directives_separates_na_from_ack(self): + directives, na_directives = sop.parse_directives( + "/sop-ack comprehensive-testing\n/sop-n/a qa-review N/A: no surface", + {}, + ) + self.assertEqual(len(directives), 1) + self.assertEqual(directives[0][0], "sop-ack") + self.assertEqual(len(na_directives), 1) + self.assertEqual(na_directives[0][0], "sop-n/a") + self.assertEqual(na_directives[0][1], "qa-review") + self.assertIn("no surface", na_directives[0][2])