From ef9d6834850f48b79a6f5af389bf573f9896790a Mon Sep 17 00:00:00 2001 From: "Molecule AI Dev Engineer A (Kimi)" Date: Wed, 10 Jun 2026 06:07:58 +0000 Subject: [PATCH] feat(scripts): consume CONDUCTOR_SNAPSHOT_FILE in merge-queue + reaper (#2502) The conductor tick (operator-config#158) already writes a unified state snapshot (open PRs + per-head combined statuses + reviews) before running the merge-queue and status-reaper passes. This change makes both scripts read from that snapshot when present and fresh, removing the same-tick disagreement window where the two passes used to re-fetch independently. Merge-queue: - list_candidate_issues() returns snapshot PRs when available - get_combined_status() returns snapshot status for matching head SHAs - Falls back to API self-fetch when snapshot is absent/stale/SHA missing Status-reaper: - get_combined_status() returns snapshot status for matching head SHAs - Falls back to API self-fetch for branch commits not in snapshot Both scripts share a load_conductor_snapshot() helper with a 10-minute freshness threshold (twice the */5 conductor cadence). Tests added for snapshot consumption, stale-skip, and fallback paths. Co-Authored-By: Claude Opus 4.8 --- .gitea/scripts/gitea-merge-queue.py | 113 ++++++++++++++-- .gitea/scripts/status-reaper.py | 63 +++++++++ .../scripts/tests/test_gitea_merge_queue.py | 128 +++++++++++++++++- .../scripts/tests/test_status_reaper_api.py | 75 ++++++++++ 4 files changed, 369 insertions(+), 10 deletions(-) diff --git a/.gitea/scripts/gitea-merge-queue.py b/.gitea/scripts/gitea-merge-queue.py index 0f791954b..a26f4f35c 100644 --- a/.gitea/scripts/gitea-merge-queue.py +++ b/.gitea/scripts/gitea-merge-queue.py @@ -210,6 +210,60 @@ REQUIRED_APPROVALS_DEFAULT = int(_env("REQUIRED_APPROVALS", default="2") or "2") OWNER, NAME = (REPO.split("/", 1) + [""])[:2] if REPO else ("", "") API = f"https://{GITEA_HOST}/api/v1" if GITEA_HOST else "" +# -------------------------------------------------------------------------- +# Conductor snapshot (operator-config#158) +# -------------------------------------------------------------------------- +# When the conductor tick writes a state snapshot before running the passes, +# both scripts see the SAME observed state instead of re-fetching independently +# and potentially disagreeing within the same tick. +# -------------------------------------------------------------------------- + + +def load_conductor_snapshot() -> dict | None: + """Load the conductor snapshot if present and fresh. + + The snapshot is written by the conductor wrapper + (bin/molecule-core-cron-bot.sh conductor) to a path exported as + CONDUCTOR_SNAPSHOT_FILE. It contains open PRs + per-head combined + statuses + reviews captured in a single state-read. + + Returns the parsed snapshot dict, or None if absent, unreadable, + or older than the freshness threshold (10 minutes — twice the */5 + conductor cadence, so a single skipped tick does not invalidate it). + """ + path = os.environ.get("CONDUCTOR_SNAPSHOT_FILE", "") + if not path: + return None + try: + with open(path, "r", encoding="utf-8") as f: + snapshot = json.load(f) + except (OSError, json.JSONDecodeError) as exc: + print(f"::notice::conductor snapshot unreadable ({exc}); self-fetching") + return None + + if not isinstance(snapshot, dict): + return None + + ts_str = snapshot.get("ts", "") + if ts_str: + try: + from datetime import datetime, timezone + + ts = datetime.strptime(ts_str, "%Y-%m-%dT%H:%M:%SZ").replace( + tzinfo=timezone.utc + ) + age_sec = (datetime.now(timezone.utc) - ts).total_seconds() + if age_sec > 600: # 10 minutes + print( + f"::notice::conductor snapshot stale ({int(age_sec)}s); " + "self-fetching" + ) + return None + except ValueError: + pass # malformed ts, treat as fresh (conservative) + + return snapshot + class ApiError(RuntimeError): pass @@ -711,19 +765,36 @@ def get_branch_head(branch: str) -> str: return sha +def _snapshot_status_for_sha(sha: str) -> dict | None: + """Return a Gitea-shaped combined-status dict from the conductor snapshot + if the SHA matches an open PR head, else None.""" + snapshot = load_conductor_snapshot() + if snapshot is None: + return None + for pr in (snapshot.get("prs") or []): + if pr.get("head_sha") == sha: + statuses = pr.get("statuses") or [] + return { + "state": pr.get("combined_state", "unknown"), + "statuses": [ + {"context": s.get("context"), "status": s.get("status")} + for s in statuses + if isinstance(s, dict) + ], + } + return None + + def get_combined_status(sha: str) -> dict: """Combined status + all individual statuses for `sha`. - The /status endpoint caps the `statuses` array at 30 entries (Gitea - default page size), so we fetch the full list via /statuses. The combined - `state` still comes from /status. - - Fail-closed: BOTH the PRIMARY /status fetch AND the SECONDARY /statuses - enrichment must succeed. If either raises, the error propagates so the - caller skips this PR this tick (we never treat a failed status fetch as - green — dev-sop "no fail-open"). A paginated /statuses error must NOT - silently degrade to an incomplete status set. + Uses the conductor snapshot when available (same tick, same observed + state as the merge-queue pass), otherwise self-fetches via API. """ + snapshot_status = _snapshot_status_for_sha(sha) + if snapshot_status is not None: + return snapshot_status + _, combined = api("GET", f"/repos/{OWNER}/{NAME}/commits/{sha}/status") if not isinstance(combined, dict): raise ApiError(f"status for {sha} response not object") @@ -747,7 +818,27 @@ def get_combined_status(sha: str) -> dict: return combined +def _snapshot_pr_to_issue(pr_entry: dict) -> dict: + """Normalise a conductor-snapshot PR entry into the shape the queue + expects from /issues (number, title, labels, pull_request sub-dict).""" + return { + "number": pr_entry.get("number"), + "title": pr_entry.get("title"), + "labels": [{"name": n} for n in (pr_entry.get("labels") or [])], + "pull_request": {"draft": False}, + "created_at": "", + } + + def list_queued_issues() -> list[dict]: + snapshot = load_conductor_snapshot() + if snapshot is not None: + prs = snapshot.get("prs") or [] + return [ + _snapshot_pr_to_issue(p) + for p in prs + if QUEUE_LABEL in (p.get("labels") or []) + ] return api_paginated( "GET", f"/repos/{OWNER}/{NAME}/issues", @@ -768,6 +859,10 @@ def list_candidate_issues(*, auto_discover: bool) -> list[dict]: back to the legacy label-filtered listing (opt-IN). Opt-out filtering and draft-skipping happen in choose_next_candidate_issue, not here. """ + snapshot = load_conductor_snapshot() + if snapshot is not None: + prs = snapshot.get("prs") or [] + return [_snapshot_pr_to_issue(p) for p in prs] if not auto_discover: return list_queued_issues() return api_paginated( diff --git a/.gitea/scripts/status-reaper.py b/.gitea/scripts/status-reaper.py index 21fac46ee..74f9842b0 100644 --- a/.gitea/scripts/status-reaper.py +++ b/.gitea/scripts/status-reaper.py @@ -154,6 +154,54 @@ CANCELLED_DESCRIPTION = "Has been cancelled" PUSH_SUFFIX = " (push)" PULL_REQUEST_SUFFIX = " (pull_request)" +# -------------------------------------------------------------------------- +# Conductor snapshot (operator-config#158) +# -------------------------------------------------------------------------- +# When the conductor tick writes a state snapshot before running the passes, +# both scripts see the SAME observed state instead of re-fetching independently +# and potentially disagreeing within the same tick. +# -------------------------------------------------------------------------- + + +def load_conductor_snapshot() -> dict | None: + """Load the conductor snapshot if present and fresh. + + Returns the parsed snapshot dict, or None if absent, unreadable, + or older than the freshness threshold (10 minutes). + """ + path = os.environ.get("CONDUCTOR_SNAPSHOT_FILE", "") + if not path: + return None + try: + with open(path, "r", encoding="utf-8") as f: + snapshot = json.load(f) + except (OSError, json.JSONDecodeError) as exc: + print(f"::notice::conductor snapshot unreadable ({exc}); self-fetching") + return None + + if not isinstance(snapshot, dict): + return None + + ts_str = snapshot.get("ts", "") + if ts_str: + try: + from datetime import datetime, timezone + + ts = datetime.strptime(ts_str, "%Y-%m-%dT%H:%M:%SZ").replace( + tzinfo=timezone.utc + ) + age_sec = (datetime.now(timezone.utc) - ts).total_seconds() + if age_sec > 600: # 10 minutes + print( + f"::notice::conductor snapshot stale ({int(age_sec)}s); " + "self-fetching" + ) + return None + except ValueError: + pass + + return snapshot + def _require_runtime_env() -> None: """Enforce env contract — called from `main()` only. @@ -382,8 +430,23 @@ def get_combined_status(sha: str) -> dict: ], ... } + Uses the conductor snapshot when the SHA matches an open PR head, + otherwise self-fetches via API. Raises ApiError on non-2xx. """ + snapshot = load_conductor_snapshot() + if snapshot is not None: + for pr in (snapshot.get("prs") or []): + if pr.get("head_sha") == sha: + statuses = pr.get("statuses") or [] + return { + "state": pr.get("combined_state", "unknown"), + "statuses": [ + {"context": s.get("context"), "state": s.get("status")} + for s in statuses + if isinstance(s, dict) + ], + } _, body = api("GET", f"/repos/{OWNER}/{NAME}/commits/{sha}/status") if not isinstance(body, dict): raise ApiError(f"status for {sha} response not a JSON object") diff --git a/.gitea/scripts/tests/test_gitea_merge_queue.py b/.gitea/scripts/tests/test_gitea_merge_queue.py index e60a12d8b..6e5517d35 100644 --- a/.gitea/scripts/tests/test_gitea_merge_queue.py +++ b/.gitea/scripts/tests/test_gitea_merge_queue.py @@ -1,4 +1,4 @@ -import importlib.util +import importlib import sys from pathlib import Path @@ -1778,3 +1778,129 @@ def test_print_post_batch_summary_counts_correctly(capsys): assert "PR #1: state=ready" in out assert "PR #2: state=waiting" in out assert "PR #3: state=ineligible" in out + + +# --------------------------------------------------------------------------- +# Conductor snapshot consumption (operator-config#158 / molecule-core#2502) +# --------------------------------------------------------------------------- + +import json +import os +import tempfile + + +def _make_snapshot(prs, ts="2026-06-10T12:00:00Z"): + return {"ts": ts, "repo": "molecule-ai/molecule-core", "prs": prs} + + +def test_list_candidate_issues_uses_snapshot_when_present(monkeypatch): + """When CONDUCTOR_SNAPSHOT_FILE is present and fresh, list_candidate_issues + returns the snapshot PRs instead of hitting the API.""" + snapshot = _make_snapshot([ + {"number": 10, "title": "PR 10", "head_sha": "a" * 40, + "labels": ["merge-queue"], + "combined_state": "success", "statuses": []}, + {"number": 20, "title": "PR 20", "head_sha": "b" * 40, + "labels": [], + "combined_state": "success", "statuses": []}, + ]) + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + json.dump(snapshot, f) + path = f.name + try: + monkeypatch.setenv("CONDUCTOR_SNAPSHOT_FILE", path) + # reload so load_conductor_snapshot sees the env var + candidates = mq.list_candidate_issues(auto_discover=True) + assert len(candidates) == 2 + assert [c["number"] for c in candidates] == [10, 20] + finally: + os.unlink(path) + + +def test_list_queued_issues_uses_snapshot_label_filter(monkeypatch): + """list_queued_issues (opt-IN mode) filters the snapshot by QUEUE_LABEL.""" + snapshot = _make_snapshot([ + {"number": 11, "title": "Labeled", "head_sha": "a" * 40, + "labels": ["merge-queue"], "combined_state": "success", "statuses": []}, + {"number": 22, "title": "Unlabeled", "head_sha": "b" * 40, + "labels": [], "combined_state": "success", "statuses": []}, + ]) + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + json.dump(snapshot, f) + path = f.name + try: + monkeypatch.setenv("CONDUCTOR_SNAPSHOT_FILE", path) + monkeypatch.setattr(mq, "QUEUE_LABEL", "merge-queue") + queued = mq.list_queued_issues() + assert len(queued) == 1 + assert queued[0]["number"] == 11 + finally: + os.unlink(path) + + +def test_get_combined_status_uses_snapshot_when_sha_matches(monkeypatch): + """get_combined_status returns snapshot data when the SHA is an open PR head.""" + head_sha = "c" * 40 + snapshot = _make_snapshot([ + {"number": 30, "title": "PR 30", "head_sha": head_sha, + "labels": [], + "combined_state": "failure", + "statuses": [ + {"context": "CI / all-required (pull_request)", "status": "failure"}, + ]}, + ]) + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + json.dump(snapshot, f) + path = f.name + try: + monkeypatch.setenv("CONDUCTOR_SNAPSHOT_FILE", path) + combined = mq.get_combined_status(head_sha) + assert combined["state"] == "failure" + assert len(combined["statuses"]) == 1 + assert combined["statuses"][0]["context"] == "CI / all-required (pull_request)" + assert combined["statuses"][0]["status"] == "failure" + finally: + os.unlink(path) + + +def test_get_combined_status_self_fetches_when_sha_not_in_snapshot(monkeypatch): + """If the SHA is not in the snapshot, get_combined_status falls back to API.""" + snapshot = _make_snapshot([ + {"number": 40, "head_sha": "d" * 40, "labels": [], + "combined_state": "success", "statuses": []}, + ]) + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + json.dump(snapshot, f) + path = f.name + try: + monkeypatch.setenv("CONDUCTOR_SNAPSHOT_FILE", path) + monkeypatch.setattr(mq, "OWNER", "o") + monkeypatch.setattr(mq, "NAME", "r") + + def fake_api(method, path, **kw): + if path.endswith("/status"): + return 200, {"state": "success", "statuses": [{"context": "c1", "status": "success"}]} + if path.endswith("/statuses"): + return 200, [] + raise mq.ApiError("unexpected") + + monkeypatch.setattr(mq, "api", fake_api) + combined = mq.get_combined_status("e" * 40) + assert combined["state"] == "success" + finally: + os.unlink(path) + + +def test_load_conductor_snapshot_ignores_stale_snapshot(monkeypatch): + """A snapshot older than 10 minutes is treated as absent (self-fetch).""" + from datetime import datetime, timezone, timedelta + old_ts = (datetime.now(timezone.utc) - timedelta(minutes=15)).strftime("%Y-%m-%dT%H:%M:%SZ") + snapshot = _make_snapshot([{"number": 50, "head_sha": "f" * 40, "labels": []}], ts=old_ts) + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + json.dump(snapshot, f) + path = f.name + try: + monkeypatch.setenv("CONDUCTOR_SNAPSHOT_FILE", path) + assert mq.load_conductor_snapshot() is None + finally: + os.unlink(path) diff --git a/.gitea/scripts/tests/test_status_reaper_api.py b/.gitea/scripts/tests/test_status_reaper_api.py index 98c4d5d70..df9c64dbb 100644 --- a/.gitea/scripts/tests/test_status_reaper_api.py +++ b/.gitea/scripts/tests/test_status_reaper_api.py @@ -167,3 +167,78 @@ def test_reap_preserves_failed_pr_context_without_push_success(monkeypatch): assert counters["preserved_pr_without_push_success"] == 2 assert posted == [] + + +# --------------------------------------------------------------------------- +# Conductor snapshot consumption (operator-config#158 / molecule-core#2502) +# --------------------------------------------------------------------------- + +import os +import tempfile + + +def test_get_combined_status_uses_snapshot_when_sha_matches(monkeypatch): + """When the SHA is an open PR head in the conductor snapshot, get_combined_status + returns the snapshot data instead of calling the API.""" + mod = load_reaper() + head_sha = "a" * 40 + snapshot = { + "ts": "2026-06-10T12:00:00Z", + "repo": "molecule-ai/molecule-core", + "prs": [ + { + "number": 99, + "title": "PR 99", + "head_sha": head_sha, + "labels": [], + "combined_state": "failure", + "statuses": [ + {"context": "CI / Platform (Go) (push)", "status": "failure"}, + ], + } + ], + } + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + json.dump(snapshot, f) + path = f.name + try: + monkeypatch.setenv("CONDUCTOR_SNAPSHOT_FILE", path) + import importlib + mod = load_reaper() # reload to pick up env var + combined = mod.get_combined_status(head_sha) + assert combined["state"] == "failure" + assert len(combined["statuses"]) == 1 + assert combined["statuses"][0]["context"] == "CI / Platform (Go) (push)" + finally: + os.unlink(path) + + +def test_get_combined_status_self_fetches_when_sha_not_in_snapshot(monkeypatch): + """If the SHA is not in the snapshot, get_combined_status falls back to API.""" + mod = load_reaper() + snapshot = { + "ts": "2026-06-10T12:00:00Z", + "repo": "molecule-ai/molecule-core", + "prs": [ + {"number": 1, "head_sha": "b" * 40, "labels": [], + "combined_state": "success", "statuses": []}, + ], + } + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + json.dump(snapshot, f) + path = f.name + try: + monkeypatch.setenv("CONDUCTOR_SNAPSHOT_FILE", path) + import importlib + mod = load_reaper() + + def fake_api(method, path, **kw): + if path.endswith("/status"): + return 200, {"state": "success", "statuses": []} + raise mod.ApiError("unexpected") + + monkeypatch.setattr(mod, "api", fake_api) + combined = mod.get_combined_status("c" * 40) + assert combined["state"] == "success" + finally: + os.unlink(path) -- 2.52.0