forked from molecule-ai/molecule-core
fix(a2a): route terminal Message via TaskUpdater.complete/failed in task mode
PR #2558 enqueued a Task at the start of new requests so the v1 SDK would accept TaskUpdater.start_work() — fix #1 of the v0→v1 migration gap (PR #2170). But after Task is enqueued, the executor enters "task mode" and the SDK rejects raw Message enqueues at the terminal step:

    {"code":-32603,"message":"Received Message object in task mode. Use TaskStatusUpdateEvent or TaskArtifactUpdateEvent instead."}

Synth-E2E 2026-05-03T11:00:34Z surfaced this on the very first run after the prior fix cascaded. Validation site is the same a2a/server/agent_execution/active_task.py — the framework's job is to enforce the v1 invariant; we're catching up to it.

The fix routes both terminal events through TaskUpdater helpers:
- success: updater.complete(message=msg) wraps in TaskStatusUpdateEvent(state=COMPLETED, final=True)
- error: updater.failed(message=...) wraps in TaskStatusUpdateEvent(state=FAILED, final=True)

Both helpers exist in a2a-sdk ≥ 1.0; verified via TaskUpdater.complete signature.

Tests:
- conftest TaskUpdater stub now records complete/failed calls AND routes the message back through event_queue.enqueue_event so the ~20 legacy tests asserting on enqueue_event keep working
- 2 new regression tests pin the contract:
  * test_terminal_success_routes_via_updater_complete
  * test_terminal_error_routes_via_updater_failed
- Both NEW tests verified to FAIL on staging-baseline (without this fix) and PASS with it — they'd catch the regression before staging if the wheel-smoke gate covered task-mode terminal events too (separate yak-shave for #131 follow-up)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
78721f7a42
commit
e1628c4d56
@ -509,7 +509,15 @@ class LangGraphA2AExecutor(AgentExecutor):
|
||||
# accept the assignment. See #1787 + commit dcbcf19
|
||||
# for the original test-mock motivation.
|
||||
logger.debug("metadata attach skipped (non-Message return from new_text_message)")
|
||||
await event_queue.enqueue_event(msg)
|
||||
# A2A v1 (a2a-sdk ≥ 1.0): once Task is enqueued (above, PR #2558),
|
||||
# the executor is in task mode and raw Message enqueues are
|
||||
# rejected with InvalidAgentResponseError("Received Message
|
||||
# object in task mode. Use TaskStatusUpdateEvent or
|
||||
# TaskArtifactUpdateEvent instead."). updater.complete()
|
||||
# wraps the Message in a terminal TaskStatusUpdateEvent
|
||||
# (state=COMPLETED, final=True) which both streaming and
|
||||
# non-streaming clients accept.
|
||||
await updater.complete(message=msg)
|
||||
_result = final_text
|
||||
|
||||
except Exception as e:
|
||||
@ -520,10 +528,13 @@ class LangGraphA2AExecutor(AgentExecutor):
|
||||
task_span.set_status(StatusCode.ERROR, str(e))
|
||||
except Exception:
|
||||
pass
|
||||
# Emit a Message so both streaming and non-streaming clients
|
||||
# receive an error response rather than hanging.
|
||||
await event_queue.enqueue_event(
|
||||
new_text_message(
|
||||
# A2A v1: in task mode, terminal errors must publish a
|
||||
# FAILED TaskStatusUpdateEvent (carrying the error Message)
|
||||
# rather than a raw Message enqueue. updater.failed() does
|
||||
# exactly this — both streaming and non-streaming clients
|
||||
# receive the error and stop polling.
|
||||
await updater.failed(
|
||||
message=new_text_message(
|
||||
f"Agent error: {e}", task_id=task_id, context_id=context_id
|
||||
)
|
||||
)
|
||||
|
||||
@ -35,27 +35,41 @@ def _make_a2a_mocks():
|
||||
|
||||
events_mod.EventQueue = EventQueue
|
||||
|
||||
# a2a.server.tasks needs a TaskUpdater stub whose async methods are no-ops.
|
||||
# In tests, TaskUpdater calls go to this stub rather than the real SDK so
|
||||
# event_queue.enqueue_event is only called via explicit executor code paths.
|
||||
# a2a.server.tasks needs a TaskUpdater stub whose async methods are no-ops
|
||||
# for status transitions but ROUTE the terminal message back through
|
||||
# event_queue.enqueue_event so legacy assertions on enqueue_event keep
|
||||
# working. The wrapper preserves identity (the same Message object the
|
||||
# executor passed in) so tests inspecting str(event_arg) still see the
|
||||
# response text. complete()/failed() also record every call on the
|
||||
# event_queue itself (`_complete_calls`, `_failed_calls`) so the v1
|
||||
# contract regression test (#262 follow-on to #2558) can pin that the proper
|
||||
# path was taken — raw enqueue from executor would NOT touch these.
|
||||
tasks_mod = ModuleType("a2a.server.tasks")
|
||||
|
||||
class TaskUpdater:
|
||||
"""Stub TaskUpdater — no-op async methods for unit tests."""
|
||||
"""Stub TaskUpdater — terminal helpers route through event_queue."""
|
||||
|
||||
def __init__(self, event_queue, task_id, context_id, *args, **kwargs):
|
||||
self.event_queue = event_queue
|
||||
self.task_id = task_id
|
||||
self.context_id = context_id
|
||||
if not hasattr(event_queue, "_complete_calls"):
|
||||
event_queue._complete_calls = []
|
||||
if not hasattr(event_queue, "_failed_calls"):
|
||||
event_queue._failed_calls = []
|
||||
|
||||
async def start_work(self, message=None):
    """Accept a start-of-work transition and do nothing with it."""
|
||||
|
||||
async def complete(self, message=None):
|
||||
pass
|
||||
self.event_queue._complete_calls.append(message)
|
||||
if message is not None:
|
||||
await self.event_queue.enqueue_event(message)
|
||||
|
||||
async def failed(self, message=None):
|
||||
pass
|
||||
self.event_queue._failed_calls.append(message)
|
||||
if message is not None:
|
||||
await self.event_queue.enqueue_event(message)
|
||||
|
||||
async def add_artifact(
|
||||
self, parts, artifact_id=None, name=None, metadata=None,
|
||||
|
||||
@ -1123,3 +1123,81 @@ async def test_no_task_enqueue_on_continuation():
|
||||
assert not isinstance(event, Task), (
|
||||
f"continuation must not re-enqueue Task, but got Task at {call}"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# A2A v1 task-mode terminal-event contract (PR #2558 follow-up, task #262)
|
||||
# ---------------------------------------------------------------------------
|
||||
# After PR #2558 enqueues a Task at the start of new requests, the executor
|
||||
# is in v1 "task mode". The SDK then rejects any subsequent raw Message
|
||||
# enqueue with InvalidAgentResponseError("Received Message object in task
|
||||
# mode. Use TaskStatusUpdateEvent or TaskArtifactUpdateEvent instead.") —
|
||||
# see a2a/server/agent_execution/active_task.py validation site. Synth-E2E
|
||||
# 2026-05-03T11:00:34Z surfaced this. The fix routes the terminal Message
|
||||
# through TaskUpdater.complete()/failed() which wrap it in a
|
||||
# TaskStatusUpdateEvent. Both tests below pin that path so the regression
|
||||
# can't recur (raw enqueue at the terminal step would NOT touch
|
||||
# event_queue._complete_calls / _failed_calls).
|
||||
|
||||
@pytest.mark.asyncio
async def test_terminal_success_routes_via_updater_complete():
    """Success path: the final Message must be handed to updater.complete().

    The stub TaskUpdater appends every complete() call to the queue's
    ``_complete_calls`` list, so a non-empty list shows that the helper was
    used for the terminal Message.
    """
    streaming_agent = MagicMock()
    streaming_agent.astream_events = MagicMock(
        return_value=_stream(_text_chunk("Hello"))
    )
    executor = LangGraphA2AExecutor(streaming_agent)

    user_part = MagicMock()
    user_part.text = "Hi"

    context = _make_context([user_part], "ctx-term-ok", task_id="task-term-ok")
    context.current_task = None  # forces task-mode (Task gets enqueued)

    queue = _make_event_queue()
    # Seed real lists before running: on a mock queue these attributes
    # would otherwise be auto-created as child mocks, and the stub only
    # creates the lists when the attributes are missing.
    queue._complete_calls = []
    queue._failed_calls = []

    await executor.execute(context, queue)

    completed = queue._complete_calls
    assert completed, (
        "terminal Message must route via updater.complete() in task mode — "
        "raw event_queue.enqueue_event(Message) is rejected by a2a-sdk v1"
    )
    # The most recent complete() call must carry the streamed text.
    assert "Hello" in str(completed[-1])
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_terminal_error_routes_via_updater_failed():
    """Failure path: the error Message must be handed to updater.failed().

    The agent's event stream raises immediately; the test then checks that
    the stub TaskUpdater logged a failed() call carrying the error text and
    logged no complete() call.
    """
    async def _crashing_stream(*args, **kwargs):
        raise RuntimeError("model crashed")
        yield  # pragma: no cover — makes this an async generator

    broken_agent = MagicMock()
    broken_agent.astream_events = MagicMock(return_value=_crashing_stream())
    executor = LangGraphA2AExecutor(broken_agent)

    user_part = MagicMock()
    user_part.text = "Break things"

    context = _make_context([user_part], "ctx-term-err", task_id="task-term-err")
    context.current_task = None  # forces task-mode

    queue = _make_event_queue()
    # Seed real lists so the stub's appends land in something inspectable.
    queue._complete_calls = []
    queue._failed_calls = []

    await executor.execute(context, queue)

    failures = queue._failed_calls
    assert failures, (
        "terminal error Message must route via updater.failed() in task mode"
    )
    assert "model crashed" in str(failures[-1])
    # And complete() must NOT have been called on the failure path.
    assert not queue._complete_calls, (
        "complete() should not fire when execute() raises"
    )
|
||||
|
||||
Loading…
Reference in New Issue
Block a user