fix(inbox): drop self-delegation-echo bookkeeping rows from inbox poller #1346

Closed
core-devops wants to merge 2 commits from fix/a2a-self-delegation-echo-inbox into main
2 changed files with 199 additions and 0 deletions

View File

@ -431,6 +431,52 @@ def _is_self_notify_row(row: dict[str, Any]) -> bool:
return source_id is None or source_id == ""
def _is_self_echo_row(row: dict[str, Any], workspace_id: str) -> bool:
"""Return True if ``row`` is this workspace's OWN delegation-outcome
bookkeeping surfacing back through the activity API as a fake inbound
peer message (the self-echo bug, confirmed live 2026-05-16).
Shape: when ``tool_delegate_task`` reports the outcome of a delegation
it fired, ``a2a_tools.report_activity`` POSTs an
``activity_type='a2a_receive'`` row to ``/workspaces/<self>/activity``
with ``source_id = WORKSPACE_ID`` (its own id required by the
workspace-server's source_id spoof-defense, handlers/activity.go:549)
and ``method='message/send'``. The inbox poller then re-fetches that
row from ``?type=a2a_receive``; ``message_from_activity`` sets
``peer_id = source_id`` (non-empty), so ``to_dict`` classifies it as
``kind=peer_agent`` and the registry resolves the workspace's own id
to its display name (e.g. "mac laptop"). The agent sees its own
delegation-failure echoed back as if a peer delegated to it.
A genuine inbound peer message ALWAYS has ``source_id`` set to a
DIFFERENT workspace than the one being polled self-delegation is
already rejected upstream by two independent guards (the platform's
DelegationHandler.Delegate and the agent-side tool_delegate_task).
So ``source_id == workspace_id`` on the a2a_receive feed only ever
means a self-originated bookkeeping row, never a real peer.
Exception: ``method='delegate_result'`` rows ARE legitimately written
to the caller's own inbox with ``source_id == workspace_id`` — that
is the RFC #2829 PR-2 delegation-result delivery path
(handlers/delegation.go:pushDelegationResultToInbox). Those must keep
flowing to the agent, so they are explicitly NOT treated as echo.
Matched on (source_id == workspace_id) AND (method != 'delegate_result')
together so the RFC #2829 result-push is preserved while the
report_activity self-echo is filtered.
"""
if not workspace_id:
# Single-workspace pollers may pass "" as the cursor key but the
# poller always knows its real workspace_id; an empty value means
# we can't safely compare — fall through (treat as not-echo) so
# this guard never silently drops a real message.
return False
source_id = row.get("source_id")
if not source_id or source_id != workspace_id:
return False
return row.get("method") != "delegate_result"
def message_from_activity(row: dict[str, Any]) -> InboxMessage:
"""Convert one /activity row into an InboxMessage.
@ -623,6 +669,16 @@ def _poll_once(
# the same self-notify on every iteration.
last_id = str(row.get("id", "")) or last_id
continue
if _is_self_echo_row(row, workspace_id):
# This workspace's OWN delegation-outcome bookkeeping row
# (report_activity wrote source_id=WORKSPACE_ID). Re-delivering
# it would surface the agent's own delegation-failure as a
# bogus inbound peer_agent message from itself (self-echo,
# 2026-05-16). Skip + advance the cursor past it, exactly like
# the self-notify guard above, so the next poll doesn't keep
# re-seeing it.
last_id = str(row.get("id", "")) or last_id
continue
message = message_from_activity(row)
if not message.activity_id:
continue

View File

@ -552,6 +552,149 @@ def test_poll_once_self_notify_does_not_fire_notification(state: inbox.InboxStat
)
# ---------------------------------------------------------------------------
# _is_self_echo_row + the self-delegation-echo guard in _poll_once
# ---------------------------------------------------------------------------
#
# Confirmed live on hongming.moleculesai.app 2026-05-16: a workspace
# delegating to another workspace that never picks up the task hits the
# 300s polling timeout; tool_delegate_task then calls
# a2a_tools.report_activity("a2a_receive", ...), which POSTs an
# activity_type='a2a_receive' row to /workspaces/<self>/activity with
# source_id=WORKSPACE_ID (its own id — mandated by the workspace-server's
# source_id spoof-defense) and method='message/send'. The poller then
# re-fetches that row from ?type=a2a_receive and message_from_activity
# sets peer_id=source_id (non-empty) → kind=peer_agent, registry-resolved
# to the workspace's own display name ("mac laptop"). The agent sees its
# own delegation-failure echoed back as an inbound peer message.
#
# A genuine inbound peer always has source_id != polled workspace_id
# (self-delegation is blocked upstream by two guards). The ONLY
# legitimate source_id==self a2a_receive rows are RFC #2829 PR-2
# delegation-result pushes (method='delegate_result'), which must keep
# flowing. These tests pin both halves so a refactor that drops either
# breaks loudly.
def test_is_self_echo_row_true_for_own_report_activity_row():
"""report_activity writes source_id=WORKSPACE_ID, method='message/send'.
Polled back on the same workspace, that is a self-echo."""
row = {"source_id": "ws-1", "method": "message/send"}
assert inbox._is_self_echo_row(row, "ws-1") is True
# tasks/send is the other A2A method report_activity-class rows use.
row2 = {"source_id": "ws-1", "method": "tasks/send"}
assert inbox._is_self_echo_row(row2, "ws-1") is True
def test_is_self_echo_row_false_for_real_peer_inbound():
"""Real peer-agent message: source_id is a DIFFERENT workspace."""
row = {"source_id": "ws-peer-uuid", "method": "message/send"}
assert inbox._is_self_echo_row(row, "ws-1") is False
def test_is_self_echo_row_false_for_canvas_user():
"""Canvas-user message: source_id is None/empty (no peer)."""
assert inbox._is_self_echo_row({"source_id": None, "method": "message/send"}, "ws-1") is False
assert inbox._is_self_echo_row({"source_id": "", "method": "message/send"}, "ws-1") is False
def test_is_self_echo_row_false_for_delegate_result_push():
"""RFC #2829 PR-2 delegation-result delivery legitimately writes the
caller's own inbox with source_id==self + method='delegate_result'.
That MUST NOT be filtered it's how the agent gets its delegation
results back on the durable async path."""
row = {"source_id": "ws-1", "method": "delegate_result"}
assert inbox._is_self_echo_row(row, "ws-1") is False
def test_is_self_echo_row_false_when_workspace_id_unknown():
"""Empty workspace_id → can't safely compare → never drop a message."""
assert inbox._is_self_echo_row({"source_id": "ws-1", "method": "message/send"}, "") is False
def test_poll_once_skips_self_echo_delegation_bookkeeping(state: inbox.InboxState):
"""Integrated regression pin for the 2026-05-16 self-echo incident:
a report_activity-shaped row (source_id == polled workspace_id,
method='message/send', activity_type=a2a_receive) must NOT land in
the inbox queue as a fake inbound peer message."""
rows = [
{
"id": "act-real-peer",
"source_id": "ws-other-peer",
"method": "message/send",
"summary": None,
"request_body": {"parts": [{"type": "text", "text": "real peer task"}]},
"created_at": "2026-05-16T10:00:00Z",
},
{
"id": "act-self-echo",
"source_id": "ws-1", # == the polled workspace_id below
"method": "message/send",
"summary": "hongming-pc failed: polling timeout after 300.0s "
"(delegation_id=16a70d38, last_status=dispatched); the platform",
"request_body": None,
"created_at": "2026-05-16T10:00:01Z",
},
]
resp = _make_response(200, rows)
p, _ = _patch_httpx(resp)
with p:
n = inbox._poll_once(state, "http://platform", "ws-1", {})
# Only the genuine peer message counted; the self-echo dropped.
assert n == 1
queue = state.peek(10)
assert [m.activity_id for m in queue] == ["act-real-peer"]
def test_poll_once_advances_cursor_past_self_echo(state: inbox.InboxState):
"""Cursor must advance past the skipped self-echo row so the next
poll doesn't re-fetch it forever (the same invariant as self-notify)."""
state.save_cursor("act-old")
rows = [
{
"id": "act-self-echo",
"source_id": "ws-1",
"method": "message/send",
"summary": "ws-child failed: polling timeout after 300.0s",
"request_body": None,
"created_at": "2026-05-16T10:00:00Z",
},
]
resp = _make_response(200, rows)
p, _ = _patch_httpx(resp)
with p:
n = inbox._poll_once(state, "http://platform", "ws-1", {})
assert n == 0
assert state.peek(10) == []
assert state.load_cursor() == "act-self-echo"
def test_poll_once_delegate_result_push_still_delivered(state: inbox.InboxState):
"""RFC #2829 PR-2 delegation-result push (source_id==self,
method='delegate_result') must STILL reach the agent the self-echo
guard must not over-filter the legitimate result-delivery path."""
rows = [
{
"id": "act-delegate-result",
"source_id": "ws-1",
"method": "delegate_result",
"summary": "Delegation result delivered",
"request_body": {"parts": [{"type": "text", "text": "peer's answer"}]},
"created_at": "2026-05-16T10:00:00Z",
},
]
resp = _make_response(200, rows)
p, _ = _patch_httpx(resp)
with p:
n = inbox._poll_once(state, "http://platform", "ws-1", {})
assert n == 1
queue = state.peek(10)
assert [m.activity_id for m in queue] == ["act-delegate-result"]
def test_start_poller_thread_is_daemon(state: inbox.InboxState):
"""Daemon flag is required so the poller dies with the parent
process; a non-daemon poller would leak across `claude` restarts