Merge pull request #2558 from Molecule-AI/fix/a2a-v1-task-enqueue
fix(a2a): enqueue Task before TaskStatusUpdateEvent for v1 SDK contract
This commit is contained in:
commit
750b32c33f
@ -268,6 +268,26 @@ class LangGraphA2AExecutor(AgentExecutor):
|
||||
task_id = context.task_id or str(uuid.uuid4())
|
||||
context_id = context.context_id or str(uuid.uuid4())
|
||||
|
||||
# A2A v1 contract (a2a-sdk ≥ 1.0): enqueue a Task event before any
|
||||
# TaskStatusUpdateEvent. The framework only auto-creates the Task
|
||||
# on continuation messages (existing task_id resolves via
|
||||
# task_manager.get_task()). For fresh requests get_task() returns
|
||||
# None and the SDK rejects the first status update with
|
||||
# InvalidAgentResponseError("Agent should enqueue Task before
|
||||
# TaskStatusUpdateEvent event") — see a2a/server/agent_execution/
|
||||
# active_task.py for the validation site. PR #2170 migrated the
|
||||
# surface to v1 but missed this contract; the synth-E2E gate
|
||||
# surfaced it on every run after staging deploy.
|
||||
if getattr(context, "current_task", None) is None:
|
||||
from a2a.types import Task, TaskState, TaskStatus
|
||||
await event_queue.enqueue_event(
|
||||
Task(
|
||||
id=task_id,
|
||||
context_id=context_id,
|
||||
status=TaskStatus(state=TaskState.TASK_STATE_SUBMITTED),
|
||||
)
|
||||
)
|
||||
|
||||
updater = TaskUpdater(event_queue, task_id, context_id)
|
||||
|
||||
try:
|
||||
|
||||
@ -102,6 +102,35 @@ def _make_a2a_mocks():
|
||||
types_mod.Message = Message
|
||||
types_mod.Role = _RoleEnum
|
||||
|
||||
# v1 Task / TaskStatus / TaskState — used by the executor's "enqueue Task
|
||||
# before any TaskStatusUpdateEvent" guard (a2a-sdk ≥ 1.0 contract). The
|
||||
# stubs preserve every kwarg so tests can assert on Task(id=..., status=...).
|
||||
class TaskStatus:
|
||||
def __init__(self, state=None, **kwargs):
|
||||
self.state = state
|
||||
for k, v in kwargs.items():
|
||||
setattr(self, k, v)
|
||||
|
||||
class _TaskStateEnum:
|
||||
TASK_STATE_SUBMITTED = 1
|
||||
TASK_STATE_WORKING = 2
|
||||
TASK_STATE_COMPLETED = 3
|
||||
TASK_STATE_CANCELED = 4
|
||||
TASK_STATE_FAILED = 5
|
||||
TASK_STATE_REJECTED = 6
|
||||
|
||||
class Task:
|
||||
def __init__(self, id="", context_id="", status=None, **kwargs):
|
||||
self.id = id
|
||||
self.context_id = context_id
|
||||
self.status = status
|
||||
for k, v in kwargs.items():
|
||||
setattr(self, k, v)
|
||||
|
||||
types_mod.Task = Task
|
||||
types_mod.TaskStatus = TaskStatus
|
||||
types_mod.TaskState = _TaskStateEnum
|
||||
|
||||
# a2a.helpers (v1: moved from a2a.utils, renamed new_agent_text_message
|
||||
# → new_text_message). Mock both names — production code only calls
|
||||
# new_text_message, but if any test still references the old name it
|
||||
|
||||
@ -1048,3 +1048,78 @@ async def test_cancel_emits_canceled_event(monkeypatch):
|
||||
assert isinstance(event, _TaskStatusUpdateEvent), "expected a TaskStatusUpdateEvent"
|
||||
assert event.final is True, "cancel event must be marked final=True"
|
||||
assert event.status.state == _TaskState.TASK_STATE_CANCELED, "cancel event must have state=TASK_STATE_CANCELED"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# A2A v1 contract — Task event MUST precede any TaskStatusUpdateEvent
|
||||
# ---------------------------------------------------------------------------
|
||||
# Regression guard: a2a-sdk ≥ 1.0 raises InvalidAgentResponseError when the
|
||||
# executor enqueues a TaskStatusUpdateEvent (e.g. via TaskUpdater.start_work)
|
||||
# before any Task event for fresh requests (no continuation task in the
|
||||
# task_manager). PR #2170 migrated to v1 but missed this contract; the
|
||||
# synthetic E2E gate caught it on every staging run with:
|
||||
# {"error":{"code":-32603,"message":"Agent should enqueue Task before
|
||||
# TaskStatusUpdateEvent event"}}
|
||||
# This test pins the executor's first event as a Task instance for the
|
||||
# new-request path so the regression cannot recur.
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_first_event_is_task_for_new_request():
|
||||
"""For a new request (context.current_task is None), the executor must
|
||||
enqueue a Task event before any TaskUpdater status updates."""
|
||||
from a2a.types import Task
|
||||
|
||||
agent = MagicMock()
|
||||
agent.astream_events = MagicMock(return_value=_stream(_text_chunk("ok")))
|
||||
executor = LangGraphA2AExecutor(agent)
|
||||
|
||||
part = MagicMock()
|
||||
part.text = "Hi"
|
||||
|
||||
context = _make_context([part], "ctx-new", task_id="task-new")
|
||||
context.current_task = None
|
||||
eq = _make_event_queue()
|
||||
|
||||
await executor.execute(context, eq)
|
||||
|
||||
# First enqueue must be a Task — TaskUpdater is stubbed in conftest so
|
||||
# its start_work() does NOT enqueue, leaving the new Task as the only
|
||||
# framework-protocol event before the terminal Message.
|
||||
first_call = eq.enqueue_event.call_args_list[0]
|
||||
first_event = first_call[0][0]
|
||||
assert isinstance(first_event, Task), (
|
||||
f"expected first event to be Task, got {type(first_event).__name__}"
|
||||
)
|
||||
assert first_event.id == "task-new"
|
||||
assert first_event.context_id == "ctx-new"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_no_task_enqueue_on_continuation():
|
||||
"""For a continuation request (context.current_task is set), the executor
|
||||
must NOT enqueue a Task — the framework already knows about it. Re-
|
||||
enqueueing causes the SDK to log 'Task already exists. Ignoring task
|
||||
replacement.' and confuses the task store."""
|
||||
from a2a.types import Task
|
||||
|
||||
agent = MagicMock()
|
||||
agent.astream_events = MagicMock(return_value=_stream(_text_chunk("ok")))
|
||||
executor = LangGraphA2AExecutor(agent)
|
||||
|
||||
part = MagicMock()
|
||||
part.text = "Followup"
|
||||
|
||||
context = _make_context([part], "ctx-cont", task_id="task-cont")
|
||||
# Simulate the framework having already discovered the task.
|
||||
context.current_task = Task(id="task-cont", context_id="ctx-cont")
|
||||
eq = _make_event_queue()
|
||||
|
||||
await executor.execute(context, eq)
|
||||
|
||||
# No enqueued event should be a Task — TaskUpdater stubs are no-ops, so
|
||||
# the only events should be the executor's own (Message at end).
|
||||
for call in eq.enqueue_event.call_args_list:
|
||||
event = call[0][0]
|
||||
assert not isinstance(event, Task), (
|
||||
f"continuation must not re-enqueue Task, but got Task at {call}"
|
||||
)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user