Merge pull request 'tools: gate-check-v3 MVP — automated SOP-6 + CI gate detector' (#393) from tools/gate-check-v3 into main
Some checks failed
Block internal-flavored paths / Block forbidden paths (push) Successful in 7s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (push) Successful in 8s
Secret scan / Scan diff for credential-shaped strings (push) Successful in 7s
CI / Detect changes (push) Successful in 16s
E2E API Smoke Test / detect-changes (push) Successful in 17s
E2E Staging Canvas (Playwright) / detect-changes (push) Successful in 16s
Handlers Postgres Integration / detect-changes (push) Successful in 16s
Runtime PR-Built Compatibility / detect-changes (push) Successful in 15s
CI / Shellcheck (E2E scripts) (push) Successful in 2s
CI / Platform (Go) (push) Successful in 3s
CI / Canvas (Next.js) (push) Successful in 3s
CI / Python Lint & Test (push) Successful in 2s
CI / Canvas Deploy Reminder (push) Has been skipped
Handlers Postgres Integration / Handlers Postgres Integration (push) Successful in 3s
E2E API Smoke Test / E2E API Smoke Test (push) Successful in 3s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (push) Successful in 3s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (push) Successful in 2s
Sweep stale e2e-* orgs (staging) / Sweep e2e orgs (push) Successful in 5s
Sweep stale Cloudflare Tunnels / Sweep CF tunnels (push) Failing after 8s
Continuous synthetic E2E (staging) / Synthetic E2E against staging (push) Failing after 4m57s

This commit is contained in:
Molecule AI · core-be 2026-05-11 13:41:08 +00:00
commit 6403c5196f
2 changed files with 634 additions and 0 deletions

View File

@ -0,0 +1,91 @@
# gate-check-v3 — automated PR gate detector
#
# Runs on every open PR (push/synchronize) and hourly via cron.
# Posts a structured [gate-check-v3] STATUS: comment on the PR.
#
# Inputs:
# PR_NUMBER — set via ${{ github.event.pull_request.number }} from the trigger
# POST_COMMENT — "true" to post/update comment on PR
#
# Gating logic (MVP signals 1,2,3,6):
# 1. Author-aware agent-tag comment scan
# 2. REQUEST_CHANGES reviews state machine
# 3. Staleness detection (SOP-12: review.commit_id != PR.head_sha + >1 working day)
# 6. CI required-checks awareness
#
# Exit code: 0=CLEAR, 1=BLOCKED, 2=ERROR
name: gate-check-v3
on:
pull_request_target:
types: [opened, edited, synchronize, reopened]
schedule:
# Hourly: refresh all open PRs
- cron: '8 * * * *'
workflow_dispatch:
inputs:
pr_number:
description: 'PR number to check (omit for all open PRs)'
required: false
type: string
post_comment:
description: 'Post comment on PR'
required: false
type: string
default: 'true'
env:
GITHUB_SERVER_URL: https://git.moleculesai.app
jobs:
gate-check:
runs-on: ubuntu-latest
continue-on-error: true # Never block on our own detector failing
steps:
- name: Check out base branch (for the script)
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
ref: ${{ github.event.pull_request.base.sha || github.ref_name }}
- name: Run gate-check-v3 (single PR mode)
if: github.event_name == 'pull_request_target' || github.event.inputs.pr_number != ''
env:
GITEA_TOKEN: ${{ secrets.SOP_TIER_CHECK_TOKEN || secrets.GITHUB_TOKEN }}
PR_NUMBER: ${{ github.event.pull_request.number || github.event.inputs.pr_number }}
POST_COMMENT: ${{ github.event.inputs.post_comment || 'true' }}
run: |
set -euo pipefail
python3 tools/gate-check-v3/gate_check.py \
--repo "${{ github.repository }}" \
--pr "$PR_NUMBER" \
$([ "$POST_COMMENT" = "true" ] && echo "--post-comment")
echo "verdict=$?" >> "$GITHUB_OUTPUT"
- name: Run gate-check-v3 (all open PRs — cron mode)
if: github.event_name == 'schedule'
env:
GITEA_TOKEN: ${{ secrets.SOP_TIER_CHECK_TOKEN || secrets.GITHUB_TOKEN }}
run: |
set -euo pipefail
# Fetch all open PRs and run gate-check on each
pr_numbers=$(python3 -c "
import urllib.request, json, os
token = os.environ['GITEA_TOKEN']
req = urllib.request.Request(
'https://git.moleculesai.app/api/v1/repos/${{ github.repository }}/pulls?state=open&limit=100',
headers={'Authorization': f'token {token}', 'Accept': 'application/json'}
)
with urllib.request.urlopen(req) as r:
prs = json.loads(r.read())
for pr in prs:
print(pr['number'])
")
for pr in $pr_numbers; do
echo "Checking PR #$pr..."
python3 tools/gate-check-v3/gate_check.py \
--repo "${{ github.repository }}" \
--pr "$pr" \
--post-comment \
|| true
done

View File

@ -0,0 +1,543 @@
#!/usr/bin/env python3
"""
gate-check-v3 SOP-6 + CI gate detector for Gitea PRs.
Emits structured verdict + human-readable summary. Designed to run as:
1. CLI: python gate_check.py --repo org/repo --pr N
2. Gitea Actions step: runs this script, captures stdout JSON
Signals (MVP signals 1,2,3,6):
1. Author-aware agent-tag comment scan
2. REQUEST_CHANGES reviews state machine
3. Staleness detection (review.commit_id != PR.head_sha)
6. CI required-checks awareness
Exit codes:
0 all gates pass (verdict=CLEAR)
1 one or more gates blocking (verdict=BLOCKED)
2 API error / usage error (verdict=ERROR)
"""
import argparse
import json
import os
import re
import sys
import time
import urllib.request
import urllib.error
from datetime import datetime, timezone
from typing import Any, Optional
# ── Gitea API client ────────────────────────────────────────────────────────
GITEA_HOST = os.environ.get("GITEA_HOST", "git.moleculesai.app")
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", os.environ.get("GITHUB_TOKEN", ""))
API_BASE = f"https://{GITEA_HOST}/api/v1"
def api_get(path: str) -> dict | list:
url = f"{API_BASE}{path}"
req = urllib.request.Request(
url,
headers={
"Authorization": f"token {GITEA_TOKEN}",
"Accept": "application/json",
},
)
try:
with urllib.request.urlopen(req) as r:
return json.loads(r.read())
except urllib.error.HTTPError as e:
body = e.read().decode(errors="replace")
raise GiteaError(f"GET {url}{e.code}: {body[:300]}")
def api_list(path: str, per_page: int = 100) -> list:
"""Paginate a list endpoint using Link headers (Gitea/GitHub convention)."""
results = []
page = 1
while True:
paged_path = f"{path}?per_page={per_page}&page={page}"
result = api_get(paged_path)
if isinstance(result, list):
results.extend(result)
if len(result) < per_page:
break
page += 1
else:
# Some endpoints return an object with a data/items key
data = result.get("data", result.get("items", result))
if isinstance(data, list):
results.extend(data)
break
# Safety cap to avoid runaway pagination
if page > 20:
break
return results
class GiteaError(Exception):
pass
# ── Signal 1: Author-aware agent-tag comment scan ─────────────────────────────
# Matches: [core-{role}-agent] VERDICT in comment body.
# Must be authored by the agent whose role is tagged.
# Scans BOTH issue comments (/issues/{N}/comments) and PR comments
# (/pulls/{N}/comments) since agents post on both.
# Matches [core-{role}-agent] VERDICT anywhere in the comment body.
AGENT_TAG_RE = re.compile(
r"\[core-([a-z]+)-agent\]\s+(APPROVED|N/?A|CHANGES_REQUESTED|COMMENT|BLOCKED|ACK)\b",
)
# Map agent role → canonical login (from workspace registry)
AGENT_LOGIN_MAP = {
"qa": "core-qa",
"security": "core-security",
"uiux": "core-uiux",
"lead": "core-lead",
"devops": "core-devops",
"be": "core-be",
"fe": "core-fe",
"offsec": "core-offsec",
}
# SOP-6 tier → required agent groups
# tier:low → engineers,managers,ceo (OR: any one suffices)
# tier:medium → managers AND engineers AND qa,security (AND)
# tier:high → ceo (OR, but single)
# "?" = teams not yet created; treated as optional for MVP
TIER_AGENTS = {
"tier:low": {"managers": "core-lead", "engineers": "core-devops", "ceo": "ceo"},
"tier:medium": {"managers": "core-lead", "engineers": "core-devops", "qa": "core-qa", "security": "core-security"},
"tier:high": {"ceo": "ceo"},
}
POSITIVE_VERDICTS = {"APPROVED", "N/A", "ACK"}
def _get_pr_tier(pr_number: int, repo: str) -> str:
"""Get the PR's tier label."""
owner, name = repo.split("/", 1)
try:
pr = api_get(f"/repos/{owner}/{name}/pulls/{pr_number}")
for label in pr.get("labels", []):
name_l = label.get("name", "")
if name_l in TIER_AGENTS:
return name_l
except GiteaError:
pass
return "tier:low" # Default for untagged PRs
def signal_1_comment_scan(pr_number: int, repo: str) -> dict:
"""
Scan issue + PR comments AND reviews for agent-tag policy gates.
Matches tag AND author. Filters to tier-relevant agents.
Returns: {signal, results, verdict}
"""
owner, name = repo.split("/", 1)
# Get tier label to determine relevant agents
tier = _get_pr_tier(pr_number, repo)
relevant_roles = TIER_AGENTS.get(tier, TIER_AGENTS["tier:low"])
# Build reverse map: login -> (group, agent_key)
login_to_group = {}
for group, login in relevant_roles.items():
for role, l in AGENT_LOGIN_MAP.items():
if l == login:
login_to_group[l] = (group, f"core-{role}")
# Collect all agent-tag matches from comments
comments = []
try:
comments.extend(api_list(f"/repos/{owner}/{name}/issues/{pr_number}/comments"))
except GiteaError:
pass
try:
comments.extend(api_list(f"/repos/{owner}/{name}/pulls/{pr_number}/comments"))
except GiteaError:
pass
# Collect APPROVED reviews from agent logins
try:
reviews = api_list(f"/repos/{owner}/{name}/pulls/{pr_number}/reviews")
for r in reviews:
login = r.get("user", {}).get("login", "")
if login in login_to_group and r.get("state") == "APPROVED":
comments.append(
{
"id": f"review-{r['id']}",
"user": {"login": login},
"body": f"[{login}-agent] APPROVED",
"created_at": r.get("submitted_at") or r.get("created_at", ""),
"source": "review",
}
)
except GiteaError:
pass
# Find latest verdict per agent login
findings = {}
for login, (group, agent_key) in login_to_group.items():
matches = []
for c in comments:
body = c.get("body", "") or ""
user_login = c.get("user", {}).get("login", "")
if user_login != login:
continue
for m in AGENT_TAG_RE.finditer(body):
tag_role, verdict = m.group(1), m.group(2)
# Match the role part of the login (e.g. "core-devops" → "devops")
login_role = login.replace("core-", "")
if tag_role == login_role:
matches.append(
{
"comment_id": c["id"],
"verdict": verdict,
"user": user_login,
"created_at": c["created_at"],
"source": c.get("source", "comment"),
}
)
latest = max(matches, key=lambda x: x["created_at"], default=None) if matches else None
findings[agent_key] = {
"group": group,
"tier": tier,
"found": latest,
"verdict": latest["verdict"] if latest else "MISSING",
}
# Compute gate verdict using tier-specific logic:
# - tier:low / tier:high (OR gate): ANY positive = CLEAR, ANY negative = BLOCKED
# - tier:medium (AND gate): ALL must be positive = CLEAR, ANY negative = BLOCKED
verdicts = [f["verdict"] for f in findings.values()]
if not verdicts:
gate_verdict = "N/A"
elif tier in ("tier:low", "tier:high"):
# OR gate: one positive is enough
if any(v in POSITIVE_VERDICTS for v in verdicts):
gate_verdict = "CLEAR"
elif any(v in ("BLOCKED", "CHANGES_REQUESTED", "COMMENT") for v in verdicts):
gate_verdict = "BLOCKED"
else:
gate_verdict = "INCOMPLETE"
else:
# AND gate (tier:medium): all must be positive
if all(v in POSITIVE_VERDICTS for v in verdicts):
gate_verdict = "CLEAR"
elif any(v in ("BLOCKED", "CHANGES_REQUESTED", "COMMENT") for v in verdicts):
gate_verdict = "BLOCKED"
else:
gate_verdict = "INCOMPLETE"
return {"signal": "agent_tag_comments", "results": findings, "verdict": gate_verdict, "tier": tier}
# ── Signal 2: REQUEST_CHANGES reviews state machine ────────────────────────────
def signal_2_reviews(pr_number: int, repo: str) -> dict:
"""
Check /pulls/{N}/reviews for active REQUEST_CHANGES with dismissed=false.
This is the layer that empirically blocks Gitea merges.
Returns: {blocking_reviews: [...], verdict}
"""
owner, name = repo.split("/", 1)
reviews = api_list(f"/repos/{owner}/{name}/pulls/{pr_number}/reviews")
blocking = []
for r in reviews:
if r.get("state") == "REQUEST_CHANGES" and not r.get("dismissed", False):
blocking.append(
{
"review_id": r["id"],
"user": r["user"]["login"],
"commit_id": r.get("commit_id", ""),
"created_at": r.get("submitted_at") or r.get("created_at", ""),
}
)
return {
"signal": "request_changes_reviews",
"blocking_reviews": blocking,
"verdict": "BLOCKED" if blocking else "CLEAR",
}
# ── Signal 3: Staleness detection ────────────────────────────────────────────
WORKING_DAY_SECONDS = 9 * 3600 # SOP-12: 1 working day threshold
def signal_3_staleness(pr_number: int, repo: str) -> dict:
"""
Flag reviews where review.commit_id != PR.head_sha AND
time_since_review > 1 working day. Per SOP-12 (internal#282).
Returns: {stale_reviews: [...], verdict}
"""
owner, name = repo.split("/", 1)
# Get PR head sha
pr = api_get(f"/repos/{owner}/{name}/pulls/{pr_number}")
head_sha = pr["head"]["sha"]
reviews = api_list(f"/repos/{owner}/{name}/pulls/{pr_number}/reviews")
stale = []
now = datetime.now(timezone.utc)
for r in reviews:
review_commit = r.get("commit_id", "")
if review_commit and review_commit != head_sha:
# Review predates current head
try:
created = datetime.fromisoformat(r["created_at"].replace("Z", "+00:00"))
except (KeyError, ValueError):
continue
age_seconds = (now - created).total_seconds()
if age_seconds > WORKING_DAY_SECONDS:
stale.append(
{
"review_id": r["id"],
"user": r["user"]["login"],
"review_commit": review_commit,
"pr_head": head_sha,
"age_hours": round(age_seconds / 3600, 1),
"created_at": r.get("submitted_at") or r.get("created_at", ""),
}
)
return {
"signal": "stale_reviews",
"stale_reviews": stale,
"verdict": "STALE-RC" if stale else "CLEAR",
}
# ── Signal 6: CI required-checks awareness ───────────────────────────────────
def signal_6_ci(pr_number: int, repo: str, branch: str = "main") -> dict:
"""
Query combined CI status for PR head commit.
Find required status checks on target branch.
Surface any failing required check as primary blocker.
"""
owner, name = repo.split("/", 1)
pr = api_get(f"/repos/{owner}/{name}/pulls/{pr_number}")
head_sha = pr["head"]["sha"]
# Combined status of PR head
combined = api_get(f"/repos/{owner}/{name}/commits/{head_sha}/status")
ci_state = combined.get("state", "null")
# Individual check statuses
# Gitea Actions uses "status" (pending/success/failure) not "state" for
# individual check entries. "state" is null for pending runs.
check_statuses = {}
for s in combined.get("statuses") or []:
check_statuses[s["context"]] = s.get("status", "pending")
# Try to get branch protection for required checks
required_checks = []
try:
protection = api_get(f"/repos/{owner}/{name}/branches/{branch}/protection")
for check in protection.get("required_status_checks", {}).get("checks", []):
required_checks.append(check["context"])
except GiteaError:
pass # No protection or no read access
failing_required = []
passing_required = []
for ctx in required_checks:
state = check_statuses.get(ctx, "null")
if state == "failure":
failing_required.append(ctx)
elif state in ("success", "neutral"):
passing_required.append(ctx)
else:
passing_required.append(f"{ctx} (pending)")
if failing_required:
verdict = "CI_FAIL"
elif ci_state == "failure":
verdict = "CI_FAIL"
elif ci_state == "pending":
verdict = "CI_PENDING"
else:
verdict = "CLEAR"
return {
"signal": "ci_checks",
"combined_state": ci_state,
"required_checks": required_checks,
"failing_required": failing_required,
"passing_required": passing_required,
"all_check_statuses": check_statuses,
"verdict": verdict,
}
# ── Gate evaluation ───────────────────────────────────────────────────────────
VERDICT_ORDER = {"ERROR": 0, "CI_FAIL": 1, "BLOCKED": 2, "STALE-RC": 3, "CI_PENDING": 4, "N/A": 5, "CLEAR": 6}
def compute_verdict(gates: list[dict]) -> tuple[str, list[dict]]:
"""Compute overall verdict from gate results. Worst gate wins."""
worst = "CLEAR"
blockers = []
for g in gates:
v = g.get("verdict", "N/A")
if VERDICT_ORDER.get(v, 99) < VERDICT_ORDER.get(worst, 0):
worst = v
if v in ("BLOCKED", "CI_FAIL", "STALE-RC", "ERROR"):
blockers.append(g)
return worst, blockers
def format_gate_verdict(v: str) -> tuple[str, str]:
"""Return (icon, label) for a gate verdict."""
if v in ("APPROVED", "CLEAR"):
return "", v
if v in ("BLOCKED", "CI_FAIL", "ERROR"):
return "", v
return "⚠️", v
def format_comment(repo: str, pr_number: int, verdict: str, gates: list[dict], blockers: list[dict]) -> str:
"""Format human-readable Gitea PR comment."""
gate_labels = {
"agent_tag_comments": "Agent-tag gates",
"request_changes_reviews": "REQUEST_CHANGES reviews",
"stale_reviews": "Staleness check",
"ci_checks": "CI required checks",
}
lines = [f"[gate-check-v3] STATUS: **{verdict}**", ""]
# Per-gate summary
for g in gates:
sig = g.get("signal", "?")
label = gate_labels.get(sig, sig)
v = g.get("verdict", "N/A")
icon, _ = format_gate_verdict(v)
lines.append(f"{icon} **{label}**: {v}")
# Gate-specific detail
if blockers:
lines.append("")
lines.append("### Blockers")
for b in blockers:
sig = b.get("signal", "?")
if sig == "request_changes_reviews":
for r in b.get("blocking_reviews", []):
lines.append(f" - @{r['user']} requested changes (review id={r['review_id']})")
elif sig == "ci_checks":
combined = b.get("combined_state", "?")
lines.append(f" - CI combined state: **{combined}**")
for c in b.get("failing_required", []):
lines.append(f" - required check failing: **{c}**")
for c in b.get("all_check_statuses", {}).items():
ctx, state = c
lines.append(f" - {ctx}: {state}")
elif sig == "stale_reviews":
for r in b.get("stale_reviews", []):
lines.append(
f" - @{r['user']} stale (commit={r.get('review_commit','?')[:7]}, age={r.get('age_hours','?')}h)"
)
elif sig == "agent_tag_comments":
for agent, res in b.get("results", {}).items():
v = res.get("verdict", "MISSING")
icon, _ = format_gate_verdict(v)
if v == "MISSING":
lines.append(f" {icon} {agent}: no agent-tag comment found")
else:
lines.append(f" {icon} {agent}: {v}")
lines.append("")
lines.append(f"_gate-check-v3 · repo={repo} · pr={pr_number}_")
return "\n".join(lines)
lines.append("")
lines.append(f"_gate-check-v3 · repo={repo} · pr={pr_number}_")
return "\n".join(lines)
# ── Main ─────────────────────────────────────────────────────────────────────
def run(repo: str, pr_number: int, post_comment: bool = False) -> dict:
try:
gates = [
signal_1_comment_scan(pr_number, repo),
signal_2_reviews(pr_number, repo),
signal_3_staleness(pr_number, repo),
signal_6_ci(pr_number, repo),
]
verdict, blockers = compute_verdict(gates)
result = {
"verdict": verdict,
"repo": repo,
"pr": pr_number,
"gates": gates,
"blockers": blockers,
"timestamp": datetime.now(timezone.utc).isoformat(),
}
# Print human-readable to stdout for Gitea Actions log
print(json.dumps(result, indent=2))
# Optionally post comment
if post_comment:
owner, name = repo.split("/", 1)
comment_body = format_comment(repo, pr_number, verdict, gates, blockers)
headers = {
"Authorization": f"token {GITEA_TOKEN}",
"Content-Type": "application/json",
"Accept": "application/json",
}
# Check if a gate-check comment already exists to avoid spamming
existing = api_list(f"/repos/{owner}/{name}/issues/{pr_number}/comments")
our_comments = [c for c in existing if "[gate-check-v3]" in (c.get("body") or "")]
if our_comments:
# Update latest
comment_id = our_comments[-1]["id"]
url = f"{API_BASE}/repos/{owner}/{name}/issues/comments/{comment_id}"
req = urllib.request.Request(url, data=json.dumps({"body": comment_body}).encode(), headers=headers, method="PATCH")
with urllib.request.urlopen(req) as r:
r.read()
else:
url = f"{API_BASE}/repos/{owner}/{name}/issues/{pr_number}/comments"
req = urllib.request.Request(url, data=json.dumps({"body": comment_body}).encode(), headers=headers, method="POST")
with urllib.request.urlopen(req) as r:
r.read()
return result
except GiteaError as e:
result = {"verdict": "ERROR", "error": str(e), "repo": repo, "pr": pr_number}
print(json.dumps(result, indent=2), file=sys.stderr)
return result
def main() -> int:
parser = argparse.ArgumentParser(description="gate-check-v3 — PR gate detector")
parser.add_argument("--repo", required=True, help="org/repo (e.g. molecule-ai/molecule-core)")
parser.add_argument("--pr", type=int, required=True, help="PR number")
parser.add_argument("--post-comment", action="store_true", help="Post/update comment on PR")
args = parser.parse_args()
result = run(args.repo, args.pr, post_comment=args.post_comment)
verdict = result.get("verdict", "ERROR")
if verdict == "ERROR":
return 2
elif verdict in ("BLOCKED", "CI_FAIL", "STALE-RC", "ERROR"):
return 1
return 0
if __name__ == "__main__":
sys.exit(main())