molecule-core/workspace-template/hermes_executor.py
Molecule AI Backend Engineer 3d817a42b7 feat(hermes): expose reasoning mode for Hermes 4 via OpenAI-compat API (#496)
Hermes 4 is a hybrid-reasoning model trained on <think> tags; without asking
for thinking we pay flagship $/tok but get non-reasoning quality. This adds a
dedicated HermesA2AExecutor that dispatches to any OpenAI-compat endpoint
(OpenRouter, Nous Portal) and enables native reasoning for Hermes 4 models.

Key decisions:
- ProviderConfig + _reasoning_supported() detect Hermes 4 by model slug
  substring ("hermes-4", "hermes4") — case-insensitive, no config needed
- extra_body={"reasoning": {"enabled": True}} sent only to Hermes 4 entries;
  Hermes 3 path unchanged (no extra_body, no regressions)
- choices[0].message.reasoning + reasoning_details extracted and written to
  an OTEL span (hermes.reasoning) — deliberately NOT echoed in the A2A reply
  so the reasoning trace never contaminates the agent's next-turn context
- API key / base URL default to OPENAI_API_KEY / OPENAI_BASE_URL env vars
  with openrouter.ai/api/v1 as the fallback endpoint
- _client injection parameter for unit tests (no live API calls needed)
- Error sanitization: only exception class name surfaces to user (mirrors
  sanitize_agent_error() convention from cli_executor.py)

Test coverage: 35 tests, 100% coverage on all new code paths including:
  - _reasoning_supported() — Hermes 4/3/unknown/empty/uppercase
  - ProviderConfig — field assignment and capability flags
  - extra_body presence for Hermes 4, absence for Hermes 3
  - reasoning not in A2A reply; _log_reasoning called when trace present
  - reasoning_details forwarded; span attributes set correctly
  - Telemetry failure swallowed (never blocks response)
  - API error → sanitized class-name-only reply
  - cancel() → TaskStatusUpdateEvent(state=canceled)

Full suite: 990 passed, 0 failed (no regressions).

Resolves #496

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-16 20:38:45 +00:00

328 lines
13 KiB
Python

"""OpenAI-compat A2A executor for Hermes models with native reasoning support.
Dispatches to OpenRouter / Nous Portal (or any OpenAI-compatible endpoint)
and enables Hermes 4 native reasoning when the model supports it.
Reasoning (Hermes 4 only)
--------------------------
Hermes 4 is a hybrid-reasoning model trained on ``<think>`` tags. When
``reasoning_supported`` is True for the active model, this executor appends:
extra_body={"reasoning": {"enabled": True}}
to the ``chat.completions.create()`` call. The ``openai`` SDK forwards
``extra_body`` verbatim to the upstream provider, so both OpenRouter and
Nous Portal receive it without needing provider-specific code paths.
On response, ``choices[0].message.reasoning`` and
``choices[0].message.reasoning_details`` are extracted and written to an
OTEL activity span so operators can inspect the thinking trace in Langfuse
/ Jaeger. The reasoning content is deliberately **not** included in the
A2A reply — doing so would contaminate the agent's next-turn context with
the model's internal scratchpad.
Hermes 3 / unknown models
--------------------------
No ``extra_body`` is sent. The response is processed identically to any
other OpenAI-compat model call. The Hermes 3 path is exercised by the
existing adapter test suite and must remain unchanged.
"""
from __future__ import annotations
import logging
import os
from typing import TYPE_CHECKING, Any
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.utils import new_agent_text_message
if TYPE_CHECKING:
from heartbeat import HeartbeatLoop
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Per-model reasoning capability detection
# ---------------------------------------------------------------------------
# Substrings that identify a Hermes 4 model slug from either provider:
# OpenRouter: "nousresearch/hermes-4-*", "nousresearch/nous-hermes-4-*"
# Nous Portal: "hermes-4", "nous-hermes-4"
#
# Hermes 3 slugs ("hermes-3-llama-3.1-70b", etc.) do NOT contain any of
# these patterns, so they correctly resolve to reasoning_supported=False.
_HERMES4_PATTERNS: tuple[str, ...] = (
"hermes-4",
"hermes4",
)
def _reasoning_supported(model: str) -> bool:
"""Return True if *model* identifies a Hermes 4 variant.
Case-insensitive substring match against ``_HERMES4_PATTERNS``.
>>> _reasoning_supported("nousresearch/hermes-4-0")
True
>>> _reasoning_supported("nousresearch/nous-hermes-4")
True
>>> _reasoning_supported("nousresearch/hermes-3-llama-3.1-70b")
False
>>> _reasoning_supported("gpt-4o")
False
"""
model_lower = model.lower()
return any(pat in model_lower for pat in _HERMES4_PATTERNS)
# ---------------------------------------------------------------------------
# ProviderConfig — per-provider / per-model capability flags
# ---------------------------------------------------------------------------
class ProviderConfig:
"""Immutable capability record derived from a model identifier string.
Attributes:
model: Full model identifier (e.g. "nousresearch/hermes-4-0").
reasoning_supported: True for Hermes 4 entries on OpenRouter / Nous
Portal; False for Hermes 3 and all other models.
Example::
cfg = ProviderConfig("nousresearch/hermes-4-0")
assert cfg.reasoning_supported is True
cfg3 = ProviderConfig("nousresearch/hermes-3-llama-3.1-70b")
assert cfg3.reasoning_supported is False
"""
__slots__ = ("model", "reasoning_supported")
def __init__(self, model: str) -> None:
self.model: str = model
self.reasoning_supported: bool = _reasoning_supported(model)
def __repr__(self) -> str: # pragma: no cover
return (
f"ProviderConfig(model={self.model!r}, "
f"reasoning_supported={self.reasoning_supported})"
)
# ---------------------------------------------------------------------------
# HermesA2AExecutor
# ---------------------------------------------------------------------------
class HermesA2AExecutor(AgentExecutor):
"""A2A executor for Hermes models via OpenAI-compatible API.
Compared to the LangGraph executor, this is intentionally thin:
- Single API call per turn (no streaming or ReAct tool loop).
- System prompt injected as the first ``messages[]`` entry.
- Hermes 4 reasoning enabled via ``extra_body`` when supported.
- Reasoning trace logged to OTEL span — never echoed in the reply.
Parameters
----------
model:
Full model identifier string (e.g. ``"nousresearch/hermes-4-0"``).
Used to select the upstream model AND detect reasoning support.
system_prompt:
Optional system prompt prepended to every conversation.
base_url:
OpenAI-compat endpoint base URL. Defaults to
``OPENAI_BASE_URL`` env var, then ``https://openrouter.ai/api/v1``.
api_key:
Provider API key. Defaults to ``OPENAI_API_KEY`` env var.
heartbeat:
Optional ``HeartbeatLoop`` instance used to surface the current
task description in the platform UI.
_client:
Inject a pre-built ``AsyncOpenAI`` (or compatible mock) — for
testing only. When provided, ``base_url`` and ``api_key`` are
ignored.
"""
def __init__(
self,
model: str,
system_prompt: str | None = None,
base_url: str | None = None,
api_key: str | None = None,
heartbeat: "HeartbeatLoop | None" = None,
_client: Any = None,
) -> None:
self.model = model
self.system_prompt = system_prompt
self._heartbeat = heartbeat
self._provider = ProviderConfig(model)
if _client is not None:
# Test injection path — skip real AsyncOpenAI construction so
# unit tests don't need a live OpenAI API key.
self._client = _client
else:
# Lazy import keeps ``openai`` out of the global module-load path
# so callers that never use HermesA2AExecutor don't pay the import
# cost, and tests can stub ``sys.modules["openai"]`` before import.
from openai import AsyncOpenAI
self._client = AsyncOpenAI(
base_url=(
base_url
or os.environ.get("OPENAI_BASE_URL", "https://openrouter.ai/api/v1")
),
api_key=(
api_key
or os.environ.get("OPENAI_API_KEY", "")
),
)
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _build_messages(self, user_input: str) -> list[dict]:
"""Assemble the ``messages`` list: optional system prompt then user turn."""
msgs: list[dict] = []
if self.system_prompt:
msgs.append({"role": "system", "content": self.system_prompt})
msgs.append({"role": "user", "content": user_input})
return msgs
def _log_reasoning(
self,
context: RequestContext,
reasoning: str | None,
reasoning_details: object | None,
) -> None:
"""Write the Hermes 4 reasoning trace to an OTEL span.
The trace is surfaced to Langfuse / Jaeger for operator inspection.
It is intentionally **not** returned to the caller — including it in
the A2A reply would contaminate the agent's next-turn context.
Any exception is swallowed so a telemetry failure never blocks the
response being returned.
"""
try:
from builtin_tools.telemetry import (
A2A_TASK_ID,
WORKSPACE_ID_ATTR,
get_tracer,
)
workspace_id = os.environ.get("WORKSPACE_ID", "unknown")
tracer = get_tracer()
with tracer.start_as_current_span("hermes.reasoning") as span:
span.set_attribute(WORKSPACE_ID_ATTR, workspace_id)
span.set_attribute(A2A_TASK_ID, context.context_id or "")
span.set_attribute("hermes.model", self.model)
span.set_attribute("hermes.reasoning_length", len(reasoning or ""))
if reasoning:
# Cap the preview attribute at 512 chars — full trace is
# stored in the span exporter's data store.
span.set_attribute("hermes.reasoning_preview", reasoning[:512])
if reasoning_details is not None:
span.set_attribute("hermes.has_reasoning_details", True)
except Exception:
logger.debug(
"hermes_executor: reasoning OTEL log failed (non-fatal)", exc_info=True
)
# ------------------------------------------------------------------
# AgentExecutor interface
# ------------------------------------------------------------------
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
"""Run a single Hermes turn and enqueue the reply as an A2A Message.
Sequence:
1. Extract user text from A2A message parts.
2. Build ``messages[]`` (optional system + user).
3. Call OpenAI-compat API; include ``extra_body`` for Hermes 4.
4. Extract and log reasoning trace — does NOT appear in the reply.
5. Enqueue a final ``Message`` with the content text.
"""
from shared_runtime import extract_message_text
user_input = extract_message_text(context)
if not user_input:
parts = getattr(getattr(context, "message", None), "parts", None)
logger.warning("HermesA2AExecutor: no text in message parts: %s", parts)
await event_queue.enqueue_event(
new_agent_text_message("Error: message contained no text content.")
)
return
messages = self._build_messages(user_input)
# Only Hermes 4 entries get extra_body — sending it to Hermes 3
# or other models is a no-op at best; a 400 at worst.
extra_body: dict | None = None
if self._provider.reasoning_supported:
extra_body = {"reasoning": {"enabled": True}}
try:
response = await self._client.chat.completions.create(
model=self.model,
messages=messages,
extra_body=extra_body,
)
choice = response.choices[0]
content: str = choice.message.content or ""
# ``reasoning`` and ``reasoning_details`` are Hermes 4 / provider
# extensions not defined in the openai SDK's ChatCompletionMessage
# schema. They arrive as dynamic attributes when the upstream API
# returns them; getattr guards against their absence.
reasoning: str | None = getattr(choice.message, "reasoning", None)
reasoning_details: object | None = getattr(
choice.message, "reasoning_details", None
)
if reasoning or reasoning_details:
logger.info(
"hermes_executor: reasoning trace [model=%s len=%d]: %.200s...",
self.model,
len(reasoning or ""),
reasoning or "",
)
# Log to OTEL — intentionally omitted from the A2A reply.
self._log_reasoning(context, reasoning, reasoning_details)
final_text = content.strip() or "(no response generated)"
await event_queue.enqueue_event(new_agent_text_message(final_text))
except Exception as exc:
logger.error(
"hermes_executor: API error [model=%s]: %s",
self.model,
type(exc).__name__,
exc_info=True,
)
# Expose only the exception class name — not the message body,
# which may contain API keys, rate-limit metadata, or provider
# error details that shouldn't reach the end user.
# Mirrors the sanitize_agent_error() convention in cli_executor.py.
await event_queue.enqueue_event(
new_agent_text_message(f"Agent error: {type(exc).__name__}")
)
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
"""Cancel a running task — emits canceled state per A2A protocol."""
from a2a.types import TaskState, TaskStatus, TaskStatusUpdateEvent
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
status=TaskStatus(state=TaskState.canceled),
final=True,
)
)