feat(scripts): consume CONDUCTOR_SNAPSHOT_FILE in merge-queue + reaper (#2502) #2513

Merged
agent-reviewer merged 1 commits from feat/2502-consume-conductor-snapshot into main 2026-06-10 06:28:08 +00:00
4 changed files with 369 additions and 10 deletions
+104 -9
View File
@@ -210,6 +210,60 @@ REQUIRED_APPROVALS_DEFAULT = int(_env("REQUIRED_APPROVALS", default="2") or "2")
OWNER, NAME = (REPO.split("/", 1) + [""])[:2] if REPO else ("", "")
API = f"https://{GITEA_HOST}/api/v1" if GITEA_HOST else ""
# --------------------------------------------------------------------------
# Conductor snapshot (operator-config#158)
# --------------------------------------------------------------------------
# When the conductor tick writes a state snapshot before running the passes,
# both scripts see the SAME observed state instead of re-fetching independently
# and potentially disagreeing within the same tick.
# --------------------------------------------------------------------------
def load_conductor_snapshot() -> dict | None:
"""Load the conductor snapshot if present and fresh.
The snapshot is written by the conductor wrapper
(bin/molecule-core-cron-bot.sh conductor) to a path exported as
CONDUCTOR_SNAPSHOT_FILE. It contains open PRs + per-head combined
statuses + reviews captured in a single state-read.
Returns the parsed snapshot dict, or None if absent, unreadable,
or older than the freshness threshold (10 minutes — twice the */5
conductor cadence, so a single skipped tick does not invalidate it).
"""
path = os.environ.get("CONDUCTOR_SNAPSHOT_FILE", "")
if not path:
return None
try:
with open(path, "r", encoding="utf-8") as f:
snapshot = json.load(f)
except (OSError, json.JSONDecodeError) as exc:
print(f"::notice::conductor snapshot unreadable ({exc}); self-fetching")
return None
if not isinstance(snapshot, dict):
return None
ts_str = snapshot.get("ts", "")
if ts_str:
try:
from datetime import datetime, timezone
ts = datetime.strptime(ts_str, "%Y-%m-%dT%H:%M:%SZ").replace(
tzinfo=timezone.utc
)
age_sec = (datetime.now(timezone.utc) - ts).total_seconds()
if age_sec > 600: # 10 minutes
print(
f"::notice::conductor snapshot stale ({int(age_sec)}s); "
"self-fetching"
)
return None
except ValueError:
pass # malformed ts, treat as fresh (conservative)
return snapshot
class ApiError(RuntimeError):
    """Error raised for API problems, e.g. a response body of the wrong shape."""
@@ -711,19 +765,36 @@ def get_branch_head(branch: str) -> str:
return sha
def _snapshot_status_for_sha(sha: str) -> dict | None:
    """Look up `sha` among the PR heads recorded in the conductor snapshot.

    Returns a combined-status-shaped dict (`state` plus a `statuses` list of
    `{"context", "status"}` entries) for the first PR whose `head_sha` equals
    `sha`. Returns None when no usable snapshot is loaded or no PR matches.
    """
    snap = load_conductor_snapshot()
    if snap is None:
        return None

    # First PR entry whose head matches; None when the SHA is not a PR head.
    match = next(
        (
            entry
            for entry in (snap.get("prs") or [])
            if entry.get("head_sha") == sha
        ),
        None,
    )
    if match is None:
        return None

    # Non-dict status entries are dropped rather than passed through.
    checks = [
        {"context": item.get("context"), "status": item.get("status")}
        for item in (match.get("statuses") or [])
        if isinstance(item, dict)
    ]
    return {
        "state": match.get("combined_state", "unknown"),
        "statuses": checks,
    }
def get_combined_status(sha: str) -> dict:
"""Combined status + all individual statuses for `sha`.
The /status endpoint caps the `statuses` array at 30 entries (Gitea
default page size), so we fetch the full list via /statuses. The combined
`state` still comes from /status.
Fail-closed: BOTH the PRIMARY /status fetch AND the SECONDARY /statuses
enrichment must succeed. If either raises, the error propagates so the
caller skips this PR this tick (we never treat a failed status fetch as
green — dev-sop "no fail-open"). A paginated /statuses error must NOT
silently degrade to an incomplete status set.
Uses the conductor snapshot when available (same tick, same observed
state as the merge-queue pass), otherwise self-fetches via API.
"""
snapshot_status = _snapshot_status_for_sha(sha)
if snapshot_status is not None:
return snapshot_status
_, combined = api("GET", f"/repos/{OWNER}/{NAME}/commits/{sha}/status")
if not isinstance(combined, dict):
raise ApiError(f"status for {sha} response not object")
@@ -747,7 +818,27 @@ def get_combined_status(sha: str) -> dict:
return combined
def _snapshot_pr_to_issue(pr_entry: dict) -> dict:
    """Normalise a conductor-snapshot PR entry into the shape the queue
    expects from /issues (number, title, labels, pull_request sub-dict).

    Args:
        pr_entry: One element of the snapshot's `prs` list. Label names are
            read as plain strings and wrapped as `{"name": ...}` dicts.

    Returns:
        A dict with `number`, `title`, `labels`, `pull_request` and
        `created_at` keys.

    `draft` and `created_at` are passed through when the snapshot entry
    carries them (NOTE(review): confirm the snapshot writer emits these
    fields). When absent they fall back to the previous fixed values
    (`False` / `""`), so an entry without them is normalised exactly as
    before.
    """
    return {
        "number": pr_entry.get("number"),
        "title": pr_entry.get("title"),
        "labels": [{"name": n} for n in (pr_entry.get("labels") or [])],
        # Only a literal JSON `true` marks a draft; anything else (missing,
        # null, strings) keeps the non-draft default.
        "pull_request": {"draft": pr_entry.get("draft") is True},
        "created_at": pr_entry.get("created_at") or "",
    }
def list_queued_issues() -> list[dict]:
snapshot = load_conductor_snapshot()
if snapshot is not None:
prs = snapshot.get("prs") or []
return [
_snapshot_pr_to_issue(p)
for p in prs
if QUEUE_LABEL in (p.get("labels") or [])
]
return api_paginated(
"GET",
f"/repos/{OWNER}/{NAME}/issues",
@@ -768,6 +859,10 @@ def list_candidate_issues(*, auto_discover: bool) -> list[dict]:
back to the legacy label-filtered listing (opt-IN). Opt-out filtering and
draft-skipping happen in choose_next_candidate_issue, not here.
"""
snapshot = load_conductor_snapshot()
if snapshot is not None:
prs = snapshot.get("prs") or []
return [_snapshot_pr_to_issue(p) for p in prs]
if not auto_discover:
return list_queued_issues()
return api_paginated(
+63
View File
@@ -154,6 +154,54 @@ CANCELLED_DESCRIPTION = "Has been cancelled"
PUSH_SUFFIX = " (push)"
PULL_REQUEST_SUFFIX = " (pull_request)"
# --------------------------------------------------------------------------
# Conductor snapshot (operator-config#158)
# --------------------------------------------------------------------------
# When the conductor tick writes a state snapshot before running the passes,
# both scripts see the SAME observed state instead of re-fetching independently
# and potentially disagreeing within the same tick.
# --------------------------------------------------------------------------
def load_conductor_snapshot() -> dict | None:
"""Load the conductor snapshot if present and fresh.
Returns the parsed snapshot dict, or None if absent, unreadable,
or older than the freshness threshold (10 minutes).
"""
path = os.environ.get("CONDUCTOR_SNAPSHOT_FILE", "")
if not path:
return None
try:
with open(path, "r", encoding="utf-8") as f:
snapshot = json.load(f)
except (OSError, json.JSONDecodeError) as exc:
print(f"::notice::conductor snapshot unreadable ({exc}); self-fetching")
return None
if not isinstance(snapshot, dict):
return None
ts_str = snapshot.get("ts", "")
if ts_str:
try:
from datetime import datetime, timezone
ts = datetime.strptime(ts_str, "%Y-%m-%dT%H:%M:%SZ").replace(
tzinfo=timezone.utc
)
age_sec = (datetime.now(timezone.utc) - ts).total_seconds()
if age_sec > 600: # 10 minutes
print(
f"::notice::conductor snapshot stale ({int(age_sec)}s); "
"self-fetching"
)
return None
except ValueError:
pass
return snapshot
def _require_runtime_env() -> None:
"""Enforce env contract — called from `main()` only.
@@ -382,8 +430,23 @@ def get_combined_status(sha: str) -> dict:
],
...
}
Uses the conductor snapshot when the SHA matches an open PR head,
otherwise self-fetches via API.
Raises ApiError on non-2xx.
"""
snapshot = load_conductor_snapshot()
if snapshot is not None:
for pr in (snapshot.get("prs") or []):
if pr.get("head_sha") == sha:
statuses = pr.get("statuses") or []
return {
"state": pr.get("combined_state", "unknown"),
"statuses": [
{"context": s.get("context"), "state": s.get("status")}
for s in statuses
if isinstance(s, dict)
],
}
_, body = api("GET", f"/repos/{OWNER}/{NAME}/commits/{sha}/status")
if not isinstance(body, dict):
raise ApiError(f"status for {sha} response not a JSON object")
+127 -1
View File
@@ -1,4 +1,4 @@
import importlib.util
import importlib
import sys
from pathlib import Path
@@ -1778,3 +1778,129 @@ def test_print_post_batch_summary_counts_correctly(capsys):
assert "PR #1: state=ready" in out
assert "PR #2: state=waiting" in out
assert "PR #3: state=ineligible" in out
# ---------------------------------------------------------------------------
# Conductor snapshot consumption (operator-config#158 / molecule-core#2502)
# ---------------------------------------------------------------------------
import json
import os
import tempfile
def _make_snapshot(prs, ts=None):
    """Build a conductor-snapshot dict for tests.

    Args:
        prs: List of PR entries to place under the `prs` key.
        ts: Snapshot timestamp in `%Y-%m-%dT%H:%M:%SZ` form. Defaults to the
            current UTC time so the snapshot always passes the loader's
            10-minute freshness check; a fixed calendar date would silently
            turn every dependent test into a stale-snapshot test once that
            date has passed.

    Returns:
        A dict with `ts`, `repo` and `prs` keys.
    """
    if ts is None:
        from datetime import datetime, timezone

        ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return {"ts": ts, "repo": "molecule-ai/molecule-core", "prs": prs}
def test_list_candidate_issues_uses_snapshot_when_present(monkeypatch):
    """With CONDUCTOR_SNAPSHOT_FILE pointing at a snapshot,
    list_candidate_issues yields every PR recorded in it, labelled or not."""
    labelled_pr = {
        "number": 10,
        "title": "PR 10",
        "head_sha": "a" * 40,
        "labels": ["merge-queue"],
        "combined_state": "success",
        "statuses": [],
    }
    unlabelled_pr = {
        "number": 20,
        "title": "PR 20",
        "head_sha": "b" * 40,
        "labels": [],
        "combined_state": "success",
        "statuses": [],
    }
    payload = _make_snapshot([labelled_pr, unlabelled_pr])
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as tmp:
        json.dump(payload, tmp)
        snapshot_path = tmp.name
    try:
        # The loader reads the env var on every call, so setting it suffices.
        monkeypatch.setenv("CONDUCTOR_SNAPSHOT_FILE", snapshot_path)
        issues = mq.list_candidate_issues(auto_discover=True)
        assert len(issues) == 2
        assert [issue["number"] for issue in issues] == [10, 20]
    finally:
        os.unlink(snapshot_path)
def test_list_queued_issues_uses_snapshot_label_filter(monkeypatch):
    """In opt-IN mode only snapshot PRs carrying QUEUE_LABEL are returned."""
    with_label = {
        "number": 11,
        "title": "Labeled",
        "head_sha": "a" * 40,
        "labels": ["merge-queue"],
        "combined_state": "success",
        "statuses": [],
    }
    without_label = {
        "number": 22,
        "title": "Unlabeled",
        "head_sha": "b" * 40,
        "labels": [],
        "combined_state": "success",
        "statuses": [],
    }
    payload = _make_snapshot([with_label, without_label])
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as tmp:
        json.dump(payload, tmp)
        snapshot_path = tmp.name
    try:
        monkeypatch.setenv("CONDUCTOR_SNAPSHOT_FILE", snapshot_path)
        monkeypatch.setattr(mq, "QUEUE_LABEL", "merge-queue")
        issues = mq.list_queued_issues()
        # Only the labelled PR survives the filter.
        assert len(issues) == 1
        assert issues[0]["number"] == 11
    finally:
        os.unlink(snapshot_path)
def test_get_combined_status_uses_snapshot_when_sha_matches(monkeypatch):
    """A SHA that is an open PR head in the snapshot gets its combined status
    from the snapshot entry."""
    head_sha = "c" * 40
    failing_check = {
        "context": "CI / all-required (pull_request)",
        "status": "failure",
    }
    pr_entry = {
        "number": 30,
        "title": "PR 30",
        "head_sha": head_sha,
        "labels": [],
        "combined_state": "failure",
        "statuses": [failing_check],
    }
    payload = _make_snapshot([pr_entry])
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as tmp:
        json.dump(payload, tmp)
        snapshot_path = tmp.name
    try:
        monkeypatch.setenv("CONDUCTOR_SNAPSHOT_FILE", snapshot_path)
        result = mq.get_combined_status(head_sha)
        assert result["state"] == "failure"
        assert len(result["statuses"]) == 1
        only_status = result["statuses"][0]
        assert only_status["context"] == "CI / all-required (pull_request)"
        assert only_status["status"] == "failure"
    finally:
        os.unlink(snapshot_path)
def test_get_combined_status_self_fetches_when_sha_not_in_snapshot(monkeypatch):
    """A SHA absent from the snapshot is resolved through the (stubbed) API."""
    other_pr = {
        "number": 40,
        "head_sha": "d" * 40,
        "labels": [],
        "combined_state": "success",
        "statuses": [],
    }
    payload = _make_snapshot([other_pr])
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as tmp:
        json.dump(payload, tmp)
        snapshot_path = tmp.name
    try:
        monkeypatch.setenv("CONDUCTOR_SNAPSHOT_FILE", snapshot_path)
        monkeypatch.setattr(mq, "OWNER", "o")
        monkeypatch.setattr(mq, "NAME", "r")

        def fake_api(method, path, **kw):
            # Answer the two status endpoints; anything else is unexpected.
            if path.endswith("/status"):
                return 200, {"state": "success", "statuses": [{"context": "c1", "status": "success"}]}
            if path.endswith("/statuses"):
                return 200, []
            raise mq.ApiError("unexpected")

        monkeypatch.setattr(mq, "api", fake_api)
        result = mq.get_combined_status("e" * 40)
        assert result["state"] == "success"
    finally:
        os.unlink(snapshot_path)
def test_load_conductor_snapshot_ignores_stale_snapshot(monkeypatch):
    """A snapshot stamped 15 minutes ago exceeds the 10-minute freshness
    window, so the loader reports it as absent (None)."""
    from datetime import datetime, timezone, timedelta

    stamped_at = datetime.now(timezone.utc) - timedelta(minutes=15)
    old_ts = stamped_at.strftime("%Y-%m-%dT%H:%M:%SZ")
    payload = _make_snapshot(
        [{"number": 50, "head_sha": "f" * 40, "labels": []}],
        ts=old_ts,
    )
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as tmp:
        json.dump(payload, tmp)
        snapshot_path = tmp.name
    try:
        monkeypatch.setenv("CONDUCTOR_SNAPSHOT_FILE", snapshot_path)
        assert mq.load_conductor_snapshot() is None
    finally:
        os.unlink(snapshot_path)
@@ -167,3 +167,78 @@ def test_reap_preserves_failed_pr_context_without_push_success(monkeypatch):
assert counters["preserved_pr_without_push_success"] == 2
assert posted == []
# ---------------------------------------------------------------------------
# Conductor snapshot consumption (operator-config#158 / molecule-core#2502)
# ---------------------------------------------------------------------------
import os
import tempfile
def test_get_combined_status_uses_snapshot_when_sha_matches(monkeypatch):
    """When the SHA is an open PR head in the conductor snapshot, get_combined_status
    returns the snapshot data instead of calling the API."""
    from datetime import datetime, timezone

    head_sha = "a" * 40
    snapshot = {
        # Stamp with the current time: a hard-coded date ages past the
        # loader's 10-minute freshness window and the test would then fall
        # through to the API path instead of the snapshot path.
        "ts": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "repo": "molecule-ai/molecule-core",
        "prs": [
            {
                "number": 99,
                "title": "PR 99",
                "head_sha": head_sha,
                "labels": [],
                "combined_state": "failure",
                "statuses": [
                    {"context": "CI / Platform (Go) (push)", "status": "failure"},
                ],
            }
        ],
    }
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
        json.dump(snapshot, f)
        path = f.name
    try:
        monkeypatch.setenv("CONDUCTOR_SNAPSHOT_FILE", path)
        mod = load_reaper()  # load after the env var is set

        def forbidden_api(*args, **kwargs):
            # Prove the snapshot path was taken: any API call fails the test.
            raise AssertionError("API must not be called when the snapshot has the SHA")

        monkeypatch.setattr(mod, "api", forbidden_api)
        combined = mod.get_combined_status(head_sha)
        assert combined["state"] == "failure"
        assert len(combined["statuses"]) == 1
        assert combined["statuses"][0]["context"] == "CI / Platform (Go) (push)"
        # The reaper maps the snapshot's per-check "status" onto a "state" key.
        assert combined["statuses"][0]["state"] == "failure"
    finally:
        os.unlink(path)
def test_get_combined_status_self_fetches_when_sha_not_in_snapshot(monkeypatch):
    """If the SHA is not in the snapshot, get_combined_status falls back to API."""
    from datetime import datetime, timezone

    snapshot = {
        # Stamp with the current time so the snapshot is actually consulted;
        # with a stale snapshot the test would pass without ever exercising
        # the "fresh snapshot, SHA missing" path it is named for.
        "ts": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "repo": "molecule-ai/molecule-core",
        "prs": [
            {"number": 1, "head_sha": "b" * 40, "labels": [],
             "combined_state": "success", "statuses": []},
        ],
    }
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
        json.dump(snapshot, f)
        snapshot_path = f.name
    try:
        monkeypatch.setenv("CONDUCTOR_SNAPSHOT_FILE", snapshot_path)
        mod = load_reaper()  # load after the env var is set
        requested = []

        def fake_api(method, path, **kw):
            requested.append(path)
            if path.endswith("/status"):
                return 200, {"state": "success", "statuses": []}
            raise mod.ApiError("unexpected")

        monkeypatch.setattr(mod, "api", fake_api)
        combined = mod.get_combined_status("c" * 40)
        assert combined["state"] == "success"
        # The fallback must really have gone through the API stub.
        assert requested
    finally:
        os.unlink(snapshot_path)