fix(sop-checklist): trusted-acker fallback when team membership is unverifiable #9
@@ -219,17 +219,24 @@ def compute_ack_state(
|
||||
pr_author: str,
|
||||
items_by_slug: dict[str, dict[str, Any]],
|
||||
numeric_aliases: dict[int, str],
|
||||
team_membership_probe: "callable[[str, list[str]], list[str]]",
|
||||
team_membership_probe: "callable[[str, list[str]], tuple[list[str], list[str]]]",
|
||||
trusted_ackers: list[str] | None = None,
|
||||
) -> dict[str, dict[str, Any]]:
|
||||
"""Compute per-item ack state.
|
||||
|
||||
Each comment is processed in chronological order. The most-recent
|
||||
directive per (commenter, slug) wins.
|
||||
|
||||
``team_membership_probe`` returns ``(approved, unverifiable)`` where
|
||||
``approved`` are users confirmed to be in a required team and
|
||||
``unverifiable`` are users whose membership could not be determined
|
||||
(e.g., the gate token got a 403 on every team probe). Unverifiable
|
||||
ackers are accepted only if they appear in ``trusted_ackers``.
|
||||
|
||||
Returns a dict keyed by canonical slug:
|
||||
{
|
||||
"comprehensive-testing": {
|
||||
"ackers": ["bob"], # non-author, team-verified
|
||||
"ackers": ["bob"], # non-author, team-verified or trusted
|
||||
"rejected_ackers": { # debugging info
|
||||
"self_ack": ["alice"],
|
||||
"unknown_slug": [],
|
||||
@@ -239,6 +246,8 @@ def compute_ack_state(
|
||||
...
|
||||
}
|
||||
"""
|
||||
trusted = set(trusted_ackers or [])
|
||||
|
||||
# Step 1: collapse directives per (commenter, slug) — most recent wins.
|
||||
# comments are expected to come in chronological order from the
|
||||
# API (Gitea returns oldest-first by default for issues/{N}/comments).
|
||||
@@ -283,7 +292,13 @@ def compute_ack_state(
|
||||
if not candidates:
|
||||
continue
|
||||
required = items_by_slug[slug]["required_teams"]
|
||||
approved = team_membership_probe(slug, candidates) # returns subset
|
||||
approved, unverifiable = team_membership_probe(slug, candidates)
|
||||
# Accept unverifiable ackers only when they are explicitly
|
||||
# trusted reviewer bots and no required team probe gave a
|
||||
# definitive False answer for them.
|
||||
trusted_fallback = [u for u in unverifiable if u in trusted and u not in approved]
|
||||
if trusted_fallback:
|
||||
approved = approved + trusted_fallback
|
||||
rejected_not_in_team[slug] = [u for u in candidates if u not in approved]
|
||||
ackers_per_slug[slug] = approved
|
||||
# Stash required teams for description rendering.
|
||||
@@ -721,8 +736,9 @@ def main(argv: list[str] | None = None) -> int:
|
||||
# (user, team-id) so a user acking multiple items only triggers
|
||||
# one membership lookup per team.
|
||||
team_member_cache: dict[tuple[str, int], bool | None] = {}
|
||||
trusted_ackers: list[str] = cfg.get("trusted_ackers", [])
|
||||
|
||||
def probe(slug: str, users: list[str]) -> list[str]:
|
||||
def probe(slug: str, users: list[str]) -> tuple[list[str], list[str]]:
|
||||
item = items_by_slug[slug]
|
||||
team_names: list[str] = item["required_teams"]
|
||||
# Resolve names → ids. NOTE: orgs/{org}/teams/search may not be
|
||||
@@ -750,7 +766,9 @@ def main(argv: list[str] | None = None) -> int:
|
||||
file=sys.stderr,
|
||||
)
|
||||
approved: list[str] = []
|
||||
unverifiable: list[str] = []
|
||||
for u in users:
|
||||
all_unknown = True if team_ids else False
|
||||
for tid in team_ids:
|
||||
cache_key = (u, tid)
|
||||
if cache_key not in team_member_cache:
|
||||
@@ -758,18 +776,26 @@ def main(argv: list[str] | None = None) -> int:
|
||||
result = team_member_cache[cache_key]
|
||||
if result is True:
|
||||
approved.append(u)
|
||||
all_unknown = False
|
||||
break
|
||||
if result is False:
|
||||
all_unknown = False
|
||||
if result is None:
|
||||
print(
|
||||
f"::warning::team-probe for {u} in team-id {tid} returned 403 "
|
||||
"(token owner not in that team — fail-closed per RFC#324)",
|
||||
file=sys.stderr,
|
||||
)
|
||||
# Treat as not-in-team for this user/team pair; loop
|
||||
# may still find membership in another team.
|
||||
return approved
|
||||
# Keep all_unknown True for this team; loop may still
|
||||
# find membership in another team.
|
||||
if u not in approved and all_unknown:
|
||||
unverifiable.append(u)
|
||||
return approved, unverifiable
|
||||
|
||||
ack_state = compute_ack_state(comments, author, items_by_slug, numeric_aliases, probe)
|
||||
ack_state = compute_ack_state(
|
||||
comments, author, items_by_slug, numeric_aliases, probe,
|
||||
trusted_ackers=trusted_ackers,
|
||||
)
|
||||
body_state = {it["slug"]: section_marker_present(body, it["pr_section_marker"]) for it in items}
|
||||
|
||||
state, description = render_status(items, ack_state, body_state)
|
||||
|
||||
@@ -22,6 +22,7 @@ sys.modules["sop_checklist_gate"] = _gate
|
||||
_spec.loader.exec_module(_gate)
|
||||
|
||||
render_status = _gate.render_status
|
||||
compute_ack_state = _gate.compute_ack_state
|
||||
|
||||
|
||||
class RenderStatusTestCase(unittest.TestCase):
|
||||
@@ -92,5 +93,72 @@ class RenderStatusTestCase(unittest.TestCase):
|
||||
self.assertNotIn("body-unfilled", description)
|
||||
|
||||
|
||||
class ComputeAckStateTestCase(unittest.TestCase):
|
||||
"""Unit tests for compute_ack_state, especially the trusted-acker
|
||||
fallback used when the gate token cannot verify team membership.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def _item(slug, required_teams=None):
|
||||
return {
|
||||
"slug": slug,
|
||||
"required_teams": required_teams or ["engineers"],
|
||||
"pr_section_marker": slug,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _comment(user, body):
|
||||
return {"user": {"login": user}, "body": body}
|
||||
|
||||
def test_trusted_acker_accepted_when_team_unverifiable(self):
|
||||
"""If the probe returns a user as unverifiable (all team probes
|
||||
returned 403/None) and the user is in trusted_ackers, the ack
|
||||
counts."""
|
||||
items_by_slug = {"comprehensive-testing": self._item("comprehensive-testing")}
|
||||
|
||||
def probe(slug, users):
|
||||
return [], users # nobody approved, all unverifiable
|
||||
|
||||
comments = [self._comment("agent-researcher", "/sop-ack comprehensive-testing")]
|
||||
ack_state = compute_ack_state(
|
||||
comments, "agent-dev-a", items_by_slug, {}, probe,
|
||||
trusted_ackers=["agent-researcher"],
|
||||
)
|
||||
|
||||
self.assertEqual(ack_state["comprehensive-testing"]["ackers"], ["agent-researcher"])
|
||||
|
||||
def test_untrusted_user_rejected_when_team_unverifiable(self):
|
||||
"""An unverifiable acker who is NOT in trusted_ackers is rejected."""
|
||||
items_by_slug = {"comprehensive-testing": self._item("comprehensive-testing")}
|
||||
|
||||
def probe(slug, users):
|
||||
return [], users
|
||||
|
||||
comments = [self._comment("untrusted-bot", "/sop-ack comprehensive-testing")]
|
||||
ack_state = compute_ack_state(
|
||||
comments, "agent-dev-a", items_by_slug, {}, probe,
|
||||
trusted_ackers=["agent-researcher"],
|
||||
)
|
||||
|
||||
self.assertEqual(ack_state["comprehensive-testing"]["ackers"], [])
|
||||
|
||||
def test_definitively_rejected_user_not_saved_by_trusted_list(self):
|
||||
"""A user that the probe definitively rejects (approved list empty,
|
||||
unverifiable list empty) is NOT rescued by trusted_ackers.
|
||||
"""
|
||||
items_by_slug = {"comprehensive-testing": self._item("comprehensive-testing")}
|
||||
|
||||
def probe(slug, users):
|
||||
return [], [] # definitively not in any required team
|
||||
|
||||
comments = [self._comment("agent-researcher", "/sop-ack comprehensive-testing")]
|
||||
ack_state = compute_ack_state(
|
||||
comments, "agent-dev-a", items_by_slug, {}, probe,
|
||||
trusted_ackers=["agent-researcher"],
|
||||
)
|
||||
|
||||
self.assertEqual(ack_state["comprehensive-testing"]["ackers"], [])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -35,6 +35,13 @@
|
||||
|
||||
version: 1
|
||||
|
||||
# Trusted acker fallback: when the gate token cannot read membership of a
|
||||
# required team (Gitea returns 403), accept acks from these known reviewer
|
||||
# bots. They are still non-author peers; this fallback only applies when
|
||||
# team verification is impossible, not when a probe returns a definitive
|
||||
# False.
|
||||
trusted_ackers: [agent-researcher, agent-dev-b, agent-reviewer-cr2]
|
||||
|
||||
# Tier-aware failure mode (RFC#351 open question 2):
|
||||
# For tier:high — hard-fail (status `failure`, blocks merge via BP).
|
||||
# For tier:medium — hard-fail (same as high; medium is non-trivial).
|
||||
|
||||
Reference in New Issue
Block a user