Compare commits

...

2 Commits

Author SHA1 Message Date
devops-engineer 4baecebf8e Merge branch 'main' into fix/a2a-self-delegation-echo-inbox
Block internal-flavored paths / Block forbidden paths (pull_request) Waiting to run
CI / Detect changes (pull_request) Waiting to run
CI / Platform (Go) (pull_request) Waiting to run
CI / Canvas (Next.js) (pull_request) Waiting to run
CI / Shellcheck (E2E scripts) (pull_request) Waiting to run
CI / Python Lint & Test (pull_request) Waiting to run
CI / all-required (pull_request) Waiting to run
E2E API Smoke Test / detect-changes (pull_request) Waiting to run
E2E Chat / detect-changes (pull_request) Waiting to run
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Waiting to run
Handlers Postgres Integration / detect-changes (pull_request) Waiting to run
audit-force-merge / audit (pull_request) Waiting to run
lint-required-no-paths / lint-required-no-paths (pull_request) Waiting to run
publish-runtime-autobump / pr-validate (pull_request) Waiting to run
publish-runtime-autobump / bump-and-tag (pull_request) Waiting to run
Runtime PR-Built Compatibility / detect-changes (pull_request) Waiting to run
Secret scan / Scan diff for credential-shaped strings (pull_request) Waiting to run
gate-check-v3 / gate-check (pull_request) Waiting to run
qa-review / approved (pull_request) Waiting to run
security-review / approved (pull_request) Waiting to run
sop-checklist / all-items-acked (pull_request) Waiting to run
sop-tier-check / tier-check (pull_request) Waiting to run
CI / Canvas Deploy Reminder (pull_request) Has been cancelled
E2E API Smoke Test / E2E API Smoke Test (pull_request) Has been cancelled
E2E Chat / E2E Chat (pull_request) Has been cancelled
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Has been cancelled
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Has been cancelled
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Has been cancelled
2026-05-16 13:03:54 +00:00
core-devops 3bda7e1c07 fix(inbox): drop self-delegation-echo bookkeeping rows from inbox poller
CI / Canvas Deploy Reminder (pull_request) Blocked by required conditions
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 6s
CI / Detect changes (pull_request) Successful in 9s
publish-runtime-autobump / bump-and-tag (pull_request) Has been skipped
Handlers Postgres Integration / detect-changes (pull_request) Successful in 17s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 26s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 25s
E2E API Smoke Test / detect-changes (pull_request) Successful in 26s
E2E Chat / detect-changes (pull_request) Successful in 27s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 23s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 20s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 8s
qa-review / approved (pull_request) Failing after 20s
security-review / approved (pull_request) Failing after 21s
sop-tier-check / tier-check (pull_request) Successful in 18s
sop-checklist / all-items-acked (pull_request) Successful in 20s
publish-runtime-autobump / pr-validate (pull_request) Successful in 51s
gate-check-v3 / gate-check (pull_request) Successful in 33s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 15s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 14s
E2E Chat / E2E Chat (pull_request) Successful in 51s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m34s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 3m22s
CI / Python Lint & Test (pull_request) Successful in 7m52s
CI / Canvas (Next.js) (pull_request) Successful in 22m40s
CI / Platform (Go) (pull_request) Successful in 23m33s
CI / all-required (pull_request) Successful in 24m11s
A workspace delegating to a target that never picks up the task hits
the 300s polling timeout; tool_delegate_task then reports the outcome
via a2a_tools.report_activity("a2a_receive", ...), which POSTs an
activity_type='a2a_receive' row to /workspaces/<self>/activity with
source_id=WORKSPACE_ID (its own id — mandated by the workspace-server's
source_id spoof-defense, activity.go:549) and method='message/send'.

The workspace's own inbox poller then re-fetches that row from
?type=a2a_receive; message_from_activity sets peer_id=source_id
(non-empty) so to_dict classifies it kind=peer_agent, registry-resolved
to the workspace's own display name ("mac laptop"). The agent sees its
own delegation-failure echoed back as if a peer delegated to it
(confirmed live on hongming.moleculesai.app 2026-05-16).

This is NOT a self-delegation (the platform + agent self-delegation
guards are correct and fire on source==target — the real target here
was a different workspace that legitimately timed out). The producer
side is also correct by platform contract (report_activity MUST send
source_id=WORKSPACE_ID). The defect is the inbox poller mis-classifying
a self-originated bookkeeping row as an inbound peer message — the same
class as the already-fixed self-notify echo, slipping through because
it uses method='message/send' + non-empty self source_id.

Fix at the correct layer (the consumer): add inbox._is_self_echo_row
and skip+advance-cursor for any a2a_receive row whose source_id equals
the polled workspace_id, mirroring _is_self_notify_row exactly. A
genuine inbound peer always has source_id != polled workspace_id
(self-delegation is blocked upstream by two guards). RFC #2829 PR-2
delegation-result pushes (method='delegate_result', source_id==self)
are explicitly preserved so the durable async result-delivery path
keeps working.

Tests: 8 new tests (predicate + integrated _poll_once skip/cursor +
delegate_result preservation). test_inbox.py 62/62 green; related
delegation/a2a suites 48/48 green; zero regression.

Refs: molecule-ai/internal#469

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 05:30:46 -07:00
2 changed files with 199 additions and 0 deletions
+56
View File
@@ -431,6 +431,52 @@ def _is_self_notify_row(row: dict[str, Any]) -> bool:
return source_id is None or source_id == ""
def _is_self_echo_row(row: dict[str, Any], workspace_id: str) -> bool:
"""Return True if ``row`` is this workspace's OWN delegation-outcome
bookkeeping surfacing back through the activity API as a fake inbound
peer message (the self-echo bug, confirmed live 2026-05-16).
Shape: when ``tool_delegate_task`` reports the outcome of a delegation
it fired, ``a2a_tools.report_activity`` POSTs an
``activity_type='a2a_receive'`` row to ``/workspaces/<self>/activity``
with ``source_id = WORKSPACE_ID`` (its own id — required by the
workspace-server's source_id spoof-defense, handlers/activity.go:549)
and ``method='message/send'``. The inbox poller then re-fetches that
row from ``?type=a2a_receive``; ``message_from_activity`` sets
``peer_id = source_id`` (non-empty), so ``to_dict`` classifies it as
``kind=peer_agent`` and the registry resolves the workspace's own id
to its display name (e.g. "mac laptop"). The agent sees its own
delegation-failure echoed back as if a peer delegated to it.
A genuine inbound peer message ALWAYS has ``source_id`` set to a
DIFFERENT workspace than the one being polled — self-delegation is
already rejected upstream by two independent guards (the platform's
DelegationHandler.Delegate and the agent-side tool_delegate_task).
So ``source_id == workspace_id`` on the a2a_receive feed only ever
means a self-originated bookkeeping row, never a real peer.
Exception: ``method='delegate_result'`` rows ARE legitimately written
to the caller's own inbox with ``source_id == workspace_id`` — that
is the RFC #2829 PR-2 delegation-result delivery path
(handlers/delegation.go:pushDelegationResultToInbox). Those must keep
flowing to the agent, so they are explicitly NOT treated as echo.
Matched on (source_id == workspace_id) AND (method != 'delegate_result')
together so the RFC #2829 result-push is preserved while the
report_activity self-echo is filtered.
"""
if not workspace_id:
# Single-workspace pollers may pass "" as the cursor key but the
# poller always knows its real workspace_id; an empty value means
# we can't safely compare — fall through (treat as not-echo) so
# this guard never silently drops a real message.
return False
source_id = row.get("source_id")
if not source_id or source_id != workspace_id:
return False
return row.get("method") != "delegate_result"
def message_from_activity(row: dict[str, Any]) -> InboxMessage:
"""Convert one /activity row into an InboxMessage.
@@ -623,6 +669,16 @@ def _poll_once(
# the same self-notify on every iteration.
last_id = str(row.get("id", "")) or last_id
continue
if _is_self_echo_row(row, workspace_id):
# This workspace's OWN delegation-outcome bookkeeping row
# (report_activity wrote source_id=WORKSPACE_ID). Re-delivering
# it would surface the agent's own delegation-failure as a
# bogus inbound peer_agent message from itself (self-echo,
# 2026-05-16). Skip + advance the cursor past it, exactly like
# the self-notify guard above, so the next poll doesn't keep
# re-seeing it.
last_id = str(row.get("id", "")) or last_id
continue
message = message_from_activity(row)
if not message.activity_id:
continue
+143
View File
@@ -552,6 +552,149 @@ def test_poll_once_self_notify_does_not_fire_notification(state: inbox.InboxStat
)
# ---------------------------------------------------------------------------
# _is_self_echo_row + the self-delegation-echo guard in _poll_once
# ---------------------------------------------------------------------------
#
# Confirmed live on hongming.moleculesai.app 2026-05-16: a workspace
# delegating to another workspace that never picks up the task hits the
# 300s polling timeout; tool_delegate_task then calls
# a2a_tools.report_activity("a2a_receive", ...), which POSTs an
# activity_type='a2a_receive' row to /workspaces/<self>/activity with
# source_id=WORKSPACE_ID (its own id — mandated by the workspace-server's
# source_id spoof-defense) and method='message/send'. The poller then
# re-fetches that row from ?type=a2a_receive and message_from_activity
# sets peer_id=source_id (non-empty) → kind=peer_agent, registry-resolved
# to the workspace's own display name ("mac laptop"). The agent sees its
# own delegation-failure echoed back as an inbound peer message.
#
# A genuine inbound peer always has source_id != polled workspace_id
# (self-delegation is blocked upstream by two guards). The ONLY
# legitimate source_id==self a2a_receive rows are RFC #2829 PR-2
# delegation-result pushes (method='delegate_result'), which must keep
# flowing. These tests pin both halves so a refactor that drops either
# breaks loudly.
def test_is_self_echo_row_true_for_own_report_activity_row():
"""report_activity writes source_id=WORKSPACE_ID, method='message/send'.
Polled back on the same workspace, that is a self-echo."""
row = {"source_id": "ws-1", "method": "message/send"}
assert inbox._is_self_echo_row(row, "ws-1") is True
# tasks/send is the other A2A method report_activity-class rows use.
row2 = {"source_id": "ws-1", "method": "tasks/send"}
assert inbox._is_self_echo_row(row2, "ws-1") is True
def test_is_self_echo_row_false_for_real_peer_inbound():
"""Real peer-agent message: source_id is a DIFFERENT workspace."""
row = {"source_id": "ws-peer-uuid", "method": "message/send"}
assert inbox._is_self_echo_row(row, "ws-1") is False
def test_is_self_echo_row_false_for_canvas_user():
"""Canvas-user message: source_id is None/empty (no peer)."""
assert inbox._is_self_echo_row({"source_id": None, "method": "message/send"}, "ws-1") is False
assert inbox._is_self_echo_row({"source_id": "", "method": "message/send"}, "ws-1") is False
def test_is_self_echo_row_false_for_delegate_result_push():
"""RFC #2829 PR-2 delegation-result delivery legitimately writes the
caller's own inbox with source_id==self + method='delegate_result'.
That MUST NOT be filtered — it's how the agent gets its delegation
results back on the durable async path."""
row = {"source_id": "ws-1", "method": "delegate_result"}
assert inbox._is_self_echo_row(row, "ws-1") is False
def test_is_self_echo_row_false_when_workspace_id_unknown():
"""Empty workspace_id → can't safely compare → never drop a message."""
assert inbox._is_self_echo_row({"source_id": "ws-1", "method": "message/send"}, "") is False
def test_poll_once_skips_self_echo_delegation_bookkeeping(state: inbox.InboxState):
"""Integrated regression pin for the 2026-05-16 self-echo incident:
a report_activity-shaped row (source_id == polled workspace_id,
method='message/send', activity_type=a2a_receive) must NOT land in
the inbox queue as a fake inbound peer message."""
rows = [
{
"id": "act-real-peer",
"source_id": "ws-other-peer",
"method": "message/send",
"summary": None,
"request_body": {"parts": [{"type": "text", "text": "real peer task"}]},
"created_at": "2026-05-16T10:00:00Z",
},
{
"id": "act-self-echo",
"source_id": "ws-1", # == the polled workspace_id below
"method": "message/send",
"summary": "hongming-pc failed: polling timeout after 300.0s "
"(delegation_id=16a70d38, last_status=dispatched); the platform",
"request_body": None,
"created_at": "2026-05-16T10:00:01Z",
},
]
resp = _make_response(200, rows)
p, _ = _patch_httpx(resp)
with p:
n = inbox._poll_once(state, "http://platform", "ws-1", {})
# Only the genuine peer message counted; the self-echo dropped.
assert n == 1
queue = state.peek(10)
assert [m.activity_id for m in queue] == ["act-real-peer"]
def test_poll_once_advances_cursor_past_self_echo(state: inbox.InboxState):
"""Cursor must advance past the skipped self-echo row so the next
poll doesn't re-fetch it forever (the same invariant as self-notify)."""
state.save_cursor("act-old")
rows = [
{
"id": "act-self-echo",
"source_id": "ws-1",
"method": "message/send",
"summary": "ws-child failed: polling timeout after 300.0s",
"request_body": None,
"created_at": "2026-05-16T10:00:00Z",
},
]
resp = _make_response(200, rows)
p, _ = _patch_httpx(resp)
with p:
n = inbox._poll_once(state, "http://platform", "ws-1", {})
assert n == 0
assert state.peek(10) == []
assert state.load_cursor() == "act-self-echo"
def test_poll_once_delegate_result_push_still_delivered(state: inbox.InboxState):
"""RFC #2829 PR-2 delegation-result push (source_id==self,
method='delegate_result') must STILL reach the agent — the self-echo
guard must not over-filter the legitimate result-delivery path."""
rows = [
{
"id": "act-delegate-result",
"source_id": "ws-1",
"method": "delegate_result",
"summary": "Delegation result delivered",
"request_body": {"parts": [{"type": "text", "text": "peer's answer"}]},
"created_at": "2026-05-16T10:00:00Z",
},
]
resp = _make_response(200, rows)
p, _ = _patch_httpx(resp)
with p:
n = inbox._poll_once(state, "http://platform", "ws-1", {})
assert n == 1
queue = state.peek(10)
assert [m.activity_id for m in queue] == ["act-delegate-result"]
def test_start_poller_thread_is_daemon(state: inbox.InboxState):
"""Daemon flag is required so the poller dies with the parent
process; a non-daemon poller would leak across `claude` restarts