feat(hermes): escalation ladder — promote to stronger models on transient failure

Ships the scoped Phase 3 of the Hermes multi-provider work. Every
workspace can now declare an ordered list of (provider, model) rungs;
when the pinned model hits a rate-limit / 5xx / context-length /
overload error, the executor advances to the next rung before raising.

## Why

3× Claude Max saturation is now a routine occurrence — the "first 429 on
a batch delegation" is the common path, not the exception. A workspace
pinned to Haiku that hits a context-length limit has no recovery today;
the same goes for Sonnet hitting a rate limit mid-synthesis. Escalation
promotes to the next tier for that single call, preserving coordination
and avoiding restart cascades.

## New module: adapters/hermes/escalation.py

- ``LadderRung(provider, model)`` — one config entry.
- ``parse_ladder(raw)`` — tolerant config parser; skips malformed rungs
  with a warning rather than raising so boot stays resilient.
- ``should_escalate(exc) -> bool`` — truth table over 15+ error shapes:
  - Typed classes (RateLimitError, OverloadedError, APITimeoutError,
    APIConnectionError, InternalServerError)
  - Context-length markers (each provider uses different phrasing)
  - Gateway markers (502/503/504, overloaded, temporarily unavailable)
  - Status-code substrings (429, 529, 5xx)
  - Hard-rejects auth failures (401/403/invalid_api_key) even if the
    outer exception class is RateLimitError — the wrapping case matters
    (see the sketch below).
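
A condensed, illustrative sketch of the classifier in use. The
``RateLimitError`` class here is a hypothetical stand-in for the SDK
class (classification matches on class *name*, mirroring the test
doubles later in this commit):

    from adapters.hermes.escalation import parse_ladder, should_escalate

    class RateLimitError(Exception):  # stand-in: matched by class name only
        pass

    should_escalate(RateLimitError("429 Too Many Requests"))  # True: transient class
    should_escalate(RuntimeError("prompt is too long"))       # True: context-length marker
    should_escalate(RuntimeError("401 invalid api key"))      # False: auth hard-reject
    parse_ladder([{"provider": "gemini"}])                    # []: missing model, rung skipped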

## Executor wiring

``HermesA2AExecutor`` now accepts ``escalation_ladder`` in its
constructor + ``create_executor()`` factory. ``_do_inference()`` walks
the ladder:

  1. First attempt = pinned provider:model (matches pre-ladder behaviour)
  2. On escalatable error, try each rung in order
  3. On non-escalatable error, raise immediately (auth, malformed payload)
  4. On exhaustion, raise the last error

Rung switches temporarily rebind ``self.provider_cfg`` / ``self.model``
/ ``self.api_key`` / ``self.base_url`` in a try/finally, so any raised
error leaves the executor in its original state for the next call. Key
resolution for non-pinned rungs goes through ``resolve_provider``, which
reads the rung provider's env vars fresh.
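
A minimal wiring sketch, assuming a rendered ``runtime_config`` dict is
already in hand (the ``runtime_config`` variable and the direct
``_do_inference`` call are illustrative, not part of this PR's public
surface):

    from adapters.hermes.executor import create_executor

    executor = create_executor(
        hermes_api_key=runtime_config.get("hermes_api_key"),
        escalation_ladder=runtime_config.get("escalation_ladder"),  # None/[] = single-shot
    )
    # First attempt runs on the pinned provider:model; rungs only engage
    # on an escalatable failure.
    reply = await executor._do_inference("synthesize the batch results")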

## Config shape

``config.yaml`` (rendered from ``org.yaml`` → workspace secrets):

    runtime_config:
      escalation_ladder:
        - provider: gemini
          model: gemini-2.5-flash
        - provider: anthropic
          model: claude-sonnet-4-5-20250929
        - provider: anthropic
          model: claude-opus-4-1-20250805

Empty / absent = single-shot behaviour, full backwards-compat with
every existing workspace.
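
Adapter-side consumption is deliberately tiny; the ``HermesAdapter``
diff below reduces to:

    escalation_ladder = config.runtime_config.get("escalation_ladder") or None
    executor = create_executor(
        hermes_api_key=hermes_api_key,
        escalation_ladder=escalation_ladder,  # absent key -> None -> single-shot
    )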

## Tests

34 passing, all isolated (no network):

- ``test_hermes_escalation.py`` (28): parser + truth-table across
  rate-limit, overload, context-length, gateway, auth-reject, unrelated
  exceptions, and case-insensitivity.
- ``test_hermes_ladder_integration.py`` (6): no-ladder single call,
  ladder-not-triggered on success, escalate-on-rate-limit-then-succeed,
  stop-on-non-escalatable, raise-last-error-when-exhausted,
  skip-unknown-provider-in-rung.

## Not in this PR

- Uncertainty-driven escalation (judge pass after successful reply).
- Per-workspace budget tracking (#305 covers this separately).
- Live streaming reuse across rungs (ladder retries the whole call).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>


@@ -51,9 +51,18 @@ class HermesAdapter(BaseAdapter):
        # Resolve API key: prefer workspace secrets (runtime_config), then env vars
        hermes_api_key = config.runtime_config.get("hermes_api_key") or None
        # Phase 3 escalation ladder — read from runtime_config.escalation_ladder
        # if present. The platform's org importer copies the ladder from
        # org.yaml (runtime_config.escalation_ladder) into the container's
        # /configs/config.yaml, and the workspace-template loader surfaces it
        # here. Empty / missing = single-shot behaviour (unchanged from pre-
        # Phase-3). See adapters.hermes.escalation for classification rules.
        escalation_ladder = config.runtime_config.get("escalation_ladder") or None
        executor = create_executor(
            hermes_api_key=hermes_api_key,
            config_path=config.config_path,  # Phase 2d-i: system-prompt.md injection
            escalation_ladder=escalation_ladder,
        )
        # Override model from config if provided

adapters/hermes/escalation.py

@@ -0,0 +1,201 @@
"""Hermes escalation ladder — promote to stronger models on transient failure.

Every workspace in the Hermes adapter path has a single pinned model today
(``provider_cfg.default_model`` overridden by ``runtime_config.model`` in
``config.yaml``). That's fine when the pinned model is the best fit, but
it leaves four recurring failure classes unhandled:

1. **Rate limits** (Claude Max saturation, Anthropic 429, OpenAI 429). We're
   currently saturating 3× Claude Max subscriptions; the first 429 is now
   the norm, not the exception.
2. **Transient 5xx** from any provider (overloaded 529, 500, 502, 503).
3. **Context-length exceeded** on the smaller-window model (Haiku has 200k,
   cheaper Gemini flash tiers have less, OpenAI nano/mini have 128k).
4. **Refusal / empty response** from a cheaper tier that the next tier up
   would handle; less common but real in practice.

An escalation ladder is a workspace-configured list of ``LadderRung`` entries
(provider + model). On a qualifying failure, the executor advances to the
next rung and retries the same user_message + history. If the ladder is
exhausted, the last error is raised.

## Config shape

``config.yaml``::

    runtime_config:
      escalation_ladder:
        - provider: gemini
          model: gemini-2.5-flash           # fast/cheap probe
        - provider: anthropic
          model: claude-haiku-4-5-20251001
        - provider: anthropic
          model: claude-sonnet-4-5-20250929
        - provider: anthropic
          model: claude-opus-4-1-20250805   # frontier rescue

When ``escalation_ladder`` is absent, the executor behaves exactly as before:
one call, one model, errors bubble.

## What this module does NOT do (yet)

- **No uncertainty-driven escalation.** Only transient-failure escalation.
  Promoting on "the answer felt thin" requires a judge pass follow-up.
- **No streaming partial-result aggregation.** The first rung that succeeds
  returns; we don't splice responses across rungs.
- **No per-workspace budget tracking.** Each escalation is one more paid
  call. Follow-up work (#305 budget cap) handles that.
"""
from __future__ import annotations

import logging
from dataclasses import dataclass
from typing import Optional

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class LadderRung:
    """One rung on the escalation ladder.

    ``provider`` is a canonical short name from ``providers.PROVIDERS``.
    ``model`` overrides the provider's default for this rung.
    """

    provider: str
    model: str
def parse_ladder(raw: Optional[list]) -> list[LadderRung]:
    """Parse the ``escalation_ladder`` list from ``config.yaml`` into rungs.

    Accepts either dict-shaped entries (``{"provider": ..., "model": ...}``)
    or pre-built LadderRung instances (for programmatic callers). Skips
    malformed entries with a warning rather than raising; a missing rung
    is less bad than a failed boot.

    Empty / None / missing input returns an empty list (caller interprets
    as "no ladder configured, single-shot dispatch").
    """
    if not raw:
        return []
    rungs: list[LadderRung] = []
    for i, entry in enumerate(raw):
        if isinstance(entry, LadderRung):
            rungs.append(entry)
            continue
        if not isinstance(entry, dict):
            logger.warning(
                "Hermes ladder: rung %d is not a dict (%r), skipping", i, type(entry).__name__,
            )
            continue
        provider = entry.get("provider")
        model = entry.get("model")
        if not provider or not model:
            logger.warning(
                "Hermes ladder: rung %d missing provider or model (%r), skipping", i, entry,
            )
            continue
        rungs.append(LadderRung(provider=str(provider), model=str(model)))
    return rungs
# Error-type names that indicate a transient failure worth escalating.
# We match on the class name (not the module) so this works regardless of
# whether the workspace imported the new or old anthropic / openai SDK.
# See ``should_escalate`` for the matching logic.
_ESCALATABLE_ERROR_CLASSES = frozenset({
    # openai SDK
    "RateLimitError",       # 429
    "APITimeoutError",      # connect/read timeout
    "APIConnectionError",   # TCP / DNS
    "InternalServerError",  # 500
    # anthropic SDK
    "OverloadedError",      # 529
    "APIStatusError",       # generic 5xx wrapper
    # common across both: network-level errors
    "ConnectionError",
    "Timeout",
    "ReadTimeout",
})

# Error-message substrings that indicate context-length exceeded. These map
# to distinct HTTP 400 responses from each provider rather than a typed
# exception, so we match on substring.
_CONTEXT_LENGTH_MARKERS = (
    "maximum context length",   # openai
    "context_length_exceeded",  # openai error.code
    "prompt is too long",       # anthropic
    "prompt_too_long",          # anthropic error.code
    "context window",           # gemini
)

# Error-message substrings that indicate a transient gateway issue. These
# sometimes come through as generic exceptions without typed classes.
_TRANSIENT_GATEWAY_MARKERS = (
    "502 bad gateway",
    "503 service unavailable",
    "504 gateway timeout",
    "overloaded",
    "please try again",
    "temporarily unavailable",
)

# Error-message substrings that definitively DO NOT qualify for escalation.
# Auth and malformed-payload errors don't get better by retrying on a
# different model — they indicate config / code bugs.
_NON_ESCALATABLE_MARKERS = (
    "invalid api key",
    "authentication_error",
    "401",
    "403",
    "forbidden",
    "permission_denied",
    "unauthorized",
)
def should_escalate(exc: BaseException) -> bool:
    """Decide whether ``exc`` justifies moving to the next ladder rung.

    Returns True when the failure is one of:

    - Rate limit (429 / RateLimitError / OverloadedError)
    - Transient gateway (5xx, overload, timeout, connection reset)
    - Context-length exceeded on the current model

    Returns False for auth, permission, malformed-payload, and other
    config-bug classes; escalating those just wastes the next-tier quota.
    """
    if exc is None:
        return False
    cls_name = exc.__class__.__name__
    msg = str(exc).lower()
    # Hard reject: never escalate auth/permission errors regardless of
    # what the class name says. A wrapped RateLimitError that actually
    # contains "401 Unauthorized" is a config bug, not a rate limit.
    for marker in _NON_ESCALATABLE_MARKERS:
        if marker in msg:
            return False
    if cls_name in _ESCALATABLE_ERROR_CLASSES:
        return True
    for marker in _CONTEXT_LENGTH_MARKERS:
        if marker in msg:
            return True
    for marker in _TRANSIENT_GATEWAY_MARKERS:
        if marker in msg:
            return True
    # Status-code substrings are a common tell for HTTP-wrapped provider errors.
    if "429" in msg or "529" in msg:
        return True
    if any(code in msg for code in ("500 ", "502 ", "503 ", "504 ")):
        return True
    return False

adapters/hermes/executor.py

@@ -38,6 +38,7 @@ import logging
import os
from typing import Optional

from .escalation import LadderRung, parse_ladder, should_escalate
from .providers import PROVIDERS, ProviderConfig, resolve_provider

logger = logging.getLogger(__name__)
@@ -48,6 +49,7 @@ def create_executor(
    provider: Optional[str] = None,
    model: Optional[str] = None,
    config_path: Optional[str] = None,
    escalation_ladder: Optional[list] = None,
):
    """Create and return a LangGraph-compatible executor for the Hermes adapter.
@@ -84,6 +86,14 @@ def create_executor(
        If ``provider`` is an unknown name, if ``provider`` is known but its
        env vars are all empty, or if auto-detect finds nothing.
    """
    ladder = parse_ladder(escalation_ladder)
    if ladder:
        logger.info(
            "Hermes: escalation ladder configured — %d rungs (%s)",
            len(ladder),
            ", ".join(f"{r.provider}:{r.model}" for r in ladder),
        )
    # Path 1: PR 2 back-compat — explicit hermes_api_key routes to Nous Portal.
    if hermes_api_key:
        cfg = PROVIDERS["nous_portal"]
@@ -93,6 +103,7 @@ def create_executor(
        api_key=hermes_api_key,
        model=model or cfg.default_model,
        config_path=config_path,
        escalation_ladder=ladder,
    )

    # Path 2/3: registry resolution (either explicit provider name or auto-detect).
@@ -109,6 +120,7 @@ def create_executor(
        api_key=api_key,
        model=model or cfg.default_model,
        config_path=config_path,
        escalation_ladder=ladder,
    )
@@ -132,6 +144,7 @@ class HermesA2AExecutor:
        model: str,
        heartbeat=None,
        config_path: Optional[str] = None,
        escalation_ladder: Optional[list] = None,
    ):
        self.provider_cfg = provider_cfg
        self.api_key = api_key
@@ -143,6 +156,11 @@ class HermesA2AExecutor:
        # `system_instruction=` / prepended message. Optional because older
        # callers + tests construct executors directly.
        self._config_path = config_path
        # Phase 3: escalation ladder. When non-empty, _do_inference retries
        # transient-failure classes (rate limit, 5xx, overload, context-length)
        # on each rung in turn before raising. Empty / None = single-shot,
        # original behaviour. See adapters.hermes.escalation.
        self._ladder: list[LadderRung] = parse_ladder(escalation_ladder) or []
    # ------------------------------------------------------------------
    # History → provider-specific message list converters
@@ -344,23 +362,136 @@ class HermesA2AExecutor:
        Phase 2c: multi-turn history.
        Phase 2d-i: optional system_prompt is passed through to the native
        system field of whichever path wins dispatch.
        Phase 3: when an escalation ladder is configured, transient failures
        (rate limit, 5xx, overload, context-length) promote to the next rung
        before raising. No ladder = single-shot, original behaviour.
        """
        # Fast path: no ladder configured — single call on the pinned model.
        if not self._ladder:
            return await self._dispatch(
                self.provider_cfg, self.model, user_message, history, system_prompt,
            )
        # Slow path: walk the ladder. Start with the pinned (provider, model)
        # so the first attempt matches non-ladder behaviour exactly — the
        # ladder only kicks in when the first attempt fails escalatably.
        attempts: list[tuple[ProviderConfig, str]] = [(self.provider_cfg, self.model)]
        for rung in self._ladder:
            rung_cfg = PROVIDERS.get(rung.provider)
            if rung_cfg is None:
                logger.warning(
                    "Hermes ladder: provider %r not in registry, skipping rung",
                    rung.provider,
                )
                continue
            attempts.append((rung_cfg, rung.model))
        last_exc: Optional[BaseException] = None
        for i, (cfg, model) in enumerate(attempts):
            try:
                reply = await self._dispatch(
                    cfg, model, user_message, history, system_prompt,
                )
                if i > 0:
                    logger.info(
                        "Hermes ladder: succeeded on rung %d (%s:%s) after %d failed attempt(s)",
                        i, cfg.name, model, i,
                    )
                return reply
            except Exception as exc:
                last_exc = exc
                if i == len(attempts) - 1:
                    logger.error(
                        "Hermes ladder: exhausted all %d rungs — raising. Last error on %s:%s: %s",
                        len(attempts), cfg.name, model, exc,
                    )
                    raise
                if not should_escalate(exc):
                    logger.info(
                        "Hermes ladder: non-escalatable error on %s:%s — raising without advancing: %s",
                        cfg.name, model, exc,
                    )
                    raise
                logger.warning(
                    "Hermes ladder: escalatable failure on rung %d (%s:%s), advancing. Error: %s",
                    i, cfg.name, model, exc,
                )
        # Unreachable in practice — the last iteration either returns or
        # raises; this satisfies the type checker without a bare return.
        if last_exc is not None:
            raise last_exc
        return ""  # pragma: no cover
    async def _dispatch(
        self,
        cfg: ProviderConfig,
        model: str,
        user_message: str,
        history: "list[tuple[str, str]] | None",
        system_prompt: Optional[str],
    ) -> str:
        """Single-attempt dispatch on (cfg, model).

        Temporarily rebinds ``self.provider_cfg`` + ``self.base_url`` +
        ``self.model`` so the existing per-provider paths pick up the rung's
        config. Restores the original values in a finally block so a raised
        error leaves the executor pinned to its constructor-given state (next
        call on the same executor instance starts fresh at the top of the
        ladder).

        For the ladder's non-first rungs, ``self.api_key`` must be the rung's
        provider key; we resolve it here via ``resolve_provider`` so the
        first-rung API key (for the pinned provider) isn't mis-used against a
        different provider's base URL. That lookup can raise ``ValueError``
        when the rung's env var isn't set; ``should_escalate(ValueError)``
        returns False, so the ladder correctly STOPS rather than escalating
        further into nothing.
        """
        # Fast path: rung matches the executor's pinned config — reuse the
        # existing api_key, skip the provider re-resolve.
        if cfg is self.provider_cfg and model == self.model:
            scheme = cfg.auth_scheme
            if scheme == "anthropic":
                return await self._do_anthropic_native(user_message, history, system_prompt)
            if scheme == "gemini":
                return await self._do_gemini_native(user_message, history, system_prompt)
            if scheme == "openai":
                return await self._do_openai_compat(user_message, history, system_prompt)
            logger.warning(
                "Hermes: unknown auth_scheme=%r for provider=%s — falling back to openai-compat",
                scheme, cfg.name,
            )
            return await self._do_openai_compat(user_message, history, system_prompt)

        # Different rung — temporarily rebind provider_cfg + model + api_key.
        # resolve_provider reads the rung's env vars fresh.
        _, rung_key = resolve_provider(cfg.name)
        orig_cfg, orig_model, orig_key, orig_base = (
            self.provider_cfg, self.model, self.api_key, self.base_url,
        )
        try:
            self.provider_cfg = cfg
            self.model = model
            self.api_key = rung_key
            self.base_url = cfg.base_url
            scheme = cfg.auth_scheme
            if scheme == "anthropic":
                return await self._do_anthropic_native(user_message, history, system_prompt)
            if scheme == "gemini":
                return await self._do_gemini_native(user_message, history, system_prompt)
            if scheme == "openai":
                return await self._do_openai_compat(user_message, history, system_prompt)
            logger.warning(
                "Hermes: unknown auth_scheme=%r for provider=%s — falling back to openai-compat",
                scheme, cfg.name,
            )
            return await self._do_openai_compat(user_message, history, system_prompt)
        finally:
            self.provider_cfg = orig_cfg
            self.model = orig_model
            self.api_key = orig_key
            self.base_url = orig_base
    # ------------------------------------------------------------------
    # AgentExecutor interface

test_hermes_escalation.py

@@ -0,0 +1,146 @@
"""Tests for Hermes escalation-ladder classification and config parsing.

The truth table in ``should_escalate`` is the single chokepoint that
decides whether an inference failure wastes the next ladder rung's
quota or triggers a useful retry. These tests pin that table against
real exception shapes from anthropic / openai / google-genai SDKs and
the wrapped-error strings we've observed in platform logs.
"""
from __future__ import annotations

import sys
from pathlib import Path

import pytest

# Make the workspace-template/ modules importable without installing.
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))

from adapters.hermes.escalation import (  # noqa: E402
    LadderRung,
    parse_ladder,
    should_escalate,
)
# --------------------------------------------------------------------------
# parse_ladder
# --------------------------------------------------------------------------

def test_parse_ladder_empty_returns_empty():
    assert parse_ladder(None) == []
    assert parse_ladder([]) == []


def test_parse_ladder_accepts_dicts():
    raw = [
        {"provider": "gemini", "model": "gemini-2.5-flash"},
        {"provider": "anthropic", "model": "claude-opus-4-1-20250805"},
    ]
    rungs = parse_ladder(raw)
    assert len(rungs) == 2
    assert rungs[0] == LadderRung("gemini", "gemini-2.5-flash")
    assert rungs[1] == LadderRung("anthropic", "claude-opus-4-1-20250805")


def test_parse_ladder_passes_through_rung_instances():
    # Programmatic callers can pass already-constructed rungs.
    existing = LadderRung("openai", "gpt-4o-mini")
    rungs = parse_ladder([existing])
    assert rungs == [existing]


def test_parse_ladder_skips_malformed_entries():
    # Missing model / missing provider / wrong type — all skipped with
    # a warning, not raised. A missing rung is less bad than a boot fail.
    raw = [
        {"provider": "gemini"},  # no model
        {"model": "gpt-4o"},  # no provider
        "not a dict",  # wrong type
        {"provider": "anthropic", "model": "claude-opus-4-1-20250805"},  # good
    ]
    rungs = parse_ladder(raw)
    assert len(rungs) == 1
    assert rungs[0].provider == "anthropic"
# --------------------------------------------------------------------------
# should_escalate — truth table
# --------------------------------------------------------------------------

class _FakeRateLimitError(Exception):
    """Stand-in with the same class name the openai SDK uses (rate limits)."""

_FakeRateLimitError.__name__ = "RateLimitError"


class _FakeOverloadedError(Exception):
    """Stand-in for anthropic.OverloadedError (HTTP 529)."""

_FakeOverloadedError.__name__ = "OverloadedError"


class _FakeAPITimeoutError(Exception):
    pass

_FakeAPITimeoutError.__name__ = "APITimeoutError"


class _FakeAPIConnectionError(Exception):
    pass

_FakeAPIConnectionError.__name__ = "APIConnectionError"


class _FakeInternalServerError(Exception):
    pass

_FakeInternalServerError.__name__ = "InternalServerError"


@pytest.mark.parametrize("exc,expected", [
    # --- Escalatable: typed rate-limit / overload / timeout classes ---
    (_FakeRateLimitError("rate_limit_exceeded on gpt-4o"), True),
    (_FakeOverloadedError("overloaded_error"), True),
    (_FakeAPITimeoutError("Request timed out."), True),
    (_FakeAPIConnectionError("Connection error."), True),
    (_FakeInternalServerError("Internal server error 500."), True),
    # --- Escalatable: context-length exceeded on current model ---
    (ValueError("This model's maximum context length is 200000 tokens. However, your messages resulted in ..."), True),
    (RuntimeError("error: context_length_exceeded"), True),
    (RuntimeError("prompt is too long: 210000 tokens"), True),
    (RuntimeError("error.type: prompt_too_long"), True),
    (RuntimeError("exceeds model context window of 1048576"), True),
    # --- Escalatable: gateway markers (HTTP-wrapped) ---
    (RuntimeError("Upstream 502 Bad Gateway"), True),
    (RuntimeError("503 Service Unavailable"), True),
    (RuntimeError("Service is temporarily unavailable, please try again."), True),
    (RuntimeError("Anthropic API is overloaded."), True),
    # --- Escalatable: status-code substrings ---
    (RuntimeError("HTTP 429 Too Many Requests"), True),
    (RuntimeError("HTTP 529 Overloaded"), True),
    # --- NOT escalatable: auth / permission (config bugs, wasting quota) ---
    (RuntimeError("401 Unauthorized — invalid api key"), False),
    (RuntimeError("403 Forbidden: permission_denied"), False),
    (RuntimeError("authentication_error: invalid_api_key"), False),
    # --- NOT escalatable: auth-wrapped rate-limit (priority = hard-reject auth) ---
    # If we see '401' + rate-limit markers simultaneously, prefer not escalating
    # because the underlying 401 won't get better on a different model.
    (_FakeRateLimitError("RateLimitError wrapping 401 Unauthorized"), False),
    # --- NOT escalatable: unrelated errors ---
    (ValueError("bad config"), False),
    (KeyError("missing key"), False),
    (None, False),
])
def test_should_escalate_truth_table(exc, expected):
    assert should_escalate(exc) is expected


def test_should_escalate_case_insensitive():
    # We lowercase the message before substring matching so "OVERLOADED"
    # from one provider and "overloaded" from another both match.
    assert should_escalate(RuntimeError("SERVICE OVERLOADED")) is True
    assert should_escalate(RuntimeError("503 SERVICE UNAVAILABLE")) is True

test_hermes_ladder_integration.py

@@ -0,0 +1,160 @@
"""Integration-ish tests for the Hermes executor's escalation behaviour.

These tests exercise ``_do_inference`` against a mocked ``_dispatch``
to prove that:

- No-ladder path is a single call (original behaviour)
- Ladder path retries on escalatable errors
- Ladder path stops early on non-escalatable errors
- Ladder path raises the last error when every rung fails
- Successful rung logs the recovery and returns

No network calls, no provider SDKs. If this ever starts calling real
providers, that's a test-isolation regression worth flagging.
"""
from __future__ import annotations

import asyncio
import sys
from pathlib import Path

import pytest

sys.path.insert(0, str(Path(__file__).resolve().parents[1]))

from adapters.hermes.escalation import LadderRung  # noqa: E402
from adapters.hermes.executor import HermesA2AExecutor  # noqa: E402
from adapters.hermes.providers import PROVIDERS  # noqa: E402
class _FakeRateLimitError(Exception):
    pass

_FakeRateLimitError.__name__ = "RateLimitError"


def _make_executor(monkeypatch, dispatch_behaviour, ladder=None):
    """Build an executor with a mocked ``_dispatch``.

    ``dispatch_behaviour`` is a callable that receives (provider_name, model,
    user_msg, history, system_prompt) and returns either a string reply or a
    BaseException instance to raise. Use this to simulate success / failure
    per rung.
    """
    cfg = PROVIDERS["anthropic"]
    ex = HermesA2AExecutor(
        provider_cfg=cfg,
        api_key="test-key",
        model="claude-haiku-4-5-20251001",
        escalation_ladder=ladder,
    )
    calls: list[tuple[str, str]] = []

    async def fake_dispatch(cfg, model, user_msg, history, system_prompt):
        calls.append((cfg.name, model))
        result = dispatch_behaviour(cfg.name, model, user_msg, history, system_prompt)
        if isinstance(result, BaseException):
            raise result
        return result

    monkeypatch.setattr(ex, "_dispatch", fake_dispatch)
    return ex, calls
def _run(coro):
    # Thin helper so tests never depend on a pre-existing event loop.
    return asyncio.run(coro)
def test_no_ladder_single_call(monkeypatch):
    ex, calls = _make_executor(monkeypatch, lambda *_: "hello", ladder=None)
    reply = asyncio.run(ex._do_inference("test"))
    assert reply == "hello"
    assert calls == [("anthropic", "claude-haiku-4-5-20251001")]


def test_ladder_not_triggered_on_success(monkeypatch):
    # Ladder configured, but first attempt succeeds — ladder never engaged.
    ladder = [
        {"provider": "openai", "model": "gpt-4o-mini"},
        {"provider": "anthropic", "model": "claude-opus-4-1-20250805"},
    ]
    ex, calls = _make_executor(monkeypatch, lambda *_: "fast reply", ladder=ladder)
    reply = asyncio.run(ex._do_inference("test"))
    assert reply == "fast reply"
    assert len(calls) == 1
    assert calls[0] == ("anthropic", "claude-haiku-4-5-20251001")  # pinned (haiku) wins


def test_ladder_escalates_on_rate_limit(monkeypatch):
    # First rung rate-limits, second rung (opus) succeeds.
    attempt = {"n": 0}

    def behaviour(provider, model, *_):
        attempt["n"] += 1
        if attempt["n"] == 1:
            return _FakeRateLimitError("429 rate_limit_exceeded on anthropic")
        return f"escalated reply from {provider}:{model}"

    ladder = [
        {"provider": "anthropic", "model": "claude-opus-4-1-20250805"},
    ]
    ex, calls = _make_executor(monkeypatch, behaviour, ladder=ladder)
    reply = asyncio.run(ex._do_inference("test"))
    assert "escalated reply" in reply
    # Two attempts: pinned haiku (failed), then opus (succeeded).
    assert [model for _, model in calls] == [
        "claude-haiku-4-5-20251001",
        "claude-opus-4-1-20250805",
    ]
def test_ladder_stops_on_non_escalatable_error(monkeypatch):
    # First rung returns a 401 — ladder should NOT retry, should raise.
    def behaviour(*_):
        return RuntimeError("401 Unauthorized invalid api key")

    ladder = [{"provider": "anthropic", "model": "claude-opus-4-1-20250805"}]
    ex, calls = _make_executor(monkeypatch, behaviour, ladder=ladder)
    with pytest.raises(RuntimeError, match="401"):
        asyncio.run(ex._do_inference("test"))
    # Only one attempt — non-escalatable error stopped the walk.
    assert len(calls) == 1


def test_ladder_raises_last_error_when_all_rungs_fail(monkeypatch):
    def behaviour(*_):
        return _FakeRateLimitError("429 across the board")

    ladder = [
        {"provider": "anthropic", "model": "claude-opus-4-1-20250805"},
    ]
    ex, calls = _make_executor(monkeypatch, behaviour, ladder=ladder)
    with pytest.raises(_FakeRateLimitError):
        asyncio.run(ex._do_inference("test"))
    # Both rungs attempted (pinned + one from ladder).
    assert len(calls) == 2
def test_ladder_skips_unknown_provider(monkeypatch):
    # A misconfigured rung with a non-existent provider is logged + skipped;
    # the ladder still walks the remaining rungs.
    def behaviour(provider, *_):
        if provider == "anthropic":
            return _FakeRateLimitError("first rung rate limit")
        return f"ok from {provider}"

    ladder = [
        {"provider": "totally_made_up", "model": "fake-1"},  # should be skipped
        {"provider": "anthropic", "model": "claude-opus-4-1-20250805"},
    ]
    ex, calls = _make_executor(monkeypatch, behaviour, ladder=ladder)
    # First attempt uses the pinned (haiku), which raises; totally_made_up is
    # skipped; the walk then reaches opus. Because behaviour fails for every
    # provider == "anthropic" call, the opus rung fails too (same provider).
    # Assert the skip happened: the call count reflects 2 real attempts, not 3.
    with pytest.raises(_FakeRateLimitError):
        asyncio.run(ex._do_inference("test"))
    assert len(calls) == 2  # pinned + opus (totally_made_up skipped)