From 90e115ba55277a61d1319c6f0b462c4d196acd39 Mon Sep 17 00:00:00 2001 From: core-devops Date: Mon, 18 May 2026 16:53:51 -0700 Subject: [PATCH] fix(runtime): close self-delegation echo gap in builtin_tools + inbox kind MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Task #190 / #193 — surface the self-delegation echo guard at every runtime delegation entry point, and classify platform-pushed delegation-result rows distinctly from peer_agent messages so a delegation timeout never appears to the caller as a fake peer instruction. Three layers were affected and only two were guarded: 1. workspace/a2a_tools_delegation.py — already had the guard (added in #548 / #469). Untouched. 2. workspace-server/internal/handlers/delegation.go — Go API gate already had the guard. Untouched. 3. workspace/builtin_tools/a2a_tools.py::delegate_task — framework- agnostic adapter surface used by adapters that don't go through (1). NO GUARD. Added. 4. workspace/builtin_tools/delegation.py::delegate_task_async — the LangChain @tool fire-and-forget path. NO GUARD on the local helper (it dispatched the background _execute_delegation coroutine to our own URL). Added. Symptom without (3)/(4): a workspace delegating to its own UUID rounds through the platform proxy, the synchronous handler waits on the run lock the caller holds, the request times out, the platform writes the failure as activity_type='a2a_receive' source_id=our workspace UUID, the inbox poller picks it up and surfaces it as kind='peer_agent' with peer_id=our own workspace — the agent then sees its own timeout as a new peer instructing it (#190 self-echo). Reply via delegate_task to that "peer" re-triggers the loop. Inbox-side fix (workspace/inbox.py): InboxMessage.to_dict() now classifies rows with method='delegate_result' as kind='delegation_result' regardless of peer_id. This makes pushDelegationResultToInbox results (RFC #2829 PR-2) surface as STRUCTURED delegation outcomes to the caller's wait_for_message instead of fake peer_agent messages. This covers both the self-delegation echo path AND the cross-workspace ProxyA2A failure path where the delegation result lands in the caller's inbox with source_id=caller's own workspace UUID. Tests added: - tests/test_a2a_tools_module.py::TestSelfDelegationGuard — verifies the builtin_tools/a2a_tools.py guard short-circuits BEFORE any HTTP call, and lets a real peer through. - tests/test_delegation.py::TestSelfDelegationGuard — verifies builtin_tools/delegation.py::delegate_task_async returns the structured rejection error without scheduling a background task. - tests/test_inbox.py::test_message_from_activity_delegate_result_distinct_kind — pins kind='delegation_result' for method='delegate_result' rows so the #190 mis-classification regression is locked. Runtime mirror (molecule-ai-workspace-runtime) is a publish artifact of this directory — it picks up the fix automatically on the next runtime-v* tag → publish-runtime workflow → PyPI 0.1.1003. Co-Authored-By: Claude Opus 4.7 (1M context) --- workspace/builtin_tools/a2a_tools.py | 22 ++++++++++ workspace/builtin_tools/delegation.py | 22 ++++++++++ workspace/inbox.py | 25 ++++++++++- workspace/tests/test_a2a_tools_module.py | 55 ++++++++++++++++++++++++ workspace/tests/test_delegation.py | 35 +++++++++++++++ workspace/tests/test_inbox.py | 30 +++++++++++++ 6 files changed, 188 insertions(+), 1 deletion(-) diff --git a/workspace/builtin_tools/a2a_tools.py b/workspace/builtin_tools/a2a_tools.py index 7ac7bada9..4b921fe10 100644 --- a/workspace/builtin_tools/a2a_tools.py +++ b/workspace/builtin_tools/a2a_tools.py @@ -34,6 +34,28 @@ async def list_peers() -> list[dict]: async def delegate_task(workspace_id: str, task: str) -> str: """Send a task to a peer workspace via A2A and return the response text.""" + # Task #190 / #193 — Self-delegation guard. Without this, a workspace + # delegating to its own UUID round-trips through the platform proxy back + # into the sender; the synchronous handler waits on the same lock the + # caller holds, the request times out, and the platform writes an + # a2a_receive activity row with source_id=our own workspace UUID. The + # inbox poller then surfaces that row as kind="peer_agent" and the agent + # sees the timeout echoed back as a peer instructing it (#190). + # + # The sibling guards live in: + # - workspace-server/internal/handlers/delegation.go (Go API gate) + # - workspace/a2a_tools_delegation.py (MCP path guard) + # This module is the framework-agnostic adapter surface used by adapters + # that don't go through a2a_tools_delegation.py — it needs its own guard. + if WORKSPACE_ID and workspace_id == WORKSPACE_ID: + return ( + "Error: self-delegation rejected (cannot delegate_task to your own " + "workspace). There is no peer who is also you — the platform proxy " + "would deadlock and the timeout would echo back as a peer_agent " + "message from yourself (#190). Do the work directly, or use " + "commit_memory / send_message_to_user instead." + ) + async with httpx.AsyncClient(timeout=120.0) as client: # Discover target URL try: diff --git a/workspace/builtin_tools/delegation.py b/workspace/builtin_tools/delegation.py index f4e6ad016..7f5784500 100644 --- a/workspace/builtin_tools/delegation.py +++ b/workspace/builtin_tools/delegation.py @@ -412,6 +412,28 @@ async def delegate_task_async( """ task_id = str(uuid.uuid4()) + # Task #190 / #193 — Self-delegation guard (async path). Even on the + # async path that returns a task_id immediately, _execute_delegation + # eventually fires the A2A POST back to our own URL, which times out + # against our own held run lock, gets recorded with source_id=our + # workspace UUID, and surfaces in the inbox as a peer_agent message + # from ourselves (#190). Reject before scheduling the background task + # so no peer_agent echo can be generated. Sibling guards: + # - workspace-server/internal/handlers/delegation.go (Go API gate) + # - workspace/a2a_tools_delegation.py (MCP sync + async paths) + # - workspace/builtin_tools/a2a_tools.py (framework-agnostic sync) + if WORKSPACE_ID and workspace_id == WORKSPACE_ID: + log_event(event_type="delegation", action="delegate", resource=workspace_id, + outcome="rejected_self_delegation", trace_id=task_id) + return { + "success": False, + "error": ( + "self-delegation rejected: cannot delegate_task_async to your " + "own workspace (would time out and echo back as a peer_agent " + "message from yourself — #190)" + ), + } + # RBAC check roles, custom_perms = get_workspace_roles() if not check_permission("delegate", roles, custom_perms): diff --git a/workspace/inbox.py b/workspace/inbox.py index bd8cc0404..832b948fe 100644 --- a/workspace/inbox.py +++ b/workspace/inbox.py @@ -102,11 +102,34 @@ class InboxMessage: arrival_workspace_id: str = "" def to_dict(self) -> dict[str, Any]: + # Task #190 / #193 — Distinguish delegation-result rows from peer-agent + # messages. The platform's pushDelegationResultToInbox (RFC #2829 PR-2) + # writes activity_type='a2a_receive' with method='delegate_result' and + # source_id=our own workspace UUID, so the caller's inbox poller can + # surface delegation completions/failures via wait_for_message. But + # the default to_dict derives kind="peer_agent" purely from peer_id + # being non-empty — which makes a synchronous-delegation timeout, or + # a cross-workspace ProxyA2A failure, appear to the agent as a NEW + # peer_agent message from our own workspace UUID (#190 self-echo). + # + # Explicitly classify rows with method='delegate_result' as + # kind='delegation_result' regardless of peer_id, so: + # 1. wait_for_message gives the original caller a structured + # delegation result (not a fake peer instruction). + # 2. Agents reading the envelope don't mistake the row for a + # peer instructing them — preventing the #190 reply-via- + # delegate_task-to-self loop. + if self.method == "delegate_result": + kind = "delegation_result" + elif self.peer_id: + kind = "peer_agent" + else: + kind = "canvas_user" d = { "activity_id": self.activity_id, "text": self.text, "peer_id": self.peer_id, - "kind": "peer_agent" if self.peer_id else "canvas_user", + "kind": kind, "method": self.method, "created_at": self.created_at, } diff --git a/workspace/tests/test_a2a_tools_module.py b/workspace/tests/test_a2a_tools_module.py index 1a058326e..f47b086ef 100644 --- a/workspace/tests/test_a2a_tools_module.py +++ b/workspace/tests/test_a2a_tools_module.py @@ -325,3 +325,58 @@ class TestGetPeersSummary: result = await mod.get_peers_summary() assert result == "No peers available." + + +# --------------------------------------------------------------------------- +# Self-delegation guard (Task #190 / #193) +# --------------------------------------------------------------------------- + +class TestSelfDelegationGuard: + """delegate_task to your own workspace UUID must be rejected BEFORE any + discovery / proxy hop. Otherwise the request round-trips back to us, + deadlocks on the run lock, times out, and surfaces in the inbox as a + peer_agent message from our own workspace (the documented #190 self-echo + bug).""" + + async def test_delegate_task_rejects_self(self, monkeypatch): + mod = _load_a2a_tools(monkeypatch, workspace_id="ws-self-abc") + + calls = [] + + class TrappingClient: + def __init__(self, timeout): pass + async def __aenter__(self): return self + async def __aexit__(self, *a): pass + async def get(self, *a, **kw): + calls.append(("get", a, kw)) + raise AssertionError("guard must reject before discover") + async def post(self, *a, **kw): + calls.append(("post", a, kw)) + raise AssertionError("guard must reject before proxy POST") + + monkeypatch.setattr(mod.httpx, "AsyncClient", TrappingClient) + + result = await mod.delegate_task("ws-self-abc", "do a thing") + assert "self-delegation" in result.lower() + assert not calls, "no HTTP call should be made for self-delegation" + + async def test_delegate_task_allows_real_peer(self, monkeypatch): + """Guard is strictly equality on WORKSPACE_ID — a different target + passes through to the normal discover/proxy path.""" + mod = _load_a2a_tools(monkeypatch, workspace_id="ws-self-abc") + + class FakeClient: + def __init__(self, timeout): pass + async def __aenter__(self): return self + async def __aexit__(self, *a): pass + async def get(self, url, headers=None): + return _FakeResponse(200, {"url": "http://target.test/a2a"}) + async def post(self, url, json=None, headers=None): + return _FakeResponse(200, { + "result": {"parts": [{"kind": "text", "text": "ok"}]} + }) + + monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient) + + result = await mod.delegate_task("ws-DIFFERENT-xyz", "do a thing") + assert "self-delegation" not in result.lower() diff --git a/workspace/tests/test_delegation.py b/workspace/tests/test_delegation.py index 8d33e98d5..9c845ebc8 100644 --- a/workspace/tests/test_delegation.py +++ b/workspace/tests/test_delegation.py @@ -148,6 +148,41 @@ class TestRBAC: assert "RBAC" in result["error"] +class TestSelfDelegationGuard: + """Task #190 / #193 — delegate_task_async must reject delegation to the + caller's own workspace BEFORE scheduling the background task. Otherwise + the platform A2A round-trip times out against our own held run lock, the + failure is logged with source_id=our workspace UUID, and the inbox + poller surfaces the row as a peer_agent message from ourselves.""" + + @pytest.mark.asyncio + async def test_async_path_rejects_self_workspace(self, delegation_mocks): + mod, *_ = delegation_mocks + # WORKSPACE_ID was set to "ws-self" by the fixture's monkeypatch. + # The module reads it at import time → reload-equivalent comparison. + mod.WORKSPACE_ID = "ws-self" + + result = await _invoke(mod, workspace_id="ws-self") + + assert result["success"] is False + assert "self-delegation" in result["error"].lower() + # No background task should have been scheduled. + assert len(mod._background_tasks) == 0 + + @pytest.mark.asyncio + async def test_async_path_allows_different_workspace(self, delegation_mocks): + """Guard does NOT short-circuit a real peer target.""" + mod, *_ = delegation_mocks + mod.WORKSPACE_ID = "ws-self" + _, mock_cls = _make_mock_client() + + with patch("httpx.AsyncClient", mock_cls): + result = await _invoke(mod, workspace_id="ws-peer") + + assert result["success"] is True + assert result["status"] == "delegated" + + class TestAsyncDelegation: @pytest.mark.asyncio diff --git a/workspace/tests/test_inbox.py b/workspace/tests/test_inbox.py index dd7dbdae9..bc5d14ed5 100644 --- a/workspace/tests/test_inbox.py +++ b/workspace/tests/test_inbox.py @@ -131,6 +131,36 @@ def test_message_from_activity_peer_agent(): assert msg.to_dict()["kind"] == "peer_agent" +def test_message_from_activity_delegate_result_distinct_kind(): + """Task #190 / #193 — pushDelegationResultToInbox (RFC #2829 PR-2) writes + rows with method='delegate_result' and source_id=our own workspace UUID + so the caller's wait_for_message can surface delegation completions or + failures. Without an explicit kind override, to_dict() would classify + those rows as kind='peer_agent' (peer_id non-empty) and the agent would + treat its OWN delegation timeout as a peer instructing it — the #190 + self-echo bug. Classify these rows as kind='delegation_result' so they + are recognizable as structured delegation outcomes.""" + row = { + "id": "act-90", + "source_id": "ws-self-abc", # same as our workspace + "method": "delegate_result", + "summary": "Delegation failed", + "response_body": {"text": "polling timeout", "delegation_id": "d-1"}, + "created_at": "2026-05-18T00:00:00Z", + } + msg = inbox.message_from_activity(row) + payload = msg.to_dict() + assert payload["kind"] == "delegation_result", ( + f"delegate_result rows must surface as kind='delegation_result', " + f"not peer_agent (got {payload['kind']!r})" + ) + # Method preserved for downstream consumers that key off it. + assert payload["method"] == "delegate_result" + # peer_id is still set on the dataclass for back-compat dispatch — the + # distinguishing signal is the kind field. + assert msg.peer_id == "ws-self-abc" + + def test_message_from_activity_handles_string_request_body(): row = { "id": "act-3", -- 2.52.0