style(tests): fix ruff F401, F541, F841, E741 in 10 files #1821

Merged
agent-dev-a merged 7 commits from fix/ruff-cleanup-test-scripts-22-issues into main 2026-05-26 00:20:27 +00:00
21 changed files with 39 additions and 50 deletions
+18 -11
View File
@@ -274,7 +274,8 @@ def required_checks_env(audit_doc: dict) -> set[str]:
found.append(v)
if not found:
sys.stderr.write(
f"::error::REQUIRED_CHECKS env not found in any step of {AUDIT_WORKFLOW_PATH}\n"
f"::error::REQUIRED_CHECKS env not found in any step of "
f"{AUDIT_WORKFLOW_PATH}\n"
)
sys.exit(3)
if len(found) > 1:
@@ -387,7 +388,8 @@ def detect_drift(branch: str) -> tuple[list[str], dict]:
missing_from_needs = sorted(jobs - needs)
if missing_from_needs:
findings.append(
"F1 — jobs in ci.yml NOT under sentinel `needs:` (sentinel doesn't gate them):\n"
"F1 — jobs in ci.yml NOT under sentinel `needs:` "
"(sentinel doesn't gate them):\n"
+ "\n".join(f" - {n}" for n in missing_from_needs)
)
@@ -397,7 +399,8 @@ def detect_drift(branch: str) -> tuple[list[str], dict]:
stale_needs = sorted(needs - jobs_all)
if stale_needs:
findings.append(
"F1b — sentinel `needs:` lists jobs NOT present in ci.yml (typo or removed job):\n"
"F1b — sentinel `needs:` lists jobs NOT present in ci.yml "
"(typo or removed job):\n"
+ "\n".join(f" - {n}" for n in stale_needs)
)
@@ -405,7 +408,9 @@ def detect_drift(branch: str) -> tuple[list[str], dict]:
# Compute the contexts the CI YAML actually produces. The sentinel
# is in (B) intentionally (`ci / all-required (pull_request)`); we
# whitelist it explicitly.
emitted_contexts = {expected_context(j) for j in jobs} | {expected_context(SENTINEL_JOB)}
emitted_contexts = {
expected_context(j) for j in jobs
} | {expected_context(SENTINEL_JOB)}
# Contexts NOT produced by ci.yml may still come from other
# workflows in the repo (Secret scan etc). We can't enumerate
# every workflow's emissions cheaply; instead, flag only contexts
@@ -418,8 +423,9 @@ def detect_drift(branch: str) -> tuple[list[str], dict]:
)
if stale_protection:
findings.append(
"F2 — protection `status_check_contexts` entries with `ci / ` prefix that NO "
"job in ci.yml emits (stale name → silent advisory gate):\n"
"F2 — protection `status_check_contexts` entries with `ci / ` "
"prefix that NO job in ci.yml emits "
"(stale name → silent advisory gate):\n"
+ "\n".join(f" - {c}" for c in stale_protection)
)
@@ -494,7 +500,8 @@ def render_body(branch: str, findings: list[str], debug: dict) -> str:
f"# Drift detected on `{REPO}/{branch}`",
"",
"Auto-filed by `.gitea/workflows/ci-required-drift.yml` "
"(RFC [internal#219](https://git.moleculesai.app/molecule-ai/internal/issues/219) §4 + §6).",
"(RFC [internal#219]"
"(https://git.moleculesai.app/molecule-ai/internal/issues/219) §4 + §6).",
"",
"## Findings",
"",
@@ -547,12 +554,12 @@ def file_or_update(
if dry_run:
print(f"::notice::[dry-run] would file/update drift issue for {branch}")
print(f"::group::[dry-run] title")
print("::group::[dry-run] title")
print(title)
print(f"::endgroup::")
print(f"::group::[dry-run] body")
print("::endgroup::")
print("::group::[dry-run] body")
print(body)
print(f"::endgroup::")
print("::endgroup::")
return
existing = find_open_issue(title)
+4 -1
View File
@@ -153,7 +153,10 @@ def parse_args(argv: list[str]) -> argparse.Namespace:
parser.add_argument("--event-name", default=os.environ.get("GITHUB_EVENT_NAME", ""))
parser.add_argument("--pr-base-sha", default="")
parser.add_argument("--base-ref", default="")
parser.add_argument("--push-before", default=os.environ.get("GITHUB_EVENT_BEFORE", ""))
parser.add_argument(
"--push-before",
default=os.environ.get("GITHUB_EVENT_BEFORE", ""),
)
return parser.parse_args(argv)
+3 -1
View File
@@ -183,7 +183,9 @@ def required_contexts_green(
status = latest_statuses.get(context)
state = status_state(status or {})
if state != "success":
if pr_labels and _is_tier_low_pending_ok(latest_statuses, context, pr_labels):
if pr_labels and _is_tier_low_pending_ok(
latest_statuses, context, pr_labels
):
continue # tier:low soft-fail: accept pending sop-checklist
missing_or_bad.append(f"{context}={state or 'missing'}")
return not missing_or_bad, missing_or_bad
@@ -13,7 +13,6 @@ from __future__ import annotations
import argparse
import glob
import re
import sys
from pathlib import Path
from typing import NamedTuple
+1 -1
View File
@@ -283,7 +283,7 @@ def _ensure_labels(repo: str, names: list[str]) -> list[int]:
if status != "ok" or not isinstance(labels, list):
return []
out: list[int] = []
by_name = {l["name"]: l["id"] for l in labels if isinstance(l, dict)}
by_name = {label["name"]: label["id"] for label in labels if isinstance(label, dict)}
for n in names:
if n in by_name:
out.append(by_name[n])
@@ -82,7 +82,7 @@ import sys
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timedelta, timezone
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
+2 -3
View File
@@ -338,7 +338,6 @@ def compute_ack_state(
# Filter out self-acks and unknown slugs.
ackers_per_slug: dict[str, list[str]] = {s: [] for s in items_by_slug}
rejected_self: dict[str, list[str]] = {s: [] for s in items_by_slug}
rejected_unknown: dict[str, list[str]] = {s: [] for s in items_by_slug}
pending_team_check: dict[str, list[str]] = {s: [] for s in items_by_slug}
for (user, slug), kind in latest_directive.items():
@@ -842,7 +841,7 @@ def render_status(
def get_tier_mode(pr: dict[str, Any], cfg: dict[str, Any]) -> str:
"""Read tier label, return 'hard' or 'soft' per cfg.tier_failure_mode."""
labels = pr.get("labels") or []
tier_labels = [l.get("name", "") for l in labels if (l.get("name", "") or "").startswith("tier:")]
tier_labels = [label.get("name", "") for label in labels if (label.get("name", "") or "").startswith("tier:")]
mode_map = cfg.get("tier_failure_mode") or {}
default_mode = cfg.get("default_mode", "hard")
for tl in tier_labels:
@@ -865,7 +864,7 @@ def is_high_risk(pr: dict[str, Any], cfg: dict[str, Any]) -> bool:
Governance fix for internal#442 — closes the inconsistency between
sop-tier-check (tier-aware) and sop-checklist (was tier-blind).
"""
label_set = {(l.get("name") or "") for l in (pr.get("labels") or [])}
label_set = {(label.get("name") or "") for label in (pr.get("labels") or [])}
if "tier:high" in label_set:
return True
high_risk_labels = set(cfg.get("high_risk_labels") or [])
@@ -81,7 +81,7 @@ class Handler(http.server.BaseHTTPRequestHandler):
# GET /repos/{owner}/{name}/pulls/{pr_number}
m = re.match(r"^/api/v1/repos/([^/]+)/([^/]+)/pulls/(\d+)$", path)
if m:
owner, name, pr_num = m.group(1), m.group(2), m.group(3)
pr_num = m.group(3)
if sc == "T2_pr_closed":
return self._json(200, {
"number": int(pr_num),
@@ -151,7 +151,7 @@ class Handler(http.server.BaseHTTPRequestHandler):
# GET /teams/{team_id}/members/{username}
m = re.match(r"^/api/v1/teams/(\d+)/members/([^/]+)$", path)
if m:
team_id, login = m.group(1), m.group(2)
login = m.group(2)
if sc == "T8_team_not_member":
return self._empty(404)
if sc == "T9_team_403":
@@ -15,7 +15,6 @@ Mirrors the pattern in scripts/ops/test_check_migration_collisions.py
from __future__ import annotations
import importlib.util
import os
import sys
import unittest
from pathlib import Path
@@ -22,7 +22,6 @@ from __future__ import annotations
import os
import sys
import tempfile
import unittest
# Resolve sibling script regardless of where pytest is invoked from.
@@ -70,7 +70,7 @@ def test_diag_memory_root_writable_in_canary_mode(sim: CPSim) -> None:
key = f"canary-probe-{uuid.uuid4().hex[:8]}"
try:
val = sim.probe_memory(key)
except Exception as e:
except Exception:
# /mcp may not be exposed on this template — canary 4 will
# surface the real defect if memory is actually broken.
if os.environ.get("CANARY_STRICT_MCP") == "1":
+2 -2
View File
@@ -281,8 +281,8 @@ def main() -> int:
for prefix, peers in sorted(open_pr_collisions.items()):
peer_str = ", ".join(f"#{p['number']} ({p['headRefName']})" for p in peers)
print(f"::error::migration prefix {prefix:03d} also claimed by open PR(s): {peer_str}")
print(f"::error::rebase coordination needed — only one PR can land a given prefix; "
f"renumber yours or theirs")
print("::error::rebase coordination needed — only one PR can land a given prefix; "
"renumber yours or theirs")
return 1
-2
View File
@@ -18,9 +18,7 @@ No network. No live Gitea calls.
from __future__ import annotations
import importlib.util
import json
import os
import sys
import textwrap
from pathlib import Path
from unittest import mock
+1 -3
View File
@@ -55,9 +55,7 @@ from __future__ import annotations
import importlib.util
import os
import sys
from pathlib import Path
from unittest import mock
import pytest
@@ -164,7 +162,7 @@ def test_bp_orphan_context_fails(envset, monkeypatch, capsys):
" all-required:\n runs-on: x\n steps:\n - run: echo hi\n",
)
m = _import_lint()
posted = _stub_api(
_stub_api(
monkeypatch,
m,
("ok", {"status_check_contexts": [
@@ -60,10 +60,8 @@ from __future__ import annotations
import importlib.util
import os
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from unittest import mock
import pytest
-3
View File
@@ -53,10 +53,7 @@ from __future__ import annotations
import importlib.util
import os
import subprocess
import sys
import textwrap
from pathlib import Path
from unittest import mock
import pytest
@@ -61,9 +61,7 @@ from __future__ import annotations
import importlib.util
import os
import subprocess
import sys
from pathlib import Path
from unittest import mock
import pytest
-2
View File
@@ -38,9 +38,7 @@ from __future__ import annotations
import importlib.util
import os
import sys
from pathlib import Path
from unittest import mock
import pytest
+1 -3
View File
@@ -37,7 +37,6 @@ from __future__ import annotations
import importlib.util
import json
import os
import sys
import urllib.error
from pathlib import Path
from unittest import mock
@@ -542,7 +541,6 @@ def test_auto_close_skips_when_main_pending(wd_module, monkeypatch):
"""main pending (CI still running) at NEW_SHA → leave old issue alone.
Pending could resolve to red, so closing prematurely would lose the
breadcrumb of the prior red."""
old_title = f"[main-red] owner/repo: {SHA_RED[:10]}"
stub = _make_stub_api({
("GET", "/repos/owner/repo/branches/main"): (200, _branches_response(SHA_GREEN)),
("GET", f"/repos/owner/repo/commits/{SHA_GREEN}/status"): (
@@ -790,7 +788,7 @@ def test_emit_loki_event_prints_json_line(wd_module, capsys, monkeypatch):
captured = capsys.readouterr()
assert "main-red-watchdog event:" in captured.out
# Find the JSON payload after the prefix and verify it parses
line = [l for l in captured.out.splitlines() if "main-red-watchdog event:" in l][0]
line = [ln for ln in captured.out.splitlines() if "main-red-watchdog event:" in ln][0]
payload = json.loads(line.split("main-red-watchdog event:", 1)[1].strip())
assert payload["event_type"] == "main_red_detected"
assert payload["repo"] == "owner/repo"
-2
View File
@@ -40,7 +40,6 @@ Dependencies: stdlib + pytest + PyYAML. No network.
from __future__ import annotations
import importlib.util
import json
import os
import sys
from pathlib import Path
@@ -853,7 +852,6 @@ def test_reap_skips_combined_success_shas(sr_module, monkeypatch):
Mock 2 SHAs with combined=success + 1 with combined=failure → only
the failure-SHA's statuses get the per-context loop applied.
"""
per_context_iterated_for: list[str] = []
posts: list[tuple[str, dict]] = []
failure_statuses = [
+3 -5
View File
@@ -23,11 +23,9 @@ import json
import os
import re
import sys
import time
import urllib.request
import urllib.error
from datetime import datetime, timezone
from typing import Any, Optional
# ── Gitea API client ────────────────────────────────────────────────────────
@@ -160,9 +158,9 @@ def signal_1_comment_scan(pr_number: int, repo: str) -> dict:
# Build reverse map: login -> (group, agent_key)
login_to_group = {}
for group, login in relevant_roles.items():
for role, l in AGENT_LOGIN_MAP.items():
if l == login:
login_to_group[l] = (group, f"core-{role}")
for role, role_login in AGENT_LOGIN_MAP.items():
if role_login == login:
login_to_group[role_login] = (group, f"core-{role}")
# Collect all agent-tag matches from comments
comments = []