Merge pull request #198 from Molecule-AI/fix/a2a-compat-batch-173-174-175

fix(a2a): A2A protocol compliance — cancel(), capabilities, push store (closes #173 #174 #175)
This commit is contained in:
Hongming Wang 2026-04-15 11:02:11 -07:00 committed by GitHub
commit 9a23180fa9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 56 additions and 4 deletions

View File

@ -408,6 +408,12 @@ class LangGraphA2AExecutor(AgentExecutor):
return _result
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None: # pragma: no cover
"""Cancel a running task (cancellation via asyncio task cancellation)."""
pass
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
"""Cancel a running task — emits canceled state to comply with A2A protocol."""
from a2a.types import TaskStatus, TaskState, TaskStatusUpdateEvent
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
status=TaskStatus(state=TaskState.canceled),
final=True,
)
)

View File

@ -12,7 +12,7 @@ import httpx
import uvicorn
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.server.tasks import InMemoryTaskStore, InMemoryPushNotificationConfigStore, PushNotificationSender
from a2a.types import AgentCard, AgentCapabilities, AgentSkill
from adapters import get_adapter, AdapterConfig
@ -136,6 +136,7 @@ async def main(): # pragma: no cover
capabilities=AgentCapabilities(
streaming=config.a2a.streaming,
pushNotifications=config.a2a.push_notifications,
stateTransitionHistory=True,
),
skills=[
AgentSkill(
@ -155,6 +156,8 @@ async def main(): # pragma: no cover
handler = DefaultRequestHandler(
agent_executor=executor,
task_store=InMemoryTaskStore(),
push_config_store=InMemoryPushNotificationConfigStore(),
push_sender=PushNotificationSender(),
)
app = A2AStarletteApplication(

View File

@ -998,3 +998,46 @@ def test_default_recursion_limit_value():
"""Regression guard: DeepAgents fan-outs need 100+; 500 is today's ceiling."""
from a2a_executor import DEFAULT_RECURSION_LIMIT
assert DEFAULT_RECURSION_LIMIT == 500
# ---------------------------------------------------------------------------
# Issue #173 — cancel() emits TaskStatusUpdateEvent(state=canceled, final=True)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_cancel_emits_canceled_event(monkeypatch):
"""cancel() must enqueue a TaskStatusUpdateEvent with state=canceled and final=True.
The a2a.types module is pre-mocked by conftest; inject the three extra
type stubs needed by cancel() so the local import inside the method resolves.
"""
import sys
types_mod = sys.modules["a2a.types"]
class _TaskState:
canceled = "canceled"
class _TaskStatus:
def __init__(self, state=None):
self.state = state
class _TaskStatusUpdateEvent:
def __init__(self, status=None, final=False):
self.status = status
self.final = final
monkeypatch.setattr(types_mod, "TaskState", _TaskState, raising=False)
monkeypatch.setattr(types_mod, "TaskStatus", _TaskStatus, raising=False)
monkeypatch.setattr(types_mod, "TaskStatusUpdateEvent", _TaskStatusUpdateEvent, raising=False)
executor = LangGraphA2AExecutor(agent=MagicMock(), heartbeat=None)
context = _make_context([])
eq = _make_event_queue()
await executor.cancel(context, eq)
eq.enqueue_event.assert_called_once()
event = eq.enqueue_event.call_args[0][0]
assert isinstance(event, _TaskStatusUpdateEvent), "expected a TaskStatusUpdateEvent"
assert event.final is True, "cancel event must be marked final=True"
assert event.status.state == _TaskState.canceled, "cancel event must have state=canceled"