feat(scripts): consume CONDUCTOR_SNAPSHOT_FILE in merge-queue + reaper (#2502) #2513
@@ -210,6 +210,60 @@ REQUIRED_APPROVALS_DEFAULT = int(_env("REQUIRED_APPROVALS", default="2") or "2")
|
||||
OWNER, NAME = (REPO.split("/", 1) + [""])[:2] if REPO else ("", "")
|
||||
API = f"https://{GITEA_HOST}/api/v1" if GITEA_HOST else ""
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# Conductor snapshot (operator-config#158)
|
||||
# --------------------------------------------------------------------------
|
||||
# When the conductor tick writes a state snapshot before running the passes,
|
||||
# both scripts see the SAME observed state instead of re-fetching independently
|
||||
# and potentially disagreeing within the same tick.
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
|
||||
def load_conductor_snapshot() -> dict | None:
|
||||
"""Load the conductor snapshot if present and fresh.
|
||||
|
||||
The snapshot is written by the conductor wrapper
|
||||
(bin/molecule-core-cron-bot.sh conductor) to a path exported as
|
||||
CONDUCTOR_SNAPSHOT_FILE. It contains open PRs + per-head combined
|
||||
statuses + reviews captured in a single state-read.
|
||||
|
||||
Returns the parsed snapshot dict, or None if absent, unreadable,
|
||||
or older than the freshness threshold (10 minutes — twice the */5
|
||||
conductor cadence, so a single skipped tick does not invalidate it).
|
||||
"""
|
||||
path = os.environ.get("CONDUCTOR_SNAPSHOT_FILE", "")
|
||||
if not path:
|
||||
return None
|
||||
try:
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
snapshot = json.load(f)
|
||||
except (OSError, json.JSONDecodeError) as exc:
|
||||
print(f"::notice::conductor snapshot unreadable ({exc}); self-fetching")
|
||||
return None
|
||||
|
||||
if not isinstance(snapshot, dict):
|
||||
return None
|
||||
|
||||
ts_str = snapshot.get("ts", "")
|
||||
if ts_str:
|
||||
try:
|
||||
from datetime import datetime, timezone
|
||||
|
||||
ts = datetime.strptime(ts_str, "%Y-%m-%dT%H:%M:%SZ").replace(
|
||||
tzinfo=timezone.utc
|
||||
)
|
||||
age_sec = (datetime.now(timezone.utc) - ts).total_seconds()
|
||||
if age_sec > 600: # 10 minutes
|
||||
print(
|
||||
f"::notice::conductor snapshot stale ({int(age_sec)}s); "
|
||||
"self-fetching"
|
||||
)
|
||||
return None
|
||||
except ValueError:
|
||||
pass # malformed ts, treat as fresh (conservative)
|
||||
|
||||
return snapshot
|
||||
|
||||
|
||||
class ApiError(RuntimeError):
    """Raised when an API response is unusable (e.g. not a JSON object)."""
|
||||
@@ -711,19 +765,36 @@ def get_branch_head(branch: str) -> str:
|
||||
return sha
|
||||
|
||||
|
||||
def _snapshot_status_for_sha(sha: str) -> dict | None:
    """Return a combined-status dict for `sha` from the conductor snapshot.

    Args:
        sha: Commit SHA to look up among the snapshot's PR heads.

    Returns:
        ``{"state": ..., "statuses": [{"context": ..., "status": ...}, ...]}``
        built from the first snapshot PR whose ``head_sha`` equals `sha`,
        or None when there is no usable snapshot, `sha` is empty, or no PR
        entry matches. ``state`` falls back to ``"unknown"`` when the entry
        carries no ``combined_state``.
    """
    # An empty SHA must never match an entry that also lacks a head SHA.
    if not sha:
        return None

    snapshot = load_conductor_snapshot()
    if snapshot is None:
        return None

    prs = snapshot.get("prs")
    if not isinstance(prs, list):
        return None

    for pr in prs:
        # Tolerate malformed entries instead of crashing the whole pass.
        if not isinstance(pr, dict) or pr.get("head_sha") != sha:
            continue
        statuses = pr.get("statuses")
        if not isinstance(statuses, list):
            statuses = []
        return {
            "state": pr.get("combined_state", "unknown"),
            "statuses": [
                {"context": s.get("context"), "status": s.get("status")}
                for s in statuses
                if isinstance(s, dict)
            ],
        }
    return None
|
||||
|
||||
|
||||
def get_combined_status(sha: str) -> dict:
|
||||
"""Combined status + all individual statuses for `sha`.
|
||||
|
||||
The /status endpoint caps the `statuses` array at 30 entries (Gitea
|
||||
default page size), so we fetch the full list via /statuses. The combined
|
||||
`state` still comes from /status.
|
||||
|
||||
Fail-closed: BOTH the PRIMARY /status fetch AND the SECONDARY /statuses
|
||||
enrichment must succeed. If either raises, the error propagates so the
|
||||
caller skips this PR this tick (we never treat a failed status fetch as
|
||||
green — dev-sop "no fail-open"). A paginated /statuses error must NOT
|
||||
silently degrade to an incomplete status set.
|
||||
Uses the conductor snapshot when available (same tick, same observed
|
||||
state as the merge-queue pass), otherwise self-fetches via API.
|
||||
"""
|
||||
snapshot_status = _snapshot_status_for_sha(sha)
|
||||
if snapshot_status is not None:
|
||||
return snapshot_status
|
||||
|
||||
_, combined = api("GET", f"/repos/{OWNER}/{NAME}/commits/{sha}/status")
|
||||
if not isinstance(combined, dict):
|
||||
raise ApiError(f"status for {sha} response not object")
|
||||
@@ -747,7 +818,27 @@ def get_combined_status(sha: str) -> dict:
|
||||
return combined
|
||||
|
||||
|
||||
def _snapshot_pr_to_issue(pr_entry: dict) -> dict:
|
||||
"""Normalise a conductor-snapshot PR entry into the shape the queue
|
||||
expects from /issues (number, title, labels, pull_request sub-dict)."""
|
||||
return {
|
||||
"number": pr_entry.get("number"),
|
||||
"title": pr_entry.get("title"),
|
||||
"labels": [{"name": n} for n in (pr_entry.get("labels") or [])],
|
||||
"pull_request": {"draft": False},
|
||||
"created_at": "",
|
||||
}
|
||||
|
||||
|
||||
def list_queued_issues() -> list[dict]:
|
||||
snapshot = load_conductor_snapshot()
|
||||
if snapshot is not None:
|
||||
prs = snapshot.get("prs") or []
|
||||
return [
|
||||
_snapshot_pr_to_issue(p)
|
||||
for p in prs
|
||||
if QUEUE_LABEL in (p.get("labels") or [])
|
||||
]
|
||||
return api_paginated(
|
||||
"GET",
|
||||
f"/repos/{OWNER}/{NAME}/issues",
|
||||
@@ -768,6 +859,10 @@ def list_candidate_issues(*, auto_discover: bool) -> list[dict]:
|
||||
back to the legacy label-filtered listing (opt-IN). Opt-out filtering and
|
||||
draft-skipping happen in choose_next_candidate_issue, not here.
|
||||
"""
|
||||
snapshot = load_conductor_snapshot()
|
||||
if snapshot is not None:
|
||||
prs = snapshot.get("prs") or []
|
||||
return [_snapshot_pr_to_issue(p) for p in prs]
|
||||
if not auto_discover:
|
||||
return list_queued_issues()
|
||||
return api_paginated(
|
||||
|
||||
@@ -154,6 +154,54 @@ CANCELLED_DESCRIPTION = "Has been cancelled"
|
||||
PUSH_SUFFIX = " (push)"
|
||||
PULL_REQUEST_SUFFIX = " (pull_request)"
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# Conductor snapshot (operator-config#158)
|
||||
# --------------------------------------------------------------------------
|
||||
# When the conductor tick writes a state snapshot before running the passes,
|
||||
# both scripts see the SAME observed state instead of re-fetching independently
|
||||
# and potentially disagreeing within the same tick.
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
|
||||
def load_conductor_snapshot() -> dict | None:
|
||||
"""Load the conductor snapshot if present and fresh.
|
||||
|
||||
Returns the parsed snapshot dict, or None if absent, unreadable,
|
||||
or older than the freshness threshold (10 minutes).
|
||||
"""
|
||||
path = os.environ.get("CONDUCTOR_SNAPSHOT_FILE", "")
|
||||
if not path:
|
||||
return None
|
||||
try:
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
snapshot = json.load(f)
|
||||
except (OSError, json.JSONDecodeError) as exc:
|
||||
print(f"::notice::conductor snapshot unreadable ({exc}); self-fetching")
|
||||
return None
|
||||
|
||||
if not isinstance(snapshot, dict):
|
||||
return None
|
||||
|
||||
ts_str = snapshot.get("ts", "")
|
||||
if ts_str:
|
||||
try:
|
||||
from datetime import datetime, timezone
|
||||
|
||||
ts = datetime.strptime(ts_str, "%Y-%m-%dT%H:%M:%SZ").replace(
|
||||
tzinfo=timezone.utc
|
||||
)
|
||||
age_sec = (datetime.now(timezone.utc) - ts).total_seconds()
|
||||
if age_sec > 600: # 10 minutes
|
||||
print(
|
||||
f"::notice::conductor snapshot stale ({int(age_sec)}s); "
|
||||
"self-fetching"
|
||||
)
|
||||
return None
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
return snapshot
|
||||
|
||||
|
||||
def _require_runtime_env() -> None:
|
||||
"""Enforce env contract — called from `main()` only.
|
||||
@@ -382,8 +430,23 @@ def get_combined_status(sha: str) -> dict:
|
||||
],
|
||||
...
|
||||
}
|
||||
Uses the conductor snapshot when the SHA matches an open PR head,
|
||||
otherwise self-fetches via API.
|
||||
Raises ApiError on non-2xx.
|
||||
"""
|
||||
snapshot = load_conductor_snapshot()
|
||||
if snapshot is not None:
|
||||
for pr in (snapshot.get("prs") or []):
|
||||
if pr.get("head_sha") == sha:
|
||||
statuses = pr.get("statuses") or []
|
||||
return {
|
||||
"state": pr.get("combined_state", "unknown"),
|
||||
"statuses": [
|
||||
{"context": s.get("context"), "state": s.get("status")}
|
||||
for s in statuses
|
||||
if isinstance(s, dict)
|
||||
],
|
||||
}
|
||||
_, body = api("GET", f"/repos/{OWNER}/{NAME}/commits/{sha}/status")
|
||||
if not isinstance(body, dict):
|
||||
raise ApiError(f"status for {sha} response not a JSON object")
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import importlib.util
|
||||
import importlib
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
@@ -1778,3 +1778,129 @@ def test_print_post_batch_summary_counts_correctly(capsys):
|
||||
assert "PR #1: state=ready" in out
|
||||
assert "PR #2: state=waiting" in out
|
||||
assert "PR #3: state=ineligible" in out
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Conductor snapshot consumption (operator-config#158 / molecule-core#2502)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
|
||||
def _make_snapshot(prs, ts="2026-06-10T12:00:00Z"):
|
||||
return {"ts": ts, "repo": "molecule-ai/molecule-core", "prs": prs}
|
||||
|
||||
|
||||
def test_list_candidate_issues_uses_snapshot_when_present(monkeypatch):
    """When CONDUCTOR_SNAPSHOT_FILE is present and fresh, list_candidate_issues
    returns the snapshot PRs instead of hitting the API."""
    from datetime import datetime, timezone

    # Stamp the snapshot with "now": a fixed calendar date would turn stale
    # (and silently route this test through the API) once the clock passes it.
    fresh_ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    snapshot = _make_snapshot(
        [
            {"number": 10, "title": "PR 10", "head_sha": "a" * 40,
             "labels": ["merge-queue"],
             "combined_state": "success", "statuses": []},
            {"number": 20, "title": "PR 20", "head_sha": "b" * 40,
             "labels": [],
             "combined_state": "success", "statuses": []},
        ],
        ts=fresh_ts,
    )

    def fail_on_api(*args, **kwargs):
        raise AssertionError("API must not be called when a fresh snapshot exists")

    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
        json.dump(snapshot, f)
        snapshot_path = f.name
    try:
        # The env var is read at call time, so no module reload is needed.
        monkeypatch.setenv("CONDUCTOR_SNAPSHOT_FILE", snapshot_path)
        monkeypatch.setattr(mq, "api", fail_on_api)
        monkeypatch.setattr(mq, "api_paginated", fail_on_api)
        candidates = mq.list_candidate_issues(auto_discover=True)
        assert len(candidates) == 2
        assert [c["number"] for c in candidates] == [10, 20]
    finally:
        os.unlink(snapshot_path)
|
||||
|
||||
|
||||
def test_list_queued_issues_uses_snapshot_label_filter(monkeypatch):
    """list_queued_issues (opt-IN mode) filters the snapshot by QUEUE_LABEL."""
    from datetime import datetime, timezone

    # Stamp the snapshot with "now" so it is always inside the freshness
    # window, independent of the date the suite runs on.
    fresh_ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    snapshot = _make_snapshot(
        [
            {"number": 11, "title": "Labeled", "head_sha": "a" * 40,
             "labels": ["merge-queue"], "combined_state": "success", "statuses": []},
            {"number": 22, "title": "Unlabeled", "head_sha": "b" * 40,
             "labels": [], "combined_state": "success", "statuses": []},
        ],
        ts=fresh_ts,
    )

    def fail_on_api(*args, **kwargs):
        raise AssertionError("API must not be called when a fresh snapshot exists")

    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
        json.dump(snapshot, f)
        snapshot_path = f.name
    try:
        monkeypatch.setenv("CONDUCTOR_SNAPSHOT_FILE", snapshot_path)
        monkeypatch.setattr(mq, "QUEUE_LABEL", "merge-queue")
        monkeypatch.setattr(mq, "api", fail_on_api)
        monkeypatch.setattr(mq, "api_paginated", fail_on_api)
        queued = mq.list_queued_issues()
        assert len(queued) == 1
        assert queued[0]["number"] == 11
    finally:
        os.unlink(snapshot_path)
|
||||
|
||||
|
||||
def test_get_combined_status_uses_snapshot_when_sha_matches(monkeypatch):
    """get_combined_status returns snapshot data when the SHA is an open PR head."""
    from datetime import datetime, timezone

    head_sha = "c" * 40
    # Stamp the snapshot with "now" so it is always inside the freshness
    # window, independent of the date the suite runs on.
    fresh_ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    snapshot = _make_snapshot(
        [
            {"number": 30, "title": "PR 30", "head_sha": head_sha,
             "labels": [],
             "combined_state": "failure",
             "statuses": [
                 {"context": "CI / all-required (pull_request)", "status": "failure"},
             ]},
        ],
        ts=fresh_ts,
    )

    def fail_on_api(*args, **kwargs):
        raise AssertionError("API must not be called when the snapshot has the SHA")

    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
        json.dump(snapshot, f)
        snapshot_path = f.name
    try:
        monkeypatch.setenv("CONDUCTOR_SNAPSHOT_FILE", snapshot_path)
        monkeypatch.setattr(mq, "api", fail_on_api)
        combined = mq.get_combined_status(head_sha)
        assert combined["state"] == "failure"
        assert len(combined["statuses"]) == 1
        assert combined["statuses"][0]["context"] == "CI / all-required (pull_request)"
        assert combined["statuses"][0]["status"] == "failure"
    finally:
        os.unlink(snapshot_path)
|
||||
|
||||
|
||||
def test_get_combined_status_self_fetches_when_sha_not_in_snapshot(monkeypatch):
    """If the SHA is not in the snapshot, get_combined_status falls back to API."""
    from datetime import datetime, timezone

    # Keep the snapshot fresh so the fallback is triggered by the missing
    # SHA, not by the snapshot being rejected as stale.
    fresh_ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    snapshot = _make_snapshot(
        [
            {"number": 40, "head_sha": "d" * 40, "labels": [],
             "combined_state": "success", "statuses": []},
        ],
        ts=fresh_ts,
    )
    wanted_sha = "e" * 40
    requested_paths = []

    def fake_api(method, path, **kw):
        requested_paths.append(path)
        if path.endswith("/status"):
            return 200, {"state": "success", "statuses": [{"context": "c1", "status": "success"}]}
        if path.endswith("/statuses"):
            return 200, []
        raise mq.ApiError("unexpected")

    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
        json.dump(snapshot, f)
        snapshot_path = f.name
    try:
        monkeypatch.setenv("CONDUCTOR_SNAPSHOT_FILE", snapshot_path)
        monkeypatch.setattr(mq, "OWNER", "o")
        monkeypatch.setattr(mq, "NAME", "r")
        monkeypatch.setattr(mq, "api", fake_api)
        combined = mq.get_combined_status(wanted_sha)
        assert combined["state"] == "success"
        # Prove the result came from the API rather than from the snapshot.
        assert any(
            p.endswith(f"/commits/{wanted_sha}/status") for p in requested_paths
        )
    finally:
        os.unlink(snapshot_path)
|
||||
|
||||
|
||||
def test_load_conductor_snapshot_ignores_stale_snapshot(monkeypatch):
    """A snapshot stamped 15 minutes ago exceeds the 10-minute window and loads as None."""
    from datetime import datetime, timedelta, timezone

    fifteen_minutes_ago = datetime.now(timezone.utc) - timedelta(minutes=15)
    stale_snapshot = _make_snapshot(
        [{"number": 50, "head_sha": "f" * 40, "labels": []}],
        ts=fifteen_minutes_ago.strftime("%Y-%m-%dT%H:%M:%SZ"),
    )

    handle = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False)
    with handle:
        json.dump(stale_snapshot, handle)
    snapshot_path = handle.name

    try:
        monkeypatch.setenv("CONDUCTOR_SNAPSHOT_FILE", snapshot_path)
        loaded = mq.load_conductor_snapshot()
        assert loaded is None
    finally:
        os.unlink(snapshot_path)
|
||||
|
||||
@@ -167,3 +167,78 @@ def test_reap_preserves_failed_pr_context_without_push_success(monkeypatch):
|
||||
|
||||
assert counters["preserved_pr_without_push_success"] == 2
|
||||
assert posted == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Conductor snapshot consumption (operator-config#158 / molecule-core#2502)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
|
||||
def test_get_combined_status_uses_snapshot_when_sha_matches(monkeypatch):
    """When the SHA is an open PR head in the conductor snapshot, get_combined_status
    returns the snapshot data instead of calling the API."""
    # Local imports keep the test self-contained regardless of what the
    # file-level import block provides.
    import json
    from datetime import datetime, timezone

    head_sha = "a" * 40
    snapshot = {
        # Stamp with "now": a fixed calendar date would be rejected as stale
        # once the clock passes it, sending this test to the real API.
        "ts": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "repo": "molecule-ai/molecule-core",
        "prs": [
            {
                "number": 99,
                "title": "PR 99",
                "head_sha": head_sha,
                "labels": [],
                "combined_state": "failure",
                "statuses": [
                    {"context": "CI / Platform (Go) (push)", "status": "failure"},
                ],
            }
        ],
    }

    def fail_on_api(*args, **kwargs):
        raise AssertionError("API must not be called when the snapshot has the SHA")

    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
        json.dump(snapshot, f)
        snapshot_path = f.name
    try:
        monkeypatch.setenv("CONDUCTOR_SNAPSHOT_FILE", snapshot_path)
        # One load is enough: the env var is read when the function runs.
        mod = load_reaper()
        monkeypatch.setattr(mod, "api", fail_on_api)
        combined = mod.get_combined_status(head_sha)
        assert combined["state"] == "failure"
        assert len(combined["statuses"]) == 1
        assert combined["statuses"][0]["context"] == "CI / Platform (Go) (push)"
        assert combined["statuses"][0]["state"] == "failure"
    finally:
        os.unlink(snapshot_path)
|
||||
|
||||
|
||||
def test_get_combined_status_self_fetches_when_sha_not_in_snapshot(monkeypatch):
    """If the SHA is not in the snapshot, get_combined_status falls back to API."""
    # Local imports keep the test self-contained regardless of what the
    # file-level import block provides.
    import json
    from datetime import datetime, timezone

    snapshot = {
        # Keep the snapshot fresh so the fallback is triggered by the missing
        # SHA, not by the snapshot being rejected as stale.
        "ts": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "repo": "molecule-ai/molecule-core",
        "prs": [
            {"number": 1, "head_sha": "b" * 40, "labels": [],
             "combined_state": "success", "statuses": []},
        ],
    }
    wanted_sha = "c" * 40
    requested_paths = []

    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
        json.dump(snapshot, f)
        snapshot_path = f.name
    try:
        monkeypatch.setenv("CONDUCTOR_SNAPSHOT_FILE", snapshot_path)
        mod = load_reaper()

        def fake_api(method, path, **kw):
            requested_paths.append(path)
            if path.endswith("/status"):
                return 200, {"state": "success", "statuses": []}
            raise mod.ApiError("unexpected")

        monkeypatch.setattr(mod, "api", fake_api)
        combined = mod.get_combined_status(wanted_sha)
        assert combined["state"] == "success"
        # Prove the result came from the API rather than from the snapshot.
        assert any(
            p.endswith(f"/commits/{wanted_sha}/status") for p in requested_paths
        )
    finally:
        os.unlink(snapshot_path)
|
||||
|
||||
Reference in New Issue
Block a user