Merge pull request #2844 from Molecule-AI/feat/rfc2829-pr5-agent-side-cutover
feat(delegations): agent-side cutover — sync delegate uses async+poll path (RFC #2829 PR-5)
This commit is contained in:
commit
52915268b2
@ -191,6 +191,145 @@ async def report_activity(
|
||||
pass # Best-effort — don't block delegation on activity reporting
|
||||
|
||||
|
||||
# RFC #2829 PR-5 cutover constants. The poll cadence + timeout are
|
||||
# intentionally generous: 3s gives the platform's executeDelegation
|
||||
# goroutine room to dispatch + the callee to respond + the result to
|
||||
# write to activity_logs without thrashing the platform with rapid
|
||||
# polls; the budget matches the legacy DELEGATION_TIMEOUT (300s) so
|
||||
# operators don't see behavior change beyond "no more 600s timeouts".
|
||||
_SYNC_POLL_INTERVAL_S = 3.0
|
||||
_SYNC_POLL_BUDGET_S = float(os.environ.get("DELEGATION_TIMEOUT", "300.0"))
|
||||
|
||||
|
||||
async def _delegate_sync_via_polling(
|
||||
workspace_id: str,
|
||||
task: str,
|
||||
src: str,
|
||||
) -> str:
|
||||
"""RFC #2829 PR-5: durable async delegation + poll for terminal status.
|
||||
|
||||
Sidesteps the platform proxy's blocking `message/send` HTTP path that
|
||||
hits a hard 600s ceiling. Instead:
|
||||
|
||||
1. POST /workspaces/<src>/delegate (async, returns 202 + delegation_id)
|
||||
— platform's executeDelegation goroutine handles A2A dispatch in
|
||||
the background. No client-side timeout dependency on the platform
|
||||
holding a connection open.
|
||||
2. Poll GET /workspaces/<src>/delegations every 3s for a row with
|
||||
matching delegation_id reaching terminal status (completed/failed).
|
||||
3. Return the response_preview text on completed; surface error_detail
|
||||
on failed (with the same _A2A_ERROR_PREFIX wrapping the legacy
|
||||
path uses, so caller error-detection logic is unchanged).
|
||||
|
||||
Both /delegate and /delegations are existing endpoints — this helper
|
||||
just composes them into a polling synchronous facade. The result is
|
||||
available the moment the platform writes the terminal status row;
|
||||
no extra latency vs. the legacy proxy-blocked path on fast cases.
|
||||
"""
|
||||
import asyncio
|
||||
import time
|
||||
|
||||
idem_key = hashlib.sha256(f"{src}:{workspace_id}:{task}".encode()).hexdigest()[:32]
|
||||
|
||||
# 1. Dispatch via /delegate (the async, durable path).
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||
resp = await client.post(
|
||||
f"{PLATFORM_URL}/workspaces/{src}/delegate",
|
||||
json={
|
||||
"target_id": workspace_id,
|
||||
"task": task,
|
||||
"idempotency_key": idem_key,
|
||||
},
|
||||
headers=_auth_headers_for_heartbeat(src),
|
||||
)
|
||||
except Exception as e: # pylint: disable=broad-except
|
||||
return f"{_A2A_ERROR_PREFIX}delegate dispatch failed: {e}"
|
||||
|
||||
if resp.status_code != 202 and resp.status_code != 200:
|
||||
return f"{_A2A_ERROR_PREFIX}delegate dispatch failed: HTTP {resp.status_code} {resp.text[:200]}"
|
||||
|
||||
try:
|
||||
dispatch = resp.json()
|
||||
except Exception as e: # pylint: disable=broad-except
|
||||
return f"{_A2A_ERROR_PREFIX}delegate dispatch returned non-JSON: {e}"
|
||||
|
||||
delegation_id = dispatch.get("delegation_id", "")
|
||||
if not delegation_id:
|
||||
return f"{_A2A_ERROR_PREFIX}delegate dispatch missing delegation_id: {dispatch}"
|
||||
|
||||
# 2. Poll for terminal status with a deadline. Each poll is a cheap
|
||||
# /delegations GET — bounded by the platform's existing rate limit.
|
||||
deadline = time.monotonic() + _SYNC_POLL_BUDGET_S
|
||||
last_status = "unknown"
|
||||
while time.monotonic() < deadline:
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||
poll = await client.get(
|
||||
f"{PLATFORM_URL}/workspaces/{src}/delegations",
|
||||
headers=_auth_headers_for_heartbeat(src),
|
||||
)
|
||||
except Exception as e: # pylint: disable=broad-except
|
||||
# Transient — keep polling. The platform IS holding the
|
||||
# delegation row; we just lost a network request.
|
||||
last_status = f"poll-error: {e}"
|
||||
await asyncio.sleep(_SYNC_POLL_INTERVAL_S)
|
||||
continue
|
||||
|
||||
if poll.status_code != 200:
|
||||
last_status = f"poll HTTP {poll.status_code}"
|
||||
await asyncio.sleep(_SYNC_POLL_INTERVAL_S)
|
||||
continue
|
||||
|
||||
try:
|
||||
rows = poll.json()
|
||||
except Exception as e: # pylint: disable=broad-except
|
||||
last_status = f"poll non-JSON: {e}"
|
||||
await asyncio.sleep(_SYNC_POLL_INTERVAL_S)
|
||||
continue
|
||||
|
||||
# /delegations returns a flat list of delegation events. Filter to
|
||||
# our delegation_id; pick the first terminal one. The list may
|
||||
# have multiple rows per delegation_id (one for the original
|
||||
# dispatch, one per status update); we want the latest terminal.
|
||||
if not isinstance(rows, list):
|
||||
await asyncio.sleep(_SYNC_POLL_INTERVAL_S)
|
||||
continue
|
||||
terminal = None
|
||||
for r in rows:
|
||||
if not isinstance(r, dict):
|
||||
continue
|
||||
if r.get("delegation_id") != delegation_id:
|
||||
continue
|
||||
status = (r.get("status") or "").lower()
|
||||
last_status = status
|
||||
if status in ("completed", "failed"):
|
||||
terminal = r
|
||||
break
|
||||
if terminal:
|
||||
if (terminal.get("status") or "").lower() == "completed":
|
||||
return terminal.get("response_preview") or ""
|
||||
err = (
|
||||
terminal.get("error_detail")
|
||||
or terminal.get("summary")
|
||||
or "delegation failed"
|
||||
)
|
||||
return f"{_A2A_ERROR_PREFIX}{err}"
|
||||
|
||||
await asyncio.sleep(_SYNC_POLL_INTERVAL_S)
|
||||
|
||||
# Budget exhausted — the platform's row is still in flight (or queued).
|
||||
# Surface as an error so the caller can decide to retry or fall back;
|
||||
# the platform DOES still have the durable row, so the work isn't
|
||||
# lost — it'll complete eventually and a future check_task_status
|
||||
# will surface the result.
|
||||
return (
|
||||
f"{_A2A_ERROR_PREFIX}polling timeout after {_SYNC_POLL_BUDGET_S}s "
|
||||
f"(delegation_id={delegation_id}, last_status={last_status}); "
|
||||
f"the platform is still working on it — call check_task_status('{delegation_id}') to retrieve later"
|
||||
)
|
||||
|
||||
|
||||
async def tool_delegate_task(
|
||||
workspace_id: str,
|
||||
task: str,
|
||||
@ -229,10 +368,22 @@ async def tool_delegate_task(
|
||||
# Brief summary for canvas display — just the delegation target
|
||||
await report_activity("a2a_send", workspace_id, f"Delegating to {peer_name}", task_text=task)
|
||||
|
||||
# send_a2a_message routes through ${PLATFORM_URL}/workspaces/{id}/a2a
|
||||
# (the platform proxy) so the same code works for in-container and
|
||||
# external (standalone molecule-mcp) callers.
|
||||
result = await send_a2a_message(workspace_id, task, source_workspace_id=src)
|
||||
# RFC #2829 PR-5: agent-side cutover. When DELEGATION_SYNC_VIA_INBOX=1,
|
||||
# use the platform's durable async delegation API (POST /delegate +
|
||||
# poll /delegations) instead of the proxy-blocked message/send path.
|
||||
# This sidesteps the 600s message/send timeout class that broke
|
||||
# iteration-14/90-style long-running delegations on 2026-05-05.
|
||||
#
|
||||
# Default off — staging-canary first, flip default after PR-2's
|
||||
# result-push flag (DELEGATION_RESULT_INBOX_PUSH) has been on for
|
||||
# ≥1 week without incident.
|
||||
if os.environ.get("DELEGATION_SYNC_VIA_INBOX") == "1":
|
||||
result = await _delegate_sync_via_polling(workspace_id, task, src or WORKSPACE_ID)
|
||||
else:
|
||||
# send_a2a_message routes through ${PLATFORM_URL}/workspaces/{id}/a2a
|
||||
# (the platform proxy) so the same code works for in-container and
|
||||
# external (standalone molecule-mcp) callers.
|
||||
result = await send_a2a_message(workspace_id, task, source_workspace_id=src)
|
||||
|
||||
# Detect delegation failures — wrap them clearly so the calling agent
|
||||
# can decide to retry, use another peer, or handle the task itself.
|
||||
|
||||
321
workspace/tests/test_delegation_sync_via_polling.py
Normal file
321
workspace/tests/test_delegation_sync_via_polling.py
Normal file
@ -0,0 +1,321 @@
|
||||
"""RFC #2829 PR-5: tests for the agent-side cutover that replaces the
|
||||
proxy-blocked send_a2a_message sync path with delegate-then-poll.
|
||||
|
||||
Coverage:
|
||||
|
||||
- Flag off (default) → byte-identical to legacy: tool_delegate_task
|
||||
calls send_a2a_message and never touches /delegate.
|
||||
- Flag on, dispatch fails → wrapped error returned, no infinite poll.
|
||||
- Flag on, dispatch returns no delegation_id → wrapped error.
|
||||
- Flag on, completed status on first poll → response_preview returned.
|
||||
- Flag on, failed status → wrapped error with error_detail.
|
||||
- Flag on, transient poll error → keeps polling, eventually succeeds.
|
||||
- Flag on, deadline exceeded → wrapped timeout error mentions
|
||||
delegation_id so caller can pick it up via check_task_status later.
|
||||
- Idempotency key is consistent with the legacy path's hashing.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
|
||||
# WORKSPACE_ID + PLATFORM_URL are checked at a2a_client import time.
|
||||
# CI ships them via the workflow env block; for local pytest runs we
|
||||
# set them here so the test file can import a2a_tools at module scope
|
||||
# (matching the pattern in test_a2a_tools_impl.py — that file relies
|
||||
# on the same CI env shape).
|
||||
os.environ.setdefault("WORKSPACE_ID", "00000000-0000-0000-0000-000000000001")
|
||||
os.environ.setdefault("PLATFORM_URL", "http://localhost:8080")
|
||||
|
||||
|
||||
def _resp(status_code, payload, text=None):
|
||||
r = MagicMock()
|
||||
r.status_code = status_code
|
||||
r.json = MagicMock(return_value=payload)
|
||||
r.text = text or json.dumps(payload)
|
||||
return r
|
||||
|
||||
|
||||
def _make_client(post_resp=None, get_resps=None, post_exc=None):
|
||||
"""Build an AsyncClient mock where get() returns a sequence of responses
|
||||
(one per call) so we can simulate multiple poll rounds.
|
||||
"""
|
||||
mc = AsyncMock()
|
||||
mc.__aenter__ = AsyncMock(return_value=mc)
|
||||
mc.__aexit__ = AsyncMock(return_value=False)
|
||||
if post_exc is not None:
|
||||
mc.post = AsyncMock(side_effect=post_exc)
|
||||
else:
|
||||
mc.post = AsyncMock(return_value=post_resp or _resp(202, {"delegation_id": "deleg-1"}))
|
||||
if get_resps is None:
|
||||
get_resps = [_resp(200, [])]
|
||||
mc.get = AsyncMock(side_effect=get_resps)
|
||||
return mc
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Flag-off: legacy path is preserved
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestFlagOffLegacyPath:
|
||||
|
||||
async def test_flag_off_uses_send_a2a_message_not_polling(self, monkeypatch):
|
||||
"""With DELEGATION_SYNC_VIA_INBOX unset, tool_delegate_task must
|
||||
invoke the legacy send_a2a_message and NEVER call /delegate."""
|
||||
monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False)
|
||||
|
||||
import a2a_tools
|
||||
send_calls = []
|
||||
|
||||
async def fake_send(workspace_id, task, source_workspace_id=None):
|
||||
send_calls.append((workspace_id, task, source_workspace_id))
|
||||
return "legacy ok"
|
||||
|
||||
async def fake_discover(*_a, **_kw):
|
||||
return {"name": "peer-name", "status": "online"}
|
||||
|
||||
async def fake_report_activity(*_a, **_kw):
|
||||
return None
|
||||
|
||||
with patch("a2a_tools.send_a2a_message", side_effect=fake_send), \
|
||||
patch("a2a_tools.discover_peer", side_effect=fake_discover), \
|
||||
patch("a2a_tools.report_activity", side_effect=fake_report_activity), \
|
||||
patch("a2a_tools._delegate_sync_via_polling", new=AsyncMock()) as poll_mock:
|
||||
result = await a2a_tools.tool_delegate_task(
|
||||
"ws-target", "task body", source_workspace_id="ws-self"
|
||||
)
|
||||
|
||||
assert result == "legacy ok", f"expected legacy passthrough, got {result!r}"
|
||||
assert send_calls == [("ws-target", "task body", "ws-self")]
|
||||
poll_mock.assert_not_called()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Flag-on: dispatch failures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestFlagOnDispatchFailures:
|
||||
|
||||
async def test_dispatch_http_exception_returns_wrapped_error(self, monkeypatch):
|
||||
monkeypatch.setenv("DELEGATION_SYNC_VIA_INBOX", "1")
|
||||
|
||||
import a2a_tools
|
||||
mc = _make_client(post_exc=httpx.ConnectError("network down"))
|
||||
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
res = await a2a_tools._delegate_sync_via_polling(
|
||||
"ws-target", "task", "ws-self"
|
||||
)
|
||||
|
||||
assert res.startswith(a2a_tools._A2A_ERROR_PREFIX)
|
||||
assert "delegate dispatch failed" in res
|
||||
|
||||
async def test_dispatch_non_2xx_returns_wrapped_error(self, monkeypatch):
|
||||
monkeypatch.setenv("DELEGATION_SYNC_VIA_INBOX", "1")
|
||||
|
||||
import a2a_tools
|
||||
mc = _make_client(post_resp=_resp(403, {"error": "forbidden"}))
|
||||
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
res = await a2a_tools._delegate_sync_via_polling(
|
||||
"ws-target", "task", "ws-self"
|
||||
)
|
||||
|
||||
assert res.startswith(a2a_tools._A2A_ERROR_PREFIX)
|
||||
assert "HTTP 403" in res
|
||||
|
||||
async def test_dispatch_missing_delegation_id_returns_wrapped_error(self, monkeypatch):
|
||||
monkeypatch.setenv("DELEGATION_SYNC_VIA_INBOX", "1")
|
||||
|
||||
import a2a_tools
|
||||
# 202 Accepted but no delegation_id field — defensive shape check.
|
||||
mc = _make_client(post_resp=_resp(202, {"status": "delegated"}))
|
||||
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
res = await a2a_tools._delegate_sync_via_polling(
|
||||
"ws-target", "task", "ws-self"
|
||||
)
|
||||
|
||||
assert res.startswith(a2a_tools._A2A_ERROR_PREFIX)
|
||||
assert "missing delegation_id" in res
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Flag-on: polling outcomes
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestFlagOnPollingOutcomes:
|
||||
|
||||
async def test_completed_first_poll_returns_response_preview(self, monkeypatch):
|
||||
monkeypatch.setenv("DELEGATION_SYNC_VIA_INBOX", "1")
|
||||
# Tighten budget to a few seconds so the test never blocks long.
|
||||
monkeypatch.setenv("DELEGATION_TIMEOUT", "10")
|
||||
|
||||
import importlib
|
||||
import a2a_tools
|
||||
importlib.reload(a2a_tools) # pick up new env-driven _SYNC_POLL_BUDGET_S
|
||||
|
||||
completed_row = {
|
||||
"delegation_id": "deleg-1",
|
||||
"status": "completed",
|
||||
"response_preview": "the answer",
|
||||
}
|
||||
mc = _make_client(
|
||||
post_resp=_resp(202, {"delegation_id": "deleg-1"}),
|
||||
get_resps=[_resp(200, [completed_row])],
|
||||
)
|
||||
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
res = await a2a_tools._delegate_sync_via_polling(
|
||||
"ws-target", "task", "ws-self"
|
||||
)
|
||||
|
||||
assert res == "the answer"
|
||||
# Cleanup: restore the module to default state for subsequent tests.
|
||||
monkeypatch.delenv("DELEGATION_TIMEOUT", raising=False)
|
||||
importlib.reload(a2a_tools)
|
||||
|
||||
async def test_failed_status_returns_wrapped_error_with_detail(self, monkeypatch):
|
||||
monkeypatch.setenv("DELEGATION_SYNC_VIA_INBOX", "1")
|
||||
monkeypatch.setenv("DELEGATION_TIMEOUT", "10")
|
||||
|
||||
import importlib
|
||||
import a2a_tools
|
||||
importlib.reload(a2a_tools)
|
||||
|
||||
failed_row = {
|
||||
"delegation_id": "deleg-1",
|
||||
"status": "failed",
|
||||
"error_detail": "callee unreachable",
|
||||
}
|
||||
mc = _make_client(
|
||||
post_resp=_resp(202, {"delegation_id": "deleg-1"}),
|
||||
get_resps=[_resp(200, [failed_row])],
|
||||
)
|
||||
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
res = await a2a_tools._delegate_sync_via_polling(
|
||||
"ws-target", "task", "ws-self"
|
||||
)
|
||||
|
||||
assert res.startswith(a2a_tools._A2A_ERROR_PREFIX)
|
||||
assert "callee unreachable" in res
|
||||
monkeypatch.delenv("DELEGATION_TIMEOUT", raising=False)
|
||||
importlib.reload(a2a_tools)
|
||||
|
||||
async def test_transient_poll_error_then_completed_succeeds(self, monkeypatch):
|
||||
"""A network blip during polling must NOT abort — keep polling."""
|
||||
monkeypatch.setenv("DELEGATION_SYNC_VIA_INBOX", "1")
|
||||
monkeypatch.setenv("DELEGATION_TIMEOUT", "30")
|
||||
|
||||
import importlib
|
||||
import a2a_tools
|
||||
importlib.reload(a2a_tools)
|
||||
|
||||
# Speed up: monkey-patch the poll interval to 0.01s so we don't
|
||||
# actually wait 3s between rounds in the test.
|
||||
monkeypatch.setattr(a2a_tools, "_SYNC_POLL_INTERVAL_S", 0.01)
|
||||
|
||||
completed_row = {
|
||||
"delegation_id": "deleg-1",
|
||||
"status": "completed",
|
||||
"response_preview": "eventually ok",
|
||||
}
|
||||
# First poll raises, second poll returns completed.
|
||||
get_seq = [
|
||||
httpx.ConnectError("transient"),
|
||||
_resp(200, [completed_row]),
|
||||
]
|
||||
mc = _make_client(
|
||||
post_resp=_resp(202, {"delegation_id": "deleg-1"}),
|
||||
get_resps=get_seq,
|
||||
)
|
||||
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
res = await a2a_tools._delegate_sync_via_polling(
|
||||
"ws-target", "task", "ws-self"
|
||||
)
|
||||
|
||||
assert res == "eventually ok"
|
||||
monkeypatch.delenv("DELEGATION_TIMEOUT", raising=False)
|
||||
importlib.reload(a2a_tools)
|
||||
|
||||
async def test_deadline_exceeded_returns_recovery_hint(self, monkeypatch):
|
||||
"""When the budget runs out without a terminal status, the error
|
||||
must surface delegation_id + a check_task_status hint so the
|
||||
caller can recover the result."""
|
||||
monkeypatch.setenv("DELEGATION_SYNC_VIA_INBOX", "1")
|
||||
monkeypatch.setenv("DELEGATION_TIMEOUT", "1") # 1s budget
|
||||
|
||||
import importlib
|
||||
import a2a_tools
|
||||
importlib.reload(a2a_tools)
|
||||
monkeypatch.setattr(a2a_tools, "_SYNC_POLL_INTERVAL_S", 0.05)
|
||||
|
||||
# Endless in-progress responses.
|
||||
in_progress_row = {
|
||||
"delegation_id": "deleg-1",
|
||||
"status": "in_progress",
|
||||
}
|
||||
get_seq = [_resp(200, [in_progress_row])] * 50
|
||||
mc = _make_client(
|
||||
post_resp=_resp(202, {"delegation_id": "deleg-1"}),
|
||||
get_resps=get_seq,
|
||||
)
|
||||
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
res = await a2a_tools._delegate_sync_via_polling(
|
||||
"ws-target", "task", "ws-self"
|
||||
)
|
||||
|
||||
assert res.startswith(a2a_tools._A2A_ERROR_PREFIX)
|
||||
assert "polling timeout" in res
|
||||
assert "deleg-1" in res, "must surface delegation_id for recovery"
|
||||
assert "check_task_status" in res, "must hint at the recovery tool"
|
||||
monkeypatch.delenv("DELEGATION_TIMEOUT", raising=False)
|
||||
importlib.reload(a2a_tools)
|
||||
|
||||
async def test_poll_filters_by_delegation_id_ignoring_other_rows(self, monkeypatch):
|
||||
"""Other delegations' rows in the response must NOT be picked up
|
||||
by mistake — we pin to delegation_id."""
|
||||
monkeypatch.setenv("DELEGATION_SYNC_VIA_INBOX", "1")
|
||||
monkeypatch.setenv("DELEGATION_TIMEOUT", "10")
|
||||
|
||||
import importlib
|
||||
import a2a_tools
|
||||
importlib.reload(a2a_tools)
|
||||
monkeypatch.setattr(a2a_tools, "_SYNC_POLL_INTERVAL_S", 0.01)
|
||||
|
||||
# First poll: no row matching ours, BUT a completed row for
|
||||
# someone else's delegation. We must NOT return that one.
|
||||
# Second poll: ours completes.
|
||||
first_poll = _resp(200, [
|
||||
{"delegation_id": "deleg-OTHER", "status": "completed", "response_preview": "wrong"},
|
||||
])
|
||||
second_poll = _resp(200, [
|
||||
{"delegation_id": "deleg-OTHER", "status": "completed", "response_preview": "wrong"},
|
||||
{"delegation_id": "deleg-1", "status": "completed", "response_preview": "right"},
|
||||
])
|
||||
mc = _make_client(
|
||||
post_resp=_resp(202, {"delegation_id": "deleg-1"}),
|
||||
get_resps=[first_poll, second_poll],
|
||||
)
|
||||
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
res = await a2a_tools._delegate_sync_via_polling(
|
||||
"ws-target", "task", "ws-self"
|
||||
)
|
||||
|
||||
assert res == "right", f"must filter to delegation_id, got {res!r}"
|
||||
monkeypatch.delenv("DELEGATION_TIMEOUT", raising=False)
|
||||
importlib.reload(a2a_tools)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# pytest-asyncio collection marker
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
pytestmark = pytest.mark.asyncio
|
||||
Loading…
Reference in New Issue
Block a user