Merge pull request 'fix(inbox): drop self-delegation-echo rows from inbox poller' (#1348) from fix/inbox-self-echo into main
Some checks failed
publish-workspace-server-image / build-and-push (push) Successful in 2m33s
Block internal-flavored paths / Block forbidden paths (push) Successful in 4s
CI / Detect changes (push) Successful in 5s
CI / Shellcheck (E2E scripts) (push) Successful in 12s
E2E API Smoke Test / detect-changes (push) Successful in 4s
E2E Chat / detect-changes (push) Successful in 4s
E2E Staging Canvas (Playwright) / detect-changes (push) Successful in 4s
Handlers Postgres Integration / detect-changes (push) Successful in 3s
publish-runtime-autobump / pr-validate (push) Successful in 30s
publish-runtime-autobump / bump-and-tag (push) Successful in 24s
CI / Platform (Go) (push) Successful in 4m40s
Runtime PR-Built Compatibility / detect-changes (push) Successful in 6s
Secret scan / Scan diff for credential-shaped strings (push) Successful in 2s
Ops Scripts Tests / Ops scripts (unittest) (push) Successful in 59s
CI / Canvas (Next.js) (push) Successful in 6m41s
CI / Python Lint & Test (push) Successful in 6m34s
CI / all-required (push) Successful in 6m7s
publish-workspace-server-image / Production auto-deploy (push) Successful in 10m54s
E2E API Smoke Test / E2E API Smoke Test (push) Successful in 2s
E2E Chat / E2E Chat (push) Successful in 1s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (push) Successful in 2s
CI / Canvas Deploy Reminder (push) Successful in 1s
Handlers Postgres Integration / Handlers Postgres Integration (push) Successful in 1m23s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (push) Successful in 1m54s
main-red-watchdog / watchdog (push) Successful in 25s
gate-check-v3 / gate-check (push) Successful in 26s
Sweep stale Cloudflare DNS records / Sweep CF orphans (push) Successful in 7s
ci-required-drift / drift (push) Successful in 1m2s
Staging SaaS smoke (every 30 min) / Staging SaaS smoke (push) Successful in 4m34s
Sweep stale e2e-* orgs (staging) / Sweep e2e orgs (push) Successful in 2s
Sweep stale Cloudflare Tunnels / Sweep CF tunnels (push) Successful in 3s
gitea-merge-queue / queue (push) Successful in 4s
status-reaper / reap (push) Successful in 57s
Continuous synthetic E2E (staging) / Synthetic E2E against staging (push) Successful in 5m53s
publish-runtime / publish (push) Failing after 2m4s
publish-runtime / cascade (push) Has been skipped

This commit is contained in:
devops-engineer 2026-05-16 21:09:20 +00:00
commit ec664869b0
4 changed files with 412 additions and 25 deletions

View File

@ -68,7 +68,7 @@ import sys
import urllib.error import urllib.error
import urllib.parse import urllib.parse
import urllib.request import urllib.request
from typing import Any from typing import Any, Callable
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -110,7 +110,7 @@ def normalize_slug(raw: str, numeric_aliases: dict[int, str] | None = None) -> s
# for /sop-revoke (RFC#351 open question 4 — reason is captured but not # for /sop-revoke (RFC#351 open question 4 — reason is captured but not
# yet validated; future iteration may require a min-length). # yet validated; future iteration may require a min-length).
_DIRECTIVE_RE = re.compile( _DIRECTIVE_RE = re.compile(
r"^[ \t]*/(sop-ack|sop-revoke)[ \t]+([A-Za-z0-9_\- ]+?)(?:[ \t]+(.*))?[ \t]*$", r"^[ \t]*/(sop-ack|sop-revoke|sop-n/a)[ \t]+([A-Za-z0-9_\- ]+?)(?:[ \t]+(.*))?[ \t]*$",
re.MULTILINE, re.MULTILINE,
) )
@ -118,19 +118,21 @@ _DIRECTIVE_RE = re.compile(
def parse_directives( def parse_directives(
comment_body: str, comment_body: str,
numeric_aliases: dict[int, str], numeric_aliases: dict[int, str],
) -> tuple[list[tuple[str, str, str]], list]: ) -> tuple[list[tuple[str, str, str]], list[tuple[str, str, str]]]:
"""Extract /sop-ack and /sop-revoke directives from a comment body. """Extract /sop-ack, /sop-revoke, and /sop-n/a directives from a comment body.
Returns (directives, na_directives) where: Returns (directives, na_directives) where each is a list of
directives is a list of (kind, canonical_slug, note) tuples (kind, canonical_slug, note) tuples:
kind is "sop-ack" or "sop-revoke" kind is "sop-ack", "sop-revoke", or "sop-n/a"
canonical_slug is the normalized form (or "" if unparseable) canonical_slug is the normalized form (or "" if unparseable)
note is the trailing free-text (may be "") note is the trailing free-text (may be "")
na_directives is reserved for future N/A handling (always [] for now) The two lists are kept separate so call sites can unpack them
directly (e.g. directives, na_directives = parse_directives(...)).
""" """
out: list[tuple[str, str, str]] = [] directives: list[tuple[str, str, str]] = []
na_directives: list[tuple[str, str, str]] = []
if not comment_body: if not comment_body:
return out, [] return directives, na_directives
for m in _DIRECTIVE_RE.finditer(comment_body): for m in _DIRECTIVE_RE.finditer(comment_body):
kind = m.group(1) kind = m.group(1)
raw_slug = (m.group(2) or "").strip() raw_slug = (m.group(2) or "").strip()
@ -160,8 +162,12 @@ def parse_directives(
note_from_group = (m.group(3) or "").strip() note_from_group = (m.group(3) or "").strip()
# If we collapsed multi-word slug into kebab and there's a # If we collapsed multi-word slug into kebab and there's a
# trailing-text group too, append it. # trailing-text group too, append it.
out.append((kind, canonical, note_from_group)) entry = (kind, canonical, note_from_group)
return out, [] if kind == "sop-n/a":
na_directives.append(entry)
else:
directives.append(entry)
return directives, na_directives
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -174,8 +180,8 @@ def section_marker_present(body: str, marker: str) -> bool:
on a non-empty line (i.e. the author actually filled it in). on a non-empty line (i.e. the author actually filled it in).
We require the marker substring AND non-whitespace content on the We require the marker substring AND non-whitespace content on the
same line OR within the next line this prevents trivially-empty same line OR within the next non-blank line this prevents
checklists like: trivially-empty checklists like:
## SOP-Checklist ## SOP-Checklist
- [ ] **Comprehensive testing performed**: - [ ] **Comprehensive testing performed**:
@ -184,9 +190,18 @@ def section_marker_present(body: str, marker: str) -> bool:
from auto-passing the section-present check. The peer-ack is still from auto-passing the section-present check. The peer-ack is still
required, but answering with empty content is captured as a soft required, but answering with empty content is captured as a soft
finding via the section-present test alone. finding via the section-present test alone.
NOTE: we scan forward through blank lines (the markdown-header pattern
is ## Header\\n\\ncontent) so that a header + blank-line + content
structure still satisfies the check. The backward checkbox fallback
catches inline markers without a preceding checkbox (mc#1099).
""" """
if not body or not marker: if not body or not marker:
return False return False
# Strip trailing whitespace so the blank-line scan below can find
# content that appears on the very last line of the body (without
# being misled by a trailing \n or spaces).
body = body.rstrip()
body_lower = body.lower() body_lower = body.lower()
marker_lower = marker.lower() marker_lower = marker.lower()
idx = body_lower.find(marker_lower) idx = body_lower.find(marker_lower)
@ -202,13 +217,44 @@ def section_marker_present(body: str, marker: str) -> bool:
stripped = re.sub(r"[\s\*:\-\[\]]+", "", line) stripped = re.sub(r"[\s\*:\-\[\]]+", "", line)
if stripped: if stripped:
return True return True
# Fall through: check the NEXT line (multi-line answers). # Fall through: scan forward, skipping blank-only lines, until we find
next_line_end = body.find("\n", line_end + 1) # non-empty content or run out of body. Handles:
if next_line_end < 0: # ## Header ← marker line (empty after marker)
next_line_end = len(body) # ← blank line (skipped)
next_line = body[line_end + 1:next_line_end] # - actual content ← found
stripped_next = re.sub(r"[\s\*:\-\[\]]+", "", next_line) pos = line_end
return bool(stripped_next) while True:
# Skip the current newline and any additional newlines (blank lines).
while pos < len(body) and body[pos] == "\n":
pos += 1
if pos >= len(body):
break
line_end = body.find("\n", pos)
if line_end < 0:
line_end = len(body)
line = body[pos:line_end]
stripped = re.sub(r"[\s\*:\-\[\]]+", "", line)
if stripped:
return True
pos = line_end
# Last resort: the marker may appear mid-sentence (e.g.
# **Memory/saved-feedback consulted**: No applicable...).
# Search backward within the CURRENT LINE only (not preceding lines)
# to find a checkbox on the same line before the marker text.
# mc#1099 follow-up: memory-consulted detection was failing because
# the checkbox was on the same line before the inline marker.
_CHECKBOX_RE = re.compile(r"- \[[ x\]]|<input", re.IGNORECASE)
line_start = body.rfind("\n", 0, idx) + 1 # 0 if no newline before idx
before = body[line_start:idx]
m = _CHECKBOX_RE.search(before)
if not m:
return False
# Require meaningful content between the checkbox and the marker text
# (markdown formatting like ** or * must also be stripped).
# If only whitespace/markdown chars remain, the checkbox line is empty.
between = before[m.end() :]
stripped_between = re.sub(r"[\s\*:#\[\]_\-]+", "", between)
return bool(stripped_between)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -251,8 +297,7 @@ def compute_ack_state(
user = (c.get("user") or {}).get("login", "") user = (c.get("user") or {}).get("login", "")
if not user: if not user:
continue continue
directives, _na = parse_directives(body, numeric_aliases) for kind, slug, _note in parse_directives(body, numeric_aliases)[0]:
for kind, slug, _note in directives:
if not slug: if not slug:
unparseable_per_user[user] = unparseable_per_user.get(user, 0) + 1 unparseable_per_user[user] = unparseable_per_user.get(user, 0) + 1
continue continue
@ -304,6 +349,63 @@ def compute_ack_state(
} }
# ---------------------------------------------------------------------------
# N/A-gate evaluation
# ---------------------------------------------------------------------------
def compute_na_state(
    comments: list[dict[str, Any]],
    author: str,
    na_gates: dict[str, Any],
    probe: Callable[[str, list[str]], list[str]],
) -> dict[str, dict[str, Any]]:
    """Evaluate which N/A gates have a valid declaration from a team member.

    Args:
        comments: Gitea issue-comment dicts; each is read via its "body"
            and "user"/"login" keys.
        author: the PR author's login — authors cannot self-declare N/A.
        na_gates: mapping of gate name -> gate config. Only the gate
            *names* are consulted here; the config values are unused.
        probe: ``probe(gate, [user])`` returns the subset of the given
            users who belong to the team(s) required for that gate.

    Returns:
        dict[gate_name, dict] where each inner dict has:
          declared: bool — at least one valid non-author team-member
            declared this gate N/A
          decl_ackers: list[str] — usernames with a valid declaration
          rejected: {"not_in_team": list[str]} — users who tried but
            aren't in the required teams
    """
    # Build per-user latest N/A directive (most-recent wins per RFC#324).
    # NOTE: the map is keyed by user only, so a later /sop-n/a by the same
    # user — even for a different gate — supersedes the earlier one: each
    # user carries at most one live N/A declaration.
    latest_na: dict[str, tuple[str, str]] = {}  # user -> (gate, note)
    for c in comments:
        body = c.get("body", "") or ""
        user = (c.get("user") or {}).get("login", "")
        if not user:
            continue
        # parse_directives returns (directives, na_directives); only the
        # N/A list matters here. Directives naming unknown gates are ignored.
        for _kind, gate, note in parse_directives(body, {})[1]:
            if gate in na_gates:
                latest_na[user] = (gate, note)

    result: dict[str, dict[str, Any]] = {}
    for gate in na_gates:
        decl_ackers: list[str] = []
        not_in_team: list[str] = []
        for user, (g, _note) in latest_na.items():
            if g != gate:
                continue
            if user == author:
                continue  # authors cannot self-declare N/A
            if probe(gate, [user]):
                decl_ackers.append(user)
            else:
                not_in_team.append(user)
        result[gate] = {
            "declared": bool(decl_ackers),
            "decl_ackers": decl_ackers,
            "rejected": {"not_in_team": not_in_team},
        }
    return result
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Gitea API client # Gitea API client
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -698,6 +800,7 @@ def main(argv: list[str] | None = None) -> int:
cfg = load_config(args.config) cfg = load_config(args.config)
items: list[dict[str, Any]] = cfg["items"] items: list[dict[str, Any]] = cfg["items"]
items_by_slug = {it["slug"]: it for it in items} items_by_slug = {it["slug"]: it for it in items}
na_gates: dict[str, Any] = cfg.get("n/a_gates", {})
numeric_aliases = { numeric_aliases = {
int(it["numeric_alias"]): it["slug"] for it in items if it.get("numeric_alias") int(it["numeric_alias"]): it["slug"] for it in items if it.get("numeric_alias")
} }
@ -818,6 +921,46 @@ def main(argv: list[str] | None = None) -> int:
description=description, target_url=target_url, description=description, target_url=target_url,
) )
print(f"::notice::status posted: {args.status_context}{state}") print(f"::notice::status posted: {args.status_context}{state}")
# --- N/A gate status (RFC#324 §N/A follow-up) ---
# Post a separate status so review-check.sh can discover N/A declarations
# and waive the Gitea-approve requirement for that gate.
na_state: dict[str, dict[str, Any]] = {}
if na_gates:
na_state = compute_na_state(comments, author, na_gates, probe)
na_descs: list[str] = []
for gate, s in na_state.items():
if s["declared"]:
na_descs.append(gate)
decl = s["decl_ackers"]
rej = s["rejected"]["not_in_team"]
if decl:
print(f"::notice:: [N/A OK] {gate} — declared by {','.join(decl)}")
if rej:
print(
f"::notice:: [N/A REJ] {gate} — not-in-team: {','.join(rej)}",
file=sys.stderr,
)
na_desc = ", ".join(sorted(na_descs)) if na_descs else "(none)"
na_status_state = "success" if na_descs else "pending"
# review-check.sh reads the description to discover which gates are N/A.
# Include the gate names so it can grep for them.
na_description = f"N/A: {na_desc}" if na_descs else "N/A: (none)"
if not args.dry_run:
client.post_status(
args.owner, args.repo, head_sha,
state=na_status_state,
context="sop-checklist / na-declarations (pull_request)",
description=na_description,
target_url=target_url,
)
print(
f"::notice::na-declarations status → {na_status_state}: {na_description}"
)
# By default exit 0 — the POSTed status IS the gate, NOT the job # By default exit 0 — the POSTed status IS the gate, NOT the job
# conclusion. If the job exits 1 BP will see TWO failure signals # conclusion. If the job exits 1 BP will see TWO failure signals
# (one from the job's auto-status, one from our POST), making the # (one from the job's auto-status, one from our POST), making the

View File

@ -551,3 +551,55 @@ class TestEndToEndAckFlow(unittest.TestCase):
if __name__ == "__main__": if __name__ == "__main__":
unittest.main(verbosity=2) unittest.main(verbosity=2)
# ---------------------------------------------------------------------------
# compute_na_state
# ---------------------------------------------------------------------------
class TestComputeNaState(unittest.TestCase):
    """Tests for /sop-n/a directive evaluation (compute_na_state).

    The team-membership ``probe`` is faked with lambdas:
      - ``lambda *_: []`` / ``lambda g, u: []`` — nobody is in the team
      - ``lambda g, u: u`` — every probed user is approved
    """

    def test_no_na_declarations(self):
        # With no comments at all, every configured gate stays undeclared.
        cfg = sop.load_config(CONFIG_PATH)
        na_gates = cfg.get("n/a_gates", {})
        comments = []
        na_state = sop.compute_na_state(comments, "alice", na_gates, lambda *_: [])
        self.assertFalse(na_state["qa-review"]["declared"])
        self.assertFalse(na_state["security-review"]["declared"])

    def test_na_declared_by_authorized_user(self):
        # A non-author user who passes the team probe yields declared=True.
        cfg = sop.load_config(CONFIG_PATH)
        na_gates = cfg.get("n/a_gates", {})
        comments = [_comment("bob", "/sop-n/a qa-review N/A: pure tooling change")]
        na_state = sop.compute_na_state(comments, "alice", na_gates, lambda g, u: u)
        self.assertTrue(na_state["qa-review"]["declared"])
        self.assertEqual(na_state["qa-review"]["decl_ackers"], ["bob"])

    def test_na_declared_by_unauthorized_user_rejected(self):
        # A declarer who fails the team probe lands in rejected/not_in_team.
        cfg = sop.load_config(CONFIG_PATH)
        na_gates = cfg.get("n/a_gates", {})
        comments = [_comment("mallory", "/sop-n/a qa-review N/A: not real team")]
        na_state = sop.compute_na_state(comments, "alice", na_gates, lambda g, u: [])
        self.assertFalse(na_state["qa-review"]["declared"])
        self.assertEqual(na_state["qa-review"]["rejected"]["not_in_team"], ["mallory"])

    def test_author_cannot_self_declare_na(self):
        # Even with an always-approving probe, the PR author is skipped.
        cfg = sop.load_config(CONFIG_PATH)
        na_gates = cfg.get("n/a_gates", {})
        comments = [_comment("alice", "/sop-n/a qa-review N/A: I am the author")]
        na_state = sop.compute_na_state(comments, "alice", na_gates, lambda g, u: u)
        self.assertFalse(na_state["qa-review"]["declared"])

    def test_parse_directives_separates_na_from_ack(self):
        # parse_directives must route /sop-n/a into the second list and
        # keep /sop-ack in the first.
        directives, na_directives = sop.parse_directives(
            "/sop-ack comprehensive-testing\n/sop-n/a qa-review N/A: no surface",
            {},
        )
        self.assertEqual(len(directives), 1)
        self.assertEqual(directives[0][0], "sop-ack")
        self.assertEqual(len(na_directives), 1)
        self.assertEqual(na_directives[0][0], "sop-n/a")
        self.assertEqual(na_directives[0][1], "qa-review")
        self.assertIn("no surface", na_directives[0][2])

View File

@ -431,6 +431,43 @@ def _is_self_notify_row(row: dict[str, Any]) -> bool:
return source_id is None or source_id == "" return source_id is None or source_id == ""
def _is_self_echo_row(row: dict[str, Any], workspace_id: str) -> bool:
"""Return True if ``row`` is a self-originated a2a_receive row.
Internal #469: when a workspace delegates to a target that never picks
up the task, ``tool_delegate_task`` calls ``report_activity`` which
POSTs to the platform with source_id set to the *sender's* workspace
UUID (mandated by spoof-defense in workspace-server's a2a_proxy). The
activity API exposes that row under type=a2a_receive, so the inbox
poller re-fetches it. Without this guard the row is surfaced as
kind='peer_agent' with the workspace's own identity as peer_id —
the workspace sees its own delegation-failure echoed back as if a
peer had delegated to it.
The guard mirrors the existing _is_self_notify_row pattern: both
skip rows that would otherwise create spurious inbound signal. The
long-term fix (making the platform write a distinct activity_type
for agent-outbound rows) is tracked separately; this guard stays
because it only excludes rows the agent never wants.
``workspace_id`` must be non-empty an empty-string workspace_id
(single-workspace legacy path) can never match a UUID source_id, so
the predicate is always False there, which is safe.
RFC #2829 PR-2 note: rows with method="delegate_result" are excluded
from the self-echo guard even when source_id matches our workspace_id.
The platform may write a delegation-result row with source_id set to
our workspace_id (e.g. a self-delegation or edge case in the platform's
result-writing path). Such rows must reach the inbox so that
message_from_activity can surface them as peer_agent inbound and the
runtime receives the delegation result. Silently filtering them as
self-echo would break delegation result delivery.
"""
if not workspace_id:
return False
return row.get("source_id") == workspace_id and row.get("method") != "delegate_result"
def message_from_activity(row: dict[str, Any]) -> InboxMessage: def message_from_activity(row: dict[str, Any]) -> InboxMessage:
"""Convert one /activity row into an InboxMessage. """Convert one /activity row into an InboxMessage.
@ -623,6 +660,16 @@ def _poll_once(
# the same self-notify on every iteration. # the same self-notify on every iteration.
last_id = str(row.get("id", "")) or last_id last_id = str(row.get("id", "")) or last_id
continue continue
if _is_self_echo_row(row, workspace_id):
# Internal #469: tool_delegate_task writes its own a2a_receive
# row with source_id = this workspace's UUID (spoof-defense).
# The poll fetches it back as kind='peer_agent', making the
# workspace echoes its own delegation-failure as an inbound from
# a phantom peer. Skip it — the real delegation-result path
# (delegate_result push) is separate and unaffected. Cursor
# still advances so the next poll doesn't re-fetch this row.
last_id = str(row.get("id", "")) or last_id
continue
message = message_from_activity(row) message = message_from_activity(row)
if not message.activity_id: if not message.activity_id:
continue continue

View File

@ -495,6 +495,151 @@ def test_poll_once_skips_self_notify_rows(state: inbox.InboxState):
assert [m.activity_id for m in queue] == ["act-real"] assert [m.activity_id for m in queue] == ["act-real"]
# ---------------------------------------------------------------------------
# _is_self_echo_row — internal #469 fix
# ---------------------------------------------------------------------------
#
# When a workspace delegates to a target that never picks up the task,
# tool_delegate_task calls report_activity("a2a_receive", ...) which POSTs
# to the platform with source_id set to the *sender's* workspace UUID
# (spoof-defense). The activity API returns that row under type=a2a_receive
# on the next poll, so message_from_activity sets peer_id = workspace's own
# UUID — the workspace sees its own delegation-failure as an inbound from
# a phantom peer. _is_self_echo_row guards against this.
#
# Internal #469 was live-reproduced on hongming.moleculesai.app 2026-05-16.
def test_is_self_echo_row_true_when_source_id_matches_workspace():
    # Core positive case: our own workspace UUID came back as source_id.
    row = {"source_id": "ws-abc123", "method": "a2a_receive"}
    assert inbox._is_self_echo_row(row, "ws-abc123") is True


def test_is_self_echo_row_false_when_source_id_differs():
    """A real peer agent (different workspace_id) must NOT be filtered."""
    row = {"source_id": "ws-peer", "method": "a2a_receive"}
    assert inbox._is_self_echo_row(row, "ws-1") is False


def test_is_self_echo_row_false_when_source_id_is_none():
    """Canvas-user inbound has no source_id — never an echo."""
    row = {"source_id": None, "method": "a2a_receive"}
    assert inbox._is_self_echo_row(row, "ws-1") is False


def test_is_self_echo_row_false_when_workspace_id_is_empty():
    """Single-workspace legacy path with empty workspace_id cannot
    match a UUID source_id — the predicate is always False, which is safe."""
    row = {"source_id": "ws-abc123", "method": "a2a_receive"}
    assert inbox._is_self_echo_row(row, "") is False


def test_is_self_echo_row_false_when_source_id_key_absent():
    # A row with no source_id key at all must not be treated as an echo.
    row = {"method": "a2a_receive"}
    assert inbox._is_self_echo_row(row, "ws-1") is False


def test_is_self_echo_row_false_for_delegate_result():
    """RFC #2829 PR-2 regression pin: a row with source_id matching our
    workspace_id but method=delegate_result must NOT be filtered as a
    self-echo. The platform may write a delegation-result row with our
    workspace_id as source_id; such rows must reach the inbox so the
    runtime receives the delegation result. Silently filtering them would
    break delegate_result delivery."""
    row = {"source_id": "ws-1", "method": "delegate_result"}
    assert inbox._is_self_echo_row(row, "ws-1") is False
def test_poll_once_skips_self_echo_rows(state: inbox.InboxState):
    """Internal #469 regression pin: a row with source_id matching our
    workspace_id must NOT land in the inbox queue — it is our own
    delegation-report echoing back, not a real peer inbound."""
    rows = [
        # A genuine peer inbound (different workspace): must be enqueued.
        {
            "id": "act-real-peer",
            "source_id": "ws-peer",
            "method": "a2a_receive",
            "summary": None,
            "request_body": {"parts": [{"type": "text", "text": "real peer inbound"}]},
            "created_at": "2026-04-30T22:00:00Z",
        },
        # Our own delegation-report echo (source_id == our workspace_id).
        {
            "id": "act-self-echo",
            "source_id": "ws-1",
            "method": "a2a_receive",
            "summary": "task result: target timed out",
            "request_body": None,
            "created_at": "2026-04-30T22:00:01Z",
        },
    ]
    resp = _make_response(200, rows)
    p, _ = _patch_httpx(resp)
    with p:
        n = inbox._poll_once(state, "http://platform", "ws-1", {})
    # Only the real peer inbound counted; self-echo silently dropped.
    assert n == 1
    queue = state.peek(10)
    assert [m.activity_id for m in queue] == ["act-real-peer"]
    assert queue[0].peer_id == "ws-peer"
def test_poll_once_advances_cursor_past_self_echo(state: inbox.InboxState):
    """Cursor must advance past self-echo rows even though we don't
    enqueue them. Otherwise the next poll re-fetches the same self-echo
    on every iteration, wasting requests and blocking real inbound."""
    # Pre-seed the cursor so we can verify it moves forward.
    state.save_cursor("act-old")
    rows = [
        {
            "id": "act-self-echo",
            "source_id": "ws-1",
            "method": "a2a_receive",
            "summary": "task result: timeout",
            "request_body": None,
            "created_at": "2026-04-30T22:00:00Z",
        },
    ]
    resp = _make_response(200, rows)
    p, _ = _patch_httpx(resp)
    with p:
        n = inbox._poll_once(state, "http://platform", "ws-1", {})
    # Nothing counted, nothing queued — the row was dropped as an echo.
    assert n == 0
    assert state.peek(10) == []
    # Cursor must move past the skipped row so we don't re-poll it.
    assert state.load_cursor() == "act-self-echo"
def test_poll_once_self_echo_does_not_fire_notification(state: inbox.InboxState):
    """The notification callback (channel push to Claude Code etc.)
    must not fire for self-echo rows. Same rationale as self-notify:
    push-capable hosts would see the echo loop on the push channel."""
    rows = [
        {
            "id": "act-self-echo",
            "source_id": "ws-1",
            "method": "a2a_receive",
            "summary": "task result: timeout",
            "request_body": None,
            "created_at": "2026-04-30T22:00:00Z",
        },
    ]
    # Capture every notification the poller would push.
    received: list[dict] = []
    inbox.set_notification_callback(received.append)
    try:
        resp = _make_response(200, rows)
        p, _ = _patch_httpx(resp)
        with p:
            inbox._poll_once(state, "http://platform", "ws-1", {})
    finally:
        # Always unregister so other tests don't see a stale callback.
        inbox.set_notification_callback(None)
    assert received == [], (
        "self-echo rows must not surface as MCP notifications — "
        "doing so re-creates the echo loop on push-capable hosts"
    )
def test_poll_once_advances_cursor_past_self_notify(state: inbox.InboxState): def test_poll_once_advances_cursor_past_self_notify(state: inbox.InboxState):
"""Cursor must advance past self-notify rows even though we don't """Cursor must advance past self-notify rows even though we don't
enqueue them. Otherwise the next poll re-fetches the same self- enqueue them. Otherwise the next poll re-fetches the same self-