fix(inbox): drop self-delegation-echo rows from inbox poller
All checks were successful
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 21s
CI / Detect changes (pull_request) Successful in 25s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 59s
E2E API Smoke Test / detect-changes (pull_request) Successful in 34s
E2E Chat / detect-changes (pull_request) Successful in 28s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 16s
E2E Chat / E2E Chat (pull_request) Successful in 27s
CI / Python Lint & Test (pull_request) Successful in 8m51s
CI / Canvas (Next.js) (pull_request) Successful in 23m42s
CI / Platform (Go) (pull_request) Successful in 26m52s
CI / all-required (pull_request) Successful in 26m56s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
All checks were successful
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 21s
CI / Detect changes (pull_request) Successful in 25s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 59s
E2E API Smoke Test / detect-changes (pull_request) Successful in 34s
E2E Chat / detect-changes (pull_request) Successful in 28s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 16s
E2E Chat / E2E Chat (pull_request) Successful in 27s
CI / Python Lint & Test (pull_request) Successful in 8m51s
CI / Canvas (Next.js) (pull_request) Successful in 23m42s
CI / Platform (Go) (pull_request) Successful in 26m52s
CI / all-required (pull_request) Successful in 26m56s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
Internal #469: when a workspace delegates to a target that never picks up the task, tool_delegate_task calls report_activity("a2a_receive", ...) which POSTs to the platform with source_id = the sender's workspace UUID (spoof- defense). The activity API exposes that row under type=a2a_receive, so the inbox poller re-fetches it and message_from_activity sets peer_id = the workspace's own UUID — the workspace sees its own delegation-failure echoed back as if a peer had delegated to it. Fix adds _is_self_echo_row(row, workspace_id) that returns True when source_id == workspace_id, mirroring the existing _is_self_notify_row pattern. The guard is wired into _poll_once after the self-notify check: self-echo rows are skipped from the queue, the cursor still advances, and the notification callback does not fire. The real delegate_result push path (delegate_result method) is unaffected. 8 new tests cover the predicate (same-workspace, different-workspace, None source, empty workspace_id, absent key) and the integrated poller behavior (skipped from queue, cursor advances, no notification). Live-repro confirmed on hongming.moleculesai.app prior to this fix. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
50de2f6155
commit
deeff950be
@ -431,6 +431,34 @@ def _is_self_notify_row(row: dict[str, Any]) -> bool:
|
|||||||
return source_id is None or source_id == ""
|
return source_id is None or source_id == ""
|
||||||
|
|
||||||
|
|
||||||
|
def _is_self_echo_row(row: dict[str, Any], workspace_id: str) -> bool:
|
||||||
|
"""Return True if ``row`` is a self-originated a2a_receive row.
|
||||||
|
|
||||||
|
Internal #469: when a workspace delegates to a target that never picks
|
||||||
|
up the task, ``tool_delegate_task`` calls ``report_activity`` which
|
||||||
|
POSTs to the platform with source_id set to the *sender's* workspace
|
||||||
|
UUID (mandated by spoof-defense in workspace-server's a2a_proxy). The
|
||||||
|
activity API exposes that row under type=a2a_receive, so the inbox
|
||||||
|
poller re-fetches it. Without this guard the row is surfaced as
|
||||||
|
kind='peer_agent' with the workspace's own identity as peer_id —
|
||||||
|
the workspace sees its own delegation-failure echoed back as if a
|
||||||
|
peer had delegated to it.
|
||||||
|
|
||||||
|
The guard mirrors the existing _is_self_notify_row pattern: both
|
||||||
|
skip rows that would otherwise create spurious inbound signal. The
|
||||||
|
long-term fix (making the platform write a distinct activity_type
|
||||||
|
for agent-outbound rows) is tracked separately; this guard stays
|
||||||
|
because it only excludes rows the agent never wants.
|
||||||
|
|
||||||
|
``workspace_id`` must be non-empty — an empty-string workspace_id
|
||||||
|
(single-workspace legacy path) can never match a UUID source_id, so
|
||||||
|
the predicate is always False there, which is safe.
|
||||||
|
"""
|
||||||
|
if not workspace_id:
|
||||||
|
return False
|
||||||
|
return row.get("source_id") == workspace_id
|
||||||
|
|
||||||
|
|
||||||
def message_from_activity(row: dict[str, Any]) -> InboxMessage:
|
def message_from_activity(row: dict[str, Any]) -> InboxMessage:
|
||||||
"""Convert one /activity row into an InboxMessage.
|
"""Convert one /activity row into an InboxMessage.
|
||||||
|
|
||||||
@ -623,6 +651,16 @@ def _poll_once(
|
|||||||
# the same self-notify on every iteration.
|
# the same self-notify on every iteration.
|
||||||
last_id = str(row.get("id", "")) or last_id
|
last_id = str(row.get("id", "")) or last_id
|
||||||
continue
|
continue
|
||||||
|
if _is_self_echo_row(row, workspace_id):
|
||||||
|
# Internal #469: tool_delegate_task writes its own a2a_receive
|
||||||
|
# row with source_id = this workspace's UUID (spoof-defense).
|
||||||
|
# The poll fetches it back as kind='peer_agent', making the
|
||||||
|
# workspace echo its own delegation-failure as an inbound from
|
||||||
|
# a phantom peer. Skip it — the real delegation-result path
|
||||||
|
# (delegate_result push) is separate and unaffected. Cursor
|
||||||
|
# still advances so the next poll doesn't re-seen this row.
|
||||||
|
last_id = str(row.get("id", "")) or last_id
|
||||||
|
continue
|
||||||
message = message_from_activity(row)
|
message = message_from_activity(row)
|
||||||
if not message.activity_id:
|
if not message.activity_id:
|
||||||
continue
|
continue
|
||||||
|
|||||||
@ -495,6 +495,140 @@ def test_poll_once_skips_self_notify_rows(state: inbox.InboxState):
|
|||||||
assert [m.activity_id for m in queue] == ["act-real"]
|
assert [m.activity_id for m in queue] == ["act-real"]
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# _is_self_echo_row — internal #469 fix
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
#
|
||||||
|
# When a workspace delegates to a target that never picks up the task,
|
||||||
|
# tool_delegate_task calls report_activity("a2a_receive", ...) which POSTs
|
||||||
|
# to the platform with source_id set to the *sender's* workspace UUID
|
||||||
|
# (spoof-defense). The activity API returns that row under type=a2a_receive
|
||||||
|
# on the next poll, so message_from_activity sets peer_id = workspace's own
|
||||||
|
# UUID — the workspace sees its own delegation-failure as an inbound from
|
||||||
|
# a phantom peer. _is_self_echo_row guards against this.
|
||||||
|
#
|
||||||
|
# Internal #469 was live-reproduced on hongming.moleculesai.app 2026-05-16.
|
||||||
|
|
||||||
|
|
||||||
|
def test_is_self_echo_row_true_when_source_id_matches_workspace():
|
||||||
|
row = {"source_id": "ws-abc123", "method": "a2a_receive"}
|
||||||
|
assert inbox._is_self_echo_row(row, "ws-abc123") is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_is_self_echo_row_false_when_source_id_differs():
|
||||||
|
"""A real peer agent (different workspace_id) must NOT be filtered."""
|
||||||
|
row = {"source_id": "ws-peer", "method": "a2a_receive"}
|
||||||
|
assert inbox._is_self_echo_row(row, "ws-1") is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_is_self_echo_row_false_when_source_id_is_none():
|
||||||
|
"""Canvas-user inbound has no source_id — never an echo."""
|
||||||
|
row = {"source_id": None, "method": "a2a_receive"}
|
||||||
|
assert inbox._is_self_echo_row(row, "ws-1") is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_is_self_echo_row_false_when_workspace_id_is_empty():
|
||||||
|
"""Single-workspace legacy path with empty workspace_id cannot
|
||||||
|
match a UUID source_id — predicate is always False, which is safe."""
|
||||||
|
row = {"source_id": "ws-abc123", "method": "a2a_receive"}
|
||||||
|
assert inbox._is_self_echo_row(row, "") is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_is_self_echo_row_false_when_source_id_key_absent():
|
||||||
|
row = {"method": "a2a_receive"}
|
||||||
|
assert inbox._is_self_echo_row(row, "ws-1") is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_poll_once_skips_self_echo_rows(state: inbox.InboxState):
|
||||||
|
"""Internal #469 regression pin: a row with source_id matching our
|
||||||
|
workspace_id must NOT land in the inbox queue — it is our own
|
||||||
|
delegation-report echoing back, not a real peer inbound."""
|
||||||
|
rows = [
|
||||||
|
{
|
||||||
|
"id": "act-real-peer",
|
||||||
|
"source_id": "ws-peer",
|
||||||
|
"method": "a2a_receive",
|
||||||
|
"summary": None,
|
||||||
|
"request_body": {"parts": [{"type": "text", "text": "real peer inbound"}]},
|
||||||
|
"created_at": "2026-04-30T22:00:00Z",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id": "act-self-echo",
|
||||||
|
"source_id": "ws-1",
|
||||||
|
"method": "a2a_receive",
|
||||||
|
"summary": "task result: target timed out",
|
||||||
|
"request_body": None,
|
||||||
|
"created_at": "2026-04-30T22:00:01Z",
|
||||||
|
},
|
||||||
|
]
|
||||||
|
resp = _make_response(200, rows)
|
||||||
|
p, _ = _patch_httpx(resp)
|
||||||
|
with p:
|
||||||
|
n = inbox._poll_once(state, "http://platform", "ws-1", {})
|
||||||
|
|
||||||
|
# Only the real peer inbound counted; self-echo silently dropped.
|
||||||
|
assert n == 1
|
||||||
|
queue = state.peek(10)
|
||||||
|
assert [m.activity_id for m in queue] == ["act-real-peer"]
|
||||||
|
assert queue[0].peer_id == "ws-peer"
|
||||||
|
|
||||||
|
|
||||||
|
def test_poll_once_advances_cursor_past_self_echo(state: inbox.InboxState):
|
||||||
|
"""Cursor must advance past self-echo rows even though we don't
|
||||||
|
enqueue them. Otherwise the next poll re-fetches the same self-echo
|
||||||
|
on every iteration, wasting requests and blocking real inbound."""
|
||||||
|
state.save_cursor("act-old")
|
||||||
|
rows = [
|
||||||
|
{
|
||||||
|
"id": "act-self-echo",
|
||||||
|
"source_id": "ws-1",
|
||||||
|
"method": "a2a_receive",
|
||||||
|
"summary": "task result: timeout",
|
||||||
|
"request_body": None,
|
||||||
|
"created_at": "2026-04-30T22:00:00Z",
|
||||||
|
},
|
||||||
|
]
|
||||||
|
resp = _make_response(200, rows)
|
||||||
|
p, _ = _patch_httpx(resp)
|
||||||
|
with p:
|
||||||
|
n = inbox._poll_once(state, "http://platform", "ws-1", {})
|
||||||
|
|
||||||
|
assert n == 0
|
||||||
|
assert state.peek(10) == []
|
||||||
|
# Cursor must move past the skipped row so we don't re-poll it.
|
||||||
|
assert state.load_cursor() == "act-self-echo"
|
||||||
|
|
||||||
|
|
||||||
|
def test_poll_once_self_echo_does_not_fire_notification(state: inbox.InboxState):
|
||||||
|
"""The notification callback (channel push to Claude Code etc.)
|
||||||
|
must not fire for self-echo rows. Same rationale as self-notify:
|
||||||
|
push-capable hosts would see the echo loop on the push channel."""
|
||||||
|
rows = [
|
||||||
|
{
|
||||||
|
"id": "act-self-echo",
|
||||||
|
"source_id": "ws-1",
|
||||||
|
"method": "a2a_receive",
|
||||||
|
"summary": "task result: timeout",
|
||||||
|
"request_body": None,
|
||||||
|
"created_at": "2026-04-30T22:00:00Z",
|
||||||
|
},
|
||||||
|
]
|
||||||
|
received: list[dict] = []
|
||||||
|
inbox.set_notification_callback(received.append)
|
||||||
|
try:
|
||||||
|
resp = _make_response(200, rows)
|
||||||
|
p, _ = _patch_httpx(resp)
|
||||||
|
with p:
|
||||||
|
inbox._poll_once(state, "http://platform", "ws-1", {})
|
||||||
|
finally:
|
||||||
|
inbox.set_notification_callback(None)
|
||||||
|
|
||||||
|
assert received == [], (
|
||||||
|
"self-echo rows must not surface as MCP notifications — "
|
||||||
|
"doing so re-creates the echo loop on push-capable hosts"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_poll_once_advances_cursor_past_self_notify(state: inbox.InboxState):
|
def test_poll_once_advances_cursor_past_self_notify(state: inbox.InboxState):
|
||||||
"""Cursor must advance past self-notify rows even though we don't
|
"""Cursor must advance past self-notify rows even though we don't
|
||||||
enqueue them. Otherwise the next poll re-fetches the same self-
|
enqueue them. Otherwise the next poll re-fetches the same self-
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user