molecule-core/tests/test_ci_required_drift.py
claude-ceo-assistant a8b2cf948d
feat(internal#219 §4+§6): port ci-required-drift + audit-force-merge sidecar from CP
Phase 2b+c port of molecule-controlplane PR#112 (SHA 0adf2098) to
molecule-core, per RFC internal#219 §4 (jobs ↔ protection drift) + §6
(audit env ↔ protection drift).

## What this adds

1. .gitea/workflows/ci-required-drift.yml — hourly cron (':17') +
   workflow_dispatch. AST-walks ci.yml, branch_protections, and
   audit-force-merge.yml's REQUIRED_CHECKS env. Files or updates a
   [ci-drift] issue (idempotent by title) when any pair diverges.

2. .gitea/scripts/ci-required-drift.py — verbatim from CP. PyYAML-based
   AST detector (NOT grep-by-name), per feedback_behavior_based_ast_gates.
   Five drift classes: F1, F1b, F2, F3a, F3b.

3. .gitea/workflows/audit-force-merge.yml — reconcile with CP's
   structure. Moves permissions: to workflow level, adds base.sha-
   pinning rationale, links to drift-detect, and updates REQUIRED_CHECKS
   to current branch_protections/main verbatim (2 contexts).

4. tests/test_ci_required_drift.py — 17 pytest cases, verbatim from CP.
   Stdlib + PyYAML only. Covers F1/F1b/F2/F3a/F3b, happy path, the
   idempotent-PATCH path, the MUST-FIX find_open_issue() raise-on-
   transient regression, the --dry-run flag, and api() error contracts.
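The two behaviors above (the AST-based F1 comparison and the idempotent file-or-update) can be sketched as follows. These are simplified stand-ins with illustrative names; the real script's signatures and finding strings may differ.

```python
def f1_findings(ci_doc: dict, sentinel_job: str = "all-required") -> list[str]:
    """F1 sketch: jobs defined in ci.yml but absent from sentinel.needs.

    `ci_doc` is the mapping produced by yaml.safe_load() on ci.yml.
    Comparing parsed job names (not grepping the file text) is what
    makes this an AST gate rather than a string match.
    """
    jobs = ci_doc["jobs"]
    needs = set(jobs[sentinel_job].get("needs") or [])
    missing = sorted(set(jobs) - {sentinel_job} - needs)
    return [f"F1 — job '{j}' not in {sentinel_job}.needs" for j in missing]


def file_or_update(title: str, body: str, find_issue, post, patch):
    """Idempotent-by-title sketch: PATCH the open issue, else POST one.

    find_issue must RAISE on transient HTTP errors -- returning None
    there would fall through to post() and file a duplicate (the
    PR #112 regression class).
    """
    issue = find_issue(title)
    if issue is not None:
        patch(issue["number"], body)   # refresh the existing issue in place
        return issue["number"]
    return post(title, body)           # no open issue yet: file one
```

The title acts as the idempotency key, which is why `find_open_issue()` failing loudly (rather than returning None) is load-bearing for the no-duplicates guarantee.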

## Adaptations from CP#112

- secrets.GITEA_TOKEN → secrets.SOP_TIER_CHECK_TOKEN (molecule-core's
  established read-only token name, used by sop-tier-check and
  audit-force-merge already).
- DRIFT_LABEL tier:high resolves to label id 9 on core (verified
  2026-05-11) vs id 10 on CP.
- REQUIRED_CHECKS env initialized to molecule-core's actual main
  protection set (2 contexts: Secret scan + sop-tier-check), not CP's
  (3 contexts incl. packer-ascii-gate + all-required).
- Comment block flags that the 'all-required' sentinel does NOT yet
  exist in molecule-core's ci.yml (RFC §4 Phase 4 adds it). Until
  then, the detector exits 3 with ::error:: 'sentinel job not found'.
  Verified locally: the workflow will be red on the cron until Phase 4
  lands — that's intentional + louder than a silent issue.
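That sentinel guard amounts to something like the following (hypothetical function, sketched from the commit message; the ::error:: text matches the local dry-run output):

```python
def check_sentinel(jobs: dict, sentinel: str = "all-required") -> int:
    """Return 0 if the sentinel job exists in ci.yml's jobs mapping, else 3.

    Until RFC §4 Phase 4 adds the job, this keeps the cron workflow red --
    deliberately louder than silently filing an issue.
    """
    if sentinel not in jobs:
        print(f"::error::sentinel job '{sentinel}' not found")
        return 3
    return 0
```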

## Verification

- 17/17 pytest cases green locally (Python 3.13, PyYAML 6.0.3).
- Hostile self-review: removing the script makes all 17 tests ERROR
  with FileNotFoundError, confirming they exercise the actual
  implementation (not happy-path shape-matching).
- python3 -m py_compile + bash -n + yaml.safe_load all pass.
- Initial dry-run against real molecule-core ci.yml: exits 3 with
  ::error::sentinel job 'all-required' not found — expected, Phase 4
  will add it.

## What does NOT change

- audit-force-merge.sh is byte-identical to CP's — no change needed.
- No branch protection mutation (that's Phase 4, separate PR).
- No CI workflow restructuring (PR#372 already did that).

RFC: molecule-ai/internal#219
Source: molecule-controlplane@0adf2098 (PR #112)
2026-05-11 00:35:25 -07:00


"""Tests for `.gitea/scripts/ci-required-drift.py` — RFC internal#219 §4 + §6.
Covers the five drift-finding classes (F1, F1b, F2, F3a, F3b), the happy
path (no drift, no API mutation), and the idempotent path (existing
`[ci-drift]` issue is PATCHed in place, NOT duplicated).
Per the Five-Axis review on PR #112, the test suite must FAIL on the
pre-fix code where `find_open_issue()` returned `None` on transient
HTTP errors (causing the caller to POST a duplicate issue). We exercise
that path explicitly with `test_find_open_issue_raises_on_transient_error`.
Run:
python3 -m pytest tests/test_ci_required_drift.py -v
Dependencies: stdlib + PyYAML (already required by the script itself).
No network. No live Gitea calls.
"""
from __future__ import annotations

import importlib.util
import json
import os
import sys
import textwrap
from pathlib import Path
from unittest import mock

import pytest
# --------------------------------------------------------------------------
# Module-import fixture
# --------------------------------------------------------------------------
# The script reads env vars at import-time (cheap globals, no IO). Tests
# set the env vars BEFORE importing so the module loads under a known
# config, then individual tests monkeypatch the `api()` callable and
# YAML file paths via tmp_path.
SCRIPT_PATH = (
    Path(__file__).resolve().parent.parent
    / ".gitea"
    / "scripts"
    / "ci-required-drift.py"
)


@pytest.fixture(scope="module")
def drift_module():
    """Import the script as a module. Env vars are pre-set so the
    module-level reads pass; tests then patch individual globals as
    needed."""
    env = {
        "GITEA_TOKEN": "test-token",
        "GITEA_HOST": "git.example.test",
        "REPO": "owner/repo",
        "BRANCHES": "main staging",
        "SENTINEL_JOB": "all-required",
        "AUDIT_WORKFLOW_PATH": ".gitea/workflows/audit-force-merge.yml",
        "CI_WORKFLOW_PATH": ".gitea/workflows/ci.yml",
        "DRIFT_LABEL": "tier:high",
    }
    with mock.patch.dict(os.environ, env, clear=False):
        spec = importlib.util.spec_from_file_location(
            "ci_required_drift", SCRIPT_PATH
        )
        m = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(m)
        # Force-set the globals from env (they were captured at import
        # time before our mock.patch.dict took effect on subsequent
        # runs in the same pytest session).
        m.GITEA_TOKEN = env["GITEA_TOKEN"]
        m.GITEA_HOST = env["GITEA_HOST"]
        m.REPO = env["REPO"]
        m.BRANCHES = env["BRANCHES"].split()
        m.SENTINEL_JOB = env["SENTINEL_JOB"]
        m.AUDIT_WORKFLOW_PATH = env["AUDIT_WORKFLOW_PATH"]
        m.CI_WORKFLOW_PATH = env["CI_WORKFLOW_PATH"]
        m.DRIFT_LABEL = env["DRIFT_LABEL"]
        m.OWNER, m.NAME = "owner", "repo"
        m.API = f"https://{env['GITEA_HOST']}/api/v1"
        yield m
# --------------------------------------------------------------------------
# Fixture YAML — minimal but realistic ci.yml + audit-force-merge.yml
# --------------------------------------------------------------------------
def _write_ci_yaml(tmp_path: Path, *, jobs: dict, sentinel_needs: list[str]) -> Path:
    """Write a synthetic ci.yml with the given jobs + sentinel needs."""
    import yaml  # deferred: PyYAML is a script dependency, not stdlib

    full_jobs = dict(jobs)
    full_jobs["all-required"] = {"runs-on": "ubuntu-latest", "needs": sentinel_needs}
    doc = {"name": "ci", "on": {"pull_request": {}}, "jobs": full_jobs}
    p = tmp_path / "ci.yml"
    p.write_text(yaml.safe_dump(doc), encoding="utf-8")
    return p


def _write_audit_yaml(tmp_path: Path, required_checks: list[str]) -> Path:
    """Write a synthetic audit-force-merge.yml with REQUIRED_CHECKS env."""
    block = "\n".join(required_checks)
    text = textwrap.dedent(
        f"""\
        name: audit-force-merge
        on:
          schedule:
            - cron: '*/30 * * * *'
        jobs:
          audit:
            runs-on: ubuntu-latest
            steps:
              - name: Run audit
                env:
                  REQUIRED_CHECKS: |
                    {block.replace(chr(10), chr(10) + ' ' * 20)}
                run: bash .gitea/scripts/audit-force-merge.sh
        """
    )
    p = tmp_path / "audit-force-merge.yml"
    p.write_text(text, encoding="utf-8")
    return p


def _make_stub_api(responses: dict):
    """Build a fake `api()` callable.

    `responses` maps (method, path) tuples to either:
      - (status_int, body) → returned as-is
      - Exception instance → raised

    Calls are recorded in `.calls` for later assertion.
    """

    class StubApi:
        def __init__(self):
            self.calls: list[tuple] = []

        def __call__(self, method, path, *, body=None, query=None, expect_json=True):
            self.calls.append((method, path, body, query))
            key = (method, path)
            if key not in responses:
                raise AssertionError(
                    f"unexpected api call: {method} {path} (no stub registered)"
                )
            r = responses[key]
            if isinstance(r, Exception):
                raise r
            return r

    return StubApi()
# --------------------------------------------------------------------------
# Drift-class tests — pure detect_drift() coverage
# --------------------------------------------------------------------------
def _patch_paths(drift_module, monkeypatch, ci_yml: Path, audit_yml: Path):
    monkeypatch.setattr(drift_module, "CI_WORKFLOW_PATH", str(ci_yml))
    monkeypatch.setattr(drift_module, "AUDIT_WORKFLOW_PATH", str(audit_yml))


def test_f1_job_missing_from_sentinel_needs(drift_module, tmp_path, monkeypatch):
    """F1: a job exists in ci.yml but is NOT under sentinel.needs."""
    ci = _write_ci_yaml(
        tmp_path,
        jobs={
            "build": {"runs-on": "ubuntu-latest"},
            "test": {"runs-on": "ubuntu-latest"},  # missing from needs
        },
        sentinel_needs=["build"],
    )
    audit = _write_audit_yaml(tmp_path, ["ci / build (pull_request)"])
    _patch_paths(drift_module, monkeypatch, ci, audit)
    stub = _make_stub_api({
        ("GET", "/repos/owner/repo/branch_protections/main"): (
            200,
            {"status_check_contexts": ["ci / build (pull_request)"]},
        ),
    })
    monkeypatch.setattr(drift_module, "api", stub)
    findings, _ = drift_module.detect_drift("main")
    assert any("F1 —" in f and "test" in f for f in findings), findings


def test_f1b_sentinel_needs_typo(drift_module, tmp_path, monkeypatch):
    """F1b: sentinel.needs lists a job not present in ci.yml (typo).

    Per the prior fix, F1b uses jobs_all (the unfiltered set) so that
    event-gated jobs aren't false-positive typos."""
    ci = _write_ci_yaml(
        tmp_path,
        jobs={"build": {"runs-on": "ubuntu-latest"}},
        sentinel_needs=["build", "bulid"],  # typo'd
    )
    audit = _write_audit_yaml(tmp_path, ["ci / build (pull_request)"])
    _patch_paths(drift_module, monkeypatch, ci, audit)
    stub = _make_stub_api({
        ("GET", "/repos/owner/repo/branch_protections/main"): (
            200,
            {"status_check_contexts": ["ci / build (pull_request)"]},
        ),
    })
    monkeypatch.setattr(drift_module, "api", stub)
    findings, _ = drift_module.detect_drift("main")
    assert any("F1b" in f and "bulid" in f for f in findings), findings


def test_f1b_event_gated_job_not_flagged_as_typo(drift_module, tmp_path, monkeypatch):
    """F1b regression guard: event-gated jobs (with `if: github.event_name`)
    are in jobs_all and must NOT trigger F1b when listed in sentinel.needs.
    They DO trigger F1 if missing — but that's a different finding."""
    ci = _write_ci_yaml(
        tmp_path,
        jobs={
            "build": {"runs-on": "ubuntu-latest"},
            "pr-only": {
                "runs-on": "ubuntu-latest",
                "if": "github.event_name == 'pull_request'",
            },
        },
        sentinel_needs=["build", "pr-only"],  # event-gated, but real
    )
    audit = _write_audit_yaml(
        tmp_path,
        ["ci / build (pull_request)", "ci / pr-only (pull_request)"],
    )
    _patch_paths(drift_module, monkeypatch, ci, audit)
    stub = _make_stub_api({
        ("GET", "/repos/owner/repo/branch_protections/main"): (
            200,
            {
                "status_check_contexts": [
                    "ci / build (pull_request)",
                    "ci / pr-only (pull_request)",
                ]
            },
        ),
    })
    monkeypatch.setattr(drift_module, "api", stub)
    findings, _ = drift_module.detect_drift("main")
    assert not any("F1b" in f for f in findings), findings


def test_f2_protection_has_no_emitter(drift_module, tmp_path, monkeypatch):
    """F2: a `ci / ` prefixed context in protection has no job in ci.yml."""
    ci = _write_ci_yaml(
        tmp_path,
        jobs={"build": {"runs-on": "ubuntu-latest"}},
        sentinel_needs=["build"],
    )
    audit = _write_audit_yaml(tmp_path, ["ci / build (pull_request)"])
    _patch_paths(drift_module, monkeypatch, ci, audit)
    stub = _make_stub_api({
        ("GET", "/repos/owner/repo/branch_protections/main"): (
            200,
            {
                "status_check_contexts": [
                    "ci / build (pull_request)",
                    "ci / removed-job (pull_request)",  # F2
                ]
            },
        ),
    })
    monkeypatch.setattr(drift_module, "api", stub)
    findings, _ = drift_module.detect_drift("main")
    assert any("F2" in f and "removed-job" in f for f in findings), findings


def test_f3a_env_wider_than_protection(drift_module, tmp_path, monkeypatch):
    """F3a: REQUIRED_CHECKS env has a context NOT in protection."""
    ci = _write_ci_yaml(
        tmp_path,
        jobs={"build": {"runs-on": "ubuntu-latest"}},
        sentinel_needs=["build"],
    )
    audit = _write_audit_yaml(
        tmp_path,
        [
            "ci / build (pull_request)",
            "ci / ghost (pull_request)",  # only in env
        ],
    )
    _patch_paths(drift_module, monkeypatch, ci, audit)
    stub = _make_stub_api({
        ("GET", "/repos/owner/repo/branch_protections/main"): (
            200,
            {"status_check_contexts": ["ci / build (pull_request)"]},
        ),
    })
    monkeypatch.setattr(drift_module, "api", stub)
    findings, _ = drift_module.detect_drift("main")
    assert any("F3a" in f and "ghost" in f for f in findings), findings


def test_f3b_protection_wider_than_env(drift_module, tmp_path, monkeypatch):
    """F3b: protection has a context NOT in REQUIRED_CHECKS env."""
    ci = _write_ci_yaml(
        tmp_path,
        jobs={
            "build": {"runs-on": "ubuntu-latest"},
            "test": {"runs-on": "ubuntu-latest"},
        },
        sentinel_needs=["build", "test"],
    )
    audit = _write_audit_yaml(tmp_path, ["ci / build (pull_request)"])
    _patch_paths(drift_module, monkeypatch, ci, audit)
    stub = _make_stub_api({
        ("GET", "/repos/owner/repo/branch_protections/main"): (
            200,
            {
                "status_check_contexts": [
                    "ci / build (pull_request)",
                    "ci / test (pull_request)",  # only in protection
                ]
            },
        ),
    })
    monkeypatch.setattr(drift_module, "api", stub)
    findings, _ = drift_module.detect_drift("main")
    assert any("F3b" in f and "ci / test (pull_request)" in f for f in findings), findings


def test_happy_path_no_drift(drift_module, tmp_path, monkeypatch):
    """Happy path: ci.yml ↔ protection ↔ audit env all in alignment."""
    ci = _write_ci_yaml(
        tmp_path,
        jobs={
            "build": {"runs-on": "ubuntu-latest"},
            "test": {"runs-on": "ubuntu-latest"},
        },
        sentinel_needs=["build", "test"],
    )
    audit = _write_audit_yaml(
        tmp_path,
        [
            "ci / build (pull_request)",
            "ci / test (pull_request)",
            "ci / all-required (pull_request)",
        ],
    )
    _patch_paths(drift_module, monkeypatch, ci, audit)
    stub = _make_stub_api({
        ("GET", "/repos/owner/repo/branch_protections/main"): (
            200,
            {
                "status_check_contexts": [
                    "ci / build (pull_request)",
                    "ci / test (pull_request)",
                    "ci / all-required (pull_request)",
                ]
            },
        ),
    })
    monkeypatch.setattr(drift_module, "api", stub)
    findings, _ = drift_module.detect_drift("main")
    assert findings == [], findings
# --------------------------------------------------------------------------
# MUST-FIX 1: find_open_issue must raise on transient HTTP errors
# --------------------------------------------------------------------------
def test_find_open_issue_returns_none_on_no_match(drift_module, monkeypatch):
    """Search succeeded, no match → return None (the OK path)."""
    stub = _make_stub_api({
        ("GET", "/repos/owner/repo/issues"): (200, []),
    })
    monkeypatch.setattr(drift_module, "api", stub)
    assert drift_module.find_open_issue("[ci-drift] foo") is None


def test_find_open_issue_returns_match(drift_module, monkeypatch):
    """Search succeeded, matching issue exists → return it."""
    issue = {"number": 42, "title": "[ci-drift] foo"}
    stub = _make_stub_api({
        ("GET", "/repos/owner/repo/issues"): (200, [issue]),
    })
    monkeypatch.setattr(drift_module, "api", stub)
    assert drift_module.find_open_issue("[ci-drift] foo") == issue


def test_find_open_issue_raises_on_transient_error(drift_module, monkeypatch):
    """Search FAILED (HTTP 500) → raise ApiError, do NOT return None.

    This is the regression class from PR #112's Five-Axis review:
    returning None caused file_or_update() to take the else branch and
    POST a duplicate issue. The fix is for api() to raise; tests pin
    that contract by exercising the failure path explicitly.
    """
    stub = _make_stub_api({
        ("GET", "/repos/owner/repo/issues"): drift_module.ApiError(
            "GET /repos/owner/repo/issues → HTTP 500: gateway timeout"
        ),
    })
    monkeypatch.setattr(drift_module, "api", stub)
    with pytest.raises(drift_module.ApiError):
        drift_module.find_open_issue("[ci-drift] foo")
# --------------------------------------------------------------------------
# Idempotent path: existing issue is PATCHed, NOT duplicated
# --------------------------------------------------------------------------
def test_file_or_update_patches_existing_issue(drift_module, monkeypatch):
    """When an open `[ci-drift]` issue exists, file_or_update PATCHes it
    and does NOT POST a duplicate."""
    title = drift_module.title_for("main")
    issue = {"number": 7, "title": title}
    stub = _make_stub_api({
        ("GET", "/repos/owner/repo/issues"): (200, [issue]),
        ("PATCH", "/repos/owner/repo/issues/7"): (200, {"number": 7}),
    })
    monkeypatch.setattr(drift_module, "api", stub)
    drift_module.file_or_update(
        "main",
        ["F2 — ci / removed-job (pull_request) has no emitter"],
        {"branch": "main"},
    )
    methods = [c[0] for c in stub.calls]
    assert "PATCH" in methods, stub.calls
    assert "POST" not in methods, (
        f"expected NO POST when issue exists (idempotent path), got: {stub.calls}"
    )


def test_file_or_update_posts_new_issue_when_none_exists(drift_module, monkeypatch):
    """When no open `[ci-drift]` issue exists, file_or_update POSTs one."""
    stub = _make_stub_api({
        ("GET", "/repos/owner/repo/issues"): (200, []),
        ("POST", "/repos/owner/repo/issues"): (201, {"number": 99}),
        ("GET", "/repos/owner/repo/labels"): (200, [{"id": 10, "name": "tier:high"}]),
        ("POST", "/repos/owner/repo/issues/99/labels"): (200, []),
    })
    monkeypatch.setattr(drift_module, "api", stub)
    drift_module.file_or_update(
        "main",
        ["F2 — ci / removed-job (pull_request) has no emitter"],
        {"branch": "main"},
    )
    methods_paths = [(c[0], c[1]) for c in stub.calls]
    assert ("POST", "/repos/owner/repo/issues") in methods_paths, stub.calls
    # Label apply is best-effort but should be attempted on the happy path:
    assert ("POST", "/repos/owner/repo/issues/99/labels") in methods_paths, stub.calls
# --------------------------------------------------------------------------
# --dry-run flag
# --------------------------------------------------------------------------
def test_dry_run_skips_all_api_writes(drift_module, monkeypatch, capsys):
    """--dry-run: detector still runs, but no GET/POST/PATCH issue calls."""
    stub = _make_stub_api({})  # any api call would assert
    monkeypatch.setattr(drift_module, "api", stub)
    drift_module.file_or_update(
        "main",
        ["F2 — ci / removed-job (pull_request) has no emitter"],
        {"branch": "main"},
        dry_run=True,
    )
    assert stub.calls == [], f"dry-run must not call api(), got: {stub.calls}"
    captured = capsys.readouterr()
    assert "[dry-run]" in captured.out
    assert "[ci-drift]" in captured.out  # title rendered to stdout


def test_dry_run_flag_parsed(drift_module):
    """--dry-run is wired into argparse."""
    ns = drift_module._parse_args(["--dry-run"])
    assert ns.dry_run is True
    ns = drift_module._parse_args([])
    assert ns.dry_run is False
# --------------------------------------------------------------------------
# api() helper: raises on non-2xx + on JSON-decode failure when expected
# --------------------------------------------------------------------------
def test_api_raises_on_non_2xx(drift_module, monkeypatch):
    """api() must raise ApiError on HTTP 500 — the duplicate-issue
    regression class from PR #112's review depends on this."""

    def fake_urlopen(req, timeout=30):
        import urllib.error

        raise urllib.error.HTTPError(
            req.full_url, 500, "Internal Server Error", {}, None  # type: ignore
        )

    monkeypatch.setattr(drift_module.urllib.request, "urlopen", fake_urlopen)
    with pytest.raises(drift_module.ApiError) as excinfo:
        drift_module.api("GET", "/repos/owner/repo/issues")
    assert "HTTP 500" in str(excinfo.value)


def test_api_raises_on_json_decode_when_expected(drift_module, monkeypatch):
    """api(expect_json=True) raises ApiError if body is not valid JSON.

    This closes the prior `{"_raw": ...}` fallthrough that callers
    could misinterpret as "JSON response with one key called _raw".
    """

    class FakeResp:
        status = 200

        def read(self):
            return b"not-json\n\n"

        def __enter__(self):
            return self

        def __exit__(self, *a):
            return False

    def fake_urlopen(req, timeout=30):
        return FakeResp()

    monkeypatch.setattr(drift_module.urllib.request, "urlopen", fake_urlopen)
    with pytest.raises(drift_module.ApiError):
        drift_module.api("GET", "/repos/owner/repo/issues")


def test_api_allows_raw_when_expect_json_false(drift_module, monkeypatch):
    """api(expect_json=False) returns the `_raw` fallthrough for endpoints
    with known echo-quirks (Gitea create responses). Reserved opt-in."""

    class FakeResp:
        status = 201

        def read(self):
            return b"not-json-but-create-succeeded\n"

        def __enter__(self):
            return self

        def __exit__(self, *a):
            return False

    def fake_urlopen(req, timeout=30):
        return FakeResp()

    monkeypatch.setattr(drift_module.urllib.request, "urlopen", fake_urlopen)
    status, body = drift_module.api(
        "POST", "/repos/owner/repo/issues", expect_json=False
    )
    assert status == 201
    assert "_raw" in body