feat: migrate a2a-sdk 1.x (KI-009) (#39)

- Replace a2a.utils.new_agent_text_message → a2a.helpers.new_text_message
- Replace Part(root=TextPart(...)) → Part(text=...) (flat Part API)
- Replace A2AStarletteApplication → Starlette route factories
  (create_agent_card_routes, create_jsonrpc_routes)
- Update conftest stubs: remove a2a.server.apps/a2a.utils,
  add a2a.server.routes/a2a.helpers/AgentInterface
- Add AgentInterface to AgentCard supported_interfaces
- Rename snake_case AgentCard fields per 1.x schema

Co-authored-by: Molecule AI Infra-Runtime-BE <infra-runtime-be@agents.moleculesai.app>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
molecule-ai[bot] 2026-04-24 01:54:33 +00:00 committed by GitHub
parent 1b04da2061
commit d5cf872311
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 66 additions and 36 deletions

View File

@ -39,8 +39,9 @@ import uuid
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import Part, TextPart
from a2a.utils import new_agent_text_message
# a2a.utils and TextPart removed in a2a-sdk 1.x; use a2a.helpers + flat Part
from a2a.types import Part
from a2a.helpers import new_text_message
from molecule_runtime.adapters.shared_runtime import (
extract_history as _extract_history,
extract_message_text,
@ -214,7 +215,7 @@ class LangGraphA2AExecutor(AgentExecutor):
parts = getattr(getattr(context, "message", None), "parts", None)
logger.warning("A2A execute: no text content in message parts: %s", parts)
await event_queue.enqueue_event(
new_agent_text_message("Error: message contained no text content.")
new_text_message("Error: message contained no text content.")
)
return ""
@ -229,7 +230,7 @@ class LangGraphA2AExecutor(AgentExecutor):
)
except PromptInjectionError as exc:
await event_queue.enqueue_event(
new_agent_text_message(f"Request blocked: {exc}")
new_text_message(f"Request blocked: {exc}")
)
return ""
@ -324,7 +325,7 @@ class LangGraphA2AExecutor(AgentExecutor):
texts = _extract_chunk_text(chunk.content)
for text in texts:
await updater.add_artifact(
parts=[Part(root=TextPart(text=text))],
parts=[Part(text=text)],
artifact_id=artifact_id,
append=has_streamed, # False=first, True=append
last_chunk=False,
@ -384,7 +385,7 @@ class LangGraphA2AExecutor(AgentExecutor):
# immediately as the response (a2a_client.py reads .parts[0].text).
# Streaming: yielded as the last SSE event in the stream.
await event_queue.enqueue_event(
new_agent_text_message(final_text, task_id=task_id, context_id=context_id)
new_text_message(final_text, task_id=task_id, context_id=context_id)
)
_result = final_text
@ -399,7 +400,7 @@ class LangGraphA2AExecutor(AgentExecutor):
# Emit a Message so both streaming and non-streaming clients
# receive an error response rather than hanging.
await event_queue.enqueue_event(
new_agent_text_message(
new_text_message(
f"Agent error: {e}", task_id=task_id, context_id=context_id
)
)

View File

@ -38,7 +38,8 @@ import claude_agent_sdk as sdk
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.utils import new_agent_text_message
# a2a.utils removed in a2a-sdk 1.x; replaced by a2a.helpers
from a2a.helpers import new_text_message
from molecule_runtime.executor_helpers import (
CONFIG_MOUNT,
@ -354,7 +355,7 @@ class ClaudeSDKExecutor(AgentExecutor):
"""
user_input = extract_message_text(context.message)
if not user_input:
await event_queue.enqueue_event(new_agent_text_message(_NO_TEXT_MSG))
await event_queue.enqueue_event(new_text_message(_NO_TEXT_MSG))
return
async with self._run_lock:
@ -363,7 +364,7 @@ class ClaudeSDKExecutor(AgentExecutor):
# Enqueue outside the lock so the next queued turn can start
# preparing its prompt while this turn's response ships. Event
# ordering is preserved per-queue by the A2A server, so no races.
await event_queue.enqueue_event(new_agent_text_message(response_text))
await event_queue.enqueue_event(new_text_message(response_text))
@staticmethod
def _is_retryable(exc: BaseException) -> bool:

View File

@ -34,7 +34,8 @@ from pathlib import Path
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.utils import new_agent_text_message
# a2a.utils removed in a2a-sdk 1.x; replaced by a2a.helpers
from a2a.helpers import new_text_message
from molecule_runtime.config import RuntimeConfig
from molecule_runtime.executor_helpers import (
@ -279,7 +280,7 @@ class CLIAgentExecutor(AgentExecutor):
user_input = extract_message_text(context.message)
if not user_input:
await event_queue.enqueue_event(
new_agent_text_message("Error: message contained no text content.")
new_text_message("Error: message contained no text content.")
)
return
@ -368,7 +369,7 @@ class CLIAgentExecutor(AgentExecutor):
result = stdout_text
if result:
await event_queue.enqueue_event(
new_agent_text_message(result)
new_text_message(result)
)
return
else:
@ -380,7 +381,7 @@ class CLIAgentExecutor(AgentExecutor):
await asyncio.sleep(delay)
continue
await event_queue.enqueue_event(
new_agent_text_message("(no response generated after retries)")
new_text_message("(no response generated after retries)")
)
return
else:
@ -402,7 +403,7 @@ class CLIAgentExecutor(AgentExecutor):
# only the sanitized category to the user.
logger.error("CLI agent error [%s]: %s", self.runtime, error_msg[:500])
await event_queue.enqueue_event(
new_agent_text_message(sanitize_agent_error(category=category))
new_text_message(sanitize_agent_error(category=category))
)
return
@ -424,13 +425,13 @@ class CLIAgentExecutor(AgentExecutor):
except Exception as wait_err:
logger.warning("CLI wait error: %s", wait_err)
await event_queue.enqueue_event(
new_agent_text_message(sanitize_agent_error(category="timeout"))
new_text_message(sanitize_agent_error(category="timeout"))
)
return
except Exception as exc:
logger.exception("CLI agent exception [%s]", self.runtime)
await event_queue.enqueue_event(
new_agent_text_message(sanitize_agent_error(exc))
new_text_message(sanitize_agent_error(exc))
)
return

View File

@ -19,10 +19,11 @@ if _PKG_DIR not in sys.path:
import httpx
import uvicorn
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.routes import create_agent_card_routes, create_jsonrpc_routes
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentCard, AgentCapabilities, AgentSkill
from a2a.types import AgentCard, AgentCapabilities, AgentSkill, AgentInterface
from starlette.applications import Starlette
from molecule_runtime.adapters import get_adapter, AdapterConfig
from molecule_runtime.config import load_config
@ -155,11 +156,16 @@ async def main(): # pragma: no cover
name=config.name,
description=config.description or config.name,
version=config.version,
url=workspace_url,
supported_interfaces=[
AgentInterface(
protocol_binding="JSONRPC",
url=f"{workspace_url}/",
protocol_version="1.0",
),
],
capabilities=AgentCapabilities(
streaming=config.a2a.streaming,
pushNotifications=config.a2a.push_notifications,
stateTransitionHistory=True,
push_notifications=config.a2a.push_notifications,
),
skills=[
AgentSkill(
@ -171,8 +177,8 @@ async def main(): # pragma: no cover
)
for skill in loaded_skills
],
defaultInputModes=["text/plain", "application/json"],
defaultOutputModes=["text/plain", "application/json"],
default_input_modes=["text/plain", "application/json"],
default_output_modes=["text/plain", "application/json"],
)
# 7. Wrap in A2A.
@ -189,12 +195,14 @@ async def main(): # pragma: no cover
handler = DefaultRequestHandler(
agent_executor=executor,
task_store=InMemoryTaskStore(),
agent_card=agent_card,
)
app = A2AStarletteApplication(
agent_card=agent_card,
http_handler=handler,
)
# Build Starlette app from route factory functions (a2a-sdk 1.x API)
routes = []
routes.extend(create_agent_card_routes(agent_card))
routes.extend(create_jsonrpc_routes(handler, "/api/v1/jsonrpc/"))
app = Starlette(routes=routes)
# 8. Register with platform
agent_card_dict = {
@ -303,7 +311,7 @@ async def main(): # pragma: no cover
print(f"Workspace {workspace_id} starting on port {port}")
# Wrap the ASGI app with W3C TraceContext extraction middleware so incoming
# A2A HTTP requests propagate their trace context into _incoming_trace_context.
starlette_app = app.build()
starlette_app = app
# Add /transcript route — exposes the most-recent agent session log
# (claude-code reads ~/.claude/projects/<cwd>/<session>.jsonl). Other

View File

@ -13,9 +13,9 @@ license = {text = "BSL-1.1"}
readme = "README.md"
# Don't pin heavy deps — each adapter adds its own
dependencies = [
# Upper bound: a2a-sdk 1.0.0 dropped the a2a.server.apps module we import
# in main.py. Keep on the 0.3.x line until we migrate to the 1.x API.
"a2a-sdk[http-server]>=0.3.25,<1.0",
# Migrated to a2a-sdk 1.x (KI-009). A2AStarletteApplication replaced by
# Starlette route factory functions (create_agent_card_routes, create_jsonrpc_routes).
"a2a-sdk[http-server]>=0.3.25",
"httpx>=0.27.0",
"uvicorn>=0.30.0",
"starlette>=0.38.0",

View File

@ -10,6 +10,12 @@ Every known submodule + symbol used anywhere in `molecule_runtime/` is
stubbed here, so test_imports.py (walking every module) + test_session_
resume_gate.py (ClaudeSDKExecutor methods) + future tests all share one
consistent stub set.
NOTE: a2a-sdk 1.x (KI-009 migration) changed the module layout:
- REMOVED: a2a.server.apps (A2AStarletteApplication)
- REMOVED: a2a.utils
- ADDED: a2a.server.routes (create_agent_card_routes, create_jsonrpc_routes)
- ADDED: AgentInterface in a2a.types
"""
from __future__ import annotations
@ -60,19 +66,32 @@ if "claude_agent_sdk" not in sys.modules:
sys.modules["claude_agent_sdk"] = sdk_stub
# a2a + submodules — every import path in the package gets stubbed.
# NOTE: a2a-sdk 1.x (KI-009 migration) removed a2a.server.apps and
# a2a.utils; added a2a.server.routes. Update stubs accordingly.
_A2A_MODULES: dict[str, list[str]] = {
"a2a": [],
"a2a.server": [],
"a2a.server.agent_execution": ["AgentExecutor", "RequestContext"],
"a2a.server.agent_execution": ["AgentExecutor", "RequestContext", "RequestContextBuilder"],
"a2a.server.events": ["EventQueue"],
"a2a.server.tasks": ["TaskUpdater", "InMemoryTaskStore"],
"a2a.server.apps": ["A2AStarletteApplication"],
"a2a.server.routes": [
"create_agent_card_routes",
"create_jsonrpc_routes",
"create_rest_routes",
"DefaultServerCallContextBuilder",
],
"a2a.server.request_handlers": ["DefaultRequestHandler"],
"a2a.types": [
"Part", "TextPart", "AgentCard", "AgentCapabilities", "AgentSkill",
# TextPart removed in 1.x; Part is now flat (Part(text="...") instead of Part(root=TextPart(...)))
"Part", "AgentCard", "AgentCapabilities", "AgentSkill",
"AgentInterface", # added in a2a-sdk 1.x
"TaskStatus", "TaskState", "TaskStatusUpdateEvent",
],
"a2a.utils": ["new_agent_text_message"],
"a2a.helpers": [
# Added in 1.x; replaces a2a.utils
"new_text_message", "new_task", "new_artifact",
"get_message_text", "get_text_parts",
],
}
for dotted, attrs in _A2A_MODULES.items():
mod = _ensure_package(dotted)