fix(runtime): close self-delegation echo gap in builtin_tools + inbox kind classification (#190 / #193) #1539

Merged
hongming merged 1 commit from fix/self-delegation-echo-runtime-builtin-tools into main 2026-05-19 00:33:37 +00:00
6 changed files with 188 additions and 1 deletion
+22
View File
@@ -34,6 +34,28 @@ async def list_peers() -> list[dict]:
async def delegate_task(workspace_id: str, task: str) -> str:
"""Send a task to a peer workspace via A2A and return the response text."""
# Task #190 / #193 — Self-delegation guard. Without this, a workspace
# delegating to its own UUID round-trips through the platform proxy back
# into the sender; the synchronous handler waits on the same lock the
# caller holds, the request times out, and the platform writes an
# a2a_receive activity row with source_id=our own workspace UUID. The
# inbox poller then surfaces that row as kind="peer_agent" and the agent
# sees the timeout echoed back as a peer instructing it (#190).
#
# The sibling guards live in:
# - workspace-server/internal/handlers/delegation.go (Go API gate)
# - workspace/a2a_tools_delegation.py (MCP path guard)
# This module is the framework-agnostic adapter surface used by adapters
# that don't go through a2a_tools_delegation.py — it needs its own guard.
if WORKSPACE_ID and workspace_id == WORKSPACE_ID:
return (
"Error: self-delegation rejected (cannot delegate_task to your own "
"workspace). There is no peer who is also you — the platform proxy "
"would deadlock and the timeout would echo back as a peer_agent "
"message from yourself (#190). Do the work directly, or use "
"commit_memory / send_message_to_user instead."
)
async with httpx.AsyncClient(timeout=120.0) as client:
# Discover target URL
try:
+22
View File
@@ -412,6 +412,28 @@ async def delegate_task_async(
"""
task_id = str(uuid.uuid4())
# Task #190 / #193 — Self-delegation guard (async path). Even on the
# async path that returns a task_id immediately, _execute_delegation
# eventually fires the A2A POST back to our own URL, which times out
# against our own held run lock, gets recorded with source_id=our
# workspace UUID, and surfaces in the inbox as a peer_agent message
# from ourselves (#190). Reject before scheduling the background task
# so no peer_agent echo can be generated. Sibling guards:
# - workspace-server/internal/handlers/delegation.go (Go API gate)
# - workspace/a2a_tools_delegation.py (MCP sync + async paths)
# - workspace/builtin_tools/a2a_tools.py (framework-agnostic sync)
if WORKSPACE_ID and workspace_id == WORKSPACE_ID:
log_event(event_type="delegation", action="delegate", resource=workspace_id,
outcome="rejected_self_delegation", trace_id=task_id)
return {
"success": False,
"error": (
"self-delegation rejected: cannot delegate_task_async to your "
"own workspace (would time out and echo back as a peer_agent "
"message from yourself — #190)"
),
}
# RBAC check
roles, custom_perms = get_workspace_roles()
if not check_permission("delegate", roles, custom_perms):
+24 -1
View File
@@ -102,11 +102,34 @@ class InboxMessage:
arrival_workspace_id: str = ""
def to_dict(self) -> dict[str, Any]:
# Task #190 / #193 — Distinguish delegation-result rows from peer-agent
# messages. The platform's pushDelegationResultToInbox (RFC #2829 PR-2)
# writes activity_type='a2a_receive' with method='delegate_result' and
# source_id=our own workspace UUID, so the caller's inbox poller can
# surface delegation completions/failures via wait_for_message. But
# the default to_dict derives kind="peer_agent" purely from peer_id
# being non-empty — which makes a synchronous-delegation timeout, or
# a cross-workspace ProxyA2A failure, appear to the agent as a NEW
# peer_agent message from our own workspace UUID (#190 self-echo).
#
# Explicitly classify rows with method='delegate_result' as
# kind='delegation_result' regardless of peer_id, so:
# 1. wait_for_message gives the original caller a structured
# delegation result (not a fake peer instruction).
# 2. Agents reading the envelope don't mistake the row for a
# peer instructing them — preventing the #190 reply-via-
# delegate_task-to-self loop.
if self.method == "delegate_result":
kind = "delegation_result"
elif self.peer_id:
kind = "peer_agent"
else:
kind = "canvas_user"
d = {
"activity_id": self.activity_id,
"text": self.text,
"peer_id": self.peer_id,
"kind": "peer_agent" if self.peer_id else "canvas_user",
"kind": kind,
"method": self.method,
"created_at": self.created_at,
}
+55
View File
@@ -325,3 +325,58 @@ class TestGetPeersSummary:
result = await mod.get_peers_summary()
assert result == "No peers available."
# ---------------------------------------------------------------------------
# Self-delegation guard (Task #190 / #193)
# ---------------------------------------------------------------------------
class TestSelfDelegationGuard:
    """Self-delegation guard on ``delegate_task`` (Task #190 / #193).

    A call that targets the caller's own workspace UUID has to be refused
    before any discovery or proxy request is issued. If it is not, the
    request loops back to the sender, blocks on the run lock, times out,
    and is later shown in the inbox as a peer_agent message coming from our
    own workspace (the #190 self-echo bug).
    """

    async def test_delegate_task_rejects_self(self, monkeypatch):
        """Targeting our own workspace id returns the rejection text and
        performs no HTTP request at all."""
        own_id = "ws-self-abc"
        mod = _load_a2a_tools(monkeypatch, workspace_id=own_id)
        http_calls = []

        class TrappingClient:
            """AsyncClient stand-in: records, then fails, on any request."""

            def __init__(self, timeout):
                pass

            async def __aenter__(self):
                return self

            async def __aexit__(self, *exc_info):
                pass

            async def get(self, *args, **kwargs):
                http_calls.append(("get", args, kwargs))
                raise AssertionError("guard must reject before discover")

            async def post(self, *args, **kwargs):
                http_calls.append(("post", args, kwargs))
                raise AssertionError("guard must reject before proxy POST")

        monkeypatch.setattr(mod.httpx, "AsyncClient", TrappingClient)

        outcome = await mod.delegate_task(own_id, "do a thing")

        assert "self-delegation" in outcome.lower()
        assert not http_calls, "no HTTP call should be made for self-delegation"

    async def test_delegate_task_allows_real_peer(self, monkeypatch):
        """A target id that differs from WORKSPACE_ID is not rejected by the
        guard: the returned text carries no self-delegation error."""
        mod = _load_a2a_tools(monkeypatch, workspace_id="ws-self-abc")

        class FakeClient:
            """AsyncClient stand-in answering discovery and proxy calls."""

            def __init__(self, timeout):
                pass

            async def __aenter__(self):
                return self

            async def __aexit__(self, *exc_info):
                pass

            async def get(self, url, headers=None):
                # Discovery lookup: hand back a URL for the target.
                return _FakeResponse(200, {"url": "http://target.test/a2a"})

            async def post(self, url, json=None, headers=None):
                # Proxy call: reply with a single text part.
                reply = {"result": {"parts": [{"kind": "text", "text": "ok"}]}}
                return _FakeResponse(200, reply)

        monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient)

        outcome = await mod.delegate_task("ws-DIFFERENT-xyz", "do a thing")

        assert "self-delegation" not in outcome.lower()
+35
View File
@@ -148,6 +148,41 @@ class TestRBAC:
assert "RBAC" in result["error"]
class TestSelfDelegationGuard:
    """Task #190 / #193 — delegate_task_async must reject delegation to the
    caller's own workspace BEFORE scheduling the background task. Otherwise
    the platform A2A round-trip times out against our own held run lock, the
    failure is logged with source_id=our workspace UUID, and the inbox
    poller surfaces the row as a peer_agent message from ourselves."""

    @pytest.mark.asyncio
    async def test_async_path_rejects_self_workspace(
        self, delegation_mocks, monkeypatch
    ):
        """Delegating to our own workspace id fails fast and schedules
        nothing in the background."""
        mod, *_ = delegation_mocks
        # The guard compares against the module-level WORKSPACE_ID, so pin
        # that attribute explicitly. monkeypatch restores the previous value
        # at teardown; a bare assignment would leak into later tests that
        # share the same module object.
        monkeypatch.setattr(mod, "WORKSPACE_ID", "ws-self")

        result = await _invoke(mod, workspace_id="ws-self")

        assert result["success"] is False
        assert "self-delegation" in result["error"].lower()
        # No background task should have been scheduled.
        assert not mod._background_tasks

    @pytest.mark.asyncio
    async def test_async_path_allows_different_workspace(
        self, delegation_mocks, monkeypatch
    ):
        """Guard does NOT short-circuit a real peer target."""
        mod, *_ = delegation_mocks
        # Same pinning as above, undone automatically after the test.
        monkeypatch.setattr(mod, "WORKSPACE_ID", "ws-self")

        _, mock_cls = _make_mock_client()
        with patch("httpx.AsyncClient", mock_cls):
            result = await _invoke(mod, workspace_id="ws-peer")

        assert result["success"] is True
        assert result["status"] == "delegated"
class TestAsyncDelegation:
@pytest.mark.asyncio
+30
View File
@@ -131,6 +131,36 @@ def test_message_from_activity_peer_agent():
assert msg.to_dict()["kind"] == "peer_agent"
def test_message_from_activity_delegate_result_distinct_kind():
    """Task #190 / #193 — rows with method='delegate_result' get their own kind.

    pushDelegationResultToInbox (RFC #2829 PR-2) writes such rows with
    source_id set to our own workspace UUID so that wait_for_message can
    report delegation completions or failures to the caller. If to_dict()
    classified purely on a non-empty peer_id, those rows would come out as
    kind='peer_agent' and the agent would read its OWN delegation timeout
    as an instruction from a peer — the #190 self-echo bug. They must be
    reported as kind='delegation_result' instead.
    """
    own_workspace = "ws-self-abc"
    activity_row = {
        "id": "act-90",
        "source_id": own_workspace,  # same as our workspace
        "method": "delegate_result",
        "summary": "Delegation failed",
        "response_body": {"text": "polling timeout", "delegation_id": "d-1"},
        "created_at": "2026-05-18T00:00:00Z",
    }

    message = inbox.message_from_activity(activity_row)
    envelope = message.to_dict()
    reported_kind = envelope["kind"]

    assert reported_kind == "delegation_result", (
        "delegate_result rows must surface as kind='delegation_result', "
        f"not peer_agent (got {reported_kind!r})"
    )
    # Downstream consumers that key off the method still see it unchanged.
    assert envelope["method"] == "delegate_result"
    # The dataclass keeps peer_id populated for back-compat dispatch; only
    # the kind field tells these rows apart.
    assert message.peer_id == own_workspace
def test_message_from_activity_handles_string_request_body():
row = {
"id": "act-3",