fix(inbox): drop self-delegation-echo rows from inbox poller
All checks were successful
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 21s
CI / Detect changes (pull_request) Successful in 25s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 59s
E2E API Smoke Test / detect-changes (pull_request) Successful in 34s
E2E Chat / detect-changes (pull_request) Successful in 28s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 16s
E2E Chat / E2E Chat (pull_request) Successful in 27s
CI / Python Lint & Test (pull_request) Successful in 8m51s
CI / Canvas (Next.js) (pull_request) Successful in 23m42s
CI / Platform (Go) (pull_request) Successful in 26m52s
CI / all-required (pull_request) Successful in 26m56s
CI / Canvas Deploy Reminder (pull_request) Has been skipped

Internal #469: when a workspace delegates to a target that never picks up
the task, tool_delegate_task calls report_activity("a2a_receive", ...) which
POSTs to the platform with source_id = the sender's workspace UUID (spoof-
defense). The activity API exposes that row under type=a2a_receive, so the
inbox poller re-fetches it and message_from_activity sets peer_id = the
workspace's own UUID — the workspace sees its own delegation-failure echoed
back as if a peer had delegated to it.

Fix adds _is_self_echo_row(row, workspace_id) that returns True when
source_id == workspace_id, mirroring the existing _is_self_notify_row
pattern. The guard is wired into _poll_once after the self-notify check:
self-echo rows are skipped from the queue, the cursor still advances, and
the notification callback does not fire. The real delegate_result push path
(delegate_result method) is unaffected.

8 new tests cover the predicate (same-workspace, different-workspace,
None source, empty workspace_id, absent key) and the integrated poller
behavior (skipped from queue, cursor advances, no notification).

Live-repro confirmed on hongming.moleculesai.app prior to this fix.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Molecule AI · core-be 2026-05-16 12:38:55 +00:00
parent 50de2f6155
commit deeff950be
2 changed files with 172 additions and 0 deletions

View File

@ -431,6 +431,34 @@ def _is_self_notify_row(row: dict[str, Any]) -> bool:
return source_id is None or source_id == ""
def _is_self_echo_row(row: dict[str, Any], workspace_id: str) -> bool:
"""Return True if ``row`` is a self-originated a2a_receive row.
Internal #469: when a workspace delegates to a target that never picks
up the task, ``tool_delegate_task`` calls ``report_activity`` which
POSTs to the platform with source_id set to the *sender's* workspace
UUID (mandated by spoof-defense in workspace-server's a2a_proxy). The
activity API exposes that row under type=a2a_receive, so the inbox
poller re-fetches it. Without this guard the row is surfaced as
kind='peer_agent' with the workspace's own identity as peer_id —
the workspace sees its own delegation-failure echoed back as if a
peer had delegated to it.
The guard mirrors the existing _is_self_notify_row pattern: both
skip rows that would otherwise create spurious inbound signal. The
long-term fix (making the platform write a distinct activity_type
for agent-outbound rows) is tracked separately; this guard stays
because it only excludes rows the agent never wants.
``workspace_id`` must be non-empty an empty-string workspace_id
(single-workspace legacy path) can never match a UUID source_id, so
the predicate is always False there, which is safe.
"""
if not workspace_id:
return False
return row.get("source_id") == workspace_id
def message_from_activity(row: dict[str, Any]) -> InboxMessage:
"""Convert one /activity row into an InboxMessage.
@ -623,6 +651,16 @@ def _poll_once(
# the same self-notify on every iteration.
last_id = str(row.get("id", "")) or last_id
continue
if _is_self_echo_row(row, workspace_id):
# Internal #469: tool_delegate_task writes its own a2a_receive
# row with source_id = this workspace's UUID (spoof-defense).
# The poll fetches it back as kind='peer_agent', making the
# workspace echo its own delegation-failure as an inbound from
# a phantom peer. Skip it — the real delegation-result path
# (delegate_result push) is separate and unaffected. Cursor
# still advances so the next poll doesn't re-seen this row.
last_id = str(row.get("id", "")) or last_id
continue
message = message_from_activity(row)
if not message.activity_id:
continue

View File

@ -495,6 +495,140 @@ def test_poll_once_skips_self_notify_rows(state: inbox.InboxState):
assert [m.activity_id for m in queue] == ["act-real"]
# ---------------------------------------------------------------------------
# _is_self_echo_row — internal #469 fix
# ---------------------------------------------------------------------------
#
# When a workspace delegates to a target that never picks up the task,
# tool_delegate_task calls report_activity("a2a_receive", ...) which POSTs
# to the platform with source_id set to the *sender's* workspace UUID
# (spoof-defense). The activity API returns that row under type=a2a_receive
# on the next poll, so message_from_activity sets peer_id = workspace's own
# UUID — the workspace sees its own delegation-failure as an inbound from
# a phantom peer. _is_self_echo_row guards against this.
#
# Internal #469 was live-reproduced on hongming.moleculesai.app 2026-05-16.
def test_is_self_echo_row_true_when_source_id_matches_workspace():
row = {"source_id": "ws-abc123", "method": "a2a_receive"}
assert inbox._is_self_echo_row(row, "ws-abc123") is True
def test_is_self_echo_row_false_when_source_id_differs():
"""A real peer agent (different workspace_id) must NOT be filtered."""
row = {"source_id": "ws-peer", "method": "a2a_receive"}
assert inbox._is_self_echo_row(row, "ws-1") is False
def test_is_self_echo_row_false_when_source_id_is_none():
"""Canvas-user inbound has no source_id — never an echo."""
row = {"source_id": None, "method": "a2a_receive"}
assert inbox._is_self_echo_row(row, "ws-1") is False
def test_is_self_echo_row_false_when_workspace_id_is_empty():
"""Single-workspace legacy path with empty workspace_id cannot
match a UUID source_id predicate is always False, which is safe."""
row = {"source_id": "ws-abc123", "method": "a2a_receive"}
assert inbox._is_self_echo_row(row, "") is False
def test_is_self_echo_row_false_when_source_id_key_absent():
row = {"method": "a2a_receive"}
assert inbox._is_self_echo_row(row, "ws-1") is False
def test_poll_once_skips_self_echo_rows(state: inbox.InboxState):
"""Internal #469 regression pin: a row with source_id matching our
workspace_id must NOT land in the inbox queue it is our own
delegation-report echoing back, not a real peer inbound."""
rows = [
{
"id": "act-real-peer",
"source_id": "ws-peer",
"method": "a2a_receive",
"summary": None,
"request_body": {"parts": [{"type": "text", "text": "real peer inbound"}]},
"created_at": "2026-04-30T22:00:00Z",
},
{
"id": "act-self-echo",
"source_id": "ws-1",
"method": "a2a_receive",
"summary": "task result: target timed out",
"request_body": None,
"created_at": "2026-04-30T22:00:01Z",
},
]
resp = _make_response(200, rows)
p, _ = _patch_httpx(resp)
with p:
n = inbox._poll_once(state, "http://platform", "ws-1", {})
# Only the real peer inbound counted; self-echo silently dropped.
assert n == 1
queue = state.peek(10)
assert [m.activity_id for m in queue] == ["act-real-peer"]
assert queue[0].peer_id == "ws-peer"
def test_poll_once_advances_cursor_past_self_echo(state: inbox.InboxState):
"""Cursor must advance past self-echo rows even though we don't
enqueue them. Otherwise the next poll re-fetches the same self-echo
on every iteration, wasting requests and blocking real inbound."""
state.save_cursor("act-old")
rows = [
{
"id": "act-self-echo",
"source_id": "ws-1",
"method": "a2a_receive",
"summary": "task result: timeout",
"request_body": None,
"created_at": "2026-04-30T22:00:00Z",
},
]
resp = _make_response(200, rows)
p, _ = _patch_httpx(resp)
with p:
n = inbox._poll_once(state, "http://platform", "ws-1", {})
assert n == 0
assert state.peek(10) == []
# Cursor must move past the skipped row so we don't re-poll it.
assert state.load_cursor() == "act-self-echo"
def test_poll_once_self_echo_does_not_fire_notification(state: inbox.InboxState):
"""The notification callback (channel push to Claude Code etc.)
must not fire for self-echo rows. Same rationale as self-notify:
push-capable hosts would see the echo loop on the push channel."""
rows = [
{
"id": "act-self-echo",
"source_id": "ws-1",
"method": "a2a_receive",
"summary": "task result: timeout",
"request_body": None,
"created_at": "2026-04-30T22:00:00Z",
},
]
received: list[dict] = []
inbox.set_notification_callback(received.append)
try:
resp = _make_response(200, rows)
p, _ = _patch_httpx(resp)
with p:
inbox._poll_once(state, "http://platform", "ws-1", {})
finally:
inbox.set_notification_callback(None)
assert received == [], (
"self-echo rows must not surface as MCP notifications — "
"doing so re-creates the echo loop on push-capable hosts"
)
def test_poll_once_advances_cursor_past_self_notify(state: inbox.InboxState):
"""Cursor must advance past self-notify rows even though we don't
enqueue them. Otherwise the next poll re-fetches the same self-