Merge pull request 'fix(sop-checklist): widen ack eligibility per RFC#450 Option C (closes internal#442)' (#1554) from fix/sop-checklist-widen-ack-internal-442 into main
Block internal-flavored paths / Block forbidden paths (push) Waiting to run
CI / Detect changes (push) Waiting to run
CI / Platform (Go) (push) Waiting to run
CI / Canvas (Next.js) (push) Waiting to run
CI / Shellcheck (E2E scripts) (push) Waiting to run
CI / Canvas Deploy Reminder (push) Blocked by required conditions
CI / Python Lint & Test (push) Waiting to run
CI / all-required (push) Waiting to run
E2E API Smoke Test / detect-changes (push) Waiting to run
E2E API Smoke Test / E2E API Smoke Test (push) Blocked by required conditions
E2E Chat / detect-changes (push) Waiting to run
E2E Chat / E2E Chat (push) Blocked by required conditions
E2E Staging Canvas (Playwright) / detect-changes (push) Waiting to run
E2E Staging Canvas (Playwright) / Canvas tabs E2E (push) Blocked by required conditions
Handlers Postgres Integration / detect-changes (push) Waiting to run
Handlers Postgres Integration / Handlers Postgres Integration (push) Blocked by required conditions
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (push) Waiting to run
publish-workspace-server-image / build-and-push (push) Waiting to run
publish-workspace-server-image / Production auto-deploy (push) Blocked by required conditions
Runtime PR-Built Compatibility / detect-changes (push) Waiting to run
Runtime PR-Built Compatibility / PR-built wheel + import smoke (push) Blocked by required conditions
Secret scan / Scan diff for credential-shaped strings (push) Waiting to run
Ops Scripts Tests / Ops scripts (unittest) (push) Successful in 1m35s

This commit was merged in pull request #1554.
This commit is contained in:
2026-05-19 01:57:08 +00:00
3 changed files with 314 additions and 13 deletions
+58 -5
View File
@@ -268,6 +268,7 @@ def compute_ack_state(
items_by_slug: dict[str, dict[str, Any]],
numeric_aliases: dict[int, str],
team_membership_probe: "callable[[str, list[str]], list[str]]",
high_risk: bool = False,
) -> dict[str, dict[str, Any]]:
"""Compute per-item ack state.
@@ -330,11 +331,16 @@ def compute_ack_state(
for slug, candidates in pending_team_check.items():
if not candidates:
continue
required = items_by_slug[slug]["required_teams"]
# Risk-class-aware required-teams resolution (RFC#450 Option C):
# high-risk PRs use `required_teams_high_risk` (when set on the
# item); default class uses `required_teams`. The probe closure
# is built with the same high_risk flag so the two reads are
# always consistent (both sites share `resolve_required_teams`).
required = resolve_required_teams(items_by_slug[slug], high_risk)
approved = team_membership_probe(slug, candidates) # returns subset
rejected_not_in_team[slug] = [u for u in candidates if u not in approved]
ackers_per_slug[slug] = approved
# Stash required teams for description rendering.
# Stash resolved teams for description rendering.
items_by_slug[slug]["_required_resolved"] = required
return {
@@ -765,6 +771,42 @@ def get_tier_mode(pr: dict[str, Any], cfg: dict[str, Any]) -> str:
return default_mode
def is_high_risk(pr: dict[str, Any], cfg: dict[str, Any]) -> bool:
"""Return True when the PR is high-risk per RFC#450 Option C.
A PR is high-risk when ANY of:
- it carries the `tier:high` label (mechanically strictest tier), or
- it carries any label listed in cfg.high_risk_labels.
High-risk PRs use `required_teams_high_risk` (when set on an item)
instead of the default `required_teams`. Items without
`required_teams_high_risk` are unaffected (the default applies).
Governance fix for internal#442 — closes the inconsistency between
sop-tier-check (tier-aware) and sop-checklist (was tier-blind).
"""
label_set = {(l.get("name") or "") for l in (pr.get("labels") or [])}
if "tier:high" in label_set:
return True
high_risk_labels = set(cfg.get("high_risk_labels") or [])
return bool(label_set & high_risk_labels)
def resolve_required_teams(item: dict[str, Any], high_risk: bool) -> list[str]:
"""Pick the active required_teams list for an item.
When high_risk is True AND the item declares a non-empty
`required_teams_high_risk`, return that. Else fall back to
`required_teams`. Keeping this in one helper means the gate's
decision shape stays single-sited even as items grow.
"""
if high_risk:
elevated = item.get("required_teams_high_risk") or []
if elevated:
return list(elevated)
return list(item.get("required_teams") or [])
def main(argv: list[str] | None = None) -> int:
p = argparse.ArgumentParser()
p.add_argument("--owner", required=True)
@@ -825,6 +867,12 @@ def main(argv: list[str] | None = None) -> int:
comments = client.get_issue_comments(args.owner, args.repo, args.pr)
# High-risk classification (RFC#450 Option C, governance fix for
# internal#442). Computed ONCE per PR — used by both the probe
# closure and compute_ack_state so the elevation decision is
# single-sited.
high_risk = is_high_risk(pr, cfg)
# Build team-membership probe closure that caches results per
# (user, team-id) so a user acking multiple items only triggers
# one membership lookup per team.
@@ -832,7 +880,7 @@ def main(argv: list[str] | None = None) -> int:
def probe(slug: str, users: list[str]) -> list[str]:
item = items_by_slug[slug]
team_names: list[str] = item["required_teams"]
team_names: list[str] = resolve_required_teams(item, high_risk)
# Resolve names → ids. NOTE: orgs/{org}/teams/search may not be
# available — fall back to the list endpoint.
team_ids: list[int] = []
@@ -877,7 +925,9 @@ def main(argv: list[str] | None = None) -> int:
# may still find membership in another team.
return approved
ack_state = compute_ack_state(comments, author, items_by_slug, numeric_aliases, probe)
ack_state = compute_ack_state(
comments, author, items_by_slug, numeric_aliases, probe, high_risk=high_risk
)
body_state = {it["slug"]: section_marker_present(body, it["pr_section_marker"]) for it in items}
state, description = render_status(items, ack_state, body_state)
@@ -890,7 +940,10 @@ def main(argv: list[str] | None = None) -> int:
description = f"[info tier:low] {description}"
# Diagnostics to job log.
print(f"::notice::PR #{args.pr} author={author} head={head_sha[:7]} mode={mode}")
print(
f"::notice::PR #{args.pr} author={author} head={head_sha[:7]} "
f"mode={mode} risk_class={'high' if high_risk else 'default'}"
)
for it in items:
slug = it["slug"]
ackers = ack_state[slug]["ackers"]
+213 -1
View File
@@ -602,4 +602,216 @@ class TestComputeNaState(unittest.TestCase):
self.assertEqual(len(na_directives), 1)
self.assertEqual(na_directives[0][0], "sop-n/a")
self.assertEqual(na_directives[0][1], "qa-review")
self.assertIn("no surface", na_directives[0][2])
# ---------------------------------------------------------------------------
# RFC#450 Option C — risk-classed two-eyes (governance fix for internal#442)
# ---------------------------------------------------------------------------
class TestIsHighRisk(unittest.TestCase):
"""The high-risk predicate decides which required_teams list applies.
Predicate: tier:high label OR any label in cfg.high_risk_labels.
"""
def setUp(self):
self.cfg = sop.load_config(CONFIG_PATH)
def test_no_labels_is_default_class(self):
pr = {"labels": []}
self.assertFalse(sop.is_high_risk(pr, self.cfg))
def test_tier_high_is_high_risk(self):
pr = {"labels": [{"name": "tier:high"}]}
self.assertTrue(sop.is_high_risk(pr, self.cfg))
def test_tier_low_is_default_class(self):
pr = {"labels": [{"name": "tier:low"}]}
self.assertFalse(sop.is_high_risk(pr, self.cfg))
def test_tier_medium_is_default_class(self):
# tier:medium alone is NOT high-risk (Option C — medium routes
# to the wider engineers OR-set).
pr = {"labels": [{"name": "tier:medium"}]}
self.assertFalse(sop.is_high_risk(pr, self.cfg))
def test_area_security_label_is_high_risk(self):
pr = {"labels": [{"name": "tier:medium"}, {"name": "area:security"}]}
self.assertTrue(sop.is_high_risk(pr, self.cfg))
def test_area_schema_label_is_high_risk(self):
pr = {"labels": [{"name": "area:schema"}]}
self.assertTrue(sop.is_high_risk(pr, self.cfg))
def test_area_identity_label_is_high_risk(self):
pr = {"labels": [{"name": "area:identity"}]}
self.assertTrue(sop.is_high_risk(pr, self.cfg))
def test_area_fleet_image_label_is_high_risk(self):
pr = {"labels": [{"name": "area:fleet-image"}]}
self.assertTrue(sop.is_high_risk(pr, self.cfg))
def test_area_gate_meta_label_is_high_risk(self):
# Gate-meta = changes to sop-checklist/sop-tier-check itself.
pr = {"labels": [{"name": "area:gate-meta"}]}
self.assertTrue(sop.is_high_risk(pr, self.cfg))
def test_unknown_area_label_is_default_class(self):
pr = {"labels": [{"name": "area:docs"}]}
self.assertFalse(sop.is_high_risk(pr, self.cfg))
class TestResolveRequiredTeams(unittest.TestCase):
"""The team resolver picks the elevated list only for high-risk PRs
AND only when the item declares one — items without an elevated
list always use the default required_teams."""
def test_default_class_uses_default_teams(self):
item = {"required_teams": ["engineers", "managers", "ceo"], "required_teams_high_risk": ["ceo"]}
self.assertEqual(
sop.resolve_required_teams(item, high_risk=False),
["engineers", "managers", "ceo"],
)
def test_high_risk_uses_elevated_teams(self):
item = {"required_teams": ["engineers", "managers", "ceo"], "required_teams_high_risk": ["ceo"]}
self.assertEqual(
sop.resolve_required_teams(item, high_risk=True),
["ceo"],
)
def test_high_risk_without_elevated_falls_back_to_default(self):
# Items that don't declare required_teams_high_risk (e.g.
# comprehensive-testing, staging-smoke) are unaffected by risk-class.
item = {"required_teams": ["engineers"]}
self.assertEqual(
sop.resolve_required_teams(item, high_risk=True),
["engineers"],
)
def test_empty_elevated_list_falls_back_to_default(self):
# A defensive case: required_teams_high_risk: [] should not
# silently lock out all approvers — fall back to the default
# so the gate stays satisfiable. (Tightening should remove the
# key, not set it to empty.)
item = {"required_teams": ["engineers"], "required_teams_high_risk": []}
self.assertEqual(
sop.resolve_required_teams(item, high_risk=True),
["engineers"],
)
class TestRootCauseAckEligibilityWidened(unittest.TestCase):
"""Closes internal#442: a non-author engineers-team ack now satisfies
root-cause / no-backwards-compat for the default class.
The dead-managers/ceo-persona-token gridlock is the symptom; the
root cause is that sop-checklist ignored tier-class. These tests
pin the new wider-default behavior so it can't regress silently.
"""
def setUp(self):
self.items = _items_by_slug()
self.aliases = _numeric_aliases()
@staticmethod
def _approve_only(allowed):
return lambda slug, users: [u for u in users if u in allowed]
def test_engineers_ack_satisfies_root_cause_default_class(self):
# Bob is in engineers only (not managers, not ceo). Default class.
comments = [_comment("bob", "/sop-ack root-cause")]
# Probe: bob is approved because root-cause now lists engineers.
probe = self._approve_only({"bob"})
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, probe, high_risk=False
)
self.assertEqual(state["root-cause"]["ackers"], ["bob"])
def test_engineers_ack_satisfies_no_backwards_compat_default_class(self):
comments = [_comment("bob", "/sop-ack no-backwards-compat")]
probe = self._approve_only({"bob"})
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, probe, high_risk=False
)
self.assertEqual(state["no-backwards-compat"]["ackers"], ["bob"])
def test_engineers_ack_alone_fails_root_cause_when_high_risk(self):
# High-risk PR: only ceo can ack. Engineers-only ack must fail.
comments = [_comment("bob", "/sop-ack root-cause")]
# Probe: bob is in engineers, not ceo. Under high_risk,
# required_teams_high_risk=[ceo] → bob is NOT approved.
# Probe receives the items + flag indirectly via main(); for
# the unit-test path we inject a probe that rejects bob.
probe = self._approve_only(set()) # nobody is in ceo
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, probe, high_risk=True
)
self.assertEqual(state["root-cause"]["ackers"], [])
self.assertIn("bob", state["root-cause"]["rejected"]["not_in_team"])
def test_ceo_ack_satisfies_root_cause_when_high_risk(self):
# High-risk PR + ceo-team approver → passes (the senior path).
comments = [_comment("hongming", "/sop-ack root-cause")]
probe = self._approve_only({"hongming"})
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, probe, high_risk=True
)
self.assertEqual(state["root-cause"]["ackers"], ["hongming"])
def test_self_ack_still_forbidden_even_with_widened_eligibility(self):
# Author cannot self-ack — widening teams must NOT weaken
# the non-author rule.
comments = [_comment("alice", "/sop-ack root-cause")]
probe = self._approve_only({"alice"})
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, probe, high_risk=False
)
self.assertEqual(state["root-cause"]["ackers"], [])
self.assertIn("alice", state["root-cause"]["rejected"]["self_ack"])
class TestHighRiskClassUsesElevatedListInConfig(unittest.TestCase):
"""End-to-end: the shipped config + RFC#450 predicate must keep
root-cause / no-backwards-compat gated on ceo for high-risk PRs."""
def test_root_cause_high_risk_elevated_to_ceo_only(self):
items = _items_by_slug()
# tier:high alone makes the PR high-risk → root-cause needs ceo.
self.assertEqual(
sop.resolve_required_teams(items["root-cause"], high_risk=True),
["ceo"],
)
# Default class accepts engineers/managers/ceo.
self.assertEqual(
sorted(sop.resolve_required_teams(items["root-cause"], high_risk=False)),
sorted(["engineers", "managers", "ceo"]),
)
def test_no_backwards_compat_high_risk_elevated_to_ceo_only(self):
items = _items_by_slug()
self.assertEqual(
sop.resolve_required_teams(items["no-backwards-compat"], high_risk=True),
["ceo"],
)
self.assertEqual(
sorted(sop.resolve_required_teams(items["no-backwards-compat"], high_risk=False)),
sorted(["engineers", "managers", "ceo"]),
)
def test_other_items_unchanged_by_risk_class(self):
# Items without required_teams_high_risk are unaffected.
items = _items_by_slug()
for slug in (
"comprehensive-testing",
"local-postgres-e2e",
"staging-smoke",
"five-axis-review",
"memory-consulted",
):
self.assertEqual(
sop.resolve_required_teams(items[slug], high_risk=False),
sop.resolve_required_teams(items[slug], high_risk=True),
f"item {slug} should not be affected by risk-class",
)
+43 -7
View File
@@ -50,6 +50,34 @@ tier_failure_mode:
"tier:low": soft
default_mode: hard # used when no tier:* label is present
# High-risk class (RFC#450 Option C, governance-fix for internal#442).
#
# A PR is "high-risk" when ANY of the listed labels are applied OR when
# the PR has `tier:high` (mechanically the strictest existing tier).
# High-risk items use `required_teams_high_risk` (when present on the
# item); non-high-risk items use the default `required_teams`.
#
# This closes the inconsistency that the SOP charter already mandates
# `tier:high → ceo only` for the sibling `sop-tier-check` gate; the
# sop-checklist's `root-cause` and `no-backwards-compat` items now
# follow the same risk-classed two-eyes shape:
# - Default class (tier:low/medium, not high-risk): a non-author
# engineers/managers/ceo ack satisfies the item — 25+ live
# identities, no dependency on a dead/inactive senior persona
# token.
# - High-risk class (tier:high OR any high_risk_label): still
# requires a non-author ceo ack (durable human team).
#
# Tightening: add labels to high_risk_labels.
# Loosening: remove labels.
high_risk_labels:
- "risk:high"
- "area:security"
- "area:schema"
- "area:fleet-image"
- "area:identity"
- "area:gate-meta"
items:
- slug: comprehensive-testing
numeric_alias: 1
@@ -78,11 +106,15 @@ items:
- slug: root-cause
numeric_alias: 4
pr_section_marker: "Root-cause not symptom"
required_teams: [managers, ceo]
required_teams: [engineers, managers, ceo]
required_teams_high_risk: [ceo]
description: >-
One-sentence root-cause statement. Ack from managers tier
(team-leads) or ceo. Senior judgment required to attest
root-cause-versus-symptom.
One-sentence root-cause statement. Default class: non-author
engineers/managers/ceo ack suffices (engineers can attest
root-cause-vs-symptom for routine fixes). High-risk class
(see `high_risk_labels`): non-author ceo ack required —
senior judgment for irreversible/security/identity/gate
changes. Closes internal#442 + tracks RFC#450.
- slug: five-axis-review
numeric_alias: 5
@@ -95,10 +127,14 @@ items:
- slug: no-backwards-compat
numeric_alias: 6
pr_section_marker: "No backwards-compat shim / dead code added"
required_teams: [managers, ceo]
required_teams: [engineers, managers, ceo]
required_teams_high_risk: [ceo]
description: >-
Yes/no + justification if no. Senior ack required because
backward-compat shims are how dead-code accretes.
Yes/no + justification if no. Default class: non-author
engineers/managers/ceo ack suffices. High-risk class
(see `high_risk_labels`): non-author ceo ack required —
senior judgment for shim-versus-real-fix on irreversible
surfaces. Closes internal#442 + tracks RFC#450.
- slug: memory-consulted
numeric_alias: 7