From 2d8c45989a7ee146aae2a293f2f3684f9ae2be72 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Fri, 1 May 2026 17:35:49 -0700 Subject: [PATCH] fix(inbox): skip self-notify rows in poller to break echo loop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The workspace-server's `/notify` handler writes the agent's own send_message_to_user POSTs to activity_logs as activity_type='a2a_receive', method='notify', source_id=NULL so the canvas chat-history loader can restore those bubbles after a page reload. The activity API exposes the row to /workspaces/:id/activity?type=a2a_receive, so the inbox poller picks it up and pushes the agent's own outbound back as an inbound `← molecule: Agent message: ...` — confirmed live 2026-05-01. Add `_is_self_notify_row` predicate matched on (method='notify' AND no source_id) and call it from `_poll_once` before enqueue. The predicate combines BOTH discriminators so a future caller using method='notify' with a real peer_id still passes through. Cursor advances past skipped rows so we don't re-poll the same self-notify on every iteration. Belt-and-braces: long-term fix lives in workspace-server (rename the misclassified activity_type to 'agent_outbound' — RFC at #2469). This guard stays regardless because it only excludes rows we never want. Tests: 7 new — predicate true/false matrix + integrated _poll_once behavior (skip, cursor advance, notification suppression). Mutation-verified: reverting inbox.py to the prior shape fails 7/7; applied state passes 48/48. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- workspace/inbox.py | 39 ++++++++++ workspace/tests/test_inbox.py | 138 ++++++++++++++++++++++++++++++++++ 2 files changed, 177 insertions(+) diff --git a/workspace/inbox.py b/workspace/inbox.py index 28e1b46e..b0718f82 100644 --- a/workspace/inbox.py +++ b/workspace/inbox.py @@ -364,6 +364,23 @@ def _extract_text(request_body: Any, summary: str | None) -> str: return summary or "(empty A2A message)" +def _is_self_notify_row(row: dict[str, Any]) -> bool: + """Return True if ``row`` is the agent's own send_message_to_user + POST surfacing back through the activity API. + + The shape (workspace-server handlers/activity.go, ``Notify`` writer): + method='notify' AND no peer (source_id is None or '') + + Matched on both fields together so a future caller using + ``method='notify'`` for a different purpose with a real peer_id + still passes through. + """ + if row.get("method") != "notify": + return False + source_id = row.get("source_id") + return source_id is None or source_id == "" + + def message_from_activity(row: dict[str, Any]) -> InboxMessage: """Convert one /activity row into an InboxMessage.""" request_body = row.get("request_body") @@ -457,6 +474,28 @@ def _poll_once( for row in rows: if not isinstance(row, dict): continue + if _is_self_notify_row(row): + # The workspace-server's `/notify` handler writes the agent's + # own send_message_to_user POSTs to activity_logs with + # activity_type='a2a_receive', method='notify', and no + # source_id, so the canvas chat-history loader can restore + # those bubbles after a page reload (handlers/activity.go, + # comment block at line 428). The activity API exposes that + # filter only on type, so the same row otherwise lands in + # this poll and gets pushed back to the agent — confirmed + # live 2026-05-01: agent observed its own outbound as an + # inbound `← molecule: Agent message: ...`. 
Filter here + # belt-and-braces; the long-term fix is upstream renaming + # the activity_type to `agent_outbound` (molecule-core + # #2469). Once that lands, this filter becomes redundant + # but stays in place because it only excludes rows we never + # want, so removing it would just be churn. + # + # NB: still call save_cursor for these rows below — we + # advance past them so the next poll doesn't keep re-seeing + # the same self-notify on every iteration. + last_id = str(row.get("id", "")) or last_id + continue message = message_from_activity(row) if not message.activity_id: continue diff --git a/workspace/tests/test_inbox.py b/workspace/tests/test_inbox.py index 0b8cacd1..6731701a 100644 --- a/workspace/tests/test_inbox.py +++ b/workspace/tests/test_inbox.py @@ -414,6 +414,144 @@ def test_poll_once_initial_backlog_reverses_to_chronological(state: inbox.InboxS assert state.load_cursor() == "act-newest" +# --------------------------------------------------------------------------- +# _is_self_notify_row + the echo-loop guard in _poll_once +# --------------------------------------------------------------------------- +# +# The workspace-server's `/notify` handler writes the agent's own +# send_message_to_user POSTs to activity_logs as activity_type= +# 'a2a_receive' with method='notify' and no source_id, so the canvas +# chat-history loader can restore those bubbles after a page reload. +# Without a guard, the poller picks them up and pushes them back as +# inbound — confirmed live 2026-05-01: the agent observed its own +# outbound as `← molecule: Agent message: ...`. +# +# These tests pin both the predicate (`_is_self_notify_row`) and the +# integrated behavior in `_poll_once` so a future refactor that drops +# either half breaks loudly. Long-term the upstream fix is renaming +# the activity_type at the workspace-server (#2469); this guard stays +# regardless because it only excludes rows we never want. 
+ + +def test_is_self_notify_row_true_for_method_notify_no_peer(): + assert inbox._is_self_notify_row({"method": "notify", "source_id": None}) is True + assert inbox._is_self_notify_row({"method": "notify", "source_id": ""}) is True + # source_id key absent — same shape (None on .get). + assert inbox._is_self_notify_row({"method": "notify"}) is True + + +def test_is_self_notify_row_false_for_real_canvas_inbound(): + """Real canvas-user message: method='message/send' (not notify), + source_id None (no peer).""" + row = {"method": "message/send", "source_id": None} + assert inbox._is_self_notify_row(row) is False + + +def test_is_self_notify_row_false_for_real_peer_inbound(): + """Real peer-agent message: method='message/send' or 'tasks/send', + source_id is the sender workspace UUID.""" + row = {"method": "tasks/send", "source_id": "ws-peer-uuid"} + assert inbox._is_self_notify_row(row) is False + + +def test_is_self_notify_row_false_for_method_notify_with_peer(): + """Defensive: a future caller using method='notify' WITH a real + peer_id is treated as a real inbound, not a self-notify. Drops the + guard if upstream ever repurposes the method='notify' shape.""" + row = {"method": "notify", "source_id": "ws-peer-uuid"} + assert inbox._is_self_notify_row(row) is False + + +def test_poll_once_skips_self_notify_rows(state: inbox.InboxState): + """The integrated guard: a self-notify row in the activity payload + must NOT land in the inbox queue. This is the regression pin for + the 2026-05-01 echo-loop incident.""" + rows = [ + { + "id": "act-real", + "source_id": None, + "method": "message/send", + "summary": None, + "request_body": {"parts": [{"type": "text", "text": "real inbound"}]}, + "created_at": "2026-04-30T22:00:00Z", + }, + { + "id": "act-self-notify", + "source_id": None, + "method": "notify", + "summary": "Agent message: Hi! 
What can I help you with today?", + "request_body": None, + "created_at": "2026-04-30T22:00:01Z", + }, + ] + resp = _make_response(200, rows) + p, _ = _patch_httpx(resp) + with p: + n = inbox._poll_once(state, "http://platform", "ws-1", {}) + + # Only the real inbound counted; self-notify silently dropped. + assert n == 1 + queue = state.peek(10) + assert [m.activity_id for m in queue] == ["act-real"] + + +def test_poll_once_advances_cursor_past_self_notify(state: inbox.InboxState): + """Cursor must advance past self-notify rows even though we don't + enqueue them. Otherwise the next poll re-fetches the same self- + notify on every iteration (until a real inbound arrives), wasting + a request and pinning the cursor backward.""" + state.save_cursor("act-old") + rows = [ + { + "id": "act-self-notify", + "source_id": None, + "method": "notify", + "summary": "Agent message: hello", + "request_body": None, + "created_at": "2026-04-30T22:00:00Z", + }, + ] + resp = _make_response(200, rows) + p, _ = _patch_httpx(resp) + with p: + n = inbox._poll_once(state, "http://platform", "ws-1", {}) + + assert n == 0 + assert state.peek(10) == [] + # Cursor must move past the skipped row so we don't re-poll it. + assert state.load_cursor() == "act-self-notify" + + +def test_poll_once_self_notify_does_not_fire_notification(state: inbox.InboxState): + """The notification callback (channel push to Claude Code etc.) + must not fire for self-notify rows. 
Otherwise a notification- + capable host gets the same echo loop the queue side avoids.""" + rows = [ + { + "id": "act-self-notify", + "source_id": None, + "method": "notify", + "summary": "Agent message: hello", + "request_body": None, + "created_at": "2026-04-30T22:00:00Z", + }, + ] + received: list[dict] = [] + inbox.set_notification_callback(received.append) + try: + resp = _make_response(200, rows) + p, _ = _patch_httpx(resp) + with p: + inbox._poll_once(state, "http://platform", "ws-1", {}) + finally: + inbox.set_notification_callback(None) + + assert received == [], ( + "self-notify rows must not surface as MCP notifications — " + "doing so re-creates the echo loop on push-capable hosts" + ) + + def test_start_poller_thread_is_daemon(state: inbox.InboxState): """Daemon flag is required so the poller dies with the parent process; a non-daemon poller would leak across `claude` restarts