diff --git a/workspace/inbox.py b/workspace/inbox.py
index cff95c6d..046f2977 100644
--- a/workspace/inbox.py
+++ b/workspace/inbox.py
@@ -431,6 +431,34 @@ def _is_self_notify_row(row: dict[str, Any]) -> bool:
     return source_id is None or source_id == ""
 
 
+def _is_self_echo_row(row: dict[str, Any], workspace_id: str) -> bool:
+    """Return True if ``row`` is a self-originated a2a_receive row.
+
+    Internal #469: when a workspace delegates to a target that never picks
+    up the task, ``tool_delegate_task`` calls ``report_activity`` which
+    POSTs to the platform with source_id set to the *sender's* workspace
+    UUID (mandated by spoof-defense in workspace-server's a2a_proxy). The
+    activity API exposes that row under type=a2a_receive, so the inbox
+    poller re-fetches it. Without this guard the row is surfaced as
+    kind='peer_agent' with the workspace's own identity as peer_id —
+    the workspace sees its own delegation-failure echoed back as if a
+    peer had delegated to it.
+
+    The guard mirrors the existing _is_self_notify_row pattern: both
+    skip rows that would otherwise create spurious inbound signal. The
+    long-term fix (making the platform write a distinct activity_type
+    for agent-outbound rows) is tracked separately; this guard stays
+    because it only excludes rows the agent never wants.
+
+    An empty ``workspace_id`` (single-workspace legacy path) always yields
+    False: it can never match a UUID source_id, and the explicit guard
+    also keeps an empty-string source_id from comparing equal to it.
+    """
+    if not workspace_id:
+        return False
+    return row.get("source_id") == workspace_id
+
+
 def message_from_activity(row: dict[str, Any]) -> InboxMessage:
     """Convert one /activity row into an InboxMessage.
 
@@ -623,6 +651,16 @@ def _poll_once(
             # the same self-notify on every iteration.
             last_id = str(row.get("id", "")) or last_id
             continue
+        if _is_self_echo_row(row, workspace_id):
+            # Internal #469: tool_delegate_task writes its own a2a_receive
+            # row with source_id = this workspace's UUID (spoof-defense).
+            # The poll fetches it back as kind='peer_agent', making the
+            # workspace echo its own delegation-failure as an inbound from
+            # a phantom peer. Skip it — the real delegation-result path
+            # (delegate_result push) is separate and unaffected. Cursor
+            # still advances so the next poll doesn't re-fetch this row.
+            last_id = str(row.get("id", "")) or last_id
+            continue
         message = message_from_activity(row)
         if not message.activity_id:
             continue
diff --git a/workspace/tests/test_inbox.py b/workspace/tests/test_inbox.py
index cbba9a3b..1a6c0b03 100644
--- a/workspace/tests/test_inbox.py
+++ b/workspace/tests/test_inbox.py
@@ -495,6 +495,140 @@ def test_poll_once_skips_self_notify_rows(state: inbox.InboxState):
     assert [m.activity_id for m in queue] == ["act-real"]
 
 
+# ---------------------------------------------------------------------------
+# _is_self_echo_row — internal #469 fix
+# ---------------------------------------------------------------------------
+#
+# When a workspace delegates to a target that never picks up the task,
+# tool_delegate_task calls report_activity("a2a_receive", ...) which POSTs
+# to the platform with source_id set to the *sender's* workspace UUID
+# (spoof-defense). The activity API returns that row under type=a2a_receive
+# on the next poll, so message_from_activity sets peer_id = workspace's own
+# UUID — the workspace sees its own delegation-failure as an inbound from
+# a phantom peer. _is_self_echo_row guards against this.
+#
+# Internal #469 was live-reproduced on hongming.moleculesai.app 2026-05-16.
+
+
+def test_is_self_echo_row_true_when_source_id_matches_workspace():
+    row = {"source_id": "ws-abc123", "method": "a2a_receive"}
+    assert inbox._is_self_echo_row(row, "ws-abc123") is True
+
+
+def test_is_self_echo_row_false_when_source_id_differs():
+    """A real peer agent (different workspace_id) must NOT be filtered."""
+    row = {"source_id": "ws-peer", "method": "a2a_receive"}
+    assert inbox._is_self_echo_row(row, "ws-1") is False
+
+
+def test_is_self_echo_row_false_when_source_id_is_none():
+    """Canvas-user inbound has no source_id — never an echo."""
+    row = {"source_id": None, "method": "a2a_receive"}
+    assert inbox._is_self_echo_row(row, "ws-1") is False
+
+
+def test_is_self_echo_row_false_when_workspace_id_is_empty():
+    """Single-workspace legacy path with empty workspace_id cannot
+    match a UUID source_id — predicate is always False, which is safe."""
+    row = {"source_id": "ws-abc123", "method": "a2a_receive"}
+    assert inbox._is_self_echo_row(row, "") is False
+
+
+def test_is_self_echo_row_false_when_source_id_key_absent():
+    row = {"method": "a2a_receive"}
+    assert inbox._is_self_echo_row(row, "ws-1") is False
+
+
+def test_poll_once_skips_self_echo_rows(state: inbox.InboxState):
+    """Internal #469 regression pin: a row with source_id matching our
+    workspace_id must NOT land in the inbox queue — it is our own
+    delegation-report echoing back, not a real peer inbound."""
+    rows = [
+        {
+            "id": "act-real-peer",
+            "source_id": "ws-peer",
+            "method": "a2a_receive",
+            "summary": None,
+            "request_body": {"parts": [{"type": "text", "text": "real peer inbound"}]},
+            "created_at": "2026-04-30T22:00:00Z",
+        },
+        {
+            "id": "act-self-echo",
+            "source_id": "ws-1",
+            "method": "a2a_receive",
+            "summary": "task result: target timed out",
+            "request_body": None,
+            "created_at": "2026-04-30T22:00:01Z",
+        },
+    ]
+    resp = _make_response(200, rows)
+    p, _ = _patch_httpx(resp)
+    with p:
+        n = inbox._poll_once(state, "http://platform", "ws-1", {})
+
+    # Only the real peer inbound is counted; the self-echo is silently dropped.
+    assert n == 1
+    queue = state.peek(10)
+    assert [m.activity_id for m in queue] == ["act-real-peer"]
+    assert queue[0].peer_id == "ws-peer"
+
+
+def test_poll_once_advances_cursor_past_self_echo(state: inbox.InboxState):
+    """Cursor must advance past self-echo rows even though we don't
+    enqueue them. Otherwise the next poll re-fetches the same self-echo
+    on every iteration, wasting requests and blocking real inbound."""
+    state.save_cursor("act-old")
+    rows = [
+        {
+            "id": "act-self-echo",
+            "source_id": "ws-1",
+            "method": "a2a_receive",
+            "summary": "task result: timeout",
+            "request_body": None,
+            "created_at": "2026-04-30T22:00:00Z",
+        },
+    ]
+    resp = _make_response(200, rows)
+    p, _ = _patch_httpx(resp)
+    with p:
+        n = inbox._poll_once(state, "http://platform", "ws-1", {})
+
+    assert n == 0
+    assert state.peek(10) == []
+    # Cursor must move past the skipped row so we don't re-poll it.
+    assert state.load_cursor() == "act-self-echo"
+
+
+def test_poll_once_self_echo_does_not_fire_notification(state: inbox.InboxState):
+    """The notification callback (channel push to Claude Code etc.)
+    must not fire for self-echo rows. Same rationale as self-notify:
+    push-capable hosts would see the echo loop on the push channel."""
+    rows = [
+        {
+            "id": "act-self-echo",
+            "source_id": "ws-1",
+            "method": "a2a_receive",
+            "summary": "task result: timeout",
+            "request_body": None,
+            "created_at": "2026-04-30T22:00:00Z",
+        },
+    ]
+    received: list[dict] = []
+    inbox.set_notification_callback(received.append)
+    try:
+        resp = _make_response(200, rows)
+        p, _ = _patch_httpx(resp)
+        with p:
+            inbox._poll_once(state, "http://platform", "ws-1", {})
+    finally:
+        inbox.set_notification_callback(None)
+
+    assert received == [], (
+        "self-echo rows must not surface as MCP notifications — "
+        "doing so re-creates the echo loop on push-capable hosts"
+    )
+
+
 def test_poll_once_advances_cursor_past_self_notify(state: inbox.InboxState):
     """Cursor must advance past self-notify rows even though we don't
     enqueue them. Otherwise the next poll re-fetches the same self-