molecule-core/workspace-template/adapters/hermes/executor.py
rabbitblood cb3c7dcf91 feat(hermes): Phase 2c — multi-turn history passed natively to all paths
Completes the Phase 2 scope by keeping conversation turns as turns across
all three dispatch paths. Pre-2c, history was flattened into a single user
message via shared_runtime.build_task_text, which worked as a fallback but
lost the model's native multi-turn awareness (role attribution,
instruction-following on mid-conversation corrections, system-prompt
grounding against prior turns).

Phase 2a + 2b shipped the dispatch infrastructure + per-provider native
paths. This PR uses them properly.

## What's new

- **`_history_to_openai_messages(user_message, history)`** (static) — maps
  A2A `(role, text)` tuples to OpenAI Chat Completions
  `[{"role":"user"|"assistant","content":str}]`. Roles: `human`→`user`,
  `ai`→`assistant`. Current turn appended as the final user message.

- **`_history_to_anthropic_messages`** (static) — identical wire shape to
  OpenAI for text-only turns, so it delegates. Phase 2d tool_use/vision
  blocks will diverge here.

- **`_history_to_gemini_contents`** (static) — Gemini uses a different
  shape: `role="user"|"model"` (NOT "assistant") and text wrapped in
  `parts=[{"text":...}]`. Delegates to none of the others.

- **`_do_openai_compat(user_message, history=None)`** — accepts history,
  builds messages via `_history_to_openai_messages`. Back-compat: pass
  `history=None` to get the old single-turn behavior.

- **`_do_anthropic_native(user_message, history=None)`** — same signature
  change, calls `_history_to_anthropic_messages`. Still uses
  `anthropic.AsyncAnthropic().messages.create()`, just with proper
  multi-turn.

- **`_do_gemini_native(user_message, history=None)`** — same pattern,
  calls `_history_to_gemini_contents`, passes to Gemini's
  `generate_content(contents=...)`.

- **`_do_inference(user_message, history=None)`** — new signature,
  dispatches by auth_scheme as before, passes both args through.

- **`execute()`** — no longer calls `build_task_text`. Calls
  `extract_history(context)` directly and forwards to `_do_inference`.
  Removes the `build_task_text` import (not needed in this file anymore).

## Tests

Existing 7 dispatch tests updated for the new `(user_message, history)`
signature — they assert the path is called with `("hello", None)` since
they pass no history.

5 NEW tests:

- `test_history_to_openai_messages_empty_history` — empty history degrades
  to single user message (back-compat)
- `test_history_to_openai_messages_multi_turn` — round-trip of a 3-turn
  history + current turn
- `test_history_to_anthropic_messages_same_as_openai` — cross-check that
  anthropic path produces identical wire shape for text-only
- `test_history_to_gemini_contents_uses_model_role_and_parts_wrapper` —
  verifies the Gemini-specific role mapping (`ai`→`model`) + parts wrapper
- `test_dispatch_passes_history_through` — end-to-end: _do_inference
  forwards history to the chosen provider path

All 41 tests pass (15 Phase 2 dispatch + 26 Phase 1 registry):

    pytest tests/test_hermes_phase2_dispatch.py tests/test_hermes_providers.py
    41 passed in 0.07s

## Back-compat

- No public API changes to `create_executor()`. Callers that hit
  `execute()` via A2A get the new multi-turn behavior automatically via
  `extract_history(context)`.
- Callers that passed an empty history list (or None) get the same
  single-turn behavior as pre-2c.
- The `build_task_text` helper in shared_runtime is unchanged — other
  adapters (AutoGen, LangGraph) that use it keep working. Only Hermes
  bypasses it now.

## What's NOT in this PR (Phase 2d)

- Tool calling / function calling on native paths (anthropic `tools=`,
  gemini `tools=Tool(function_declarations=[...])`)
- Vision content blocks (image_url → anthropic `{type:"image", source:
  {type:"base64",...}}` / gemini `{inline_data:{mime_type,data}}`)
- System instructions pass-through (anthropic `system=`, gemini
  `system_instruction=`)
- Streaming (`astream_messages` / `streamGenerateContent` stream variants)
- Extended thinking (anthropic `thinking={"type":"enabled"}`) / Gemini
  thinking config

Phase 2c is the **multi-turn upgrade**. Tool + vision + streaming are
Phase 2d, scoped in project_hermes_multi_provider.md.

## Related

- #240 Phase 2a (native Anthropic dispatch — in main)
- #255 Phase 2b (native Gemini dispatch — in main)
- Phase 1 (#208 — provider registry baseline, in main)
- `project_hermes_multi_provider.md` queued memory
- CEO 2026-04-15: "focus on supporting hermes agent"
2026-04-15 14:21:10 -07:00

383 lines
16 KiB
Python

"""Hermes adapter executor — Phase 2 multi-provider with native SDK dispatch.
Hermes supports 15 providers via the shared ``providers.py`` registry. Each
provider's ``auth_scheme`` field controls which client + request shape the
executor uses:
- ``auth_scheme="openai"`` (13 providers) — OpenAI-compat ``/v1/chat/completions``
via the ``openai`` Python SDK. Covers: Nous Portal, OpenRouter, OpenAI, xAI,
Qwen, GLM, Kimi, MiniMax, DeepSeek, Groq, Together, Fireworks, Mistral.
- ``auth_scheme="anthropic"`` (1 provider — anthropic) — native Messages API via
the ``anthropic`` Python SDK. Phase 2a: better tool calling, vision support,
extended thinking semantics. If the ``anthropic`` package isn't installed in
the workspace image, ``_do_anthropic_native`` raises a clear error with
install instructions rather than silently falling back to the OpenAI-compat
shim (which would lose fidelity invisibly).
- ``auth_scheme="gemini"`` (1 provider — gemini) — native ``generateContent`` API
via the official ``google-genai`` Python SDK. Phase 2b: first-class vision
content blocks, tool/function calling, system instructions, and thinking
config — all of which the OpenAI-compat shim at ``/v1beta/openai`` either
strips or mis-translates. Same fail-loud semantics as the anthropic path.
Key resolution order (unchanged from Phase 1)
----------------------------------------------
1. ``hermes_api_key`` parameter (explicit call-site override — routes to Nous Portal)
2. ``provider`` parameter (explicit provider name — looks up its env var(s))
3. Auto-detect: walk ``providers.RESOLUTION_ORDER`` and pick the first provider
whose env var is set.
Raises ``ValueError`` if nothing resolves. The error message lists every env var
that was checked so the operator knows their options without reading source.
"""
from __future__ import annotations
import logging
import os
from typing import Optional
from .providers import PROVIDERS, ProviderConfig, resolve_provider
logger = logging.getLogger(__name__)
def create_executor(
hermes_api_key: Optional[str] = None,
provider: Optional[str] = None,
model: Optional[str] = None,
):
"""Create and return a LangGraph-compatible executor for the Hermes adapter.
Parameters
----------
hermes_api_key:
Explicit API key. When provided, the call routes to Nous Portal (the
PR 2 back-compat path) regardless of ``provider``.
provider:
Canonical provider short name from ``providers.PROVIDERS`` (e.g.
``"openai"``, ``"anthropic"``, ``"qwen"``, ``"xai"``). When set, the
registry entry's env vars are used to find the API key and its
base URL + default model override the auto-detect path. When unset,
auto-detect walks ``providers.RESOLUTION_ORDER`` until it finds a
provider whose env var is set.
model:
Override the provider's default model. Passed straight through to
``chat.completions.create``.
Returns
-------
HermesA2AExecutor
A ready-to-use executor wired with the resolved api_key + base_url
+ model.
Raises
------
ValueError
If ``provider`` is an unknown name, if ``provider`` is known but its
env vars are all empty, or if auto-detect finds nothing.
"""
# Path 1: PR 2 back-compat — explicit hermes_api_key routes to Nous Portal.
if hermes_api_key:
cfg = PROVIDERS["nous_portal"]
logger.debug("Hermes: using explicit hermes_api_key param (Nous Portal)")
return HermesA2AExecutor(
provider_cfg=cfg,
api_key=hermes_api_key,
model=model or cfg.default_model,
)
# Path 2/3: registry resolution (either explicit provider name or auto-detect).
cfg, api_key = resolve_provider(provider)
logger.info(
"Hermes: provider=%s auth_scheme=%s base_url=%s model=%s",
cfg.name,
cfg.auth_scheme,
cfg.base_url,
model or cfg.default_model,
)
return HermesA2AExecutor(
provider_cfg=cfg,
api_key=api_key,
model=model or cfg.default_model,
)
class HermesA2AExecutor:
"""LangGraph-compatible AgentExecutor for Hermes-style multi-provider LLMs.
Dispatches each inference call based on ``provider_cfg.auth_scheme``:
- ``"openai"`` → OpenAI-compat ``/v1/chat/completions`` via the ``openai`` SDK
- ``"anthropic"`` → native Messages API via the ``anthropic`` SDK
The ``execute()`` and ``cancel()`` async methods satisfy the
``a2a.server.agent_execution.AgentExecutor`` interface so this
executor can be dropped into the A2A server's DefaultRequestHandler.
"""
def __init__(
self,
provider_cfg: ProviderConfig,
api_key: str,
model: str,
heartbeat=None,
):
self.provider_cfg = provider_cfg
self.api_key = api_key
self.base_url = provider_cfg.base_url
self.model = model
self._heartbeat = heartbeat
# ------------------------------------------------------------------
# History → provider-specific message list converters
# ------------------------------------------------------------------
#
# The A2A shared runtime gives us history as ``list[tuple[str, str]]``
# with roles ``"human"`` / ``"ai"``. Each provider wants a different
# shape:
#
# OpenAI-compat: [{"role":"user"|"assistant", "content": str}, ...]
# Anthropic: [{"role":"user"|"assistant", "content": str}, ...] (same)
# Gemini: [{"role":"user"|"model", "parts": [{"text": str}]}, ...]
#
# Before Phase 2c these were flattened into a single user turn via
# ``shared_runtime.build_task_text``, which worked for basic text
# handoff but lost the model's native multi-turn awareness (system
# prompts, tool-use history, role attribution for instruction
# following). Phase 2c keeps the turns as turns.
@staticmethod
def _history_to_openai_messages(
user_message: str,
history: "list[tuple[str, str]]",
) -> "list[dict]":
"""Convert A2A history + current turn to OpenAI Chat Completions shape."""
messages: list[dict] = []
for role, text in history or []:
messages.append({
"role": "user" if role == "human" else "assistant",
"content": text,
})
messages.append({"role": "user", "content": user_message})
return messages
@staticmethod
def _history_to_anthropic_messages(
user_message: str,
history: "list[tuple[str, str]]",
) -> "list[dict]":
"""Convert A2A history + current turn to Anthropic Messages API shape.
Identical wire format to OpenAI (``role`` + ``content``) for text-only
turns, so we just delegate. The difference matters for tool_use /
content blocks, which are Phase 2d territory.
"""
return HermesA2AExecutor._history_to_openai_messages(user_message, history)
@staticmethod
def _history_to_gemini_contents(
user_message: str,
history: "list[tuple[str, str]]",
) -> "list[dict]":
"""Convert A2A history + current turn to Gemini generateContent shape.
Gemini uses ``role: "user" | "model"`` (NOT "assistant") and wraps
text in a ``parts: [{"text": ...}]`` list.
"""
contents: list[dict] = []
for role, text in history or []:
contents.append({
"role": "user" if role == "human" else "model",
"parts": [{"text": text}],
})
contents.append({"role": "user", "parts": [{"text": user_message}]})
return contents
# ------------------------------------------------------------------
# Per-provider inference paths
# ------------------------------------------------------------------
async def _do_openai_compat(
self,
user_message: str,
history: "list[tuple[str, str]] | None" = None,
) -> str:
"""OpenAI-compat inference — used by every provider with auth_scheme='openai'.
13 of the 15 registered providers route here. Uses ``openai.AsyncOpenAI``
pointed at the provider's base_url; every provider's API is wire-
compatible with the OpenAI Chat Completions shape.
Phase 2c: accepts multi-turn history. The old single-``task_text`` call
shape (pre-2c) is preserved — pass the flattened text as ``user_message``
with no history and the call degrades gracefully to the original
behavior. See ``_history_to_openai_messages`` for the conversion.
"""
import openai
client = openai.AsyncOpenAI(
api_key=self.api_key,
base_url=self.base_url,
)
messages = self._history_to_openai_messages(user_message, history or [])
response = await client.chat.completions.create(
model=self.model,
messages=messages,
)
return response.choices[0].message.content or ""
async def _do_anthropic_native(
self,
user_message: str,
history: "list[tuple[str, str]] | None" = None,
) -> str:
"""Native Anthropic Messages API inference.
Uses the official ``anthropic`` Python SDK for correct tool-calling,
vision, and extended-thinking semantics that don't translate cleanly
through the OpenAI-compat shim.
If the ``anthropic`` package is not installed in the workspace image,
we raise a clear error rather than silently falling back to the
OpenAI-compat path — silent fallback would mask the fidelity loss
(tool_use blocks become plain text, vision gets stripped, etc.).
Phase 2a: single-turn text in, text out. Phase 2c: multi-turn history.
Tools + vision remain Phase 2d.
"""
try:
import anthropic
except ImportError as exc: # pragma: no cover — exercised by test_missing_sdk
raise RuntimeError(
"Hermes anthropic native path requires the `anthropic` package. "
"Install in the workspace image with `pip install anthropic>=0.39.0` "
"or set HERMES provider=openrouter to route Claude models through "
"OpenRouter's OpenAI-compat shim instead."
) from exc
client = anthropic.AsyncAnthropic(api_key=self.api_key)
messages = self._history_to_anthropic_messages(user_message, history or [])
response = await client.messages.create(
model=self.model,
max_tokens=4096,
messages=messages,
)
# response.content is a list of ContentBlock; for text-only the first
# block is a TextBlock with a .text attribute.
if response.content and hasattr(response.content[0], "text"):
return response.content[0].text
return ""
async def _do_gemini_native(
self,
user_message: str,
history: "list[tuple[str, str]] | None" = None,
) -> str:
"""Native Google Gemini ``generateContent`` inference.
Uses the official ``google-genai`` Python SDK for correct vision
content blocks, tool/function calling, system instructions, and
thinking config. These all get stripped or mis-translated through
the OpenAI-compat ``/v1beta/openai`` shim.
If the ``google-genai`` package is not installed in the workspace
image, raise a clear error with install instructions rather than
silently falling back to the OpenAI-compat shim (same fail-loud
semantics as the anthropic path).
Phase 2b: single-turn text in, text out. Phase 2c: multi-turn history
via Gemini's ``contents=[{role,parts}]`` shape (note: role is
``"user"`` / ``"model"``, NOT ``"assistant"``).
"""
try:
from google import genai # type: ignore[import-not-found]
except ImportError as exc: # pragma: no cover — exercised by test_missing_sdk
raise RuntimeError(
"Hermes gemini native path requires the `google-genai` package. "
"Install in the workspace image with `pip install google-genai>=1.0.0` "
"or set HERMES provider=openrouter to route Gemini models through "
"OpenRouter's OpenAI-compat shim instead."
) from exc
client = genai.Client(api_key=self.api_key)
contents = self._history_to_gemini_contents(user_message, history or [])
response = await client.aio.models.generate_content(
model=self.model,
contents=contents,
)
# response.text is the flattened text across all parts of the first
# candidate. For text-only that's the whole reply.
return response.text or ""
async def _do_inference(
self,
user_message: str,
history: "list[tuple[str, str]] | None" = None,
) -> str:
"""Dispatch to the right inference path based on provider auth_scheme.
Phase 2c: takes ``user_message`` + optional ``history`` list-of-tuples,
passes through to the chosen path. Each path has its own history →
provider-message conversion via the static helpers above.
"""
scheme = self.provider_cfg.auth_scheme
if scheme == "anthropic":
return await self._do_anthropic_native(user_message, history)
if scheme == "gemini":
return await self._do_gemini_native(user_message, history)
if scheme == "openai":
return await self._do_openai_compat(user_message, history)
# Unknown scheme — treat as openai-compat for forward-compat with any
# future provider the registry adds without yet having a native path.
logger.warning(
"Hermes: unknown auth_scheme=%r for provider=%s — falling back to openai-compat",
scheme, self.provider_cfg.name,
)
return await self._do_openai_compat(user_message, history)
# ------------------------------------------------------------------
# AgentExecutor interface
# ------------------------------------------------------------------
async def execute(self, context, event_queue): # pragma: no cover
"""Execute a Hermes inference request and push the reply to event_queue.
Phase 2c: passes the conversation history to the dispatch layer as a
structured list of (role, text) turns instead of flattening via
``build_task_text``. Each provider path converts the list into its
native multi-turn message shape (OpenAI messages, Anthropic messages,
or Gemini contents). This gives the model its native multi-turn
awareness for instruction following.
"""
from a2a.utils import new_agent_text_message
from adapters.shared_runtime import (
brief_task,
extract_history,
extract_message_text,
set_current_task,
)
user_message = extract_message_text(context)
if not user_message:
await event_queue.enqueue_event(new_agent_text_message("No message provided"))
return
await set_current_task(self._heartbeat, brief_task(user_message))
try:
history = extract_history(context)
reply = await self._do_inference(user_message, history)
except Exception as exc:
logger.exception("Hermes executor error: %s", exc)
reply = f"Hermes error: {exc}"
finally:
await set_current_task(self._heartbeat, "")
await event_queue.enqueue_event(new_agent_text_message(reply))
async def cancel(self, context, event_queue): # pragma: no cover
"""No-op cancel — Hermes requests are not cancellable mid-flight."""
pass