fix(ci): gate-check-v3 — 3 bug fixes

Bug 1 (self-referential failure loop, #544):
  signal_6_ci now filters out its own prior status from
  check_statuses before evaluating, preventing a
  gate-check-v3 → failure → re-reads self → failure cycle.

Bug 2 (hardcoded base branch, #544):
  signal_6_ci now uses the PR's actual base branch ref
  instead of hardcoded 'main'. Caller passes PR data to
  avoid redundant API call.

Bug 3 (comment-post 403, #543):
  Wrapped POST/PATCH comment-post in try/except for
  HTTPError 403. Logs a warning and skips posting when
  the token lacks write:repository scope — verdict still
  drives exit code correctly.

Also removed 3 lines of dead code at the end of
format_comment (unreachable return after prior return).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Molecule AI · infra-sre 2026-05-11 19:08:04 +00:00 committed by Molecule AI Infra-Runtime-BE
parent d5114fdbef
commit f5f27cb870

View File

@ -316,7 +316,7 @@ def signal_3_staleness(pr_number: int, repo: str) -> dict:
# ── Signal 6: CI required-checks awareness ───────────────────────────────────
def signal_6_ci(pr_number: int, repo: str, branch: str = "main") -> dict:
def signal_6_ci(pr_number: int, repo: str, branch: str | None = None, pr_data: dict | None = None) -> dict:
"""
Query combined CI status for PR head commit.
Find required status checks on target branch.
@ -324,8 +324,12 @@ def signal_6_ci(pr_number: int, repo: str, branch: str = "main") -> dict:
"""
owner, name = repo.split("/", 1)
pr = api_get(f"/repos/{owner}/{name}/pulls/{pr_number}")
head_sha = pr["head"]["sha"]
# Re-use PR data if already fetched by caller; otherwise fetch once.
if pr_data is None:
pr_data = api_get(f"/repos/{owner}/{name}/pulls/{pr_number}")
head_sha = pr_data["head"]["sha"]
# Fall back to PR's actual base branch when no explicit branch is given
branch = branch or pr_data.get("base", {}).get("ref", "main")
# Combined status of PR head
combined = api_get(f"/repos/{owner}/{name}/commits/{head_sha}/status")
@ -334,9 +338,12 @@ def signal_6_ci(pr_number: int, repo: str, branch: str = "main") -> dict:
# Individual check statuses
# Gitea Actions uses "status" (pending/success/failure) not "state" for
# individual check entries. "state" is null for pending runs.
# Exclude our own prior status to prevent self-referential failure loops.
check_statuses = {}
for s in combined.get("statuses") or []:
check_statuses[s["context"]] = s.get("status", "pending")
ctx = s["context"]
if "gate-check" not in ctx.lower():
check_statuses[ctx] = s.get("status", "pending")
# Try to get branch protection for required checks
required_checks = []
@ -459,21 +466,21 @@ def format_comment(repo: str, pr_number: int, verdict: str, gates: list[dict], b
lines.append(f"_gate-check-v3 · repo={repo} · pr={pr_number}_")
return "\n".join(lines)
lines.append("")
lines.append(f"_gate-check-v3 · repo={repo} · pr={pr_number}_")
return "\n".join(lines)
# ── Main ─────────────────────────────────────────────────────────────────────
def run(repo: str, pr_number: int, post_comment: bool = False) -> dict:
try:
# Fetch PR once to get base ref for signal_6_ci
owner, name = repo.split("/", 1)
pr = api_get(f"/repos/{owner}/{name}/pulls/{pr_number}")
base_ref = pr.get("base", {}).get("ref", "main")
gates = [
signal_1_comment_scan(pr_number, repo),
signal_2_reviews(pr_number, repo),
signal_3_staleness(pr_number, repo),
signal_6_ci(pr_number, repo),
signal_6_ci(pr_number, repo, branch=base_ref, pr_data=pr),
]
verdict, blockers = compute_verdict(gates)
@ -501,18 +508,24 @@ def run(repo: str, pr_number: int, post_comment: bool = False) -> dict:
# Check if a gate-check comment already exists to avoid spamming
existing = api_list(f"/repos/{owner}/{name}/issues/{pr_number}/comments")
our_comments = [c for c in existing if "[gate-check-v3]" in (c.get("body") or "")]
if our_comments:
# Update latest
comment_id = our_comments[-1]["id"]
url = f"{API_BASE}/repos/{owner}/{name}/issues/comments/{comment_id}"
req = urllib.request.Request(url, data=json.dumps({"body": comment_body}).encode(), headers=headers, method="PATCH")
with urllib.request.urlopen(req) as r:
r.read()
else:
url = f"{API_BASE}/repos/{owner}/{name}/issues/{pr_number}/comments"
req = urllib.request.Request(url, data=json.dumps({"body": comment_body}).encode(), headers=headers, method="POST")
with urllib.request.urlopen(req) as r:
r.read()
try:
if our_comments:
# Update latest
comment_id = our_comments[-1]["id"]
url = f"{API_BASE}/repos/{owner}/{name}/issues/comments/{comment_id}"
req = urllib.request.Request(url, data=json.dumps({"body": comment_body}).encode(), headers=headers, method="PATCH")
with urllib.request.urlopen(req) as r:
r.read()
else:
url = f"{API_BASE}/repos/{owner}/{name}/issues/{pr_number}/comments"
req = urllib.request.Request(url, data=json.dumps({"body": comment_body}).encode(), headers=headers, method="POST")
with urllib.request.urlopen(req) as r:
r.read()
except urllib.error.HTTPError as e:
if e.code == 403:
print(f"WARN: --post-comment 403 (token scope) — verdict={verdict}; skipping comment-post", file=sys.stderr)
else:
raise
return result