From 72df12ecef291eab99282fa7c96e962a09fcd0cc Mon Sep 17 00:00:00 2001
From: core-devops
Date: Tue, 12 May 2026 06:08:36 +0000
Subject: [PATCH] =?UTF-8?q?feat(ci):=20sop-checklist-gate=20=E2=80=94=20pe?=
 =?UTF-8?q?er-ack=20merge=20gate=20(RFC#351=20Phase=202)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

RFC#351 Step 2 of 6: implementation MVP of the SOP-checklist peer-ack merge
gate. NOT yet wired to branch protection (Phase 4 needs separate
authorization).

What:
- .gitea/sop-checklist-config.yaml — 7-item checklist with slug,
  numeric_alias (1..7), pr_section_marker, required_teams. Includes a
  tier-aware failure-mode map: tier:high/medium=hard, tier:low=soft,
  default=hard (never silently lower the bar).
- .gitea/scripts/sop-checklist-gate.py — parses the PR body + comments,
  computes per-item ack state, posts the commit-status
  "sop-checklist / all-items-acked (pull_request)".
- .gitea/scripts/tests/test_sop_checklist_gate.py — 51 unit tests covering
  slug normalization, directive parsing, section-marker detection,
  ack-state computation (self-ack rejection, revoke semantics,
  multi-user/multi-item, numeric aliases), tier-mode selection, and an
  end-to-end happy path.
- .gitea/workflows/sop-checklist-gate.yml — pull_request_target
  [opened/edited/synchronize/reopened] + issue_comment
  [created/edited/deleted]. Checks out the BASE ref only (trust boundary
  per RFC#324 §A4). Mirrors the qa-review/security-review patterns.

Why: Hongming (2026-05-12T05:42Z) asked for SOP-enforcing CI/CD that
requires a peer-ack on each checklist item before merge. This composes the
existing patterns (scripts-lint PR-body parser + RFC#324 persona-whitelist
commit-status + sop-tier-check tier-awareness) into one gate.
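The tier-aware failure-mode selection described above can be sketched as follows. This is an illustrative standalone snippet, not the shipped code: the real gate reads the table from .gitea/sop-checklist-config.yaml (see get_tier_mode() in sop-checklist-gate.py), and the function name `failure_mode` here is hypothetical.

```python
# Illustrative sketch of the tier -> failure-mode lookup. The table
# values come from the commit message; the real gate loads them from
# .gitea/sop-checklist-config.yaml rather than hard-coding them.
TIER_FAILURE_MODE = {
    "tier:high": "hard",
    "tier:medium": "hard",
    "tier:low": "soft",
}


def failure_mode(labels: list[str], default: str = "hard") -> str:
    """Pick the failure mode from a PR's label names; any PR without a
    recognized tier:* label falls back to the hard default (never
    silently lower the bar)."""
    for name in labels:
        if name in TIER_FAILURE_MODE:
            return TIER_FAILURE_MODE[name]
    return default
```

A soft mode downgrades a would-be failure status to pending rather than blocking the merge outright, which is why the default must stay hard.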
Slash-command contract:
  /sop-ack <slug> [note]      — register a peer-ack (most-recent wins)
  /sop-revoke <slug> [reason] — invalidate your own prior ack

Slug normalization accepts kebab-case, snake_case, natural spaces, or the
numeric 1..7 shorthand (all canonicalize to kebab-case via the
config-driven alias table).

Tests: 51/51 pass locally. A dry-run probe against PR#685 verified the
full pipeline (PR fetch, comment fetch, ack computation, status
description rendering inside the 140-char budget).

Not yet:
- Phase 3 (24h soak)
- Phase 4 (BP PATCH to require this context — needs Hongming GO)
- Phase 5 (cross-repo)
- Phase 6 (dev-sop.md codification)
- SOP_CHECKLIST_GATE_TOKEN secret provisioning (separate follow-up;
  fail-closed until provisioned, same as the RFC_324_TEAM_READ_TOKEN
  pattern in qa-review.yml).

Cross-links:
- internal#351 (RFC body)
- RFC#324 (qa-review/security-review — reused mechanism)
- internal#346 (dev-sop.md SOP-14..SOP-20 — sibling rules)
- feedback_pull_request_review_no_refire (why the issue_comment trigger)
- feedback_checkpointed_workflow_over_good_practice_doc (motivation)
- feedback_fix_root_not_symptom (default-mode=hard rationale)

## What

Add a SOP-checklist peer-ack merge gate: workflow + script + config + 51
unit tests.

## Why

Hongming-requested mechanism to enforce SOP via CI/CD: each PR checklist
item must be peer-acked before merge, with team-membership-verified
ackers and a tier-aware failure mode.

## Verification

- 51/51 unit tests pass (slug normalization, parse_directives, section
  marker detection, ack state including self-ack rejection + revoke
  semantics, tier-mode mapping, end-to-end happy path).
- YAML lint clean (yaml.safe_load + lint-workflow-yaml.py on the new
  workflow — pre-existing fatals on unrelated files only).
- Python syntax clean (py_compile).
- Dry-run against live PR#685: PR fetch, comment enumeration, status
  description render all within the 140-char budget — works end-to-end.
## Tier

tier:medium — net-new CI workflow; no production impact; no BP change yet
(Phase 4 separate auth).

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 .gitea/scripts/sop-checklist-gate.py          | 805 ++++++++++++++++++
 .../scripts/tests/test_sop_checklist_gate.py  | 524 ++++++++++++
 .gitea/sop-checklist-config.yaml              | 109 +++
 .gitea/workflows/sop-checklist-gate.yml       | 121 +++
 4 files changed, 1559 insertions(+)
 create mode 100755 .gitea/scripts/sop-checklist-gate.py
 create mode 100644 .gitea/scripts/tests/test_sop_checklist_gate.py
 create mode 100644 .gitea/sop-checklist-config.yaml
 create mode 100644 .gitea/workflows/sop-checklist-gate.yml

diff --git a/.gitea/scripts/sop-checklist-gate.py b/.gitea/scripts/sop-checklist-gate.py
new file mode 100755
index 00000000..9f25e18a
--- /dev/null
+++ b/.gitea/scripts/sop-checklist-gate.py
@@ -0,0 +1,805 @@
+#!/usr/bin/env python3
+# sop-checklist-gate — evaluate whether a PR has peer-acked each
+# SOP-checklist item. Posts a commit-status that branch protection
+# can require.
+#
+# RFC#351 Step 2 of 6 (implementation MVP).
+#
+# Invoked by .gitea/workflows/sop-checklist-gate.yml on:
+#   - pull_request_target: [opened, edited, synchronize, reopened]
+#   - issue_comment: [created, edited, deleted]
+#
+# Flow:
+#   1. Load .gitea/sop-checklist-config.yaml (from BASE ref — trusted).
+#   2. GET /repos/{R}/pulls/{N} — author, head.sha, tier label
+#   3. GET /repos/{R}/issues/{N}/comments — extract /sop-ack and /sop-revoke
+#   4. For each checklist item:
+#      a. Is the section marker present in the PR body? (author answered)
+#      b. Is there ≥1 unrevoked /sop-ack from a non-author whose
+#         team membership matches required_teams?
+#   5. POST /repos/{R}/statuses/{sha} — context
+#      `sop-checklist / all-items-acked (pull_request)`,
+#      state=success | failure | pending, description=`acked: N/M …`.
+#
+# Trust boundary (mirrors RFC#324 §A4):
+#   This script is loaded from the BASE branch. The workflow's
+#   actions/checkout step pins ref=base.sha.
+   PR-HEAD code is never
+#   executed. We only HTTP-call the Gitea API.
+#
+# Token scope:
+#   - read:repository / read:organization to enumerate the PR + comments
+#     + team membership (Gitea 1.22.6 quirk: the team-membership endpoint
+#     returns 403 if the token owner is not in the team; see
+#     review-check.sh for the same gotcha — we surface the same
+#     fail-closed message).
+#   - write:repository for `POST /repos/{R}/statuses/{sha}`. Unlike
+#     RFC#324's pattern (which uses the JOB's own pass/fail as the
+#     status), we POST the status explicitly because the gate posts
+#     a single multi-item status with a richer description than a
+#     bare success/failure context can carry.
+#
+# Slug normalization rules (canonical form: kebab-case):
+#   - Lowercase
+#   - Whitespace + underscores → single dash
+#   - Strip non-[a-z0-9-] characters
+#   - Collapse adjacent dashes
+#   - Strip leading/trailing dashes
+#   - If the result is a digit string (e.g. "1"), look it up via
+#     config.items[*].numeric_alias to get the kebab-case slug.
+#
+# Examples:
+#   "Comprehensive_Testing"  → "comprehensive-testing"
+#   "comprehensive testing"  → "comprehensive-testing"
+#   "1"                      → "comprehensive-testing"
+#   "Five-Axis-Review"       → "five-axis-review"
+#
+# Revoke semantics:
+#   /sop-revoke [reason] — the most-recent comment per (slug, user) wins.
+#   If Alice posts /sop-ack X and later /sop-revoke X, her ack for X is
+#   invalidated; Bob's prior /sop-ack X is unaffected. If Alice posts
+#   /sop-revoke X and later /sop-ack X again, the ack is restored.
+ +from __future__ import annotations + +import argparse +import json +import os +import re +import sys +import urllib.error +import urllib.parse +import urllib.request +from typing import Any + + +# --------------------------------------------------------------------------- +# Slug normalization +# --------------------------------------------------------------------------- + +_NORMALIZE_REPLACE_RE = re.compile(r"[\s_]+") +_NORMALIZE_STRIP_RE = re.compile(r"[^a-z0-9-]") +_NORMALIZE_DASH_RE = re.compile(r"-+") + + +def normalize_slug(raw: str, numeric_aliases: dict[int, str] | None = None) -> str: + """Normalize a user-supplied slug to canonical kebab-case form. + + See module header for the rules. + + If the input is a pure digit string AND numeric_aliases is provided, + the alias mapping is consulted. Unknown digits return "" so the caller + can flag the comment as unparseable. + """ + if raw is None: + return "" + s = raw.strip().lower() + s = _NORMALIZE_REPLACE_RE.sub("-", s) + s = _NORMALIZE_STRIP_RE.sub("", s) + s = _NORMALIZE_DASH_RE.sub("-", s) + s = s.strip("-") + if s.isdigit() and numeric_aliases is not None: + return numeric_aliases.get(int(s), "") + return s + + +# --------------------------------------------------------------------------- +# Comment parsing — /sop-ack and /sop-revoke +# --------------------------------------------------------------------------- + +# A directive must be on its own line. Permits leading whitespace. +# Optional trailing note after the slug for /sop-ack and required reason +# for /sop-revoke (RFC#351 open question 4 — reason is captured but not +# yet validated; future iteration may require a min-length). +_DIRECTIVE_RE = re.compile( + r"^[ \t]*/(sop-ack|sop-revoke)[ \t]+([A-Za-z0-9_\- ]+?)(?:[ \t]+(.*))?[ \t]*$", + re.MULTILINE, +) + + +def parse_directives( + comment_body: str, + numeric_aliases: dict[int, str], +) -> list[tuple[str, str, str]]: + """Extract /sop-ack and /sop-revoke directives from a comment body. 
+
+    Returns a list of (kind, canonical_slug, note) tuples where:
+      kind is "sop-ack" or "sop-revoke"
+      canonical_slug is the normalized form (or "" if unparseable)
+      note is the trailing free-text (may be "")
+
+    Multi-word slugs: because the slug group in _DIRECTIVE_RE is
+    non-greedy and the note group can absorb any remainder, group(2)
+    always captures only the first whitespace-delimited token when any
+    text follows it; the rest lands in the note. Natural-spaces slugs
+    therefore need to be written in kebab/snake form (or as a numeric
+    alias) in directives — normalize_slug itself accepts spaces, but a
+    spaced slug never reaches it intact from a directive. Unknown
+    canonical slugs are surfaced by the caller.
+    """
+    out: list[tuple[str, str, str]] = []
+    if not comment_body:
+        return out
+    for m in _DIRECTIVE_RE.finditer(comment_body):
+        kind = m.group(1)
+        raw_slug = (m.group(2) or "").strip()
+        if not raw_slug:
+            continue
+        canonical = normalize_slug(raw_slug, numeric_aliases)
+        note_from_group = (m.group(3) or "").strip()
+ out.append((kind, canonical, note_from_group)) + return out + + +# --------------------------------------------------------------------------- +# PR body section detection +# --------------------------------------------------------------------------- + + +def section_marker_present(body: str, marker: str) -> bool: + """Return True if `marker` appears in `body` case-insensitively + on a non-empty line (i.e. the author actually filled it in). + + We require the marker substring AND non-whitespace content on the + same line OR within the next line — this prevents trivially-empty + checklists like: + + ## SOP-Checklist + - [ ] **Comprehensive testing performed**: + - [ ] **Local-postgres E2E run**: + + from auto-passing the section-present check. The peer-ack is still + required, but answering with empty content is captured as a soft + finding via the section-present test alone. + """ + if not body or not marker: + return False + body_lower = body.lower() + marker_lower = marker.lower() + idx = body_lower.find(marker_lower) + if idx < 0: + return False + # Walk to end of line. + line_end = body.find("\n", idx) + if line_end < 0: + line_end = len(body) + line = body[idx + len(marker):line_end] + # Strip the colon + checkbox tail patterns; require at least one + # non-whitespace, non-punctuation char. + stripped = re.sub(r"[\s\*:\-\[\]]+", "", line) + if stripped: + return True + # Fall through: check the NEXT line (multi-line answers). 
+    next_line_end = body.find("\n", line_end + 1)
+    if next_line_end < 0:
+        next_line_end = len(body)
+    next_line = body[line_end + 1:next_line_end]
+    stripped_next = re.sub(r"[\s\*:\-\[\]]+", "", next_line)
+    return bool(stripped_next)
+
+
+# ---------------------------------------------------------------------------
+# Ack-state computation
+# ---------------------------------------------------------------------------
+
+
+def compute_ack_state(
+    comments: list[dict[str, Any]],
+    pr_author: str,
+    items_by_slug: dict[str, dict[str, Any]],
+    numeric_aliases: dict[int, str],
+    team_membership_probe: "callable[[str, list[str]], list[str]]",
+) -> dict[str, dict[str, Any]]:
+    """Compute per-item ack state.
+
+    Each comment is processed in chronological order. The most-recent
+    directive per (commenter, slug) wins.
+
+    Returns a dict keyed by canonical slug:
+        {
+            "comprehensive-testing": {
+                "ackers": ["bob"],       # non-author, team-verified
+                "rejected": {            # debugging info
+                    "self_ack": ["alice"],
+                    "not_in_team": ["eve"],
+                },
+            },
+            ...
+        }
+
+    Acks whose slug normalizes to something outside the config are
+    dropped before this structure is built (they attach to no item),
+    so they do not appear in the result.
+    """
+    # Step 1: collapse directives per (commenter, slug) — most recent wins.
+    # Comments are expected to arrive in chronological order from the
+    # API (Gitea returns oldest-first by default for issues/{N}/comments).
+    latest_directive: dict[tuple[str, str], str] = {}  # (user, slug) → kind
+    unparseable_per_user: dict[str, int] = {}
+    for c in comments:
+        body = c.get("body", "") or ""
+        user = (c.get("user") or {}).get("login", "")
+        if not user:
+            continue
+        for kind, slug, _note in parse_directives(body, numeric_aliases):
+            if not slug:
+                unparseable_per_user[user] = unparseable_per_user.get(user, 0) + 1
+                continue
+            latest_directive[(user, slug)] = kind
+
+    # Step 2: build candidate ackers per slug.
+    # Filter out self-acks and unknown slugs.
+ ackers_per_slug: dict[str, list[str]] = {s: [] for s in items_by_slug} + rejected_self: dict[str, list[str]] = {s: [] for s in items_by_slug} + rejected_unknown: dict[str, list[str]] = {s: [] for s in items_by_slug} + pending_team_check: dict[str, list[str]] = {s: [] for s in items_by_slug} + + for (user, slug), kind in latest_directive.items(): + if kind != "sop-ack": + continue # revokes leave the (user,slug) state as "no ack" + if slug not in items_by_slug: + # Slug normalized to something not in our config — store + # under a synthetic key for diagnostic surfacing. Don't add + # to any item. + continue + if user == pr_author: + rejected_self[slug].append(user) + continue + pending_team_check[slug].append(user) + + # Step 3: team membership probe per slug (batched per slug to keep + # API call count down — same user may ack multiple items but the + # required_teams differ per item, so we MUST probe per (user, item)). + rejected_not_in_team: dict[str, list[str]] = {s: [] for s in items_by_slug} + for slug, candidates in pending_team_check.items(): + if not candidates: + continue + required = items_by_slug[slug]["required_teams"] + approved = team_membership_probe(slug, candidates) # returns subset + rejected_not_in_team[slug] = [u for u in candidates if u not in approved] + ackers_per_slug[slug] = approved + # Stash required teams for description rendering. + items_by_slug[slug]["_required_resolved"] = required + + return { + slug: { + "ackers": ackers_per_slug[slug], + "rejected": { + "self_ack": rejected_self[slug], + "not_in_team": rejected_not_in_team[slug], + }, + } + for slug in items_by_slug + } + + +# --------------------------------------------------------------------------- +# Gitea API client +# --------------------------------------------------------------------------- + + +class GiteaClient: + def __init__(self, host: str, token: str): + self.base = f"https://{host}/api/v1" + self.token = token + # Cache team-name → team-id resolutions per org. 
+ self._team_id_cache: dict[tuple[str, str], int | None] = {} + + def _req( + self, + method: str, + path: str, + body: dict[str, Any] | None = None, + ok_codes: tuple[int, ...] = (200, 201, 204), + ) -> tuple[int, Any]: + url = self.base + path + data = None + headers = { + "Authorization": f"token {self.token}", + "Accept": "application/json", + } + if body is not None: + data = json.dumps(body).encode("utf-8") + headers["Content-Type"] = "application/json" + req = urllib.request.Request(url, method=method, data=data, headers=headers) + try: + with urllib.request.urlopen(req, timeout=20) as r: + raw = r.read() + code = r.getcode() + except urllib.error.HTTPError as e: + code = e.code + raw = e.read() + try: + parsed = json.loads(raw.decode("utf-8")) if raw else None + except json.JSONDecodeError: + parsed = raw.decode("utf-8", errors="replace") if raw else None + return code, parsed + + def get_pr(self, owner: str, repo: str, pr: int) -> dict[str, Any]: + code, data = self._req("GET", f"/repos/{owner}/{repo}/pulls/{pr}") + if code != 200: + raise RuntimeError(f"GET pulls/{pr} → HTTP {code}: {data!r}") + return data + + def get_issue_comments( + self, owner: str, repo: str, issue: int + ) -> list[dict[str, Any]]: + # Paginate. Gitea default page size 50. 
+ out: list[dict[str, Any]] = [] + page = 1 + while True: + code, data = self._req( + "GET", + f"/repos/{owner}/{repo}/issues/{issue}/comments?limit=50&page={page}", + ) + if code != 200: + raise RuntimeError( + f"GET issues/{issue}/comments page={page} → HTTP {code}: {data!r}" + ) + if not data: + break + out.extend(data) + if len(data) < 50: + break + page += 1 + return out + + def resolve_team_id(self, org: str, team_name: str) -> int | None: + key = (org, team_name) + if key in self._team_id_cache: + return self._team_id_cache[key] + code, data = self._req("GET", f"/orgs/{org}/teams/search?q={urllib.parse.quote(team_name)}") + team_id = None + if code == 200 and isinstance(data, dict): + for t in data.get("data", []): + if t.get("name") == team_name: + team_id = t.get("id") + break + if team_id is None and code == 200 and isinstance(data, list): + for t in data: + if t.get("name") == team_name: + team_id = t.get("id") + break + self._team_id_cache[key] = team_id + return team_id + + def is_team_member(self, team_id: int, login: str) -> bool | None: + """Return True / False / None (unknown — 403 from API).""" + code, _ = self._req( + "GET", f"/teams/{team_id}/members/{urllib.parse.quote(login)}" + ) + if code in (200, 204): + return True + if code == 404: + return False + # 403 means the token owner isn't in this team, so the API + # refuses to confirm membership. Fail-closed at the caller. 
+ return None + + def post_status( + self, + owner: str, + repo: str, + sha: str, + state: str, + context: str, + description: str, + target_url: str = "", + ) -> None: + body = { + "state": state, + "context": context, + "description": description[:140], # Gitea truncates to 255 but be safe + "target_url": target_url or "", + } + code, data = self._req( + "POST", + f"/repos/{owner}/{repo}/statuses/{sha}", + body=body, + ok_codes=(201,), + ) + if code not in (200, 201): + raise RuntimeError( + f"POST statuses/{sha} → HTTP {code}: {data!r}" + ) + + +# --------------------------------------------------------------------------- +# Config loader (PyYAML-free — config file is intentionally tiny + flat) +# --------------------------------------------------------------------------- + + +def load_config(path: str) -> dict[str, Any]: + """Load .gitea/sop-checklist-config.yaml. + + Uses PyYAML if available, otherwise falls back to a built-in + minimal parser sufficient for our flat config shape. Bundling + PyYAML on the runner is one apt install away but we avoid the + dep by keeping the config shape constrained. + """ + try: + import yaml # type: ignore[import-not-found] + with open(path) as f: + return yaml.safe_load(f) + except ImportError: + return _load_config_minimal(path) + + +def _load_config_minimal(path: str) -> dict[str, Any]: + """Minimal YAML subset parser for our config shape. + + Supports: top-level scalar:value, top-level map-of-map (e.g. + tier_failure_mode), top-level list of maps (items:), and within an + item map: scalars + lists of scalars. Does NOT support nested lists, + YAML anchors, multi-doc, or flow style. + """ + with open(path) as f: + lines = f.readlines() + return _parse_minimal_yaml(lines) + + +def _parse_minimal_yaml(lines: list[str]) -> dict[str, Any]: # noqa: C901 + """Hand-rolled subset parser. See _load_config_minimal docstring.""" + # Strip comments + blank lines but preserve indentation. 
+ cleaned: list[tuple[int, str]] = [] + for raw in lines: + # Don't strip a "#" that is inside a quoted value. + body = raw.rstrip("\n") + # Remove trailing comment. + idx = body.find("#") + if idx >= 0 and (idx == 0 or body[idx - 1] in " \t"): + body = body[:idx].rstrip() + if not body.strip(): + continue + indent = len(body) - len(body.lstrip(" ")) + cleaned.append((indent, body.strip())) + + root: dict[str, Any] = {} + i = 0 + n = len(cleaned) + + def parse_scalar(s: str) -> Any: + s = s.strip() + if s.startswith('"') and s.endswith('"'): + return s[1:-1] + if s.startswith("'") and s.endswith("'"): + return s[1:-1] + if s.lower() in ("true", "yes"): + return True + if s.lower() in ("false", "no"): + return False + try: + return int(s) + except ValueError: + pass + return s + + def parse_inline_list(s: str) -> list[Any]: + s = s.strip() + if not (s.startswith("[") and s.endswith("]")): + return [parse_scalar(s)] + inner = s[1:-1] + if not inner.strip(): + return [] + return [parse_scalar(x.strip()) for x in inner.split(",")] + + while i < n: + indent, line = cleaned[i] + if indent != 0: + i += 1 + continue + if ":" not in line: + i += 1 + continue + key, _, rest = line.partition(":") + key = key.strip() + rest = rest.strip() + if rest == "": + # Block — could be map or list. + i += 1 + # Look ahead for first child. + if i < n and cleaned[i][1].startswith("- "): + # List of items. 
+ items: list[Any] = [] + while i < n and cleaned[i][0] > indent and cleaned[i][1].startswith("- "): + item_indent = cleaned[i][0] + first_kv = cleaned[i][1][2:].strip() # strip "- " + item: dict[str, Any] = {} + if ":" in first_kv: + k, _, v = first_kv.partition(":") + k = k.strip() + v = v.strip() + if v == "": + item[k] = "" + elif v.startswith(">-") or v.startswith(">"): + # Folded scalar continues on subsequent indented lines + collected: list[str] = [] + i += 1 + while i < n and cleaned[i][0] > item_indent: + collected.append(cleaned[i][1]) + i += 1 + item[k] = " ".join(collected) + items.append(item) + continue + elif v.startswith("["): + item[k] = parse_inline_list(v) + else: + item[k] = parse_scalar(v) + i += 1 + # Subsequent k:v lines at deeper indent belong to this item. + while i < n and cleaned[i][0] > item_indent and not cleaned[i][1].startswith("- "): + sub_indent, sub_line = cleaned[i] + if ":" in sub_line: + k, _, v = sub_line.partition(":") + k = k.strip() + v = v.strip() + if v == "": + item[k] = "" + i += 1 + elif v.startswith(">-") or v.startswith(">"): + collected = [] + i += 1 + while i < n and cleaned[i][0] > sub_indent: + collected.append(cleaned[i][1]) + i += 1 + item[k] = " ".join(collected) + elif v.startswith("["): + item[k] = parse_inline_list(v) + i += 1 + else: + item[k] = parse_scalar(v) + i += 1 + else: + i += 1 + items.append(item) + root[key] = items + else: + # Sub-map. + submap: dict[str, Any] = {} + while i < n and cleaned[i][0] > indent: + sub_indent, sub_line = cleaned[i] + if ":" in sub_line: + k, _, v = sub_line.partition(":") + k = k.strip().strip('"').strip("'") + v = v.strip() + if v.startswith("[") and v.endswith("]"): + submap[k] = parse_inline_list(v) + else: + submap[k] = parse_scalar(v) + i += 1 + root[key] = submap + else: + # Inline scalar or list. 
+ if rest.startswith("[") and rest.endswith("]"): + root[key] = parse_inline_list(rest) + else: + root[key] = parse_scalar(rest) + i += 1 + return root + + +# --------------------------------------------------------------------------- +# Main entry point +# --------------------------------------------------------------------------- + + +def render_status( + items: list[dict[str, Any]], + ack_state: dict[str, dict[str, Any]], + body_state: dict[str, bool], +) -> tuple[str, str]: + """Return (state, description) for the commit-status post. + + state is "success" if every item has at least one valid ack + (body section presence is informational only — peer-ack is the + real gate). "pending" is reserved for the soft-fail path + (tier:low) and is set by the caller. + """ + n = len(items) + fully_acked = [ + it["slug"] for it in items if ack_state[it["slug"]]["ackers"] + ] + missing = [ + it["slug"] for it in items if not ack_state[it["slug"]]["ackers"] + ] + missing_body = [it["slug"] for it in items if not body_state.get(it["slug"], False)] + + desc_parts = [f"acked: {len(fully_acked)}/{n}"] + if missing: + # Show up to 3 missing slugs to stay inside the 140-char budget. 
+ shown = ", ".join(missing[:3]) + if len(missing) > 3: + shown += f", +{len(missing) - 3}" + desc_parts.append(f"missing: {shown}") + if missing_body: + desc_parts.append(f"body-unfilled: {len(missing_body)}") + state = "success" if not missing else "failure" + return state, " — ".join(desc_parts) + + +def get_tier_mode(pr: dict[str, Any], cfg: dict[str, Any]) -> str: + """Read tier label, return 'hard' or 'soft' per cfg.tier_failure_mode.""" + labels = pr.get("labels") or [] + tier_labels = [l.get("name", "") for l in labels if (l.get("name", "") or "").startswith("tier:")] + mode_map = cfg.get("tier_failure_mode") or {} + default_mode = cfg.get("default_mode", "hard") + for tl in tier_labels: + if tl in mode_map: + return mode_map[tl] + return default_mode + + +def main(argv: list[str] | None = None) -> int: + p = argparse.ArgumentParser() + p.add_argument("--owner", required=True) + p.add_argument("--repo", required=True) + p.add_argument("--pr", type=int, required=True) + p.add_argument("--config", default=".gitea/sop-checklist-config.yaml") + p.add_argument("--gitea-host", default="git.moleculesai.app") + p.add_argument( + "--dry-run", + action="store_true", + help="Compute state but do not POST the status.", + ) + p.add_argument( + "--status-context", + default="sop-checklist / all-items-acked (pull_request)", + ) + args = p.parse_args(argv) + + token = os.environ.get("GITEA_TOKEN", "") + if not token and not args.dry_run: + print("::error::GITEA_TOKEN env required", file=sys.stderr) + return 2 + + cfg = load_config(args.config) + items: list[dict[str, Any]] = cfg["items"] + items_by_slug = {it["slug"]: it for it in items} + numeric_aliases = { + int(it["numeric_alias"]): it["slug"] for it in items if it.get("numeric_alias") + } + + client = GiteaClient(args.gitea_host, token) if token else None + if not client: + print("::error::No client (dry-run without token has nothing to do)", file=sys.stderr) + return 2 + + pr = client.get_pr(args.owner, args.repo, 
args.pr) + if pr.get("state") != "open": + print(f"::notice::PR #{args.pr} is {pr.get('state')} — gate is a no-op") + return 0 + + author = (pr.get("user") or {}).get("login", "") + head_sha = (pr.get("head") or {}).get("sha", "") + body = pr.get("body", "") or "" + + if not author or not head_sha: + print("::error::PR payload missing user.login or head.sha", file=sys.stderr) + return 1 + + comments = client.get_issue_comments(args.owner, args.repo, args.pr) + + # Build team-membership probe closure that caches results per + # (user, team-id) so a user acking multiple items only triggers + # one membership lookup per team. + team_member_cache: dict[tuple[str, int], bool | None] = {} + + def probe(slug: str, users: list[str]) -> list[str]: + item = items_by_slug[slug] + team_names: list[str] = item["required_teams"] + # Resolve names → ids. NOTE: orgs/{org}/teams/search may not be + # available — fall back to the list endpoint. + team_ids: list[int] = [] + for tn in team_names: + tid = client.resolve_team_id(args.owner, tn) + if tid is None: + # Try the list endpoint as a fallback. 
+ code, data = client._req( # noqa: SLF001 + "GET", f"/orgs/{args.owner}/teams" + ) + if code == 200 and isinstance(data, list): + for t in data: + if t.get("name") == tn: + tid = t.get("id") + client._team_id_cache[(args.owner, tn)] = tid # noqa: SLF001 + break + if tid is not None: + team_ids.append(tid) + else: + print( + f"::warning::could not resolve team-id for '{tn}' " + f"in org '{args.owner}' — item '{slug}' will fail closed", + file=sys.stderr, + ) + approved: list[str] = [] + for u in users: + for tid in team_ids: + cache_key = (u, tid) + if cache_key not in team_member_cache: + team_member_cache[cache_key] = client.is_team_member(tid, u) + result = team_member_cache[cache_key] + if result is True: + approved.append(u) + break + if result is None: + print( + f"::warning::team-probe for {u} in team-id {tid} returned 403 " + "(token owner not in that team — fail-closed per RFC#324)", + file=sys.stderr, + ) + # Treat as not-in-team for this user/team pair; loop + # may still find membership in another team. + return approved + + ack_state = compute_ack_state(comments, author, items_by_slug, numeric_aliases, probe) + body_state = {it["slug"]: section_marker_present(body, it["pr_section_marker"]) for it in items} + + state, description = render_status(items, ack_state, body_state) + mode = get_tier_mode(pr, cfg) + if state == "failure" and mode == "soft": + state = "pending" + description = f"[soft-fail tier:low] {description}" + + # Diagnostics to job log. 
+ print(f"::notice::PR #{args.pr} author={author} head={head_sha[:7]} mode={mode}") + for it in items: + slug = it["slug"] + ackers = ack_state[slug]["ackers"] + if ackers: + print(f"::notice:: [PASS] {slug} — acked by {','.join(ackers)}") + else: + r = ack_state[slug]["rejected"] + extras: list[str] = [] + if r["self_ack"]: + extras.append(f"self-acks-rejected:{','.join(r['self_ack'])}") + if r["not_in_team"]: + extras.append(f"not-in-team:{','.join(r['not_in_team'])}") + extra = " (" + "; ".join(extras) + ")" if extras else "" + print(f"::notice:: [WAIT] {slug} — no valid peer-ack yet{extra}") + + print(f"::notice::posting status: state={state} desc={description!r}") + + if args.dry_run: + print("::notice::--dry-run: not posting status") + return 0 if state in ("success", "pending") else 1 + + target_url = f"https://{args.gitea_host}/{args.owner}/{args.repo}/pulls/{args.pr}" + client.post_status( + args.owner, args.repo, head_sha, + state=state, context=args.status_context, + description=description, target_url=target_url, + ) + print(f"::notice::status posted: {args.status_context} → {state}") + return 0 if state in ("success", "pending") else 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/.gitea/scripts/tests/test_sop_checklist_gate.py b/.gitea/scripts/tests/test_sop_checklist_gate.py new file mode 100644 index 00000000..d951f974 --- /dev/null +++ b/.gitea/scripts/tests/test_sop_checklist_gate.py @@ -0,0 +1,524 @@ +#!/usr/bin/env python3 +# Unit tests for sop-checklist-gate.py +# +# Run: python3 .gitea/scripts/tests/test_sop_checklist_gate.py +# or: pytest .gitea/scripts/tests/test_sop_checklist_gate.py +# +# RFC#351 Step 2 of 6 — implementation MVP. Tests cover: +# - slug normalization (the 4 example variants in the script header) +# - parse_directives (ack, revoke, with/without note, mid-comment, etc.) 
+# - section_marker_present (empty answer rejected, filled answer ok) +# - compute_ack_state (self-ack rejected, team probe applied, revoke +# invalidates own prior ack, peer's ack survives unrevoked) +# - render_status (state + description format) +# - get_tier_mode (label-driven, default fallback) +# - load_config (default config parses cleanly with both PyYAML and +# the bundled minimal parser) +# +# All tests run WITHOUT touching the Gitea API — the team-probe +# callable is dependency-injected. + +from __future__ import annotations + +import os +import sys +import tempfile +import unittest + +# Resolve sibling script regardless of where pytest is invoked from. +HERE = os.path.dirname(os.path.abspath(__file__)) +PARENT = os.path.dirname(HERE) # .gitea/scripts +sys.path.insert(0, PARENT) + +import importlib.util # noqa: E402 + +_spec = importlib.util.spec_from_file_location( + "sop_checklist_gate", os.path.join(PARENT, "sop-checklist-gate.py") +) +sop = importlib.util.module_from_spec(_spec) +_spec.loader.exec_module(sop) # type: ignore[union-attr] + + +# --------------------------------------------------------------------------- +# Test fixtures +# --------------------------------------------------------------------------- + +CONFIG_PATH = os.path.join(PARENT, "..", "sop-checklist-config.yaml") + + +def _items() -> list[dict]: + cfg = sop.load_config(CONFIG_PATH) + return cfg["items"] + + +def _items_by_slug() -> dict[str, dict]: + return {it["slug"]: it for it in _items()} + + +def _numeric_aliases() -> dict[int, str]: + return { + int(it["numeric_alias"]): it["slug"] + for it in _items() + if it.get("numeric_alias") + } + + +def _comment(user: str, body: str) -> dict: + return {"user": {"login": user}, "body": body} + + +# --------------------------------------------------------------------------- +# normalize_slug +# --------------------------------------------------------------------------- + + +class TestNormalizeSlug(unittest.TestCase): + def 
test_kebab_already(self): + self.assertEqual(sop.normalize_slug("comprehensive-testing"), "comprehensive-testing") + + def test_underscore_to_dash(self): + self.assertEqual(sop.normalize_slug("comprehensive_testing"), "comprehensive-testing") + + def test_space_to_dash(self): + self.assertEqual(sop.normalize_slug("comprehensive testing"), "comprehensive-testing") + + def test_uppercase_to_lower(self): + self.assertEqual(sop.normalize_slug("Comprehensive-Testing"), "comprehensive-testing") + + def test_mixed_separators(self): + self.assertEqual(sop.normalize_slug("Comprehensive_Testing"), "comprehensive-testing") + self.assertEqual(sop.normalize_slug("FIVE_axis review"), "five-axis-review") + + def test_collapse_repeated_dashes(self): + self.assertEqual(sop.normalize_slug("comprehensive--testing"), "comprehensive-testing") + self.assertEqual(sop.normalize_slug("comprehensive testing"), "comprehensive-testing") + + def test_strip_trailing_punctuation(self): + self.assertEqual(sop.normalize_slug("comprehensive-testing."), "comprehensive-testing") + self.assertEqual(sop.normalize_slug("comprehensive-testing!"), "comprehensive-testing") + + def test_numeric_shorthand_known(self): + self.assertEqual( + sop.normalize_slug("1", _numeric_aliases()), + "comprehensive-testing", + ) + self.assertEqual( + sop.normalize_slug("3", _numeric_aliases()), + "staging-smoke", + ) + self.assertEqual( + sop.normalize_slug("7", _numeric_aliases()), + "memory-consulted", + ) + + def test_numeric_shorthand_unknown_returns_empty(self): + # "8" is out of range → empty so caller can flag as unparseable. + self.assertEqual(sop.normalize_slug("8", _numeric_aliases()), "") + + def test_numeric_without_alias_table_keeps_digits(self): + # No alias table → return the digits as-is. 
+ self.assertEqual(sop.normalize_slug("1"), "1") + + def test_empty_input(self): + self.assertEqual(sop.normalize_slug(""), "") + self.assertEqual(sop.normalize_slug(" "), "") + self.assertEqual(sop.normalize_slug(None), "") + + +# --------------------------------------------------------------------------- +# parse_directives +# --------------------------------------------------------------------------- + + +class TestParseDirectives(unittest.TestCase): + def setUp(self): + self.aliases = _numeric_aliases() + + def test_simple_ack(self): + d = sop.parse_directives("/sop-ack comprehensive-testing", self.aliases) + self.assertEqual(d, [("sop-ack", "comprehensive-testing", "")]) + + def test_simple_revoke(self): + d = sop.parse_directives("/sop-revoke staging-smoke", self.aliases) + self.assertEqual(d, [("sop-revoke", "staging-smoke", "")]) + + def test_ack_with_note(self): + d = sop.parse_directives( + "/sop-ack comprehensive-testing LGTM the test covers all edge cases", + self.aliases, + ) + self.assertEqual(len(d), 1) + self.assertEqual(d[0][0], "sop-ack") + self.assertEqual(d[0][1], "comprehensive-testing") + self.assertIn("LGTM", d[0][2]) + + def test_numeric_shorthand(self): + d = sop.parse_directives("/sop-ack 1", self.aliases) + self.assertEqual(d, [("sop-ack", "comprehensive-testing", "")]) + + def test_revoke_with_reason(self): + d = sop.parse_directives( + "/sop-revoke comprehensive-testing realized the e2e was mocking the DB", + self.aliases, + ) + self.assertEqual(d[0][0], "sop-revoke") + self.assertEqual(d[0][1], "comprehensive-testing") + self.assertIn("mocking", d[0][2]) + + def test_directive_in_middle_of_comment(self): + body = ( + "Reviewed the PR, looks good overall.\n" + "/sop-ack comprehensive-testing\n" + "Will follow up on the doc nit separately." 
+ ) + d = sop.parse_directives(body, self.aliases) + self.assertEqual(len(d), 1) + self.assertEqual(d[0][1], "comprehensive-testing") + + def test_multiple_directives_in_one_comment(self): + body = ( + "/sop-ack comprehensive-testing\n" + "/sop-ack local-postgres-e2e\n" + ) + d = sop.parse_directives(body, self.aliases) + self.assertEqual(len(d), 2) + slugs = {x[1] for x in d} + self.assertEqual(slugs, {"comprehensive-testing", "local-postgres-e2e"}) + + def test_must_be_at_line_start(self): + # A directive embedded mid-line is not honored (prevents review + # comments like "to /sop-ack you need..." from acting as acks). + body = "If you want to /sop-ack comprehensive-testing reply in this thread" + d = sop.parse_directives(body, self.aliases) + self.assertEqual(d, []) + + def test_leading_whitespace_allowed(self): + body = " /sop-ack comprehensive-testing" + d = sop.parse_directives(body, self.aliases) + self.assertEqual(len(d), 1) + + def test_empty_body(self): + self.assertEqual(sop.parse_directives("", self.aliases), []) + self.assertEqual(sop.parse_directives(None, self.aliases), []) + + def test_normalization_applied(self): + # /sop-ack Comprehensive_Testing → canonical comprehensive-testing + d = sop.parse_directives("/sop-ack Comprehensive_Testing", self.aliases) + self.assertEqual(d[0][1], "comprehensive-testing") + + +# --------------------------------------------------------------------------- +# section_marker_present +# --------------------------------------------------------------------------- + + +class TestSectionMarkerPresent(unittest.TestCase): + def test_marker_with_inline_answer(self): + body = "- [ ] **Comprehensive testing performed**: Added 12 new tests covering null/empty/giant inputs." 
+ self.assertTrue(sop.section_marker_present(body, "Comprehensive testing performed")) + + def test_marker_with_empty_answer(self): + body = "- [ ] **Comprehensive testing performed**:" + self.assertFalse(sop.section_marker_present(body, "Comprehensive testing performed")) + + def test_marker_with_only_whitespace_answer(self): + body = "- [ ] **Comprehensive testing performed**: \n" + self.assertFalse(sop.section_marker_present(body, "Comprehensive testing performed")) + + def test_marker_with_next_line_answer(self): + body = ( + "- [ ] **Comprehensive testing performed**:\n" + " Yes — see attached log + 12 new unit tests in foo_test.py.\n" + ) + self.assertTrue(sop.section_marker_present(body, "Comprehensive testing performed")) + + def test_marker_missing(self): + body = "- [ ] **Local-postgres E2E run**: N/A — pure-frontend\n" + self.assertFalse(sop.section_marker_present(body, "Comprehensive testing performed")) + + def test_case_insensitive_marker_match(self): + body = "- [ ] **comprehensive TESTING performed**: yes" + self.assertTrue(sop.section_marker_present(body, "Comprehensive testing performed")) + + def test_empty_body(self): + self.assertFalse(sop.section_marker_present("", "X")) + self.assertFalse(sop.section_marker_present(None, "X")) + + +# --------------------------------------------------------------------------- +# compute_ack_state +# --------------------------------------------------------------------------- + + +class TestComputeAckState(unittest.TestCase): + def setUp(self): + self.items = _items_by_slug() + self.aliases = _numeric_aliases() + + @staticmethod + def _approve_all(slug, users): + return list(users) + + @staticmethod + def _approve_none(slug, users): + return [] + + def _approve_only(self, allowed_users): + return lambda slug, users: [u for u in users if u in allowed_users] + + def test_peer_ack_passes(self): + comments = [_comment("bob", "/sop-ack comprehensive-testing")] + state = sop.compute_ack_state( + comments, "alice", 
self.items, self.aliases, self._approve_all + ) + self.assertEqual(state["comprehensive-testing"]["ackers"], ["bob"]) + + def test_self_ack_rejected(self): + comments = [_comment("alice", "/sop-ack comprehensive-testing")] + state = sop.compute_ack_state( + comments, "alice", self.items, self.aliases, self._approve_all + ) + self.assertEqual(state["comprehensive-testing"]["ackers"], []) + self.assertEqual(state["comprehensive-testing"]["rejected"]["self_ack"], ["alice"]) + + def test_not_in_team_rejected(self): + comments = [_comment("eve", "/sop-ack comprehensive-testing")] + state = sop.compute_ack_state( + comments, "alice", self.items, self.aliases, self._approve_none + ) + self.assertEqual(state["comprehensive-testing"]["ackers"], []) + self.assertEqual(state["comprehensive-testing"]["rejected"]["not_in_team"], ["eve"]) + + def test_revoke_invalidates_own_prior_ack(self): + # Bob acks then later revokes — Bob no longer counts. + comments = [ + _comment("bob", "/sop-ack comprehensive-testing"), + _comment("bob", "/sop-revoke comprehensive-testing realized e2e was mocked"), + ] + state = sop.compute_ack_state( + comments, "alice", self.items, self.aliases, self._approve_all + ) + self.assertEqual(state["comprehensive-testing"]["ackers"], []) + + def test_revoke_does_not_affect_others_acks(self): + # Bob revokes his own ack; Carol's still counts. + comments = [ + _comment("bob", "/sop-ack comprehensive-testing"), + _comment("carol", "/sop-ack comprehensive-testing"), + _comment("bob", "/sop-revoke comprehensive-testing"), + ] + state = sop.compute_ack_state( + comments, "alice", self.items, self.aliases, self._approve_all + ) + self.assertEqual(state["comprehensive-testing"]["ackers"], ["carol"]) + + def test_ack_after_revoke_restored(self): + # Bob revokes then re-acks (e.g. after re-reviewing). 
+ comments = [ + _comment("bob", "/sop-ack comprehensive-testing"), + _comment("bob", "/sop-revoke comprehensive-testing"), + _comment("bob", "/sop-ack comprehensive-testing"), + ] + state = sop.compute_ack_state( + comments, "alice", self.items, self.aliases, self._approve_all + ) + self.assertEqual(state["comprehensive-testing"]["ackers"], ["bob"]) + + def test_numeric_shorthand_ack(self): + # /sop-ack 1 → comprehensive-testing + comments = [_comment("bob", "/sop-ack 1")] + state = sop.compute_ack_state( + comments, "alice", self.items, self.aliases, self._approve_all + ) + self.assertEqual(state["comprehensive-testing"]["ackers"], ["bob"]) + + def test_ack_for_unknown_slug_ignored(self): + # Some other slug not in config — silently drop (doesn't crash). + comments = [_comment("bob", "/sop-ack does-not-exist")] + state = sop.compute_ack_state( + comments, "alice", self.items, self.aliases, self._approve_all + ) + for slug in self.items: + self.assertEqual(state[slug]["ackers"], []) + + def test_multi_item_multi_user(self): + comments = [ + _comment("bob", "/sop-ack comprehensive-testing\n/sop-ack staging-smoke"), + _comment("carol", "/sop-ack five-axis-review"), + ] + state = sop.compute_ack_state( + comments, "alice", self.items, self.aliases, self._approve_all + ) + self.assertEqual(state["comprehensive-testing"]["ackers"], ["bob"]) + self.assertEqual(state["staging-smoke"]["ackers"], ["bob"]) + self.assertEqual(state["five-axis-review"]["ackers"], ["carol"]) + self.assertEqual(state["root-cause"]["ackers"], []) + + +# --------------------------------------------------------------------------- +# render_status +# --------------------------------------------------------------------------- + + +class TestRenderStatus(unittest.TestCase): + def setUp(self): + self.items = _items() + self.items_by_slug = _items_by_slug() + + def _state_with(self, acked: list[str]) -> dict: + return { + it["slug"]: { + "ackers": ["peer"] if it["slug"] in acked else [], + "rejected": 
{"self_ack": [], "not_in_team": []}, + } + for it in self.items + } + + def test_all_acked_returns_success(self): + all_slugs = [it["slug"] for it in self.items] + state, desc = sop.render_status( + self.items, self._state_with(all_slugs), {s: True for s in all_slugs} + ) + self.assertEqual(state, "success") + self.assertIn("7/7", desc) + + def test_partial_acked_returns_failure(self): + state, desc = sop.render_status( + self.items, + self._state_with(["comprehensive-testing", "staging-smoke"]), + {it["slug"]: True for it in self.items}, + ) + self.assertEqual(state, "failure") + self.assertIn("2/7", desc) + self.assertIn("missing", desc) + + def test_description_truncates_long_missing_list(self): + # Only ack one — 6 missing should be summarized as "+N". + state, desc = sop.render_status( + self.items, + self._state_with(["comprehensive-testing"]), + {it["slug"]: True for it in self.items}, + ) + # Length budget: under 140 chars. + self.assertLessEqual(len(desc), 140) + self.assertIn("+", desc) # +N elision marker + + def test_body_unfilled_surfaced(self): + all_slugs = [it["slug"] for it in self.items] + state, desc = sop.render_status( + self.items, + self._state_with(all_slugs), + {it["slug"]: False for it in self.items}, + ) + self.assertIn("body-unfilled", desc) + + +# --------------------------------------------------------------------------- +# get_tier_mode +# --------------------------------------------------------------------------- + + +class TestGetTierMode(unittest.TestCase): + def setUp(self): + self.cfg = sop.load_config(CONFIG_PATH) + + def test_tier_high_is_hard(self): + pr = {"labels": [{"name": "tier:high"}, {"name": "area:ci"}]} + self.assertEqual(sop.get_tier_mode(pr, self.cfg), "hard") + + def test_tier_medium_is_hard(self): + pr = {"labels": [{"name": "tier:medium"}]} + self.assertEqual(sop.get_tier_mode(pr, self.cfg), "hard") + + def test_tier_low_is_soft(self): + pr = {"labels": [{"name": "tier:low"}]} + 
self.assertEqual(sop.get_tier_mode(pr, self.cfg), "soft") + + def test_no_tier_label_defaults_to_hard(self): + # Per feedback_fix_root_not_symptom — never silently lower the bar. + pr = {"labels": [{"name": "area:ci"}]} + self.assertEqual(sop.get_tier_mode(pr, self.cfg), "hard") + + def test_no_labels_defaults_to_hard(self): + self.assertEqual(sop.get_tier_mode({"labels": []}, self.cfg), "hard") + self.assertEqual(sop.get_tier_mode({}, self.cfg), "hard") + + +# --------------------------------------------------------------------------- +# load_config +# --------------------------------------------------------------------------- + + +class TestLoadConfig(unittest.TestCase): + def test_default_config_parses(self): + cfg = sop.load_config(CONFIG_PATH) + self.assertIn("items", cfg) + self.assertEqual(len(cfg["items"]), 7) + slugs = {it["slug"] for it in cfg["items"]} + self.assertEqual( + slugs, + { + "comprehensive-testing", + "local-postgres-e2e", + "staging-smoke", + "root-cause", + "five-axis-review", + "no-backwards-compat", + "memory-consulted", + }, + ) + + def test_default_config_tier_mode_shape(self): + cfg = sop.load_config(CONFIG_PATH) + self.assertEqual(cfg["tier_failure_mode"]["tier:high"], "hard") + self.assertEqual(cfg["tier_failure_mode"]["tier:medium"], "hard") + self.assertEqual(cfg["tier_failure_mode"]["tier:low"], "soft") + self.assertEqual(cfg["default_mode"], "hard") + + def test_each_item_has_required_fields(self): + cfg = sop.load_config(CONFIG_PATH) + for it in cfg["items"]: + self.assertIn("slug", it) + self.assertIn("numeric_alias", it) + self.assertIn("pr_section_marker", it) + self.assertIn("required_teams", it) + self.assertIsInstance(it["required_teams"], list) + self.assertGreater(len(it["required_teams"]), 0) + + +# --------------------------------------------------------------------------- +# Edge case: full integration without team probe (dependency-injected) +# 
--------------------------------------------------------------------------- + + +class TestEndToEndAckFlow(unittest.TestCase): + """All-7-items happy path with synthetic comments. Verifies the + full pipeline minus the Gitea API.""" + + def test_all_seven_acked_by_proper_teams(self): + items = _items_by_slug() + aliases = _numeric_aliases() + comments = [ + _comment("qa-bot", "/sop-ack comprehensive-testing"), + _comment("eng-bot", "/sop-ack local-postgres-e2e"), + _comment("eng-bot", "/sop-ack staging-smoke"), + _comment("mgr-bot", "/sop-ack root-cause"), + _comment("eng-bot", "/sop-ack five-axis-review"), + _comment("mgr-bot", "/sop-ack no-backwards-compat"), + _comment("eng-bot", "/sop-ack memory-consulted"), + ] + + def probe(slug, users): + # Pretend every user is in every team. + return list(users) + + state = sop.compute_ack_state(comments, "alice-author", items, aliases, probe) + body = {it["slug"]: True for it in items.values()} + items_list = list(items.values()) + result_state, desc = sop.render_status(items_list, state, body) + self.assertEqual(result_state, "success") + self.assertIn("7/7", desc) + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/.gitea/sop-checklist-config.yaml b/.gitea/sop-checklist-config.yaml new file mode 100644 index 00000000..8973c9d3 --- /dev/null +++ b/.gitea/sop-checklist-config.yaml @@ -0,0 +1,109 @@ +# SOP-Checklist gate — per-item required reviewer teams. +# +# RFC#351 v1 starter set. Each item lists: +# slug — canonical kebab-case form used in /sop-ack +# pr_section_marker — substring matched in the PR body to detect that +# the author filled in this item (case-insensitive) +# required_teams — list of Gitea team names; an ack from ANY one of +# these teams (logical OR) satisfies the item. +# Membership is probed at gate-time via +# GET /api/v1/teams/{id}/members/{login}. +# Team-id resolution happens at script start via +# GET /api/v1/orgs/{org}/teams (cheap, one call). 
+# numeric_alias — 1..7; lets reviewers type `/sop-ack 3` as a +# shortcut for `/sop-ack staging-smoke`. +# +# WHY THESE TEAM MAPPINGS: +# The RFC table referenced persona-role names like `core-qa`, +# `core-be`, `core-devops` — these are individual Gitea user logins, +# not teams. The Gitea team-membership API is /teams/{id}/members/{u}, +# so we need actual teams. Orchestrator preflight 2026-05-12 verified +# only these teams exist on molecule-ai: ceo(5), engineers(2), +# managers(6), qa(20), security(21), Owners(1), and bot teams. We +# map the RFC roles to the closest existing team and surface the +# mapping explicitly so it's reviewable. +# +# HOW TO EDIT: +# - Tightening: replace `engineers` with a smaller team after creating +# it (e.g. a new `senior-engineers` team if needed). +# - Loosening: add another team to required_teams (OR semantics). +# - Add an item: append to items list and document the slug below. +# +# AUTHOR SELF-ACK IS FORBIDDEN regardless of which team contains them +# — the gate script enforces commenter != PR author before checking +# team membership. + +version: 1 + +# Tier-aware failure mode (RFC#351 open question 2): +# For tier:high — hard-fail (status `failure`, blocks merge via BP). +# For tier:medium — hard-fail (same as high; medium is non-trivial). +# For tier:low — soft-fail (status `pending` with `acked: N/M` in the +# description). BP can choose to require the context +# or not for low-tier PRs. +# If no tier label is present, default to medium (hard-fail) — every PR +# should have a tier label per sop-tier-check, and absence indicates +# a missing-tier defect we should surface, not silently lower the bar. 
+tier_failure_mode: + "tier:high": hard + "tier:medium": hard + "tier:low": soft +default_mode: hard # used when no tier:* label is present + +items: + - slug: comprehensive-testing + numeric_alias: 1 + pr_section_marker: "Comprehensive testing performed" + required_teams: [qa, engineers] + description: >- + What was tested, how, edge cases covered. Ack from any qa-team + member (or engineers fallback while qa is small). + + - slug: local-postgres-e2e + numeric_alias: 2 + pr_section_marker: "Local-postgres E2E run" + required_teams: [engineers] + description: >- + Link to local CI artifact, or "N/A: pure-frontend change". Ack + from any engineer who can verify the local DB test actually ran. + + - slug: staging-smoke + numeric_alias: 3 + pr_section_marker: "Staging-smoke verified or pending" + required_teams: [engineers] + description: >- + Link to canary run, or "scheduled post-merge". Ack from any + engineer (core-devops/infra-sre are members of engineers team). + + - slug: root-cause + numeric_alias: 4 + pr_section_marker: "Root-cause not symptom" + required_teams: [managers, ceo] + description: >- + One-sentence root-cause statement. Ack from managers tier + (team-leads) or ceo. Senior judgment required to attest + root-cause-versus-symptom. + + - slug: five-axis-review + numeric_alias: 5 + pr_section_marker: "Five-Axis review walked" + required_teams: [engineers] + description: >- + Correctness / readability / architecture / security / performance. + Ack from any non-author engineer. + + - slug: no-backwards-compat + numeric_alias: 6 + pr_section_marker: "No backwards-compat shim / dead code added" + required_teams: [managers, ceo] + description: >- + Yes/no + justification if no. Senior ack required because + backward-compat shims are how dead-code accretes. 
+ + - slug: memory-consulted + numeric_alias: 7 + pr_section_marker: "Memory/saved-feedback consulted" + required_teams: [engineers] + description: >- + List of feedback memories applicable to this change. Ack from + any engineer who has the same memory access. diff --git a/.gitea/workflows/sop-checklist-gate.yml b/.gitea/workflows/sop-checklist-gate.yml new file mode 100644 index 00000000..b120aaec --- /dev/null +++ b/.gitea/workflows/sop-checklist-gate.yml @@ -0,0 +1,121 @@ +# sop-checklist-gate — peer-ack merge gate for SOP-checklist items. +# +# RFC#351 Step 2 of 6 (implementation MVP). +# +# === DESIGN === +# +# Goal: each PR must answer 7 SOP-checklist questions in its body, +# and each item must have at least one /sop-ack comment from +# a non-author peer in the required team. BP requires the +# `sop-checklist / all-items-acked (pull_request)` status to merge. +# +# Triggers: +# - `pull_request_target`: opened, edited, synchronize, reopened +# → fires when PR opens, body is edited (refire — RFC#351 §4), +# or new code is pushed (head.sha changes → stale status would +# be auto-discarded by BP via dismiss_stale_reviews, but the +# status itself is per-SHA so we re-post on the new head). +# - `issue_comment`: created, edited, deleted +# → fires on any new comment so /sop-ack / /sop-revoke take +# effect immediately (Gitea 1.22.6 doesn't refire on +# pull_request_review per feedback_pull_request_review_no_refire, +# so issue_comment is the canonical refire channel). +# +# Trust boundary (mirrors RFC#324 §A4 + sop-tier-check security note): +# `pull_request_target` (not `pull_request`) — workflow def is loaded +# from BASE branch, so a PR cannot rewrite this workflow to exfiltrate +# the token. The `actions/checkout` step pins `ref: base.sha` so the +# script ALSO comes from BASE. PR-HEAD code is never executed in the +# runner. 
+#
+# Token scope:
+#   - read:repository, read:organization for PR + comments + team probes
+#   - write:repository for POST /statuses/{sha}
+#   - The token owner MUST be a member of every team referenced by the
+#     config's required_teams (else /teams/{id}/members/{login} returns
+#     403 — see review-check.sh same-gotcha doc). For the MVP we use
+#     the dev-lead token (a member of engineers, managers, qa, security)
+#     via a repo secret `SOP_CHECKLIST_GATE_TOKEN`. Provisioning of that
+#     secret is a follow-up authorization step (separate from this PR).
+#
+# Failure mode: tier-aware (RFC#351 open question 2):
+#   - tier:high   → state=failure (hard-fail; BP blocks merge)
+#   - tier:medium → state=failure (hard-fail; same)
+#   - tier:low    → state=pending (soft-fail; BP can choose to require
+#     this context or skip for low-tier PRs)
+#   - missing/no-tier → state=failure (default-mode: hard — never lower
+#     the bar per feedback_fix_root_not_symptom)
+#
+# Slash-command contract (RFC#351 v1 + §A1.1-style notes from RFC#324):
+#
+#   /sop-ack <item> [optional note]
+#     — register a peer-ack for one checklist item.
+#     — <item> accepts kebab-case, snake_case, or natural-spaces slugs
+#       (all normalize to canonical kebab-case).
+#     — numeric 1..7 maps via config.items[*].numeric_alias.
+#     — most-recent (user, slug) directive wins.
+#
+#   /sop-revoke <item> [reason]
+#     — invalidate the commenter's own prior /sop-ack for this slug.
+#     — does NOT affect other peers' acks on the same slug.
+#     — most-recent (user, slug) directive wins, so a later /sop-ack
+#       restores the ack.
+#
+# The eval is read-only + idempotent (read PR + comments + team
+# membership, compute, post status). Re-running on any event is safe —
+# the new status overwrites the previous one for the same context.
+ +name: sop-checklist-gate + +on: + pull_request_target: + types: [opened, edited, synchronize, reopened] + issue_comment: + types: [created, edited, deleted] + +permissions: + contents: read + pull-requests: read + # NOTE: `statuses: write` is the GitHub-Actions name for POST /statuses. + # Gitea 1.22.6 may not gate on this permission key (it just checks the + # token), but listing it explicitly documents intent for the next + # platform-version upgrade. + statuses: write + +jobs: + gate: + # Run on pull_request_target events always. On issue_comment events, + # only when the comment is on a PR (issue_comment fires for issues + # too) and the body contains one of the slash-commands. + if: | + github.event_name == 'pull_request_target' || + (github.event_name == 'issue_comment' && + github.event.issue.pull_request != null && + (contains(github.event.comment.body, '/sop-ack') || + contains(github.event.comment.body, '/sop-revoke'))) + runs-on: ubuntu-latest + steps: + - name: Check out BASE ref (trust boundary — never PR-head) + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + with: + # For pull_request_target, the default branch is the trust + # anchor. For issue_comment the PR base may differ from the + # default branch (PR targeting `staging`), so we use the + # default-branch ref explicitly — same approach as + # qa-review.yml so the script source is always trusted. + ref: ${{ github.event.repository.default_branch }} + + - name: Run sop-checklist-gate + env: + GITEA_TOKEN: ${{ secrets.SOP_CHECKLIST_GATE_TOKEN || secrets.GITHUB_TOKEN }} + PR_NUMBER: ${{ github.event.pull_request.number || github.event.issue.number }} + OWNER: ${{ github.repository_owner }} + REPO_NAME: ${{ github.event.repository.name }} + run: | + set -euo pipefail + python3 .gitea/scripts/sop-checklist-gate.py \ + --owner "$OWNER" \ + --repo "$REPO_NAME" \ + --pr "$PR_NUMBER" \ + --config .gitea/sop-checklist-config.yaml \ + --gitea-host git.moleculesai.app
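The hunk above only shows the tail of `sop-checklist-gate.py`'s `main()`, not its parsing helpers. As a reading aid, here is a minimal sketch of what the unit tests pin down for `normalize_slug` and `parse_directives` — line-start-anchored directives, config-driven numeric aliases, and kebab-case canonicalization. The function bodies below are an assumption reconstructed from the test expectations, not the script's actual implementation.

```python
import re


def normalize_slug(raw, aliases=None):
    """Canonicalize an /sop-ack item reference to kebab-case.

    Accepts kebab-case, snake_case, natural spaces, mixed case, and
    (when an alias table is supplied) numeric 1..7 shorthand.
    """
    if not raw or not str(raw).strip():
        return ""
    s = str(raw).strip().lower()
    if s.isdigit():
        if aliases is None:
            return s                    # no alias table: keep digits as-is
        return aliases.get(int(s), "")  # out of range: "" flags unparseable
    s = re.sub(r"[_\s]+", "-", s)       # any separator run -> single dash
    s = re.sub(r"-{2,}", "-", s)        # collapse repeated dashes
    return s.strip("-.!?")              # drop trailing punctuation


# Directives must start a line (leading whitespace allowed) so prose like
# "to /sop-ack you need..." never registers as an ack.
_DIRECTIVE = re.compile(r"^\s*/(sop-ack|sop-revoke)\s+(\S+)[ \t]*(.*)$")


def parse_directives(body, aliases):
    """Return (command, canonical_slug, note) triples found in a comment."""
    out = []
    for line in (body or "").splitlines():
        m = _DIRECTIVE.match(line)
        if m:
            slug = normalize_slug(m.group(2), aliases)
            if slug:  # "" from normalize_slug means unparseable; drop it
                out.append((m.group(1), slug, m.group(3).strip()))
    return out
```

With this shape, `compute_ack_state` can fold directives into per-slug ack sets by applying most-recent-wins per `(user, slug)` pair, which is why a later `/sop-ack` after a `/sop-revoke` restores the ack.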