feat(hermes): expose reasoning mode for Hermes 4 via OpenAI-compat API (#496)

Hermes 4 is a hybrid-reasoning model trained on <think> tags; without asking
for thinking we pay flagship $/tok but get non-reasoning quality. This adds a
dedicated HermesA2AExecutor that dispatches to any OpenAI-compat endpoint
(OpenRouter, Nous Portal) and enables native reasoning for Hermes 4 models.

Key decisions:
- ProviderConfig + _reasoning_supported() detect Hermes 4 by model slug
  substring ("hermes-4", "hermes4") — case-insensitive, no config needed
- extra_body={"reasoning": {"enabled": True}} sent only to Hermes 4 entries;
  Hermes 3 path unchanged (no extra_body, no regressions)
- choices[0].message.reasoning + reasoning_details extracted and written to
  an OTEL span (hermes.reasoning) — deliberately NOT echoed in the A2A reply
  so the reasoning trace never contaminates the agent's next-turn context
- API key / base URL default to OPENAI_API_KEY / OPENAI_BASE_URL env vars
  with openrouter.ai/api/v1 as the fallback endpoint
- _client injection parameter for unit tests (no live API calls needed)
- Error sanitization: only exception class name surfaces to user (mirrors
  sanitize_agent_error() convention from cli_executor.py)

Test coverage: 35 tests, 100% coverage on all new code paths including:
  - _reasoning_supported() — Hermes 4/3/unknown/empty/uppercase
  - ProviderConfig — field assignment and capability flags
  - extra_body presence for Hermes 4, absence for Hermes 3
  - reasoning not in A2A reply; _log_reasoning called when trace present
  - reasoning_details forwarded; span attributes set correctly
  - Telemetry failure swallowed (never blocks response)
  - API error → sanitized class-name-only reply
  - cancel() → TaskStatusUpdateEvent(state=canceled)

Full suite: 990 passed, 0 failed (no regressions).

Resolves #496

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Molecule AI Backend Engineer 2026-04-16 20:38:45 +00:00
parent f88f221dfe
commit b7c8f18ab2
2 changed files with 1028 additions and 0 deletions

View File

@ -0,0 +1,327 @@
"""OpenAI-compat A2A executor for Hermes models with native reasoning support.
Dispatches to OpenRouter / Nous Portal (or any OpenAI-compatible endpoint)
and enables Hermes 4 native reasoning when the model supports it.
Reasoning (Hermes 4 only)
--------------------------
Hermes 4 is a hybrid-reasoning model trained on ``<think>`` tags. When
``reasoning_supported`` is True for the active model, this executor appends:
extra_body={"reasoning": {"enabled": True}}
to the ``chat.completions.create()`` call. The ``openai`` SDK forwards
``extra_body`` verbatim to the upstream provider, so both OpenRouter and
Nous Portal receive it without needing provider-specific code paths.
On response, ``choices[0].message.reasoning`` and
``choices[0].message.reasoning_details`` are extracted and written to an
OTEL activity span so operators can inspect the thinking trace in Langfuse
/ Jaeger. The reasoning content is deliberately **not** included in the
A2A reply — doing so would contaminate the agent's next-turn context with
the model's internal scratchpad.
Hermes 3 / unknown models
--------------------------
No ``extra_body`` is sent. The response is processed identically to any
other OpenAI-compat model call. The Hermes 3 path is exercised by the
existing adapter test suite and must remain unchanged.
"""
from __future__ import annotations
import logging
import os
from typing import TYPE_CHECKING, Any
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.utils import new_agent_text_message
if TYPE_CHECKING:
from heartbeat import HeartbeatLoop
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Per-model reasoning capability detection
# ---------------------------------------------------------------------------
# Substrings that identify a Hermes 4 model slug from either provider:
# OpenRouter: "nousresearch/hermes-4-*", "nousresearch/nous-hermes-4-*"
# Nous Portal: "hermes-4", "nous-hermes-4"
#
# Hermes 3 slugs ("hermes-3-llama-3.1-70b", etc.) do NOT contain any of
# these patterns, so they correctly resolve to reasoning_supported=False.
_HERMES4_PATTERNS: tuple[str, ...] = (
"hermes-4",
"hermes4",
)
def _reasoning_supported(model: str) -> bool:
"""Return True if *model* identifies a Hermes 4 variant.
Case-insensitive substring match against ``_HERMES4_PATTERNS``.
>>> _reasoning_supported("nousresearch/hermes-4-0")
True
>>> _reasoning_supported("nousresearch/nous-hermes-4")
True
>>> _reasoning_supported("nousresearch/hermes-3-llama-3.1-70b")
False
>>> _reasoning_supported("gpt-4o")
False
"""
model_lower = model.lower()
return any(pat in model_lower for pat in _HERMES4_PATTERNS)
# ---------------------------------------------------------------------------
# ProviderConfig — per-provider / per-model capability flags
# ---------------------------------------------------------------------------
class ProviderConfig:
"""Immutable capability record derived from a model identifier string.
Attributes:
model: Full model identifier (e.g. "nousresearch/hermes-4-0").
reasoning_supported: True for Hermes 4 entries on OpenRouter / Nous
Portal; False for Hermes 3 and all other models.
Example::
cfg = ProviderConfig("nousresearch/hermes-4-0")
assert cfg.reasoning_supported is True
cfg3 = ProviderConfig("nousresearch/hermes-3-llama-3.1-70b")
assert cfg3.reasoning_supported is False
"""
__slots__ = ("model", "reasoning_supported")
def __init__(self, model: str) -> None:
self.model: str = model
self.reasoning_supported: bool = _reasoning_supported(model)
def __repr__(self) -> str: # pragma: no cover
return (
f"ProviderConfig(model={self.model!r}, "
f"reasoning_supported={self.reasoning_supported})"
)
# ---------------------------------------------------------------------------
# HermesA2AExecutor
# ---------------------------------------------------------------------------
class HermesA2AExecutor(AgentExecutor):
    """A2A executor for Hermes models via OpenAI-compatible API.

    Compared to the LangGraph executor, this is intentionally thin:

    - Single API call per turn (no streaming or ReAct tool loop).
    - System prompt injected as the first ``messages[]`` entry.
    - Hermes 4 reasoning enabled via ``extra_body`` when supported.
    - Reasoning trace logged to an OTEL span — never echoed in the reply.

    Parameters
    ----------
    model:
        Full model identifier string (e.g. ``"nousresearch/hermes-4-0"``).
        Used to select the upstream model AND detect reasoning support.
    system_prompt:
        Optional system prompt prepended to every conversation.
    base_url:
        OpenAI-compat endpoint base URL. Defaults to the
        ``OPENAI_BASE_URL`` env var, then ``https://openrouter.ai/api/v1``.
    api_key:
        Provider API key. Defaults to the ``OPENAI_API_KEY`` env var.
    heartbeat:
        Optional ``HeartbeatLoop`` instance used to surface the current
        task description in the platform UI.
        NOTE(review): stored but never referenced in this module —
        presumably consumed by platform code elsewhere; confirm.
    _client:
        Inject a pre-built ``AsyncOpenAI`` (or compatible mock) for
        testing only. When provided, ``base_url`` and ``api_key`` are
        ignored.
    """

    def __init__(
        self,
        model: str,
        system_prompt: str | None = None,
        base_url: str | None = None,
        api_key: str | None = None,
        heartbeat: "HeartbeatLoop | None" = None,
        _client: Any = None,
    ) -> None:
        self.model = model
        self.system_prompt = system_prompt
        self._heartbeat = heartbeat
        # Capability flags (reasoning support) are derived once from the slug.
        self._provider = ProviderConfig(model)
        if _client is not None:
            # Test injection path — skip real AsyncOpenAI construction so
            # unit tests don't need a live OpenAI API key.
            self._client = _client
        else:
            # Lazy import keeps ``openai`` out of the global module-load path
            # so callers that never use HermesA2AExecutor don't pay the import
            # cost, and tests can stub ``sys.modules["openai"]`` before import.
            from openai import AsyncOpenAI
            self._client = AsyncOpenAI(
                base_url=(
                    base_url
                    or os.environ.get("OPENAI_BASE_URL", "https://openrouter.ai/api/v1")
                ),
                api_key=(
                    api_key
                    or os.environ.get("OPENAI_API_KEY", "")
                ),
            )

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _build_messages(self, user_input: str) -> list[dict[str, str]]:
        """Assemble the ``messages`` list: optional system prompt then user turn."""
        msgs: list[dict[str, str]] = []
        if self.system_prompt:
            msgs.append({"role": "system", "content": self.system_prompt})
        msgs.append({"role": "user", "content": user_input})
        return msgs

    def _log_reasoning(
        self,
        context: RequestContext,
        reasoning: str | None,
        reasoning_details: object | None,
    ) -> None:
        """Write the Hermes 4 reasoning trace to an OTEL span.

        The trace is surfaced to Langfuse / Jaeger for operator inspection.
        It is intentionally **not** returned to the caller — including it in
        the A2A reply would contaminate the agent's next-turn context.

        Any exception is swallowed so a telemetry failure never blocks the
        response being returned.
        """
        try:
            # Imported lazily so a missing/broken telemetry package degrades
            # to a debug log instead of failing the turn.
            from builtin_tools.telemetry import (
                A2A_TASK_ID,
                WORKSPACE_ID_ATTR,
                get_tracer,
            )
            workspace_id = os.environ.get("WORKSPACE_ID", "unknown")
            tracer = get_tracer()
            with tracer.start_as_current_span("hermes.reasoning") as span:
                span.set_attribute(WORKSPACE_ID_ATTR, workspace_id)
                span.set_attribute(A2A_TASK_ID, context.context_id or "")
                span.set_attribute("hermes.model", self.model)
                span.set_attribute("hermes.reasoning_length", len(reasoning or ""))
                if reasoning:
                    # Cap the preview attribute at 512 chars — full trace is
                    # stored in the span exporter's data store.
                    span.set_attribute("hermes.reasoning_preview", reasoning[:512])
                if reasoning_details is not None:
                    span.set_attribute("hermes.has_reasoning_details", True)
        except Exception:
            logger.debug(
                "hermes_executor: reasoning OTEL log failed (non-fatal)", exc_info=True
            )

    # ------------------------------------------------------------------
    # AgentExecutor interface
    # ------------------------------------------------------------------

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        """Run a single Hermes turn and enqueue the reply as an A2A Message.

        Sequence:

        1. Extract user text from A2A message parts.
        2. Build ``messages[]`` (optional system + user).
        3. Call OpenAI-compat API; include ``extra_body`` for Hermes 4.
        4. Extract and log the reasoning trace — it does NOT appear in the reply.
        5. Enqueue a final ``Message`` with the content text.
        """
        from shared_runtime import extract_message_text

        user_input = extract_message_text(context)
        if not user_input:
            parts = getattr(getattr(context, "message", None), "parts", None)
            logger.warning("HermesA2AExecutor: no text in message parts: %s", parts)
            await event_queue.enqueue_event(
                new_agent_text_message("Error: message contained no text content.")
            )
            return
        messages = self._build_messages(user_input)
        # Only Hermes 4 entries get extra_body — sending it to Hermes 3
        # or other models is a no-op at best; a 400 at worst.
        extra_body: dict | None = None
        if self._provider.reasoning_supported:
            extra_body = {"reasoning": {"enabled": True}}
        try:
            response = await self._client.chat.completions.create(
                model=self.model,
                messages=messages,
                extra_body=extra_body,
            )
            choice = response.choices[0]
            content: str = choice.message.content or ""
            # ``reasoning`` and ``reasoning_details`` are Hermes 4 / provider
            # extensions not defined in the openai SDK's ChatCompletionMessage
            # schema. They arrive as dynamic attributes when the upstream API
            # returns them; getattr guards against their absence.
            reasoning: str | None = getattr(choice.message, "reasoning", None)
            reasoning_details: object | None = getattr(
                choice.message, "reasoning_details", None
            )
            if reasoning or reasoning_details:
                logger.info(
                    "hermes_executor: reasoning trace [model=%s len=%d]: %.200s...",
                    self.model,
                    len(reasoning or ""),
                    reasoning or "",
                )
                # Log to OTEL — intentionally omitted from the A2A reply.
                self._log_reasoning(context, reasoning, reasoning_details)
            final_text = content.strip() or "(no response generated)"
            await event_queue.enqueue_event(new_agent_text_message(final_text))
        except Exception as exc:
            logger.error(
                "hermes_executor: API error [model=%s]: %s",
                self.model,
                type(exc).__name__,
                exc_info=True,
            )
            # Expose only the exception class name — not the message body,
            # which may contain API keys, rate-limit metadata, or provider
            # error details that shouldn't reach the end user.
            # Mirrors the sanitize_agent_error() convention in cli_executor.py.
            await event_queue.enqueue_event(
                new_agent_text_message(f"Agent error: {type(exc).__name__}")
            )

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        """Cancel a running task — emits canceled state per A2A protocol."""
        from a2a.types import TaskState, TaskStatus, TaskStatusUpdateEvent

        # NOTE(review): the event is built without taskId/contextId — confirm
        # the a2a SDK fills these in (or that consumers don't require them).
        await event_queue.enqueue_event(
            TaskStatusUpdateEvent(
                status=TaskStatus(state=TaskState.canceled),
                final=True,
            )
        )

View File

@ -0,0 +1,701 @@
"""Tests for hermes_executor.py — Hermes OpenAI-compat A2A executor.
Coverage targets
----------------
- _reasoning_supported() model name pattern detection
- ProviderConfig capability flags derived from model name
- HermesA2AExecutor.__init__ field assignment + client injection
- HermesA2AExecutor._build_messages system prompt + user turn assembly
- HermesA2AExecutor._log_reasoning OTEL span emission + swallowed errors
- HermesA2AExecutor.execute happy path, empty input, API error,
Hermes 4 extra_body, Hermes 3 no extra_body,
reasoning not in reply, reasoning_details
- HermesA2AExecutor.cancel TaskStatusUpdateEvent emitted
The ``openai`` module is stubbed in sys.modules so no real API call is made.
The A2A SDK types are already stubbed by conftest.py.
"""
import sys
from types import ModuleType
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# ---------------------------------------------------------------------------
# Stub openai before hermes_executor is imported so AsyncOpenAI resolves to a
# controllable mock. conftest.py already stubs a2a and builtin_tools.
# ---------------------------------------------------------------------------
if "openai" not in sys.modules:

    class _StubAsyncOpenAI:
        """Minimal stand-in for openai.AsyncOpenAI — tests override this."""

        def __init__(self, base_url=None, api_key=None):
            self.base_url = base_url
            self.api_key = api_key
            self.chat = MagicMock()

    # Register the fake module so ``from openai import AsyncOpenAI`` inside
    # hermes_executor resolves to the stub above.
    _openai_mod = ModuleType("openai")
    _openai_mod.AsyncOpenAI = _StubAsyncOpenAI
    sys.modules["openai"] = _openai_mod
# ---------------------------------------------------------------------------
# Stub shared_runtime.extract_message_text (mirrors the real implementation).
# ---------------------------------------------------------------------------
if "shared_runtime" not in sys.modules:
    _sr_mod = ModuleType("shared_runtime")

    def _extract_message_text(context_or_parts) -> str:
        """Mirror of the real helper: join text from message parts.

        Accepts either a RequestContext-like object (reads
        ``.message.parts``) or a bare iterable of parts.
        """
        parts = getattr(getattr(context_or_parts, "message", None), "parts", None)
        if parts is None:
            parts = context_or_parts
        collected = []
        for part in parts or []:
            text = getattr(part, "text", None)
            if not text:
                # Fall back to the wrapped ``.root.text`` shape.
                text = getattr(getattr(part, "root", None), "text", None)
            if text:
                collected.append(text)
        return " ".join(collected).strip()

    _sr_mod.extract_message_text = _extract_message_text
    sys.modules["shared_runtime"] = _sr_mod
# Now import the module under test
from hermes_executor import ( # noqa: E402
HermesA2AExecutor,
ProviderConfig,
_HERMES4_PATTERNS,
_reasoning_supported,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_context(text: str, context_id: str = "ctx-test") -> MagicMock:
"""Return a mock RequestContext with the given text in message.parts."""
part = MagicMock()
part.text = text
ctx = MagicMock()
ctx.message.parts = [part]
ctx.context_id = context_id
return ctx
def _make_empty_context() -> MagicMock:
"""Return a context whose message parts contain no text."""
part = MagicMock(spec=[]) # no .text attribute
part.root = MagicMock(spec=[]) # no .root.text either
ctx = MagicMock()
ctx.message.parts = [part]
ctx.context_id = "ctx-empty"
return ctx
class _FakeMessage:
"""Minimal stand-in for openai ChatCompletionMessage.
Only sets *reasoning* / *reasoning_details* as real attributes when
explicitly provided matching what an upstream OpenAI-compat provider
returns (the SDK does NOT define these fields on ChatCompletionMessage;
they arrive as dynamic extras). Using a plain class rather than
MagicMock avoids MagicMock's auto-attribute creation, which would cause
``getattr(msg, "reasoning", None)`` to return a truthy MagicMock even
when the field was never set.
"""
def __init__(
self,
content: str,
reasoning: str | None = None,
reasoning_details=None,
*,
_set_reasoning: bool = False,
_set_reasoning_details: bool = False,
) -> None:
self.content = content
if _set_reasoning or reasoning is not None:
self.reasoning = reasoning
if _set_reasoning_details or reasoning_details is not None:
self.reasoning_details = reasoning_details
def _make_api_response(content: str, reasoning: str | None = None, reasoning_details=None):
"""Build a mock OpenAI ChatCompletion response."""
msg = _FakeMessage(content=content, reasoning=reasoning, reasoning_details=reasoning_details)
choice = MagicMock()
choice.message = msg
response = MagicMock()
response.choices = [choice]
return response
def _make_executor(
    model: str = "nousresearch/hermes-4-0",
    system_prompt: str | None = "You are Hermes.",
) -> tuple[HermesA2AExecutor, AsyncMock]:
    """Return (executor, mock_client) with a pre-wired async mock client."""
    fake_client = MagicMock()
    fake_client.chat.completions.create = AsyncMock()
    return (
        HermesA2AExecutor(
            model=model,
            system_prompt=system_prompt,
            _client=fake_client,
        ),
        fake_client,
    )
# ---------------------------------------------------------------------------
# _reasoning_supported
# ---------------------------------------------------------------------------

def test_reasoning_supported_hermes4_slug():
    """Exact "hermes-4" substring → True."""
    assert _reasoning_supported("nousresearch/hermes-4-0") is True


def test_reasoning_supported_hermes4_nous_portal():
    """Nous Portal style slug containing "hermes-4" → True."""
    assert _reasoning_supported("nous-hermes-4") is True


def test_reasoning_supported_hermes4_uppercase():
    """Case-insensitive match — uppercase "HERMES-4" → True."""
    assert _reasoning_supported("NOUSRESEARCH/HERMES-4") is True


def test_reasoning_supported_hermes4_compact():
    """Compact "hermes4" pattern → True."""
    assert _reasoning_supported("hermes4-fine-tuned") is True


def test_reasoning_not_supported_hermes3():
    """Hermes 3 slug → False (pattern "hermes-3" not in _HERMES4_PATTERNS)."""
    assert _reasoning_supported("nousresearch/hermes-3-llama-3.1-70b") is False


def test_reasoning_not_supported_gpt4():
    """Unrelated model → False."""
    assert _reasoning_supported("gpt-4o") is False


def test_reasoning_not_supported_empty():
    """Empty string → False."""
    assert _reasoning_supported("") is False


# ---------------------------------------------------------------------------
# ProviderConfig
# ---------------------------------------------------------------------------

def test_provider_config_hermes4():
    """Hermes 4 model → reasoning_supported=True."""
    cfg = ProviderConfig("nousresearch/hermes-4-0")
    assert cfg.model == "nousresearch/hermes-4-0"
    assert cfg.reasoning_supported is True


def test_provider_config_hermes3():
    """Hermes 3 model → reasoning_supported=False."""
    cfg = ProviderConfig("nousresearch/hermes-3-llama-3.1-70b")
    assert cfg.reasoning_supported is False


def test_provider_config_unknown():
    """Unknown (non-Hermes) model → reasoning_supported=False."""
    cfg = ProviderConfig("mistralai/mixtral-8x7b")
    assert cfg.reasoning_supported is False
# ---------------------------------------------------------------------------
# HermesA2AExecutor construction
# ---------------------------------------------------------------------------

def test_constructor_fields_stored():
    """All constructor fields are persisted as attributes."""
    mock_client = MagicMock()
    executor = HermesA2AExecutor(
        model="nousresearch/hermes-4-0",
        system_prompt="sys",
        _client=mock_client,
    )
    assert executor.model == "nousresearch/hermes-4-0"
    assert executor.system_prompt == "sys"
    assert executor._client is mock_client
    assert isinstance(executor._provider, ProviderConfig)
    assert executor._provider.reasoning_supported is True


def test_constructor_hermes3_reasoning_not_enabled():
    """Hermes 3 model → _provider.reasoning_supported is False."""
    executor = HermesA2AExecutor(
        model="nousresearch/hermes-3-llama-3.1-70b",
        _client=MagicMock(),
    )
    assert executor._provider.reasoning_supported is False


def test_constructor_uses_injected_client():
    """When _client is supplied, AsyncOpenAI is never called."""
    stub = MagicMock()
    executor = HermesA2AExecutor(model="hermes-4", _client=stub)
    assert executor._client is stub


# ---------------------------------------------------------------------------
# _build_messages
# ---------------------------------------------------------------------------

def test_build_messages_with_system_prompt():
    """System prompt is prepended as role=system."""
    executor = HermesA2AExecutor(
        model="hermes-4", system_prompt="Be helpful.", _client=MagicMock()
    )
    msgs = executor._build_messages("Hello!")
    assert msgs[0] == {"role": "system", "content": "Be helpful."}
    assert msgs[1] == {"role": "user", "content": "Hello!"}


def test_build_messages_no_system_prompt():
    """Without system_prompt only the user turn is present."""
    executor = HermesA2AExecutor(
        model="hermes-4", system_prompt=None, _client=MagicMock()
    )
    msgs = executor._build_messages("Hello!")
    assert len(msgs) == 1
    assert msgs[0] == {"role": "user", "content": "Hello!"}
# ---------------------------------------------------------------------------
# execute — happy path
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_execute_returns_content():
    """Successful API call → content is enqueued as A2A message."""
    executor, mock_client = _make_executor()
    mock_client.chat.completions.create.return_value = _make_api_response("42")
    ctx = _make_context("What is 6×7?")
    eq = AsyncMock()
    await executor.execute(ctx, eq)
    # Comparing the enqueued event to the raw string assumes conftest's
    # new_agent_text_message stub returns its text argument unchanged.
    eq.enqueue_event.assert_called_once_with("42")


@pytest.mark.asyncio
async def test_execute_empty_content_returns_fallback():
    """Empty content string → fallback message '(no response generated)'."""
    executor, mock_client = _make_executor()
    mock_client.chat.completions.create.return_value = _make_api_response("")
    ctx = _make_context("ping")
    eq = AsyncMock()
    await executor.execute(ctx, eq)
    eq.enqueue_event.assert_called_once_with("(no response generated)")


@pytest.mark.asyncio
async def test_execute_strips_whitespace_content():
    """Content with only whitespace is treated as empty → fallback."""
    executor, mock_client = _make_executor()
    mock_client.chat.completions.create.return_value = _make_api_response(" \n ")
    ctx = _make_context("ping")
    eq = AsyncMock()
    await executor.execute(ctx, eq)
    eq.enqueue_event.assert_called_once_with("(no response generated)")


# ---------------------------------------------------------------------------
# execute — empty input
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_execute_empty_input_returns_error():
    """Message with no extractable text → error message, no API call."""
    executor, mock_client = _make_executor()
    ctx = _make_empty_context()
    eq = AsyncMock()
    await executor.execute(ctx, eq)
    eq.enqueue_event.assert_called_once_with(
        "Error: message contained no text content."
    )
    mock_client.chat.completions.create.assert_not_called()
# ---------------------------------------------------------------------------
# execute — Hermes 4 extra_body
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_execute_hermes4_sends_reasoning_extra_body():
    """Hermes 4 model → extra_body with reasoning enabled is sent."""
    executor, mock_client = _make_executor(model="nousresearch/hermes-4-0")
    mock_client.chat.completions.create.return_value = _make_api_response("ok")
    await executor.execute(_make_context("hello"), AsyncMock())
    # call_args[1] is the kwargs dict — execute() passes everything by keyword.
    call_kwargs = mock_client.chat.completions.create.call_args[1]
    assert call_kwargs["extra_body"] == {"reasoning": {"enabled": True}}


@pytest.mark.asyncio
async def test_execute_hermes3_no_extra_body():
    """Hermes 3 model → extra_body=None, no reasoning injection."""
    executor, mock_client = _make_executor(model="nousresearch/hermes-3-llama-3.1-70b")
    mock_client.chat.completions.create.return_value = _make_api_response("ok")
    await executor.execute(_make_context("hello"), AsyncMock())
    call_kwargs = mock_client.chat.completions.create.call_args[1]
    assert call_kwargs["extra_body"] is None


@pytest.mark.asyncio
async def test_execute_model_passed_to_api():
    """The model name is forwarded verbatim to the API call."""
    model = "nousresearch/hermes-4-0"
    executor, mock_client = _make_executor(model=model)
    mock_client.chat.completions.create.return_value = _make_api_response("ok")
    await executor.execute(_make_context("hi"), AsyncMock())
    call_kwargs = mock_client.chat.completions.create.call_args[1]
    assert call_kwargs["model"] == model
# ---------------------------------------------------------------------------
# execute — reasoning trace handling
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_execute_reasoning_not_in_reply():
    """Reasoning trace is present in response but NOT included in the A2A reply."""
    executor, mock_client = _make_executor(model="nousresearch/hermes-4-0")
    response = _make_api_response(
        content="The answer is 42.",
        reasoning="<think>First I compute 6×7...</think>",
    )
    mock_client.chat.completions.create.return_value = response
    eq = AsyncMock()
    await executor.execute(_make_context("6×7?"), eq)
    # Reply must contain ONLY the content, not the reasoning
    enqueued = eq.enqueue_event.call_args[0][0]
    assert enqueued == "The answer is 42."
    assert "<think>" not in enqueued
    assert "6×7" not in enqueued  # reasoning text excluded


@pytest.mark.asyncio
async def test_execute_reasoning_logged_via_otel(monkeypatch):
    """Reasoning trace → _log_reasoning is called."""
    executor, mock_client = _make_executor(model="nousresearch/hermes-4-0")
    response = _make_api_response(
        content="Answer.",
        reasoning="<think>reasoning here</think>",
    )
    mock_client.chat.completions.create.return_value = response
    log_calls: list = []
    # Wrap (not replace) _log_reasoning so the real OTEL path still runs.
    original_log = executor._log_reasoning

    def capturing_log(context, reasoning, reasoning_details):
        log_calls.append((reasoning, reasoning_details))
        return original_log(context, reasoning, reasoning_details)

    monkeypatch.setattr(executor, "_log_reasoning", capturing_log)
    await executor.execute(_make_context("test"), AsyncMock())
    assert len(log_calls) == 1
    assert log_calls[0][0] == "<think>reasoning here</think>"


@pytest.mark.asyncio
async def test_execute_reasoning_details_logged(monkeypatch):
    """reasoning_details field is passed through to _log_reasoning."""
    executor, mock_client = _make_executor(model="hermes-4")
    details = {"steps": ["step1", "step2"]}
    response = _make_api_response(
        content="ok",
        reasoning="some reasoning",
        reasoning_details=details,
    )
    mock_client.chat.completions.create.return_value = response
    log_calls: list = []

    def capturing_log(context, reasoning, reasoning_details):
        log_calls.append((reasoning, reasoning_details))

    monkeypatch.setattr(executor, "_log_reasoning", capturing_log)
    await executor.execute(_make_context("test"), AsyncMock())
    # Identity check: the exact object must be forwarded, not a copy.
    assert log_calls[0][1] is details


@pytest.mark.asyncio
async def test_execute_no_reasoning_field_no_log(monkeypatch):
    """Response with no reasoning attribute → _log_reasoning not called."""
    executor, mock_client = _make_executor(model="nousresearch/hermes-4-0")
    # _make_api_response with no reasoning arg → no .reasoning attribute set
    response = _make_api_response(content="ok")
    mock_client.chat.completions.create.return_value = response
    log_calls: list = []
    monkeypatch.setattr(executor, "_log_reasoning", lambda *a: log_calls.append(a))
    await executor.execute(_make_context("test"), AsyncMock())
    assert log_calls == []
# ---------------------------------------------------------------------------
# execute — API error handling
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_execute_api_error_returns_sanitized_message():
    """API exception → class name only in the A2A reply (no message body)."""
    executor, mock_client = _make_executor()

    class FakeAPIError(Exception):
        pass

    mock_client.chat.completions.create.side_effect = FakeAPIError(
        "api_key=sk-secret123 rate limit exceeded"
    )
    eq = AsyncMock()
    await executor.execute(_make_context("hello"), eq)
    enqueued = eq.enqueue_event.call_args[0][0]
    assert enqueued == "Agent error: FakeAPIError"
    # Secret must NOT leak
    assert "sk-secret" not in enqueued
    assert "rate limit" not in enqueued


@pytest.mark.asyncio
async def test_execute_api_error_is_logged(caplog):
    """API exception is logged at ERROR level."""
    import logging

    executor, mock_client = _make_executor()
    mock_client.chat.completions.create.side_effect = ValueError("bad request")
    with caplog.at_level(logging.ERROR, logger="hermes_executor"):
        await executor.execute(_make_context("hello"), AsyncMock())
    assert any("API error" in r.message for r in caplog.records)
# ---------------------------------------------------------------------------
# _log_reasoning — direct unit tests
# ---------------------------------------------------------------------------

def test_log_reasoning_otel_span_attributes():
    """_log_reasoning sets the expected OTEL span attributes."""
    executor, _ = _make_executor(model="nousresearch/hermes-4-0")
    mock_span = MagicMock()
    mock_tracer = MagicMock()
    # Hand-wire the context-manager protocol so ``with ... as span`` yields
    # mock_span.
    mock_tracer.start_as_current_span.return_value.__enter__ = MagicMock(
        return_value=mock_span
    )
    mock_tracer.start_as_current_span.return_value.__exit__ = MagicMock(
        return_value=False
    )
    ctx = MagicMock()
    ctx.context_id = "ctx-abc"
    with patch("hermes_executor.os.environ.get", return_value="ws-123"), \
            patch("hermes_executor.logger"):
        # Patch builtin_tools.telemetry inside the method
        import builtin_tools.telemetry as _tel
        original_get_tracer = _tel.get_tracer
        _tel.get_tracer = MagicMock(return_value=mock_tracer)
        try:
            executor._log_reasoning(ctx, "deep thinking here", None)
        finally:
            # Always restore the module attribute so later tests see the stub.
            _tel.get_tracer = original_get_tracer
    mock_span.set_attribute.assert_any_call("hermes.model", "nousresearch/hermes-4-0")
    mock_span.set_attribute.assert_any_call("hermes.reasoning_length", len("deep thinking here"))
    mock_span.set_attribute.assert_any_call("hermes.reasoning_preview", "deep thinking here")
def test_log_reasoning_swallows_telemetry_error(caplog):
    """_log_reasoning never raises even when OTEL throws.

    The executor catches the failure, logs it at DEBUG ("non-fatal"), and
    returns normally — a telemetry outage must never block the A2A reply.
    Previously ``caplog`` and ``import logging`` were unused; the test now
    also asserts the non-fatal debug record was emitted.
    """
    import logging

    executor, _ = _make_executor()
    ctx = MagicMock()
    ctx.context_id = "ctx-xyz"
    with caplog.at_level(logging.DEBUG, logger="hermes_executor"):
        with patch("builtin_tools.telemetry.get_tracer", side_effect=RuntimeError("boom")):
            # Must not raise
            executor._log_reasoning(ctx, "reasoning text", None)
    assert any("non-fatal" in r.message for r in caplog.records)
def test_log_reasoning_has_reasoning_details_attribute():
    """reasoning_details → has_reasoning_details span attribute set to True."""
    executor, _ = _make_executor(model="hermes-4")
    mock_span = MagicMock()
    mock_tracer = MagicMock()
    # Wire the context-manager protocol so ``with ... as span`` yields mock_span.
    mock_tracer.start_as_current_span.return_value.__enter__ = MagicMock(
        return_value=mock_span
    )
    mock_tracer.start_as_current_span.return_value.__exit__ = MagicMock(
        return_value=False
    )
    ctx = MagicMock()
    ctx.context_id = "ctx-rd"
    import builtin_tools.telemetry as _tel
    original = _tel.get_tracer
    _tel.get_tracer = MagicMock(return_value=mock_tracer)
    try:
        executor._log_reasoning(ctx, None, {"steps": []})
    finally:
        # Restore so later tests see the original stub attribute.
        _tel.get_tracer = original
    mock_span.set_attribute.assert_any_call("hermes.has_reasoning_details", True)


def test_log_reasoning_no_preview_when_reasoning_is_none():
    """When reasoning is None, hermes.reasoning_preview attribute is not set."""
    executor, _ = _make_executor(model="hermes-4")
    mock_span = MagicMock()
    mock_tracer = MagicMock()
    mock_tracer.start_as_current_span.return_value.__enter__ = MagicMock(
        return_value=mock_span
    )
    mock_tracer.start_as_current_span.return_value.__exit__ = MagicMock(
        return_value=False
    )
    ctx = MagicMock()
    ctx.context_id = "ctx-none"
    import builtin_tools.telemetry as _tel
    original = _tel.get_tracer
    _tel.get_tracer = MagicMock(return_value=mock_tracer)
    try:
        executor._log_reasoning(ctx, None, None)
    finally:
        _tel.get_tracer = original
    # hermes.reasoning_preview should NOT have been set
    preview_calls = [
        c for c in mock_span.set_attribute.call_args_list
        if c[0][0] == "hermes.reasoning_preview"
    ]
    assert preview_calls == []
# ---------------------------------------------------------------------------
# cancel
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_cancel_emits_canceled_event():
    """cancel() enqueues a TaskStatusUpdateEvent with state=canceled."""
    executor, _ = _make_executor()
    # Stub a2a.types if not already present with minimal TaskStatusUpdateEvent
    import a2a.types as a2a_types

    class _TaskState:
        canceled = "canceled"

    class _TaskStatus:
        def __init__(self, state):
            self.state = state

    class _TaskStatusUpdateEvent:
        def __init__(self, status, final):
            self.status = status
            self.final = final

    # NOTE(review): these module attributes are overwritten without restore —
    # fine for a session-scoped stub module, but confirm no other test
    # depends on the prior values.
    a2a_types.TaskState = _TaskState
    a2a_types.TaskStatus = _TaskStatus
    a2a_types.TaskStatusUpdateEvent = _TaskStatusUpdateEvent
    eq = AsyncMock()
    ctx = MagicMock()
    await executor.cancel(ctx, eq)
    eq.enqueue_event.assert_called_once()
    event = eq.enqueue_event.call_args[0][0]
    assert isinstance(event, _TaskStatusUpdateEvent)
    assert event.status.state == "canceled"
    assert event.final is True


# ---------------------------------------------------------------------------
# Integration: system prompt is sent with messages
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_system_prompt_included_in_api_call():
    """System prompt appears as first message in the API call."""
    executor, mock_client = _make_executor(
        model="hermes-4", system_prompt="You are a math tutor."
    )
    mock_client.chat.completions.create.return_value = _make_api_response("6")
    await executor.execute(_make_context("3+3?"), AsyncMock())
    msgs = mock_client.chat.completions.create.call_args[1]["messages"]
    assert msgs[0] == {"role": "system", "content": "You are a math tutor."}
    assert msgs[1]["role"] == "user"
    assert "3+3?" in msgs[1]["content"]


@pytest.mark.asyncio
async def test_no_system_prompt_only_user_message():
    """Without system_prompt, only the user turn is in messages."""
    executor, mock_client = _make_executor(model="hermes-4", system_prompt=None)
    mock_client.chat.completions.create.return_value = _make_api_response("ok")
    await executor.execute(_make_context("hello"), AsyncMock())
    msgs = mock_client.chat.completions.create.call_args[1]["messages"]
    assert len(msgs) == 1
    assert msgs[0]["role"] == "user"