From e1628c4d56d753ee38632a5d7dbdf10954fe4490 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Sun, 3 May 2026 04:06:45 -0700 Subject: [PATCH] fix(a2a): route terminal Message via TaskUpdater.complete/failed in task mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #2558 enqueued a Task at the start of new requests so the v1 SDK would accept TaskUpdater.start_work() — fix #1 of the v0→v1 migration gap (PR #2170). But after Task is enqueued, the executor enters "task mode" and the SDK rejects raw Message enqueues at the terminal step: {"code":-32603,"message":"Received Message object in task mode. Use TaskStatusUpdateEvent or TaskArtifactUpdateEvent instead."} Synth-E2E 2026-05-03T11:00:34Z surfaced this on the very first run after the prior fix cascaded. Validation site is the same a2a/server/agent_execution/active_task.py — the framework's job is to enforce the v1 invariant; we're catching up to it. The fix routes both terminal events through TaskUpdater helpers: - success: updater.complete(message=msg) wraps in TaskStatusUpdateEvent(state=COMPLETED, final=True) - error: updater.failed(message=...) wraps in TaskStatusUpdateEvent(state=FAILED, final=True) Both helpers exist in a2a-sdk ≥ 1.0; verified via TaskUpdater.complete signature. Tests: - conftest TaskUpdater stub now records complete/failed calls AND routes the message back through event_queue.enqueue_event so the ~20 legacy tests asserting on enqueue_event keep working - 2 new regression tests pin the contract: * test_terminal_success_routes_via_updater_complete * test_terminal_error_routes_via_updater_failed - Both NEW tests verified to FAIL on staging-baseline (without this fix) and PASS with it — they'd catch the regression before staging if the wheel-smoke gate covered task-mode terminal events too (separate yak-shave for #131 follow-up) Co-Authored-By: Claude Opus 4.7 (1M context) --- workspace/a2a_executor.py | 21 ++++++-- workspace/tests/conftest.py | 26 +++++++--- workspace/tests/test_a2a_executor.py | 78 ++++++++++++++++++++++++++++ 3 files changed, 114 insertions(+), 11 deletions(-) diff --git a/workspace/a2a_executor.py b/workspace/a2a_executor.py index 38860c03..9b4d9464 100644 --- a/workspace/a2a_executor.py +++ b/workspace/a2a_executor.py @@ -509,7 +509,15 @@ class LangGraphA2AExecutor(AgentExecutor): # accept the assignment. See #1787 + commit dcbcf19 # for the original test-mock motivation. logger.debug("metadata attach skipped (non-Message return from new_text_message)") - await event_queue.enqueue_event(msg) + # A2A v1 (a2a-sdk ≥ 1.0): once Task is enqueued (above, PR #2558), + # the executor is in task mode and raw Message enqueues are + # rejected with InvalidAgentResponseError("Received Message + # object in task mode. Use TaskStatusUpdateEvent or + # TaskArtifactUpdateEvent instead."). updater.complete() + # wraps the Message in a terminal TaskStatusUpdateEvent + # (state=COMPLETED, final=True) which both streaming and + # non-streaming clients accept. + await updater.complete(message=msg) _result = final_text except Exception as e: @@ -520,10 +528,13 @@ class LangGraphA2AExecutor(AgentExecutor): task_span.set_status(StatusCode.ERROR, str(e)) except Exception: pass - # Emit a Message so both streaming and non-streaming clients - # receive an error response rather than hanging. - await event_queue.enqueue_event( - new_text_message( + # A2A v1: in task mode, terminal errors must publish a + # FAILED TaskStatusUpdateEvent (carrying the error Message) + # rather than a raw Message enqueue. updater.failed() does + # exactly this — both streaming and non-streaming clients + # receive the error and stop polling. + await updater.failed( + message=new_text_message( f"Agent error: {e}", task_id=task_id, context_id=context_id ) ) diff --git a/workspace/tests/conftest.py b/workspace/tests/conftest.py index 0d130a6f..cb1b75b4 100644 --- a/workspace/tests/conftest.py +++ b/workspace/tests/conftest.py @@ -35,27 +35,41 @@ def _make_a2a_mocks(): events_mod.EventQueue = EventQueue - # a2a.server.tasks needs a TaskUpdater stub whose async methods are no-ops. - # In tests, TaskUpdater calls go to this stub rather than the real SDK so - # event_queue.enqueue_event is only called via explicit executor code paths. + # a2a.server.tasks needs a TaskUpdater stub whose async methods are no-ops + # for status transitions but ROUTE the terminal message back through + # event_queue.enqueue_event so legacy assertions on enqueue_event keep + # working. The wrapper preserves identity (the same Message object the + # executor passed in) so tests inspecting str(event_arg) still see the + # response text. complete()/failed() also record their last call on the + # event_queue itself (`_complete_calls`, `_failed_calls`) so the v1 + # contract regression test (#262 follow-on to #2558) can pin the proper + # path was taken — raw enqueue from executor would NOT touch these. tasks_mod = ModuleType("a2a.server.tasks") class TaskUpdater: - """Stub TaskUpdater — no-op async methods for unit tests.""" + """Stub TaskUpdater — terminal helpers route through event_queue.""" def __init__(self, event_queue, task_id, context_id, *args, **kwargs): self.event_queue = event_queue self.task_id = task_id self.context_id = context_id + if not hasattr(event_queue, "_complete_calls"): + event_queue._complete_calls = [] + if not hasattr(event_queue, "_failed_calls"): + event_queue._failed_calls = [] async def start_work(self, message=None): pass async def complete(self, message=None): - pass + self.event_queue._complete_calls.append(message) + if message is not None: + await self.event_queue.enqueue_event(message) async def failed(self, message=None): - pass + self.event_queue._failed_calls.append(message) + if message is not None: + await self.event_queue.enqueue_event(message) async def add_artifact( self, parts, artifact_id=None, name=None, metadata=None, diff --git a/workspace/tests/test_a2a_executor.py b/workspace/tests/test_a2a_executor.py index 134c56ba..1835092c 100644 --- a/workspace/tests/test_a2a_executor.py +++ b/workspace/tests/test_a2a_executor.py @@ -1123,3 +1123,81 @@ async def test_no_task_enqueue_on_continuation(): assert not isinstance(event, Task), ( f"continuation must not re-enqueue Task, but got Task at {call}" ) + + +# --------------------------------------------------------------------------- +# A2A v1 task-mode terminal-event contract (PR #2558 follow-up, task #262) +# --------------------------------------------------------------------------- +# After PR #2558 enqueues a Task at the start of new requests, the executor +# is in v1 "task mode". The SDK then rejects any subsequent raw Message +# enqueue with InvalidAgentResponseError("Received Message object in task +# mode. Use TaskStatusUpdateEvent or TaskArtifactUpdateEvent instead.") — +# see a2a/server/agent_execution/active_task.py validation site. Synth-E2E +# 2026-05-03T11:00:34Z surfaced this. The fix routes the terminal Message +# through TaskUpdater.complete()/failed() which wrap it in a +# TaskStatusUpdateEvent. Both tests below pin that path so the regression +# can't recur (raw enqueue at the terminal step would NOT touch +# event_queue._complete_calls / _failed_calls). + +@pytest.mark.asyncio +async def test_terminal_success_routes_via_updater_complete(): + """A successful run must terminate via updater.complete(message=...) — + raw event_queue.enqueue_event(Message) crashes the v1 SDK in task mode.""" + agent = MagicMock() + agent.astream_events = MagicMock(return_value=_stream(_text_chunk("Hello"))) + executor = LangGraphA2AExecutor(agent) + + part = MagicMock() + part.text = "Hi" + + context = _make_context([part], "ctx-term-ok", task_id="task-term-ok") + context.current_task = None # forces task-mode (Task gets enqueued) + eq = _make_event_queue() + # Pre-init real lists so the AsyncMock event_queue doesn't auto-spec + # _complete_calls/_failed_calls into child MagicMocks. The conftest + # TaskUpdater stub appends to these lists when complete/failed fire. + eq._complete_calls = [] + eq._failed_calls = [] + + await executor.execute(context, eq) + + assert eq._complete_calls, ( + "terminal Message must route via updater.complete() in task mode — " + "raw event_queue.enqueue_event(Message) is rejected by a2a-sdk v1" + ) + final_msg = eq._complete_calls[-1] + assert "Hello" in str(final_msg) + + +@pytest.mark.asyncio +async def test_terminal_error_routes_via_updater_failed(): + """An agent crash must terminate via updater.failed(message=...) — raw + enqueue in task mode hits the same v1 contract violation.""" + async def _error_stream(*args, **kwargs): + raise RuntimeError("model crashed") + yield # pragma: no cover — makes this an async generator + + agent = MagicMock() + agent.astream_events = MagicMock(return_value=_error_stream()) + executor = LangGraphA2AExecutor(agent) + + part = MagicMock() + part.text = "Break things" + + context = _make_context([part], "ctx-term-err", task_id="task-term-err") + context.current_task = None # forces task-mode + eq = _make_event_queue() + eq._complete_calls = [] + eq._failed_calls = [] + + await executor.execute(context, eq) + + assert eq._failed_calls, ( + "terminal error Message must route via updater.failed() in task mode" + ) + err_msg = eq._failed_calls[-1] + assert "model crashed" in str(err_msg) + # And complete() must NOT have been called on the failure path. + assert not eq._complete_calls, ( + "complete() should not fire when execute() raises" + )