From 5c3b79a8ba3cea5b820e508ae510f41e948b37f5 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Sun, 3 May 2026 03:15:54 -0700 Subject: [PATCH] fix(a2a): enqueue Task before TaskStatusUpdateEvent for v1 SDK contract MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit a2a-sdk ≥ 1.0 raises InvalidAgentResponseError when an executor publishes a TaskStatusUpdateEvent (e.g. via TaskUpdater.start_work) before any Task event for fresh requests. The framework only auto-creates the Task on continuation messages (existing task_id resolves via task_manager.get_task); new requests leave _task_created unset and the SDK validation at a2a/server/agent_execution/active_task.py rejects the first status update. PR #2170 migrated the executor surface to v1 but missed this contract. The synthetic E2E gate has caught it on every staging run since then (~1 week of silent failures) with: {"jsonrpc":"2.0","id":"e2e-msg-1","error":{"code":-32603, "message":"Agent should enqueue Task before TaskStatusUpdateEvent event","data":null}} The fix enqueues a Task(state=SUBMITTED) before the TaskUpdater is constructed, gated on `context.current_task is None` so continuation messages don't double-enqueue (which the SDK logs about but doesn't reject). Tests: - test_first_event_is_task_for_new_request — pins the new-request path: first enqueue must be a Task with the expected id/context_id - test_no_task_enqueue_on_continuation — pins the continuation path: when context.current_task is set, the executor must NOT re-enqueue Task - conftest: stub Task / TaskStatus / TaskState in the mocked a2a.types module so the import inside the executor resolves under unit tests google-adk adapter does not have this bug — its execute() only emits Message events, not TaskStatusUpdateEvent. Its cancel() does emit one, but cancel is rarely invoked and out of scope for this fix. 
Live verification path: this PR's merge → publish-runtime cascade → next synth-E2E firing should go green at step "8/11 Sending A2A message to parent — expecting agent response". --- workspace/a2a_executor.py | 20 ++++++++ workspace/tests/conftest.py | 29 +++++++++++ workspace/tests/test_a2a_executor.py | 75 ++++++++++++++++++++++++++++ 3 files changed, 124 insertions(+) diff --git a/workspace/a2a_executor.py b/workspace/a2a_executor.py index bbda258c..38860c03 100644 --- a/workspace/a2a_executor.py +++ b/workspace/a2a_executor.py @@ -268,6 +268,26 @@ class LangGraphA2AExecutor(AgentExecutor): task_id = context.task_id or str(uuid.uuid4()) context_id = context.context_id or str(uuid.uuid4()) + # A2A v1 contract (a2a-sdk ≥ 1.0): enqueue a Task event before any + # TaskStatusUpdateEvent. The framework only auto-creates the Task + # on continuation messages (existing task_id resolves via + # task_manager.get_task()). For fresh requests get_task() returns + # None and the SDK rejects the first status update with + # InvalidAgentResponseError("Agent should enqueue Task before + # TaskStatusUpdateEvent event") — see a2a/server/agent_execution/ + # active_task.py for the validation site. PR #2170 migrated the + # surface to v1 but missed this contract; the synth-E2E gate + # surfaced it on every run after staging deploy. 
+ if getattr(context, "current_task", None) is None: + from a2a.types import Task, TaskState, TaskStatus + await event_queue.enqueue_event( + Task( + id=task_id, + context_id=context_id, + status=TaskStatus(state=TaskState.TASK_STATE_SUBMITTED), + ) + ) + updater = TaskUpdater(event_queue, task_id, context_id) try: diff --git a/workspace/tests/conftest.py b/workspace/tests/conftest.py index 4368bc79..0d130a6f 100644 --- a/workspace/tests/conftest.py +++ b/workspace/tests/conftest.py @@ -102,6 +102,35 @@ def _make_a2a_mocks(): types_mod.Message = Message types_mod.Role = _RoleEnum + # v1 Task / TaskStatus / TaskState — used by the executor's "enqueue Task + # before any TaskStatusUpdateEvent" guard (a2a-sdk ≥ 1.0 contract). The + # stubs preserve every kwarg so tests can assert on Task(id=..., status=...). + class TaskStatus: + def __init__(self, state=None, **kwargs): + self.state = state + for k, v in kwargs.items(): + setattr(self, k, v) + + class _TaskStateEnum: + TASK_STATE_SUBMITTED = 1 + TASK_STATE_WORKING = 2 + TASK_STATE_COMPLETED = 3 + TASK_STATE_CANCELED = 4 + TASK_STATE_FAILED = 5 + TASK_STATE_REJECTED = 6 + + class Task: + def __init__(self, id="", context_id="", status=None, **kwargs): + self.id = id + self.context_id = context_id + self.status = status + for k, v in kwargs.items(): + setattr(self, k, v) + + types_mod.Task = Task + types_mod.TaskStatus = TaskStatus + types_mod.TaskState = _TaskStateEnum + # a2a.helpers (v1: moved from a2a.utils, renamed new_agent_text_message # → new_text_message). 
Mock both names — production code only calls # new_text_message, but if any test still references the old name it diff --git a/workspace/tests/test_a2a_executor.py b/workspace/tests/test_a2a_executor.py index 98ad19aa..134c56ba 100644 --- a/workspace/tests/test_a2a_executor.py +++ b/workspace/tests/test_a2a_executor.py @@ -1048,3 +1048,78 @@ async def test_cancel_emits_canceled_event(monkeypatch): assert isinstance(event, _TaskStatusUpdateEvent), "expected a TaskStatusUpdateEvent" assert event.final is True, "cancel event must be marked final=True" assert event.status.state == _TaskState.TASK_STATE_CANCELED, "cancel event must have state=TASK_STATE_CANCELED" + + +# --------------------------------------------------------------------------- +# A2A v1 contract — Task event MUST precede any TaskStatusUpdateEvent +# --------------------------------------------------------------------------- +# Regression guard: a2a-sdk ≥ 1.0 raises InvalidAgentResponseError when the +# executor enqueues a TaskStatusUpdateEvent (e.g. via TaskUpdater.start_work) +# before any Task event for fresh requests (no continuation task in the +# task_manager). PR #2170 migrated to v1 but missed this contract; the +# synthetic E2E gate caught it on every staging run with: +# {"error":{"code":-32603,"message":"Agent should enqueue Task before +# TaskStatusUpdateEvent event"}} +# This test pins the executor's first event as a Task instance for the +# new-request path so the regression cannot recur. 
+ +@pytest.mark.asyncio +async def test_first_event_is_task_for_new_request(): + """For a new request (context.current_task is None), the executor must + enqueue a Task event before any TaskUpdater status updates.""" + from a2a.types import Task + + agent = MagicMock() + agent.astream_events = MagicMock(return_value=_stream(_text_chunk("ok"))) + executor = LangGraphA2AExecutor(agent) + + part = MagicMock() + part.text = "Hi" + + context = _make_context([part], "ctx-new", task_id="task-new") + context.current_task = None + eq = _make_event_queue() + + await executor.execute(context, eq) + + # First enqueue must be a Task — TaskUpdater is stubbed in conftest so + # its start_work() does NOT enqueue, leaving the new Task as the only + # framework-protocol event before the terminal Message. + first_call = eq.enqueue_event.call_args_list[0] + first_event = first_call[0][0] + assert isinstance(first_event, Task), ( + f"expected first event to be Task, got {type(first_event).__name__}" + ) + assert first_event.id == "task-new" + assert first_event.context_id == "ctx-new" + + +@pytest.mark.asyncio +async def test_no_task_enqueue_on_continuation(): + """For a continuation request (context.current_task is set), the executor + must NOT enqueue a Task — the framework already knows about it. Re- + enqueueing causes the SDK to log 'Task already exists. Ignoring task + replacement.' and confuses the task store.""" + from a2a.types import Task + + agent = MagicMock() + agent.astream_events = MagicMock(return_value=_stream(_text_chunk("ok"))) + executor = LangGraphA2AExecutor(agent) + + part = MagicMock() + part.text = "Followup" + + context = _make_context([part], "ctx-cont", task_id="task-cont") + # Simulate the framework having already discovered the task. 
+ context.current_task = Task(id="task-cont", context_id="ctx-cont") + eq = _make_event_queue() + + await executor.execute(context, eq) + + # No enqueued event should be a Task — TaskUpdater stubs are no-ops, so + # the only events should be the executor's own (Message at end). + for call in eq.enqueue_event.call_args_list: + event = call[0][0] + assert not isinstance(event, Task), ( + f"continuation must not re-enqueue Task, but got Task at {call}" + )