Files
molecule-core/.gitea/scripts/sop-checklist.py
Molecule AI Dev Engineer A (Kimi) bf276bc25d
ci-arm64-advisory / fast-checks (pull_request) Waiting to run
CI / Canvas Deploy Reminder (pull_request) Blocked by required conditions
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (pull_request) Successful in 11s
CI / all-required (pull_request) Successful in 14s
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 5s
CI / Python Lint & Test (pull_request) Successful in 8s
CI / Detect changes (pull_request) Successful in 9s
E2E Chat / detect-changes (pull_request) Successful in 10s
E2E API Smoke Test / detect-changes (pull_request) Successful in 10s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 4s
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (pull_request) Successful in 7s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 12s
Lint no tenant GITEA or GITHUB token write / Scan for repo-host token write into tenant workspace surface (pull_request) Successful in 5s
review-check-tests / review-check.sh regression tests (pull_request) Successful in 9s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 12s
qa-review / approved (pull_request) Failing after 13s
security-review / approved (pull_request) Failing after 7s
sop-checklist / na-declarations (pull_request) N/A: (none)
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m14s
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Successful in 1m14s
CI / Platform (Go) (pull_request) Successful in 2s
CI / Canvas (Next.js) (pull_request) Successful in 3s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 1s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 3s
E2E Chat / E2E Chat (pull_request) Successful in 3s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 6s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 5s
audit-force-merge / audit (pull_request) Successful in 4s
gate-check-v3 / gate-check (pull_request) Waiting to run
sop-checklist / all-items-acked (pull_request) Waiting to run
sop-checklist / review-refire (pull_request) Waiting to run
sop-tier-check / tier-check (pull_request) Waiting to run
fix(ci): add explicit utf-8 encoding to Python open() calls
Python 3's open() default encoding is platform-dependent (PEP 597).
On CI runners it happens to be UTF-8, but being explicit avoids
surprises on Windows dev boxes or custom runner images.

Files touched:
- sop-checklist.py: config loading (YAML + minimal parser)
- tests/_review_check_fixture.py: test fixture scenario loader
- tests/_refire_fixture.py: test fixture scenario loader

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-27 15:35:36 +00:00

1176 lines
48 KiB
Python

#!/usr/bin/env python3
# sop-checklist — evaluate whether a PR has peer-acked each
# SOP-checklist item. Posts a commit-status that branch protection
# can require.
#
# RFC#351 Step 2 of 6 (implementation MVP).
#
# Invoked by .gitea/workflows/sop-checklist.yml on:
# - pull_request_target: [opened, edited, synchronize, reopened]
# - issue_comment: [created, edited, deleted]
#
# Flow:
# 1. Load .gitea/sop-checklist-config.yaml (from BASE ref — trusted).
# 2. GET /repos/{R}/pulls/{N} — author, head.sha, tier label
# 3. GET /repos/{R}/issues/{N}/comments — extract /sop-ack and /sop-revoke
# 4. For each checklist item:
# a. Is the section marker present in PR body? (author answered)
# b. Is there ≥1 unrevoked /sop-ack from a non-author whose
# team-membership matches required_teams?
# 5. POST /repos/{R}/statuses/{sha} — context
# `sop-checklist / all-items-acked (pull_request)`,
# state=success | failure | pending, description=`acked: N/M …`.
#
# Trust boundary (mirrors RFC#324 §A4):
# This script is loaded from the BASE branch. The workflow's
# actions/checkout step pins ref=base.sha. PR-HEAD code is never
# executed. We only HTTP-call the Gitea API.
#
# Token scope:
# - read:repository / read:organization to enumerate PR + comments
# + team membership (Gitea 1.22.6 quirk: team-membership endpoint
# returns 403 if token owner is not in the team; see review-check.sh
# for the same gotcha — we surface the same fail-closed message).
# - write:repository for `POST /repos/{R}/statuses/{sha}`. Unlike
# RFC#324's pattern (which uses the JOB's own pass/fail as the
# status), we POST the status explicitly because the gate posts
# a single multi-item status with a richer description than a
# bare success/failure context can carry.
#
# Slug normalization rules (canonical form: kebab-case):
# - Lowercase
# - Whitespace + underscores → single dash
# - Strip non [a-z0-9-] characters
# - Collapse adjacent dashes
# - Strip leading/trailing dashes
# - If the result is a digit string (e.g. "1"), look up via
# config.items[*].numeric_alias to get the kebab-case slug.
#
# Examples:
# "Comprehensive_Testing" → "comprehensive-testing"
# "comprehensive testing" → "comprehensive-testing"
# "1" → "comprehensive-testing"
# "Five-Axis-Review" → "five-axis-review"
#
# Revoke semantics:
# /sop-revoke <slug> [reason] — most-recent comment per (slug, user)
# wins. So if Alice posts /sop-ack X then later /sop-revoke X, her ack
# for X is invalidated. Bob's prior /sop-ack X is unaffected. If Alice
# posts /sop-revoke X then later /sop-ack X again, the ack is restored.
from __future__ import annotations
import argparse
import json
import os
import re
import resource
import sys
import urllib.error
import urllib.parse
import urllib.request
from typing import Any, Callable, Iterator
# ---------------------------------------------------------------------------
# Address-space guardrail (RFC#369 / task #369 follow-up to mc#1242-class OOM).
#
# `get_issue_comments` paginates the full comment history of a PR. On
# bot-relay-heavy PRs (e.g. mc#291, mc#1242) this can balloon past the
# runner's cgroup memory limit and 137 the job. Cap virtual-address-space
# at 2 GiB so the script OOMs as a `MemoryError` (catchable / surfaceable)
# rather than a SIGKILL we can't post a status for.
#
# 2 GiB is generous — a 5000-comment PR with 1 KiB minimal-dicts (see
# get_issue_comments below) fits in ~10 MiB, leaving plenty of headroom
# for the Python runtime + urllib + json buffers.
#
# Skipped under pytest / dry-run where RLIMIT_AS would interfere with
# test runner memory needs (set SOP_CHECKLIST_NO_RLIMIT=1 to opt out).
if not os.environ.get("SOP_CHECKLIST_NO_RLIMIT"):
try:
resource.setrlimit(resource.RLIMIT_AS, (2 * 1024**3, 2 * 1024**3))
except (ValueError, OSError):
# macOS sometimes refuses RLIMIT_AS; not fatal — the Linux runner
# is the only place this matters for the OOM-prevention goal.
pass
# Per-comment body cap (task #369). The directive parser walks the body
# line-by-line looking for ^/sop-ack ^/sop-revoke ^/sop-n/a markers — only
# the first few KiB matter for that. Cap each comment body so a single
# pasted-log comment can't push us past the cgroup limit.
_MAX_BODY_BYTES = int(os.environ.get("SOP_CHECKLIST_MAX_BODY_BYTES") or 8 * 1024)
# ---------------------------------------------------------------------------
# Slug normalization
# ---------------------------------------------------------------------------
_NORMALIZE_REPLACE_RE = re.compile(r"[\s_]+")
_NORMALIZE_STRIP_RE = re.compile(r"[^a-z0-9-]")
_NORMALIZE_DASH_RE = re.compile(r"-+")
def normalize_slug(raw: str, numeric_aliases: dict[int, str] | None = None) -> str:
"""Normalize a user-supplied slug to canonical kebab-case form.
See module header for the rules.
If the input is a pure digit string AND numeric_aliases is provided,
the alias mapping is consulted. Unknown digits return "" so the caller
can flag the comment as unparseable.
"""
if raw is None:
return ""
s = raw.strip().lower()
s = _NORMALIZE_REPLACE_RE.sub("-", s)
s = _NORMALIZE_STRIP_RE.sub("", s)
s = _NORMALIZE_DASH_RE.sub("-", s)
s = s.strip("-")
if s.isdigit() and numeric_aliases is not None:
return numeric_aliases.get(int(s), "")
return s
# ---------------------------------------------------------------------------
# Comment parsing — /sop-ack and /sop-revoke
# ---------------------------------------------------------------------------
# A directive must be on its own line. Permits leading whitespace.
# Optional trailing note after the slug for /sop-ack and required reason
# for /sop-revoke (RFC#351 open question 4 — reason is captured but not
# yet validated; future iteration may require a min-length).
_DIRECTIVE_RE = re.compile(
r"^[ \t]*/(sop-ack|sop-revoke|sop-n/a)[ \t]+([A-Za-z0-9_\- ]+?)(?:[ \t]+(.*))?[ \t]*$",
re.MULTILINE,
)
def parse_directives(
comment_body: str,
numeric_aliases: dict[int, str],
) -> tuple[list[tuple[str, str, str]], list[tuple[str, str, str]]]:
"""Extract /sop-ack, /sop-revoke, and /sop-n/a directives from a comment body.
Returns (directives, na_directives) where each is a list of
(kind, canonical_slug, note) tuples:
kind is "sop-ack", "sop-revoke", or "sop-n/a"
canonical_slug is the normalized form (or "" if unparseable)
note is the trailing free-text (may be "")
The two lists are kept separate so call sites can unpack them
directly (e.g. directives, na_directives = parse_directives(...)).
"""
directives: list[tuple[str, str, str]] = []
na_directives: list[tuple[str, str, str]] = []
if not comment_body:
return directives, na_directives
for m in _DIRECTIVE_RE.finditer(comment_body):
kind = m.group(1)
raw_slug = (m.group(2) or "").strip()
# If the raw match included trailing words, the regex non-greedy
# captured only the first token; strip again for safety.
# We split on whitespace to keep the FIRST word as the slug, and
# everything after as the note.
parts = raw_slug.split()
if not parts:
continue
first = parts[0]
# If the slug-capture greedily matched multiple words (e.g.
# "comprehensive testing"), preserve normalize behavior: join
# the WHOLE first-word-token only; trailing words get appended to
# the note. The regex limits group(2) to [A-Za-z0-9_\- ] so we
# may have multi-word forms here — normalize handles them.
if len(parts) > 1:
# User wrote "/sop-ack comprehensive testing extra-note"
# → treat "comprehensive testing" as the slug source if it
# normalizes to a known item; otherwise treat "comprehensive"
# as slug and "testing extra-note" as note. We defer the
# disambiguation to the caller via the returned canonical
# slug. For simplicity: try the WHOLE captured string first.
canonical = normalize_slug(raw_slug, numeric_aliases)
else:
canonical = normalize_slug(first, numeric_aliases)
note_from_group = (m.group(3) or "").strip()
# If we collapsed multi-word slug into kebab and there's a
# trailing-text group too, append it.
entry = (kind, canonical, note_from_group)
if kind == "sop-n/a":
na_directives.append(entry)
else:
directives.append(entry)
return directives, na_directives
# ---------------------------------------------------------------------------
# PR body section detection
# ---------------------------------------------------------------------------
def section_marker_present(body: str, marker: str) -> bool:
"""Return True if `marker` appears in `body` case-insensitively
on a non-empty line (i.e. the author actually filled it in).
We require the marker substring AND non-whitespace content on the
same line OR within the next non-blank line — this prevents
trivially-empty checklists like:
## SOP-Checklist
- [ ] **Comprehensive testing performed**:
- [ ] **Local-postgres E2E run**:
from auto-passing the section-present check. The peer-ack is still
required, but answering with empty content is captured as a soft
finding via the section-present test alone.
NOTE: we scan forward through blank lines (the markdown-header pattern
is ## Header\\n\\ncontent) so that a header + blank-line + content
structure still satisfies the check. The backward checkbox fallback
catches inline markers without a preceding checkbox (mc#1099).
"""
if not body or not marker:
return False
# Strip trailing whitespace so the blank-line scan below can find
# content that appears on the very last line of the body (without
# being misled by a trailing \n or spaces).
body = body.rstrip()
body_lower = body.lower()
marker_lower = marker.lower()
idx = body_lower.find(marker_lower)
if idx < 0:
return False
# Walk to end of line.
line_end = body.find("\n", idx)
if line_end < 0:
line_end = len(body)
line = body[idx + len(marker):line_end]
# Strip the colon + checkbox tail patterns; require at least one
# non-whitespace, non-punctuation char.
stripped = re.sub(r"[\s\*:\-\[\]]+", "", line)
if stripped:
return True
# Fall through: scan forward, skipping blank-only lines, until we find
# non-empty content or run out of body. Handles:
# ## Header ← marker line (empty after marker)
# ← blank line (skipped)
# - actual content ← found
pos = line_end
while True:
# Skip the current newline and any additional newlines (blank lines).
while pos < len(body) and body[pos] == "\n":
pos += 1
if pos >= len(body):
break
line_end = body.find("\n", pos)
if line_end < 0:
line_end = len(body)
line = body[pos:line_end]
stripped = re.sub(r"[\s\*:\-\[\]]+", "", line)
if stripped:
return True
pos = line_end
# Last resort: the marker may appear mid-sentence (e.g.
# **Memory/saved-feedback consulted**: No applicable...).
# Search backward within the CURRENT LINE only (not preceding lines)
# to find a checkbox on the same line before the marker text.
# mc#1099 follow-up: memory-consulted detection was failing because
# the checkbox was on the same line before the inline marker.
_CHECKBOX_RE = re.compile(r"- \[[ x\]]|<input", re.IGNORECASE)
line_start = body.rfind("\n", 0, idx) + 1 # 0 if no newline before idx
before = body[line_start:idx]
m = _CHECKBOX_RE.search(before)
if not m:
return False
# Require meaningful content between the checkbox and the marker text
# (markdown formatting like ** or * must also be stripped).
# If only whitespace/markdown chars remain, the checkbox line is empty.
between = before[m.end() :]
stripped_between = re.sub(r"[\s\*:#\[\]_\-]+", "", between)
return bool(stripped_between)
# ---------------------------------------------------------------------------
# Ack-state computation
# ---------------------------------------------------------------------------
def compute_ack_state(
comments: list[dict[str, Any]],
pr_author: str,
items_by_slug: dict[str, dict[str, Any]],
numeric_aliases: dict[int, str],
team_membership_probe: "callable[[str, list[str]], list[str]]",
high_risk: bool = False,
) -> dict[str, dict[str, Any]]:
"""Compute per-item ack state.
Each comment is processed in chronological order. The most-recent
directive per (commenter, slug) wins.
Returns a dict keyed by canonical slug:
{
"comprehensive-testing": {
"ackers": ["bob"], # non-author, team-verified
"rejected_ackers": { # debugging info
"self_ack": ["alice"],
"unknown_slug": [],
"not_in_team": ["eve"],
}
},
...
}
"""
# Step 1: collapse directives per (commenter, slug) — most recent wins.
# comments are expected to come in chronological order from the
# API (Gitea returns oldest-first by default for issues/{N}/comments).
latest_directive: dict[tuple[str, str], str] = {} # (user, slug) → kind
unparseable_per_user: dict[str, int] = {}
for c in comments:
body = c.get("body", "") or ""
user = (c.get("user") or {}).get("login", "")
if not user:
continue
for kind, slug, _note in parse_directives(body, numeric_aliases)[0]:
if not slug:
unparseable_per_user[user] = unparseable_per_user.get(user, 0) + 1
continue
latest_directive[(user, slug)] = kind
# Step 2: build candidate ackers per slug.
# Filter out self-acks and unknown slugs.
ackers_per_slug: dict[str, list[str]] = {s: [] for s in items_by_slug}
rejected_self: dict[str, list[str]] = {s: [] for s in items_by_slug}
pending_team_check: dict[str, list[str]] = {s: [] for s in items_by_slug}
for (user, slug), kind in latest_directive.items():
if kind != "sop-ack":
continue # revokes leave the (user,slug) state as "no ack"
if slug not in items_by_slug:
# Slug normalized to something not in our config — store
# under a synthetic key for diagnostic surfacing. Don't add
# to any item.
continue
if user == pr_author:
rejected_self[slug].append(user)
continue
pending_team_check[slug].append(user)
# Step 3: team membership probe per slug (batched per slug to keep
# API call count down — same user may ack multiple items but the
# required_teams differ per item, so we MUST probe per (user, item)).
rejected_not_in_team: dict[str, list[str]] = {s: [] for s in items_by_slug}
for slug, candidates in pending_team_check.items():
if not candidates:
continue
# Risk-class-aware required-teams resolution (RFC#450 Option C):
# high-risk PRs use `required_teams_high_risk` (when set on the
# item); default class uses `required_teams`. The probe closure
# is built with the same high_risk flag so the two reads are
# always consistent (both sites share `resolve_required_teams`).
required = resolve_required_teams(items_by_slug[slug], high_risk)
approved = team_membership_probe(slug, candidates) # returns subset
rejected_not_in_team[slug] = [u for u in candidates if u not in approved]
ackers_per_slug[slug] = approved
# Stash resolved teams for description rendering.
items_by_slug[slug]["_required_resolved"] = required
return {
slug: {
"ackers": ackers_per_slug[slug],
"rejected": {
"self_ack": rejected_self[slug],
"not_in_team": rejected_not_in_team[slug],
},
}
for slug in items_by_slug
}
# ---------------------------------------------------------------------------
# N/A-gate evaluation
# ---------------------------------------------------------------------------
def compute_na_state(
comments: list[dict[str, Any]],
author: str,
na_gates: dict[str, Any],
probe: Callable[[str, list[str]], list[str]],
) -> dict[str, dict[str, Any]]:
"""Evaluate which N/A gates have a valid declaration from a team member.
Returns dict[gate_name, dict] where each dict has:
declared: bool — at least one valid non-author team-member declared N/A
decl_ackers: list[str] — usernames who declared this gate N/A
rejected: dict with keys:
not_in_team: list[str] — users who tried but aren't in required teams
"""
# Build per-user latest N/A directive (most-recent wins per RFC#324).
latest_na: dict[str, tuple[str, str]] = {} # user → (gate, note)
for c in comments:
body = c.get("body", "") or ""
user = (c.get("user") or {}).get("login", "")
if not user:
continue
for kind, gate, note in parse_directives(body, {})[1]:
# [1] = na_directives only
if gate in na_gates:
latest_na[user] = (gate, note)
result: dict[str, dict[str, Any]] = {}
for gate, gate_cfg in na_gates.items():
result[gate] = {
"declared": False,
"decl_ackers": [],
"rejected": {"not_in_team": []},
}
decl_ackers: list[str] = []
not_in_team: list[str] = []
for user, (g, _note) in latest_na.items():
if g != gate:
continue
if user == author:
continue # authors cannot self-declare N/A
approved = probe(gate, [user])
if approved:
decl_ackers.append(user)
else:
not_in_team.append(user)
result[gate]["declared"] = bool(decl_ackers)
result[gate]["decl_ackers"] = decl_ackers
result[gate]["rejected"]["not_in_team"] = not_in_team
return result
# ---------------------------------------------------------------------------
# Gitea API client
# ---------------------------------------------------------------------------
class GiteaClient:
def __init__(self, host: str, token: str):
self.base = f"https://{host}/api/v1"
self.token = token
# Cache team-name → team-id resolutions per org.
self._team_id_cache: dict[tuple[str, str], int | None] = {}
def _req(
self,
method: str,
path: str,
body: dict[str, Any] | None = None,
ok_codes: tuple[int, ...] = (200, 201, 204),
) -> tuple[int, Any]:
url = self.base + path
data = None
headers = {
"Authorization": f"token {self.token}",
"Accept": "application/json",
}
if body is not None:
data = json.dumps(body).encode("utf-8")
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, method=method, data=data, headers=headers)
try:
with urllib.request.urlopen(req, timeout=20) as r:
raw = r.read()
code = r.getcode()
except urllib.error.HTTPError as e:
code = e.code
raw = e.read()
try:
parsed = json.loads(raw.decode("utf-8")) if raw else None
except json.JSONDecodeError:
parsed = raw.decode("utf-8", errors="replace") if raw else None
return code, parsed
def get_pr(self, owner: str, repo: str, pr: int) -> dict[str, Any]:
code, data = self._req("GET", f"/repos/{owner}/{repo}/pulls/{pr}")
if code != 200:
raise RuntimeError(f"GET pulls/{pr} → HTTP {code}: {data!r}")
return data
def iter_issue_comments(
self, owner: str, repo: str, issue: int, page_size: int = 50
) -> Iterator[dict[str, Any]]:
"""Stream comments page-by-page, yielding ONE minimal-dict per comment.
Each yielded comment carries ONLY the fields the gate actually reads
— `{"user": {"login": str}, "body": str}` — and DROPS the much
larger Gitea-API extras (html_url, pull_request_url, issue_url,
assets, created_at, updated_at, id, original_author_*).
Memory motivation (task #369 / mc#1242-class OOM): full Gitea
comment dicts are ~2 KiB median + ~3 KiB p95. On PRs with several
thousand bot-relay comments the eager `list[full_dict]` shape used
previously pushed runner anon-rss past the cgroup limit. The
minimal-dict shape is ~10-20x smaller (typically ~50-100B Python
overhead + the body string).
The two downstream consumers (`compute_ack_state`,
`compute_na_state`) each iterate the comment list exactly once and
read only `body` + `user.login`, so dropping every other field is
safe. They still receive `list[dict[str, Any]]`-shaped objects so
the test fixtures (which already used the minimal shape) keep
working with no fixture changes.
"""
page = 1
while True:
code, data = self._req(
"GET",
f"/repos/{owner}/{repo}/issues/{issue}/comments?limit={page_size}&page={page}",
)
if code != 200:
raise RuntimeError(
f"GET issues/{issue}/comments page={page} → HTTP {code}: {data!r}"
)
if not data:
break
for c in data:
# Minimal projection — drop ALL fields the gate doesn't read.
user_login = ((c.get("user") or {}).get("login") or "") if isinstance(c, dict) else ""
body = (c.get("body") if isinstance(c, dict) else "") or ""
# Body-size guardrail: huge comments (e.g. pasted CI logs) can
# individually be MiBs. The directive parser only needs the
# first ~8 KiB to find /sop-ack /sop-revoke /sop-n/a markers
# — anything past that is filler. Truncate at 8 KiB so a
# single oversized comment can't OOM the runner.
if len(body) > _MAX_BODY_BYTES:
body = body[:_MAX_BODY_BYTES]
yield {"user": {"login": user_login}, "body": body}
if len(data) < page_size:
break
page += 1
def get_issue_comments(
self,
owner: str,
repo: str,
issue: int,
max_comments: int | None = None,
) -> list[dict[str, Any]]:
"""Paginate + collect minimal comment dicts. See `iter_issue_comments`
for the per-comment shape and the OOM-prevention rationale.
`max_comments` (optional, default unbounded): hard cap. When the cap
is hit we stop fetching further pages and the caller surfaces a
soft 'skipping due to volume' status (see main()).
"""
out: list[dict[str, Any]] = []
for c in self.iter_issue_comments(owner, repo, issue):
out.append(c)
if max_comments is not None and len(out) >= max_comments:
break
return out
def resolve_team_id(self, org: str, team_name: str) -> int | None:
key = (org, team_name)
if key in self._team_id_cache:
return self._team_id_cache[key]
code, data = self._req("GET", f"/orgs/{org}/teams/search?q={urllib.parse.quote(team_name)}")
team_id = None
if code == 200 and isinstance(data, dict):
for t in data.get("data", []):
if t.get("name") == team_name:
team_id = t.get("id")
break
if team_id is None and code == 200 and isinstance(data, list):
for t in data:
if t.get("name") == team_name:
team_id = t.get("id")
break
self._team_id_cache[key] = team_id
return team_id
def is_team_member(self, team_id: int, login: str) -> bool | None:
"""Return True / False / None (unknown — 403 from API)."""
code, _ = self._req(
"GET", f"/teams/{team_id}/members/{urllib.parse.quote(login)}"
)
if code in (200, 204):
return True
if code == 404:
return False
# 403 means the token owner isn't in this team, so the API
# refuses to confirm membership. Fail-closed at the caller.
return None
def post_status(
self,
owner: str,
repo: str,
sha: str,
state: str,
context: str,
description: str,
target_url: str = "",
) -> None:
body = {
"state": state,
"context": context,
"description": description[:140], # Gitea truncates to 255 but be safe
"target_url": target_url or "",
}
code, data = self._req(
"POST",
f"/repos/{owner}/{repo}/statuses/{sha}",
body=body,
ok_codes=(201,),
)
if code not in (200, 201):
raise RuntimeError(
f"POST statuses/{sha} → HTTP {code}: {data!r}"
)
# ---------------------------------------------------------------------------
# Config loader (PyYAML-free — config file is intentionally tiny + flat)
# ---------------------------------------------------------------------------
def load_config(path: str) -> dict[str, Any]:
"""Load .gitea/sop-checklist-config.yaml.
Uses PyYAML if available, otherwise falls back to a built-in
minimal parser sufficient for our flat config shape. Bundling
PyYAML on the runner is one apt install away but we avoid the
dep by keeping the config shape constrained.
"""
try:
# yaml is an optional dep; the canonical loader is used when available,
# but the SOP runs on runners that may not have PyYAML installed. The
# fallback _load_config_minimal covers the same config shape without
# requiring the dep, so the ignore is safe: if yaml loads, we use it;
# otherwise we fall back silently.
import yaml # type: ignore[import-not-found]
with open(path, encoding="utf-8") as f:
return yaml.safe_load(f)
except ImportError:
return _load_config_minimal(path)
def _load_config_minimal(path: str) -> dict[str, Any]:
"""Minimal YAML subset parser for our config shape.
Supports: top-level scalar:value, top-level map-of-map (e.g.
tier_failure_mode), top-level list of maps (items:), and within an
item map: scalars + lists of scalars. Does NOT support nested lists,
YAML anchors, multi-doc, or flow style.
"""
with open(path, encoding="utf-8") as f:
lines = f.readlines()
return _parse_minimal_yaml(lines)
def _parse_minimal_yaml(lines: list[str]) -> dict[str, Any]:
"""Hand-rolled subset parser. See _load_config_minimal docstring.
C901: function is necessarily long — it implements a finite-state YAML
subset (scalars, maps, lists of maps at fixed depth). No utility refactors
meaningfully reduce length without degrading readability. All branches
are exhaustively tested in test_parse_minimal_yaml.py.
"""
# Strip comments + blank lines but preserve indentation.
cleaned: list[tuple[int, str]] = []
for raw in lines:
# Don't strip a "#" that is inside a quoted value.
body = raw.rstrip("\n")
# Remove trailing comment.
idx = body.find("#")
if idx >= 0 and (idx == 0 or body[idx - 1] in " \t"):
body = body[:idx].rstrip()
if not body.strip():
continue
indent = len(body) - len(body.lstrip(" "))
cleaned.append((indent, body.strip()))
root: dict[str, Any] = {}
i = 0
n = len(cleaned)
def parse_scalar(s: str) -> Any:
s = s.strip()
if s.startswith('"') and s.endswith('"'):
return s[1:-1]
if s.startswith("'") and s.endswith("'"):
return s[1:-1]
if s.lower() in ("true", "yes"):
return True
if s.lower() in ("false", "no"):
return False
try:
return int(s)
except ValueError:
pass
return s
def parse_inline_list(s: str) -> list[Any]:
s = s.strip()
if not (s.startswith("[") and s.endswith("]")):
return [parse_scalar(s)]
inner = s[1:-1]
if not inner.strip():
return []
return [parse_scalar(x.strip()) for x in inner.split(",")]
while i < n:
indent, line = cleaned[i]
if indent != 0:
i += 1
continue
if ":" not in line:
i += 1
continue
key, _, rest = line.partition(":")
key = key.strip()
rest = rest.strip()
if rest == "":
# Block — could be map or list.
i += 1
# Look ahead for first child.
if i < n and cleaned[i][1].startswith("- "):
# List of items.
items: list[Any] = []
while i < n and cleaned[i][0] > indent and cleaned[i][1].startswith("- "):
item_indent = cleaned[i][0]
first_kv = cleaned[i][1][2:].strip() # strip "- "
item: dict[str, Any] = {}
if ":" in first_kv:
k, _, v = first_kv.partition(":")
k = k.strip()
v = v.strip()
if v == "":
item[k] = ""
elif v.startswith(">-") or v.startswith(">"):
# Folded scalar continues on subsequent indented lines
collected: list[str] = []
i += 1
while i < n and cleaned[i][0] > item_indent:
collected.append(cleaned[i][1])
i += 1
item[k] = " ".join(collected)
items.append(item)
continue
elif v.startswith("["):
item[k] = parse_inline_list(v)
else:
item[k] = parse_scalar(v)
i += 1
# Subsequent k:v lines at deeper indent belong to this item.
while i < n and cleaned[i][0] > item_indent and not cleaned[i][1].startswith("- "):
sub_indent, sub_line = cleaned[i]
if ":" in sub_line:
k, _, v = sub_line.partition(":")
k = k.strip()
v = v.strip()
if v == "":
item[k] = ""
i += 1
elif v.startswith(">-") or v.startswith(">"):
collected = []
i += 1
while i < n and cleaned[i][0] > sub_indent:
collected.append(cleaned[i][1])
i += 1
item[k] = " ".join(collected)
elif v.startswith("["):
item[k] = parse_inline_list(v)
i += 1
else:
item[k] = parse_scalar(v)
i += 1
else:
i += 1
items.append(item)
root[key] = items
else:
# Sub-map.
submap: dict[str, Any] = {}
while i < n and cleaned[i][0] > indent:
sub_indent, sub_line = cleaned[i]
if ":" in sub_line:
k, _, v = sub_line.partition(":")
k = k.strip().strip('"').strip("'")
v = v.strip()
if v.startswith("[") and v.endswith("]"):
submap[k] = parse_inline_list(v)
else:
submap[k] = parse_scalar(v)
i += 1
root[key] = submap
else:
# Inline scalar or list.
if rest.startswith("[") and rest.endswith("]"):
root[key] = parse_inline_list(rest)
else:
root[key] = parse_scalar(rest)
i += 1
return root
# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------
def render_status(
items: list[dict[str, Any]],
ack_state: dict[str, dict[str, Any]],
body_state: dict[str, bool],
) -> tuple[str, str]:
"""Return (state, description) for the commit-status post.
state is "success" if every item has at least one valid ack
(body section presence is informational only — peer-ack is the
real gate). tier:low PRs receive state="success" (soft-fail — no
acks required); the description carries "[info tier:low]" prefix.
"""
n = len(items)
fully_acked = [
it["slug"] for it in items if ack_state[it["slug"]]["ackers"]
]
missing = [
it["slug"] for it in items if not ack_state[it["slug"]]["ackers"]
]
missing_body = [it["slug"] for it in items if not body_state.get(it["slug"], False)]
desc_parts = [f"acked: {len(fully_acked)}/{n}"]
if missing:
# Show up to 3 missing slugs to stay inside the 140-char budget.
shown = ", ".join(missing[:3])
if len(missing) > 3:
shown += f", +{len(missing) - 3}"
desc_parts.append(f"missing: {shown}")
if missing_body:
shown = ", ".join(missing_body[:3])
if len(missing_body) > 3:
shown += f", +{len(missing_body) - 3}"
desc_parts.append(f"body-unfilled: {shown}")
state = "success" if not missing and not missing_body else "failure"
return state, "".join(desc_parts)
def get_tier_mode(pr: dict[str, Any], cfg: dict[str, Any]) -> str:
"""Read tier label, return 'hard' or 'soft' per cfg.tier_failure_mode."""
labels = pr.get("labels") or []
tier_labels = [label.get("name", "") for label in labels if (label.get("name", "") or "").startswith("tier:")]
mode_map = cfg.get("tier_failure_mode") or {}
default_mode = cfg.get("default_mode", "hard")
for tl in tier_labels:
if tl in mode_map:
return mode_map[tl]
return default_mode
def is_high_risk(pr: dict[str, Any], cfg: dict[str, Any]) -> bool:
"""Return True when the PR is high-risk per RFC#450 Option C.
A PR is high-risk when ANY of:
- it carries the `tier:high` label (mechanically strictest tier), or
- it carries any label listed in cfg.high_risk_labels.
High-risk PRs use `required_teams_high_risk` (when set on an item)
instead of the default `required_teams`. Items without
`required_teams_high_risk` are unaffected (the default applies).
Governance fix for internal#442 — closes the inconsistency between
sop-tier-check (tier-aware) and sop-checklist (was tier-blind).
"""
label_set = {(label.get("name") or "") for label in (pr.get("labels") or [])}
if "tier:high" in label_set:
return True
high_risk_labels = set(cfg.get("high_risk_labels") or [])
return bool(label_set & high_risk_labels)
def resolve_required_teams(item: dict[str, Any], high_risk: bool) -> list[str]:
"""Pick the active required_teams list for an item.
When high_risk is True AND the item declares a non-empty
`required_teams_high_risk`, return that. Else fall back to
`required_teams`. Keeping this in one helper means the gate's
decision shape stays single-sited even as items grow.
"""
if high_risk:
elevated = item.get("required_teams_high_risk") or []
if elevated:
return list(elevated)
return list(item.get("required_teams") or [])
def main(argv: list[str] | None = None) -> int:
p = argparse.ArgumentParser()
p.add_argument("--owner", required=True)
p.add_argument("--repo", required=True)
p.add_argument("--pr", type=int, required=True)
p.add_argument("--config", default=".gitea/sop-checklist-config.yaml")
p.add_argument("--gitea-host", default="git.moleculesai.app")
p.add_argument(
"--dry-run",
action="store_true",
help="Compute state but do not POST the status.",
)
p.add_argument(
"--status-context",
default="sop-checklist / all-items-acked (pull_request)",
)
p.add_argument(
"--exit-on-state",
action="store_true",
help=(
"If set, exit non-zero when state=failure. Default OFF so the "
"job-level conclusion is independent of ack-state — the only "
"thing BP sees is the POSTed status. Useful for local debugging."
),
)
p.add_argument(
"--max-comments",
type=int,
default=int(os.environ.get("SOP_CHECKLIST_MAX_COMMENTS") or 5000),
help=(
"Hard cap on comments fetched from the PR. Above this we post "
"a SOFT-pending status with a 'skipping due to volume' note "
"instead of OOM'ing the runner (task #369). Override with the "
"SOP_CHECKLIST_MAX_COMMENTS env var. Set 0 to disable the cap."
),
)
args = p.parse_args(argv)
token = os.environ.get("GITEA_TOKEN", "")
if not token and not args.dry_run:
print("::error::GITEA_TOKEN env required", file=sys.stderr)
return 2
cfg = load_config(args.config)
items: list[dict[str, Any]] = cfg["items"]
items_by_slug = {it["slug"]: it for it in items}
na_gates: dict[str, Any] = cfg.get("n/a_gates", {})
numeric_aliases = {
int(it["numeric_alias"]): it["slug"] for it in items if it.get("numeric_alias")
}
client = GiteaClient(args.gitea_host, token) if token else None
if not client:
print("::error::No client (dry-run without token has nothing to do)", file=sys.stderr)
return 2
pr = client.get_pr(args.owner, args.repo, args.pr)
if pr.get("state") != "open":
print(f"::notice::PR #{args.pr} is {pr.get('state')} — gate is a no-op")
return 0
author = (pr.get("user") or {}).get("login", "")
head_sha = (pr.get("head") or {}).get("sha", "")
body = pr.get("body", "") or ""
if not author or not head_sha:
print("::error::PR payload missing user.login or head.sha", file=sys.stderr)
return 1
max_comments_cap = args.max_comments if args.max_comments and args.max_comments > 0 else None
comments = client.get_issue_comments(
args.owner, args.repo, args.pr, max_comments=max_comments_cap
)
# Volume short-circuit: PRs with thousands of bot-relay comments
# (the mc#1242-class OOM source) get a soft 'volume-skipped' status
# so the gate doesn't churn the runner; reviewers can re-trigger by
# editing the PR or filing a fresh PR with the housekeeping comments
# split off. Cap-hit means we couldn't see the WHOLE history, so we
# can't fairly post failure — pending is the safe default.
volume_skipped = bool(max_comments_cap and len(comments) >= max_comments_cap)
# High-risk classification (RFC#450 Option C, governance fix for
# internal#442). Computed ONCE per PR — used by both the probe
# closure and compute_ack_state so the elevation decision is
# single-sited.
high_risk = is_high_risk(pr, cfg)
# Build team-membership probe closure that caches results per
# (user, team-id) so a user acking multiple items only triggers
# one membership lookup per team.
team_member_cache: dict[tuple[str, int], bool | None] = {}
def probe(slug: str, users: list[str]) -> list[str]:
# `slug` may be either an items-key (compute_ack_state caller) OR
# an n/a-gate key (compute_na_state caller). Previously this hard
# KeyError'd on the n/a-gate path when slug was e.g. "security-review"
# — that's a config gate, not an item — so the gate would crash
# instead of falling back to the gate's own required_teams. Fix
# task #369 follow-up to issue #355.
if slug in items_by_slug:
item = items_by_slug[slug]
team_names: list[str] = resolve_required_teams(item, high_risk)
elif slug in na_gates:
# n/a-gate configs carry `required_teams` directly (see
# sop-checklist-config.yaml: n/a_gates.<gate>.required_teams).
gate_cfg = na_gates[slug] or {}
team_names = list(gate_cfg.get("required_teams") or [])
if not team_names:
print(
f"::warning::n/a-gate '{slug}' has no required_teams; "
"fail-closed (no users will be approved)",
file=sys.stderr,
)
else:
# Unknown slug — fail closed, log so we can find config drift.
print(
f"::warning::probe() called with slug '{slug}' which is "
f"neither an items entry nor an n/a-gate; fail-closed",
file=sys.stderr,
)
return []
# Resolve names → ids. NOTE: orgs/{org}/teams/search may not be
# available — fall back to the list endpoint.
team_ids: list[int] = []
for tn in team_names:
tid = client.resolve_team_id(args.owner, tn)
if tid is None:
# Try the list endpoint as a fallback.
code, data = client._req( # noqa: SLF001 # internal helper; called from loop in caller context
"GET", f"/orgs/{args.owner}/teams"
)
if code == 200 and isinstance(data, list):
for t in data:
if t.get("name") == tn:
tid = t.get("id")
client._team_id_cache[(args.owner, tn)] = tid # noqa: SLF001 # internal write-through cache
break
if tid is not None:
team_ids.append(tid)
else:
print(
f"::warning::could not resolve team-id for '{tn}' "
f"in org '{args.owner}' — item '{slug}' will fail closed",
file=sys.stderr,
)
approved: list[str] = []
for u in users:
for tid in team_ids:
cache_key = (u, tid)
if cache_key not in team_member_cache:
team_member_cache[cache_key] = client.is_team_member(tid, u)
result = team_member_cache[cache_key]
if result is True:
approved.append(u)
break
if result is None:
print(
f"::warning::team-probe for {u} in team-id {tid} returned 403 "
"(token owner not in that team — fail-closed per RFC#324)",
file=sys.stderr,
)
# Treat as not-in-team for this user/team pair; loop
# may still find membership in another team.
return approved
ack_state = compute_ack_state(
comments, author, items_by_slug, numeric_aliases, probe, high_risk=high_risk
)
body_state = {it["slug"]: section_marker_present(body, it["pr_section_marker"]) for it in items}
state, description = render_status(items, ack_state, body_state)
mode = get_tier_mode(pr, cfg)
if mode == "soft":
# tier:low: acks are informational only — post success so BP gate passes.
# Description carries "[info tier:low]" prefix so reviewers know acks
# were not required (vs a tier:medium+ PR that truly passed all acks).
state = "success"
description = f"[info tier:low] {description}"
if volume_skipped:
# Above the comment-cap — we may have a partial view. Soft-pend
# so neither BP nor the author gets stuck; surface the cap so
# reviewers know what's up. No-block at the gate level.
state = "pending"
description = (
f"[volume-skipped] comment-cap={max_comments_cap} hit; please file "
f"a fresh PR with bot-relay history split off (#369). {description}"
)
# Diagnostics to job log.
print(
f"::notice::PR #{args.pr} author={author} head={head_sha[:7]} "
f"mode={mode} risk_class={'high' if high_risk else 'default'}"
)
for it in items:
slug = it["slug"]
ackers = ack_state[slug]["ackers"]
if ackers:
print(f"::notice:: [PASS] {slug} — acked by {','.join(ackers)}")
else:
r = ack_state[slug]["rejected"]
extras: list[str] = []
if r["self_ack"]:
extras.append(f"self-acks-rejected:{','.join(r['self_ack'])}")
if r["not_in_team"]:
extras.append(f"not-in-team:{','.join(r['not_in_team'])}")
extra = " (" + "; ".join(extras) + ")" if extras else ""
print(f"::notice:: [WAIT] {slug} — no valid peer-ack yet{extra}")
print(f"::notice::posting status: state={state} desc={description!r}")
if args.dry_run:
print("::notice::--dry-run: not posting status")
if args.exit_on_state:
return 0 if state in ("success", "pending") else 1
return 0
target_url = f"https://{args.gitea_host}/{args.owner}/{args.repo}/pulls/{args.pr}"
client.post_status(
args.owner, args.repo, head_sha,
state=state, context=args.status_context,
description=description, target_url=target_url,
)
print(f"::notice::status posted: {args.status_context}{state}")
# --- N/A gate status (RFC#324 §N/A follow-up) ---
# Post a separate status so review-check.sh can discover N/A declarations
# and waive the Gitea-approve requirement for that gate.
na_state: dict[str, dict[str, Any]] = {}
if na_gates:
na_state = compute_na_state(comments, author, na_gates, probe)
na_descs: list[str] = []
for gate, s in na_state.items():
if s["declared"]:
na_descs.append(gate)
decl = s["decl_ackers"]
rej = s["rejected"]["not_in_team"]
if decl:
print(f"::notice:: [N/A OK] {gate} — declared by {','.join(decl)}")
if rej:
print(
f"::notice:: [N/A REJ] {gate} — not-in-team: {','.join(rej)}",
file=sys.stderr,
)
na_desc = ", ".join(sorted(na_descs)) if na_descs else "(none)"
na_status_state = "success" if na_descs else "pending"
# review-check.sh reads the description to discover which gates are N/A.
# Include the gate names so it can grep for them.
na_description = f"N/A: {na_desc}" if na_descs else "N/A: (none)"
if not args.dry_run:
client.post_status(
args.owner, args.repo, head_sha,
state=na_status_state,
context="sop-checklist / na-declarations (pull_request)",
description=na_description,
target_url=target_url,
)
print(
f"::notice::na-declarations status → {na_status_state}: {na_description}"
)
# By default exit 0 — the POSTed status IS the gate, NOT the job
# conclusion. If the job exits 1 BP will see TWO failure signals
# (one from the job's auto-status, one from our POST), making the
# description less actionable. --exit-on-state restores the old
# behavior for local debugging.
if args.exit_on_state:
return 0 if state in ("success", "pending") else 1
return 0
if __name__ == "__main__":
sys.exit(main())