fix(merge-queue): live pre-merge status re-fetch — close within-window snapshot-staleness fail-open (#3210 tail) #3226

Merged
devops-engineer merged 1 commits from fix/merge-gate-live-recheck-3210d into main 2026-06-24 09:38:14 +00:00
2 changed files with 424 additions and 13 deletions
+96 -4
View File
@@ -1636,15 +1636,26 @@ def _snapshot_status_for_sha(sha: str) -> dict | None:
return None
def get_combined_status(sha: str) -> dict:
def get_combined_status(sha: str, *, prefer_live: bool = False) -> dict:
"""Combined status + all individual statuses for `sha`.
Uses the conductor snapshot when available (same tick, same observed
state as the merge-queue pass), otherwise self-fetches via API.
`prefer_live=True` BYPASSES the snapshot entirely and always self-fetches
the LIVE state from the Gitea status API. The cheap enumeration/scan pass
(the decision) reads the snapshot for throughput, but the FINAL pre-merge
re-check (internal#3210 — process_once, immediately before merge_pull)
needs ground truth: a required/enforced/critical context that flipped to
RED *within* the snapshot's freshness window — AFTER the snapshot was
captured but before this tick acts — is still GREEN in the snapshot, so a
snapshot read here would re-confirm a stale green and merge a now-red PR.
A live read closes that within-window staleness gap to ~0.
"""
snapshot_status = _snapshot_status_for_sha(sha)
if snapshot_status is not None:
return snapshot_status
if not prefer_live:
snapshot_status = _snapshot_status_for_sha(sha)
if snapshot_status is not None:
return snapshot_status
_, combined = api("GET", f"/repos/{OWNER}/{NAME}/commits/{sha}/status")
if not isinstance(combined, dict):
@@ -1867,6 +1878,58 @@ def merge_pull(pr_number: int, *, dry_run: bool, force: bool = False) -> None:
raise # re-raise other ApiErrors unchanged
def live_premerge_status_regressions(
head_sha: str,
*,
required_contexts: list[str],
enforced_file_contexts: list[str],
) -> list[str]:
"""internal#3210 — final pre-merge re-check against LIVE status.
The merge decision (`evaluate_merge_readiness`) may run its status gates
against the conductor SNAPSHOT, which is trusted while it is within its
freshness window. But a required/enforced/critical context can flip to RED
*inside* that window — AFTER the snapshot was captured — and still read
GREEN from the snapshot. `process_once` only re-checks that MAIN has not
moved before the merge POST; it does NOT re-verify the candidate's own
statuses live. This re-fetches the candidate head's combined status with
the snapshot BYPASSED (`prefer_live=True`) and re-runs the SAME status
gates the decision used:
* critical_contexts_block (CRITICAL_REQUIRED_CONTEXT_PREFIXES),
* required_contexts_green(live_latest, required_contexts),
* enforced_file_contexts_green(live_latest, enforced) — only if the
enforced set is non-empty (matches evaluate_merge_readiness step 4b).
Returns a list of human-readable regression reasons (now red/missing where
the decision saw green). An EMPTY list means LIVE state still satisfies
every status gate and the merge may proceed. Any non-empty list means the
candidate must be SKIPPED (treated as `wait`, never merged) this tick.
This does NOT re-do the approvals / REQUEST_CHANGES / mergeable gates —
those are not snapshot-sourced status flips and process_once already holds
them via the decision. It re-runs ONLY the status-context gates, which are
exactly the ones the snapshot can stale.
"""
live_status = get_combined_status(head_sha, prefer_live=True)
live_latest = latest_statuses_by_context(live_status.get("statuses") or [])
regressions: list[str] = []
# Same order evaluate_merge_readiness checks: critical first (force cannot
# bypass), then BP+governance required, then the enforced-file SSOT set.
critical_block = critical_contexts_block(live_latest)
if critical_block:
regressions.extend(critical_block)
ok, bad = required_contexts_green(live_latest, required_contexts)
if not ok:
regressions.extend(bad)
if enforced_file_contexts:
efok, efbad = enforced_file_contexts_green(live_latest, enforced_file_contexts)
if not efok:
regressions.extend(efbad)
return regressions
def process_once(*, dry_run: bool = False) -> int:
# Required status contexts come from BRANCH PROTECTION, not a hand-kept env
# list. Fail-closed: if BP cannot be enumerated, HOLD the whole tick rather
@@ -2025,6 +2088,30 @@ def process_once(*, dry_run: bool = False) -> int:
"deferring to next tick"
)
return 0
# FINAL pre-merge re-check against LIVE status (internal#3210).
# The decision above may have run its status gates against the
# conductor snapshot; a required/enforced/critical context can
# flip to RED within the snapshot's freshness window AFTER it was
# captured and still read GREEN there. Re-fetch this candidate's
# head statuses live (snapshot BYPASSED) and re-run the SAME
# status gates. On ANY regression, SKIP this PR (treat as wait —
# never merge a snapshot-green-but-now-red head) and keep scanning
# so a genuinely-ready PR behind it can still merge this tick. One
# extra GET, only for the single candidate about to merge.
head_sha = ctx.get("head_sha")
if isinstance(head_sha, str) and head_sha:
regressions = live_premerge_status_regressions(
head_sha,
required_contexts=contexts,
enforced_file_contexts=enforced_file_contexts,
)
if regressions:
print(
f"::notice::PR #{pr_number} SKIPPED: live pre-merge "
f"re-check found {', '.join(regressions)} "
"(snapshot was stale)"
)
continue # skip — keep scanning for a still-ready candidate
try:
merge_pull(pr_number, dry_run=dry_run, force=decision.force)
except MergePermissionError as exc:
@@ -2259,6 +2346,11 @@ def _evaluate_candidate(
head_sha = pr.get("head", {}).get("sha")
if not isinstance(head_sha, str) or len(head_sha) < 7:
raise ApiError(f"PR #{pr_number} missing head sha")
# Surface the candidate head SHA so process_once can re-fetch its LIVE
# combined status immediately before the merge POST (internal#3210
# pre-merge re-check). The decision below may run against the conductor
# snapshot; the final gate must re-verify against ground truth.
ctx["head_sha"] = head_sha
commits = get_pull_commits(pr_number)
current_base = pr_has_current_base(pr, commits, main_sha)
# Fail-closed: a failed status fetch raises here and propagates (the PR is
+328 -9
View File
@@ -731,7 +731,7 @@ def test_process_once_holds_pr_on_permanent_merge_error(monkeypatch):
head_sha = "a" * 40
monkeypatch.setattr(mq, "get_branch_head", lambda branch: main_sha)
def fake_combined(sha):
def fake_combined(sha, *, prefer_live=False):
if sha == main_sha:
return {"state": "success", "statuses": [{"context": "CI / all-required (push)", "status": "success"}]}
return {"state": "success", "statuses": [
@@ -805,7 +805,7 @@ def _fully_ready_process_once_monkeypatch(monkeypatch, mergeable, calls):
head_sha = "a" * 40
monkeypatch.setattr(mq, "get_branch_head", lambda branch: main_sha)
def fake_combined(sha):
def fake_combined(sha, *, prefer_live=False):
if sha == main_sha:
return {"state": "success", "statuses": [{"context": "CI / all-required (push)", "status": "success"}]}
return {"state": "success", "statuses": [
@@ -979,7 +979,7 @@ def test_process_once_pauses_when_main_not_green_no_direct_merge(monkeypatch):
_fully_ready_process_once_monkeypatch(monkeypatch, mergeable=True, calls=calls)
main_sha = "b" * 40
def red_main_combined(sha):
def red_main_combined(sha, *, prefer_live=False):
if sha == main_sha:
return {"state": "failure",
"statuses": [{"context": "CI / all-required (push)", "status": "failure"}]}
@@ -1049,7 +1049,7 @@ def test_status_fetch_failure_is_fail_closed(monkeypatch):
head_sha = "a" * 40
monkeypatch.setattr(mq, "get_branch_head", lambda branch: main_sha)
def fake_combined(sha):
def fake_combined(sha, *, prefer_live=False):
if sha == main_sha:
return {"state": "success",
"statuses": [{"context": "CI / all-required (push)", "status": "success"}]}
@@ -1218,7 +1218,7 @@ def _stale_pr_update_409_monkeypatch(monkeypatch, queued_issues, calls):
head_sha = "a" * 40
monkeypatch.setattr(mq, "get_branch_head", lambda branch: main_sha)
def fake_combined(sha):
def fake_combined(sha, *, prefer_live=False):
if sha == main_sha:
return {"state": "success", "statuses": [{"context": "CI / all-required (push)", "status": "success"}]}
return {"state": "success", "statuses": [
@@ -1494,7 +1494,7 @@ def _wire_ready_process_once(monkeypatch, *, issues, pr_payload, calls):
head_sha = "a" * 40
monkeypatch.setattr(mq, "get_branch_head", lambda branch: main_sha)
def fake_combined(sha):
def fake_combined(sha, *, prefer_live=False):
if sha == main_sha:
return {"state": "success", "statuses": [
{"context": "CI / all-required (push)", "status": "success"},
@@ -1616,7 +1616,7 @@ def test_process_once_does_not_merge_red_required_pr(monkeypatch):
)
# Required PR context is FAILURE; main stays green.
def fake_combined(sha):
def fake_combined(sha, *, prefer_live=False):
if sha == main_sha:
return {"state": "success",
"statuses": [{"context": "CI / all-required (push)", "status": "success"}]}
@@ -1685,7 +1685,7 @@ def _wire_multi_candidate_process_once(monkeypatch, *, issues, pulls, reviews, c
))
monkeypatch.setattr(mq, "get_branch_head", lambda branch: MAIN_SHA)
def fake_combined(sha):
def fake_combined(sha, *, prefer_live=False):
if sha == MAIN_SHA:
return {"state": "success", "statuses": [{"context": "CI / all-required (push)", "status": "success"}]}
return {"state": "success", "statuses": [
@@ -1820,7 +1820,7 @@ def test_hol_unready_red_required_ci_is_skipped_for_ready_pr(monkeypatch):
calls=calls,
)
# PR 600's required PR context is FAILURE; 601 (and main) stay green.
def fake_combined(sha):
def fake_combined(sha, *, prefer_live=False):
if sha == MAIN_SHA:
return {"state": "success",
"statuses": [{"context": "CI / all-required (push)", "status": "success"}]}
@@ -3340,3 +3340,322 @@ def test_parked_pending_context_does_not_block(tmp_path):
assert decision.ready is True
assert decision.action == "merge"
assert decision.force is True # advisory red present → force bypass
# ==========================================================================
# internal#3210 (final tail) — LIVE pre-merge re-check vs snapshot staleness.
#
# get_combined_status prefers the conductor snapshot while it is within its
# freshness window. A required/enforced/critical context can flip to RED
# *inside* that window, AFTER the snapshot was captured but before the queue
# acts, and still read GREEN from the snapshot — so the merge DECISION sees
# green. process_once only re-checks that MAIN has not moved before the merge
# POST; it did NOT re-verify the candidate head's own statuses live. These
# tests cover the fix: an extra LIVE (snapshot-bypassed) status read of the
# single candidate about to merge, re-running the same status gates, that
# SKIPS the PR on any regression.
# ==========================================================================
def _live_recheck_process_once_monkeypatch(
monkeypatch, *, live_pr_statuses, calls, decision_pr_statuses=None
):
"""Wire process_once fully-ready EXCEPT the candidate head's LIVE statuses
are configurable independently of the DECISION-pass statuses.
The decision pass (get_combined_status with prefer_live=False — the
snapshot/scan read) sees `decision_pr_statuses` (a fully-GREEN default), so
the decision is `merge`. The FINAL live pre-merge re-check (prefer_live=
True) returns `live_pr_statuses` — set it RED to model a within-window flip.
Records every merge attempt in calls["merge_attempts"] and every live
re-fetch in calls["live_refetch_shas"] (spy)."""
monkeypatch.setattr(mq, "OWNER", "molecule-ai")
monkeypatch.setattr(mq, "NAME", "molecule-core")
monkeypatch.setattr(mq, "WATCH_BRANCH", "main")
monkeypatch.setattr(mq, "QUEUE_LABEL", "merge-queue")
monkeypatch.setattr(mq, "HOLD_LABEL", "merge-queue-hold")
monkeypatch.setattr(mq, "AUTO_DISCOVER", True)
monkeypatch.setattr(mq, "OPT_OUT_LABELS", {"merge-queue-hold", "do-not-auto-merge", "wip"})
monkeypatch.setattr(mq, "REVIEWER_SET", REVIEWERS)
monkeypatch.setattr(mq, "get_branch_protection", lambda branch: mq.BranchProtection(
required_contexts=["CI / all-required (pull_request)"],
required_approvals=2,
block_on_rejected_reviews=True,
))
main_sha = "b" * 40
head_sha = "a" * 40
monkeypatch.setattr(mq, "get_branch_head", lambda branch: main_sha)
green_pr_statuses = decision_pr_statuses if decision_pr_statuses is not None else [
{"context": "CI / all-required (pull_request)", "status": "success"},
{"context": "CI / Platform (Go) (pull_request)", "status": "success"},
{"context": "qa-review / approved (pull_request_target)", "status": "success"},
{"context": "security-review / approved (pull_request_target)", "status": "success"},
{"context": "sop-checklist / all-items-acked (pull_request_target)", "status": "success"},
]
def fake_combined(sha, *, prefer_live=False):
if sha == main_sha:
return {"state": "success", "statuses": [
{"context": "CI / all-required (push)", "status": "success"}]}
if prefer_live:
# The FINAL pre-merge re-check — return the configurable LIVE set.
calls["live_refetch_shas"].append(sha)
return {"state": "unknown", "statuses": list(live_pr_statuses)}
# The DECISION pass (snapshot/scan) — fully green by default.
return {"state": "success", "statuses": list(green_pr_statuses)}
monkeypatch.setattr(mq, "get_combined_status", fake_combined)
monkeypatch.setattr(mq, "list_candidate_issues", lambda *, auto_discover: [
{"number": 333, "pull_request": {}, "labels": [{"name": "merge-queue"}],
"created_at": "2026-06-01T00:00:00Z"},
])
monkeypatch.setattr(mq, "get_pull", lambda n: {
"state": "open", "number": n, "mergeable": True,
"base": {"ref": "main", "repo_id": 1},
"head": {"sha": head_sha, "repo_id": 1},
"labels": [{"name": "merge-queue"}],
})
monkeypatch.setattr(mq, "get_pull_commits", lambda n: [{"sha": main_sha}, {"sha": head_sha}])
monkeypatch.setattr(mq, "get_pull_reviews", lambda n: [
{"state": "APPROVED", "user": {"login": "agent-researcher"},
"official": True, "stale": False, "dismissed": False, "commit_id": head_sha},
{"state": "APPROVED", "user": {"login": "agent-reviewer-cr2"},
"official": True, "stale": False, "dismissed": False, "commit_id": head_sha},
])
def fake_merge(pr_number, *, dry_run, force=False):
calls["merge_attempts"] += 1
monkeypatch.setattr(mq, "merge_pull", fake_merge)
monkeypatch.setattr(mq, "add_label_by_name", lambda *a, **k: None)
monkeypatch.setattr(mq, "update_pull", lambda *a, **k: calls.__setitem__("updated", True))
monkeypatch.setattr(mq, "post_comment", lambda *a, **k: None)
return head_sha
def test_process_once_live_recheck_red_required_aborts_merge(monkeypatch, capsys):
"""(a) GREEN in the snapshot/decision pass but RED on the LIVE pre-merge
re-fetch → the PR is NOT merged. A required context that flipped to red
within the snapshot's freshness window (after capture) must abort the
merge for this candidate (treat as skip), closing the within-window
staleness fail-open."""
calls = {"merge_attempts": 0, "live_refetch_shas": [], "updated": False}
head_sha = _live_recheck_process_once_monkeypatch(
monkeypatch,
live_pr_statuses=[
# A REQUIRED context now RED on the live re-fetch.
{"context": "CI / all-required (pull_request)", "status": "failure"},
{"context": "CI / Platform (Go) (pull_request)", "status": "success"},
{"context": "qa-review / approved (pull_request_target)", "status": "success"},
{"context": "security-review / approved (pull_request_target)", "status": "success"},
{"context": "sop-checklist / all-items-acked (pull_request_target)", "status": "success"},
],
calls=calls,
)
rc = mq.process_once(dry_run=False)
assert rc == 0
# The fail-open this closes: a snapshot-green-but-live-red head must NOT merge.
assert calls["merge_attempts"] == 0
# The live re-check must have been invoked for the candidate head (spy).
assert calls["live_refetch_shas"] == [head_sha]
out = capsys.readouterr().out
assert "PR #333 SKIPPED: live pre-merge re-check" in out
assert "CI / all-required" in out
def test_process_once_live_recheck_red_critical_aborts_merge(monkeypatch, capsys):
"""(a, variant) A CRITICAL context (CI / Platform (Go)) flipping red on the
live re-fetch also aborts — the critical guard is re-run live and force
cannot bypass it."""
calls = {"merge_attempts": 0, "live_refetch_shas": [], "updated": False}
head_sha = _live_recheck_process_once_monkeypatch(
monkeypatch,
live_pr_statuses=[
{"context": "CI / all-required (pull_request)", "status": "success"},
# CRITICAL context now RED on the live re-fetch.
{"context": "CI / Platform (Go) (pull_request)", "status": "failure"},
{"context": "qa-review / approved (pull_request_target)", "status": "success"},
{"context": "security-review / approved (pull_request_target)", "status": "success"},
{"context": "sop-checklist / all-items-acked (pull_request_target)", "status": "success"},
],
calls=calls,
)
rc = mq.process_once(dry_run=False)
assert rc == 0
assert calls["merge_attempts"] == 0
assert calls["live_refetch_shas"] == [head_sha]
out = capsys.readouterr().out
assert "PR #333 SKIPPED: live pre-merge re-check" in out
assert "CI / Platform (Go)" in out
def test_process_once_live_recheck_green_merges_normally(monkeypatch, capsys):
"""(b) GREEN in the snapshot AND GREEN on the live pre-merge re-fetch →
merges normally. Regression guard: the live re-check must not block a
genuinely-ready PR, and it IS invoked before the merge (spy)."""
calls = {"merge_attempts": 0, "live_refetch_shas": [], "updated": False}
head_sha = _live_recheck_process_once_monkeypatch(
monkeypatch,
live_pr_statuses=[
{"context": "CI / all-required (pull_request)", "status": "success"},
{"context": "CI / Platform (Go) (pull_request)", "status": "success"},
{"context": "qa-review / approved (pull_request_target)", "status": "success"},
{"context": "security-review / approved (pull_request_target)", "status": "success"},
{"context": "sop-checklist / all-items-acked (pull_request_target)", "status": "success"},
],
calls=calls,
)
rc = mq.process_once(dry_run=False)
assert rc == 0
# Still green live → merges.
assert calls["merge_attempts"] == 1
# (c) The live re-fetch IS invoked before the merge.
assert calls["live_refetch_shas"] == [head_sha]
out = capsys.readouterr().out
assert "SKIPPED: live pre-merge re-check" not in out
def test_process_once_live_recheck_red_enforced_file_context_aborts_merge(monkeypatch, capsys):
"""(a, enforced-SSOT variant) An ENFORCED `.gitea/required-contexts.txt`
entry green-in-snapshot but RED on the live re-fetch aborts the merge — the
enforced-file gate is re-run live too (only when the enforced set is
non-empty, matching evaluate_merge_readiness step 4b)."""
calls = {"merge_attempts": 0, "live_refetch_shas": [], "updated": False}
enforced_green = {
"context": "E2E Staging SaaS (full lifecycle) / E2E Staging Concierge "
"Creates Workspace (pull_request)",
"status": "success",
}
head_sha = _live_recheck_process_once_monkeypatch(
monkeypatch,
# DECISION pass: the enforced context is GREEN, so the decision reaches
# `merge` (this models the snapshot the decision trusted).
decision_pr_statuses=[
{"context": "CI / all-required (pull_request)", "status": "success"},
{"context": "CI / Platform (Go) (pull_request)", "status": "success"},
{"context": "qa-review / approved (pull_request_target)", "status": "success"},
{"context": "security-review / approved (pull_request_target)", "status": "success"},
{"context": "sop-checklist / all-items-acked (pull_request_target)", "status": "success"},
enforced_green,
],
# LIVE re-fetch: the SAME enforced context has flipped RED.
live_pr_statuses=[
{"context": "CI / all-required (pull_request)", "status": "success"},
{"context": "CI / Platform (Go) (pull_request)", "status": "success"},
{"context": "qa-review / approved (pull_request_target)", "status": "success"},
{"context": "security-review / approved (pull_request_target)", "status": "success"},
{"context": "sop-checklist / all-items-acked (pull_request_target)", "status": "success"},
{"context": "E2E Staging SaaS (full lifecycle) / E2E Staging Concierge "
"Creates Workspace (pull_request)", "status": "failure"},
],
calls=calls,
)
# The SAME enforced set the decision used must be re-checked live. Override
# the autouse [] stub for THIS test so process_once sees a non-empty
# enforced set (event-stripped form, as the loader produces).
monkeypatch.setattr(
mq, "load_enforced_file_contexts",
lambda path: [
"E2E Staging SaaS (full lifecycle) / E2E Staging Concierge Creates Workspace"
],
)
rc = mq.process_once(dry_run=False)
assert rc == 0
assert calls["merge_attempts"] == 0
assert calls["live_refetch_shas"] == [head_sha]
out = capsys.readouterr().out
assert "PR #333 SKIPPED: live pre-merge re-check" in out
assert "E2E Staging Concierge Creates Workspace" in out
def test_get_combined_status_prefer_live_bypasses_snapshot(monkeypatch):
"""The live re-fetch must GENUINELY bypass the snapshot. With a fresh
snapshot present for the SHA, get_combined_status(sha) returns the snapshot,
but get_combined_status(sha, prefer_live=True) hits the live API instead."""
import json as _json
import os as _os
import tempfile as _tempfile
head_sha = "c" * 40
snapshot = _make_snapshot([
{"number": 90, "title": "PR 90", "head_sha": head_sha, "labels": [],
"combined_state": "success",
"statuses": [{"context": "CI / all-required (pull_request)", "status": "success"}]},
])
with _tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
_json.dump(snapshot, f)
path = f.name
try:
monkeypatch.setenv("CONDUCTOR_SNAPSHOT_FILE", path)
monkeypatch.setattr(mq, "OWNER", "o")
monkeypatch.setattr(mq, "NAME", "r")
live_calls = {"n": 0}
def fake_api(method, path, **kw):
if path.endswith("/status"):
live_calls["n"] += 1
# LIVE state differs from the snapshot (now failure).
return 200, {"state": "failure", "statuses": [
{"context": "CI / all-required (pull_request)", "status": "failure"}]}
if path.endswith("/statuses"):
return 200, []
raise mq.ApiError("unexpected")
monkeypatch.setattr(mq, "api", fake_api)
# Default (snapshot) read: returns the snapshot's GREEN, no live call.
snap = mq.get_combined_status(head_sha)
assert snap["state"] == "success"
assert live_calls["n"] == 0
# prefer_live: bypasses the snapshot, hits the live API → FAILURE.
live = mq.get_combined_status(head_sha, prefer_live=True)
assert live["state"] == "failure"
assert live_calls["n"] == 1
finally:
_os.unlink(path)
def test_live_premerge_status_regressions_helper(monkeypatch):
"""Unit: live_premerge_status_regressions returns [] when live is green and
a non-empty reason list when a required/critical/enforced context is red,
and it always reads with prefer_live=True (snapshot bypassed)."""
seen = {"prefer_live": None}
def fake_combined(sha, *, prefer_live=False):
seen["prefer_live"] = prefer_live
return {"state": "success", "statuses": [
{"context": "CI / all-required (pull_request)", "status": "success"},
{"context": "CI / Platform (Go) (pull_request)", "status": "success"},
]}
monkeypatch.setattr(mq, "get_combined_status", fake_combined)
regressions = mq.live_premerge_status_regressions(
"a" * 40,
required_contexts=["CI / all-required (pull_request)"],
enforced_file_contexts=[],
)
assert regressions == []
assert seen["prefer_live"] is True # snapshot genuinely bypassed
def fake_combined_red(sha, *, prefer_live=False):
return {"state": "failure", "statuses": [
{"context": "CI / all-required (pull_request)", "status": "failure"},
{"context": "CI / Platform (Go) (pull_request)", "status": "success"},
]}
monkeypatch.setattr(mq, "get_combined_status", fake_combined_red)
regressions = mq.live_premerge_status_regressions(
"a" * 40,
required_contexts=["CI / all-required (pull_request)"],
enforced_file_contexts=[],
)
assert regressions # non-empty → abort
assert any("CI / all-required" in r for r in regressions)