From 3bda7e1c077f66983c55c6838fcd05a386915b5f Mon Sep 17 00:00:00 2001 From: Molecule AI Core-DevOps Date: Sat, 16 May 2026 05:30:46 -0700 Subject: [PATCH] fix(inbox): drop self-delegation-echo bookkeeping rows from inbox poller MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A workspace delegating to a target that never picks up the task hits the 300s polling timeout; tool_delegate_task then reports the outcome via a2a_tools.report_activity("a2a_receive", ...), which POSTs an activity_type='a2a_receive' row to /workspaces//activity with source_id=WORKSPACE_ID (its own id — mandated by the workspace-server's source_id spoof-defense, activity.go:549) and method='message/send'. The workspace's own inbox poller then re-fetches that row from ?type=a2a_receive; message_from_activity sets peer_id=source_id (non-empty) so to_dict classifies it kind=peer_agent, registry-resolved to the workspace's own display name ("mac laptop"). The agent sees its own delegation-failure echoed back as if a peer delegated to it (confirmed live on hongming.moleculesai.app 2026-05-16). This is NOT a self-delegation (the platform + agent self-delegation guards are correct and fire on source==target — the real target here was a different workspace that legitimately timed out). The producer side is also correct by platform contract (report_activity MUST send source_id=WORKSPACE_ID). The defect is the inbox poller mis-classifying a self-originated bookkeeping row as an inbound peer message — the same class as the already-fixed self-notify echo, slipping through because it uses method='message/send' + non-empty self source_id. Fix at the correct layer (the consumer): add inbox._is_self_echo_row and skip+advance-cursor for any a2a_receive row whose source_id equals the polled workspace_id, mirroring _is_self_notify_row exactly. A genuine inbound peer always has source_id != polled workspace_id (self-delegation is blocked upstream by two guards). RFC #2829 PR-2 delegation-result pushes (method='delegate_result', source_id==self) are explicitly preserved so the durable async result-delivery path keeps working. Tests: 8 new tests (predicate + integrated _poll_once skip/cursor + delegate_result preservation). test_inbox.py 62/62 green; related delegation/a2a suites 48/48 green; zero regression. Refs: molecule-ai/internal#469 Co-Authored-By: Claude Opus 4.7 (1M context) --- workspace/inbox.py | 56 +++++++++++++ workspace/tests/test_inbox.py | 143 ++++++++++++++++++++++++++++++++++ 2 files changed, 199 insertions(+) diff --git a/workspace/inbox.py b/workspace/inbox.py index cff95c6d..b19ff1fe 100644 --- a/workspace/inbox.py +++ b/workspace/inbox.py @@ -431,6 +431,52 @@ def _is_self_notify_row(row: dict[str, Any]) -> bool: return source_id is None or source_id == "" +def _is_self_echo_row(row: dict[str, Any], workspace_id: str) -> bool: + """Return True if ``row`` is this workspace's OWN delegation-outcome + bookkeeping surfacing back through the activity API as a fake inbound + peer message (the self-echo bug, confirmed live 2026-05-16). + + Shape: when ``tool_delegate_task`` reports the outcome of a delegation + it fired, ``a2a_tools.report_activity`` POSTs an + ``activity_type='a2a_receive'`` row to ``/workspaces//activity`` + with ``source_id = WORKSPACE_ID`` (its own id — required by the + workspace-server's source_id spoof-defense, handlers/activity.go:549) + and ``method='message/send'``. The inbox poller then re-fetches that + row from ``?type=a2a_receive``; ``message_from_activity`` sets + ``peer_id = source_id`` (non-empty), so ``to_dict`` classifies it as + ``kind=peer_agent`` and the registry resolves the workspace's own id + to its display name (e.g. "mac laptop"). The agent sees its own + delegation-failure echoed back as if a peer delegated to it. + + A genuine inbound peer message ALWAYS has ``source_id`` set to a + DIFFERENT workspace than the one being polled — self-delegation is + already rejected upstream by two independent guards (the platform's + DelegationHandler.Delegate and the agent-side tool_delegate_task). + So ``source_id == workspace_id`` on the a2a_receive feed only ever + means a self-originated bookkeeping row, never a real peer. + + Exception: ``method='delegate_result'`` rows ARE legitimately written + to the caller's own inbox with ``source_id == workspace_id`` — that + is the RFC #2829 PR-2 delegation-result delivery path + (handlers/delegation.go:pushDelegationResultToInbox). Those must keep + flowing to the agent, so they are explicitly NOT treated as echo. + + Matched on (source_id == workspace_id) AND (method != 'delegate_result') + together so the RFC #2829 result-push is preserved while the + report_activity self-echo is filtered. + """ + if not workspace_id: + # Single-workspace pollers may pass "" as the cursor key but the + # poller always knows its real workspace_id; an empty value means + # we can't safely compare — fall through (treat as not-echo) so + # this guard never silently drops a real message. + return False + source_id = row.get("source_id") + if not source_id or source_id != workspace_id: + return False + return row.get("method") != "delegate_result" + + def message_from_activity(row: dict[str, Any]) -> InboxMessage: """Convert one /activity row into an InboxMessage. @@ -623,6 +669,16 @@ def _poll_once( # the same self-notify on every iteration. last_id = str(row.get("id", "")) or last_id continue + if _is_self_echo_row(row, workspace_id): + # This workspace's OWN delegation-outcome bookkeeping row + # (report_activity wrote source_id=WORKSPACE_ID). Re-delivering + # it would surface the agent's own delegation-failure as a + # bogus inbound peer_agent message from itself (self-echo, + # 2026-05-16). Skip + advance the cursor past it, exactly like + # the self-notify guard above, so the next poll doesn't keep + # re-seeing it. + last_id = str(row.get("id", "")) or last_id + continue message = message_from_activity(row) if not message.activity_id: continue diff --git a/workspace/tests/test_inbox.py b/workspace/tests/test_inbox.py index cbba9a3b..57b44994 100644 --- a/workspace/tests/test_inbox.py +++ b/workspace/tests/test_inbox.py @@ -552,6 +552,149 @@ def test_poll_once_self_notify_does_not_fire_notification(state: inbox.InboxStat ) +# --------------------------------------------------------------------------- +# _is_self_echo_row + the self-delegation-echo guard in _poll_once +# --------------------------------------------------------------------------- +# +# Confirmed live on hongming.moleculesai.app 2026-05-16: a workspace +# delegating to another workspace that never picks up the task hits the +# 300s polling timeout; tool_delegate_task then calls +# a2a_tools.report_activity("a2a_receive", ...), which POSTs an +# activity_type='a2a_receive' row to /workspaces//activity with +# source_id=WORKSPACE_ID (its own id — mandated by the workspace-server's +# source_id spoof-defense) and method='message/send'. The poller then +# re-fetches that row from ?type=a2a_receive and message_from_activity +# sets peer_id=source_id (non-empty) → kind=peer_agent, registry-resolved +# to the workspace's own display name ("mac laptop"). The agent sees its +# own delegation-failure echoed back as an inbound peer message. +# +# A genuine inbound peer always has source_id != polled workspace_id +# (self-delegation is blocked upstream by two guards). The ONLY +# legitimate source_id==self a2a_receive rows are RFC #2829 PR-2 +# delegation-result pushes (method='delegate_result'), which must keep +# flowing. These tests pin both halves so a refactor that drops either +# breaks loudly. + + +def test_is_self_echo_row_true_for_own_report_activity_row(): + """report_activity writes source_id=WORKSPACE_ID, method='message/send'. + Polled back on the same workspace, that is a self-echo.""" + row = {"source_id": "ws-1", "method": "message/send"} + assert inbox._is_self_echo_row(row, "ws-1") is True + # tasks/send is the other A2A method report_activity-class rows use. + row2 = {"source_id": "ws-1", "method": "tasks/send"} + assert inbox._is_self_echo_row(row2, "ws-1") is True + + +def test_is_self_echo_row_false_for_real_peer_inbound(): + """Real peer-agent message: source_id is a DIFFERENT workspace.""" + row = {"source_id": "ws-peer-uuid", "method": "message/send"} + assert inbox._is_self_echo_row(row, "ws-1") is False + + +def test_is_self_echo_row_false_for_canvas_user(): + """Canvas-user message: source_id is None/empty (no peer).""" + assert inbox._is_self_echo_row({"source_id": None, "method": "message/send"}, "ws-1") is False + assert inbox._is_self_echo_row({"source_id": "", "method": "message/send"}, "ws-1") is False + + +def test_is_self_echo_row_false_for_delegate_result_push(): + """RFC #2829 PR-2 delegation-result delivery legitimately writes the + caller's own inbox with source_id==self + method='delegate_result'. + That MUST NOT be filtered — it's how the agent gets its delegation + results back on the durable async path.""" + row = {"source_id": "ws-1", "method": "delegate_result"} + assert inbox._is_self_echo_row(row, "ws-1") is False + + +def test_is_self_echo_row_false_when_workspace_id_unknown(): + """Empty workspace_id → can't safely compare → never drop a message.""" + assert inbox._is_self_echo_row({"source_id": "ws-1", "method": "message/send"}, "") is False + + +def test_poll_once_skips_self_echo_delegation_bookkeeping(state: inbox.InboxState): + """Integrated regression pin for the 2026-05-16 self-echo incident: + a report_activity-shaped row (source_id == polled workspace_id, + method='message/send', activity_type=a2a_receive) must NOT land in + the inbox queue as a fake inbound peer message.""" + rows = [ + { + "id": "act-real-peer", + "source_id": "ws-other-peer", + "method": "message/send", + "summary": None, + "request_body": {"parts": [{"type": "text", "text": "real peer task"}]}, + "created_at": "2026-05-16T10:00:00Z", + }, + { + "id": "act-self-echo", + "source_id": "ws-1", # == the polled workspace_id below + "method": "message/send", + "summary": "hongming-pc failed: polling timeout after 300.0s " + "(delegation_id=16a70d38, last_status=dispatched); the platform", + "request_body": None, + "created_at": "2026-05-16T10:00:01Z", + }, + ] + resp = _make_response(200, rows) + p, _ = _patch_httpx(resp) + with p: + n = inbox._poll_once(state, "http://platform", "ws-1", {}) + + # Only the genuine peer message counted; the self-echo dropped. + assert n == 1 + queue = state.peek(10) + assert [m.activity_id for m in queue] == ["act-real-peer"] + + +def test_poll_once_advances_cursor_past_self_echo(state: inbox.InboxState): + """Cursor must advance past the skipped self-echo row so the next + poll doesn't re-fetch it forever (the same invariant as self-notify).""" + state.save_cursor("act-old") + rows = [ + { + "id": "act-self-echo", + "source_id": "ws-1", + "method": "message/send", + "summary": "ws-child failed: polling timeout after 300.0s", + "request_body": None, + "created_at": "2026-05-16T10:00:00Z", + }, + ] + resp = _make_response(200, rows) + p, _ = _patch_httpx(resp) + with p: + n = inbox._poll_once(state, "http://platform", "ws-1", {}) + + assert n == 0 + assert state.peek(10) == [] + assert state.load_cursor() == "act-self-echo" + + +def test_poll_once_delegate_result_push_still_delivered(state: inbox.InboxState): + """RFC #2829 PR-2 delegation-result push (source_id==self, + method='delegate_result') must STILL reach the agent — the self-echo + guard must not over-filter the legitimate result-delivery path.""" + rows = [ + { + "id": "act-delegate-result", + "source_id": "ws-1", + "method": "delegate_result", + "summary": "Delegation result delivered", + "request_body": {"parts": [{"type": "text", "text": "peer's answer"}]}, + "created_at": "2026-05-16T10:00:00Z", + }, + ] + resp = _make_response(200, rows) + p, _ = _patch_httpx(resp) + with p: + n = inbox._poll_once(state, "http://platform", "ws-1", {}) + + assert n == 1 + queue = state.peek(10) + assert [m.activity_id for m in queue] == ["act-delegate-result"] + + def test_start_poller_thread_is_daemon(state: inbox.InboxState): """Daemon flag is required so the poller dies with the parent process; a non-daemon poller would leak across `claude` restarts -- 2.45.2