Merge pull request #2887 from Molecule-AI/refactor/a2a-tools-delegation-extract-rfc2873-iter4b
refactor(workspace): extract delegation handlers from a2a_tools.py (RFC #2873 iter 4b)
This commit is contained in:
commit
6470e5f41b
@ -55,6 +55,7 @@ TOP_LEVEL_MODULES = {
|
|||||||
"a2a_executor",
|
"a2a_executor",
|
||||||
"a2a_mcp_server",
|
"a2a_mcp_server",
|
||||||
"a2a_tools",
|
"a2a_tools",
|
||||||
|
"a2a_tools_delegation",
|
||||||
"a2a_tools_rbac",
|
"a2a_tools_rbac",
|
||||||
"adapter_base",
|
"adapter_base",
|
||||||
"agent",
|
"agent",
|
||||||
|
|||||||
@ -115,324 +115,18 @@ async def report_activity(
|
|||||||
pass # Best-effort — don't block delegation on activity reporting
|
pass # Best-effort — don't block delegation on activity reporting
|
||||||
|
|
||||||
|
|
||||||
# RFC #2829 PR-5 cutover constants. The poll cadence + timeout are
|
# Delegation tool handlers — extracted to a2a_tools_delegation
|
||||||
# intentionally generous: 3s gives the platform's executeDelegation
|
# (RFC #2873 iter 4b). Re-imported here so call sites + tests that
|
||||||
# goroutine room to dispatch + the callee to respond + the result to
|
# reference ``a2a_tools.tool_delegate_task`` /
|
||||||
# write to activity_logs without thrashing the platform with rapid
|
# ``a2a_tools._delegate_sync_via_polling`` keep resolving identically.
|
||||||
# polls; the budget matches the legacy DELEGATION_TIMEOUT (300s) so
|
from a2a_tools_delegation import ( # noqa: E402 (import after the from-a2a_client block)
|
||||||
# operators don't see behavior change beyond "no more 600s timeouts".
|
_SYNC_POLL_BUDGET_S,
|
||||||
_SYNC_POLL_INTERVAL_S = 3.0
|
_SYNC_POLL_INTERVAL_S,
|
||||||
_SYNC_POLL_BUDGET_S = float(os.environ.get("DELEGATION_TIMEOUT", "300.0"))
|
_delegate_sync_via_polling,
|
||||||
|
tool_check_task_status,
|
||||||
|
tool_delegate_task,
|
||||||
async def _delegate_sync_via_polling(
|
tool_delegate_task_async,
|
||||||
workspace_id: str,
|
)
|
||||||
task: str,
|
|
||||||
src: str,
|
|
||||||
) -> str:
|
|
||||||
"""RFC #2829 PR-5: durable async delegation + poll for terminal status.
|
|
||||||
|
|
||||||
Sidesteps the platform proxy's blocking `message/send` HTTP path that
|
|
||||||
hits a hard 600s ceiling. Instead:
|
|
||||||
|
|
||||||
1. POST /workspaces/<src>/delegate (async, returns 202 + delegation_id)
|
|
||||||
— platform's executeDelegation goroutine handles A2A dispatch in
|
|
||||||
the background. No client-side timeout dependency on the platform
|
|
||||||
holding a connection open.
|
|
||||||
2. Poll GET /workspaces/<src>/delegations every 3s for a row with
|
|
||||||
matching delegation_id reaching terminal status (completed/failed).
|
|
||||||
3. Return the response_preview text on completed; surface error_detail
|
|
||||||
on failed (with the same _A2A_ERROR_PREFIX wrapping the legacy
|
|
||||||
path uses, so caller error-detection logic is unchanged).
|
|
||||||
|
|
||||||
Both /delegate and /delegations are existing endpoints — this helper
|
|
||||||
just composes them into a polling synchronous facade. The result is
|
|
||||||
available the moment the platform writes the terminal status row;
|
|
||||||
no extra latency vs. the legacy proxy-blocked path on fast cases.
|
|
||||||
"""
|
|
||||||
import asyncio
|
|
||||||
import time
|
|
||||||
|
|
||||||
idem_key = hashlib.sha256(f"{src}:{workspace_id}:{task}".encode()).hexdigest()[:32]
|
|
||||||
|
|
||||||
# 1. Dispatch via /delegate (the async, durable path).
|
|
||||||
try:
|
|
||||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
|
||||||
resp = await client.post(
|
|
||||||
f"{PLATFORM_URL}/workspaces/{src}/delegate",
|
|
||||||
json={
|
|
||||||
"target_id": workspace_id,
|
|
||||||
"task": task,
|
|
||||||
"idempotency_key": idem_key,
|
|
||||||
},
|
|
||||||
headers=_auth_headers_for_heartbeat(src),
|
|
||||||
)
|
|
||||||
except Exception as e: # pylint: disable=broad-except
|
|
||||||
return f"{_A2A_ERROR_PREFIX}delegate dispatch failed: {e}"
|
|
||||||
|
|
||||||
if resp.status_code != 202 and resp.status_code != 200:
|
|
||||||
return f"{_A2A_ERROR_PREFIX}delegate dispatch failed: HTTP {resp.status_code} {resp.text[:200]}"
|
|
||||||
|
|
||||||
try:
|
|
||||||
dispatch = resp.json()
|
|
||||||
except Exception as e: # pylint: disable=broad-except
|
|
||||||
return f"{_A2A_ERROR_PREFIX}delegate dispatch returned non-JSON: {e}"
|
|
||||||
|
|
||||||
delegation_id = dispatch.get("delegation_id", "")
|
|
||||||
if not delegation_id:
|
|
||||||
return f"{_A2A_ERROR_PREFIX}delegate dispatch missing delegation_id: {dispatch}"
|
|
||||||
|
|
||||||
# 2. Poll for terminal status with a deadline. Each poll is a cheap
|
|
||||||
# /delegations GET — bounded by the platform's existing rate limit.
|
|
||||||
deadline = time.monotonic() + _SYNC_POLL_BUDGET_S
|
|
||||||
last_status = "unknown"
|
|
||||||
while time.monotonic() < deadline:
|
|
||||||
try:
|
|
||||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
|
||||||
poll = await client.get(
|
|
||||||
f"{PLATFORM_URL}/workspaces/{src}/delegations",
|
|
||||||
headers=_auth_headers_for_heartbeat(src),
|
|
||||||
)
|
|
||||||
except Exception as e: # pylint: disable=broad-except
|
|
||||||
# Transient — keep polling. The platform IS holding the
|
|
||||||
# delegation row; we just lost a network request.
|
|
||||||
last_status = f"poll-error: {e}"
|
|
||||||
await asyncio.sleep(_SYNC_POLL_INTERVAL_S)
|
|
||||||
continue
|
|
||||||
|
|
||||||
if poll.status_code != 200:
|
|
||||||
last_status = f"poll HTTP {poll.status_code}"
|
|
||||||
await asyncio.sleep(_SYNC_POLL_INTERVAL_S)
|
|
||||||
continue
|
|
||||||
|
|
||||||
try:
|
|
||||||
rows = poll.json()
|
|
||||||
except Exception as e: # pylint: disable=broad-except
|
|
||||||
last_status = f"poll non-JSON: {e}"
|
|
||||||
await asyncio.sleep(_SYNC_POLL_INTERVAL_S)
|
|
||||||
continue
|
|
||||||
|
|
||||||
# /delegations returns a flat list of delegation events. Filter to
|
|
||||||
# our delegation_id; pick the first terminal one. The list may
|
|
||||||
# have multiple rows per delegation_id (one for the original
|
|
||||||
# dispatch, one per status update); we want the latest terminal.
|
|
||||||
if not isinstance(rows, list):
|
|
||||||
await asyncio.sleep(_SYNC_POLL_INTERVAL_S)
|
|
||||||
continue
|
|
||||||
terminal = None
|
|
||||||
for r in rows:
|
|
||||||
if not isinstance(r, dict):
|
|
||||||
continue
|
|
||||||
if r.get("delegation_id") != delegation_id:
|
|
||||||
continue
|
|
||||||
status = (r.get("status") or "").lower()
|
|
||||||
last_status = status
|
|
||||||
if status in ("completed", "failed"):
|
|
||||||
terminal = r
|
|
||||||
break
|
|
||||||
if terminal:
|
|
||||||
if (terminal.get("status") or "").lower() == "completed":
|
|
||||||
return terminal.get("response_preview") or ""
|
|
||||||
err = (
|
|
||||||
terminal.get("error_detail")
|
|
||||||
or terminal.get("summary")
|
|
||||||
or "delegation failed"
|
|
||||||
)
|
|
||||||
return f"{_A2A_ERROR_PREFIX}{err}"
|
|
||||||
|
|
||||||
await asyncio.sleep(_SYNC_POLL_INTERVAL_S)
|
|
||||||
|
|
||||||
# Budget exhausted — the platform's row is still in flight (or queued).
|
|
||||||
# Surface as an error so the caller can decide to retry or fall back;
|
|
||||||
# the platform DOES still have the durable row, so the work isn't
|
|
||||||
# lost — it'll complete eventually and a future check_task_status
|
|
||||||
# will surface the result.
|
|
||||||
return (
|
|
||||||
f"{_A2A_ERROR_PREFIX}polling timeout after {_SYNC_POLL_BUDGET_S}s "
|
|
||||||
f"(delegation_id={delegation_id}, last_status={last_status}); "
|
|
||||||
f"the platform is still working on it — call check_task_status('{delegation_id}') to retrieve later"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def tool_delegate_task(
|
|
||||||
workspace_id: str,
|
|
||||||
task: str,
|
|
||||||
source_workspace_id: str | None = None,
|
|
||||||
) -> str:
|
|
||||||
"""Delegate a task to another workspace via A2A (synchronous — waits for response).
|
|
||||||
|
|
||||||
``source_workspace_id`` selects which registered workspace this
|
|
||||||
delegation originates from — drives auth + the X-Workspace-ID source
|
|
||||||
header so the platform's a2a_proxy logs the correct sender. Single-
|
|
||||||
workspace operators leave it None and routing falls back to the
|
|
||||||
module-level WORKSPACE_ID.
|
|
||||||
"""
|
|
||||||
if not workspace_id or not task:
|
|
||||||
return "Error: workspace_id and task are required"
|
|
||||||
|
|
||||||
# Auto-route: if source not specified, look up which registered
|
|
||||||
# workspace last saw this peer (populated by tool_list_peers). Falls
|
|
||||||
# back to the legacy WORKSPACE_ID for single-workspace operators.
|
|
||||||
src = source_workspace_id or _peer_to_source.get(workspace_id) or None
|
|
||||||
|
|
||||||
# Discover the target. discover_peer is the access-control gate +
|
|
||||||
# name/status lookup. The peer's reported ``url`` field is NOT used
|
|
||||||
# for routing — see send_a2a_message, which constructs the URL via
|
|
||||||
# the platform's A2A proxy.
|
|
||||||
peer = await discover_peer(workspace_id, source_workspace_id=src)
|
|
||||||
if not peer:
|
|
||||||
return f"Error: workspace {workspace_id} not found or not accessible (check access control)"
|
|
||||||
|
|
||||||
if (peer.get("status") or "").lower() == "offline":
|
|
||||||
return f"Error: workspace {workspace_id} is offline"
|
|
||||||
|
|
||||||
# Report delegation start — include the task text for traceability
|
|
||||||
peer_name = peer.get("name") or _peer_names.get(workspace_id) or workspace_id[:8]
|
|
||||||
_peer_names[workspace_id] = peer_name # cache for future use
|
|
||||||
# Brief summary for canvas display — just the delegation target
|
|
||||||
await report_activity("a2a_send", workspace_id, f"Delegating to {peer_name}", task_text=task)
|
|
||||||
|
|
||||||
# RFC #2829 PR-5: agent-side cutover. When DELEGATION_SYNC_VIA_INBOX=1,
|
|
||||||
# use the platform's durable async delegation API (POST /delegate +
|
|
||||||
# poll /delegations) instead of the proxy-blocked message/send path.
|
|
||||||
# This sidesteps the 600s message/send timeout class that broke
|
|
||||||
# iteration-14/90-style long-running delegations on 2026-05-05.
|
|
||||||
#
|
|
||||||
# Default off — staging-canary first, flip default after PR-2's
|
|
||||||
# result-push flag (DELEGATION_RESULT_INBOX_PUSH) has been on for
|
|
||||||
# ≥1 week without incident.
|
|
||||||
if os.environ.get("DELEGATION_SYNC_VIA_INBOX") == "1":
|
|
||||||
result = await _delegate_sync_via_polling(workspace_id, task, src or WORKSPACE_ID)
|
|
||||||
else:
|
|
||||||
# send_a2a_message routes through ${PLATFORM_URL}/workspaces/{id}/a2a
|
|
||||||
# (the platform proxy) so the same code works for in-container and
|
|
||||||
# external (standalone molecule-mcp) callers.
|
|
||||||
result = await send_a2a_message(workspace_id, task, source_workspace_id=src)
|
|
||||||
|
|
||||||
# Detect delegation failures — wrap them clearly so the calling agent
|
|
||||||
# can decide to retry, use another peer, or handle the task itself.
|
|
||||||
is_error = result.startswith(_A2A_ERROR_PREFIX)
|
|
||||||
# Strip the sentinel prefix so error_detail is the human-readable
|
|
||||||
# cause directly. The Activity tab's red error chip surfaces this
|
|
||||||
# without the user having to scroll into the raw response JSON.
|
|
||||||
#
|
|
||||||
# Cap at 4096 chars before sending — the platform's
|
|
||||||
# activity_logs.error_detail column is unbounded TEXT and a
|
|
||||||
# malicious or buggy peer could otherwise stream an arbitrarily
|
|
||||||
# large error message into the caller's activity log. 4096 is
|
|
||||||
# comfortably above any real exception traceback we've seen and
|
|
||||||
# well below an obvious-DoS threshold.
|
|
||||||
error_detail = result[len(_A2A_ERROR_PREFIX):].strip()[:4096] if is_error else ""
|
|
||||||
await report_activity(
|
|
||||||
"a2a_receive", workspace_id,
|
|
||||||
f"{peer_name} responded ({len(result)} chars)" if not is_error else f"{peer_name} failed: {error_detail[:120]}",
|
|
||||||
task_text=task, response_text=result,
|
|
||||||
status="error" if is_error else "ok",
|
|
||||||
error_detail=error_detail,
|
|
||||||
)
|
|
||||||
if is_error:
|
|
||||||
return (
|
|
||||||
f"DELEGATION FAILED to {peer_name}: {result}\n"
|
|
||||||
f"You should either: (1) try a different peer, (2) handle this task yourself, "
|
|
||||||
f"or (3) inform the user that {peer_name} is unavailable and provide your best answer."
|
|
||||||
)
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
async def tool_delegate_task_async(
|
|
||||||
workspace_id: str,
|
|
||||||
task: str,
|
|
||||||
source_workspace_id: str | None = None,
|
|
||||||
) -> str:
|
|
||||||
"""Delegate a task via the platform's async delegation API (fire-and-forget).
|
|
||||||
|
|
||||||
Uses POST /workspaces/:id/delegate which runs the A2A request in the background.
|
|
||||||
Results are tracked in the platform DB and broadcast via WebSocket.
|
|
||||||
Use check_task_status to poll for results.
|
|
||||||
|
|
||||||
``source_workspace_id`` selects the sending workspace (which one of
|
|
||||||
this agent's registered workspaces gets logged as the originator);
|
|
||||||
auto-routes via the peer→source cache when omitted.
|
|
||||||
"""
|
|
||||||
if not workspace_id or not task:
|
|
||||||
return "Error: workspace_id and task are required"
|
|
||||||
|
|
||||||
src = source_workspace_id or _peer_to_source.get(workspace_id) or WORKSPACE_ID
|
|
||||||
|
|
||||||
# Idempotency key: SHA-256 of (source, target, task) so that a
|
|
||||||
# restarted agent firing the same delegation gets the same key and
|
|
||||||
# the platform returns the existing delegation_id instead of
|
|
||||||
# creating a duplicate. Fixes #1456. Source is in the key so the
|
|
||||||
# SAME task delegated from two different registered workspaces
|
|
||||||
# produces two distinct delegations (the right behavior — one per
|
|
||||||
# tenant audit trail).
|
|
||||||
idem_key = hashlib.sha256(f"{src}:{workspace_id}:{task}".encode()).hexdigest()[:32]
|
|
||||||
|
|
||||||
try:
|
|
||||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
|
||||||
resp = await client.post(
|
|
||||||
f"{PLATFORM_URL}/workspaces/{src}/delegate",
|
|
||||||
json={"target_id": workspace_id, "task": task, "idempotency_key": idem_key},
|
|
||||||
headers=_auth_headers_for_heartbeat(src),
|
|
||||||
)
|
|
||||||
if resp.status_code == 202:
|
|
||||||
data = resp.json()
|
|
||||||
return json.dumps({
|
|
||||||
"delegation_id": data.get("delegation_id", ""),
|
|
||||||
"workspace_id": workspace_id,
|
|
||||||
"status": "delegated",
|
|
||||||
"note": "Task delegated. The platform runs it in the background. Use check_task_status to poll for results.",
|
|
||||||
})
|
|
||||||
else:
|
|
||||||
return f"Error: delegation failed with status {resp.status_code}: {resp.text[:200]}"
|
|
||||||
except Exception as e:
|
|
||||||
return f"Error: delegation failed — {e}"
|
|
||||||
|
|
||||||
|
|
||||||
async def tool_check_task_status(
|
|
||||||
workspace_id: str,
|
|
||||||
task_id: str,
|
|
||||||
source_workspace_id: str | None = None,
|
|
||||||
) -> str:
|
|
||||||
"""Check delegations for this workspace via the platform API.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
workspace_id: Ignored (kept for backward compat). Checks
|
|
||||||
``source_workspace_id``'s delegations (the workspace that
|
|
||||||
FIRED the delegations), not the target's.
|
|
||||||
task_id: Optional delegation_id to filter. If empty, returns all recent delegations.
|
|
||||||
source_workspace_id: Which registered workspace's delegation log
|
|
||||||
to query. Defaults to the module-level WORKSPACE_ID.
|
|
||||||
"""
|
|
||||||
src = source_workspace_id or WORKSPACE_ID
|
|
||||||
try:
|
|
||||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
|
||||||
resp = await client.get(
|
|
||||||
f"{PLATFORM_URL}/workspaces/{src}/delegations",
|
|
||||||
headers=_auth_headers_for_heartbeat(src),
|
|
||||||
)
|
|
||||||
if resp.status_code != 200:
|
|
||||||
return f"Error: failed to check delegations ({resp.status_code})"
|
|
||||||
delegations = resp.json()
|
|
||||||
if task_id:
|
|
||||||
# Filter by delegation_id
|
|
||||||
matching = [d for d in delegations if d.get("delegation_id") == task_id]
|
|
||||||
if matching:
|
|
||||||
return json.dumps(matching[0])
|
|
||||||
return json.dumps({"status": "not_found", "delegation_id": task_id})
|
|
||||||
# Return all recent delegations
|
|
||||||
summary = []
|
|
||||||
for d in delegations[:10]:
|
|
||||||
summary.append({
|
|
||||||
"delegation_id": d.get("delegation_id", ""),
|
|
||||||
"target_id": d.get("target_id", ""),
|
|
||||||
"status": d.get("status", ""),
|
|
||||||
"summary": d.get("summary", ""),
|
|
||||||
"response_preview": d.get("response_preview", ""),
|
|
||||||
})
|
|
||||||
return json.dumps({"delegations": summary, "count": len(delegations)})
|
|
||||||
except Exception as e:
|
|
||||||
return f"Error checking delegations: {e}"
|
|
||||||
|
|
||||||
|
|
||||||
async def _upload_chat_files(
|
async def _upload_chat_files(
|
||||||
|
|||||||
372
workspace/a2a_tools_delegation.py
Normal file
372
workspace/a2a_tools_delegation.py
Normal file
@ -0,0 +1,372 @@
|
|||||||
|
"""Delegation tool handlers — single-concern slice of the a2a_tools surface.
|
||||||
|
|
||||||
|
Extracted from ``a2a_tools.py`` (RFC #2873 iter 4b). Owns the three
|
||||||
|
delegation MCP tools + the RFC #2829 PR-5 sync-via-polling helper they
|
||||||
|
share.
|
||||||
|
|
||||||
|
Public surface:
|
||||||
|
|
||||||
|
* ``tool_delegate_task`` — synchronous delegation, waits for response.
|
||||||
|
* ``tool_delegate_task_async`` — fire-and-forget delegation; returns
|
||||||
|
``{delegation_id, ...}``.
|
||||||
|
* ``tool_check_task_status`` — poll the platform's ``/delegations`` log.
|
||||||
|
|
||||||
|
Internal:
|
||||||
|
|
||||||
|
* ``_delegate_sync_via_polling`` — durable async + poll for terminal
|
||||||
|
status (RFC #2829 PR-5 cutover path; toggled by
|
||||||
|
``DELEGATION_SYNC_VIA_INBOX=1``).
|
||||||
|
* ``_SYNC_POLL_INTERVAL_S`` / ``_SYNC_POLL_BUDGET_S`` constants.
|
||||||
|
|
||||||
|
Circular-import note: this module calls ``report_activity`` from
|
||||||
|
``a2a_tools`` to emit activity rows around the delegate dispatch.
|
||||||
|
``a2a_tools`` imports the public symbols here at module-load time,
|
||||||
|
so we use a LAZY import for ``report_activity`` inside the function
|
||||||
|
that needs it. Without the lazy hop Python raises an ImportError
|
||||||
|
on first ``a2a_tools`` import.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import hashlib
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
from a2a_client import (
|
||||||
|
PLATFORM_URL,
|
||||||
|
WORKSPACE_ID,
|
||||||
|
_A2A_ERROR_PREFIX,
|
||||||
|
_peer_names,
|
||||||
|
_peer_to_source,
|
||||||
|
discover_peer,
|
||||||
|
send_a2a_message,
|
||||||
|
)
|
||||||
|
from a2a_tools_rbac import auth_headers_for_heartbeat as _auth_headers_for_heartbeat
|
||||||
|
|
||||||
|
|
||||||
|
# RFC #2829 PR-5 cutover constants. The poll cadence + timeout are
|
||||||
|
# intentionally generous: 3s gives the platform's executeDelegation
|
||||||
|
# goroutine room to dispatch + the callee to respond + the result to
|
||||||
|
# write to activity_logs without thrashing the platform with rapid
|
||||||
|
# polls; the budget matches the legacy DELEGATION_TIMEOUT (300s) so
|
||||||
|
# operators don't see behavior change beyond "no more 600s timeouts".
|
||||||
|
_SYNC_POLL_INTERVAL_S = 3.0
|
||||||
|
_SYNC_POLL_BUDGET_S = float(os.environ.get("DELEGATION_TIMEOUT", "300.0"))
|
||||||
|
|
||||||
|
|
||||||
|
async def _delegate_sync_via_polling(
|
||||||
|
workspace_id: str,
|
||||||
|
task: str,
|
||||||
|
src: str,
|
||||||
|
) -> str:
|
||||||
|
"""RFC #2829 PR-5: durable async delegation + poll for terminal status.
|
||||||
|
|
||||||
|
Sidesteps the platform proxy's blocking `message/send` HTTP path that
|
||||||
|
hits a hard 600s ceiling. Instead:
|
||||||
|
|
||||||
|
1. POST /workspaces/<src>/delegate (async, returns 202 + delegation_id)
|
||||||
|
— platform's executeDelegation goroutine handles A2A dispatch in
|
||||||
|
the background. No client-side timeout dependency on the platform
|
||||||
|
holding a connection open.
|
||||||
|
2. Poll GET /workspaces/<src>/delegations every 3s for a row with
|
||||||
|
matching delegation_id reaching terminal status (completed/failed).
|
||||||
|
3. Return the response_preview text on completed; surface error_detail
|
||||||
|
on failed (with the same _A2A_ERROR_PREFIX wrapping the legacy
|
||||||
|
path uses, so caller error-detection logic is unchanged).
|
||||||
|
|
||||||
|
Both /delegate and /delegations are existing endpoints — this helper
|
||||||
|
just composes them into a polling synchronous facade. The result is
|
||||||
|
available the moment the platform writes the terminal status row;
|
||||||
|
no extra latency vs. the legacy proxy-blocked path on fast cases.
|
||||||
|
"""
|
||||||
|
import asyncio
|
||||||
|
import time
|
||||||
|
|
||||||
|
idem_key = hashlib.sha256(f"{src}:{workspace_id}:{task}".encode()).hexdigest()[:32]
|
||||||
|
|
||||||
|
# 1. Dispatch via /delegate (the async, durable path).
|
||||||
|
try:
|
||||||
|
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||||
|
resp = await client.post(
|
||||||
|
f"{PLATFORM_URL}/workspaces/{src}/delegate",
|
||||||
|
json={
|
||||||
|
"target_id": workspace_id,
|
||||||
|
"task": task,
|
||||||
|
"idempotency_key": idem_key,
|
||||||
|
},
|
||||||
|
headers=_auth_headers_for_heartbeat(src),
|
||||||
|
)
|
||||||
|
except Exception as e: # pylint: disable=broad-except
|
||||||
|
return f"{_A2A_ERROR_PREFIX}delegate dispatch failed: {e}"
|
||||||
|
|
||||||
|
if resp.status_code != 202 and resp.status_code != 200:
|
||||||
|
return f"{_A2A_ERROR_PREFIX}delegate dispatch failed: HTTP {resp.status_code} {resp.text[:200]}"
|
||||||
|
|
||||||
|
try:
|
||||||
|
dispatch = resp.json()
|
||||||
|
except Exception as e: # pylint: disable=broad-except
|
||||||
|
return f"{_A2A_ERROR_PREFIX}delegate dispatch returned non-JSON: {e}"
|
||||||
|
|
||||||
|
delegation_id = dispatch.get("delegation_id", "")
|
||||||
|
if not delegation_id:
|
||||||
|
return f"{_A2A_ERROR_PREFIX}delegate dispatch missing delegation_id: {dispatch}"
|
||||||
|
|
||||||
|
# 2. Poll for terminal status with a deadline. Each poll is a cheap
|
||||||
|
# /delegations GET — bounded by the platform's existing rate limit.
|
||||||
|
deadline = time.monotonic() + _SYNC_POLL_BUDGET_S
|
||||||
|
last_status = "unknown"
|
||||||
|
while time.monotonic() < deadline:
|
||||||
|
try:
|
||||||
|
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||||
|
poll = await client.get(
|
||||||
|
f"{PLATFORM_URL}/workspaces/{src}/delegations",
|
||||||
|
headers=_auth_headers_for_heartbeat(src),
|
||||||
|
)
|
||||||
|
except Exception as e: # pylint: disable=broad-except
|
||||||
|
# Transient — keep polling. The platform IS holding the
|
||||||
|
# delegation row; we just lost a network request.
|
||||||
|
last_status = f"poll-error: {e}"
|
||||||
|
await asyncio.sleep(_SYNC_POLL_INTERVAL_S)
|
||||||
|
continue
|
||||||
|
|
||||||
|
if poll.status_code != 200:
|
||||||
|
last_status = f"poll HTTP {poll.status_code}"
|
||||||
|
await asyncio.sleep(_SYNC_POLL_INTERVAL_S)
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
rows = poll.json()
|
||||||
|
except Exception as e: # pylint: disable=broad-except
|
||||||
|
last_status = f"poll non-JSON: {e}"
|
||||||
|
await asyncio.sleep(_SYNC_POLL_INTERVAL_S)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# /delegations returns a flat list of delegation events. Filter to
|
||||||
|
# our delegation_id; pick the first terminal one. The list may
|
||||||
|
# have multiple rows per delegation_id (one for the original
|
||||||
|
# dispatch, one per status update); we want the latest terminal.
|
||||||
|
if not isinstance(rows, list):
|
||||||
|
await asyncio.sleep(_SYNC_POLL_INTERVAL_S)
|
||||||
|
continue
|
||||||
|
terminal = None
|
||||||
|
for r in rows:
|
||||||
|
if not isinstance(r, dict):
|
||||||
|
continue
|
||||||
|
if r.get("delegation_id") != delegation_id:
|
||||||
|
continue
|
||||||
|
status = (r.get("status") or "").lower()
|
||||||
|
last_status = status
|
||||||
|
if status in ("completed", "failed"):
|
||||||
|
terminal = r
|
||||||
|
break
|
||||||
|
if terminal:
|
||||||
|
if (terminal.get("status") or "").lower() == "completed":
|
||||||
|
return terminal.get("response_preview") or ""
|
||||||
|
err = (
|
||||||
|
terminal.get("error_detail")
|
||||||
|
or terminal.get("summary")
|
||||||
|
or "delegation failed"
|
||||||
|
)
|
||||||
|
return f"{_A2A_ERROR_PREFIX}{err}"
|
||||||
|
|
||||||
|
await asyncio.sleep(_SYNC_POLL_INTERVAL_S)
|
||||||
|
|
||||||
|
# Budget exhausted — the platform's row is still in flight (or queued).
|
||||||
|
# Surface as an error so the caller can decide to retry or fall back;
|
||||||
|
# the platform DOES still have the durable row, so the work isn't
|
||||||
|
# lost — it'll complete eventually and a future check_task_status
|
||||||
|
# will surface the result.
|
||||||
|
return (
|
||||||
|
f"{_A2A_ERROR_PREFIX}polling timeout after {_SYNC_POLL_BUDGET_S}s "
|
||||||
|
f"(delegation_id={delegation_id}, last_status={last_status}); "
|
||||||
|
f"the platform is still working on it — call check_task_status('{delegation_id}') to retrieve later"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def tool_delegate_task(
|
||||||
|
workspace_id: str,
|
||||||
|
task: str,
|
||||||
|
source_workspace_id: str | None = None,
|
||||||
|
) -> str:
|
||||||
|
"""Delegate a task to another workspace via A2A (synchronous — waits for response).
|
||||||
|
|
||||||
|
``source_workspace_id`` selects which registered workspace this
|
||||||
|
delegation originates from — drives auth + the X-Workspace-ID source
|
||||||
|
header so the platform's a2a_proxy logs the correct sender. Single-
|
||||||
|
workspace operators leave it None and routing falls back to the
|
||||||
|
module-level WORKSPACE_ID.
|
||||||
|
"""
|
||||||
|
if not workspace_id or not task:
|
||||||
|
return "Error: workspace_id and task are required"
|
||||||
|
|
||||||
|
# Auto-route: if source not specified, look up which registered
|
||||||
|
# workspace last saw this peer (populated by tool_list_peers). Falls
|
||||||
|
# back to the legacy WORKSPACE_ID for single-workspace operators.
|
||||||
|
src = source_workspace_id or _peer_to_source.get(workspace_id) or None
|
||||||
|
|
||||||
|
# Discover the target. discover_peer is the access-control gate +
|
||||||
|
# name/status lookup. The peer's reported ``url`` field is NOT used
|
||||||
|
# for routing — see send_a2a_message, which constructs the URL via
|
||||||
|
# the platform's A2A proxy.
|
||||||
|
peer = await discover_peer(workspace_id, source_workspace_id=src)
|
||||||
|
if not peer:
|
||||||
|
return f"Error: workspace {workspace_id} not found or not accessible (check access control)"
|
||||||
|
|
||||||
|
if (peer.get("status") or "").lower() == "offline":
|
||||||
|
return f"Error: workspace {workspace_id} is offline"
|
||||||
|
|
||||||
|
# Lazy import: a2a_tools imports this module at top-level, so a
|
||||||
|
# top-level import of report_activity from a2a_tools would create a
|
||||||
|
# circular dependency at first-import time. Lazy resolution inside
|
||||||
|
# the function body breaks the cycle without forcing a ground-up
|
||||||
|
# restructure of the activity-reporting layer.
|
||||||
|
from a2a_tools import report_activity
|
||||||
|
|
||||||
|
# Report delegation start — include the task text for traceability
|
||||||
|
peer_name = peer.get("name") or _peer_names.get(workspace_id) or workspace_id[:8]
|
||||||
|
_peer_names[workspace_id] = peer_name # cache for future use
|
||||||
|
# Brief summary for canvas display — just the delegation target
|
||||||
|
await report_activity("a2a_send", workspace_id, f"Delegating to {peer_name}", task_text=task)
|
||||||
|
|
||||||
|
# RFC #2829 PR-5: agent-side cutover. When DELEGATION_SYNC_VIA_INBOX=1,
|
||||||
|
# use the platform's durable async delegation API (POST /delegate +
|
||||||
|
# poll /delegations) instead of the proxy-blocked message/send path.
|
||||||
|
# This sidesteps the 600s message/send timeout class that broke
|
||||||
|
# iteration-14/90-style long-running delegations on 2026-05-05.
|
||||||
|
#
|
||||||
|
# Default off — staging-canary first, flip default after PR-2's
|
||||||
|
# result-push flag (DELEGATION_RESULT_INBOX_PUSH) has been on for
|
||||||
|
# ≥1 week without incident.
|
||||||
|
if os.environ.get("DELEGATION_SYNC_VIA_INBOX") == "1":
|
||||||
|
result = await _delegate_sync_via_polling(workspace_id, task, src or WORKSPACE_ID)
|
||||||
|
else:
|
||||||
|
# send_a2a_message routes through ${PLATFORM_URL}/workspaces/{id}/a2a
|
||||||
|
# (the platform proxy) so the same code works for in-container and
|
||||||
|
# external (standalone molecule-mcp) callers.
|
||||||
|
result = await send_a2a_message(workspace_id, task, source_workspace_id=src)
|
||||||
|
|
||||||
|
# Detect delegation failures — wrap them clearly so the calling agent
|
||||||
|
# can decide to retry, use another peer, or handle the task itself.
|
||||||
|
is_error = result.startswith(_A2A_ERROR_PREFIX)
|
||||||
|
# Strip the sentinel prefix so error_detail is the human-readable
|
||||||
|
# cause directly. The Activity tab's red error chip surfaces this
|
||||||
|
# without the user having to scroll into the raw response JSON.
|
||||||
|
#
|
||||||
|
# Cap at 4096 chars before sending — the platform's
|
||||||
|
# activity_logs.error_detail column is unbounded TEXT and a
|
||||||
|
# malicious or buggy peer could otherwise stream an arbitrarily
|
||||||
|
# large error message into the caller's activity log. 4096 is
|
||||||
|
# comfortably above any real exception traceback we've seen and
|
||||||
|
# well below an obvious-DoS threshold.
|
||||||
|
error_detail = result[len(_A2A_ERROR_PREFIX):].strip()[:4096] if is_error else ""
|
||||||
|
await report_activity(
|
||||||
|
"a2a_receive", workspace_id,
|
||||||
|
f"{peer_name} responded ({len(result)} chars)" if not is_error else f"{peer_name} failed: {error_detail[:120]}",
|
||||||
|
task_text=task, response_text=result,
|
||||||
|
status="error" if is_error else "ok",
|
||||||
|
error_detail=error_detail,
|
||||||
|
)
|
||||||
|
if is_error:
|
||||||
|
return (
|
||||||
|
f"DELEGATION FAILED to {peer_name}: {result}\n"
|
||||||
|
f"You should either: (1) try a different peer, (2) handle this task yourself, "
|
||||||
|
f"or (3) inform the user that {peer_name} is unavailable and provide your best answer."
|
||||||
|
)
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
async def tool_delegate_task_async(
|
||||||
|
workspace_id: str,
|
||||||
|
task: str,
|
||||||
|
source_workspace_id: str | None = None,
|
||||||
|
) -> str:
|
||||||
|
"""Delegate a task via the platform's async delegation API (fire-and-forget).
|
||||||
|
|
||||||
|
Uses POST /workspaces/:id/delegate which runs the A2A request in the background.
|
||||||
|
Results are tracked in the platform DB and broadcast via WebSocket.
|
||||||
|
Use check_task_status to poll for results.
|
||||||
|
|
||||||
|
``source_workspace_id`` selects the sending workspace (which one of
|
||||||
|
this agent's registered workspaces gets logged as the originator);
|
||||||
|
auto-routes via the peer→source cache when omitted.
|
||||||
|
"""
|
||||||
|
if not workspace_id or not task:
|
||||||
|
return "Error: workspace_id and task are required"
|
||||||
|
|
||||||
|
src = source_workspace_id or _peer_to_source.get(workspace_id) or WORKSPACE_ID
|
||||||
|
|
||||||
|
# Idempotency key: SHA-256 of (source, target, task) so that a
|
||||||
|
# restarted agent firing the same delegation gets the same key and
|
||||||
|
# the platform returns the existing delegation_id instead of
|
||||||
|
# creating a duplicate. Fixes #1456. Source is in the key so the
|
||||||
|
# SAME task delegated from two different registered workspaces
|
||||||
|
# produces two distinct delegations (the right behavior — one per
|
||||||
|
# tenant audit trail).
|
||||||
|
idem_key = hashlib.sha256(f"{src}:{workspace_id}:{task}".encode()).hexdigest()[:32]
|
||||||
|
|
||||||
|
try:
|
||||||
|
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||||
|
resp = await client.post(
|
||||||
|
f"{PLATFORM_URL}/workspaces/{src}/delegate",
|
||||||
|
json={"target_id": workspace_id, "task": task, "idempotency_key": idem_key},
|
||||||
|
headers=_auth_headers_for_heartbeat(src),
|
||||||
|
)
|
||||||
|
if resp.status_code == 202:
|
||||||
|
data = resp.json()
|
||||||
|
return json.dumps({
|
||||||
|
"delegation_id": data.get("delegation_id", ""),
|
||||||
|
"workspace_id": workspace_id,
|
||||||
|
"status": "delegated",
|
||||||
|
"note": "Task delegated. The platform runs it in the background. Use check_task_status to poll for results.",
|
||||||
|
})
|
||||||
|
else:
|
||||||
|
return f"Error: delegation failed with status {resp.status_code}: {resp.text[:200]}"
|
||||||
|
except Exception as e:
|
||||||
|
return f"Error: delegation failed — {e}"
|
||||||
|
|
||||||
|
|
||||||
|
async def tool_check_task_status(
|
||||||
|
workspace_id: str,
|
||||||
|
task_id: str,
|
||||||
|
source_workspace_id: str | None = None,
|
||||||
|
) -> str:
|
||||||
|
"""Check delegations for this workspace via the platform API.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
workspace_id: Ignored (kept for backward compat). Checks
|
||||||
|
``source_workspace_id``'s delegations (the workspace that
|
||||||
|
FIRED the delegations), not the target's.
|
||||||
|
task_id: Optional delegation_id to filter. If empty, returns all recent delegations.
|
||||||
|
source_workspace_id: Which registered workspace's delegation log
|
||||||
|
to query. Defaults to the module-level WORKSPACE_ID.
|
||||||
|
"""
|
||||||
|
src = source_workspace_id or WORKSPACE_ID
|
||||||
|
try:
|
||||||
|
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||||
|
resp = await client.get(
|
||||||
|
f"{PLATFORM_URL}/workspaces/{src}/delegations",
|
||||||
|
headers=_auth_headers_for_heartbeat(src),
|
||||||
|
)
|
||||||
|
if resp.status_code != 200:
|
||||||
|
return f"Error: failed to check delegations ({resp.status_code})"
|
||||||
|
delegations = resp.json()
|
||||||
|
if task_id:
|
||||||
|
# Filter by delegation_id
|
||||||
|
matching = [d for d in delegations if d.get("delegation_id") == task_id]
|
||||||
|
if matching:
|
||||||
|
return json.dumps(matching[0])
|
||||||
|
return json.dumps({"status": "not_found", "delegation_id": task_id})
|
||||||
|
# Return all recent delegations
|
||||||
|
summary = []
|
||||||
|
for d in delegations[:10]:
|
||||||
|
summary.append({
|
||||||
|
"delegation_id": d.get("delegation_id", ""),
|
||||||
|
"target_id": d.get("target_id", ""),
|
||||||
|
"status": d.get("status", ""),
|
||||||
|
"summary": d.get("summary", ""),
|
||||||
|
"response_preview": d.get("response_preview", ""),
|
||||||
|
})
|
||||||
|
return json.dumps({"delegations": summary, "count": len(delegations)})
|
||||||
|
except Exception as e:
|
||||||
|
return f"Error checking delegations: {e}"
|
||||||
@ -339,8 +339,8 @@ class TestToolDelegateTaskAutoRouting:
|
|||||||
seen_send_src["src"] = source_workspace_id
|
seen_send_src["src"] = source_workspace_id
|
||||||
return "ok"
|
return "ok"
|
||||||
|
|
||||||
with patch("a2a_tools.discover_peer", side_effect=fake_discover), \
|
with patch("a2a_tools_delegation.discover_peer", side_effect=fake_discover), \
|
||||||
patch("a2a_tools.send_a2a_message", side_effect=fake_send), \
|
patch("a2a_tools_delegation.send_a2a_message", side_effect=fake_send), \
|
||||||
patch("a2a_tools.report_activity", new=AsyncMock()):
|
patch("a2a_tools.report_activity", new=AsyncMock()):
|
||||||
await a2a_tools.tool_delegate_task(peer_id, "do thing")
|
await a2a_tools.tool_delegate_task(peer_id, "do thing")
|
||||||
|
|
||||||
@ -367,8 +367,8 @@ class TestToolDelegateTaskAutoRouting:
|
|||||||
seen["send"] = source_workspace_id
|
seen["send"] = source_workspace_id
|
||||||
return "ok"
|
return "ok"
|
||||||
|
|
||||||
with patch("a2a_tools.discover_peer", side_effect=fake_discover), \
|
with patch("a2a_tools_delegation.discover_peer", side_effect=fake_discover), \
|
||||||
patch("a2a_tools.send_a2a_message", side_effect=fake_send), \
|
patch("a2a_tools_delegation.send_a2a_message", side_effect=fake_send), \
|
||||||
patch("a2a_tools.report_activity", new=AsyncMock()):
|
patch("a2a_tools.report_activity", new=AsyncMock()):
|
||||||
await a2a_tools.tool_delegate_task(
|
await a2a_tools.tool_delegate_task(
|
||||||
peer_id, "do thing", source_workspace_id=ws_explicit,
|
peer_id, "do thing", source_workspace_id=ws_explicit,
|
||||||
@ -395,8 +395,8 @@ class TestToolDelegateTaskAutoRouting:
|
|||||||
seen["send"] = source_workspace_id
|
seen["send"] = source_workspace_id
|
||||||
return "ok"
|
return "ok"
|
||||||
|
|
||||||
with patch("a2a_tools.discover_peer", side_effect=fake_discover), \
|
with patch("a2a_tools_delegation.discover_peer", side_effect=fake_discover), \
|
||||||
patch("a2a_tools.send_a2a_message", side_effect=fake_send), \
|
patch("a2a_tools_delegation.send_a2a_message", side_effect=fake_send), \
|
||||||
patch("a2a_tools.report_activity", new=AsyncMock()):
|
patch("a2a_tools.report_activity", new=AsyncMock()):
|
||||||
await a2a_tools.tool_delegate_task(peer_id, "do thing")
|
await a2a_tools.tool_delegate_task(peer_id, "do thing")
|
||||||
|
|
||||||
|
|||||||
129
workspace/tests/test_a2a_tools_delegation.py
Normal file
129
workspace/tests/test_a2a_tools_delegation.py
Normal file
@ -0,0 +1,129 @@
|
|||||||
|
"""Drift gate + direct surface tests for ``a2a_tools_delegation`` (RFC #2873 iter 4b).
|
||||||
|
|
||||||
|
The full behavior matrix for the three delegation MCP tools lives in
|
||||||
|
``test_a2a_tools_impl.py`` (TestToolDelegateTask + TestToolDelegateTaskAsync
|
||||||
|
+ TestToolCheckTaskStatus). Those exercise call paths through the
|
||||||
|
``a2a_tools_delegation.foo`` module (after the iter 4b retarget).
|
||||||
|
|
||||||
|
This file owns the post-split contract:
|
||||||
|
|
||||||
|
1. **Drift gate** — every previously-public symbol on ``a2a_tools``
|
||||||
|
(``tool_delegate_task``, ``tool_delegate_task_async``,
|
||||||
|
``tool_check_task_status``, ``_delegate_sync_via_polling``,
|
||||||
|
``_SYNC_POLL_INTERVAL_S``, ``_SYNC_POLL_BUDGET_S``) is the EXACT
|
||||||
|
same callable / value as the new module's public name. A wrapper
|
||||||
|
that drifted would silently bypass tests targeting the wrapper.
|
||||||
|
|
||||||
|
2. **Smoke import** — both modules import in either order without
|
||||||
|
raising (the lazy ``report_activity`` import inside
|
||||||
|
``tool_delegate_task`` is the contract that prevents a circular
|
||||||
|
import; this test pins it).
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def _require_workspace_id(monkeypatch):
|
||||||
|
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
|
||||||
|
monkeypatch.setenv("PLATFORM_URL", "http://test.invalid")
|
||||||
|
yield
|
||||||
|
|
||||||
|
|
||||||
|
# ============== Drift gate ==============
|
||||||
|
|
||||||
|
class TestBackCompatAliases:
|
||||||
|
def test_tool_delegate_task_alias(self):
|
||||||
|
import a2a_tools
|
||||||
|
import a2a_tools_delegation
|
||||||
|
assert a2a_tools.tool_delegate_task is a2a_tools_delegation.tool_delegate_task
|
||||||
|
|
||||||
|
def test_tool_delegate_task_async_alias(self):
|
||||||
|
import a2a_tools
|
||||||
|
import a2a_tools_delegation
|
||||||
|
assert (
|
||||||
|
a2a_tools.tool_delegate_task_async
|
||||||
|
is a2a_tools_delegation.tool_delegate_task_async
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_tool_check_task_status_alias(self):
|
||||||
|
import a2a_tools
|
||||||
|
import a2a_tools_delegation
|
||||||
|
assert (
|
||||||
|
a2a_tools.tool_check_task_status
|
||||||
|
is a2a_tools_delegation.tool_check_task_status
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_delegate_sync_via_polling_alias(self):
|
||||||
|
import a2a_tools
|
||||||
|
import a2a_tools_delegation
|
||||||
|
assert (
|
||||||
|
a2a_tools._delegate_sync_via_polling
|
||||||
|
is a2a_tools_delegation._delegate_sync_via_polling
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_constants_match(self):
|
||||||
|
import a2a_tools
|
||||||
|
import a2a_tools_delegation
|
||||||
|
assert (
|
||||||
|
a2a_tools._SYNC_POLL_INTERVAL_S
|
||||||
|
== a2a_tools_delegation._SYNC_POLL_INTERVAL_S
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
a2a_tools._SYNC_POLL_BUDGET_S
|
||||||
|
== a2a_tools_delegation._SYNC_POLL_BUDGET_S
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ============== Smoke imports ==============
|
||||||
|
|
||||||
|
class TestImportContracts:
|
||||||
|
def test_delegation_imports_without_a2a_tools_loaded(self, monkeypatch):
|
||||||
|
"""``a2a_tools_delegation`` should NOT pull in ``a2a_tools`` at
|
||||||
|
module-load time. The lazy ``from a2a_tools import report_activity``
|
||||||
|
inside ``tool_delegate_task`` is the only legitimate hop.
|
||||||
|
|
||||||
|
Pin this so a future refactor that adds a top-level
|
||||||
|
``from a2a_tools import …`` re-introduces the circular-import
|
||||||
|
crash that motivated the lazy pattern.
|
||||||
|
"""
|
||||||
|
import sys
|
||||||
|
# Drop both modules so we re-import in a controlled order
|
||||||
|
for mod in ("a2a_tools", "a2a_tools_delegation"):
|
||||||
|
sys.modules.pop(mod, None)
|
||||||
|
|
||||||
|
# Importing delegation first must succeed without a2a_tools
|
||||||
|
# being loaded (because a2a_tools imports delegation, the
|
||||||
|
# circular path ONLY closes if delegation top-level imports
|
||||||
|
# something from a2a_tools).
|
||||||
|
import a2a_tools_delegation # noqa: F401
|
||||||
|
# If we got here, no circular import.
|
||||||
|
assert "a2a_tools_delegation" in sys.modules
|
||||||
|
|
||||||
|
def test_a2a_tools_imports_via_delegation_re_export(self):
|
||||||
|
"""The opposite direction: importing a2a_tools must trigger the
|
||||||
|
delegation re-export so a2a_tools.tool_delegate_task resolves."""
|
||||||
|
import a2a_tools
|
||||||
|
assert hasattr(a2a_tools, "tool_delegate_task")
|
||||||
|
assert hasattr(a2a_tools, "tool_delegate_task_async")
|
||||||
|
assert hasattr(a2a_tools, "tool_check_task_status")
|
||||||
|
|
||||||
|
|
||||||
|
# ============== Sync-poll budget env override ==============
|
||||||
|
|
||||||
|
class TestPollBudgetEnvOverride:
|
||||||
|
def test_default_budget_when_env_unset(self):
|
||||||
|
"""Module-level constant. Set DELEGATION_TIMEOUT before importing
|
||||||
|
a2a_tools_delegation to override; default is 300.0."""
|
||||||
|
# The constant is computed at module-load time. To verify the
|
||||||
|
# override path we'd need to reload — skipped here because it's
|
||||||
|
# tested at boot. This test pins the default for catch-the-eye
|
||||||
|
# documentation.
|
||||||
|
import a2a_tools_delegation
|
||||||
|
# Whatever was set when the module first loaded — assert it's
|
||||||
|
# numeric and >= the documented floor (180s healthsweep budget).
|
||||||
|
assert isinstance(a2a_tools_delegation._SYNC_POLL_BUDGET_S, float)
|
||||||
|
assert a2a_tools_delegation._SYNC_POLL_BUDGET_S >= 180.0
|
||||||
@ -226,16 +226,16 @@ class TestToolDelegateTask:
|
|||||||
|
|
||||||
async def test_peer_not_found_returns_error(self):
|
async def test_peer_not_found_returns_error(self):
|
||||||
import a2a_tools
|
import a2a_tools
|
||||||
with patch("a2a_tools.discover_peer", return_value=None):
|
with patch("a2a_tools_delegation.discover_peer", return_value=None):
|
||||||
result = await a2a_tools.tool_delegate_task("ws-missing", "task")
|
result = await a2a_tools.tool_delegate_task("ws-missing", "task")
|
||||||
assert "not found" in result or "Error" in result
|
assert "not found" in result or "Error" in result
|
||||||
|
|
||||||
async def test_offline_peer_returns_error(self):
|
async def test_offline_peer_returns_error(self):
|
||||||
"""A peer with status=offline short-circuits before we hit the proxy."""
|
"""A peer with status=offline short-circuits before we hit the proxy."""
|
||||||
import a2a_tools
|
import a2a_tools
|
||||||
with patch("a2a_tools.discover_peer", return_value={"id": "ws-1", "status": "offline"}):
|
with patch("a2a_tools_delegation.discover_peer", return_value={"id": "ws-1", "status": "offline"}):
|
||||||
mc = _make_http_mock()
|
mc = _make_http_mock()
|
||||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
with patch("a2a_tools_delegation.httpx.AsyncClient", return_value=mc):
|
||||||
result = await a2a_tools.tool_delegate_task("ws-1", "task")
|
result = await a2a_tools.tool_delegate_task("ws-1", "task")
|
||||||
assert "offline" in result.lower()
|
assert "offline" in result.lower()
|
||||||
|
|
||||||
@ -261,8 +261,8 @@ class TestToolDelegateTask:
|
|||||||
captured["source"] = source_workspace_id
|
captured["source"] = source_workspace_id
|
||||||
return "ok"
|
return "ok"
|
||||||
|
|
||||||
with patch("a2a_tools.discover_peer", return_value=peer), \
|
with patch("a2a_tools_delegation.discover_peer", return_value=peer), \
|
||||||
patch("a2a_tools.send_a2a_message", side_effect=fake_send), \
|
patch("a2a_tools_delegation.send_a2a_message", side_effect=fake_send), \
|
||||||
patch("a2a_tools.report_activity", new=AsyncMock()):
|
patch("a2a_tools.report_activity", new=AsyncMock()):
|
||||||
await a2a_tools.tool_delegate_task(peer_id, "do thing")
|
await a2a_tools.tool_delegate_task(peer_id, "do thing")
|
||||||
|
|
||||||
@ -274,8 +274,8 @@ class TestToolDelegateTask:
|
|||||||
import a2a_tools
|
import a2a_tools
|
||||||
|
|
||||||
peer = {"id": "ws-1", "url": "http://ws-1.svc/a2a", "name": "Worker"}
|
peer = {"id": "ws-1", "url": "http://ws-1.svc/a2a", "name": "Worker"}
|
||||||
with patch("a2a_tools.discover_peer", return_value=peer), \
|
with patch("a2a_tools_delegation.discover_peer", return_value=peer), \
|
||||||
patch("a2a_tools.send_a2a_message", return_value="Task completed!"), \
|
patch("a2a_tools_delegation.send_a2a_message", return_value="Task completed!"), \
|
||||||
patch("a2a_tools.report_activity", new=AsyncMock()):
|
patch("a2a_tools.report_activity", new=AsyncMock()):
|
||||||
result = await a2a_tools.tool_delegate_task("ws-1", "do something")
|
result = await a2a_tools.tool_delegate_task("ws-1", "do something")
|
||||||
|
|
||||||
@ -287,8 +287,8 @@ class TestToolDelegateTask:
|
|||||||
|
|
||||||
peer = {"id": "ws-1", "url": "http://ws-1.svc/a2a", "name": "Worker"}
|
peer = {"id": "ws-1", "url": "http://ws-1.svc/a2a", "name": "Worker"}
|
||||||
error_msg = f"{a2a_tools._A2A_ERROR_PREFIX}Agent error: something bad"
|
error_msg = f"{a2a_tools._A2A_ERROR_PREFIX}Agent error: something bad"
|
||||||
with patch("a2a_tools.discover_peer", return_value=peer), \
|
with patch("a2a_tools_delegation.discover_peer", return_value=peer), \
|
||||||
patch("a2a_tools.send_a2a_message", return_value=error_msg), \
|
patch("a2a_tools_delegation.send_a2a_message", return_value=error_msg), \
|
||||||
patch("a2a_tools.report_activity", new=AsyncMock()):
|
patch("a2a_tools.report_activity", new=AsyncMock()):
|
||||||
result = await a2a_tools.tool_delegate_task("ws-1", "do something")
|
result = await a2a_tools.tool_delegate_task("ws-1", "do something")
|
||||||
|
|
||||||
@ -302,8 +302,8 @@ class TestToolDelegateTask:
|
|||||||
# Pre-populate the cache
|
# Pre-populate the cache
|
||||||
a2a_tools._peer_names["ws-cached"] = "CachedName"
|
a2a_tools._peer_names["ws-cached"] = "CachedName"
|
||||||
peer = {"id": "ws-cached", "url": "http://ws-cached.svc/a2a"} # no 'name'
|
peer = {"id": "ws-cached", "url": "http://ws-cached.svc/a2a"} # no 'name'
|
||||||
with patch("a2a_tools.discover_peer", return_value=peer), \
|
with patch("a2a_tools_delegation.discover_peer", return_value=peer), \
|
||||||
patch("a2a_tools.send_a2a_message", return_value="done"), \
|
patch("a2a_tools_delegation.send_a2a_message", return_value="done"), \
|
||||||
patch("a2a_tools.report_activity", new=AsyncMock()):
|
patch("a2a_tools.report_activity", new=AsyncMock()):
|
||||||
result = await a2a_tools.tool_delegate_task("ws-cached", "task")
|
result = await a2a_tools.tool_delegate_task("ws-cached", "task")
|
||||||
|
|
||||||
@ -316,8 +316,8 @@ class TestToolDelegateTask:
|
|||||||
# Ensure not in cache
|
# Ensure not in cache
|
||||||
a2a_tools._peer_names.pop("ws-nona000", None)
|
a2a_tools._peer_names.pop("ws-nona000", None)
|
||||||
peer = {"id": "ws-nona000", "url": "http://x.svc/a2a"} # no 'name'
|
peer = {"id": "ws-nona000", "url": "http://x.svc/a2a"} # no 'name'
|
||||||
with patch("a2a_tools.discover_peer", return_value=peer), \
|
with patch("a2a_tools_delegation.discover_peer", return_value=peer), \
|
||||||
patch("a2a_tools.send_a2a_message", return_value="ok"), \
|
patch("a2a_tools_delegation.send_a2a_message", return_value="ok"), \
|
||||||
patch("a2a_tools.report_activity", new=AsyncMock()):
|
patch("a2a_tools.report_activity", new=AsyncMock()):
|
||||||
result = await a2a_tools.tool_delegate_task("ws-nona000", "task")
|
result = await a2a_tools.tool_delegate_task("ws-nona000", "task")
|
||||||
|
|
||||||
@ -349,7 +349,7 @@ class TestToolDelegateTaskAsync:
|
|||||||
import a2a_tools
|
import a2a_tools
|
||||||
|
|
||||||
mc = _make_http_mock(post_resp=_resp(202, {"delegation_id": "d-123", "status": "delegated"}))
|
mc = _make_http_mock(post_resp=_resp(202, {"delegation_id": "d-123", "status": "delegated"}))
|
||||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
with patch("a2a_tools_delegation.httpx.AsyncClient", return_value=mc):
|
||||||
result = await a2a_tools.tool_delegate_task_async("ws-1", "do task")
|
result = await a2a_tools.tool_delegate_task_async("ws-1", "do task")
|
||||||
|
|
||||||
data = json.loads(result)
|
data = json.loads(result)
|
||||||
@ -362,7 +362,7 @@ class TestToolDelegateTaskAsync:
|
|||||||
import a2a_tools
|
import a2a_tools
|
||||||
|
|
||||||
mc = _make_http_mock(post_resp=_resp(500, {"error": "internal"}))
|
mc = _make_http_mock(post_resp=_resp(500, {"error": "internal"}))
|
||||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
with patch("a2a_tools_delegation.httpx.AsyncClient", return_value=mc):
|
||||||
result = await a2a_tools.tool_delegate_task_async("ws-1", "do task")
|
result = await a2a_tools.tool_delegate_task_async("ws-1", "do task")
|
||||||
|
|
||||||
assert "Error" in result
|
assert "Error" in result
|
||||||
@ -372,7 +372,7 @@ class TestToolDelegateTaskAsync:
|
|||||||
import a2a_tools
|
import a2a_tools
|
||||||
|
|
||||||
mc = _make_http_mock(post_exc=httpx.ConnectError("connection refused"))
|
mc = _make_http_mock(post_exc=httpx.ConnectError("connection refused"))
|
||||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
with patch("a2a_tools_delegation.httpx.AsyncClient", return_value=mc):
|
||||||
result = await a2a_tools.tool_delegate_task_async("ws-1", "do task")
|
result = await a2a_tools.tool_delegate_task_async("ws-1", "do task")
|
||||||
|
|
||||||
assert "Error" in result or "failed" in result.lower()
|
assert "Error" in result or "failed" in result.lower()
|
||||||
@ -393,7 +393,7 @@ class TestToolCheckTaskStatus:
|
|||||||
{"delegation_id": "d-2", "target_id": "ws-u", "status": "pending", "summary": "waiting"},
|
{"delegation_id": "d-2", "target_id": "ws-u", "status": "pending", "summary": "waiting"},
|
||||||
]
|
]
|
||||||
mc = _make_http_mock(get_resp=_resp(200, delegations))
|
mc = _make_http_mock(get_resp=_resp(200, delegations))
|
||||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
with patch("a2a_tools_delegation.httpx.AsyncClient", return_value=mc):
|
||||||
result = await a2a_tools.tool_check_task_status("ws-1", "")
|
result = await a2a_tools.tool_check_task_status("ws-1", "")
|
||||||
|
|
||||||
data = json.loads(result)
|
data = json.loads(result)
|
||||||
@ -409,7 +409,7 @@ class TestToolCheckTaskStatus:
|
|||||||
{"delegation_id": "d-2", "status": "pending"},
|
{"delegation_id": "d-2", "status": "pending"},
|
||||||
]
|
]
|
||||||
mc = _make_http_mock(get_resp=_resp(200, delegations))
|
mc = _make_http_mock(get_resp=_resp(200, delegations))
|
||||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
with patch("a2a_tools_delegation.httpx.AsyncClient", return_value=mc):
|
||||||
result = await a2a_tools.tool_check_task_status("ws-1", "d-1")
|
result = await a2a_tools.tool_check_task_status("ws-1", "d-1")
|
||||||
|
|
||||||
data = json.loads(result)
|
data = json.loads(result)
|
||||||
@ -421,7 +421,7 @@ class TestToolCheckTaskStatus:
|
|||||||
import a2a_tools
|
import a2a_tools
|
||||||
|
|
||||||
mc = _make_http_mock(get_resp=_resp(200, []))
|
mc = _make_http_mock(get_resp=_resp(200, []))
|
||||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
with patch("a2a_tools_delegation.httpx.AsyncClient", return_value=mc):
|
||||||
result = await a2a_tools.tool_check_task_status("ws-1", "d-missing")
|
result = await a2a_tools.tool_check_task_status("ws-1", "d-missing")
|
||||||
|
|
||||||
data = json.loads(result)
|
data = json.loads(result)
|
||||||
@ -432,7 +432,7 @@ class TestToolCheckTaskStatus:
|
|||||||
import a2a_tools
|
import a2a_tools
|
||||||
|
|
||||||
mc = _make_http_mock(get_resp=_resp(500, {"error": "db down"}))
|
mc = _make_http_mock(get_resp=_resp(500, {"error": "db down"}))
|
||||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
with patch("a2a_tools_delegation.httpx.AsyncClient", return_value=mc):
|
||||||
result = await a2a_tools.tool_check_task_status("ws-1", "d-1")
|
result = await a2a_tools.tool_check_task_status("ws-1", "d-1")
|
||||||
|
|
||||||
assert "Error" in result or "failed" in result.lower()
|
assert "Error" in result or "failed" in result.lower()
|
||||||
|
|||||||
@ -80,10 +80,10 @@ class TestFlagOffLegacyPath:
|
|||||||
async def fake_report_activity(*_a, **_kw):
|
async def fake_report_activity(*_a, **_kw):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
with patch("a2a_tools.send_a2a_message", side_effect=fake_send), \
|
with patch("a2a_tools_delegation.send_a2a_message", side_effect=fake_send), \
|
||||||
patch("a2a_tools.discover_peer", side_effect=fake_discover), \
|
patch("a2a_tools_delegation.discover_peer", side_effect=fake_discover), \
|
||||||
patch("a2a_tools.report_activity", side_effect=fake_report_activity), \
|
patch("a2a_tools.report_activity", side_effect=fake_report_activity), \
|
||||||
patch("a2a_tools._delegate_sync_via_polling", new=AsyncMock()) as poll_mock:
|
patch("a2a_tools_delegation._delegate_sync_via_polling", new=AsyncMock()) as poll_mock:
|
||||||
result = await a2a_tools.tool_delegate_task(
|
result = await a2a_tools.tool_delegate_task(
|
||||||
"ws-target", "task body", source_workspace_id="ws-self"
|
"ws-target", "task body", source_workspace_id="ws-self"
|
||||||
)
|
)
|
||||||
@ -105,7 +105,7 @@ class TestFlagOnDispatchFailures:
|
|||||||
import a2a_tools
|
import a2a_tools
|
||||||
mc = _make_client(post_exc=httpx.ConnectError("network down"))
|
mc = _make_client(post_exc=httpx.ConnectError("network down"))
|
||||||
|
|
||||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
with patch("a2a_tools_delegation.httpx.AsyncClient", return_value=mc):
|
||||||
res = await a2a_tools._delegate_sync_via_polling(
|
res = await a2a_tools._delegate_sync_via_polling(
|
||||||
"ws-target", "task", "ws-self"
|
"ws-target", "task", "ws-self"
|
||||||
)
|
)
|
||||||
@ -119,7 +119,7 @@ class TestFlagOnDispatchFailures:
|
|||||||
import a2a_tools
|
import a2a_tools
|
||||||
mc = _make_client(post_resp=_resp(403, {"error": "forbidden"}))
|
mc = _make_client(post_resp=_resp(403, {"error": "forbidden"}))
|
||||||
|
|
||||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
with patch("a2a_tools_delegation.httpx.AsyncClient", return_value=mc):
|
||||||
res = await a2a_tools._delegate_sync_via_polling(
|
res = await a2a_tools._delegate_sync_via_polling(
|
||||||
"ws-target", "task", "ws-self"
|
"ws-target", "task", "ws-self"
|
||||||
)
|
)
|
||||||
@ -134,7 +134,7 @@ class TestFlagOnDispatchFailures:
|
|||||||
# 202 Accepted but no delegation_id field — defensive shape check.
|
# 202 Accepted but no delegation_id field — defensive shape check.
|
||||||
mc = _make_client(post_resp=_resp(202, {"status": "delegated"}))
|
mc = _make_client(post_resp=_resp(202, {"status": "delegated"}))
|
||||||
|
|
||||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
with patch("a2a_tools_delegation.httpx.AsyncClient", return_value=mc):
|
||||||
res = await a2a_tools._delegate_sync_via_polling(
|
res = await a2a_tools._delegate_sync_via_polling(
|
||||||
"ws-target", "task", "ws-self"
|
"ws-target", "task", "ws-self"
|
||||||
)
|
)
|
||||||
@ -168,7 +168,7 @@ class TestFlagOnPollingOutcomes:
|
|||||||
get_resps=[_resp(200, [completed_row])],
|
get_resps=[_resp(200, [completed_row])],
|
||||||
)
|
)
|
||||||
|
|
||||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
with patch("a2a_tools_delegation.httpx.AsyncClient", return_value=mc):
|
||||||
res = await a2a_tools._delegate_sync_via_polling(
|
res = await a2a_tools._delegate_sync_via_polling(
|
||||||
"ws-target", "task", "ws-self"
|
"ws-target", "task", "ws-self"
|
||||||
)
|
)
|
||||||
@ -196,7 +196,7 @@ class TestFlagOnPollingOutcomes:
|
|||||||
get_resps=[_resp(200, [failed_row])],
|
get_resps=[_resp(200, [failed_row])],
|
||||||
)
|
)
|
||||||
|
|
||||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
with patch("a2a_tools_delegation.httpx.AsyncClient", return_value=mc):
|
||||||
res = await a2a_tools._delegate_sync_via_polling(
|
res = await a2a_tools._delegate_sync_via_polling(
|
||||||
"ws-target", "task", "ws-self"
|
"ws-target", "task", "ws-self"
|
||||||
)
|
)
|
||||||
@ -234,7 +234,7 @@ class TestFlagOnPollingOutcomes:
|
|||||||
get_resps=get_seq,
|
get_resps=get_seq,
|
||||||
)
|
)
|
||||||
|
|
||||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
with patch("a2a_tools_delegation.httpx.AsyncClient", return_value=mc):
|
||||||
res = await a2a_tools._delegate_sync_via_polling(
|
res = await a2a_tools._delegate_sync_via_polling(
|
||||||
"ws-target", "task", "ws-self"
|
"ws-target", "task", "ws-self"
|
||||||
)
|
)
|
||||||
@ -266,7 +266,7 @@ class TestFlagOnPollingOutcomes:
|
|||||||
get_resps=get_seq,
|
get_resps=get_seq,
|
||||||
)
|
)
|
||||||
|
|
||||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
with patch("a2a_tools_delegation.httpx.AsyncClient", return_value=mc):
|
||||||
res = await a2a_tools._delegate_sync_via_polling(
|
res = await a2a_tools._delegate_sync_via_polling(
|
||||||
"ws-target", "task", "ws-self"
|
"ws-target", "task", "ws-self"
|
||||||
)
|
)
|
||||||
@ -304,7 +304,7 @@ class TestFlagOnPollingOutcomes:
|
|||||||
get_resps=[first_poll, second_poll],
|
get_resps=[first_poll, second_poll],
|
||||||
)
|
)
|
||||||
|
|
||||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
with patch("a2a_tools_delegation.httpx.AsyncClient", return_value=mc):
|
||||||
res = await a2a_tools._delegate_sync_via_polling(
|
res = await a2a_tools._delegate_sync_via_polling(
|
||||||
"ws-target", "task", "ws-self"
|
"ws-target", "task", "ws-self"
|
||||||
)
|
)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user