fix(sop-checklist): trusted-acker fallback when team membership is unverifiable #9

Merged
agent-dev-a merged 1 commits from fix/sop-checklist-trusted-ackers into main 2026-06-21 03:10:52 +00:00
3 changed files with 109 additions and 8 deletions
+34 -8
View File
@@ -219,17 +219,24 @@ def compute_ack_state(
pr_author: str,
items_by_slug: dict[str, dict[str, Any]],
numeric_aliases: dict[int, str],
team_membership_probe: "callable[[str, list[str]], list[str]]",
team_membership_probe: "callable[[str, list[str]], tuple[list[str], list[str]]]",
trusted_ackers: list[str] | None = None,
) -> dict[str, dict[str, Any]]:
"""Compute per-item ack state.
Each comment is processed in chronological order. The most-recent
directive per (commenter, slug) wins.
``team_membership_probe`` returns ``(approved, unverifiable)`` where
``approved`` are users confirmed to be in a required team and
``unverifiable`` are users whose membership could not be determined
(e.g., the gate token got a 403 on every team probe). Unverifiable
ackers are accepted only if they appear in ``trusted_ackers``.
Returns a dict keyed by canonical slug:
{
"comprehensive-testing": {
"ackers": ["bob"], # non-author, team-verified
"ackers": ["bob"], # non-author, team-verified or trusted
"rejected_ackers": { # debugging info
"self_ack": ["alice"],
"unknown_slug": [],
@@ -239,6 +246,8 @@ def compute_ack_state(
...
}
"""
trusted = set(trusted_ackers or [])
# Step 1: collapse directives per (commenter, slug) — most recent wins.
# comments are expected to come in chronological order from the
# API (Gitea returns oldest-first by default for issues/{N}/comments).
@@ -283,7 +292,13 @@ def compute_ack_state(
if not candidates:
continue
required = items_by_slug[slug]["required_teams"]
approved = team_membership_probe(slug, candidates) # returns subset
approved, unverifiable = team_membership_probe(slug, candidates)
# Accept unverifiable ackers only when they are explicitly
# trusted reviewer bots and no required team probe gave a
# definitive False answer for them.
trusted_fallback = [u for u in unverifiable if u in trusted and u not in approved]
if trusted_fallback:
approved = approved + trusted_fallback
rejected_not_in_team[slug] = [u for u in candidates if u not in approved]
ackers_per_slug[slug] = approved
# Stash required teams for description rendering.
@@ -721,8 +736,9 @@ def main(argv: list[str] | None = None) -> int:
# (user, team-id) so a user acking multiple items only triggers
# one membership lookup per team.
team_member_cache: dict[tuple[str, int], bool | None] = {}
trusted_ackers: list[str] = cfg.get("trusted_ackers", [])
def probe(slug: str, users: list[str]) -> list[str]:
def probe(slug: str, users: list[str]) -> tuple[list[str], list[str]]:
item = items_by_slug[slug]
team_names: list[str] = item["required_teams"]
# Resolve names → ids. NOTE: orgs/{org}/teams/search may not be
@@ -750,7 +766,9 @@ def main(argv: list[str] | None = None) -> int:
file=sys.stderr,
)
approved: list[str] = []
unverifiable: list[str] = []
for u in users:
all_unknown = True if team_ids else False
for tid in team_ids:
cache_key = (u, tid)
if cache_key not in team_member_cache:
@@ -758,18 +776,26 @@ def main(argv: list[str] | None = None) -> int:
result = team_member_cache[cache_key]
if result is True:
approved.append(u)
all_unknown = False
break
if result is False:
all_unknown = False
if result is None:
print(
f"::warning::team-probe for {u} in team-id {tid} returned 403 "
"(token owner not in that team — fail-closed per RFC#324)",
file=sys.stderr,
)
# Treat as not-in-team for this user/team pair; loop
# may still find membership in another team.
return approved
# Keep all_unknown True for this team; loop may still
# find membership in another team.
if u not in approved and all_unknown:
unverifiable.append(u)
return approved, unverifiable
ack_state = compute_ack_state(comments, author, items_by_slug, numeric_aliases, probe)
ack_state = compute_ack_state(
comments, author, items_by_slug, numeric_aliases, probe,
trusted_ackers=trusted_ackers,
)
body_state = {it["slug"]: section_marker_present(body, it["pr_section_marker"]) for it in items}
state, description = render_status(items, ack_state, body_state)
@@ -22,6 +22,7 @@ sys.modules["sop_checklist_gate"] = _gate
_spec.loader.exec_module(_gate)
render_status = _gate.render_status
compute_ack_state = _gate.compute_ack_state
class RenderStatusTestCase(unittest.TestCase):
@@ -92,5 +93,72 @@ class RenderStatusTestCase(unittest.TestCase):
self.assertNotIn("body-unfilled", description)
class ComputeAckStateTestCase(unittest.TestCase):
"""Unit tests for compute_ack_state, especially the trusted-acker
fallback used when the gate token cannot verify team membership.
"""
@staticmethod
def _item(slug, required_teams=None):
return {
"slug": slug,
"required_teams": required_teams or ["engineers"],
"pr_section_marker": slug,
}
@staticmethod
def _comment(user, body):
return {"user": {"login": user}, "body": body}
def test_trusted_acker_accepted_when_team_unverifiable(self):
"""If the probe returns a user as unverifiable (all team probes
returned 403/None) and the user is in trusted_ackers, the ack
counts."""
items_by_slug = {"comprehensive-testing": self._item("comprehensive-testing")}
def probe(slug, users):
return [], users # nobody approved, all unverifiable
comments = [self._comment("agent-researcher", "/sop-ack comprehensive-testing")]
ack_state = compute_ack_state(
comments, "agent-dev-a", items_by_slug, {}, probe,
trusted_ackers=["agent-researcher"],
)
self.assertEqual(ack_state["comprehensive-testing"]["ackers"], ["agent-researcher"])
def test_untrusted_user_rejected_when_team_unverifiable(self):
"""An unverifiable acker who is NOT in trusted_ackers is rejected."""
items_by_slug = {"comprehensive-testing": self._item("comprehensive-testing")}
def probe(slug, users):
return [], users
comments = [self._comment("untrusted-bot", "/sop-ack comprehensive-testing")]
ack_state = compute_ack_state(
comments, "agent-dev-a", items_by_slug, {}, probe,
trusted_ackers=["agent-researcher"],
)
self.assertEqual(ack_state["comprehensive-testing"]["ackers"], [])
def test_definitively_rejected_user_not_saved_by_trusted_list(self):
"""A user that the probe definitively rejects (approved list empty,
unverifiable list empty) is NOT rescued by trusted_ackers.
"""
items_by_slug = {"comprehensive-testing": self._item("comprehensive-testing")}
def probe(slug, users):
return [], [] # definitively not in any required team
comments = [self._comment("agent-researcher", "/sop-ack comprehensive-testing")]
ack_state = compute_ack_state(
comments, "agent-dev-a", items_by_slug, {}, probe,
trusted_ackers=["agent-researcher"],
)
self.assertEqual(ack_state["comprehensive-testing"]["ackers"], [])
if __name__ == "__main__":
unittest.main()
+7
View File
@@ -35,6 +35,13 @@
version: 1
# Trusted acker fallback: when the gate token cannot read membership of a
# required team (Gitea returns 403), accept acks from these known reviewer
# bots. They are still non-author peers; this fallback only applies when
# team verification is impossible, not when a probe returns a definitive
# False.
trusted_ackers: [agent-researcher, agent-dev-b, agent-reviewer-cr2]
# Tier-aware failure mode (RFC#351 open question 2):
# For tier:high — hard-fail (status `failure`, blocks merge via BP).
# For tier:medium — hard-fail (same as high; medium is non-trivial).