fix(inbox): drop self-delegation-echo bookkeeping rows from inbox poller #1346
@ -431,6 +431,52 @@ def _is_self_notify_row(row: dict[str, Any]) -> bool:
|
||||
return source_id is None or source_id == ""
|
||||
|
||||
|
||||
def _is_self_echo_row(row: dict[str, Any], workspace_id: str) -> bool:
|
||||
"""Return True if ``row`` is this workspace's OWN delegation-outcome
|
||||
bookkeeping surfacing back through the activity API as a fake inbound
|
||||
peer message (the self-echo bug, confirmed live 2026-05-16).
|
||||
|
||||
Shape: when ``tool_delegate_task`` reports the outcome of a delegation
|
||||
it fired, ``a2a_tools.report_activity`` POSTs an
|
||||
``activity_type='a2a_receive'`` row to ``/workspaces/<self>/activity``
|
||||
with ``source_id = WORKSPACE_ID`` (its own id — required by the
|
||||
workspace-server's source_id spoof-defense, handlers/activity.go:549)
|
||||
and ``method='message/send'``. The inbox poller then re-fetches that
|
||||
row from ``?type=a2a_receive``; ``message_from_activity`` sets
|
||||
``peer_id = source_id`` (non-empty), so ``to_dict`` classifies it as
|
||||
``kind=peer_agent`` and the registry resolves the workspace's own id
|
||||
to its display name (e.g. "mac laptop"). The agent sees its own
|
||||
delegation-failure echoed back as if a peer delegated to it.
|
||||
|
||||
A genuine inbound peer message ALWAYS has ``source_id`` set to a
|
||||
DIFFERENT workspace than the one being polled — self-delegation is
|
||||
already rejected upstream by two independent guards (the platform's
|
||||
DelegationHandler.Delegate and the agent-side tool_delegate_task).
|
||||
So ``source_id == workspace_id`` on the a2a_receive feed only ever
|
||||
means a self-originated bookkeeping row, never a real peer.
|
||||
|
||||
Exception: ``method='delegate_result'`` rows ARE legitimately written
|
||||
to the caller's own inbox with ``source_id == workspace_id`` — that
|
||||
is the RFC #2829 PR-2 delegation-result delivery path
|
||||
(handlers/delegation.go:pushDelegationResultToInbox). Those must keep
|
||||
flowing to the agent, so they are explicitly NOT treated as echo.
|
||||
|
||||
Matched on (source_id == workspace_id) AND (method != 'delegate_result')
|
||||
together so the RFC #2829 result-push is preserved while the
|
||||
report_activity self-echo is filtered.
|
||||
"""
|
||||
if not workspace_id:
|
||||
# Single-workspace pollers may pass "" as the cursor key but the
|
||||
# poller always knows its real workspace_id; an empty value means
|
||||
# we can't safely compare — fall through (treat as not-echo) so
|
||||
# this guard never silently drops a real message.
|
||||
return False
|
||||
source_id = row.get("source_id")
|
||||
if not source_id or source_id != workspace_id:
|
||||
return False
|
||||
return row.get("method") != "delegate_result"
|
||||
|
||||
|
||||
def message_from_activity(row: dict[str, Any]) -> InboxMessage:
|
||||
"""Convert one /activity row into an InboxMessage.
|
||||
|
||||
@ -623,6 +669,16 @@ def _poll_once(
|
||||
# the same self-notify on every iteration.
|
||||
last_id = str(row.get("id", "")) or last_id
|
||||
continue
|
||||
if _is_self_echo_row(row, workspace_id):
|
||||
# This workspace's OWN delegation-outcome bookkeeping row
|
||||
# (report_activity wrote source_id=WORKSPACE_ID). Re-delivering
|
||||
# it would surface the agent's own delegation-failure as a
|
||||
# bogus inbound peer_agent message from itself (self-echo,
|
||||
# 2026-05-16). Skip + advance the cursor past it, exactly like
|
||||
# the self-notify guard above, so the next poll doesn't keep
|
||||
# re-seeing it.
|
||||
last_id = str(row.get("id", "")) or last_id
|
||||
continue
|
||||
message = message_from_activity(row)
|
||||
if not message.activity_id:
|
||||
continue
|
||||
|
||||
@ -552,6 +552,149 @@ def test_poll_once_self_notify_does_not_fire_notification(state: inbox.InboxStat
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _is_self_echo_row + the self-delegation-echo guard in _poll_once
|
||||
# ---------------------------------------------------------------------------
|
||||
#
|
||||
# Confirmed live on hongming.moleculesai.app 2026-05-16: a workspace
|
||||
# delegating to another workspace that never picks up the task hits the
|
||||
# 300s polling timeout; tool_delegate_task then calls
|
||||
# a2a_tools.report_activity("a2a_receive", ...), which POSTs an
|
||||
# activity_type='a2a_receive' row to /workspaces/<self>/activity with
|
||||
# source_id=WORKSPACE_ID (its own id — mandated by the workspace-server's
|
||||
# source_id spoof-defense) and method='message/send'. The poller then
|
||||
# re-fetches that row from ?type=a2a_receive and message_from_activity
|
||||
# sets peer_id=source_id (non-empty) → kind=peer_agent, registry-resolved
|
||||
# to the workspace's own display name ("mac laptop"). The agent sees its
|
||||
# own delegation-failure echoed back as an inbound peer message.
|
||||
#
|
||||
# A genuine inbound peer always has source_id != polled workspace_id
|
||||
# (self-delegation is blocked upstream by two guards). The ONLY
|
||||
# legitimate source_id==self a2a_receive rows are RFC #2829 PR-2
|
||||
# delegation-result pushes (method='delegate_result'), which must keep
|
||||
# flowing. These tests pin both halves so a refactor that drops either
|
||||
# breaks loudly.
|
||||
|
||||
|
||||
def test_is_self_echo_row_true_for_own_report_activity_row():
|
||||
"""report_activity writes source_id=WORKSPACE_ID, method='message/send'.
|
||||
Polled back on the same workspace, that is a self-echo."""
|
||||
row = {"source_id": "ws-1", "method": "message/send"}
|
||||
assert inbox._is_self_echo_row(row, "ws-1") is True
|
||||
# tasks/send is the other A2A method report_activity-class rows use.
|
||||
row2 = {"source_id": "ws-1", "method": "tasks/send"}
|
||||
assert inbox._is_self_echo_row(row2, "ws-1") is True
|
||||
|
||||
|
||||
def test_is_self_echo_row_false_for_real_peer_inbound():
|
||||
"""Real peer-agent message: source_id is a DIFFERENT workspace."""
|
||||
row = {"source_id": "ws-peer-uuid", "method": "message/send"}
|
||||
assert inbox._is_self_echo_row(row, "ws-1") is False
|
||||
|
||||
|
||||
def test_is_self_echo_row_false_for_canvas_user():
|
||||
"""Canvas-user message: source_id is None/empty (no peer)."""
|
||||
assert inbox._is_self_echo_row({"source_id": None, "method": "message/send"}, "ws-1") is False
|
||||
assert inbox._is_self_echo_row({"source_id": "", "method": "message/send"}, "ws-1") is False
|
||||
|
||||
|
||||
def test_is_self_echo_row_false_for_delegate_result_push():
|
||||
"""RFC #2829 PR-2 delegation-result delivery legitimately writes the
|
||||
caller's own inbox with source_id==self + method='delegate_result'.
|
||||
That MUST NOT be filtered — it's how the agent gets its delegation
|
||||
results back on the durable async path."""
|
||||
row = {"source_id": "ws-1", "method": "delegate_result"}
|
||||
assert inbox._is_self_echo_row(row, "ws-1") is False
|
||||
|
||||
|
||||
def test_is_self_echo_row_false_when_workspace_id_unknown():
|
||||
"""Empty workspace_id → can't safely compare → never drop a message."""
|
||||
assert inbox._is_self_echo_row({"source_id": "ws-1", "method": "message/send"}, "") is False
|
||||
|
||||
|
||||
def test_poll_once_skips_self_echo_delegation_bookkeeping(state: inbox.InboxState):
|
||||
"""Integrated regression pin for the 2026-05-16 self-echo incident:
|
||||
a report_activity-shaped row (source_id == polled workspace_id,
|
||||
method='message/send', activity_type=a2a_receive) must NOT land in
|
||||
the inbox queue as a fake inbound peer message."""
|
||||
rows = [
|
||||
{
|
||||
"id": "act-real-peer",
|
||||
"source_id": "ws-other-peer",
|
||||
"method": "message/send",
|
||||
"summary": None,
|
||||
"request_body": {"parts": [{"type": "text", "text": "real peer task"}]},
|
||||
"created_at": "2026-05-16T10:00:00Z",
|
||||
},
|
||||
{
|
||||
"id": "act-self-echo",
|
||||
"source_id": "ws-1", # == the polled workspace_id below
|
||||
"method": "message/send",
|
||||
"summary": "hongming-pc failed: polling timeout after 300.0s "
|
||||
"(delegation_id=16a70d38, last_status=dispatched); the platform",
|
||||
"request_body": None,
|
||||
"created_at": "2026-05-16T10:00:01Z",
|
||||
},
|
||||
]
|
||||
resp = _make_response(200, rows)
|
||||
p, _ = _patch_httpx(resp)
|
||||
with p:
|
||||
n = inbox._poll_once(state, "http://platform", "ws-1", {})
|
||||
|
||||
# Only the genuine peer message counted; the self-echo dropped.
|
||||
assert n == 1
|
||||
queue = state.peek(10)
|
||||
assert [m.activity_id for m in queue] == ["act-real-peer"]
|
||||
|
||||
|
||||
def test_poll_once_advances_cursor_past_self_echo(state: inbox.InboxState):
|
||||
"""Cursor must advance past the skipped self-echo row so the next
|
||||
poll doesn't re-fetch it forever (the same invariant as self-notify)."""
|
||||
state.save_cursor("act-old")
|
||||
rows = [
|
||||
{
|
||||
"id": "act-self-echo",
|
||||
"source_id": "ws-1",
|
||||
"method": "message/send",
|
||||
"summary": "ws-child failed: polling timeout after 300.0s",
|
||||
"request_body": None,
|
||||
"created_at": "2026-05-16T10:00:00Z",
|
||||
},
|
||||
]
|
||||
resp = _make_response(200, rows)
|
||||
p, _ = _patch_httpx(resp)
|
||||
with p:
|
||||
n = inbox._poll_once(state, "http://platform", "ws-1", {})
|
||||
|
||||
assert n == 0
|
||||
assert state.peek(10) == []
|
||||
assert state.load_cursor() == "act-self-echo"
|
||||
|
||||
|
||||
def test_poll_once_delegate_result_push_still_delivered(state: inbox.InboxState):
|
||||
"""RFC #2829 PR-2 delegation-result push (source_id==self,
|
||||
method='delegate_result') must STILL reach the agent — the self-echo
|
||||
guard must not over-filter the legitimate result-delivery path."""
|
||||
rows = [
|
||||
{
|
||||
"id": "act-delegate-result",
|
||||
"source_id": "ws-1",
|
||||
"method": "delegate_result",
|
||||
"summary": "Delegation result delivered",
|
||||
"request_body": {"parts": [{"type": "text", "text": "peer's answer"}]},
|
||||
"created_at": "2026-05-16T10:00:00Z",
|
||||
},
|
||||
]
|
||||
resp = _make_response(200, rows)
|
||||
p, _ = _patch_httpx(resp)
|
||||
with p:
|
||||
n = inbox._poll_once(state, "http://platform", "ws-1", {})
|
||||
|
||||
assert n == 1
|
||||
queue = state.peek(10)
|
||||
assert [m.activity_id for m in queue] == ["act-delegate-result"]
|
||||
|
||||
|
||||
def test_start_poller_thread_is_daemon(state: inbox.InboxState):
|
||||
"""Daemon flag is required so the poller dies with the parent
|
||||
process; a non-daemon poller would leak across `claude` restarts
|
||||
|
||||
Loading…
Reference in New Issue
Block a user