Merge pull request #2470 from Molecule-AI/fix/inbox-filter-self-notify-rows

fix(inbox): skip self-notify rows to break echo loop
This commit is contained in:
Hongming Wang 2026-05-02 00:45:33 +00:00 committed by GitHub
commit 2297c083c8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 177 additions and 0 deletions

View File

@ -364,6 +364,23 @@ def _extract_text(request_body: Any, summary: str | None) -> str:
return summary or "(empty A2A message)"
def _is_self_notify_row(row: dict[str, Any]) -> bool:
"""Return True if ``row`` is the agent's own send_message_to_user
POST surfacing back through the activity API.
The shape (workspace-server handlers/activity.go, ``Notify`` writer):
method='notify' AND no peer (source_id is None or '')
Matched on both fields together so a future caller using
``method='notify'`` for a different purpose with a real peer_id
still passes through.
"""
if row.get("method") != "notify":
return False
source_id = row.get("source_id")
return source_id is None or source_id == ""
def message_from_activity(row: dict[str, Any]) -> InboxMessage:
"""Convert one /activity row into an InboxMessage."""
request_body = row.get("request_body")
@ -457,6 +474,28 @@ def _poll_once(
for row in rows:
if not isinstance(row, dict):
continue
if _is_self_notify_row(row):
# The workspace-server's `/notify` handler writes the agent's
# own send_message_to_user POSTs to activity_logs with
# activity_type='a2a_receive', method='notify', and no
# source_id, so the canvas chat-history loader can restore
# those bubbles after a page reload (handlers/activity.go,
# comment block at line 428). The activity API exposes that
# filter only on type, so the same row otherwise lands in
# this poll and gets pushed back to the agent — confirmed
# live 2026-05-01: agent observed its own outbound as an
# inbound `← molecule: Agent message: ...`. Filter here
# belt-and-braces; the long-term fix is upstream renaming
# the activity_type to `agent_outbound` (molecule-core
# #2469). Once that lands, this filter becomes redundant
# but stays in place because it only excludes rows we never
# want, so removing it would just be churn.
#
# NB: still call save_cursor for these rows below — we
# advance past them so the next poll doesn't keep re-seeing
# the same self-notify on every iteration.
last_id = str(row.get("id", "")) or last_id
continue
message = message_from_activity(row)
if not message.activity_id:
continue

View File

@ -414,6 +414,144 @@ def test_poll_once_initial_backlog_reverses_to_chronological(state: inbox.InboxS
assert state.load_cursor() == "act-newest"
# ---------------------------------------------------------------------------
# _is_self_notify_row + the echo-loop guard in _poll_once
# ---------------------------------------------------------------------------
#
# The workspace-server's `/notify` handler writes the agent's own
# send_message_to_user POSTs to activity_logs as activity_type=
# 'a2a_receive' with method='notify' and no source_id, so the canvas
# chat-history loader can restore those bubbles after a page reload.
# Without a guard, the poller picks them up and pushes them back as
# inbound — confirmed live 2026-05-01: the agent observed its own
# outbound as `← molecule: Agent message: ...`.
#
# These tests pin both the predicate (`_is_self_notify_row`) and the
# integrated behavior in `_poll_once` so a future refactor that drops
# either half breaks loudly. Long-term the upstream fix is renaming
# the activity_type at the workspace-server (#2469); this guard stays
# regardless because it only excludes rows we never want.
def test_is_self_notify_row_true_for_method_notify_no_peer():
assert inbox._is_self_notify_row({"method": "notify", "source_id": None}) is True
assert inbox._is_self_notify_row({"method": "notify", "source_id": ""}) is True
# source_id key absent — same shape (None on .get).
assert inbox._is_self_notify_row({"method": "notify"}) is True
def test_is_self_notify_row_false_for_real_canvas_inbound():
"""Real canvas-user message: method='message/send' (not notify),
source_id None (no peer)."""
row = {"method": "message/send", "source_id": None}
assert inbox._is_self_notify_row(row) is False
def test_is_self_notify_row_false_for_real_peer_inbound():
"""Real peer-agent message: method='message/send' or 'tasks/send',
source_id is the sender workspace UUID."""
row = {"method": "tasks/send", "source_id": "ws-peer-uuid"}
assert inbox._is_self_notify_row(row) is False
def test_is_self_notify_row_false_for_method_notify_with_peer():
"""Defensive: a future caller using method='notify' WITH a real
peer_id is treated as a real inbound, not a self-notify. Drops the
guard if upstream ever repurposes the method='notify' shape."""
row = {"method": "notify", "source_id": "ws-peer-uuid"}
assert inbox._is_self_notify_row(row) is False
def test_poll_once_skips_self_notify_rows(state: inbox.InboxState):
"""The integrated guard: a self-notify row in the activity payload
must NOT land in the inbox queue. This is the regression pin for
the 2026-05-01 echo-loop incident."""
rows = [
{
"id": "act-real",
"source_id": None,
"method": "message/send",
"summary": None,
"request_body": {"parts": [{"type": "text", "text": "real inbound"}]},
"created_at": "2026-04-30T22:00:00Z",
},
{
"id": "act-self-notify",
"source_id": None,
"method": "notify",
"summary": "Agent message: Hi! What can I help you with today?",
"request_body": None,
"created_at": "2026-04-30T22:00:01Z",
},
]
resp = _make_response(200, rows)
p, _ = _patch_httpx(resp)
with p:
n = inbox._poll_once(state, "http://platform", "ws-1", {})
# Only the real inbound counted; self-notify silently dropped.
assert n == 1
queue = state.peek(10)
assert [m.activity_id for m in queue] == ["act-real"]
def test_poll_once_advances_cursor_past_self_notify(state: inbox.InboxState):
"""Cursor must advance past self-notify rows even though we don't
enqueue them. Otherwise the next poll re-fetches the same self-
notify on every iteration (until a real inbound arrives), wasting
a request and pinning the cursor backward."""
state.save_cursor("act-old")
rows = [
{
"id": "act-self-notify",
"source_id": None,
"method": "notify",
"summary": "Agent message: hello",
"request_body": None,
"created_at": "2026-04-30T22:00:00Z",
},
]
resp = _make_response(200, rows)
p, _ = _patch_httpx(resp)
with p:
n = inbox._poll_once(state, "http://platform", "ws-1", {})
assert n == 0
assert state.peek(10) == []
# Cursor must move past the skipped row so we don't re-poll it.
assert state.load_cursor() == "act-self-notify"
def test_poll_once_self_notify_does_not_fire_notification(state: inbox.InboxState):
"""The notification callback (channel push to Claude Code etc.)
must not fire for self-notify rows. Otherwise a notification-
capable host gets the same echo loop the queue side avoids."""
rows = [
{
"id": "act-self-notify",
"source_id": None,
"method": "notify",
"summary": "Agent message: hello",
"request_body": None,
"created_at": "2026-04-30T22:00:00Z",
},
]
received: list[dict] = []
inbox.set_notification_callback(received.append)
try:
resp = _make_response(200, rows)
p, _ = _patch_httpx(resp)
with p:
inbox._poll_once(state, "http://platform", "ws-1", {})
finally:
inbox.set_notification_callback(None)
assert received == [], (
"self-notify rows must not surface as MCP notifications — "
"doing so re-creates the echo loop on push-capable hosts"
)
def test_start_poller_thread_is_daemon(state: inbox.InboxState):
"""Daemon flag is required so the poller dies with the parent
process; a non-daemon poller would leak across `claude` restarts