|
|
|
@@ -102,7 +102,7 @@ def normalize_slug(raw: str, numeric_aliases: dict[int, str] | None = None) -> s
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Comment parsing — /sop-ack and /sop-revoke
|
|
|
|
|
# Comment parsing — /sop-ack, /sop-revoke, and /sop-n/a
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
# A directive must be on its own line. Permits leading whitespace.
|
|
|
|
@@ -114,21 +114,33 @@ _DIRECTIVE_RE = re.compile(
|
|
|
|
|
re.MULTILINE,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# /sop-n/a <gate> [reason] — declare a qa/sec gate N/A.
|
|
|
|
|
# Gate names: qa-review, security-review (match review-check.sh context names).
|
|
|
|
|
_NA_DIRECTIVE_RE = re.compile(
|
|
|
|
|
r"^[ \t]*/sop-n/a[ \t]+([A-Za-z0-9_\-]+)(?:[ \t]+(.*))?[ \t]*$",
|
|
|
|
|
re.MULTILINE,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def parse_directives(
|
|
|
|
|
comment_body: str,
|
|
|
|
|
numeric_aliases: dict[int, str],
|
|
|
|
|
) -> list[tuple[str, str, str]]:
|
|
|
|
|
"""Extract /sop-ack and /sop-revoke directives from a comment body.
|
|
|
|
|
) -> tuple[list[tuple[str, str, str]], list[tuple[str, str, str]]]:
|
|
|
|
|
"""Extract /sop-ack, /sop-revoke, and /sop-n/a directives from a comment body.
|
|
|
|
|
|
|
|
|
|
Returns a list of (kind, canonical_slug, note) tuples where:
|
|
|
|
|
kind is "sop-ack" or "sop-revoke"
|
|
|
|
|
canonical_slug is the normalized form (or "" if unparseable)
|
|
|
|
|
note is the trailing free-text (may be "")
|
|
|
|
|
Returns (directives, na_directives) where:
|
|
|
|
|
directives is a list of (kind, canonical_slug, note) tuples
|
|
|
|
|
kind is "sop-ack" or "sop-revoke"
|
|
|
|
|
canonical_slug is the normalized form (or "" if unparseable)
|
|
|
|
|
note is the trailing free-text (may be "")
|
|
|
|
|
na_directives is a list of (gate_name, reason) tuples
|
|
|
|
|
gate_name is "qa-review" or "security-review" (raw from comment)
|
|
|
|
|
reason is the free-text after the gate name (may be "")
|
|
|
|
|
"""
|
|
|
|
|
out: list[tuple[str, str, str]] = []
|
|
|
|
|
na_out: list[tuple[str, str, str]] = []
|
|
|
|
|
if not comment_body:
|
|
|
|
|
return out
|
|
|
|
|
return out, na_out
|
|
|
|
|
for m in _DIRECTIVE_RE.finditer(comment_body):
|
|
|
|
|
kind = m.group(1)
|
|
|
|
|
raw_slug = (m.group(2) or "").strip()
|
|
|
|
@@ -159,7 +171,11 @@ def parse_directives(
|
|
|
|
|
# If we collapsed multi-word slug into kebab and there's a
|
|
|
|
|
# trailing-text group too, append it.
|
|
|
|
|
out.append((kind, canonical, note_from_group))
|
|
|
|
|
return out
|
|
|
|
|
for m in _NA_DIRECTIVE_RE.finditer(comment_body):
|
|
|
|
|
gate_raw = (m.group(1) or "").strip()
|
|
|
|
|
reason = (m.group(2) or "").strip()
|
|
|
|
|
na_out.append((gate_raw.lower(), reason))
|
|
|
|
|
return out, na_out
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@@ -172,8 +188,8 @@ def section_marker_present(body: str, marker: str) -> bool:
|
|
|
|
|
on a non-empty line (i.e. the author actually filled it in).
|
|
|
|
|
|
|
|
|
|
We require the marker substring AND non-whitespace content on the
|
|
|
|
|
same line OR within the next line — this prevents trivially-empty
|
|
|
|
|
checklists like:
|
|
|
|
|
same line OR within the next non-blank line — this prevents
|
|
|
|
|
trivially-empty checklists like:
|
|
|
|
|
|
|
|
|
|
## SOP-Checklist
|
|
|
|
|
- [ ] **Comprehensive testing performed**:
|
|
|
|
@@ -182,6 +198,10 @@ def section_marker_present(body: str, marker: str) -> bool:
|
|
|
|
|
from auto-passing the section-present check. The peer-ack is still
|
|
|
|
|
required, but answering with empty content is captured as a soft
|
|
|
|
|
finding via the section-present test alone.
|
|
|
|
|
|
|
|
|
|
NOTE: we scan forward through blank lines (the markdown-header pattern
|
|
|
|
|
is ## Header\\n\\ncontent) so that a header + blank-line + content
|
|
|
|
|
structure still satisfies the check.
|
|
|
|
|
"""
|
|
|
|
|
if not body or not marker:
|
|
|
|
|
return False
|
|
|
|
@@ -200,13 +220,27 @@ def section_marker_present(body: str, marker: str) -> bool:
|
|
|
|
|
stripped = re.sub(r"[\s\*:\-\[\]]+", "", line)
|
|
|
|
|
if stripped:
|
|
|
|
|
return True
|
|
|
|
|
# Fall through: check the NEXT line (multi-line answers).
|
|
|
|
|
next_line_end = body.find("\n", line_end + 1)
|
|
|
|
|
if next_line_end < 0:
|
|
|
|
|
next_line_end = len(body)
|
|
|
|
|
next_line = body[line_end + 1:next_line_end]
|
|
|
|
|
stripped_next = re.sub(r"[\s\*:\-\[\]]+", "", next_line)
|
|
|
|
|
return bool(stripped_next)
|
|
|
|
|
# Fall through: scan forward, skipping blank-only lines, until we find
|
|
|
|
|
# non-empty content or run out of body. Handles:
|
|
|
|
|
# ## Header ← marker line (empty after marker)
|
|
|
|
|
# ← blank line (skipped)
|
|
|
|
|
# - actual content ← found
|
|
|
|
|
pos = line_end
|
|
|
|
|
while True:
|
|
|
|
|
# Skip the current newline and any additional newlines (blank lines).
|
|
|
|
|
while pos < len(body) and body[pos] == "\n":
|
|
|
|
|
pos += 1
|
|
|
|
|
if pos >= len(body):
|
|
|
|
|
break
|
|
|
|
|
line_end = body.find("\n", pos)
|
|
|
|
|
if line_end < 0:
|
|
|
|
|
line_end = len(body)
|
|
|
|
|
line = body[pos:line_end]
|
|
|
|
|
stripped = re.sub(r"[\s\*:\-\[\]]+", "", line)
|
|
|
|
|
if stripped:
|
|
|
|
|
return True
|
|
|
|
|
pos = line_end
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@@ -249,7 +283,8 @@ def compute_ack_state(
|
|
|
|
|
user = (c.get("user") or {}).get("login", "")
|
|
|
|
|
if not user:
|
|
|
|
|
continue
|
|
|
|
|
for kind, slug, _note in parse_directives(body, numeric_aliases):
|
|
|
|
|
directives, _na = parse_directives(body, numeric_aliases)
|
|
|
|
|
for kind, slug, _note in directives:
|
|
|
|
|
if not slug:
|
|
|
|
|
unparseable_per_user[user] = unparseable_per_user.get(user, 0) + 1
|
|
|
|
|
continue
|
|
|
|
@@ -301,6 +336,82 @@ def compute_ack_state(
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def compute_na_state(
|
|
|
|
|
comments: list[dict[str, Any]],
|
|
|
|
|
pr_author: str,
|
|
|
|
|
na_gates: dict[str, dict[str, Any]],
|
|
|
|
|
team_membership_probe: "callable[[str, list[str]], list[str]]",
|
|
|
|
|
) -> dict[str, dict[str, Any]]:
|
|
|
|
|
"""Compute per-gate N/A declaration state.
|
|
|
|
|
|
|
|
|
|
Each comment is processed in chronological order. The most-recent
|
|
|
|
|
N/A directive per (commenter, gate) wins.
|
|
|
|
|
|
|
|
|
|
Returns a dict keyed by gate name:
|
|
|
|
|
{
|
|
|
|
|
"qa-review": {
|
|
|
|
|
"declared": True,
|
|
|
|
|
"declared_by": "core-qa-agent",
|
|
|
|
|
"reason": "CI/non-security-touching",
|
|
|
|
|
"valid": True, # non-author + in required team
|
|
|
|
|
"error": None, # error string if invalid
|
|
|
|
|
},
|
|
|
|
|
...
|
|
|
|
|
}
|
|
|
|
|
Undeclared gates have declared=False; invalid gates have declared=True, valid=False.
|
|
|
|
|
"""
|
|
|
|
|
# Step 1: collapse N/A directives per (commenter, gate) — most recent wins.
|
|
|
|
|
latest_na: dict[tuple[str, str], tuple[str, str]] = {}
|
|
|
|
|
for c in comments:
|
|
|
|
|
body = c.get("body", "") or ""
|
|
|
|
|
user = (c.get("user") or {}).get("login", "")
|
|
|
|
|
if not user:
|
|
|
|
|
continue
|
|
|
|
|
_, na_directives = parse_directives(body, {})
|
|
|
|
|
for gate, reason in na_directives:
|
|
|
|
|
if gate not in na_gates:
|
|
|
|
|
continue
|
|
|
|
|
latest_na[(user, gate)] = (gate, reason)
|
|
|
|
|
|
|
|
|
|
# Step 2: initialise all gates as undeclared.
|
|
|
|
|
result: dict[str, dict[str, Any]] = {
|
|
|
|
|
g: {"declared": False, "declared_by": "", "reason": "", "valid": False, "error": None}
|
|
|
|
|
for g in na_gates
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
# Step 3: evaluate each gate's most-recent N/A declaration.
|
|
|
|
|
for (user, gate), (gate_name, reason) in latest_na.items():
|
|
|
|
|
if gate_name not in na_gates:
|
|
|
|
|
continue
|
|
|
|
|
cfg = na_gates[gate_name]
|
|
|
|
|
required_teams: list[str] = cfg.get("required_teams", [])
|
|
|
|
|
|
|
|
|
|
entry: dict[str, Any] = {
|
|
|
|
|
"declared": True,
|
|
|
|
|
"declared_by": user,
|
|
|
|
|
"reason": reason,
|
|
|
|
|
"valid": False,
|
|
|
|
|
"error": None,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
# Authors cannot self-declare N/A (gate script enforces same rule).
|
|
|
|
|
if user == pr_author:
|
|
|
|
|
entry["error"] = "self-declare N/A rejected"
|
|
|
|
|
else:
|
|
|
|
|
# Probe team membership: is the declarer in any required team?
|
|
|
|
|
approved = team_membership_probe(f"na:{gate_name}", [user])
|
|
|
|
|
if user in approved:
|
|
|
|
|
entry["valid"] = True
|
|
|
|
|
else:
|
|
|
|
|
# 403 from team API means token owner not in that team.
|
|
|
|
|
# Fail-closed: treat unknown membership as invalid.
|
|
|
|
|
entry["error"] = f"{user} not in required team {required_teams}"
|
|
|
|
|
|
|
|
|
|
result[gate_name] = entry
|
|
|
|
|
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Gitea API client
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@@ -460,10 +571,29 @@ def _load_config_minimal(path: str) -> dict[str, Any]:
|
|
|
|
|
tier_failure_mode), top-level list of maps (items:), and within an
|
|
|
|
|
item map: scalars + lists of scalars. Does NOT support nested lists,
|
|
|
|
|
YAML anchors, multi-doc, or flow style.
|
|
|
|
|
|
|
|
|
|
Key names containing '/' (e.g. n/a_gates) are handled by using
|
|
|
|
|
rpartition(':') — splitting at the LAST colon so embedded colons
|
|
|
|
|
in the key are preserved.
|
|
|
|
|
"""
|
|
|
|
|
with open(path) as f:
|
|
|
|
|
lines = f.readlines()
|
|
|
|
|
return _parse_minimal_yaml(lines)
|
|
|
|
|
# Preprocess: for lines at indent 0 that contain '/' before ':',
|
|
|
|
|
# use rpartition so the key keeps the '/'. e.g.
|
|
|
|
|
# "n/a_gates:" → key="n/a_gates", val=""
|
|
|
|
|
# "n/a_gates: value" → key="n/a_gates", val="value"
|
|
|
|
|
processed: list[str] = []
|
|
|
|
|
for raw in lines:
|
|
|
|
|
stripped = raw.rstrip("\n")
|
|
|
|
|
indent = len(stripped) - len(stripped.lstrip(" "))
|
|
|
|
|
content = stripped.lstrip(" ")
|
|
|
|
|
if indent == 0 and "/" in content and ":" in content:
|
|
|
|
|
# Use rpartition so the last ':' is the key-value separator.
|
|
|
|
|
key, _, val = content.rpartition(":")
|
|
|
|
|
processed.append(" " * indent + key.strip() + ": " + val.strip())
|
|
|
|
|
else:
|
|
|
|
|
processed.append(stripped)
|
|
|
|
|
return _parse_minimal_yaml(processed)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_minimal_yaml(lines: list[str]) -> dict[str, Any]: # noqa: C901
|
|
|
|
@@ -800,6 +930,90 @@ def main(argv: list[str] | None = None) -> int:
|
|
|
|
|
extra = " (" + "; ".join(extras) + ")" if extras else ""
|
|
|
|
|
print(f"::notice:: [WAIT] {slug} — no valid peer-ack yet{extra}")
|
|
|
|
|
|
|
|
|
|
# ----- N/A gate declarations (RFC#324 §N/A follow-up) -----
|
|
|
|
|
# sop-checklist.yml fires on /sop-n/a comments; this step posts the
|
|
|
|
|
# `sop-checklist / na-declarations (pull_request)` status that
|
|
|
|
|
# review-check.sh reads to waive the Gitea-APPROVE requirement.
|
|
|
|
|
na_gates: dict[str, Any] = cfg.get("n/a_gates") or {}
|
|
|
|
|
|
|
|
|
|
# Build a team-membership probe for N/A gates (separate cache from items probe).
|
|
|
|
|
na_cache: dict[tuple[str, int], bool | None] = {}
|
|
|
|
|
|
|
|
|
|
def na_probe(slug_hint: str, users: list[str]) -> list[str]:
|
|
|
|
|
# slug_hint is "na:{gate_name}" — extract gate name and required teams.
|
|
|
|
|
gate_name = slug_hint.removeprefix("na:")
|
|
|
|
|
gate_cfg = na_gates.get(gate_name, {})
|
|
|
|
|
team_names: list[str] = gate_cfg.get("required_teams", [])
|
|
|
|
|
# Resolve team names → ids.
|
|
|
|
|
team_ids: list[int] = []
|
|
|
|
|
for tn in team_names:
|
|
|
|
|
tid = client.resolve_team_id(args.owner, tn) # noqa: SLF001
|
|
|
|
|
if tid is None:
|
|
|
|
|
code, data = client._req( # noqa: SLF001
|
|
|
|
|
"GET", f"/orgs/{args.owner}/teams"
|
|
|
|
|
)
|
|
|
|
|
if code == 200 and isinstance(data, list):
|
|
|
|
|
for t in data:
|
|
|
|
|
if t.get("name") == tn:
|
|
|
|
|
tid = t.get("id")
|
|
|
|
|
client._team_id_cache[(args.owner, tn)] = tid # noqa: SLF001
|
|
|
|
|
break
|
|
|
|
|
if tid is not None:
|
|
|
|
|
team_ids.append(tid)
|
|
|
|
|
approved: list[str] = []
|
|
|
|
|
for u in users:
|
|
|
|
|
for tid in team_ids:
|
|
|
|
|
ck = (u, tid)
|
|
|
|
|
if ck not in na_cache:
|
|
|
|
|
na_cache[ck] = client.is_team_member(tid, u) # noqa: SLF001
|
|
|
|
|
res = na_cache[ck]
|
|
|
|
|
if res is True:
|
|
|
|
|
approved.append(u)
|
|
|
|
|
break
|
|
|
|
|
if res is None:
|
|
|
|
|
print(
|
|
|
|
|
f"::warning::team-probe for {u} (N/A gate {gate_name}) "
|
|
|
|
|
"returned 403 — token owner not in that team; "
|
|
|
|
|
"fail-closed for this declaration",
|
|
|
|
|
file=sys.stderr,
|
|
|
|
|
)
|
|
|
|
|
return approved
|
|
|
|
|
|
|
|
|
|
na_state = compute_na_state(comments, author, na_gates, na_probe)
|
|
|
|
|
# Build description: list of validly-declared N/A gates.
|
|
|
|
|
na_approved_gates = [
|
|
|
|
|
g for g, entry in na_state.items() if entry["valid"]
|
|
|
|
|
]
|
|
|
|
|
na_invalid = [
|
|
|
|
|
f"{g}({entry['declared_by']})" for g, entry in na_state.items()
|
|
|
|
|
if entry["declared"] and not entry["valid"]
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
if na_approved_gates:
|
|
|
|
|
na_desc = "N/A: " + ", ".join(na_approved_gates)
|
|
|
|
|
elif na_invalid:
|
|
|
|
|
na_desc = "invalid N/A: " + ", ".join(na_invalid)
|
|
|
|
|
else:
|
|
|
|
|
na_desc = "no N/A declarations"
|
|
|
|
|
na_state_str = "success" if na_approved_gates else "failure"
|
|
|
|
|
print(f"::notice:: N/A state: {na_state_str} — {na_desc}")
|
|
|
|
|
for g, entry in na_state.items():
|
|
|
|
|
if entry["declared"]:
|
|
|
|
|
status_flag = "valid" if entry["valid"] else f"invalid: {entry['error']}"
|
|
|
|
|
print(f"::notice:: {g}: declared by {entry['declared_by']} — {status_flag}")
|
|
|
|
|
|
|
|
|
|
target_url = f"https://{args.gitea_host}/{args.owner}/{args.repo}/pulls/{args.pr}"
|
|
|
|
|
|
|
|
|
|
if not args.dry_run:
|
|
|
|
|
na_context = "sop-checklist / na-declarations (pull_request)"
|
|
|
|
|
client.post_status(
|
|
|
|
|
args.owner, args.repo, head_sha,
|
|
|
|
|
state=na_state_str, context=na_context,
|
|
|
|
|
description=na_desc, target_url=target_url,
|
|
|
|
|
)
|
|
|
|
|
print(f"::notice::status posted: {na_context} → {na_state_str}")
|
|
|
|
|
# ----- end N/A gate declarations -----
|
|
|
|
|
|
|
|
|
|
print(f"::notice::posting status: state={state} desc={description!r}")
|
|
|
|
|
|
|
|
|
|
if args.dry_run:
|
|
|
|
@@ -807,8 +1021,6 @@ def main(argv: list[str] | None = None) -> int:
|
|
|
|
|
if args.exit_on_state:
|
|
|
|
|
return 0 if state in ("success", "pending") else 1
|
|
|
|
|
return 0
|
|
|
|
|
|
|
|
|
|
target_url = f"https://{args.gitea_host}/{args.owner}/{args.repo}/pulls/{args.pr}"
|
|
|
|
|
client.post_status(
|
|
|
|
|
args.owner, args.repo, head_sha,
|
|
|
|
|
state=state, context=args.status_context,
|
|
|
|
|