From d837519dfee404dc78ea956a522754a2e2071ebd Mon Sep 17 00:00:00 2001 From: core-devops Date: Mon, 11 May 2026 23:15:24 -0700 Subject: [PATCH] feat(ci)(hard-gate): lint-bp-context-emit-match (Tier 2f) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Daily scheduled lint detecting drift between `branch_protections/.status_check_contexts` and the contexts emitted by `.gitea/workflows/*.yml`. Files/PATCHes a `[ci-bp-drift]` issue (idempotent) on mismatch. The class this prevents ----------------------- A BP-required context with no emitting workflow blocks merges forever — Gitea 1.22.6 treats absent-as-`pending`, NOT absent-as-`skipped`. Previously surfaced as feedback_phantom_required_check_after_gitea_migration (a port that kept the GitHub context name after rename to Gitea). Implementation -------------- - `.gitea/scripts/lint_bp_context_emit_match.py` — PyYAML walk of every workflow's `on:` block + `jobs.*.name:` (or job-key fallback) to enumerate emitted contexts. Compares against BP. Two directions: (a) BP→emitter: required by BP, no emitter → ERROR + drift issue. (b) Emitter→BP: emitter exists, BP doesn't list → NOTICE only (Tier 2g handles at PR-time; scheduled-flag would noisily flag every transitional state during a BP rollout). Event-suffix match strict: `(push)` and `(pull_request)` are distinct. `pull_request_target` maps to `(pull_request)` per Gitea convention. - `.gitea/workflows/lint-bp-context-emit-match.yml` — schedule `31 3 * * *` + workflow_dispatch. NO pull_request / push triggers (Tier 2g owns those). Phase 3 (continue-on-error: true) per RFC #219 §1. - `tests/test_lint_bp_context_emit_match.py` — 10 unit tests: perfect match, BP-orphan fail, emitter-orphan notice-only, multi-orphan aggregation, empty-BP skip, 403/404 graceful, event-suffix mismatch flag, pull_request_target mapping, idempotent PATCH-on-existing-issue. 
Auth uses DRIFT_BOT_TOKEN (same as ci-required-drift.yml) — Gitea 1.22.6 requires repo-admin scope on `/branch_protections/*`. Graceful degrade on 403 per Tier 2a contract. Refs: #350 --- .gitea/scripts/lint_bp_context_emit_match.py | 509 ++++++++++++++++++ .../workflows/lint-bp-context-emit-match.yml | 120 +++++ tests/test_lint_bp_context_emit_match.py | 361 +++++++++++++ 3 files changed, 990 insertions(+) create mode 100644 .gitea/scripts/lint_bp_context_emit_match.py create mode 100644 .gitea/workflows/lint-bp-context-emit-match.yml create mode 100644 tests/test_lint_bp_context_emit_match.py diff --git a/.gitea/scripts/lint_bp_context_emit_match.py b/.gitea/scripts/lint_bp_context_emit_match.py new file mode 100644 index 00000000..59453f66 --- /dev/null +++ b/.gitea/scripts/lint_bp_context_emit_match.py @@ -0,0 +1,509 @@ +#!/usr/bin/env python3 +"""lint_bp_context_emit_match — Tier 2f per internal#350. + +Rule +---- +For a given protected branch, every context in +`branch_protections/.status_check_contexts` MUST be emitted +by at least one workflow in `.gitea/workflows/*.yml`. Two contexts +match when: + + 1. The workflow's `name:` equals the context's workflow-part (the + prefix before ` / `). + 2. Some job in that workflow has a `name:` (or default-fallback + job-key) equal to the context's job-part (between ` / ` and + ` (`). + 3. The workflow's `on:` block includes the context's event-part + (in parens at the end), with Gitea's event-name mapping: + - `pull_request` and `pull_request_target` BOTH emit + `(pull_request)` contexts (verified empirically on + molecule-core/main). + - `push` emits `(push)`. + +A BP context with no emitter blocks merges forever — Gitea treats +absent-as-`pending`, NOT absent-as-`skipped`-as-`success`. This is +the phantom-required-check class +(`feedback_phantom_required_check_after_gitea_migration`). + +The inverse direction (emitter without BP context) is INFORMATIONAL +only — Tier 2g handles that direction at PR-time. 
Flagging it here +on a daily schedule would falsely surface every transitional state +during a BP rollout. + +How the gate works +------------------ +Daily scheduled run + workflow_dispatch: + + 1. GET `branch_protections/{BRANCH}` (needs DRIFT_BOT_TOKEN with + repo-admin scope; same persona as ci-required-drift.yml). + Graceful-degrade on 403/404 per Tier 2a contract. + + 2. Walk `.gitea/workflows/*.yml` via PyYAML AST. For each workflow, + enumerate its emitted contexts: `{workflow.name} / {job.name or + job-key} ({event})` for each event in `on:` that emits a status. + + 3. For each BP context, look for an emitter match. Aggregate + orphans. + + 4. If orphans exist: + - File or PATCH a `[ci-bp-drift]` issue (idempotency contract: + search for exact title prefix, edit existing if open). + - Apply labels `tier:high` + `ci-bp-drift` (lookup IDs per + repo; per `feedback_tier_label_ids_are_per_repo`). + - Exit 1. + + 5. If no orphans: + - Close any existing `[ci-bp-drift]` issue with a clean-state + comment. + - Exit 0. + +Exit codes +---------- + 0 — clean OR API 403/404 (graceful-degrade, surfaces ::error::). + 1 — at least one BP context has no emitter. + 2 — env contract violation, workflows-dir missing, or YAML parse + error. + +Env +--- + GITEA_TOKEN — DRIFT_BOT_TOKEN (repo-admin for branch_protections) + GITEA_HOST — e.g. 
git.moleculesai.app + REPO — owner/name + BRANCH — defaults to `main` + WORKFLOWS_DIR — defaults to `.gitea/workflows` + DRIFT_LABEL — defaults to `ci-bp-drift` + +Memory cross-links +------------------ + - internal#350 (the RFC that specs this lint) + - feedback_phantom_required_check_after_gitea_migration + - feedback_tier_label_ids_are_per_repo + - reference_post_suspension_pipeline +""" +from __future__ import annotations + +import json +import os +import re +import sys +import urllib.error +import urllib.parse +import urllib.request +from pathlib import Path +from typing import Any + +try: + import yaml +except ImportError: + sys.stderr.write( + "::error::PyYAML is required. Install with: pip install PyYAML\n" + ) + sys.exit(2) + + +# Status-check context regex (mirrors lint-required-no-paths.py). +_CONTEXT_RE = re.compile( + r"^(?P<workflow>.+?) / (?P<job>.+) \((?P<event>[^)]+)\)$" +) + +# Map a workflow `on:` event-key to the context's event-part. Gitea's +# emitter convention (verified on molecule-core): +# - pull_request → `(pull_request)` +# - pull_request_target → `(pull_request)` (same surface) +# - push → `(push)` +# - schedule → no PR status; scheduled runs don't post +# commit-statuses unless the workflow itself does so explicitly. +# - workflow_dispatch → manually dispatched runs may or may not +# emit; safest to treat as "no PR status" (informational notice +# only). 
+_EVENT_MAP = { + "pull_request": "pull_request", + "pull_request_target": "pull_request", + "push": "push", +} + + +# --------------------------------------------------------------------------- +# Env +# --------------------------------------------------------------------------- +def _env(key: str, default: str | None = None) -> str: + v = os.environ.get(key, default) + return v if v is not None else "" + + +def _require_env(key: str) -> str: + v = os.environ.get(key) + if not v: + sys.stderr.write(f"::error::missing required env var: {key}\n") + sys.exit(2) + return v + + +# --------------------------------------------------------------------------- +# API helper. Mirrors lint-required-no-paths.py's contract: returns +# (status, payload) tuple with status ∈ {"ok", "not_found", "forbidden", +# "error"}. +# --------------------------------------------------------------------------- +def api( + method: str, + path: str, + *, + body: dict | None = None, + query: dict[str, str] | None = None, +) -> tuple[str, Any]: + host = _env("GITEA_HOST") + token = _env("GITEA_TOKEN") + url = f"https://{host}/api/v1{path}" + if query: + url = f"{url}?{urllib.parse.urlencode(query)}" + data = None + headers = { + "Authorization": f"token {token}", + "Accept": "application/json", + } + if body is not None: + data = json.dumps(body).encode("utf-8") + headers["Content-Type"] = "application/json" + req = urllib.request.Request( + url, method=method, data=data, headers=headers + ) + try: + with urllib.request.urlopen(req, timeout=30) as resp: + raw = resp.read() + if not raw: + return ("ok", None) + return ("ok", json.loads(raw)) + except urllib.error.HTTPError as e: + if e.code == 404: + return ("not_found", None) + if e.code in (401, 403): + return ("forbidden", None) + return ("error", None) + except (urllib.error.URLError, TimeoutError, json.JSONDecodeError): + return ("error", None) + + +# --------------------------------------------------------------------------- +# Helpers +# 
--------------------------------------------------------------------------- +def _get_on(d: Any) -> Any: + """YAML 1.1 boolean quirk: bare `on:` may parse to True. Handle both.""" + if not isinstance(d, dict): + return None + if "on" in d: + return d["on"] + if True in d: + return d[True] + return None + + +def _on_events(doc: Any) -> set[str]: + """Return the set of event keys in a workflow's `on:` block. + + Accepts all three shapes (string / list / mapping). String/list + shapes can't carry filters but they DO emit. Returns the + Gitea-mapped event names per `_EVENT_MAP`. + """ + on = _get_on(doc) + raw_events: set[str] = set() + if on is None: + return raw_events + if isinstance(on, str): + raw_events.add(on) + elif isinstance(on, list): + for e in on: + if isinstance(e, str): + raw_events.add(e) + elif isinstance(on, dict): + for k in on: + if isinstance(k, str): + raw_events.add(k) + return {_EVENT_MAP[e] for e in raw_events if e in _EVENT_MAP} + + +def _job_display(jbody: dict, jkey: str) -> str: + """Return job's `name:` if set, else fall back to the job-key. + + Gitea formats status contexts with the job's `name:` when set; + when unset it uses the job key. Matches lint-required-no-paths + convention. 
+ """ + n = jbody.get("name") if isinstance(jbody, dict) else None + if isinstance(n, str) and n: + return n + return jkey + + +def workflow_contexts(doc: Any) -> set[str]: + """Return the set of contexts a workflow emits.""" + contexts: set[str] = set() + if not isinstance(doc, dict): + return contexts + wf_name = doc.get("name") + if not isinstance(wf_name, str) or not wf_name: + return contexts # no name => no addressable context + events = _on_events(doc) + if not events: + return contexts + jobs = doc.get("jobs") + if not isinstance(jobs, dict): + return contexts + for jkey, jbody in jobs.items(): + if jkey == "__lines__": # tolerate line-tracking annotations + continue + if not isinstance(jbody, dict): + continue + disp = _job_display(jbody, jkey) + for ev in events: + contexts.add(f"{wf_name} / {disp} ({ev})") + return contexts + + +def parse_context(ctx: str) -> tuple[str, str, str] | None: + m = _CONTEXT_RE.match(ctx) + if not m: + return None + return (m.group("workflow"), m.group("job"), m.group("event")) + + +def _iter_workflow_files(wf_dir: Path) -> list[Path]: + return sorted(list(wf_dir.glob("*.yml")) + list(wf_dir.glob("*.yaml"))) + + +# --------------------------------------------------------------------------- +# Issue idempotency — search for an open issue with the canonical +# title prefix; PATCH if found, POST if not. Mirrors ci-required-drift. 
+# --------------------------------------------------------------------------- +def _canonical_title(repo: str, branch: str) -> str: + return f"[ci-bp-drift] {repo}/{branch}: BP→emitter mismatch" + + +def _ensure_labels(repo: str, names: list[str]) -> list[int]: + status, labels = api("GET", f"/repos/{repo}/labels", query={"limit": "50"}) + if status != "ok" or not isinstance(labels, list): + return [] + out: list[int] = [] + by_name = {l["name"]: l["id"] for l in labels if isinstance(l, dict)} + for n in names: + if n in by_name: + out.append(by_name[n]) + return out + + +def file_or_update_issue( + repo: str, branch: str, orphans: list[str], emitter_orphans: list[str] +) -> None: + title = _canonical_title(repo, branch) + body_lines = [ + f"BP→emitter drift detected on `{branch}` at " + f"{os.environ.get('GITHUB_RUN_URL', '(run url unavailable)')}.", + "", + f"## Orphan BP contexts ({len(orphans)})", + "", + "These contexts are required by branch protection but NO workflow " + "emits them. PRs merging into this branch will wait forever for a " + "status that never arrives (Gitea treats absent-as-`pending`, NOT " + "absent-as-`skipped`). See " + "`feedback_phantom_required_check_after_gitea_migration`.", + "", + ] + for o in orphans: + body_lines.append(f"- `{o}`") + if emitter_orphans: + body_lines += [ + "", + f"## Workflows emitting contexts NOT in BP ({len(emitter_orphans)})", + "", + "Informational — Tier 2g handles this direction at PR-time. " + "Listed here for completeness.", + "", + ] + for o in emitter_orphans: + body_lines.append(f"- `{o}`") + body_lines += [ + "", + "Fix options:", + " 1. PATCH `branch_protections/{branch}.status_check_contexts` " + " to remove the orphan.", + " 2. Restore the emitting workflow (if it was deleted/renamed).", + "", + "Linted by `.gitea/workflows/lint-bp-context-emit-match.yml` " + "(Tier 2f, internal#350).", + ] + body = "\n".join(body_lines) + + # Idempotency search — find an open issue with the canonical title. 
+ status, hits = api( + "GET", + f"/repos/{repo}/issues", + query={ + "type": "issues", + "state": "open", + "q": title, + }, + ) + existing = None + if status == "ok" and isinstance(hits, list): + for h in hits: + if ( + isinstance(h, dict) + and h.get("state") == "open" + and isinstance(h.get("title"), str) + and h["title"].startswith(title) + ): + existing = h + break + + label_ids = _ensure_labels(repo, ["ci-bp-drift", "tier:high"]) + + if existing: + api( + "PATCH", + f"/repos/{repo}/issues/{existing['number']}", + body={"body": body, "labels": label_ids} if label_ids else {"body": body}, + ) + print( + f"::notice::Updated existing drift issue " + f"#{existing['number']}: {existing.get('html_url', '')}" + ) + else: + status, posted = api( + "POST", + f"/repos/{repo}/issues", + body={"title": title, "body": body, "labels": label_ids}, + ) + if status == "ok" and isinstance(posted, dict): + print( + f"::notice::Filed new drift issue " + f"#{posted.get('number')}: {posted.get('html_url', '')}" + ) + + +# --------------------------------------------------------------------------- +# Driver +# --------------------------------------------------------------------------- +def run() -> int: + _require_env("GITEA_TOKEN") + _require_env("GITEA_HOST") + repo = _require_env("REPO") + branch = _env("BRANCH", "main") + wf_dir = Path(_env("WORKFLOWS_DIR", ".gitea/workflows")) + + if not wf_dir.is_dir(): + sys.stderr.write(f"::error::workflows directory not found: {wf_dir}\n") + return 2 + + # 1. Pull BP. + status, bp = api("GET", f"/repos/{repo}/branch_protections/{branch}") + if status == "forbidden": + sys.stderr.write( + f"::error::GET branch_protections/{branch} returned HTTP 403 — " + f"DRIFT_BOT_TOKEN lacks repo-admin scope (Gitea 1.22.6 requires " + f"it for this endpoint). Skipping lint with exit 0 to avoid " + f"red-X on every run. Fix: grant repo-admin to mc-drift-bot. 
" + f"Per Tier 2a contract.\n" + ) + return 0 + if status == "not_found": + print( + f"::notice::branch '{branch}' has no protection configured; " + f"nothing to lint." + ) + return 0 + if status != "ok" or not isinstance(bp, dict): + sys.stderr.write( + f"::error::branch_protections/{branch} response unexpected; " + f"status={status}. Treating as transient; exit 0.\n" + ) + return 0 + + bp_contexts: list[str] = list(bp.get("status_check_contexts") or []) + if not bp_contexts: + print( + f"::notice::branch_protections/{branch} has 0 required " + f"status_check_contexts; nothing to lint." + ) + return 0 + + # 2. Enumerate emitter contexts from all workflows. + all_emitter: set[str] = set() + for path in _iter_workflow_files(wf_dir): + try: + doc = yaml.safe_load(path.read_text(encoding="utf-8")) + except yaml.YAMLError as e: + sys.stderr.write( + f"::error file={path}::YAML parse error: {e}; skipping.\n" + ) + continue + all_emitter |= workflow_contexts(doc) + + print( + f"::notice::Linting {len(bp_contexts)} BP context(s) for {branch} " + f"against {len(all_emitter)} workflow-emitted context(s)." + ) + + bp_set = set(bp_contexts) + + # 3. Find orphans (BP-side: required but no emitter). + bp_orphans = sorted(bp_set - all_emitter) + + # Informational: workflow emits but BP doesn't list. Tier 2g + # territory at PR-time. We list these as NOTICE only. + emitter_orphans = sorted(all_emitter - bp_set) + + if bp_orphans: + print( + f"::error::Found {len(bp_orphans)} BP context(s) with no " + f"emitter — these would block merges forever (Gitea treats " + f"absent-as-pending, not skipped):" + ) + for o in bp_orphans: + # Closest-match hint: name a workflow whose name-part is a + # near-match (lev-1 typo, or same workflow with a different + # event). 
+ parsed = parse_context(o) + hint = "" + if parsed: + wf, _job, _ev = parsed + candidates = sorted( + {c for c in all_emitter if c.startswith(wf + " / ")} + ) + if candidates: + hint = ( + f" — closest emitter(s): {', '.join(candidates[:3])}" + ) + print(f"::error:: - {o}{hint}") + if emitter_orphans: + print( + f"::notice::Also: {len(emitter_orphans)} workflow-emitted " + f"context(s) not in BP (informational; Tier 2g handles at " + f"PR-time):" + ) + for o in emitter_orphans: + print(f"::notice:: - {o}") + # File / patch tracking issue. + try: + file_or_update_issue(repo, branch, bp_orphans, emitter_orphans) + except Exception as e: + sys.stderr.write( + f"::error::failed to file drift issue: {e}\n" + ) + return 1 + + if emitter_orphans: + print( + f"::notice::{len(emitter_orphans)} workflow-emitted context(s) " + f"not in BP (informational; Tier 2g handles at PR-time):" + ) + for o in emitter_orphans: + print(f"::notice:: - {o}") + + print( + f"::notice::BP/emitter match clean: all {len(bp_contexts)} required " + f"context(s) have an emitter." + ) + return 0 + + +if __name__ == "__main__": + sys.exit(run()) diff --git a/.gitea/workflows/lint-bp-context-emit-match.yml b/.gitea/workflows/lint-bp-context-emit-match.yml new file mode 100644 index 00000000..095705a8 --- /dev/null +++ b/.gitea/workflows/lint-bp-context-emit-match.yml @@ -0,0 +1,120 @@ +name: lint-bp-context-emit-match + +# Tier 2f scheduled lint (per internal#350) — detects drift between +# `branch_protections/.status_check_contexts` and the set of +# contexts emitted by `.gitea/workflows/*.yml`. +# +# Rule +# ---- +# For each protected branch context (Source A — BP), there must exist +# at least one emitting workflow + job pair (Source B — workflow YAML +# + on:-event mapping) whose runtime status-name maps to it. The +# inverse direction (emitter without BP context) is informational +# only — Tier 2g handles that at PR-time. 
+# +# Why this exists +# --------------- +# A BP-required context with no emitter blocks merges forever — Gitea +# 1.22.6 treats absent-as-`pending`, NOT absent-as-`skipped`. The +# phantom-required-check class previously surfaced as +# `feedback_phantom_required_check_after_gitea_migration` (a port +# kept the GitHub context name after rename to Gitea, but no +# workflow emitted under the new name). +# +# This lint catches the same class structurally + a forward case: +# workflow renamed/deleted while still in BP. +# +# Scope +# ----- +# Scheduled daily. We DON'T run on `pull_request` because (a) the +# emitter side moves with PR diffs (transitional state false-flags) +# and (b) Tier 2g handles emitter-side drift at PR-time. +# +# Cross-repo +# ---------- +# Today this runs only on molecule-core/main. Per internal#349 +# (cross-repo BP sweep) Class-D repos will get the same lint after +# their BP rollouts. +# +# Auth +# ---- +# `GET /repos/.../branch_protections/{branch}` requires repo-admin +# role on Gitea 1.22.6. We use DRIFT_BOT_TOKEN (same persona as +# ci-required-drift.yml — `internal#329` provisioning trail). +# Graceful-degrade per Tier 2a contract: 403/404 → exit 0 with +# ::error::. +# +# Idempotency +# ----------- +# The drift issue is filed with title prefix +# `[ci-bp-drift] {repo}/{branch}: BP→emitter mismatch`. The script +# searches OPEN issues for an exact title-prefix match and PATCHes +# the existing issue (if any) instead of POSTing a duplicate. +# Mirrors `ci-required-drift.py`'s contract. +# +# Phase contract (RFC internal#219 §1 ladder) +# ------------------------------------------- +# Lands at `continue-on-error: true` (Phase 3). After 7 days of clean +# scheduled runs on `main`, flip to `false` so a scheduled failure +# becomes a hard CI signal. 
+# +# Cross-links +# ----------- +# - internal#350 (the RFC that specs this lint) +# - internal#349 (cross-repo BP sweep) +# - feedback_phantom_required_check_after_gitea_migration +# - feedback_tier_label_ids_are_per_repo +# - ci-required-drift.yml (F2 detector, narrower-scope sibling) + +on: + schedule: + # Daily at 03:31 UTC — off-peak, prime-staggered from other + # scheduled jobs (ci-required-drift :00 hourly, lint-coe-tracking + # 13:11). At 03:31 the CI fleet is quietest in EMEA hours. + - cron: '31 3 * * *' + workflow_dispatch: + # No `push` / `pull_request` here — Tier 2g owns PR-time drift. + +env: + GITHUB_SERVER_URL: https://git.moleculesai.app + +permissions: + contents: read + issues: write # needed to file/edit the drift issue + +concurrency: + group: lint-bp-context-emit-match-${{ github.ref }} + cancel-in-progress: true + +jobs: + lint: + name: lint-bp-context-emit-match + runs-on: ubuntu-latest + timeout-minutes: 5 + # Phase 3 (RFC #219 §1): surface drift without blocking. After 7 + # clean scheduled runs on main, flip to false so a scheduled + # failure is a hard CI signal. + continue-on-error: true + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + - uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0 + with: + python-version: '3.12' + - name: Install PyYAML + run: python -m pip install --quiet 'PyYAML==6.0.2' + - name: Run lint-bp-context-emit-match + env: + # DRIFT_BOT_TOKEN — repo-admin on this repo (internal#329 + # provisioning trail). Required for branch_protections read. 
+ GITEA_TOKEN: ${{ secrets.DRIFT_BOT_TOKEN }} + GITEA_HOST: git.moleculesai.app + REPO: ${{ github.repository }} + BRANCH: main + WORKFLOWS_DIR: .gitea/workflows + DRIFT_LABEL: ci-bp-drift + GITHUB_RUN_URL: https://git.moleculesai.app/${{ github.repository }}/actions/runs/${{ github.run_id }} + run: python3 .gitea/scripts/lint_bp_context_emit_match.py + - name: Run lint-bp-context-emit-match unit tests + run: | + python -m pip install --quiet pytest + python3 -m pytest tests/test_lint_bp_context_emit_match.py -v diff --git a/tests/test_lint_bp_context_emit_match.py b/tests/test_lint_bp_context_emit_match.py new file mode 100644 index 00000000..c960b7ac --- /dev/null +++ b/tests/test_lint_bp_context_emit_match.py @@ -0,0 +1,361 @@ +"""Tests for `.gitea/scripts/lint_bp_context_emit_match.py` — Tier 2f lint. + +Structural enforcement of internal#350 Tier 2f: BP `status_check_contexts` +and the set of contexts emitted by `.gitea/workflows/*.yml` must agree. + +Bidirectional rule: + (a) BP-only: every context in `branch_protections/.status_check_contexts` + must have at least one EMITTER — a workflow `name:` + job `name:` (or job key) + + `pull_request` (or `push`) event that produces it. A BP context without + an emitter blocks merges forever (Gitea treats absent-as-pending, NOT + absent-as-skipped). This is the phantom-required-check class + (`feedback_phantom_required_check_after_gitea_migration`). + + (b) EMITTER-only: NO automatic flag. The PR#656 case (workflow added a + sentinel context not yet in BP) is Tier 2g's job — a diff-based PR-time + lint. Tier 2f runs scheduled and would falsely flag every transitional + state during a BP rollout. We only flag the BP-empty case in this + direction as a NOTICE (informational), not as an error. + +Tier 2f runs on a daily schedule + workflow_dispatch and files a +`[ci-bp-drift]`-tagged issue on mismatch. 
+ +Test classes (per `feedback_branch_count_before_approving`): + + - test_perfect_match_passes — BP has [X]; workflows emit X. + Exit 0. No issue filed/edited. + - test_bp_orphan_context_fails — BP has [Y] but no workflow + emits Y. Exit 1. Issue body lists the orphan and the closest + candidate workflow names (Levenshtein-1 suggestion for typos). + - test_emitter_orphan_only_warns — workflow emits Z but BP + doesn't have it. Exit 0 with ::notice:: (NOT ::error::) because + Tier 2g handles this at PR time. + - test_multiple_orphans_aggregated — two BP orphans surfaced + together, not short-circuited. + - test_bp_empty_lints_nothing — BP has no contexts. + Exit 0 cleanly. + - test_api_403_skips_gracefully — branch_protections endpoint + 403s (token-scope). Exit 0 with ::error::, do NOT red-X. + - test_api_404_skips_gracefully — branch has no protection. + Exit 0 cleanly. + - test_context_event_match_required — BP context says `(push)` and + workflow only emits on `pull_request`. That's NOT a match — the + BP-required gate would still wedge. Exit 1. + - test_workflow_event_mapping_pull_request_target — `pull_request_target` + in workflow `on:` emits a `(pull_request)` context (Gitea convention). + Match counts. + - test_idempotent_issue_filing — when an issue already exists + with the canonical title prefix, edit it instead of POSTing a new one + (idempotency contract — mirrors ci-required-drift). 
+ +Run: + python3 -m pytest tests/test_lint_bp_context_emit_match.py -v +""" +from __future__ import annotations + +import importlib.util +import os +import sys +from pathlib import Path +from unittest import mock + +import pytest + + +SCRIPT_PATH = ( + Path(__file__).resolve().parent.parent + / ".gitea" + / "scripts" + / "lint_bp_context_emit_match.py" +) + + +def _import_lint(): + spec = importlib.util.spec_from_file_location( + f"lint_bp_emit_{os.getpid()}", SCRIPT_PATH + ) + m = importlib.util.module_from_spec(spec) + spec.loader.exec_module(m) + return m + + +@pytest.fixture() +def envset(tmp_path, monkeypatch): + wf = tmp_path / ".gitea" / "workflows" + wf.mkdir(parents=True) + monkeypatch.setenv("WORKFLOWS_DIR", str(wf)) + monkeypatch.setenv("GITEA_TOKEN", "stub") + monkeypatch.setenv("GITEA_HOST", "git.example.test") + monkeypatch.setenv("REPO", "owner/molecule-core") + monkeypatch.setenv("BRANCH", "main") + monkeypatch.setenv("DRIFT_LABEL", "ci-bp-drift") + return wf + + +def _write_wf(d: Path, name: str, content: str) -> Path: + p = d / name + p.write_text(content) + return p + + +def _stub_api(monkeypatch, lint_mod, bp_response, issue_search_response=None, posted_record=None): + """Stub the module's `api` function. + + bp_response: ("ok", {"status_check_contexts": [...]}) + or ("forbidden", None) / ("not_found", None) + issue_search_response: list of issues matching the search query ( + may be empty; default empty) + posted_record: dict in which to record any POST/PATCH calls made + (so tests can assert idempotency). + """ + if issue_search_response is None: + issue_search_response = [] + if posted_record is None: + posted_record = {} + + def fake_api(method, path, *, body=None, query=None): + if "branch_protections" in path: + return bp_response + if "issues/search" in path or "/issues?" 
in path or path.endswith("/issues"): + if method == "GET": + return ("ok", list(issue_search_response)) + if method == "POST": + posted_record.setdefault("posts", []).append({"path": path, "body": body}) + return ("ok", {"number": 9001, "html_url": "http://t/9001"}) + if "/issues/" in path and method == "PATCH": + posted_record.setdefault("patches", []).append({"path": path, "body": body}) + return ("ok", {"number": 9001}) + if "/labels" in path: + return ("ok", [{"id": 10, "name": "ci-bp-drift"}, {"id": 9, "name": "tier:high"}]) + return ("ok", {}) + + monkeypatch.setattr(lint_mod, "api", fake_api) + return posted_record + + +# --------------------------------------------------------------------------- +# Perfect match — both sides agree. +# --------------------------------------------------------------------------- +def test_perfect_match_passes(envset, monkeypatch, capsys): + _write_wf( + envset, + "ci.yml", + "name: CI\non:\n pull_request:\n branches: [main]\njobs:\n" + " all-required:\n runs-on: x\n steps:\n - run: echo hi\n", + ) + m = _import_lint() + _stub_api( + monkeypatch, + m, + ("ok", {"status_check_contexts": ["CI / all-required (pull_request)"]}), + ) + rc = m.run() + assert rc == 0 + + +# --------------------------------------------------------------------------- +# BP-only orphan — context with no emitter. 
# ---------------------------------------------------------------------------
def test_bp_orphan_context_fails(envset, monkeypatch, capsys):
    """A BP context with no emitting workflow fails the lint and files an issue."""
    _write_wf(
        envset,
        "ci.yml",
        "name: CI\non:\n  pull_request:\n    branches: [main]\njobs:\n"
        "  all-required:\n    runs-on: x\n    steps:\n      - run: echo hi\n",
    )
    m = _import_lint()
    orphan = "Ghost workflow / ghost (pull_request)"
    posted = _stub_api(
        monkeypatch,
        m,
        ("ok", {"status_check_contexts": [
            "CI / all-required (pull_request)",
            orphan,  # required by BP, emitted by nothing
        ]}),
    )
    rc = m.run()
    assert rc == 1
    out = capsys.readouterr().out
    assert orphan in out
    # No open issue exists in the stub, so exactly one new issue is POSTed
    # and nothing is PATCHed; its body names the orphan.
    assert len(posted.get("posts", [])) == 1, posted
    assert not posted.get("patches"), posted
    assert orphan in posted["posts"][0]["body"]["body"]


# ---------------------------------------------------------------------------
# Emitter-only direction → notice, not error (Tier 2g territory).
# ---------------------------------------------------------------------------
def test_emitter_orphan_only_warns(envset, monkeypatch, capsys):
    """A workflow context missing from BP is a notice only: exit 0, no issue."""
    _write_wf(
        envset,
        "extra.yml",
        "name: Extra\non:\n  pull_request:\n    branches: [main]\njobs:\n"
        "  extra-job:\n    runs-on: x\n    steps:\n      - run: echo hi\n",
    )
    _write_wf(
        envset,
        "ci.yml",
        "name: CI\non:\n  pull_request:\n    branches: [main]\njobs:\n"
        "  all-required:\n    runs-on: x\n    steps:\n      - run: echo hi\n",
    )
    m = _import_lint()
    posted = _stub_api(
        monkeypatch,
        m,
        ("ok", {"status_check_contexts": ["CI / all-required (pull_request)"]}),
    )
    rc = m.run()
    assert rc == 0
    captured = capsys.readouterr()
    assert "Extra / extra-job (pull_request)" in captured.out
    # Informational direction: must not surface as an error on either stream.
    assert "::error::" not in captured.out
    assert "::error::" not in captured.err
    assert not posted, f"expected no issue traffic; got {posted!r}"


# ---------------------------------------------------------------------------
# Multiple BP orphans — all surfaced.
# ---------------------------------------------------------------------------
def test_multiple_orphans_aggregated(envset, monkeypatch, capsys):
    """Every BP orphan is reported in one run and in one filed issue."""
    _write_wf(
        envset,
        "ci.yml",
        "name: CI\non:\n  pull_request:\n    branches: [main]\njobs:\n"
        "  all-required:\n    runs-on: x\n    steps:\n      - run: echo hi\n",
    )
    m = _import_lint()
    orphans = [
        "Phantom A / a (pull_request)",
        "Phantom B / b (pull_request)",
    ]
    posted = _stub_api(
        monkeypatch,
        m,
        ("ok", {"status_check_contexts": [
            "CI / all-required (pull_request)",
            *orphans,
        ]}),
    )
    rc = m.run()
    assert rc == 1
    out = capsys.readouterr().out
    for orphan in orphans:
        assert orphan in out
    # Aggregated: a single issue carries both orphans.
    assert len(posted.get("posts", [])) == 1, posted
    issue_body = posted["posts"][0]["body"]["body"]
    for orphan in orphans:
        assert orphan in issue_body


# ---------------------------------------------------------------------------
# BP has zero contexts → nothing to lint, pass.
# ---------------------------------------------------------------------------
def test_bp_empty_lints_nothing(envset, monkeypatch, capsys):
    """An empty `status_check_contexts` list exits 0 without filing anything."""
    _write_wf(
        envset,
        "ci.yml",
        "name: CI\non:\n  pull_request:\n    branches: [main]\njobs:\n"
        "  all-required:\n    runs-on: x\n    steps:\n      - run: echo hi\n",
    )
    m = _import_lint()
    posted = _stub_api(monkeypatch, m, ("ok", {"status_check_contexts": []}))
    rc = m.run()
    assert rc == 0
    assert "nothing to lint" in capsys.readouterr().out
    assert not posted, f"expected no issue traffic; got {posted!r}"


# ---------------------------------------------------------------------------
# API 403 — graceful-degrade.
# ---------------------------------------------------------------------------
def test_api_403_skips_gracefully(envset, monkeypatch, capsys):
    """A forbidden branch_protections read exits 0 but emits ::error:: on stderr."""
    _write_wf(
        envset,
        "ci.yml",
        "name: CI\non:\n  pull_request:\n    branches: [main]\njobs:\n"
        "  j:\n    runs-on: x\n    steps:\n      - run: echo hi\n",
    )
    m = _import_lint()
    posted = _stub_api(monkeypatch, m, ("forbidden", None))
    rc = m.run()
    assert rc == 0
    err = capsys.readouterr().err
    assert "::error::" in err
    assert "403" in err
    assert not posted, f"expected no issue traffic; got {posted!r}"


# ---------------------------------------------------------------------------
# API 404 — branch has no protection → clean exit.
# ---------------------------------------------------------------------------
def test_api_404_skips_gracefully(envset, monkeypatch, capsys):
    """A branch with no protection configured exits 0 with a notice."""
    _write_wf(
        envset,
        "ci.yml",
        "name: CI\non:\n  pull_request:\n    branches: [main]\njobs:\n"
        "  j:\n    runs-on: x\n    steps:\n      - run: echo hi\n",
    )
    m = _import_lint()
    posted = _stub_api(monkeypatch, m, ("not_found", None))
    rc = m.run()
    assert rc == 0
    assert "no protection configured" in capsys.readouterr().out
    assert not posted, f"expected no issue traffic; got {posted!r}"


# ---------------------------------------------------------------------------
# Event-suffix match strict: BP says (push), workflow emits (pull_request)
# only. Mismatch — flag.
# ---------------------------------------------------------------------------
def test_context_event_match_required(envset, monkeypatch, capsys):
    """Same workflow/job but a different event suffix still counts as an orphan."""
    _write_wf(
        envset,
        "ci.yml",
        "name: CI\non:\n  pull_request:\n    branches: [main]\njobs:\n"
        "  all-required:\n    runs-on: x\n    steps:\n      - run: echo hi\n",
    )
    m = _import_lint()
    _stub_api(
        monkeypatch,
        m,
        ("ok", {"status_check_contexts": ["CI / all-required (push)"]}),
    )
    rc = m.run()
    assert rc == 1
    out = capsys.readouterr().out
    # The required (push) context is flagged ...
    assert "CI / all-required (push)" in out
    # ... and the same-workflow emitter is offered as the closest match.
    assert "CI / all-required (pull_request)" in out


# ---------------------------------------------------------------------------
# `pull_request_target` in workflow `on:` emits a `(pull_request)` context
# (Gitea convention — verified empirically on molecule-core).
# ---------------------------------------------------------------------------
def test_workflow_event_mapping_pull_request_target(envset, monkeypatch, capsys):
    """`pull_request_target` satisfies a BP context ending in `(pull_request)`."""
    _write_wf(
        envset,
        "secret.yml",
        "name: Secret scan\non:\n  pull_request_target:\n    branches: [main]\njobs:\n"
        "  scan:\n    runs-on: x\n    name: Scan diff for credential-shaped strings\n"
        "    steps:\n      - run: echo hi\n",
    )
    m = _import_lint()
    posted = _stub_api(
        monkeypatch,
        m,
        ("ok", {"status_check_contexts": [
            "Secret scan / Scan diff for credential-shaped strings (pull_request)",
        ]}),
    )
    rc = m.run()
    assert rc == 0
    assert not posted, f"expected no issue traffic; got {posted!r}"


# ---------------------------------------------------------------------------
# Idempotency — existing open issue is PATCHed, not duplicated.
# ---------------------------------------------------------------------------
def test_idempotent_issue_filing(envset, monkeypatch, capsys):
    """An open issue with the canonical title is edited rather than duplicated."""
    _write_wf(
        envset,
        "ci.yml",
        "name: CI\non:\n  pull_request:\n    branches: [main]\njobs:\n"
        "  all-required:\n    runs-on: x\n    steps:\n      - run: echo hi\n",
    )
    m = _import_lint()
    posted = _stub_api(
        monkeypatch,
        m,
        ("ok", {"status_check_contexts": [
            "CI / all-required (pull_request)",
            "Ghost / g (pull_request)",
        ]}),
        issue_search_response=[
            {
                "number": 4242,
                "title": "[ci-bp-drift] owner/molecule-core/main: BP→emitter mismatch",
                "state": "open",
                "html_url": "http://t/4242",
            }
        ],
    )
    rc = m.run()
    assert rc == 1
    # Should have PATCHed, not POSTed a new one.
    assert posted.get("patches"), f"expected PATCH on existing issue; got {posted!r}"
    assert not posted.get("posts"), f"expected no POSTs; got {posted!r}"
    # The PATCH must target the pre-existing issue and carry the orphan.
    patch = posted["patches"][0]
    assert patch["path"].endswith("/issues/4242"), patch
    assert "Ghost / g (pull_request)" in patch["body"]["body"]