fix(inbox): drop self-delegation-echo rows from inbox poller
All checks were successful
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 21s
CI / Detect changes (pull_request) Successful in 25s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 59s
E2E API Smoke Test / detect-changes (pull_request) Successful in 34s
E2E Chat / detect-changes (pull_request) Successful in 28s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 16s
E2E Chat / E2E Chat (pull_request) Successful in 27s
CI / Python Lint & Test (pull_request) Successful in 8m51s
CI / Canvas (Next.js) (pull_request) Successful in 23m42s
CI / Platform (Go) (pull_request) Successful in 26m52s
CI / all-required (pull_request) Successful in 26m56s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
All checks were successful
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 21s
CI / Detect changes (pull_request) Successful in 25s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 59s
E2E API Smoke Test / detect-changes (pull_request) Successful in 34s
E2E Chat / detect-changes (pull_request) Successful in 28s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 16s
E2E Chat / E2E Chat (pull_request) Successful in 27s
CI / Python Lint & Test (pull_request) Successful in 8m51s
CI / Canvas (Next.js) (pull_request) Successful in 23m42s
CI / Platform (Go) (pull_request) Successful in 26m52s
CI / all-required (pull_request) Successful in 26m56s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
Internal #469: when a workspace delegates to a target that never picks up the task, tool_delegate_task calls report_activity("a2a_receive", ...) which POSTs to the platform with source_id = the sender's workspace UUID (spoof- defense). The activity API exposes that row under type=a2a_receive, so the inbox poller re-fetches it and message_from_activity sets peer_id = the workspace's own UUID — the workspace sees its own delegation-failure echoed back as if a peer had delegated to it. Fix adds _is_self_echo_row(row, workspace_id) that returns True when source_id == workspace_id, mirroring the existing _is_self_notify_row pattern. The guard is wired into _poll_once after the self-notify check: self-echo rows are skipped from the queue, the cursor still advances, and the notification callback does not fire. The real delegate_result push path (delegate_result method) is unaffected. 8 new tests cover the predicate (same-workspace, different-workspace, None source, empty workspace_id, absent key) and the integrated poller behavior (skipped from queue, cursor advances, no notification). Live-repro confirmed on hongming.moleculesai.app prior to this fix. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
50de2f6155
commit
deeff950be
@ -431,6 +431,34 @@ def _is_self_notify_row(row: dict[str, Any]) -> bool:
|
||||
return source_id is None or source_id == ""
|
||||
|
||||
|
||||
def _is_self_echo_row(row: dict[str, Any], workspace_id: str) -> bool:
|
||||
"""Return True if ``row`` is a self-originated a2a_receive row.
|
||||
|
||||
Internal #469: when a workspace delegates to a target that never picks
|
||||
up the task, ``tool_delegate_task`` calls ``report_activity`` which
|
||||
POSTs to the platform with source_id set to the *sender's* workspace
|
||||
UUID (mandated by spoof-defense in workspace-server's a2a_proxy). The
|
||||
activity API exposes that row under type=a2a_receive, so the inbox
|
||||
poller re-fetches it. Without this guard the row is surfaced as
|
||||
kind='peer_agent' with the workspace's own identity as peer_id —
|
||||
the workspace sees its own delegation-failure echoed back as if a
|
||||
peer had delegated to it.
|
||||
|
||||
The guard mirrors the existing _is_self_notify_row pattern: both
|
||||
skip rows that would otherwise create spurious inbound signal. The
|
||||
long-term fix (making the platform write a distinct activity_type
|
||||
for agent-outbound rows) is tracked separately; this guard stays
|
||||
because it only excludes rows the agent never wants.
|
||||
|
||||
``workspace_id`` must be non-empty — an empty-string workspace_id
|
||||
(single-workspace legacy path) can never match a UUID source_id, so
|
||||
the predicate is always False there, which is safe.
|
||||
"""
|
||||
if not workspace_id:
|
||||
return False
|
||||
return row.get("source_id") == workspace_id
|
||||
|
||||
|
||||
def message_from_activity(row: dict[str, Any]) -> InboxMessage:
|
||||
"""Convert one /activity row into an InboxMessage.
|
||||
|
||||
@ -623,6 +651,16 @@ def _poll_once(
|
||||
# the same self-notify on every iteration.
|
||||
last_id = str(row.get("id", "")) or last_id
|
||||
continue
|
||||
if _is_self_echo_row(row, workspace_id):
|
||||
# Internal #469: tool_delegate_task writes its own a2a_receive
|
||||
# row with source_id = this workspace's UUID (spoof-defense).
|
||||
# The poll fetches it back as kind='peer_agent', making the
|
||||
# workspace echo its own delegation-failure as an inbound from
|
||||
# a phantom peer. Skip it — the real delegation-result path
|
||||
# (delegate_result push) is separate and unaffected. Cursor
|
||||
# still advances so the next poll doesn't re-seen this row.
|
||||
last_id = str(row.get("id", "")) or last_id
|
||||
continue
|
||||
message = message_from_activity(row)
|
||||
if not message.activity_id:
|
||||
continue
|
||||
|
||||
@ -495,6 +495,140 @@ def test_poll_once_skips_self_notify_rows(state: inbox.InboxState):
|
||||
assert [m.activity_id for m in queue] == ["act-real"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _is_self_echo_row — internal #469 fix
|
||||
# ---------------------------------------------------------------------------
|
||||
#
|
||||
# When a workspace delegates to a target that never picks up the task,
|
||||
# tool_delegate_task calls report_activity("a2a_receive", ...) which POSTs
|
||||
# to the platform with source_id set to the *sender's* workspace UUID
|
||||
# (spoof-defense). The activity API returns that row under type=a2a_receive
|
||||
# on the next poll, so message_from_activity sets peer_id = workspace's own
|
||||
# UUID — the workspace sees its own delegation-failure as an inbound from
|
||||
# a phantom peer. _is_self_echo_row guards against this.
|
||||
#
|
||||
# Internal #469 was live-reproduced on hongming.moleculesai.app 2026-05-16.
|
||||
|
||||
|
||||
def test_is_self_echo_row_true_when_source_id_matches_workspace():
|
||||
row = {"source_id": "ws-abc123", "method": "a2a_receive"}
|
||||
assert inbox._is_self_echo_row(row, "ws-abc123") is True
|
||||
|
||||
|
||||
def test_is_self_echo_row_false_when_source_id_differs():
|
||||
"""A real peer agent (different workspace_id) must NOT be filtered."""
|
||||
row = {"source_id": "ws-peer", "method": "a2a_receive"}
|
||||
assert inbox._is_self_echo_row(row, "ws-1") is False
|
||||
|
||||
|
||||
def test_is_self_echo_row_false_when_source_id_is_none():
|
||||
"""Canvas-user inbound has no source_id — never an echo."""
|
||||
row = {"source_id": None, "method": "a2a_receive"}
|
||||
assert inbox._is_self_echo_row(row, "ws-1") is False
|
||||
|
||||
|
||||
def test_is_self_echo_row_false_when_workspace_id_is_empty():
|
||||
"""Single-workspace legacy path with empty workspace_id cannot
|
||||
match a UUID source_id — predicate is always False, which is safe."""
|
||||
row = {"source_id": "ws-abc123", "method": "a2a_receive"}
|
||||
assert inbox._is_self_echo_row(row, "") is False
|
||||
|
||||
|
||||
def test_is_self_echo_row_false_when_source_id_key_absent():
|
||||
row = {"method": "a2a_receive"}
|
||||
assert inbox._is_self_echo_row(row, "ws-1") is False
|
||||
|
||||
|
||||
def test_poll_once_skips_self_echo_rows(state: inbox.InboxState):
|
||||
"""Internal #469 regression pin: a row with source_id matching our
|
||||
workspace_id must NOT land in the inbox queue — it is our own
|
||||
delegation-report echoing back, not a real peer inbound."""
|
||||
rows = [
|
||||
{
|
||||
"id": "act-real-peer",
|
||||
"source_id": "ws-peer",
|
||||
"method": "a2a_receive",
|
||||
"summary": None,
|
||||
"request_body": {"parts": [{"type": "text", "text": "real peer inbound"}]},
|
||||
"created_at": "2026-04-30T22:00:00Z",
|
||||
},
|
||||
{
|
||||
"id": "act-self-echo",
|
||||
"source_id": "ws-1",
|
||||
"method": "a2a_receive",
|
||||
"summary": "task result: target timed out",
|
||||
"request_body": None,
|
||||
"created_at": "2026-04-30T22:00:01Z",
|
||||
},
|
||||
]
|
||||
resp = _make_response(200, rows)
|
||||
p, _ = _patch_httpx(resp)
|
||||
with p:
|
||||
n = inbox._poll_once(state, "http://platform", "ws-1", {})
|
||||
|
||||
# Only the real peer inbound counted; self-echo silently dropped.
|
||||
assert n == 1
|
||||
queue = state.peek(10)
|
||||
assert [m.activity_id for m in queue] == ["act-real-peer"]
|
||||
assert queue[0].peer_id == "ws-peer"
|
||||
|
||||
|
||||
def test_poll_once_advances_cursor_past_self_echo(state: inbox.InboxState):
|
||||
"""Cursor must advance past self-echo rows even though we don't
|
||||
enqueue them. Otherwise the next poll re-fetches the same self-echo
|
||||
on every iteration, wasting requests and blocking real inbound."""
|
||||
state.save_cursor("act-old")
|
||||
rows = [
|
||||
{
|
||||
"id": "act-self-echo",
|
||||
"source_id": "ws-1",
|
||||
"method": "a2a_receive",
|
||||
"summary": "task result: timeout",
|
||||
"request_body": None,
|
||||
"created_at": "2026-04-30T22:00:00Z",
|
||||
},
|
||||
]
|
||||
resp = _make_response(200, rows)
|
||||
p, _ = _patch_httpx(resp)
|
||||
with p:
|
||||
n = inbox._poll_once(state, "http://platform", "ws-1", {})
|
||||
|
||||
assert n == 0
|
||||
assert state.peek(10) == []
|
||||
# Cursor must move past the skipped row so we don't re-poll it.
|
||||
assert state.load_cursor() == "act-self-echo"
|
||||
|
||||
|
||||
def test_poll_once_self_echo_does_not_fire_notification(state: inbox.InboxState):
|
||||
"""The notification callback (channel push to Claude Code etc.)
|
||||
must not fire for self-echo rows. Same rationale as self-notify:
|
||||
push-capable hosts would see the echo loop on the push channel."""
|
||||
rows = [
|
||||
{
|
||||
"id": "act-self-echo",
|
||||
"source_id": "ws-1",
|
||||
"method": "a2a_receive",
|
||||
"summary": "task result: timeout",
|
||||
"request_body": None,
|
||||
"created_at": "2026-04-30T22:00:00Z",
|
||||
},
|
||||
]
|
||||
received: list[dict] = []
|
||||
inbox.set_notification_callback(received.append)
|
||||
try:
|
||||
resp = _make_response(200, rows)
|
||||
p, _ = _patch_httpx(resp)
|
||||
with p:
|
||||
inbox._poll_once(state, "http://platform", "ws-1", {})
|
||||
finally:
|
||||
inbox.set_notification_callback(None)
|
||||
|
||||
assert received == [], (
|
||||
"self-echo rows must not surface as MCP notifications — "
|
||||
"doing so re-creates the echo loop on push-capable hosts"
|
||||
)
|
||||
|
||||
|
||||
def test_poll_once_advances_cursor_past_self_notify(state: inbox.InboxState):
|
||||
"""Cursor must advance past self-notify rows even though we don't
|
||||
enqueue them. Otherwise the next poll re-fetches the same self-
|
||||
|
||||
Loading…
Reference in New Issue
Block a user