feat(internal#219 §4+§6): port ci-required-drift + audit-force-merge sidecar from CP

Phase 2b+c port of molecule-controlplane PR#112 (SHA 0adf2098) to
molecule-core, per RFC internal#219 §4 (jobs ↔ protection drift) + §6
(audit env ↔ protection drift).

## What this adds

1. .gitea/workflows/ci-required-drift.yml — hourly cron (':17') +
   workflow_dispatch. AST-walks ci.yml, branch_protections, and
   audit-force-merge.yml's REQUIRED_CHECKS env. Files/updates a
   [ci-drift] issue idempotent by title when any pair diverges.

2. .gitea/scripts/ci-required-drift.py — verbatim from CP. PyYAML-based
   AST detector (NOT grep-by-name), per feedback_behavior_based_ast_gates.
   Five drift classes: F1, F1b, F2, F3a, F3b.

3. .gitea/workflows/audit-force-merge.yml — reconcile with CP's
   structure. Moves permissions: to workflow level, adds base.sha-
   pinning rationale, links to drift-detect, and updates REQUIRED_CHECKS
   to current branch_protections/main verbatim (2 contexts).

4. tests/test_ci_required_drift.py — 17 pytest cases, verbatim from CP.
   Stdlib + PyYAML only. Covers F1/F1b/F2/F3a/F3b, happy path, the
   idempotent-PATCH path, the MUST-FIX find_open_issue() raise-on-
   transient regression, the --dry-run flag, and api() error contracts.

## Adaptations from CP#112

- secrets.GITEA_TOKEN → secrets.SOP_TIER_CHECK_TOKEN (molecule-core's
  established read-only token name, used by sop-tier-check and
  audit-force-merge already).
- DRIFT_LABEL tier:high resolves to label id 9 on core (verified
  2026-05-11) vs id 10 on CP.
- REQUIRED_CHECKS env initialized to molecule-core's actual main
  protection set (2 contexts: Secret scan + sop-tier-check), not CP's
  (3 contexts incl. packer-ascii-gate + all-required).
- Comment block flags that the 'all-required' sentinel does NOT yet
  exist in molecule-core's ci.yml (RFC §4 Phase 4 adds it). Until
  then, the detector exits 3 with ::error:: 'sentinel job not found'.
  Verified locally: the workflow will be red on the cron until Phase 4
  lands — that's intentional + louder than a silent issue.

## Verification

- 17/17 pytest cases green locally (Python 3.13, PyYAML 6.0.3).
- Hostile self-review: removing the script makes all 17 tests ERROR
  with FileNotFoundError, confirming they exercise the actual
  implementation (not happy-path shape-matching).
- python3 -m py_compile + bash -n + yaml.safe_load all pass.
- Initial dry-run against real molecule-core ci.yml: exits 3 with
  ::error::sentinel job 'all-required' not found — expected, Phase 4
  will add it.

## What does NOT change

- audit-force-merge.sh is byte-identical to CP's — no change needed.
- No branch protection mutation (that's Phase 4, separate PR).
- No CI workflow restructuring (PR#372 already did that).

RFC: molecule-ai/internal#219
Source: molecule-controlplane@0adf2098 (PR #112)
This commit is contained in:
claude-ceo-assistant 2026-05-11 00:35:25 -07:00
parent cb716f9649
commit a8b2cf948d
4 changed files with 1307 additions and 23 deletions

View File

@ -0,0 +1,591 @@
#!/usr/bin/env python3
"""ci-required-drift — RFC internal#219 §4 + §6.
Detects drift between three sources of "what counts as a required check"
for this repo, files (or updates) a `[ci-drift]` Gitea issue when any
pair diverges.
Sources:
A. `.gitea/workflows/ci.yml` jobs (CI source the actual job set)
B. `status_check_contexts` in branch_protections (the merge gate)
C. `REQUIRED_CHECKS` env in audit-force-merge.yml (the audit env)
Three failure classes:
F1 Job in (A) is not under the sentinel's `needs:` — sentinel
doesn't gate it, so a red job on that name can sneak through.
Ignores jobs whose `if:` references `github.event_name` (those
run only on specific events and may be `skipped` legitimately).
F2 Context in (B) corresponds to no emitter i.e. there's no job
in ci.yml whose runtime status-name maps to that context.
A stale required-check name is silent: protection demands a
green it never receives, but Gitea treats absent-as-pending,
not absent-as-red. The gate degrades to advisory.
F3 (B) and (C) are not set-equal. Audit env wider than protection
audit flags non-force-merges as force; narrower real
force-merges are missed.
Idempotency:
Searches OPEN issues by exact title prefix
`[ci-drift] {repo}/{branch}: ` and either edits the existing one
(if any) or POSTs a new one. Never spawns duplicates.
Behavior-based AST gate per `feedback_behavior_based_ast_gates`:
- Job set comes from PyYAML parse of jobs:* keys
- Sentinel needs from PyYAML parse of jobs[sentinel].needs (a list)
- Audit env from PyYAML parse, NOT grep so reformatting the YAML
(block-scalar `|` vs flow-style list) does not break the gate
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import urllib.error
import urllib.parse
import urllib.request
from typing import Any
import yaml # PyYAML 6.0.2 — installed by the workflow before this runs.
# --------------------------------------------------------------------------
# Environment
# --------------------------------------------------------------------------
def env(key: str, *, required: bool = True, default: str | None = None) -> str:
val = os.environ.get(key, default)
if required and not val:
sys.stderr.write(f"::error::missing required env var: {key}\n")
sys.exit(2)
return val or ""
GITEA_TOKEN = env("GITEA_TOKEN", required=False)
GITEA_HOST = env("GITEA_HOST", required=False)
REPO = env("REPO", required=False)
BRANCHES = env("BRANCHES", required=False).split()
SENTINEL_JOB = env("SENTINEL_JOB", required=False)
AUDIT_WORKFLOW_PATH = env("AUDIT_WORKFLOW_PATH", required=False)
CI_WORKFLOW_PATH = env("CI_WORKFLOW_PATH", required=False)
DRIFT_LABEL = env("DRIFT_LABEL", required=False)
OWNER, NAME = (REPO.split("/", 1) + [""])[:2] if REPO else ("", "")
API = f"https://{GITEA_HOST}/api/v1" if GITEA_HOST else ""
def _require_runtime_env() -> None:
"""Enforce env contract — called from `main()` only. Tests import
individual functions without setting the full env contract."""
for key in (
"GITEA_TOKEN",
"GITEA_HOST",
"REPO",
"BRANCHES",
"SENTINEL_JOB",
"AUDIT_WORKFLOW_PATH",
"CI_WORKFLOW_PATH",
"DRIFT_LABEL",
):
if not os.environ.get(key):
sys.stderr.write(f"::error::missing required env var: {key}\n")
sys.exit(2)
# --------------------------------------------------------------------------
# Tiny HTTP helper (no requests dependency)
# --------------------------------------------------------------------------
class ApiError(RuntimeError):
"""Raised when a Gitea API call cannot be trusted to have succeeded.
Covers non-2xx HTTP status AND 2xx with an unparseable JSON body on
endpoints that are documented to return JSON (search/read). Callers
that swallow this and proceed would risk e.g. creating duplicate
`[ci-drift]` issues when a transient 500 hides an existing match.
The cron retries hourly; one fail-loud cycle is fine silent
duplicate creation is not (per Five-Axis review on PR #112).
"""
def api(
method: str,
path: str,
*,
body: dict | None = None,
query: dict[str, str] | None = None,
expect_json: bool = True,
) -> tuple[int, Any]:
"""Tiny HTTP helper around urllib.
Raises ApiError on any non-2xx response. Callers that want
best-effort semantics (e.g. label-apply) must `try/except ApiError`
explicitly making the failure-soft path opt-in rather than the
default closes the duplicate-issue regression class.
For 2xx responses with a JSON body that fails to parse, raises
ApiError when `expect_json=True` (the default for read-shaped
paths). On endpoints that legitimately return non-JSON success
bodies (e.g. some Gitea create echoes see
`feedback_gitea_create_api_unparseable_response`), callers may pass
`expect_json=False` to accept a `_raw` fallthrough but they MUST
then verify success via a follow-up GET, not by trusting the body.
"""
url = f"{API}{path}"
if query:
url = f"{url}?{urllib.parse.urlencode(query)}"
data = None
headers = {
"Authorization": f"token {GITEA_TOKEN}",
"Accept": "application/json",
}
if body is not None:
data = json.dumps(body).encode("utf-8")
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, method=method, data=data, headers=headers)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
raw = resp.read()
status = resp.status
except urllib.error.HTTPError as e:
raw = e.read()
status = e.code
if not (200 <= status < 300):
snippet = raw[:500].decode("utf-8", errors="replace") if raw else ""
raise ApiError(
f"{method} {path} → HTTP {status}: {snippet}"
)
if not raw:
return status, None
try:
return status, json.loads(raw)
except json.JSONDecodeError as e:
if expect_json:
raise ApiError(
f"{method} {path} → HTTP {status} but body is not JSON: {e}"
) from e
# Opt-in raw fallthrough for endpoints with known echo-quirks.
return status, {"_raw": raw.decode("utf-8", errors="replace")}
# --------------------------------------------------------------------------
# YAML loaders — STRICT (reject GitHub-Actions-only syntax)
# --------------------------------------------------------------------------
def load_yaml(path: str) -> dict:
"""Load + parse a workflow YAML. Hard-fail if the file is missing
or doesn't parse — drift-detect cannot make decisions without
knowing the actual job set."""
if not os.path.exists(path):
sys.stderr.write(f"::error::file not found: {path}\n")
sys.exit(3)
with open(path, encoding="utf-8") as f:
try:
doc = yaml.safe_load(f)
except yaml.YAMLError as e:
sys.stderr.write(f"::error::YAML parse error in {path}: {e}\n")
sys.exit(3)
if not isinstance(doc, dict):
sys.stderr.write(f"::error::{path} is not a YAML mapping\n")
sys.exit(3)
return doc
def ci_jobs_all(ci_doc: dict) -> set[str]:
"""Every job key in ci.yml minus the sentinel itself. Used for F1b
(sentinel.needs typo check) needs that name a non-existent job
is a typo regardless of event-gating."""
jobs = ci_doc.get("jobs")
if not isinstance(jobs, dict):
sys.stderr.write("::error::ci.yml has no jobs: mapping\n")
sys.exit(3)
return {k for k in jobs if k != SENTINEL_JOB}
def ci_job_names(ci_doc: dict) -> set[str]:
"""Set of job keys in ci.yml MINUS the sentinel itself MINUS jobs
whose `if:` gates on `github.event_name` (those are event-scoped
and can legitimately be `skipped` for a given trigger; if we
required them under the sentinel `needs:`, every PR-only job
would be `skipped` on push and the sentinel would interpret
`skipped != success` as failure). RFC §4 spec.
Used for F1 (jobs missing from sentinel needs). NOT used for F1b
(typos in needs) see `ci_jobs_all` for that."""
jobs = ci_doc.get("jobs")
if not isinstance(jobs, dict):
sys.stderr.write("::error::ci.yml has no jobs: mapping\n")
sys.exit(3)
names: set[str] = set()
for k, v in jobs.items():
if k == SENTINEL_JOB:
continue
if isinstance(v, dict):
gate = v.get("if")
if isinstance(gate, str) and "github.event_name" in gate:
continue
names.add(k)
return names
def sentinel_needs(ci_doc: dict) -> set[str]:
sentinel = ci_doc.get("jobs", {}).get(SENTINEL_JOB)
if not isinstance(sentinel, dict):
sys.stderr.write(
f"::error::sentinel job '{SENTINEL_JOB}' not found in {CI_WORKFLOW_PATH}\n"
)
sys.exit(3)
needs = sentinel.get("needs", [])
if isinstance(needs, str):
needs = [needs]
if not isinstance(needs, list):
sys.stderr.write("::error::sentinel `needs:` is neither list nor string\n")
sys.exit(3)
return set(needs)
def required_checks_env(audit_doc: dict) -> set[str]:
"""Pull the REQUIRED_CHECKS env value from audit-force-merge.yml.
Walks the YAML AST per `feedback_behavior_based_ast_gates`: we do
NOT grep for `REQUIRED_CHECKS:` that breaks under reformatting,
multi-job workflows, or a future move of the env to a different
step. Instead, look inside every job's every step's `env:` map."""
found: list[str] = []
jobs = audit_doc.get("jobs", {})
if not isinstance(jobs, dict):
sys.stderr.write(f"::warning::{AUDIT_WORKFLOW_PATH} has no jobs: mapping\n")
return set()
for job in jobs.values():
if not isinstance(job, dict):
continue
for step in job.get("steps", []) or []:
if not isinstance(step, dict):
continue
step_env = step.get("env") or {}
if isinstance(step_env, dict) and "REQUIRED_CHECKS" in step_env:
v = step_env["REQUIRED_CHECKS"]
if isinstance(v, str):
found.append(v)
if not found:
sys.stderr.write(
f"::error::REQUIRED_CHECKS env not found in any step of {AUDIT_WORKFLOW_PATH}\n"
)
sys.exit(3)
if len(found) > 1:
# Defensive: refuse to guess which one is canonical.
sys.stderr.write(
f"::error::REQUIRED_CHECKS env present in {len(found)} steps; ambiguous\n"
)
sys.exit(3)
raw = found[0]
# YAML block-scalars (`|`) leave a trailing newline + blanks; trim
# consistently with audit-force-merge.sh's parser so both sides
# produce identical sets.
return {line.strip() for line in raw.splitlines() if line.strip()}
# --------------------------------------------------------------------------
# Mapping: ci.yml job-key → protection context name
# --------------------------------------------------------------------------
def expected_context(job_key: str, workflow_name: str = "ci") -> str:
"""Gitea Actions reports status-check contexts as
"{workflow.name} / {job.name or job.key} ({event})".
For ci.yml the event is `pull_request` on PRs (that's what
`status_check_contexts` records). Job.name defaults to job.key
when no `name:` is set. CP's ci.yml does NOT set per-job `name:`
so the key equals the human-name."""
return f"{workflow_name} / {job_key} (pull_request)"
# --------------------------------------------------------------------------
# Drift detection
# --------------------------------------------------------------------------
def detect_drift(branch: str) -> tuple[list[str], dict]:
"""Returns (findings, debug). Empty findings == no drift."""
findings: list[str] = []
ci_doc = load_yaml(CI_WORKFLOW_PATH)
audit_doc = load_yaml(AUDIT_WORKFLOW_PATH)
jobs = ci_job_names(ci_doc)
jobs_all = ci_jobs_all(ci_doc)
needs = sentinel_needs(ci_doc)
env_set = required_checks_env(audit_doc)
# Protection
# api() raises ApiError on non-2xx; let it propagate so a transient
# 500 fails the run loudly rather than producing a "no drift" lie.
_, protection = api("GET", f"/repos/{OWNER}/{NAME}/branch_protections/{branch}")
if not isinstance(protection, dict):
sys.stderr.write(
f"::error::protection response for {branch} not a JSON object\n"
)
sys.exit(4)
contexts = set(protection.get("status_check_contexts") or [])
# ----- F1: job exists in CI but not under sentinel.needs -----
missing_from_needs = sorted(jobs - needs)
if missing_from_needs:
findings.append(
"F1 — jobs in ci.yml NOT under sentinel `needs:` (sentinel doesn't gate them):\n"
+ "\n".join(f" - {n}" for n in missing_from_needs)
)
# ----- F1b: needs lists a job that doesn't exist (typo) -----
# Compare against jobs_all (incl. event-gated jobs); a typo is a
# typo regardless of `if:` gating.
stale_needs = sorted(needs - jobs_all)
if stale_needs:
findings.append(
"F1b — sentinel `needs:` lists jobs NOT present in ci.yml (typo or removed job):\n"
+ "\n".join(f" - {n}" for n in stale_needs)
)
# ----- F2: protection context has no emitting job -----
# Compute the contexts the CI YAML actually produces. The sentinel
# is in (B) intentionally (`ci / all-required (pull_request)`); we
# whitelist it explicitly.
emitted_contexts = {expected_context(j) for j in jobs} | {expected_context(SENTINEL_JOB)}
# Contexts NOT produced by ci.yml may still come from other
# workflows in the repo (Secret scan etc). We can't enumerate
# every workflow's emissions cheaply; instead, flag only contexts
# whose prefix is `ci / ` (this workflow's emissions) and which
# don't appear in `emitted_contexts`. This narrows F2 to the
# failure class the RFC actually targets without producing noise
# from cross-workflow emitters.
stale_protection = sorted(
c for c in contexts if c.startswith("ci / ") and c not in emitted_contexts
)
if stale_protection:
findings.append(
"F2 — protection `status_check_contexts` entries with `ci / ` prefix that NO "
"job in ci.yml emits (stale name → silent advisory gate):\n"
+ "\n".join(f" - {c}" for c in stale_protection)
)
# ----- F3: audit env vs protection contexts (set-equal) -----
only_in_env = sorted(env_set - contexts)
only_in_protection = sorted(contexts - env_set)
if only_in_env:
findings.append(
"F3a — audit-force-merge.yml `REQUIRED_CHECKS` env has contexts NOT in "
f"branch_protections/{branch}.status_check_contexts (audit would flag "
"non-force-merges as force):\n"
+ "\n".join(f" - {c}" for c in only_in_env)
)
if only_in_protection:
findings.append(
"F3b — branch_protections/{br}.status_check_contexts has contexts NOT in "
"audit-force-merge.yml `REQUIRED_CHECKS` env (real force-merges would be "
"missed):\n".format(br=branch)
+ "\n".join(f" - {c}" for c in only_in_protection)
)
debug = {
"branch": branch,
"ci_jobs": sorted(jobs),
"sentinel_needs": sorted(needs),
"protection_contexts": sorted(contexts),
"audit_env_checks": sorted(env_set),
"expected_contexts": sorted(emitted_contexts),
}
return findings, debug
# --------------------------------------------------------------------------
# Issue file/update
# --------------------------------------------------------------------------
def title_for(branch: str) -> str:
# Idempotency key — keep stable, never include timestamp/SHA.
return f"[ci-drift] {REPO}/{branch}: required-checks divergence detected"
def find_open_issue(title: str) -> dict | None:
"""Return the existing open `[ci-drift]` issue for `title`, or None.
`None` means "search succeeded, no match" NOT "search failed".
Per Five-Axis review on PR #112: returning None on a transient API
error caused the caller to POST a duplicate issue. Now api() raises
ApiError on any non-2xx; we let it propagate. The cron retries
hourly; failing one cycle loudly is strictly better than silently
duplicating.
Gitea issue search returns at most page=50 per page; one page is
enough as long as `[ci-drift]` issues are a tiny minority. (See
follow-up issue for Link-header pagination.)
"""
_, results = api(
"GET",
f"/repos/{OWNER}/{NAME}/issues",
query={"state": "open", "type": "issues", "limit": "50"},
)
if not isinstance(results, list):
raise ApiError(
f"issue search returned non-list body (got {type(results).__name__})"
)
for issue in results:
if issue.get("title") == title:
return issue
return None
def render_body(branch: str, findings: list[str], debug: dict) -> str:
body = [
f"# Drift detected on `{REPO}/{branch}`",
"",
"Auto-filed by `.gitea/workflows/ci-required-drift.yml` "
"(RFC [internal#219](https://git.moleculesai.app/molecule-ai/internal/issues/219) §4 + §6).",
"",
"## Findings",
"",
]
body.extend(findings)
body.extend(
[
"",
"## Resolution",
"",
"- **F1 / F1b**: add the missing job to `all-required.needs:` "
"in `.gitea/workflows/ci.yml`, or remove the stale entry.",
"- **F2**: rename the protection context to match an emitter, "
"or remove it from `status_check_contexts` "
"(PATCH `/api/v1/repos/{owner}/{repo}/branch_protections/{branch}`).",
"- **F3a / F3b**: bring `REQUIRED_CHECKS` env in "
"`.gitea/workflows/audit-force-merge.yml` into set-equality with "
"`status_check_contexts` (single PR, both files).",
"",
"## Debug",
"",
"```json",
json.dumps(debug, indent=2, sort_keys=True),
"```",
"",
"_This issue is idempotent: drift-detect runs hourly at `:17` "
"and edits this body in place. Close the issue once the drift "
"is fixed; the next hourly run will reopen if drift returns._",
]
)
return "\n".join(body)
def file_or_update(
branch: str,
findings: list[str],
debug: dict,
*,
dry_run: bool = False,
) -> None:
"""File a new `[ci-drift]` issue, or PATCH the existing one in place.
`dry_run=True` skips every side-effecting Gitea call (issue
search, POST, PATCH, label apply) and prints the would-be issue
title + body to stdout. Useful for local testing and for
debugging drift output without polluting the issue tracker.
"""
title = title_for(branch)
body = render_body(branch, findings, debug)
if dry_run:
print(f"::notice::[dry-run] would file/update drift issue for {branch}")
print(f"::group::[dry-run] title")
print(title)
print(f"::endgroup::")
print(f"::group::[dry-run] body")
print(body)
print(f"::endgroup::")
return
existing = find_open_issue(title)
if existing:
num = existing["number"]
api(
"PATCH",
f"/repos/{OWNER}/{NAME}/issues/{num}",
body={"body": body},
)
print(f"::notice::Updated existing drift issue #{num} for {branch}")
return
_, created = api(
"POST",
f"/repos/{OWNER}/{NAME}/issues",
body={"title": title, "body": body, "labels": []},
)
if not isinstance(created, dict):
sys.stderr.write("::error::POST issue response not a JSON object\n")
sys.exit(5)
new_num = created.get("number")
print(f"::warning::Filed new drift issue #{new_num} for {branch}")
# Apply label by name (Gitea's add-labels endpoint accepts label IDs;
# look up id by name once). Best-effort: failure to label is logged
# but does not fail the audit run — the issue itself IS the alarm.
try:
_, labels = api("GET", f"/repos/{OWNER}/{NAME}/labels")
except ApiError as e:
sys.stderr.write(f"::warning::could not list labels: {e}\n")
return
label_id = None
if isinstance(labels, list):
for lbl in labels:
if lbl.get("name") == DRIFT_LABEL:
label_id = lbl.get("id")
break
if label_id is not None and new_num:
try:
api(
"POST",
f"/repos/{OWNER}/{NAME}/issues/{new_num}/labels",
body={"labels": [label_id]},
)
except ApiError as e:
sys.stderr.write(
f"::warning::could not apply label '{DRIFT_LABEL}' to #{new_num}: {e}\n"
)
else:
sys.stderr.write(f"::warning::label '{DRIFT_LABEL}' not found on repo\n")
# --------------------------------------------------------------------------
# Main
# --------------------------------------------------------------------------
def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
p = argparse.ArgumentParser(
prog="ci-required-drift",
description="Detect drift between ci.yml, branch_protections, "
"and audit-force-merge.yml REQUIRED_CHECKS env.",
)
p.add_argument(
"--dry-run",
action="store_true",
help="Detect + print findings to stdout; do NOT file or PATCH "
"the `[ci-drift]` issue. Useful for local testing and for "
"previewing output before turning the workflow loose.",
)
return p.parse_args(argv)
def main(argv: list[str] | None = None) -> int:
args = _parse_args(argv)
_require_runtime_env()
for branch in BRANCHES:
findings, debug = detect_drift(branch)
if findings:
print(f"::warning::Drift detected on {branch}:")
for f in findings:
print(f)
file_or_update(branch, findings, debug, dry_run=args.dry_run)
else:
print(f"::notice::No drift on {branch}.")
print(json.dumps(debug, indent=2, sort_keys=True))
# Exit 0 even on drift — the issue IS the alarm, not a red workflow.
# A red workflow here would page on a CI rename until the issue is
# opened, doubling the noise. The issue itself is the actionable
# surface. (`api()` raising ApiError is the only path that exits
# non-zero, by design: a transient Gitea outage should fail loudly.)
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@ -1,58 +1,88 @@
# audit-force-merge — emit `incident.force_merge` to runner stdout when
# a PR is merged with required-status-checks not green. Vector picks
# audit-force-merge — emit `incident.force_merge` to the runner log when
# a PR is merged with required-status checks NOT all green. Vector picks
# the JSON line off docker_logs and ships to Loki on
# molecule-canonical-obs (per `reference_obs_stack_phase1`); query as:
#
# {host="operator"} |= "event_type" |= "incident.force_merge" | json
#
# Closes the §SOP-6 audit gap (the doc says force-merges write to
# `structure_events`, but that table lives in the platform DB, not
# Gitea-side; Loki is the practical equivalent for Gitea Actions
# events). When the credential / observability stack converges later,
# this can sync into structure_events from Loki via a backfill job —
# the structured JSON shape is forward-compatible.
# Companion to `audit-force-merge.sh` (script-extract pattern, same as
# sop-tier-check). The audit observes BOTH UI-merged and REST-merged PRs
# uniformly per `feedback_gh_cli_merge_lies_use_rest`.
#
# Logic in `.gitea/scripts/audit-force-merge.sh` per the same script-
# extract pattern as sop-tier-check.
# Closes the §SOP-6 audit gap for the molecule-core repo. RFC:
# internal#219 §6. Mirrors the same-named workflow in
# molecule-controlplane; design rationale lives in the RFC, not here,
# to keep the workflow file scannable.
name: audit-force-merge
# pull_request_target loads from the base branch — same security model
# as sop-tier-check. Without this, an attacker could rewrite the
# workflow on a PR and skip the audit emission for their own
# force-merge. See `.gitea/workflows/sop-tier-check.yml` for the full
# rationale.
# as sop-tier-check. Without this, a PR author could rewrite the
# workflow on their own PR and skip the audit emission for their own
# force-merge. The base-branch checkout below ALSO uses
# `base.sha`, not `base.ref`, so a fast-moving base can't slip a
# different audit script in under us.
on:
pull_request_target:
types: [closed]
# `pull-requests: read` + `contents: read` covers everything the script
# needs (fetch PR + commit statuses). `issues:` deliberately omitted —
# audit fires-and-forgets to stdout, never opens issues.
permissions:
contents: read
pull-requests: read
jobs:
audit:
runs-on: ubuntu-latest
permissions:
contents: read
pull-requests: read
# Skip when PR is closed without merge — saves a runner.
if: github.event.pull_request.merged == true
steps:
- name: Check out base branch (for the script)
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
# base.sha pinning, NOT base.ref — see header rationale.
ref: ${{ github.event.pull_request.base.sha }}
- name: Detect force-merge + emit audit event
env:
# Same org-level secret the sop-tier-check workflow uses.
# Same org-level secret the sop-tier-check workflow uses;
# falls back to the auto-injected GITHUB_TOKEN if the
# org-level SOP_TIER_CHECK_TOKEN isn't set on a transitional
# repo.
GITEA_TOKEN: ${{ secrets.SOP_TIER_CHECK_TOKEN || secrets.GITHUB_TOKEN }}
GITEA_HOST: git.moleculesai.app
REPO: ${{ github.repository }}
PR_NUMBER: ${{ github.event.pull_request.number }}
# Required-status-check contexts to evaluate at merge time.
# Newline-separated. Mirror this against branch protection
# (settings → branches → protected branch → required checks).
# Newline-separated. MUST mirror branch protection's
# status_check_contexts for protected branches
# (currently `main`; `staging` protection forthcoming per
# RFC internal#219 Phase 4).
#
# Initialized 2026-05-11 from the current molecule-core `main`
# branch protection:
#
# GET /api/v1/repos/molecule-ai/molecule-core/
# branch_protections/main
# → status_check_contexts = [
# "Secret scan / Scan diff for credential-shaped strings (pull_request)",
# "sop-tier-check / tier-check (pull_request)"
# ]
#
# Declared here rather than fetched from /branch_protections
# because that endpoint requires admin write — sop-tier-bot is
# read-only by design (least-privilege).
# because that endpoint requires admin write — sop-tier-bot
# is read-only by design (least-privilege per
# `feedback_least_privilege_via_workflow_env` / internal#257).
# Drift between this env and the real protection list is
# auto-detected by `ci-required-drift.yml` (RFC §4 + §6),
# which opens a `[ci-drift]` issue within one hour.
#
# When the protection set changes (e.g. Phase 4 adds the
# `ci / all-required (pull_request)` sentinel), update BOTH
# branch protection AND this env in the SAME PR; drift-detect
# will otherwise file an issue for you.
REQUIRED_CHECKS: |
sop-tier-check / tier-check (pull_request)
Secret scan / Scan diff for credential-shaped strings (pull_request)
sop-tier-check / tier-check (pull_request)
run: bash .gitea/scripts/audit-force-merge.sh

View File

@ -0,0 +1,107 @@
# ci-required-drift — hourly sentinel for drift between the canonical
# "what counts as required" sources of truth in this repo:
#
# 1. `.gitea/workflows/ci.yml` jobs (CI source)
# 2. `branch_protections/{main,staging}.status_check_contexts`
# (protection)
# 3. `.gitea/workflows/audit-force-merge.yml` REQUIRED_CHECKS env
# (audit env)
#
# RFC: internal#219 §4 (jobs ↔ protection) + §6 (audit env ↔ protection).
# Ported verbatim-then-adapted from molecule-controlplane PR#112
# (SHA 0adf2098) per RFC internal#219 Phase 2b+c — replicate repo-by-repo.
#
# When any pair diverges, a `[ci-drift]` issue is opened or updated
# (idempotent by title) and labelled `tier:high`. This is the
# auto-detection that closes the regression class identified in
# RFC §1 finding 3 (protection only listed 2 of 6 real jobs for
# ~weeks, undetected) and §6 (audit env drifts silently from
# protection).
#
# Diff logic lives in `.gitea/scripts/ci-required-drift.py`. The
# Python file does YAML AST parsing + `needs:` graph walking per
# `feedback_behavior_based_ast_gates` — NOT grep-by-name. That way
# job renames or matrix-expansion-induced churn produce honest signal.
#
# IMPORTANT — TRANSITIONAL STATE: molecule-core's ci.yml does NOT yet
# contain the `all-required` sentinel job (RFC §4 Phase 4 adds it).
# Until Phase 4 lands the detector will hard-fail with exit 3 on the
# missing sentinel. That's intentional: a red workflow on a 5-min cron
# is louder than a silent issue and forces Phase 4 to land soon.
name: ci-required-drift
# IMPORTANT — Gitea 1.22.6 parser quirk per
# `feedback_gitea_workflow_dispatch_inputs_unsupported`: do NOT add an
# `inputs:` block here, even though stock GitHub Actions allows it.
# Gitea 1.22.6 flattens `workflow_dispatch.inputs.X` into a sibling of
# the `on:` event keys and rejects the entire workflow as
# "unknown on type". The whole file then registers for ZERO events
# (no schedule, no dispatch). When Gitea ≥ 1.23 lands fleet-wide,
# this constraint can be revisited.
on:
schedule:
# Hourly at :17 — offset from :00 to spread load away from the
# peak when N cron workflows fire on the hour-boundary, per
# RFC §4 cadence ("off-zero").
- cron: '17 * * * *'
workflow_dispatch:
# Read protection + read CI YAML + write issue. No write on contents.
permissions:
contents: read
issues: write
# Serialise — two simultaneous drift runs would duel on the issue
# create/update path. The audit is idempotent, but parallel POSTs
# can produce duplicate comments before the title-search dedup wins.
concurrency:
group: ci-required-drift
cancel-in-progress: false
jobs:
drift:
runs-on: ubuntu-latest
timeout-minutes: 5
steps:
- name: Check out repo (we read the YAML files locally)
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Set up Python (PyYAML for AST parsing)
# Avoid a system-pip install on the runner; setup-python pins
# a hermetic interpreter + cache. PyYAML is small enough that
# the install is sub-2s — no need to cache wheels.
uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with:
python-version: '3.12'
- name: Install PyYAML
run: python -m pip install --quiet 'PyYAML==6.0.2'
- name: Run drift detector
env:
# GITEA_TOKEN reads protection + writes issues. molecule-core
# uses `SOP_TIER_CHECK_TOKEN` as the org-level secret name for
# read-only Gitea API access from CI (set by audit-force-merge
# and sop-tier-check too). Falls back to the auto-injected
# GITHUB_TOKEN if the org-level secret isn't set
# (transitional repos).
GITEA_TOKEN: ${{ secrets.SOP_TIER_CHECK_TOKEN || secrets.GITHUB_TOKEN }}
GITEA_HOST: git.moleculesai.app
REPO: ${{ github.repository }}
# Branches whose protection we compare against. molecule-core
# currently has main protected; staging protection is
# forthcoming. Keep this list in sync if a new long-lived
# branch gets protected (e.g. release/* if introduced later).
BRANCHES: 'main staging'
# The sentinel job's name inside ci.yml. If the aggregator
# is ever renamed, update this too (the drift detector
# currently treats `all-required` as the source of "what
# the sentinel claims to require").
SENTINEL_JOB: 'all-required'
# Path to the audit workflow whose REQUIRED_CHECKS env we
# cross-check against protection (RFC §6).
AUDIT_WORKFLOW_PATH: '.gitea/workflows/audit-force-merge.yml'
# Path to the CI workflow with the sentinel + the jobs.
CI_WORKFLOW_PATH: '.gitea/workflows/ci.yml'
# Issue label applied on file/update. `tier:high` exists in
# the molecule-core label set (verified 2026-05-11, label id 9).
DRIFT_LABEL: 'tier:high'
run: python3 .gitea/scripts/ci-required-drift.py

View File

@ -0,0 +1,556 @@
"""Tests for `.gitea/scripts/ci-required-drift.py` — RFC internal#219 §4 + §6.
Covers the five drift-finding classes (F1, F1b, F2, F3a, F3b), the happy
path (no drift, no API mutation), and the idempotent path (existing
`[ci-drift]` issue is PATCHed in place, NOT duplicated).
Per the Five-Axis review on PR #112, the test suite must FAIL on the
pre-fix code where `find_open_issue()` returned `None` on transient
HTTP errors (causing the caller to POST a duplicate issue). We exercise
that path explicitly with `test_find_open_issue_raises_on_transient_error`.
Run:
python3 -m pytest tests/test_ci_required_drift.py -v
Dependencies: stdlib + PyYAML (already required by the script itself).
No network. No live Gitea calls.
"""
from __future__ import annotations
import importlib.util
import json
import os
import sys
import textwrap
from pathlib import Path
from unittest import mock
import pytest
# --------------------------------------------------------------------------
# Module-import fixture
# --------------------------------------------------------------------------
# The script reads env vars at import-time (cheap globals, no IO). Tests
# set the env vars BEFORE importing so the module loads under a known
# config, then individual tests monkeypatch the `api()` callable and
# YAML file paths via tmp_path.
SCRIPT_PATH = (
Path(__file__).resolve().parent.parent
/ ".gitea"
/ "scripts"
/ "ci-required-drift.py"
)
@pytest.fixture(scope="module")
def drift_module():
"""Import the script as a module. Env vars are pre-set so the
module-level reads pass; tests then patch individual globals as
needed."""
env = {
"GITEA_TOKEN": "test-token",
"GITEA_HOST": "git.example.test",
"REPO": "owner/repo",
"BRANCHES": "main staging",
"SENTINEL_JOB": "all-required",
"AUDIT_WORKFLOW_PATH": ".gitea/workflows/audit-force-merge.yml",
"CI_WORKFLOW_PATH": ".gitea/workflows/ci.yml",
"DRIFT_LABEL": "tier:high",
}
with mock.patch.dict(os.environ, env, clear=False):
spec = importlib.util.spec_from_file_location(
"ci_required_drift", SCRIPT_PATH
)
m = importlib.util.module_from_spec(spec)
spec.loader.exec_module(m)
# Force-set the globals from env (they were captured at import
# time before our mock.patch.dict took effect on subsequent
# runs in the same pytest session).
m.GITEA_TOKEN = env["GITEA_TOKEN"]
m.GITEA_HOST = env["GITEA_HOST"]
m.REPO = env["REPO"]
m.BRANCHES = env["BRANCHES"].split()
m.SENTINEL_JOB = env["SENTINEL_JOB"]
m.AUDIT_WORKFLOW_PATH = env["AUDIT_WORKFLOW_PATH"]
m.CI_WORKFLOW_PATH = env["CI_WORKFLOW_PATH"]
m.DRIFT_LABEL = env["DRIFT_LABEL"]
m.OWNER, m.NAME = "owner", "repo"
m.API = f"https://{env['GITEA_HOST']}/api/v1"
yield m
# --------------------------------------------------------------------------
# Fixture YAML — minimal but realistic ci.yml + audit-force-merge.yml
# --------------------------------------------------------------------------
def _write_ci_yaml(tmp_path: Path, *, jobs: dict, sentinel_needs: list[str]) -> Path:
"""Write a synthetic ci.yml with the given jobs + sentinel needs."""
full_jobs = dict(jobs)
full_jobs["all-required"] = {"runs-on": "ubuntu-latest", "needs": sentinel_needs}
doc = {"name": "ci", "on": {"pull_request": {}}, "jobs": full_jobs}
import yaml
p = tmp_path / "ci.yml"
p.write_text(yaml.safe_dump(doc), encoding="utf-8")
return p
def _write_audit_yaml(tmp_path: Path, required_checks: list[str]) -> Path:
"""Write a synthetic audit-force-merge.yml with REQUIRED_CHECKS env."""
block = "\n".join(required_checks)
text = textwrap.dedent(
f"""\
name: audit-force-merge
on:
schedule:
- cron: '*/30 * * * *'
jobs:
audit:
runs-on: ubuntu-latest
steps:
- name: Run audit
env:
REQUIRED_CHECKS: |
{block.replace(chr(10), chr(10) + ' ')}
run: bash .gitea/scripts/audit-force-merge.sh
"""
)
p = tmp_path / "audit-force-merge.yml"
p.write_text(text, encoding="utf-8")
return p
def _make_stub_api(responses: dict):
"""Build a fake `api()` callable.
`responses` maps (method, path) tuples to either:
- (status_int, body) returned as-is
- Exception instance raised
Calls are recorded in `.calls` for later assertion.
"""
class StubApi:
def __init__(self):
self.calls: list[tuple] = []
def __call__(self, method, path, *, body=None, query=None, expect_json=True):
self.calls.append((method, path, body, query))
key = (method, path)
if key not in responses:
raise AssertionError(
f"unexpected api call: {method} {path} (no stub registered)"
)
r = responses[key]
if isinstance(r, Exception):
raise r
return r
return StubApi()
# --------------------------------------------------------------------------
# Drift-class tests — pure detect_drift() coverage
# --------------------------------------------------------------------------
def _patch_paths(drift_module, monkeypatch, ci_yml: Path, audit_yml: Path):
monkeypatch.setattr(drift_module, "CI_WORKFLOW_PATH", str(ci_yml))
monkeypatch.setattr(drift_module, "AUDIT_WORKFLOW_PATH", str(audit_yml))
def test_f1_job_missing_from_sentinel_needs(drift_module, tmp_path, monkeypatch):
"""F1: a job exists in ci.yml but is NOT under sentinel.needs."""
ci = _write_ci_yaml(
tmp_path,
jobs={
"build": {"runs-on": "ubuntu-latest"},
"test": {"runs-on": "ubuntu-latest"}, # missing from needs
},
sentinel_needs=["build"],
)
audit = _write_audit_yaml(tmp_path, ["ci / build (pull_request)"])
_patch_paths(drift_module, monkeypatch, ci, audit)
stub = _make_stub_api({
("GET", "/repos/owner/repo/branch_protections/main"): (
200,
{"status_check_contexts": ["ci / build (pull_request)"]},
),
})
monkeypatch.setattr(drift_module, "api", stub)
findings, _ = drift_module.detect_drift("main")
assert any("F1 —" in f and "test" in f for f in findings), findings
def test_f1b_sentinel_needs_typo(drift_module, tmp_path, monkeypatch):
"""F1b: sentinel.needs lists a job not present in ci.yml (typo).
Per the prior fix, F1b uses jobs_all (the unfiltered set) so that
event-gated jobs aren't false-positive typos."""
ci = _write_ci_yaml(
tmp_path,
jobs={"build": {"runs-on": "ubuntu-latest"}},
sentinel_needs=["build", "bulid"], # typo'd
)
audit = _write_audit_yaml(tmp_path, ["ci / build (pull_request)"])
_patch_paths(drift_module, monkeypatch, ci, audit)
stub = _make_stub_api({
("GET", "/repos/owner/repo/branch_protections/main"): (
200,
{"status_check_contexts": ["ci / build (pull_request)"]},
),
})
monkeypatch.setattr(drift_module, "api", stub)
findings, _ = drift_module.detect_drift("main")
assert any("F1b" in f and "bulid" in f for f in findings), findings
def test_f1b_event_gated_job_not_flagged_as_typo(drift_module, tmp_path, monkeypatch):
"""F1b regression guard: event-gated jobs (with `if: github.event_name`)
are in jobs_all and must NOT trigger F1b when listed in sentinel.needs.
They DO trigger F1 if missing but that's a different finding."""
ci = _write_ci_yaml(
tmp_path,
jobs={
"build": {"runs-on": "ubuntu-latest"},
"pr-only": {
"runs-on": "ubuntu-latest",
"if": "github.event_name == 'pull_request'",
},
},
sentinel_needs=["build", "pr-only"], # event-gated, but real
)
audit = _write_audit_yaml(
tmp_path,
["ci / build (pull_request)", "ci / pr-only (pull_request)"],
)
_patch_paths(drift_module, monkeypatch, ci, audit)
stub = _make_stub_api({
("GET", "/repos/owner/repo/branch_protections/main"): (
200,
{
"status_check_contexts": [
"ci / build (pull_request)",
"ci / pr-only (pull_request)",
]
},
),
})
monkeypatch.setattr(drift_module, "api", stub)
findings, _ = drift_module.detect_drift("main")
assert not any("F1b" in f for f in findings), findings
def test_f2_protection_has_no_emitter(drift_module, tmp_path, monkeypatch):
"""F2: a `ci / ` prefixed context in protection has no job in ci.yml."""
ci = _write_ci_yaml(
tmp_path,
jobs={"build": {"runs-on": "ubuntu-latest"}},
sentinel_needs=["build"],
)
audit = _write_audit_yaml(tmp_path, ["ci / build (pull_request)"])
_patch_paths(drift_module, monkeypatch, ci, audit)
stub = _make_stub_api({
("GET", "/repos/owner/repo/branch_protections/main"): (
200,
{
"status_check_contexts": [
"ci / build (pull_request)",
"ci / removed-job (pull_request)", # F2
]
},
),
})
monkeypatch.setattr(drift_module, "api", stub)
findings, _ = drift_module.detect_drift("main")
assert any("F2" in f and "removed-job" in f for f in findings), findings
def test_f3a_env_wider_than_protection(drift_module, tmp_path, monkeypatch):
"""F3a: REQUIRED_CHECKS env has a context NOT in protection."""
ci = _write_ci_yaml(
tmp_path,
jobs={"build": {"runs-on": "ubuntu-latest"}},
sentinel_needs=["build"],
)
audit = _write_audit_yaml(
tmp_path,
[
"ci / build (pull_request)",
"ci / ghost (pull_request)", # only in env
],
)
_patch_paths(drift_module, monkeypatch, ci, audit)
stub = _make_stub_api({
("GET", "/repos/owner/repo/branch_protections/main"): (
200,
{"status_check_contexts": ["ci / build (pull_request)"]},
),
})
monkeypatch.setattr(drift_module, "api", stub)
findings, _ = drift_module.detect_drift("main")
assert any("F3a" in f and "ghost" in f for f in findings), findings
def test_f3b_protection_wider_than_env(drift_module, tmp_path, monkeypatch):
"""F3b: protection has a context NOT in REQUIRED_CHECKS env."""
ci = _write_ci_yaml(
tmp_path,
jobs={
"build": {"runs-on": "ubuntu-latest"},
"test": {"runs-on": "ubuntu-latest"},
},
sentinel_needs=["build", "test"],
)
audit = _write_audit_yaml(tmp_path, ["ci / build (pull_request)"])
_patch_paths(drift_module, monkeypatch, ci, audit)
stub = _make_stub_api({
("GET", "/repos/owner/repo/branch_protections/main"): (
200,
{
"status_check_contexts": [
"ci / build (pull_request)",
"ci / test (pull_request)", # only in protection
]
},
),
})
monkeypatch.setattr(drift_module, "api", stub)
findings, _ = drift_module.detect_drift("main")
assert any("F3b" in f and "ci / test (pull_request)" in f for f in findings), findings
def test_happy_path_no_drift(drift_module, tmp_path, monkeypatch):
"""Happy path: ci.yml ↔ protection ↔ audit env all in alignment."""
ci = _write_ci_yaml(
tmp_path,
jobs={
"build": {"runs-on": "ubuntu-latest"},
"test": {"runs-on": "ubuntu-latest"},
},
sentinel_needs=["build", "test"],
)
audit = _write_audit_yaml(
tmp_path,
[
"ci / build (pull_request)",
"ci / test (pull_request)",
"ci / all-required (pull_request)",
],
)
_patch_paths(drift_module, monkeypatch, ci, audit)
stub = _make_stub_api({
("GET", "/repos/owner/repo/branch_protections/main"): (
200,
{
"status_check_contexts": [
"ci / build (pull_request)",
"ci / test (pull_request)",
"ci / all-required (pull_request)",
]
},
),
})
monkeypatch.setattr(drift_module, "api", stub)
findings, _ = drift_module.detect_drift("main")
assert findings == [], findings
# --------------------------------------------------------------------------
# MUST-FIX 1: find_open_issue must raise on transient HTTP errors
# --------------------------------------------------------------------------
def test_find_open_issue_returns_none_on_no_match(drift_module, monkeypatch):
"""Search succeeded, no match → return None (the OK path)."""
stub = _make_stub_api({
("GET", "/repos/owner/repo/issues"): (200, []),
})
monkeypatch.setattr(drift_module, "api", stub)
assert drift_module.find_open_issue("[ci-drift] foo") is None
def test_find_open_issue_returns_match(drift_module, monkeypatch):
"""Search succeeded, matching issue exists → return it."""
issue = {"number": 42, "title": "[ci-drift] foo"}
stub = _make_stub_api({
("GET", "/repos/owner/repo/issues"): (200, [issue]),
})
monkeypatch.setattr(drift_module, "api", stub)
assert drift_module.find_open_issue("[ci-drift] foo") == issue
def test_find_open_issue_raises_on_transient_error(drift_module, monkeypatch):
"""Search FAILED (HTTP 500) → raise ApiError, do NOT return None.
This is the regression class from PR #112's Five-Axis review:
returning None caused file_or_update() to take the else branch and
POST a duplicate issue. The fix is for api() to raise; tests pin
that contract by exercising the failure path explicitly.
"""
stub = _make_stub_api({
("GET", "/repos/owner/repo/issues"): drift_module.ApiError(
"GET /repos/owner/repo/issues → HTTP 500: gateway timeout"
),
})
monkeypatch.setattr(drift_module, "api", stub)
with pytest.raises(drift_module.ApiError):
drift_module.find_open_issue("[ci-drift] foo")
# --------------------------------------------------------------------------
# Idempotent path: existing issue is PATCHed, NOT duplicated
# --------------------------------------------------------------------------
def test_file_or_update_patches_existing_issue(drift_module, monkeypatch):
"""When an open `[ci-drift]` issue exists, file_or_update PATCHes it
and does NOT POST a duplicate."""
title = drift_module.title_for("main")
issue = {"number": 7, "title": title}
stub = _make_stub_api({
("GET", "/repos/owner/repo/issues"): (200, [issue]),
("PATCH", "/repos/owner/repo/issues/7"): (200, {"number": 7}),
})
monkeypatch.setattr(drift_module, "api", stub)
drift_module.file_or_update(
"main",
["F2 — ci / removed-job (pull_request) has no emitter"],
{"branch": "main"},
)
methods = [c[0] for c in stub.calls]
assert "PATCH" in methods, stub.calls
assert "POST" not in methods, (
f"expected NO POST when issue exists (idempotent path), got: {stub.calls}"
)
def test_file_or_update_posts_new_issue_when_none_exists(drift_module, monkeypatch):
"""When no open `[ci-drift]` issue exists, file_or_update POSTs one."""
stub = _make_stub_api({
("GET", "/repos/owner/repo/issues"): (200, []),
("POST", "/repos/owner/repo/issues"): (201, {"number": 99}),
("GET", "/repos/owner/repo/labels"): (200, [{"id": 10, "name": "tier:high"}]),
("POST", "/repos/owner/repo/issues/99/labels"): (200, []),
})
monkeypatch.setattr(drift_module, "api", stub)
drift_module.file_or_update(
"main",
["F2 — ci / removed-job (pull_request) has no emitter"],
{"branch": "main"},
)
methods_paths = [(c[0], c[1]) for c in stub.calls]
assert ("POST", "/repos/owner/repo/issues") in methods_paths, stub.calls
# Label apply is best-effort but should be attempted on the happy path:
assert ("POST", "/repos/owner/repo/issues/99/labels") in methods_paths, stub.calls
# --------------------------------------------------------------------------
# --dry-run flag
# --------------------------------------------------------------------------
def test_dry_run_skips_all_api_writes(drift_module, monkeypatch, capsys):
"""--dry-run: detector still runs, but no GET/POST/PATCH issue calls."""
stub = _make_stub_api({}) # any api call would assert
monkeypatch.setattr(drift_module, "api", stub)
drift_module.file_or_update(
"main",
["F2 — ci / removed-job (pull_request) has no emitter"],
{"branch": "main"},
dry_run=True,
)
assert stub.calls == [], f"dry-run must not call api(), got: {stub.calls}"
captured = capsys.readouterr()
assert "[dry-run]" in captured.out
assert "[ci-drift]" in captured.out # title rendered to stdout
def test_dry_run_flag_parsed(drift_module):
"""--dry-run is wired into argparse."""
ns = drift_module._parse_args(["--dry-run"])
assert ns.dry_run is True
ns = drift_module._parse_args([])
assert ns.dry_run is False
# --------------------------------------------------------------------------
# api() helper: raises on non-2xx + on JSON-decode failure when expected
# --------------------------------------------------------------------------
def test_api_raises_on_non_2xx(drift_module, monkeypatch):
"""api() must raise ApiError on HTTP 500 — the duplicate-issue
regression class from PR #112's review depends on this."""
class FakeHTTPError(Exception):
def __init__(self):
self.code = 500
def read(self):
return b"internal server error"
def fake_urlopen(req, timeout=30):
import urllib.error
raise urllib.error.HTTPError(
req.full_url, 500, "Internal Server Error", {}, None # type: ignore
)
monkeypatch.setattr(drift_module.urllib.request, "urlopen", fake_urlopen)
with pytest.raises(drift_module.ApiError) as excinfo:
drift_module.api("GET", "/repos/owner/repo/issues")
assert "HTTP 500" in str(excinfo.value)
def test_api_raises_on_json_decode_when_expected(drift_module, monkeypatch):
"""api(expect_json=True) raises ApiError if body is not valid JSON.
This closes the prior `{"_raw": ...}` fallthrough that callers
could misinterpret as "JSON response with one key called _raw".
"""
class FakeResp:
status = 200
def read(self):
return b"not-json\n\n"
def __enter__(self):
return self
def __exit__(self, *a):
return False
def fake_urlopen(req, timeout=30):
return FakeResp()
monkeypatch.setattr(drift_module.urllib.request, "urlopen", fake_urlopen)
with pytest.raises(drift_module.ApiError):
drift_module.api("GET", "/repos/owner/repo/issues")
def test_api_allows_raw_when_expect_json_false(drift_module, monkeypatch):
"""api(expect_json=False) returns the `_raw` fallthrough for endpoints
with known echo-quirks (Gitea create responses). Reserved opt-in."""
class FakeResp:
status = 201
def read(self):
return b"not-json-but-create-succeeded\n"
def __enter__(self):
return self
def __exit__(self, *a):
return False
def fake_urlopen(req, timeout=30):
return FakeResp()
monkeypatch.setattr(drift_module.urllib.request, "urlopen", fake_urlopen)
status, body = drift_module.api(
"POST", "/repos/owner/repo/issues", expect_json=False
)
assert status == 201
assert "_raw" in body