fix(a2a): enqueue Task before TaskStatusUpdateEvent for v1 SDK contract

a2a-sdk ≥ 1.0 raises InvalidAgentResponseError when an executor publishes a
TaskStatusUpdateEvent (e.g. via TaskUpdater.start_work) before any Task
event for fresh requests. The framework only auto-creates the Task on
continuation messages (existing task_id resolves via task_manager.get_task);
new requests leave _task_created unset and the SDK validation at
a2a/server/agent_execution/active_task.py rejects the first status update.

PR #2170 migrated the executor surface to v1 but missed this contract. The
synthetic E2E gate caught it on every staging run since (~1 week silent
fail) with:

    {"jsonrpc":"2.0","id":"e2e-msg-1","error":{"code":-32603,
     "message":"Agent should enqueue Task before TaskStatusUpdateEvent
     event","data":null}}

The fix enqueues a Task(state=SUBMITTED) before the TaskUpdater is
constructed, gated on `context.current_task is None` so continuation
messages don't double-enqueue (which the SDK logs about but doesn't reject).

Tests:

  - test_first_event_is_task_for_new_request — pins the new-request path:
    first enqueue must be a Task with the expected id/context_id
  - test_no_task_enqueue_on_continuation — pins the continuation path: when
    context.current_task is set, the executor must NOT re-enqueue Task
  - conftest: stub Task / TaskStatus / TaskState in the mocked a2a.types
    module so the import inside the executor resolves under unit tests

google-adk adapter does not have this bug — its execute() only emits
Message events, not TaskStatusUpdateEvent. Its cancel() does emit one,
but cancel is rarely-invoked and out of scope for this fix.

Live verification path: this PR's merge → publish-runtime cascade → next
synth-E2E firing should go green at step "8/11 Sending A2A message to
parent — expecting agent response".
This commit is contained in:
Hongming Wang 2026-05-03 03:15:54 -07:00
parent 6f8f7932d2
commit 5c3b79a8ba
3 changed files with 124 additions and 0 deletions

View File

@ -268,6 +268,26 @@ class LangGraphA2AExecutor(AgentExecutor):
task_id = context.task_id or str(uuid.uuid4())
context_id = context.context_id or str(uuid.uuid4())
# A2A v1 contract (a2a-sdk ≥ 1.0): enqueue a Task event before any
# TaskStatusUpdateEvent. The framework only auto-creates the Task
# on continuation messages (existing task_id resolves via
# task_manager.get_task()). For fresh requests get_task() returns
# None and the SDK rejects the first status update with
# InvalidAgentResponseError("Agent should enqueue Task before
# TaskStatusUpdateEvent event") — see a2a/server/agent_execution/
# active_task.py for the validation site. PR #2170 migrated the
# surface to v1 but missed this contract; the synth-E2E gate
# surfaced it on every run after staging deploy.
if getattr(context, "current_task", None) is None:
from a2a.types import Task, TaskState, TaskStatus
await event_queue.enqueue_event(
Task(
id=task_id,
context_id=context_id,
status=TaskStatus(state=TaskState.TASK_STATE_SUBMITTED),
)
)
updater = TaskUpdater(event_queue, task_id, context_id)
try:

View File

@ -102,6 +102,35 @@ def _make_a2a_mocks():
types_mod.Message = Message
types_mod.Role = _RoleEnum
# v1 Task / TaskStatus / TaskState — used by the executor's "enqueue Task
# before any TaskStatusUpdateEvent" guard (a2a-sdk ≥ 1.0 contract). The
# stubs preserve every kwarg so tests can assert on Task(id=..., status=...).
class TaskStatus:
def __init__(self, state=None, **kwargs):
self.state = state
for k, v in kwargs.items():
setattr(self, k, v)
class _TaskStateEnum:
TASK_STATE_SUBMITTED = 1
TASK_STATE_WORKING = 2
TASK_STATE_COMPLETED = 3
TASK_STATE_CANCELED = 4
TASK_STATE_FAILED = 5
TASK_STATE_REJECTED = 6
class Task:
def __init__(self, id="", context_id="", status=None, **kwargs):
self.id = id
self.context_id = context_id
self.status = status
for k, v in kwargs.items():
setattr(self, k, v)
types_mod.Task = Task
types_mod.TaskStatus = TaskStatus
types_mod.TaskState = _TaskStateEnum
# a2a.helpers (v1: moved from a2a.utils, renamed new_agent_text_message
# → new_text_message). Mock both names — production code only calls
# new_text_message, but if any test still references the old name it

View File

@ -1048,3 +1048,78 @@ async def test_cancel_emits_canceled_event(monkeypatch):
assert isinstance(event, _TaskStatusUpdateEvent), "expected a TaskStatusUpdateEvent"
assert event.final is True, "cancel event must be marked final=True"
assert event.status.state == _TaskState.TASK_STATE_CANCELED, "cancel event must have state=TASK_STATE_CANCELED"
# ---------------------------------------------------------------------------
# A2A v1 contract — Task event MUST precede any TaskStatusUpdateEvent
# ---------------------------------------------------------------------------
# Regression guard: a2a-sdk ≥ 1.0 raises InvalidAgentResponseError when the
# executor enqueues a TaskStatusUpdateEvent (e.g. via TaskUpdater.start_work)
# before any Task event for fresh requests (no continuation task in the
# task_manager). PR #2170 migrated to v1 but missed this contract; the
# synthetic E2E gate caught it on every staging run with:
# {"error":{"code":-32603,"message":"Agent should enqueue Task before
# TaskStatusUpdateEvent event"}}
# This test pins the executor's first event as a Task instance for the
# new-request path so the regression cannot recur.
@pytest.mark.asyncio
async def test_first_event_is_task_for_new_request():
"""For a new request (context.current_task is None), the executor must
enqueue a Task event before any TaskUpdater status updates."""
from a2a.types import Task
agent = MagicMock()
agent.astream_events = MagicMock(return_value=_stream(_text_chunk("ok")))
executor = LangGraphA2AExecutor(agent)
part = MagicMock()
part.text = "Hi"
context = _make_context([part], "ctx-new", task_id="task-new")
context.current_task = None
eq = _make_event_queue()
await executor.execute(context, eq)
# First enqueue must be a Task — TaskUpdater is stubbed in conftest so
# its start_work() does NOT enqueue, leaving the new Task as the only
# framework-protocol event before the terminal Message.
first_call = eq.enqueue_event.call_args_list[0]
first_event = first_call[0][0]
assert isinstance(first_event, Task), (
f"expected first event to be Task, got {type(first_event).__name__}"
)
assert first_event.id == "task-new"
assert first_event.context_id == "ctx-new"
@pytest.mark.asyncio
async def test_no_task_enqueue_on_continuation():
"""For a continuation request (context.current_task is set), the executor
must NOT enqueue a Task the framework already knows about it. Re-
enqueueing causes the SDK to log 'Task already exists. Ignoring task
replacement.' and confuses the task store."""
from a2a.types import Task
agent = MagicMock()
agent.astream_events = MagicMock(return_value=_stream(_text_chunk("ok")))
executor = LangGraphA2AExecutor(agent)
part = MagicMock()
part.text = "Followup"
context = _make_context([part], "ctx-cont", task_id="task-cont")
# Simulate the framework having already discovered the task.
context.current_task = Task(id="task-cont", context_id="ctx-cont")
eq = _make_event_queue()
await executor.execute(context, eq)
# No enqueued event should be a Task — TaskUpdater stubs are no-ops, so
# the only events should be the executor's own (Message at end).
for call in eq.enqueue_event.call_args_list:
event = call[0][0]
assert not isinstance(event, Task), (
f"continuation must not re-enqueue Task, but got Task at {call}"
)