diff --git a/workspace/a2a_executor.py b/workspace/a2a_executor.py index 38860c03..9b4d9464 100644 --- a/workspace/a2a_executor.py +++ b/workspace/a2a_executor.py @@ -509,7 +509,15 @@ class LangGraphA2AExecutor(AgentExecutor): # accept the assignment. See #1787 + commit dcbcf19 # for the original test-mock motivation. logger.debug("metadata attach skipped (non-Message return from new_text_message)") - await event_queue.enqueue_event(msg) + # A2A v1 (a2a-sdk ≥ 1.0): once Task is enqueued (above, PR #2558), + # the executor is in task mode and raw Message enqueues are + # rejected with InvalidAgentResponseError("Received Message + # object in task mode. Use TaskStatusUpdateEvent or + # TaskArtifactUpdateEvent instead."). updater.complete() + # wraps the Message in a terminal TaskStatusUpdateEvent + # (state=COMPLETED, final=True) which both streaming and + # non-streaming clients accept. + await updater.complete(message=msg) _result = final_text except Exception as e: @@ -520,10 +528,13 @@ class LangGraphA2AExecutor(AgentExecutor): task_span.set_status(StatusCode.ERROR, str(e)) except Exception: pass - # Emit a Message so both streaming and non-streaming clients - # receive an error response rather than hanging. - await event_queue.enqueue_event( - new_text_message( + # A2A v1: in task mode, terminal errors must publish a + # FAILED TaskStatusUpdateEvent (carrying the error Message) + # rather than a raw Message enqueue. updater.failed() does + # exactly this — both streaming and non-streaming clients + # receive the error and stop polling. + await updater.failed( + message=new_text_message( f"Agent error: {e}", task_id=task_id, context_id=context_id ) ) diff --git a/workspace/tests/conftest.py b/workspace/tests/conftest.py index 0d130a6f..cb1b75b4 100644 --- a/workspace/tests/conftest.py +++ b/workspace/tests/conftest.py @@ -35,27 +35,41 @@ def _make_a2a_mocks(): events_mod.EventQueue = EventQueue - # a2a.server.tasks needs a TaskUpdater stub whose async methods are no-ops. - # In tests, TaskUpdater calls go to this stub rather than the real SDK so - # event_queue.enqueue_event is only called via explicit executor code paths. + # a2a.server.tasks needs a TaskUpdater stub whose async methods are no-ops + # for status transitions but ROUTE the terminal message back through + # event_queue.enqueue_event so legacy assertions on enqueue_event keep + # working. The wrapper preserves identity (the same Message object the + # executor passed in) so tests inspecting str(event_arg) still see the + # response text. complete()/failed() also record their last call on the + # event_queue itself (`_complete_calls`, `_failed_calls`) so the v1 + # contract regression test (#262 follow-on to #2558) can pin the proper + # path was taken — raw enqueue from executor would NOT touch these. tasks_mod = ModuleType("a2a.server.tasks") class TaskUpdater: - """Stub TaskUpdater — no-op async methods for unit tests.""" + """Stub TaskUpdater — terminal helpers route through event_queue.""" def __init__(self, event_queue, task_id, context_id, *args, **kwargs): self.event_queue = event_queue self.task_id = task_id self.context_id = context_id + if not hasattr(event_queue, "_complete_calls"): + event_queue._complete_calls = [] + if not hasattr(event_queue, "_failed_calls"): + event_queue._failed_calls = [] async def start_work(self, message=None): pass async def complete(self, message=None): - pass + self.event_queue._complete_calls.append(message) + if message is not None: + await self.event_queue.enqueue_event(message) async def failed(self, message=None): - pass + self.event_queue._failed_calls.append(message) + if message is not None: + await self.event_queue.enqueue_event(message) async def add_artifact( self, parts, artifact_id=None, name=None, metadata=None, diff --git a/workspace/tests/test_a2a_executor.py b/workspace/tests/test_a2a_executor.py index 134c56ba..1835092c 100644 --- a/workspace/tests/test_a2a_executor.py +++ b/workspace/tests/test_a2a_executor.py @@ -1123,3 +1123,81 @@ async def test_no_task_enqueue_on_continuation(): assert not isinstance(event, Task), ( f"continuation must not re-enqueue Task, but got Task at {call}" ) + + +# --------------------------------------------------------------------------- +# A2A v1 task-mode terminal-event contract (PR #2558 follow-up, task #262) +# --------------------------------------------------------------------------- +# After PR #2558 enqueues a Task at the start of new requests, the executor +# is in v1 "task mode". The SDK then rejects any subsequent raw Message +# enqueue with InvalidAgentResponseError("Received Message object in task +# mode. Use TaskStatusUpdateEvent or TaskArtifactUpdateEvent instead.") — +# see a2a/server/agent_execution/active_task.py validation site. Synth-E2E +# 2026-05-03T11:00:34Z surfaced this. The fix routes the terminal Message +# through TaskUpdater.complete()/failed() which wrap it in a +# TaskStatusUpdateEvent. Both tests below pin that path so the regression +# can't recur (raw enqueue at the terminal step would NOT touch +# event_queue._complete_calls / _failed_calls). + +@pytest.mark.asyncio +async def test_terminal_success_routes_via_updater_complete(): + """A successful run must terminate via updater.complete(message=...) — + raw event_queue.enqueue_event(Message) crashes the v1 SDK in task mode.""" + agent = MagicMock() + agent.astream_events = MagicMock(return_value=_stream(_text_chunk("Hello"))) + executor = LangGraphA2AExecutor(agent) + + part = MagicMock() + part.text = "Hi" + + context = _make_context([part], "ctx-term-ok", task_id="task-term-ok") + context.current_task = None # forces task-mode (Task gets enqueued) + eq = _make_event_queue() + # Pre-init real lists so the AsyncMock event_queue doesn't auto-spec + # _complete_calls/_failed_calls into child MagicMocks. The conftest + # TaskUpdater stub appends to these lists when complete/failed fire. + eq._complete_calls = [] + eq._failed_calls = [] + + await executor.execute(context, eq) + + assert eq._complete_calls, ( + "terminal Message must route via updater.complete() in task mode — " + "raw event_queue.enqueue_event(Message) is rejected by a2a-sdk v1" + ) + final_msg = eq._complete_calls[-1] + assert "Hello" in str(final_msg) + + +@pytest.mark.asyncio +async def test_terminal_error_routes_via_updater_failed(): + """An agent crash must terminate via updater.failed(message=...) — raw + enqueue in task mode hits the same v1 contract violation.""" + async def _error_stream(*args, **kwargs): + raise RuntimeError("model crashed") + yield # pragma: no cover — makes this an async generator + + agent = MagicMock() + agent.astream_events = MagicMock(return_value=_error_stream()) + executor = LangGraphA2AExecutor(agent) + + part = MagicMock() + part.text = "Break things" + + context = _make_context([part], "ctx-term-err", task_id="task-term-err") + context.current_task = None # forces task-mode + eq = _make_event_queue() + eq._complete_calls = [] + eq._failed_calls = [] + + await executor.execute(context, eq) + + assert eq._failed_calls, ( + "terminal error Message must route via updater.failed() in task mode" + ) + err_msg = eq._failed_calls[-1] + assert "model crashed" in str(err_msg) + # And complete() must NOT have been called on the failure path. + assert not eq._complete_calls, ( + "complete() should not fire when execute() raises" + )