feat(ci): sop-checklist-gate — peer-ack merge gate (RFC#351 Phase 2)
All checks were successful
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 16s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 15s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 33s
CI / Detect changes (pull_request) Successful in 43s
E2E API Smoke Test / detect-changes (pull_request) Successful in 44s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 40s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 13s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 11s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 7s
CI / Platform (Go) (pull_request) Successful in 8s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 7s
CI / Canvas (Next.js) (pull_request) Successful in 9s
CI / Python Lint & Test (pull_request) Successful in 15s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m25s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
CI / all-required (pull_request) Successful in 6s

RFC#351 Step 2 of 6: implementation MVP of the SOP-checklist peer-ack
merge gate. NOT yet wired to branch protection (Phase 4 needs separate
authorization).

What:
- .gitea/sop-checklist-config.yaml — 7-item checklist with slug,
  numeric_alias (1..7), pr_section_marker, required_teams. Includes
  tier-aware failure-mode map: tier:high/medium=hard, tier:low=soft,
  default=hard (never silently lower the bar).
- .gitea/scripts/sop-checklist-gate.py — parses PR body + comments,
  computes per-item ack state, posts commit-status
  "sop-checklist / all-items-acked (pull_request)".
- .gitea/scripts/tests/test_sop_checklist_gate.py — 51 unit tests
  covering slug normalization, directive parsing, section-marker
  detection, ack-state computation (self-ack reject, revoke
  semantics, multi-user/multi-item, numeric aliases), tier-mode
  selection, and end-to-end happy path.
- .gitea/workflows/sop-checklist-gate.yml — pull_request_target
  [opened/edited/synchronize/reopened] + issue_comment
  [created/edited/deleted]. Checks out BASE ref only (trust boundary
  per RFC#324 §A4). Mirrors qa-review/security-review patterns.

Why:
Hongming 2026-05-12T05:42Z asked for SOP-enforcing CI/CD that requires
peer-ack on each checklist item before merge. Composes the existing
patterns (scripts-lint PR-body parser + RFC#324 persona-whitelist
commit-status + sop-tier-check tier-awareness) into one gate.

Slash-command contract:
  /sop-ack <slug> [note]      — register peer-ack (most-recent wins)
  /sop-revoke <slug> [reason] — invalidate own prior ack

Slug normalization accepts kebab-case, snake_case, natural-spaces,
or numeric 1..7 shorthand (all canonicalize to kebab-case via the
config-driven alias table).

Tests: 51/51 pass locally. Dry-run probe against PR#685 verified the
full pipeline (PR fetch, comment fetch, ack computation, status
description rendering inside the 140-char budget).

Not yet:
- Phase 3 (24h soak)
- Phase 4 (BP PATCH to require this context — needs Hongming GO)
- Phase 5 (cross-repo)
- Phase 6 (dev-sop.md codification)
- SOP_CHECKLIST_GATE_TOKEN secret provisioning (separate follow-up;
  fail-closed until provisioned, same as RFC_324_TEAM_READ_TOKEN
  pattern in qa-review.yml).

Cross-links:
- internal#351 (RFC body)
- RFC#324 (qa-review/security-review — reused mechanism)
- internal#346 (dev-sop.md SOP-14..SOP-20 — sibling rules)
- feedback_pull_request_review_no_refire (why issue_comment trigger)
- feedback_checkpointed_workflow_over_good_practice_doc (motivation)
- feedback_fix_root_not_symptom (default-mode=hard rationale)

## What
Add a SOP-checklist peer-ack merge gate: workflow + script + config + 51 unit tests.

## Why
Hongming-requested mechanism to enforce SOP via CI/CD: each PR checklist
item must be peer-acked before merge, with team-membership-verified
ackers and tier-aware failure mode.

## Verification
- 51/51 unit tests pass (slug normalization, parse_directives, section
  marker detection, ack-state including self-ack rejection + revoke
  semantics, tier-mode mapping, end-to-end happy path).
- YAML lint clean (yaml.safe_load + lint-workflow-yaml.py on the new
  workflow — pre-existing fatals on unrelated files only).
- Python syntax clean (py_compile).
- Dry-run against live PR#685: PR fetch, comment enumeration, status
  description render all within 140-char budget — works end-to-end.

## Tier
tier:medium — net-new CI workflow; no production impact; no BP change
yet (Phase 4 separate auth).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Molecule AI · core-devops 2026-05-12 06:08:36 +00:00
parent b462270201
commit 72df12ecef
4 changed files with 1559 additions and 0 deletions

View File

@ -0,0 +1,805 @@
#!/usr/bin/env python3
# sop-checklist-gate — evaluate whether a PR has peer-acked each
# SOP-checklist item. Posts a commit-status that branch protection
# can require.
#
# RFC#351 Step 2 of 6 (implementation MVP).
#
# Invoked by .gitea/workflows/sop-checklist-gate.yml on:
# - pull_request_target: [opened, edited, synchronize, reopened]
# - issue_comment: [created, edited, deleted]
#
# Flow:
# 1. Load .gitea/sop-checklist-config.yaml (from BASE ref — trusted).
# 2. GET /repos/{R}/pulls/{N} — author, head.sha, tier label
# 3. GET /repos/{R}/issues/{N}/comments — extract /sop-ack and /sop-revoke
# 4. For each checklist item:
# a. Is the section marker present in PR body? (author answered)
# b. Is there ≥1 unrevoked /sop-ack from a non-author whose
# team-membership matches required_teams?
# 5. POST /repos/{R}/statuses/{sha} — context
# `sop-checklist / all-items-acked (pull_request)`,
# state=success | failure | pending, description=`acked: N/M …`.
#
# Trust boundary (mirrors RFC#324 §A4):
# This script is loaded from the BASE branch. The workflow's
# actions/checkout step pins ref=base.sha. PR-HEAD code is never
# executed. We only HTTP-call the Gitea API.
#
# Token scope:
# - read:repository / read:organization to enumerate PR + comments
# + team membership (Gitea 1.22.6 quirk: team-membership endpoint
# returns 403 if token owner is not in the team; see review-check.sh
# for the same gotcha — we surface the same fail-closed message).
# - write:repository for `POST /repos/{R}/statuses/{sha}`. Unlike
# RFC#324's pattern (which uses the JOB's own pass/fail as the
# status), we POST the status explicitly because the gate posts
# a single multi-item status with a richer description than a
# bare success/failure context can carry.
#
# Slug normalization rules (canonical form: kebab-case):
# - Lowercase
# - Whitespace + underscores → single dash
# - Strip non [a-z0-9-] characters
# - Collapse adjacent dashes
# - Strip leading/trailing dashes
# - If the result is a digit string (e.g. "1"), look up via
# config.items[*].numeric_alias to get the kebab-case slug.
#
# Examples:
# "Comprehensive_Testing" → "comprehensive-testing"
# "comprehensive testing" → "comprehensive-testing"
# "1" → "comprehensive-testing"
# "Five-Axis-Review" → "five-axis-review"
#
# Revoke semantics:
# /sop-revoke <slug> [reason] — most-recent comment per (slug, user)
# wins. So if Alice posts /sop-ack X then later /sop-revoke X, her ack
# for X is invalidated. Bob's prior /sop-ack X is unaffected. If Alice
# posts /sop-revoke X then later /sop-ack X again, the ack is restored.
from __future__ import annotations
import argparse
import json
import os
import re
import sys
import urllib.error
import urllib.parse
import urllib.request
from typing import Any
# ---------------------------------------------------------------------------
# Slug normalization
# ---------------------------------------------------------------------------
_NORMALIZE_REPLACE_RE = re.compile(r"[\s_]+")
_NORMALIZE_STRIP_RE = re.compile(r"[^a-z0-9-]")
_NORMALIZE_DASH_RE = re.compile(r"-+")
def normalize_slug(raw: str, numeric_aliases: dict[int, str] | None = None) -> str:
"""Normalize a user-supplied slug to canonical kebab-case form.
See module header for the rules.
If the input is a pure digit string AND numeric_aliases is provided,
the alias mapping is consulted. Unknown digits return "" so the caller
can flag the comment as unparseable.
"""
if raw is None:
return ""
s = raw.strip().lower()
s = _NORMALIZE_REPLACE_RE.sub("-", s)
s = _NORMALIZE_STRIP_RE.sub("", s)
s = _NORMALIZE_DASH_RE.sub("-", s)
s = s.strip("-")
if s.isdigit() and numeric_aliases is not None:
return numeric_aliases.get(int(s), "")
return s
# ---------------------------------------------------------------------------
# Comment parsing — /sop-ack and /sop-revoke
# ---------------------------------------------------------------------------
# A directive must be on its own line. Permits leading whitespace.
# Optional trailing note after the slug for /sop-ack and required reason
# for /sop-revoke (RFC#351 open question 4 — reason is captured but not
# yet validated; future iteration may require a min-length).
_DIRECTIVE_RE = re.compile(
r"^[ \t]*/(sop-ack|sop-revoke)[ \t]+([A-Za-z0-9_\- ]+?)(?:[ \t]+(.*))?[ \t]*$",
re.MULTILINE,
)
def parse_directives(
comment_body: str,
numeric_aliases: dict[int, str],
) -> list[tuple[str, str, str]]:
"""Extract /sop-ack and /sop-revoke directives from a comment body.
Returns a list of (kind, canonical_slug, note) tuples where:
kind is "sop-ack" or "sop-revoke"
canonical_slug is the normalized form (or "" if unparseable)
note is the trailing free-text (may be "")
"""
out: list[tuple[str, str, str]] = []
if not comment_body:
return out
for m in _DIRECTIVE_RE.finditer(comment_body):
kind = m.group(1)
raw_slug = (m.group(2) or "").strip()
# If the raw match included trailing words, the regex non-greedy
# captured only the first token; strip again for safety.
# We split on whitespace to keep the FIRST word as the slug, and
# everything after as the note.
parts = raw_slug.split()
if not parts:
continue
first = parts[0]
# If the slug-capture greedily matched multiple words (e.g.
# "comprehensive testing"), preserve normalize behavior: join
# the WHOLE first-word-token only; trailing words get appended to
# the note. The regex limits group(2) to [A-Za-z0-9_\- ] so we
# may have multi-word forms here — normalize handles them.
if len(parts) > 1:
# User wrote "/sop-ack comprehensive testing extra-note"
# → treat "comprehensive testing" as the slug source if it
# normalizes to a known item; otherwise treat "comprehensive"
# as slug and "testing extra-note" as note. We defer the
# disambiguation to the caller via the returned canonical
# slug. For simplicity: try the WHOLE captured string first.
canonical = normalize_slug(raw_slug, numeric_aliases)
else:
canonical = normalize_slug(first, numeric_aliases)
note_from_group = (m.group(3) or "").strip()
# If we collapsed multi-word slug into kebab and there's a
# trailing-text group too, append it.
out.append((kind, canonical, note_from_group))
return out
# ---------------------------------------------------------------------------
# PR body section detection
# ---------------------------------------------------------------------------
def section_marker_present(body: str, marker: str) -> bool:
"""Return True if `marker` appears in `body` case-insensitively
on a non-empty line (i.e. the author actually filled it in).
We require the marker substring AND non-whitespace content on the
same line OR within the next line this prevents trivially-empty
checklists like:
## SOP-Checklist
- [ ] **Comprehensive testing performed**:
- [ ] **Local-postgres E2E run**:
from auto-passing the section-present check. The peer-ack is still
required, but answering with empty content is captured as a soft
finding via the section-present test alone.
"""
if not body or not marker:
return False
body_lower = body.lower()
marker_lower = marker.lower()
idx = body_lower.find(marker_lower)
if idx < 0:
return False
# Walk to end of line.
line_end = body.find("\n", idx)
if line_end < 0:
line_end = len(body)
line = body[idx + len(marker):line_end]
# Strip the colon + checkbox tail patterns; require at least one
# non-whitespace, non-punctuation char.
stripped = re.sub(r"[\s\*:\-\[\]]+", "", line)
if stripped:
return True
# Fall through: check the NEXT line (multi-line answers).
next_line_end = body.find("\n", line_end + 1)
if next_line_end < 0:
next_line_end = len(body)
next_line = body[line_end + 1:next_line_end]
stripped_next = re.sub(r"[\s\*:\-\[\]]+", "", next_line)
return bool(stripped_next)
# ---------------------------------------------------------------------------
# Ack-state computation
# ---------------------------------------------------------------------------
def compute_ack_state(
comments: list[dict[str, Any]],
pr_author: str,
items_by_slug: dict[str, dict[str, Any]],
numeric_aliases: dict[int, str],
team_membership_probe: "callable[[str, list[str]], list[str]]",
) -> dict[str, dict[str, Any]]:
"""Compute per-item ack state.
Each comment is processed in chronological order. The most-recent
directive per (commenter, slug) wins.
Returns a dict keyed by canonical slug:
{
"comprehensive-testing": {
"ackers": ["bob"], # non-author, team-verified
"rejected_ackers": { # debugging info
"self_ack": ["alice"],
"unknown_slug": [],
"not_in_team": ["eve"],
}
},
...
}
"""
# Step 1: collapse directives per (commenter, slug) — most recent wins.
# comments are expected to come in chronological order from the
# API (Gitea returns oldest-first by default for issues/{N}/comments).
latest_directive: dict[tuple[str, str], str] = {} # (user, slug) → kind
unparseable_per_user: dict[str, int] = {}
for c in comments:
body = c.get("body", "") or ""
user = (c.get("user") or {}).get("login", "")
if not user:
continue
for kind, slug, _note in parse_directives(body, numeric_aliases):
if not slug:
unparseable_per_user[user] = unparseable_per_user.get(user, 0) + 1
continue
latest_directive[(user, slug)] = kind
# Step 2: build candidate ackers per slug.
# Filter out self-acks and unknown slugs.
ackers_per_slug: dict[str, list[str]] = {s: [] for s in items_by_slug}
rejected_self: dict[str, list[str]] = {s: [] for s in items_by_slug}
rejected_unknown: dict[str, list[str]] = {s: [] for s in items_by_slug}
pending_team_check: dict[str, list[str]] = {s: [] for s in items_by_slug}
for (user, slug), kind in latest_directive.items():
if kind != "sop-ack":
continue # revokes leave the (user,slug) state as "no ack"
if slug not in items_by_slug:
# Slug normalized to something not in our config — store
# under a synthetic key for diagnostic surfacing. Don't add
# to any item.
continue
if user == pr_author:
rejected_self[slug].append(user)
continue
pending_team_check[slug].append(user)
# Step 3: team membership probe per slug (batched per slug to keep
# API call count down — same user may ack multiple items but the
# required_teams differ per item, so we MUST probe per (user, item)).
rejected_not_in_team: dict[str, list[str]] = {s: [] for s in items_by_slug}
for slug, candidates in pending_team_check.items():
if not candidates:
continue
required = items_by_slug[slug]["required_teams"]
approved = team_membership_probe(slug, candidates) # returns subset
rejected_not_in_team[slug] = [u for u in candidates if u not in approved]
ackers_per_slug[slug] = approved
# Stash required teams for description rendering.
items_by_slug[slug]["_required_resolved"] = required
return {
slug: {
"ackers": ackers_per_slug[slug],
"rejected": {
"self_ack": rejected_self[slug],
"not_in_team": rejected_not_in_team[slug],
},
}
for slug in items_by_slug
}
# ---------------------------------------------------------------------------
# Gitea API client
# ---------------------------------------------------------------------------
class GiteaClient:
def __init__(self, host: str, token: str):
self.base = f"https://{host}/api/v1"
self.token = token
# Cache team-name → team-id resolutions per org.
self._team_id_cache: dict[tuple[str, str], int | None] = {}
def _req(
self,
method: str,
path: str,
body: dict[str, Any] | None = None,
ok_codes: tuple[int, ...] = (200, 201, 204),
) -> tuple[int, Any]:
url = self.base + path
data = None
headers = {
"Authorization": f"token {self.token}",
"Accept": "application/json",
}
if body is not None:
data = json.dumps(body).encode("utf-8")
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, method=method, data=data, headers=headers)
try:
with urllib.request.urlopen(req, timeout=20) as r:
raw = r.read()
code = r.getcode()
except urllib.error.HTTPError as e:
code = e.code
raw = e.read()
try:
parsed = json.loads(raw.decode("utf-8")) if raw else None
except json.JSONDecodeError:
parsed = raw.decode("utf-8", errors="replace") if raw else None
return code, parsed
def get_pr(self, owner: str, repo: str, pr: int) -> dict[str, Any]:
code, data = self._req("GET", f"/repos/{owner}/{repo}/pulls/{pr}")
if code != 200:
raise RuntimeError(f"GET pulls/{pr} → HTTP {code}: {data!r}")
return data
def get_issue_comments(
self, owner: str, repo: str, issue: int
) -> list[dict[str, Any]]:
# Paginate. Gitea default page size 50.
out: list[dict[str, Any]] = []
page = 1
while True:
code, data = self._req(
"GET",
f"/repos/{owner}/{repo}/issues/{issue}/comments?limit=50&page={page}",
)
if code != 200:
raise RuntimeError(
f"GET issues/{issue}/comments page={page} → HTTP {code}: {data!r}"
)
if not data:
break
out.extend(data)
if len(data) < 50:
break
page += 1
return out
def resolve_team_id(self, org: str, team_name: str) -> int | None:
key = (org, team_name)
if key in self._team_id_cache:
return self._team_id_cache[key]
code, data = self._req("GET", f"/orgs/{org}/teams/search?q={urllib.parse.quote(team_name)}")
team_id = None
if code == 200 and isinstance(data, dict):
for t in data.get("data", []):
if t.get("name") == team_name:
team_id = t.get("id")
break
if team_id is None and code == 200 and isinstance(data, list):
for t in data:
if t.get("name") == team_name:
team_id = t.get("id")
break
self._team_id_cache[key] = team_id
return team_id
def is_team_member(self, team_id: int, login: str) -> bool | None:
"""Return True / False / None (unknown — 403 from API)."""
code, _ = self._req(
"GET", f"/teams/{team_id}/members/{urllib.parse.quote(login)}"
)
if code in (200, 204):
return True
if code == 404:
return False
# 403 means the token owner isn't in this team, so the API
# refuses to confirm membership. Fail-closed at the caller.
return None
def post_status(
self,
owner: str,
repo: str,
sha: str,
state: str,
context: str,
description: str,
target_url: str = "",
) -> None:
body = {
"state": state,
"context": context,
"description": description[:140], # Gitea truncates to 255 but be safe
"target_url": target_url or "",
}
code, data = self._req(
"POST",
f"/repos/{owner}/{repo}/statuses/{sha}",
body=body,
ok_codes=(201,),
)
if code not in (200, 201):
raise RuntimeError(
f"POST statuses/{sha} → HTTP {code}: {data!r}"
)
# ---------------------------------------------------------------------------
# Config loader (PyYAML-free — config file is intentionally tiny + flat)
# ---------------------------------------------------------------------------
def load_config(path: str) -> dict[str, Any]:
"""Load .gitea/sop-checklist-config.yaml.
Uses PyYAML if available, otherwise falls back to a built-in
minimal parser sufficient for our flat config shape. Bundling
PyYAML on the runner is one apt install away but we avoid the
dep by keeping the config shape constrained.
"""
try:
import yaml # type: ignore[import-not-found]
with open(path) as f:
return yaml.safe_load(f)
except ImportError:
return _load_config_minimal(path)
def _load_config_minimal(path: str) -> dict[str, Any]:
"""Minimal YAML subset parser for our config shape.
Supports: top-level scalar:value, top-level map-of-map (e.g.
tier_failure_mode), top-level list of maps (items:), and within an
item map: scalars + lists of scalars. Does NOT support nested lists,
YAML anchors, multi-doc, or flow style.
"""
with open(path) as f:
lines = f.readlines()
return _parse_minimal_yaml(lines)
def _parse_minimal_yaml(lines: list[str]) -> dict[str, Any]: # noqa: C901
"""Hand-rolled subset parser. See _load_config_minimal docstring."""
# Strip comments + blank lines but preserve indentation.
cleaned: list[tuple[int, str]] = []
for raw in lines:
# Don't strip a "#" that is inside a quoted value.
body = raw.rstrip("\n")
# Remove trailing comment.
idx = body.find("#")
if idx >= 0 and (idx == 0 or body[idx - 1] in " \t"):
body = body[:idx].rstrip()
if not body.strip():
continue
indent = len(body) - len(body.lstrip(" "))
cleaned.append((indent, body.strip()))
root: dict[str, Any] = {}
i = 0
n = len(cleaned)
def parse_scalar(s: str) -> Any:
s = s.strip()
if s.startswith('"') and s.endswith('"'):
return s[1:-1]
if s.startswith("'") and s.endswith("'"):
return s[1:-1]
if s.lower() in ("true", "yes"):
return True
if s.lower() in ("false", "no"):
return False
try:
return int(s)
except ValueError:
pass
return s
def parse_inline_list(s: str) -> list[Any]:
s = s.strip()
if not (s.startswith("[") and s.endswith("]")):
return [parse_scalar(s)]
inner = s[1:-1]
if not inner.strip():
return []
return [parse_scalar(x.strip()) for x in inner.split(",")]
while i < n:
indent, line = cleaned[i]
if indent != 0:
i += 1
continue
if ":" not in line:
i += 1
continue
key, _, rest = line.partition(":")
key = key.strip()
rest = rest.strip()
if rest == "":
# Block — could be map or list.
i += 1
# Look ahead for first child.
if i < n and cleaned[i][1].startswith("- "):
# List of items.
items: list[Any] = []
while i < n and cleaned[i][0] > indent and cleaned[i][1].startswith("- "):
item_indent = cleaned[i][0]
first_kv = cleaned[i][1][2:].strip() # strip "- "
item: dict[str, Any] = {}
if ":" in first_kv:
k, _, v = first_kv.partition(":")
k = k.strip()
v = v.strip()
if v == "":
item[k] = ""
elif v.startswith(">-") or v.startswith(">"):
# Folded scalar continues on subsequent indented lines
collected: list[str] = []
i += 1
while i < n and cleaned[i][0] > item_indent:
collected.append(cleaned[i][1])
i += 1
item[k] = " ".join(collected)
items.append(item)
continue
elif v.startswith("["):
item[k] = parse_inline_list(v)
else:
item[k] = parse_scalar(v)
i += 1
# Subsequent k:v lines at deeper indent belong to this item.
while i < n and cleaned[i][0] > item_indent and not cleaned[i][1].startswith("- "):
sub_indent, sub_line = cleaned[i]
if ":" in sub_line:
k, _, v = sub_line.partition(":")
k = k.strip()
v = v.strip()
if v == "":
item[k] = ""
i += 1
elif v.startswith(">-") or v.startswith(">"):
collected = []
i += 1
while i < n and cleaned[i][0] > sub_indent:
collected.append(cleaned[i][1])
i += 1
item[k] = " ".join(collected)
elif v.startswith("["):
item[k] = parse_inline_list(v)
i += 1
else:
item[k] = parse_scalar(v)
i += 1
else:
i += 1
items.append(item)
root[key] = items
else:
# Sub-map.
submap: dict[str, Any] = {}
while i < n and cleaned[i][0] > indent:
sub_indent, sub_line = cleaned[i]
if ":" in sub_line:
k, _, v = sub_line.partition(":")
k = k.strip().strip('"').strip("'")
v = v.strip()
if v.startswith("[") and v.endswith("]"):
submap[k] = parse_inline_list(v)
else:
submap[k] = parse_scalar(v)
i += 1
root[key] = submap
else:
# Inline scalar or list.
if rest.startswith("[") and rest.endswith("]"):
root[key] = parse_inline_list(rest)
else:
root[key] = parse_scalar(rest)
i += 1
return root
# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------
def render_status(
items: list[dict[str, Any]],
ack_state: dict[str, dict[str, Any]],
body_state: dict[str, bool],
) -> tuple[str, str]:
"""Return (state, description) for the commit-status post.
state is "success" if every item has at least one valid ack
(body section presence is informational only peer-ack is the
real gate). "pending" is reserved for the soft-fail path
(tier:low) and is set by the caller.
"""
n = len(items)
fully_acked = [
it["slug"] for it in items if ack_state[it["slug"]]["ackers"]
]
missing = [
it["slug"] for it in items if not ack_state[it["slug"]]["ackers"]
]
missing_body = [it["slug"] for it in items if not body_state.get(it["slug"], False)]
desc_parts = [f"acked: {len(fully_acked)}/{n}"]
if missing:
# Show up to 3 missing slugs to stay inside the 140-char budget.
shown = ", ".join(missing[:3])
if len(missing) > 3:
shown += f", +{len(missing) - 3}"
desc_parts.append(f"missing: {shown}")
if missing_body:
desc_parts.append(f"body-unfilled: {len(missing_body)}")
state = "success" if not missing else "failure"
return state, "".join(desc_parts)
def get_tier_mode(pr: dict[str, Any], cfg: dict[str, Any]) -> str:
"""Read tier label, return 'hard' or 'soft' per cfg.tier_failure_mode."""
labels = pr.get("labels") or []
tier_labels = [l.get("name", "") for l in labels if (l.get("name", "") or "").startswith("tier:")]
mode_map = cfg.get("tier_failure_mode") or {}
default_mode = cfg.get("default_mode", "hard")
for tl in tier_labels:
if tl in mode_map:
return mode_map[tl]
return default_mode
def main(argv: list[str] | None = None) -> int:
p = argparse.ArgumentParser()
p.add_argument("--owner", required=True)
p.add_argument("--repo", required=True)
p.add_argument("--pr", type=int, required=True)
p.add_argument("--config", default=".gitea/sop-checklist-config.yaml")
p.add_argument("--gitea-host", default="git.moleculesai.app")
p.add_argument(
"--dry-run",
action="store_true",
help="Compute state but do not POST the status.",
)
p.add_argument(
"--status-context",
default="sop-checklist / all-items-acked (pull_request)",
)
args = p.parse_args(argv)
token = os.environ.get("GITEA_TOKEN", "")
if not token and not args.dry_run:
print("::error::GITEA_TOKEN env required", file=sys.stderr)
return 2
cfg = load_config(args.config)
items: list[dict[str, Any]] = cfg["items"]
items_by_slug = {it["slug"]: it for it in items}
numeric_aliases = {
int(it["numeric_alias"]): it["slug"] for it in items if it.get("numeric_alias")
}
client = GiteaClient(args.gitea_host, token) if token else None
if not client:
print("::error::No client (dry-run without token has nothing to do)", file=sys.stderr)
return 2
pr = client.get_pr(args.owner, args.repo, args.pr)
if pr.get("state") != "open":
print(f"::notice::PR #{args.pr} is {pr.get('state')} — gate is a no-op")
return 0
author = (pr.get("user") or {}).get("login", "")
head_sha = (pr.get("head") or {}).get("sha", "")
body = pr.get("body", "") or ""
if not author or not head_sha:
print("::error::PR payload missing user.login or head.sha", file=sys.stderr)
return 1
comments = client.get_issue_comments(args.owner, args.repo, args.pr)
# Build team-membership probe closure that caches results per
# (user, team-id) so a user acking multiple items only triggers
# one membership lookup per team.
team_member_cache: dict[tuple[str, int], bool | None] = {}
def probe(slug: str, users: list[str]) -> list[str]:
item = items_by_slug[slug]
team_names: list[str] = item["required_teams"]
# Resolve names → ids. NOTE: orgs/{org}/teams/search may not be
# available — fall back to the list endpoint.
team_ids: list[int] = []
for tn in team_names:
tid = client.resolve_team_id(args.owner, tn)
if tid is None:
# Try the list endpoint as a fallback.
code, data = client._req( # noqa: SLF001
"GET", f"/orgs/{args.owner}/teams"
)
if code == 200 and isinstance(data, list):
for t in data:
if t.get("name") == tn:
tid = t.get("id")
client._team_id_cache[(args.owner, tn)] = tid # noqa: SLF001
break
if tid is not None:
team_ids.append(tid)
else:
print(
f"::warning::could not resolve team-id for '{tn}' "
f"in org '{args.owner}' — item '{slug}' will fail closed",
file=sys.stderr,
)
approved: list[str] = []
for u in users:
for tid in team_ids:
cache_key = (u, tid)
if cache_key not in team_member_cache:
team_member_cache[cache_key] = client.is_team_member(tid, u)
result = team_member_cache[cache_key]
if result is True:
approved.append(u)
break
if result is None:
print(
f"::warning::team-probe for {u} in team-id {tid} returned 403 "
"(token owner not in that team — fail-closed per RFC#324)",
file=sys.stderr,
)
# Treat as not-in-team for this user/team pair; loop
# may still find membership in another team.
return approved
ack_state = compute_ack_state(comments, author, items_by_slug, numeric_aliases, probe)
body_state = {it["slug"]: section_marker_present(body, it["pr_section_marker"]) for it in items}
state, description = render_status(items, ack_state, body_state)
mode = get_tier_mode(pr, cfg)
if state == "failure" and mode == "soft":
state = "pending"
description = f"[soft-fail tier:low] {description}"
# Diagnostics to job log.
print(f"::notice::PR #{args.pr} author={author} head={head_sha[:7]} mode={mode}")
for it in items:
slug = it["slug"]
ackers = ack_state[slug]["ackers"]
if ackers:
print(f"::notice:: [PASS] {slug} — acked by {','.join(ackers)}")
else:
r = ack_state[slug]["rejected"]
extras: list[str] = []
if r["self_ack"]:
extras.append(f"self-acks-rejected:{','.join(r['self_ack'])}")
if r["not_in_team"]:
extras.append(f"not-in-team:{','.join(r['not_in_team'])}")
extra = " (" + "; ".join(extras) + ")" if extras else ""
print(f"::notice:: [WAIT] {slug} — no valid peer-ack yet{extra}")
print(f"::notice::posting status: state={state} desc={description!r}")
if args.dry_run:
print("::notice::--dry-run: not posting status")
return 0 if state in ("success", "pending") else 1
target_url = f"https://{args.gitea_host}/{args.owner}/{args.repo}/pulls/{args.pr}"
client.post_status(
args.owner, args.repo, head_sha,
state=state, context=args.status_context,
description=description, target_url=target_url,
)
print(f"::notice::status posted: {args.status_context}{state}")
return 0 if state in ("success", "pending") else 1
if __name__ == "__main__":
sys.exit(main())

View File

@ -0,0 +1,524 @@
#!/usr/bin/env python3
# Unit tests for sop-checklist-gate.py
#
# Run: python3 .gitea/scripts/tests/test_sop_checklist_gate.py
# or: pytest .gitea/scripts/tests/test_sop_checklist_gate.py
#
# RFC#351 Step 2 of 6 — implementation MVP. Tests cover:
# - slug normalization (the 4 example variants in the script header)
# - parse_directives (ack, revoke, with/without note, mid-comment, etc.)
# - section_marker_present (empty answer rejected, filled answer ok)
# - compute_ack_state (self-ack rejected, team probe applied, revoke
# invalidates own prior ack, peer's ack survives unrevoked)
# - render_status (state + description format)
# - get_tier_mode (label-driven, default fallback)
# - load_config (default config parses cleanly with both PyYAML and
# the bundled minimal parser)
#
# All tests run WITHOUT touching the Gitea API — the team-probe
# callable is dependency-injected.
from __future__ import annotations
import os
import sys
import tempfile
import unittest
# Resolve sibling script regardless of where pytest is invoked from.
HERE = os.path.dirname(os.path.abspath(__file__))
PARENT = os.path.dirname(HERE) # .gitea/scripts
sys.path.insert(0, PARENT)
import importlib.util # noqa: E402
_spec = importlib.util.spec_from_file_location(
"sop_checklist_gate", os.path.join(PARENT, "sop-checklist-gate.py")
)
sop = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(sop) # type: ignore[union-attr]
# ---------------------------------------------------------------------------
# Test fixtures
# ---------------------------------------------------------------------------
CONFIG_PATH = os.path.join(PARENT, "..", "sop-checklist-config.yaml")
def _items() -> list[dict]:
cfg = sop.load_config(CONFIG_PATH)
return cfg["items"]
def _items_by_slug() -> dict[str, dict]:
return {it["slug"]: it for it in _items()}
def _numeric_aliases() -> dict[int, str]:
return {
int(it["numeric_alias"]): it["slug"]
for it in _items()
if it.get("numeric_alias")
}
def _comment(user: str, body: str) -> dict:
return {"user": {"login": user}, "body": body}
# ---------------------------------------------------------------------------
# normalize_slug
# ---------------------------------------------------------------------------
class TestNormalizeSlug(unittest.TestCase):
def test_kebab_already(self):
self.assertEqual(sop.normalize_slug("comprehensive-testing"), "comprehensive-testing")
def test_underscore_to_dash(self):
self.assertEqual(sop.normalize_slug("comprehensive_testing"), "comprehensive-testing")
def test_space_to_dash(self):
self.assertEqual(sop.normalize_slug("comprehensive testing"), "comprehensive-testing")
def test_uppercase_to_lower(self):
self.assertEqual(sop.normalize_slug("Comprehensive-Testing"), "comprehensive-testing")
def test_mixed_separators(self):
self.assertEqual(sop.normalize_slug("Comprehensive_Testing"), "comprehensive-testing")
self.assertEqual(sop.normalize_slug("FIVE_axis review"), "five-axis-review")
def test_collapse_repeated_dashes(self):
self.assertEqual(sop.normalize_slug("comprehensive--testing"), "comprehensive-testing")
self.assertEqual(sop.normalize_slug("comprehensive testing"), "comprehensive-testing")
def test_strip_trailing_punctuation(self):
self.assertEqual(sop.normalize_slug("comprehensive-testing."), "comprehensive-testing")
self.assertEqual(sop.normalize_slug("comprehensive-testing!"), "comprehensive-testing")
def test_numeric_shorthand_known(self):
self.assertEqual(
sop.normalize_slug("1", _numeric_aliases()),
"comprehensive-testing",
)
self.assertEqual(
sop.normalize_slug("3", _numeric_aliases()),
"staging-smoke",
)
self.assertEqual(
sop.normalize_slug("7", _numeric_aliases()),
"memory-consulted",
)
def test_numeric_shorthand_unknown_returns_empty(self):
# "8" is out of range → empty so caller can flag as unparseable.
self.assertEqual(sop.normalize_slug("8", _numeric_aliases()), "")
def test_numeric_without_alias_table_keeps_digits(self):
# No alias table → return the digits as-is.
self.assertEqual(sop.normalize_slug("1"), "1")
def test_empty_input(self):
self.assertEqual(sop.normalize_slug(""), "")
self.assertEqual(sop.normalize_slug(" "), "")
self.assertEqual(sop.normalize_slug(None), "")
# ---------------------------------------------------------------------------
# parse_directives
# ---------------------------------------------------------------------------
class TestParseDirectives(unittest.TestCase):
def setUp(self):
self.aliases = _numeric_aliases()
def test_simple_ack(self):
d = sop.parse_directives("/sop-ack comprehensive-testing", self.aliases)
self.assertEqual(d, [("sop-ack", "comprehensive-testing", "")])
def test_simple_revoke(self):
d = sop.parse_directives("/sop-revoke staging-smoke", self.aliases)
self.assertEqual(d, [("sop-revoke", "staging-smoke", "")])
def test_ack_with_note(self):
d = sop.parse_directives(
"/sop-ack comprehensive-testing LGTM the test covers all edge cases",
self.aliases,
)
self.assertEqual(len(d), 1)
self.assertEqual(d[0][0], "sop-ack")
self.assertEqual(d[0][1], "comprehensive-testing")
self.assertIn("LGTM", d[0][2])
def test_numeric_shorthand(self):
d = sop.parse_directives("/sop-ack 1", self.aliases)
self.assertEqual(d, [("sop-ack", "comprehensive-testing", "")])
def test_revoke_with_reason(self):
d = sop.parse_directives(
"/sop-revoke comprehensive-testing realized the e2e was mocking the DB",
self.aliases,
)
self.assertEqual(d[0][0], "sop-revoke")
self.assertEqual(d[0][1], "comprehensive-testing")
self.assertIn("mocking", d[0][2])
def test_directive_in_middle_of_comment(self):
body = (
"Reviewed the PR, looks good overall.\n"
"/sop-ack comprehensive-testing\n"
"Will follow up on the doc nit separately."
)
d = sop.parse_directives(body, self.aliases)
self.assertEqual(len(d), 1)
self.assertEqual(d[0][1], "comprehensive-testing")
def test_multiple_directives_in_one_comment(self):
body = (
"/sop-ack comprehensive-testing\n"
"/sop-ack local-postgres-e2e\n"
)
d = sop.parse_directives(body, self.aliases)
self.assertEqual(len(d), 2)
slugs = {x[1] for x in d}
self.assertEqual(slugs, {"comprehensive-testing", "local-postgres-e2e"})
def test_must_be_at_line_start(self):
# A directive embedded mid-line is not honored (prevents review
# comments like "to /sop-ack you need..." from acting as acks).
body = "If you want to /sop-ack comprehensive-testing reply in this thread"
d = sop.parse_directives(body, self.aliases)
self.assertEqual(d, [])
def test_leading_whitespace_allowed(self):
body = " /sop-ack comprehensive-testing"
d = sop.parse_directives(body, self.aliases)
self.assertEqual(len(d), 1)
def test_empty_body(self):
self.assertEqual(sop.parse_directives("", self.aliases), [])
self.assertEqual(sop.parse_directives(None, self.aliases), [])
def test_normalization_applied(self):
# /sop-ack Comprehensive_Testing → canonical comprehensive-testing
d = sop.parse_directives("/sop-ack Comprehensive_Testing", self.aliases)
self.assertEqual(d[0][1], "comprehensive-testing")
# ---------------------------------------------------------------------------
# section_marker_present
# ---------------------------------------------------------------------------
class TestSectionMarkerPresent(unittest.TestCase):
def test_marker_with_inline_answer(self):
body = "- [ ] **Comprehensive testing performed**: Added 12 new tests covering null/empty/giant inputs."
self.assertTrue(sop.section_marker_present(body, "Comprehensive testing performed"))
def test_marker_with_empty_answer(self):
body = "- [ ] **Comprehensive testing performed**:"
self.assertFalse(sop.section_marker_present(body, "Comprehensive testing performed"))
def test_marker_with_only_whitespace_answer(self):
body = "- [ ] **Comprehensive testing performed**: \n"
self.assertFalse(sop.section_marker_present(body, "Comprehensive testing performed"))
def test_marker_with_next_line_answer(self):
body = (
"- [ ] **Comprehensive testing performed**:\n"
" Yes — see attached log + 12 new unit tests in foo_test.py.\n"
)
self.assertTrue(sop.section_marker_present(body, "Comprehensive testing performed"))
def test_marker_missing(self):
body = "- [ ] **Local-postgres E2E run**: N/A — pure-frontend\n"
self.assertFalse(sop.section_marker_present(body, "Comprehensive testing performed"))
def test_case_insensitive_marker_match(self):
body = "- [ ] **comprehensive TESTING performed**: yes"
self.assertTrue(sop.section_marker_present(body, "Comprehensive testing performed"))
def test_empty_body(self):
self.assertFalse(sop.section_marker_present("", "X"))
self.assertFalse(sop.section_marker_present(None, "X"))
# ---------------------------------------------------------------------------
# compute_ack_state
# ---------------------------------------------------------------------------
class TestComputeAckState(unittest.TestCase):
def setUp(self):
self.items = _items_by_slug()
self.aliases = _numeric_aliases()
@staticmethod
def _approve_all(slug, users):
return list(users)
@staticmethod
def _approve_none(slug, users):
return []
def _approve_only(self, allowed_users):
return lambda slug, users: [u for u in users if u in allowed_users]
def test_peer_ack_passes(self):
comments = [_comment("bob", "/sop-ack comprehensive-testing")]
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, self._approve_all
)
self.assertEqual(state["comprehensive-testing"]["ackers"], ["bob"])
def test_self_ack_rejected(self):
comments = [_comment("alice", "/sop-ack comprehensive-testing")]
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, self._approve_all
)
self.assertEqual(state["comprehensive-testing"]["ackers"], [])
self.assertEqual(state["comprehensive-testing"]["rejected"]["self_ack"], ["alice"])
def test_not_in_team_rejected(self):
comments = [_comment("eve", "/sop-ack comprehensive-testing")]
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, self._approve_none
)
self.assertEqual(state["comprehensive-testing"]["ackers"], [])
self.assertEqual(state["comprehensive-testing"]["rejected"]["not_in_team"], ["eve"])
def test_revoke_invalidates_own_prior_ack(self):
# Bob acks then later revokes — Bob no longer counts.
comments = [
_comment("bob", "/sop-ack comprehensive-testing"),
_comment("bob", "/sop-revoke comprehensive-testing realized e2e was mocked"),
]
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, self._approve_all
)
self.assertEqual(state["comprehensive-testing"]["ackers"], [])
def test_revoke_does_not_affect_others_acks(self):
# Bob revokes his own ack; Carol's still counts.
comments = [
_comment("bob", "/sop-ack comprehensive-testing"),
_comment("carol", "/sop-ack comprehensive-testing"),
_comment("bob", "/sop-revoke comprehensive-testing"),
]
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, self._approve_all
)
self.assertEqual(state["comprehensive-testing"]["ackers"], ["carol"])
def test_ack_after_revoke_restored(self):
# Bob revokes then re-acks (e.g. after re-reviewing).
comments = [
_comment("bob", "/sop-ack comprehensive-testing"),
_comment("bob", "/sop-revoke comprehensive-testing"),
_comment("bob", "/sop-ack comprehensive-testing"),
]
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, self._approve_all
)
self.assertEqual(state["comprehensive-testing"]["ackers"], ["bob"])
def test_numeric_shorthand_ack(self):
# /sop-ack 1 → comprehensive-testing
comments = [_comment("bob", "/sop-ack 1")]
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, self._approve_all
)
self.assertEqual(state["comprehensive-testing"]["ackers"], ["bob"])
def test_ack_for_unknown_slug_ignored(self):
# Some other slug not in config — silently drop (doesn't crash).
comments = [_comment("bob", "/sop-ack does-not-exist")]
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, self._approve_all
)
for slug in self.items:
self.assertEqual(state[slug]["ackers"], [])
def test_multi_item_multi_user(self):
comments = [
_comment("bob", "/sop-ack comprehensive-testing\n/sop-ack staging-smoke"),
_comment("carol", "/sop-ack five-axis-review"),
]
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, self._approve_all
)
self.assertEqual(state["comprehensive-testing"]["ackers"], ["bob"])
self.assertEqual(state["staging-smoke"]["ackers"], ["bob"])
self.assertEqual(state["five-axis-review"]["ackers"], ["carol"])
self.assertEqual(state["root-cause"]["ackers"], [])
# ---------------------------------------------------------------------------
# render_status
# ---------------------------------------------------------------------------
class TestRenderStatus(unittest.TestCase):
def setUp(self):
self.items = _items()
self.items_by_slug = _items_by_slug()
def _state_with(self, acked: list[str]) -> dict:
return {
it["slug"]: {
"ackers": ["peer"] if it["slug"] in acked else [],
"rejected": {"self_ack": [], "not_in_team": []},
}
for it in self.items
}
def test_all_acked_returns_success(self):
all_slugs = [it["slug"] for it in self.items]
state, desc = sop.render_status(
self.items, self._state_with(all_slugs), {s: True for s in all_slugs}
)
self.assertEqual(state, "success")
self.assertIn("7/7", desc)
def test_partial_acked_returns_failure(self):
state, desc = sop.render_status(
self.items,
self._state_with(["comprehensive-testing", "staging-smoke"]),
{it["slug"]: True for it in self.items},
)
self.assertEqual(state, "failure")
self.assertIn("2/7", desc)
self.assertIn("missing", desc)
def test_description_truncates_long_missing_list(self):
# Only ack one — 6 missing should be summarized as "+N".
state, desc = sop.render_status(
self.items,
self._state_with(["comprehensive-testing"]),
{it["slug"]: True for it in self.items},
)
# Length budget: under 140 chars.
self.assertLessEqual(len(desc), 140)
self.assertIn("+", desc) # +N elision marker
def test_body_unfilled_surfaced(self):
all_slugs = [it["slug"] for it in self.items]
state, desc = sop.render_status(
self.items,
self._state_with(all_slugs),
{it["slug"]: False for it in self.items},
)
self.assertIn("body-unfilled", desc)
# ---------------------------------------------------------------------------
# get_tier_mode
# ---------------------------------------------------------------------------
class TestGetTierMode(unittest.TestCase):
def setUp(self):
self.cfg = sop.load_config(CONFIG_PATH)
def test_tier_high_is_hard(self):
pr = {"labels": [{"name": "tier:high"}, {"name": "area:ci"}]}
self.assertEqual(sop.get_tier_mode(pr, self.cfg), "hard")
def test_tier_medium_is_hard(self):
pr = {"labels": [{"name": "tier:medium"}]}
self.assertEqual(sop.get_tier_mode(pr, self.cfg), "hard")
def test_tier_low_is_soft(self):
pr = {"labels": [{"name": "tier:low"}]}
self.assertEqual(sop.get_tier_mode(pr, self.cfg), "soft")
def test_no_tier_label_defaults_to_hard(self):
# Per feedback_fix_root_not_symptom — never silently lower the bar.
pr = {"labels": [{"name": "area:ci"}]}
self.assertEqual(sop.get_tier_mode(pr, self.cfg), "hard")
def test_no_labels_defaults_to_hard(self):
self.assertEqual(sop.get_tier_mode({"labels": []}, self.cfg), "hard")
self.assertEqual(sop.get_tier_mode({}, self.cfg), "hard")
# ---------------------------------------------------------------------------
# load_config
# ---------------------------------------------------------------------------
class TestLoadConfig(unittest.TestCase):
def test_default_config_parses(self):
cfg = sop.load_config(CONFIG_PATH)
self.assertIn("items", cfg)
self.assertEqual(len(cfg["items"]), 7)
slugs = {it["slug"] for it in cfg["items"]}
self.assertEqual(
slugs,
{
"comprehensive-testing",
"local-postgres-e2e",
"staging-smoke",
"root-cause",
"five-axis-review",
"no-backwards-compat",
"memory-consulted",
},
)
def test_default_config_tier_mode_shape(self):
cfg = sop.load_config(CONFIG_PATH)
self.assertEqual(cfg["tier_failure_mode"]["tier:high"], "hard")
self.assertEqual(cfg["tier_failure_mode"]["tier:medium"], "hard")
self.assertEqual(cfg["tier_failure_mode"]["tier:low"], "soft")
self.assertEqual(cfg["default_mode"], "hard")
def test_each_item_has_required_fields(self):
cfg = sop.load_config(CONFIG_PATH)
for it in cfg["items"]:
self.assertIn("slug", it)
self.assertIn("numeric_alias", it)
self.assertIn("pr_section_marker", it)
self.assertIn("required_teams", it)
self.assertIsInstance(it["required_teams"], list)
self.assertGreater(len(it["required_teams"]), 0)
# ---------------------------------------------------------------------------
# Edge case: full integration without team probe (dependency-injected)
# ---------------------------------------------------------------------------
class TestEndToEndAckFlow(unittest.TestCase):
"""All-7-items happy path with synthetic comments. Verifies the
full pipeline minus the Gitea API."""
def test_all_seven_acked_by_proper_teams(self):
items = _items_by_slug()
aliases = _numeric_aliases()
comments = [
_comment("qa-bot", "/sop-ack comprehensive-testing"),
_comment("eng-bot", "/sop-ack local-postgres-e2e"),
_comment("eng-bot", "/sop-ack staging-smoke"),
_comment("mgr-bot", "/sop-ack root-cause"),
_comment("eng-bot", "/sop-ack five-axis-review"),
_comment("mgr-bot", "/sop-ack no-backwards-compat"),
_comment("eng-bot", "/sop-ack memory-consulted"),
]
def probe(slug, users):
# Pretend every user is in every team.
return list(users)
state = sop.compute_ack_state(comments, "alice-author", items, aliases, probe)
body = {it["slug"]: True for it in items.values()}
items_list = list(items.values())
result_state, desc = sop.render_status(items_list, state, body)
self.assertEqual(result_state, "success")
self.assertIn("7/7", desc)
if __name__ == "__main__":
unittest.main(verbosity=2)

View File

@ -0,0 +1,109 @@
# SOP-Checklist gate — per-item required reviewer teams.
#
# RFC#351 v1 starter set. Each item lists:
# slug — canonical kebab-case form used in /sop-ack <slug>
# pr_section_marker — substring matched in the PR body to detect that
# the author filled in this item (case-insensitive)
# required_teams — list of Gitea team names; an ack from ANY one of
# these teams (logical OR) satisfies the item.
# Membership is probed at gate-time via
# GET /api/v1/teams/{id}/members/{login}.
# Team-id resolution happens at script start via
# GET /api/v1/orgs/{org}/teams (cheap, one call).
# numeric_alias — 1..7; lets reviewers type `/sop-ack 3` as a
# shortcut for `/sop-ack staging-smoke`.
#
# WHY THESE TEAM MAPPINGS:
# The RFC table referenced persona-role names like `core-qa`,
# `core-be`, `core-devops` — these are individual Gitea user logins,
# not teams. The Gitea team-membership API is /teams/{id}/members/{u},
# so we need actual teams. Orchestrator preflight 2026-05-12 verified
# only these teams exist on molecule-ai: ceo(5), engineers(2),
# managers(6), qa(20), security(21), Owners(1), and bot teams. We
# map the RFC roles to the closest existing team and surface the
# mapping explicitly so it's reviewable.
#
# HOW TO EDIT:
# - Tightening: replace `engineers` with a smaller team after creating
# it (e.g. a new `senior-engineers` team if needed).
# - Loosening: add another team to required_teams (OR semantics).
# - Add an item: append to items list and document the slug below.
#
# AUTHOR SELF-ACK IS FORBIDDEN regardless of which team contains them
# — the gate script enforces commenter != PR author before checking
# team membership.
version: 1
# Tier-aware failure mode (RFC#351 open question 2):
# For tier:high — hard-fail (status `failure`, blocks merge via BP).
# For tier:medium — hard-fail (same as high; medium is non-trivial).
# For tier:low — soft-fail (status `pending` with `acked: N/M` in the
# description). BP can choose to require the context
# or not for low-tier PRs.
# If no tier label is present, default to medium (hard-fail) — every PR
# should have a tier label per sop-tier-check, and absence indicates
# a missing-tier defect we should surface, not silently lower the bar.
tier_failure_mode:
"tier:high": hard
"tier:medium": hard
"tier:low": soft
default_mode: hard # used when no tier:* label is present
items:
- slug: comprehensive-testing
numeric_alias: 1
pr_section_marker: "Comprehensive testing performed"
required_teams: [qa, engineers]
description: >-
What was tested, how, edge cases covered. Ack from any qa-team
member (or engineers fallback while qa is small).
- slug: local-postgres-e2e
numeric_alias: 2
pr_section_marker: "Local-postgres E2E run"
required_teams: [engineers]
description: >-
Link to local CI artifact, or "N/A: pure-frontend change". Ack
from any engineer who can verify the local DB test actually ran.
- slug: staging-smoke
numeric_alias: 3
pr_section_marker: "Staging-smoke verified or pending"
required_teams: [engineers]
description: >-
Link to canary run, or "scheduled post-merge". Ack from any
engineer (core-devops/infra-sre are members of engineers team).
- slug: root-cause
numeric_alias: 4
pr_section_marker: "Root-cause not symptom"
required_teams: [managers, ceo]
description: >-
One-sentence root-cause statement. Ack from managers tier
(team-leads) or ceo. Senior judgment required to attest
root-cause-versus-symptom.
- slug: five-axis-review
numeric_alias: 5
pr_section_marker: "Five-Axis review walked"
required_teams: [engineers]
description: >-
Correctness / readability / architecture / security / performance.
Ack from any non-author engineer.
- slug: no-backwards-compat
numeric_alias: 6
pr_section_marker: "No backwards-compat shim / dead code added"
required_teams: [managers, ceo]
description: >-
Yes/no + justification if no. Senior ack required because
backward-compat shims are how dead-code accretes.
- slug: memory-consulted
numeric_alias: 7
pr_section_marker: "Memory/saved-feedback consulted"
required_teams: [engineers]
description: >-
List of feedback memories applicable to this change. Ack from
any engineer who has the same memory access.

View File

@ -0,0 +1,121 @@
# sop-checklist-gate — peer-ack merge gate for SOP-checklist items.
#
# RFC#351 Step 2 of 6 (implementation MVP).
#
# === DESIGN ===
#
# Goal: each PR must answer 7 SOP-checklist questions in its body,
# and each item must have at least one /sop-ack <slug> comment from
# a non-author peer in the required team. BP requires the
# `sop-checklist / all-items-acked (pull_request)` status to merge.
#
# Triggers:
# - `pull_request_target`: opened, edited, synchronize, reopened
# → fires when PR opens, body is edited (refire — RFC#351 §4),
# or new code is pushed (head.sha changes → stale status would
# be auto-discarded by BP via dismiss_stale_reviews, but the
# status itself is per-SHA so we re-post on the new head).
# - `issue_comment`: created, edited, deleted
# → fires on any new comment so /sop-ack / /sop-revoke take
# effect immediately (Gitea 1.22.6 doesn't refire on
# pull_request_review per feedback_pull_request_review_no_refire,
# so issue_comment is the canonical refire channel).
#
# Trust boundary (mirrors RFC#324 §A4 + sop-tier-check security note):
# `pull_request_target` (not `pull_request`) — workflow def is loaded
# from BASE branch, so a PR cannot rewrite this workflow to exfiltrate
# the token. The `actions/checkout` step pins `ref: base.sha` so the
# script ALSO comes from BASE. PR-HEAD code is never executed in the
# runner.
#
# Token scope:
# - read:repository, read:organization for PR + comments + team probes
# - write:repository for POST /statuses/{sha}
# - The token owner MUST be a member of every team referenced by the
# config's required_teams (else /teams/{id}/members/{login} returns
# 403 — see review-check.sh same-gotcha doc). For the MVP we use
# the dev-lead token (a member of engineers, managers, qa, security)
# via a repo secret `SOP_CHECKLIST_GATE_TOKEN`. Provisioning of that
# secret is a follow-up authorization step (separate from this PR).
#
# Failure mode: tier-aware (RFC#351 open question 2):
# - tier:high → state=failure (hard-fail; BP blocks merge)
# - tier:medium → state=failure (hard-fail; same)
# - tier:low → state=pending (soft-fail; BP can choose to require
# this context or skip for low-tier PRs)
# - missing/no-tier → state=failure (default-mode: hard — never lower
# the bar per feedback_fix_root_not_symptom)
#
# Slash-command contract (RFC#351 v1 + §A1.1-style notes from RFC#324):
#
# /sop-ack <slug-or-numeric-alias> [optional note]
# — register a peer-ack for one checklist item.
# — slug accepts kebab-case, snake_case, or natural-spaces
# (all normalize to canonical kebab-case).
# — numeric 1..7 maps via config.items[*].numeric_alias.
# — most-recent (user, slug) directive wins.
#
# /sop-revoke <slug-or-numeric-alias> [reason]
# — invalidate the commenter's own prior /sop-ack for this slug.
# — does NOT affect other peers' acks on the same slug.
# — most-recent (user, slug) directive wins, so a later /sop-ack
# re-restores the ack.
#
# The eval is read-only + idempotent (read PR + comments + team
# membership, compute, post status). Re-running on any event is safe —
# the new status overwrites the previous one for the same context.
name: sop-checklist-gate
on:
pull_request_target:
types: [opened, edited, synchronize, reopened]
issue_comment:
types: [created, edited, deleted]
permissions:
contents: read
pull-requests: read
# NOTE: `statuses: write` is the GitHub-Actions name for POST /statuses.
# Gitea 1.22.6 may not gate on this permission key (it just checks the
# token), but listing it explicitly documents intent for the next
# platform-version upgrade.
statuses: write
jobs:
gate:
# Run on pull_request_target events always. On issue_comment events,
# only when the comment is on a PR (issue_comment fires for issues
# too) and the body contains one of the slash-commands.
if: |
github.event_name == 'pull_request_target' ||
(github.event_name == 'issue_comment' &&
github.event.issue.pull_request != null &&
(contains(github.event.comment.body, '/sop-ack') ||
contains(github.event.comment.body, '/sop-revoke')))
runs-on: ubuntu-latest
steps:
- name: Check out BASE ref (trust boundary — never PR-head)
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
# For pull_request_target, the default branch is the trust
# anchor. For issue_comment the PR base may differ from the
# default branch (PR targeting `staging`), so we use the
# default-branch ref explicitly — same approach as
# qa-review.yml so the script source is always trusted.
ref: ${{ github.event.repository.default_branch }}
- name: Run sop-checklist-gate
env:
GITEA_TOKEN: ${{ secrets.SOP_CHECKLIST_GATE_TOKEN || secrets.GITHUB_TOKEN }}
PR_NUMBER: ${{ github.event.pull_request.number || github.event.issue.number }}
OWNER: ${{ github.repository_owner }}
REPO_NAME: ${{ github.event.repository.name }}
run: |
set -euo pipefail
python3 .gitea/scripts/sop-checklist-gate.py \
--owner "$OWNER" \
--repo "$REPO_NAME" \
--pr "$PR_NUMBER" \
--config .gitea/sop-checklist-config.yaml \
--gitea-host git.moleculesai.app