@@ -34,6 +34,28 @@ async def list_peers() -> list[dict]:
|
||||
|
||||
async def delegate_task(workspace_id: str, task: str) -> str:
|
||||
"""Send a task to a peer workspace via A2A and return the response text."""
|
||||
# Task #190 / #193 — Self-delegation guard. Without this, a workspace
|
||||
# delegating to its own UUID round-trips through the platform proxy back
|
||||
# into the sender; the synchronous handler waits on the same lock the
|
||||
# caller holds, the request times out, and the platform writes an
|
||||
# a2a_receive activity row with source_id=our own workspace UUID. The
|
||||
# inbox poller then surfaces that row as kind="peer_agent" and the agent
|
||||
# sees the timeout echoed back as a peer instructing it (#190).
|
||||
#
|
||||
# The sibling guards live in:
|
||||
# - workspace-server/internal/handlers/delegation.go (Go API gate)
|
||||
# - workspace/a2a_tools_delegation.py (MCP path guard)
|
||||
# This module is the framework-agnostic adapter surface used by adapters
|
||||
# that don't go through a2a_tools_delegation.py — it needs its own guard.
|
||||
if WORKSPACE_ID and workspace_id == WORKSPACE_ID:
|
||||
return (
|
||||
"Error: self-delegation rejected (cannot delegate_task to your own "
|
||||
"workspace). There is no peer who is also you — the platform proxy "
|
||||
"would deadlock and the timeout would echo back as a peer_agent "
|
||||
"message from yourself (#190). Do the work directly, or use "
|
||||
"commit_memory / send_message_to_user instead."
|
||||
)
|
||||
|
||||
async with httpx.AsyncClient(timeout=120.0) as client:
|
||||
# Discover target URL
|
||||
try:
|
||||
|
||||
@@ -412,6 +412,28 @@ async def delegate_task_async(
|
||||
"""
|
||||
task_id = str(uuid.uuid4())
|
||||
|
||||
# Task #190 / #193 — Self-delegation guard (async path). Even on the
|
||||
# async path that returns a task_id immediately, _execute_delegation
|
||||
# eventually fires the A2A POST back to our own URL, which times out
|
||||
# against our own held run lock, gets recorded with source_id=our
|
||||
# workspace UUID, and surfaces in the inbox as a peer_agent message
|
||||
# from ourselves (#190). Reject before scheduling the background task
|
||||
# so no peer_agent echo can be generated. Sibling guards:
|
||||
# - workspace-server/internal/handlers/delegation.go (Go API gate)
|
||||
# - workspace/a2a_tools_delegation.py (MCP sync + async paths)
|
||||
# - workspace/builtin_tools/a2a_tools.py (framework-agnostic sync)
|
||||
if WORKSPACE_ID and workspace_id == WORKSPACE_ID:
|
||||
log_event(event_type="delegation", action="delegate", resource=workspace_id,
|
||||
outcome="rejected_self_delegation", trace_id=task_id)
|
||||
return {
|
||||
"success": False,
|
||||
"error": (
|
||||
"self-delegation rejected: cannot delegate_task_async to your "
|
||||
"own workspace (would time out and echo back as a peer_agent "
|
||||
"message from yourself — #190)"
|
||||
),
|
||||
}
|
||||
|
||||
# RBAC check
|
||||
roles, custom_perms = get_workspace_roles()
|
||||
if not check_permission("delegate", roles, custom_perms):
|
||||
|
||||
+24
-1
@@ -102,11 +102,34 @@ class InboxMessage:
|
||||
arrival_workspace_id: str = ""
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
# Task #190 / #193 — Distinguish delegation-result rows from peer-agent
|
||||
# messages. The platform's pushDelegationResultToInbox (RFC #2829 PR-2)
|
||||
# writes activity_type='a2a_receive' with method='delegate_result' and
|
||||
# source_id=our own workspace UUID, so the caller's inbox poller can
|
||||
# surface delegation completions/failures via wait_for_message. But
|
||||
# the default to_dict derives kind="peer_agent" purely from peer_id
|
||||
# being non-empty — which makes a synchronous-delegation timeout, or
|
||||
# a cross-workspace ProxyA2A failure, appear to the agent as a NEW
|
||||
# peer_agent message from our own workspace UUID (#190 self-echo).
|
||||
#
|
||||
# Explicitly classify rows with method='delegate_result' as
|
||||
# kind='delegation_result' regardless of peer_id, so:
|
||||
# 1. wait_for_message gives the original caller a structured
|
||||
# delegation result (not a fake peer instruction).
|
||||
# 2. Agents reading the envelope don't mistake the row for a
|
||||
# peer instructing them — preventing the #190 reply-via-
|
||||
# delegate_task-to-self loop.
|
||||
if self.method == "delegate_result":
|
||||
kind = "delegation_result"
|
||||
elif self.peer_id:
|
||||
kind = "peer_agent"
|
||||
else:
|
||||
kind = "canvas_user"
|
||||
d = {
|
||||
"activity_id": self.activity_id,
|
||||
"text": self.text,
|
||||
"peer_id": self.peer_id,
|
||||
"kind": "peer_agent" if self.peer_id else "canvas_user",
|
||||
"kind": kind,
|
||||
"method": self.method,
|
||||
"created_at": self.created_at,
|
||||
}
|
||||
|
||||
@@ -325,3 +325,58 @@ class TestGetPeersSummary:
|
||||
|
||||
result = await mod.get_peers_summary()
|
||||
assert result == "No peers available."
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Self-delegation guard (Task #190 / #193)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSelfDelegationGuard:
|
||||
"""delegate_task to your own workspace UUID must be rejected BEFORE any
|
||||
discovery / proxy hop. Otherwise the request round-trips back to us,
|
||||
deadlocks on the run lock, times out, and surfaces in the inbox as a
|
||||
peer_agent message from our own workspace (the documented #190 self-echo
|
||||
bug)."""
|
||||
|
||||
async def test_delegate_task_rejects_self(self, monkeypatch):
|
||||
mod = _load_a2a_tools(monkeypatch, workspace_id="ws-self-abc")
|
||||
|
||||
calls = []
|
||||
|
||||
class TrappingClient:
|
||||
def __init__(self, timeout): pass
|
||||
async def __aenter__(self): return self
|
||||
async def __aexit__(self, *a): pass
|
||||
async def get(self, *a, **kw):
|
||||
calls.append(("get", a, kw))
|
||||
raise AssertionError("guard must reject before discover")
|
||||
async def post(self, *a, **kw):
|
||||
calls.append(("post", a, kw))
|
||||
raise AssertionError("guard must reject before proxy POST")
|
||||
|
||||
monkeypatch.setattr(mod.httpx, "AsyncClient", TrappingClient)
|
||||
|
||||
result = await mod.delegate_task("ws-self-abc", "do a thing")
|
||||
assert "self-delegation" in result.lower()
|
||||
assert not calls, "no HTTP call should be made for self-delegation"
|
||||
|
||||
async def test_delegate_task_allows_real_peer(self, monkeypatch):
|
||||
"""Guard is strictly equality on WORKSPACE_ID — a different target
|
||||
passes through to the normal discover/proxy path."""
|
||||
mod = _load_a2a_tools(monkeypatch, workspace_id="ws-self-abc")
|
||||
|
||||
class FakeClient:
|
||||
def __init__(self, timeout): pass
|
||||
async def __aenter__(self): return self
|
||||
async def __aexit__(self, *a): pass
|
||||
async def get(self, url, headers=None):
|
||||
return _FakeResponse(200, {"url": "http://target.test/a2a"})
|
||||
async def post(self, url, json=None, headers=None):
|
||||
return _FakeResponse(200, {
|
||||
"result": {"parts": [{"kind": "text", "text": "ok"}]}
|
||||
})
|
||||
|
||||
monkeypatch.setattr(mod.httpx, "AsyncClient", FakeClient)
|
||||
|
||||
result = await mod.delegate_task("ws-DIFFERENT-xyz", "do a thing")
|
||||
assert "self-delegation" not in result.lower()
|
||||
|
||||
@@ -148,6 +148,41 @@ class TestRBAC:
|
||||
assert "RBAC" in result["error"]
|
||||
|
||||
|
||||
class TestSelfDelegationGuard:
|
||||
"""Task #190 / #193 — delegate_task_async must reject delegation to the
|
||||
caller's own workspace BEFORE scheduling the background task. Otherwise
|
||||
the platform A2A round-trip times out against our own held run lock, the
|
||||
failure is logged with source_id=our workspace UUID, and the inbox
|
||||
poller surfaces the row as a peer_agent message from ourselves."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_path_rejects_self_workspace(self, delegation_mocks):
|
||||
mod, *_ = delegation_mocks
|
||||
# WORKSPACE_ID was set to "ws-self" by the fixture's monkeypatch.
|
||||
# The module reads it at import time → reload-equivalent comparison.
|
||||
mod.WORKSPACE_ID = "ws-self"
|
||||
|
||||
result = await _invoke(mod, workspace_id="ws-self")
|
||||
|
||||
assert result["success"] is False
|
||||
assert "self-delegation" in result["error"].lower()
|
||||
# No background task should have been scheduled.
|
||||
assert len(mod._background_tasks) == 0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_path_allows_different_workspace(self, delegation_mocks):
|
||||
"""Guard does NOT short-circuit a real peer target."""
|
||||
mod, *_ = delegation_mocks
|
||||
mod.WORKSPACE_ID = "ws-self"
|
||||
_, mock_cls = _make_mock_client()
|
||||
|
||||
with patch("httpx.AsyncClient", mock_cls):
|
||||
result = await _invoke(mod, workspace_id="ws-peer")
|
||||
|
||||
assert result["success"] is True
|
||||
assert result["status"] == "delegated"
|
||||
|
||||
|
||||
class TestAsyncDelegation:
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
||||
@@ -131,6 +131,36 @@ def test_message_from_activity_peer_agent():
|
||||
assert msg.to_dict()["kind"] == "peer_agent"
|
||||
|
||||
|
||||
def test_message_from_activity_delegate_result_distinct_kind():
|
||||
"""Task #190 / #193 — pushDelegationResultToInbox (RFC #2829 PR-2) writes
|
||||
rows with method='delegate_result' and source_id=our own workspace UUID
|
||||
so the caller's wait_for_message can surface delegation completions or
|
||||
failures. Without an explicit kind override, to_dict() would classify
|
||||
those rows as kind='peer_agent' (peer_id non-empty) and the agent would
|
||||
treat its OWN delegation timeout as a peer instructing it — the #190
|
||||
self-echo bug. Classify these rows as kind='delegation_result' so they
|
||||
are recognizable as structured delegation outcomes."""
|
||||
row = {
|
||||
"id": "act-90",
|
||||
"source_id": "ws-self-abc", # same as our workspace
|
||||
"method": "delegate_result",
|
||||
"summary": "Delegation failed",
|
||||
"response_body": {"text": "polling timeout", "delegation_id": "d-1"},
|
||||
"created_at": "2026-05-18T00:00:00Z",
|
||||
}
|
||||
msg = inbox.message_from_activity(row)
|
||||
payload = msg.to_dict()
|
||||
assert payload["kind"] == "delegation_result", (
|
||||
f"delegate_result rows must surface as kind='delegation_result', "
|
||||
f"not peer_agent (got {payload['kind']!r})"
|
||||
)
|
||||
# Method preserved for downstream consumers that key off it.
|
||||
assert payload["method"] == "delegate_result"
|
||||
# peer_id is still set on the dataclass for back-compat dispatch — the
|
||||
# distinguishing signal is the kind field.
|
||||
assert msg.peer_id == "ws-self-abc"
|
||||
|
||||
|
||||
def test_message_from_activity_handles_string_request_body():
|
||||
row = {
|
||||
"id": "act-3",
|
||||
|
||||
Reference in New Issue
Block a user