fix: eliminate provider hang dead zones in retry/timeout architecture (#8985)
Three targeted changes to close the gaps between retry layers that caused users to experience 'No response from provider for 580s' and 'No activity for 15 minutes' despite having 5 layers of retry: 1. Remove non-streaming fallback from streaming path Previously, when all 3 stream retries exhausted, the code fell back to _interruptible_api_call() which had no stale detection and no activity tracking — a black hole that could hang for up to 1800s. Now errors propagate to the main retry loop which has richer recovery (credential rotation, provider fallback, backoff). For 'stream not supported' errors, sets _disable_streaming flag so the main retry loop automatically switches to non-streaming on the next attempt. 2. Add _touch_activity to recovery dead zones The gateway inactivity monitor relies on _touch_activity() to know the agent is alive, but activity was never touched during: - Stale stream detection/kill cycles (180-300s gaps) - Stream retry connection rebuilds - Main retry backoff sleeps (up to 120s) - Error recovery classification Now all these paths touch activity every ~30s, keeping the gateway informed during recovery cycles. 3. Add stale-call detector to non-streaming path _interruptible_api_call() now has the same stale detection pattern as the streaming path: kills hung connections after 300s (default, configurable via HERMES_API_CALL_STALE_TIMEOUT), scaled for large contexts (450s for 50K+ tokens, 600s for 100K+ tokens), disabled for local providers. Also touches activity every ~30s during the wait so the gateway monitor stays informed. Env vars: - HERMES_API_CALL_STALE_TIMEOUT: non-streaming stale timeout (default 300s) - HERMES_STREAM_STALE_TIMEOUT: unchanged (default 180s) Before: worst case ~2+ hours of sequential retries with no feedback After: worst case bounded by gateway inactivity timeout (default 1800s) with continuous activity reporting
This commit is contained in:
parent
acdff020b7
commit
a5bd56eae3
138
run_agent.py
138
run_agent.py
@ -4704,6 +4704,11 @@ class AIAgent:
|
||||
Each worker thread gets its own OpenAI client instance. Interrupts only
|
||||
close that worker-local client, so retries and other requests never
|
||||
inherit a closed transport.
|
||||
|
||||
Includes a stale-call detector: if no response arrives within the
|
||||
configured timeout, the connection is killed and an error raised so
|
||||
the main retry loop can try again with backoff / credential rotation /
|
||||
provider fallback.
|
||||
"""
|
||||
result = {"response": None, "error": None}
|
||||
request_client_holder = {"client": None}
|
||||
@ -4729,10 +4734,86 @@ class AIAgent:
|
||||
if request_client is not None:
|
||||
self._close_request_openai_client(request_client, reason="request_complete")
|
||||
|
||||
# ── Stale-call timeout (mirrors streaming stale detector) ────────
|
||||
# Non-streaming calls return nothing until the full response is
|
||||
# ready. Without this, a hung provider can block for the full
|
||||
# httpx timeout (default 1800s) with zero feedback. The stale
|
||||
# detector kills the connection early so the main retry loop can
|
||||
# apply richer recovery (credential rotation, provider fallback).
|
||||
_stale_base = float(os.getenv("HERMES_API_CALL_STALE_TIMEOUT", 300.0))
|
||||
_base_url = getattr(self, "_base_url", None) or ""
|
||||
if _stale_base == 300.0 and _base_url and is_local_endpoint(_base_url):
|
||||
_stale_timeout = float("inf")
|
||||
else:
|
||||
_est_tokens = sum(len(str(v)) for v in api_kwargs.get("messages", [])) // 4
|
||||
if _est_tokens > 100_000:
|
||||
_stale_timeout = max(_stale_base, 600.0)
|
||||
elif _est_tokens > 50_000:
|
||||
_stale_timeout = max(_stale_base, 450.0)
|
||||
else:
|
||||
_stale_timeout = _stale_base
|
||||
|
||||
_call_start = time.time()
|
||||
self._touch_activity("waiting for non-streaming API response")
|
||||
|
||||
t = threading.Thread(target=_call, daemon=True)
|
||||
t.start()
|
||||
_poll_count = 0
|
||||
while t.is_alive():
|
||||
t.join(timeout=0.3)
|
||||
_poll_count += 1
|
||||
|
||||
# Touch activity every ~30s so the gateway's inactivity
|
||||
# monitor knows we're alive while waiting for the response.
|
||||
if _poll_count % 100 == 0: # 100 × 0.3s = 30s
|
||||
_elapsed = time.time() - _call_start
|
||||
self._touch_activity(
|
||||
f"waiting for non-streaming response ({int(_elapsed)}s elapsed)"
|
||||
)
|
||||
|
||||
# Stale-call detector: kill the connection if no response
|
||||
# arrives within the configured timeout.
|
||||
_elapsed = time.time() - _call_start
|
||||
if _elapsed > _stale_timeout:
|
||||
_est_ctx = sum(len(str(v)) for v in api_kwargs.get("messages", [])) // 4
|
||||
logger.warning(
|
||||
"Non-streaming API call stale for %.0fs (threshold %.0fs). "
|
||||
"model=%s context=~%s tokens. Killing connection.",
|
||||
_elapsed, _stale_timeout,
|
||||
api_kwargs.get("model", "unknown"), f"{_est_ctx:,}",
|
||||
)
|
||||
self._emit_status(
|
||||
f"⚠️ No response from provider for {int(_elapsed)}s "
|
||||
f"(non-streaming, model: {api_kwargs.get('model', 'unknown')}). "
|
||||
f"Aborting call."
|
||||
)
|
||||
try:
|
||||
if self.api_mode == "anthropic_messages":
|
||||
from agent.anthropic_adapter import build_anthropic_client
|
||||
|
||||
self._anthropic_client.close()
|
||||
self._anthropic_client = build_anthropic_client(
|
||||
self._anthropic_api_key,
|
||||
getattr(self, "_anthropic_base_url", None),
|
||||
)
|
||||
else:
|
||||
rc = request_client_holder.get("client")
|
||||
if rc is not None:
|
||||
self._close_request_openai_client(rc, reason="stale_call_kill")
|
||||
except Exception:
|
||||
pass
|
||||
self._touch_activity(
|
||||
f"stale non-streaming call killed after {int(_elapsed)}s"
|
||||
)
|
||||
# Wait briefly for the thread to notice the closed connection.
|
||||
t.join(timeout=2.0)
|
||||
if result["error"] is None and result["response"] is None:
|
||||
result["error"] = TimeoutError(
|
||||
f"Non-streaming API call timed out after {int(_elapsed)}s "
|
||||
f"with no response (threshold: {int(_stale_timeout)}s)"
|
||||
)
|
||||
break
|
||||
|
||||
if self._interrupt_requested:
|
||||
# Force-close the in-flight worker-local HTTP connection to stop
|
||||
# token generation without poisoning the shared client used to
|
||||
@ -5247,6 +5328,10 @@ class AIAgent:
|
||||
f"({type(e).__name__}). Reconnecting… "
|
||||
f"(attempt {_stream_attempt + 2}/{_max_stream_retries + 1})"
|
||||
)
|
||||
self._touch_activity(
|
||||
f"stream retry {_stream_attempt + 2}/{_max_stream_retries + 1} "
|
||||
f"after {type(e).__name__}"
|
||||
)
|
||||
# Close the stale request client before retry
|
||||
stale = request_client_holder.get("client")
|
||||
if stale is not None:
|
||||
@ -5270,8 +5355,7 @@ class AIAgent:
|
||||
"try again in a moment."
|
||||
)
|
||||
logger.warning(
|
||||
"Streaming exhausted %s retries on transient error, "
|
||||
"falling back to non-streaming: %s",
|
||||
"Streaming exhausted %s retries on transient error: %s",
|
||||
_max_stream_retries + 1,
|
||||
e,
|
||||
)
|
||||
@ -5282,25 +5366,24 @@ class AIAgent:
|
||||
and "not supported" in _err_lower
|
||||
)
|
||||
if _is_stream_unsupported:
|
||||
self._disable_streaming = True
|
||||
self._safe_print(
|
||||
"\n⚠ Streaming is not supported for this "
|
||||
"model/provider. Falling back to non-streaming.\n"
|
||||
"model/provider. Switching to non-streaming.\n"
|
||||
" To avoid this delay, set display.streaming: false "
|
||||
"in config.yaml\n"
|
||||
)
|
||||
logger.info(
|
||||
"Streaming failed before delivery, falling back to non-streaming: %s",
|
||||
"Streaming failed before delivery: %s",
|
||||
e,
|
||||
)
|
||||
|
||||
try:
|
||||
# Reset stale timer — the non-streaming fallback
|
||||
# uses its own client; prevent the stale detector
|
||||
# from firing on stale timestamps from failed streams.
|
||||
last_chunk_time["t"] = time.time()
|
||||
result["response"] = self._interruptible_api_call(api_kwargs)
|
||||
except Exception as fallback_err:
|
||||
result["error"] = fallback_err
|
||||
# Propagate the error to the main retry loop instead of
|
||||
# falling back to non-streaming inline. The main loop has
|
||||
# richer recovery: credential rotation, provider fallback,
|
||||
# backoff, and — for "stream not supported" — will switch
|
||||
# to non-streaming on the next attempt via _disable_streaming.
|
||||
result["error"] = e
|
||||
return
|
||||
finally:
|
||||
request_client = request_client_holder.get("client")
|
||||
@ -5366,6 +5449,9 @@ class AIAgent:
|
||||
# Reset the timer so we don't kill repeatedly while
|
||||
# the inner thread processes the closure.
|
||||
last_chunk_time["t"] = time.time()
|
||||
self._touch_activity(
|
||||
f"stale stream detected after {int(_stale_elapsed)}s, reconnecting"
|
||||
)
|
||||
|
||||
if self._interrupt_requested:
|
||||
try:
|
||||
@ -8150,7 +8236,12 @@ class AIAgent:
|
||||
self.thinking_callback("")
|
||||
|
||||
_use_streaming = True
|
||||
if not self._has_stream_consumers():
|
||||
# Provider signaled "stream not supported" on a previous
|
||||
# attempt — switch to non-streaming for the rest of this
|
||||
# session instead of re-failing every retry.
|
||||
if getattr(self, "_disable_streaming", False):
|
||||
_use_streaming = False
|
||||
elif not self._has_stream_consumers():
|
||||
# No display/TTS consumer. Still prefer streaming for
|
||||
# health checking, but skip for Mock clients in tests
|
||||
# (mocks return SimpleNamespace, not stream iterators).
|
||||
@ -8352,6 +8443,7 @@ class AIAgent:
|
||||
|
||||
# Sleep in small increments to stay responsive to interrupts
|
||||
sleep_end = time.time() + wait_time
|
||||
_backoff_touch_counter = 0
|
||||
while time.time() < sleep_end:
|
||||
if self._interrupt_requested:
|
||||
self._vprint(f"{self.log_prefix}⚡ Interrupt detected during retry wait, aborting.", force=True)
|
||||
@ -8365,6 +8457,14 @@ class AIAgent:
|
||||
"interrupted": True,
|
||||
}
|
||||
time.sleep(0.2)
|
||||
# Touch activity every ~30s so the gateway's inactivity
|
||||
# monitor knows we're alive during backoff waits.
|
||||
_backoff_touch_counter += 1
|
||||
if _backoff_touch_counter % 150 == 0: # 150 × 0.2s = 30s
|
||||
self._touch_activity(
|
||||
f"retry backoff ({retry_count}/{max_retries}), "
|
||||
f"{int(sleep_end - time.time())}s remaining"
|
||||
)
|
||||
continue # Retry the API call
|
||||
|
||||
# Check finish_reason before proceeding
|
||||
@ -8837,6 +8937,9 @@ class AIAgent:
|
||||
|
||||
retry_count += 1
|
||||
elapsed_time = time.time() - api_start_time
|
||||
self._touch_activity(
|
||||
f"API error recovery (attempt {retry_count}/{max_retries})"
|
||||
)
|
||||
|
||||
error_type = type(api_error).__name__
|
||||
error_msg = str(api_error).lower()
|
||||
@ -9363,6 +9466,7 @@ class AIAgent:
|
||||
# Sleep in small increments so we can respond to interrupts quickly
|
||||
# instead of blocking the entire wait_time in one sleep() call
|
||||
sleep_end = time.time() + wait_time
|
||||
_backoff_touch_counter = 0
|
||||
while time.time() < sleep_end:
|
||||
if self._interrupt_requested:
|
||||
self._vprint(f"{self.log_prefix}⚡ Interrupt detected during retry wait, aborting.", force=True)
|
||||
@ -9376,6 +9480,14 @@ class AIAgent:
|
||||
"interrupted": True,
|
||||
}
|
||||
time.sleep(0.2) # Check interrupt every 200ms
|
||||
# Touch activity every ~30s so the gateway's inactivity
|
||||
# monitor knows we're alive during backoff waits.
|
||||
_backoff_touch_counter += 1
|
||||
if _backoff_touch_counter % 150 == 0: # 150 × 0.2s = 30s
|
||||
self._touch_activity(
|
||||
f"error retry backoff ({retry_count}/{max_retries}), "
|
||||
f"{int(sleep_end - time.time())}s remaining"
|
||||
)
|
||||
|
||||
# If the API call was interrupted, skip response processing
|
||||
if interrupted:
|
||||
|
||||
@ -3519,8 +3519,8 @@ class TestStreamingApiCall:
|
||||
call_kwargs = agent.client.chat.completions.create.call_args
|
||||
assert call_kwargs[1].get("stream") is True or call_kwargs.kwargs.get("stream") is True
|
||||
|
||||
def test_api_exception_falls_back_to_non_streaming(self, agent):
|
||||
"""When streaming fails before any deltas, fallback to non-streaming is attempted."""
|
||||
def test_api_exception_propagates_no_non_streaming_fallback(self, agent):
|
||||
"""When streaming fails before any deltas, error propagates to the main retry loop."""
|
||||
agent.client.chat.completions.create.side_effect = ConnectionError("fail")
|
||||
# Prevent stream retry logic from replacing the mock client
|
||||
with patch.object(agent, "_replace_primary_openai_client", return_value=False):
|
||||
|
||||
@ -374,13 +374,19 @@ class TestStreamingCallbacks:
|
||||
|
||||
|
||||
class TestStreamingFallback:
|
||||
"""Verify fallback to non-streaming on ANY streaming error."""
|
||||
"""Verify streaming errors propagate to the main retry loop.
|
||||
|
||||
Previously, streaming errors triggered an inline fallback to
|
||||
non-streaming. Now they propagate so the main retry loop can apply
|
||||
richer recovery (credential rotation, provider fallback, backoff).
|
||||
The only special case: 'stream not supported' sets _disable_streaming
|
||||
so the *next* main-loop retry uses non-streaming automatically.
|
||||
"""
|
||||
|
||||
@patch("run_agent.AIAgent._interruptible_api_call")
|
||||
@patch("run_agent.AIAgent._create_request_openai_client")
|
||||
@patch("run_agent.AIAgent._close_request_openai_client")
|
||||
def test_stream_error_falls_back(self, mock_close, mock_create, mock_non_stream):
|
||||
"""'not supported' error triggers fallback to non-streaming."""
|
||||
def test_stream_not_supported_sets_flag_and_raises(self, mock_close, mock_create):
|
||||
"""'not supported' error sets _disable_streaming and propagates."""
|
||||
from run_agent import AIAgent
|
||||
|
||||
mock_client = MagicMock()
|
||||
@ -389,23 +395,6 @@ class TestStreamingFallback:
|
||||
)
|
||||
mock_create.return_value = mock_client
|
||||
|
||||
fallback_response = SimpleNamespace(
|
||||
id="fallback",
|
||||
model="test",
|
||||
choices=[SimpleNamespace(
|
||||
index=0,
|
||||
message=SimpleNamespace(
|
||||
role="assistant",
|
||||
content="fallback response",
|
||||
tool_calls=None,
|
||||
reasoning_content=None,
|
||||
),
|
||||
finish_reason="stop",
|
||||
)],
|
||||
usage=None,
|
||||
)
|
||||
mock_non_stream.return_value = fallback_response
|
||||
|
||||
agent = AIAgent(
|
||||
model="test/model",
|
||||
quiet_mode=True,
|
||||
@ -415,16 +404,16 @@ class TestStreamingFallback:
|
||||
agent.api_mode = "chat_completions"
|
||||
agent._interrupt_requested = False
|
||||
|
||||
response = agent._interruptible_streaming_api_call({})
|
||||
with pytest.raises(Exception, match="Streaming is not supported"):
|
||||
agent._interruptible_streaming_api_call({})
|
||||
|
||||
assert response.choices[0].message.content == "fallback response"
|
||||
mock_non_stream.assert_called_once()
|
||||
# The flag should be set so the main retry loop switches to non-streaming
|
||||
assert agent._disable_streaming is True
|
||||
|
||||
@patch("run_agent.AIAgent._interruptible_api_call")
|
||||
@patch("run_agent.AIAgent._create_request_openai_client")
|
||||
@patch("run_agent.AIAgent._close_request_openai_client")
|
||||
def test_any_stream_error_falls_back(self, mock_close, mock_create, mock_non_stream):
|
||||
"""ANY streaming error triggers fallback — not just specific messages."""
|
||||
def test_non_transport_error_propagates(self, mock_close, mock_create):
|
||||
"""Non-transport streaming errors propagate to the main retry loop."""
|
||||
from run_agent import AIAgent
|
||||
|
||||
mock_client = MagicMock()
|
||||
@ -433,23 +422,6 @@ class TestStreamingFallback:
|
||||
)
|
||||
mock_create.return_value = mock_client
|
||||
|
||||
fallback_response = SimpleNamespace(
|
||||
id="fallback",
|
||||
model="test",
|
||||
choices=[SimpleNamespace(
|
||||
index=0,
|
||||
message=SimpleNamespace(
|
||||
role="assistant",
|
||||
content="fallback after connection error",
|
||||
tool_calls=None,
|
||||
reasoning_content=None,
|
||||
),
|
||||
finish_reason="stop",
|
||||
)],
|
||||
usage=None,
|
||||
)
|
||||
mock_non_stream.return_value = fallback_response
|
||||
|
||||
agent = AIAgent(
|
||||
model="test/model",
|
||||
quiet_mode=True,
|
||||
@ -459,24 +431,19 @@ class TestStreamingFallback:
|
||||
agent.api_mode = "chat_completions"
|
||||
agent._interrupt_requested = False
|
||||
|
||||
response = agent._interruptible_streaming_api_call({})
|
||||
with pytest.raises(Exception, match="Connection reset by peer"):
|
||||
agent._interruptible_streaming_api_call({})
|
||||
|
||||
assert response.choices[0].message.content == "fallback after connection error"
|
||||
mock_non_stream.assert_called_once()
|
||||
|
||||
@patch("run_agent.AIAgent._interruptible_api_call")
|
||||
@patch("run_agent.AIAgent._create_request_openai_client")
|
||||
@patch("run_agent.AIAgent._close_request_openai_client")
|
||||
def test_fallback_error_propagates(self, mock_close, mock_create, mock_non_stream):
|
||||
"""When both streaming AND fallback fail, the fallback error propagates."""
|
||||
def test_stream_error_propagates_original(self, mock_close, mock_create):
|
||||
"""The original streaming error propagates (not a fallback error)."""
|
||||
from run_agent import AIAgent
|
||||
|
||||
mock_client = MagicMock()
|
||||
mock_client.chat.completions.create.side_effect = Exception("stream broke")
|
||||
mock_create.return_value = mock_client
|
||||
|
||||
mock_non_stream.side_effect = Exception("Rate limit exceeded")
|
||||
|
||||
agent = AIAgent(
|
||||
model="test/model",
|
||||
quiet_mode=True,
|
||||
@ -486,14 +453,13 @@ class TestStreamingFallback:
|
||||
agent.api_mode = "chat_completions"
|
||||
agent._interrupt_requested = False
|
||||
|
||||
with pytest.raises(Exception, match="Rate limit exceeded"):
|
||||
with pytest.raises(Exception, match="stream broke"):
|
||||
agent._interruptible_streaming_api_call({})
|
||||
|
||||
@patch("run_agent.AIAgent._interruptible_api_call")
|
||||
@patch("run_agent.AIAgent._create_request_openai_client")
|
||||
@patch("run_agent.AIAgent._close_request_openai_client")
|
||||
def test_exhausted_transient_stream_error_falls_back(self, mock_close, mock_create, mock_non_stream):
|
||||
"""Transient stream errors retry first, then fall back after retries are exhausted."""
|
||||
def test_exhausted_transient_stream_error_propagates(self, mock_close, mock_create):
|
||||
"""Transient stream errors retry first, then propagate after retries exhausted."""
|
||||
from run_agent import AIAgent
|
||||
import httpx
|
||||
|
||||
@ -501,23 +467,6 @@ class TestStreamingFallback:
|
||||
mock_client.chat.completions.create.side_effect = httpx.ConnectError("socket closed")
|
||||
mock_create.return_value = mock_client
|
||||
|
||||
fallback_response = SimpleNamespace(
|
||||
id="fallback",
|
||||
model="test",
|
||||
choices=[SimpleNamespace(
|
||||
index=0,
|
||||
message=SimpleNamespace(
|
||||
role="assistant",
|
||||
content="fallback after retries exhausted",
|
||||
tool_calls=None,
|
||||
reasoning_content=None,
|
||||
),
|
||||
finish_reason="stop",
|
||||
)],
|
||||
usage=None,
|
||||
)
|
||||
mock_non_stream.return_value = fallback_response
|
||||
|
||||
agent = AIAgent(
|
||||
model="test/model",
|
||||
quiet_mode=True,
|
||||
@ -527,23 +476,22 @@ class TestStreamingFallback:
|
||||
agent.api_mode = "chat_completions"
|
||||
agent._interrupt_requested = False
|
||||
|
||||
response = agent._interruptible_streaming_api_call({})
|
||||
with pytest.raises(httpx.ConnectError, match="socket closed"):
|
||||
agent._interruptible_streaming_api_call({})
|
||||
|
||||
assert response.choices[0].message.content == "fallback after retries exhausted"
|
||||
# Should have retried 3 times (default HERMES_STREAM_RETRIES=2 → 3 attempts)
|
||||
assert mock_client.chat.completions.create.call_count == 3
|
||||
mock_non_stream.assert_called_once()
|
||||
assert mock_close.call_count >= 1
|
||||
|
||||
@patch("run_agent.AIAgent._interruptible_api_call")
|
||||
@patch("run_agent.AIAgent._create_request_openai_client")
|
||||
@patch("run_agent.AIAgent._close_request_openai_client")
|
||||
def test_sse_connection_lost_retried_as_transient(self, mock_close, mock_create, mock_non_stream):
|
||||
def test_sse_connection_lost_retried_as_transient(self, mock_close, mock_create):
|
||||
"""SSE 'Network connection lost' (APIError w/ no status_code) retries like httpx errors.
|
||||
|
||||
OpenRouter sends {"error":{"message":"Network connection lost."}} as an SSE
|
||||
event when the upstream stream drops. The OpenAI SDK raises APIError from
|
||||
this. It should be retried at the streaming level, same as httpx connection
|
||||
errors, before falling back to non-streaming.
|
||||
errors, then propagate to the main retry loop after exhaustion.
|
||||
"""
|
||||
from run_agent import AIAgent
|
||||
import httpx
|
||||
@ -561,23 +509,6 @@ class TestStreamingFallback:
|
||||
mock_client.chat.completions.create.side_effect = sse_error
|
||||
mock_create.return_value = mock_client
|
||||
|
||||
fallback_response = SimpleNamespace(
|
||||
id="fallback",
|
||||
model="test",
|
||||
choices=[SimpleNamespace(
|
||||
index=0,
|
||||
message=SimpleNamespace(
|
||||
role="assistant",
|
||||
content="fallback after SSE retries",
|
||||
tool_calls=None,
|
||||
reasoning_content=None,
|
||||
),
|
||||
finish_reason="stop",
|
||||
)],
|
||||
usage=None,
|
||||
)
|
||||
mock_non_stream.return_value = fallback_response
|
||||
|
||||
agent = AIAgent(
|
||||
model="test/model",
|
||||
quiet_mode=True,
|
||||
@ -587,21 +518,18 @@ class TestStreamingFallback:
|
||||
agent.api_mode = "chat_completions"
|
||||
agent._interrupt_requested = False
|
||||
|
||||
response = agent._interruptible_streaming_api_call({})
|
||||
with pytest.raises(OAIAPIError):
|
||||
agent._interruptible_streaming_api_call({})
|
||||
|
||||
assert response.choices[0].message.content == "fallback after SSE retries"
|
||||
# Should retry 3 times (default HERMES_STREAM_RETRIES=2 → 3 attempts)
|
||||
# before falling back to non-streaming
|
||||
assert mock_client.chat.completions.create.call_count == 3
|
||||
mock_non_stream.assert_called_once()
|
||||
# Connection cleanup should happen for each failed retry
|
||||
assert mock_close.call_count >= 2
|
||||
|
||||
@patch("run_agent.AIAgent._interruptible_api_call")
|
||||
@patch("run_agent.AIAgent._create_request_openai_client")
|
||||
@patch("run_agent.AIAgent._close_request_openai_client")
|
||||
def test_sse_non_connection_error_falls_back_immediately(self, mock_close, mock_create, mock_non_stream):
|
||||
"""SSE errors that aren't connection-related still fall back immediately (no stream retry)."""
|
||||
def test_sse_non_connection_error_propagates_immediately(self, mock_close, mock_create):
|
||||
"""SSE errors that aren't connection-related propagate immediately (no stream retry)."""
|
||||
from run_agent import AIAgent
|
||||
import httpx
|
||||
|
||||
@ -616,23 +544,6 @@ class TestStreamingFallback:
|
||||
mock_client.chat.completions.create.side_effect = sse_error
|
||||
mock_create.return_value = mock_client
|
||||
|
||||
fallback_response = SimpleNamespace(
|
||||
id="fallback",
|
||||
model="test",
|
||||
choices=[SimpleNamespace(
|
||||
index=0,
|
||||
message=SimpleNamespace(
|
||||
role="assistant",
|
||||
content="fallback no retry",
|
||||
tool_calls=None,
|
||||
reasoning_content=None,
|
||||
),
|
||||
finish_reason="stop",
|
||||
)],
|
||||
usage=None,
|
||||
)
|
||||
mock_non_stream.return_value = fallback_response
|
||||
|
||||
agent = AIAgent(
|
||||
model="test/model",
|
||||
quiet_mode=True,
|
||||
@ -642,12 +553,11 @@ class TestStreamingFallback:
|
||||
agent.api_mode = "chat_completions"
|
||||
agent._interrupt_requested = False
|
||||
|
||||
response = agent._interruptible_streaming_api_call({})
|
||||
with pytest.raises(OAIAPIError):
|
||||
agent._interruptible_streaming_api_call({})
|
||||
|
||||
assert response.choices[0].message.content == "fallback no retry"
|
||||
# Should NOT retry — goes straight to non-streaming fallback
|
||||
# Should NOT retry — propagates immediately
|
||||
assert mock_client.chat.completions.create.call_count == 1
|
||||
mock_non_stream.assert_called_once()
|
||||
|
||||
|
||||
# ── Test: Reasoning Streaming ────────────────────────────────────────────
|
||||
|
||||
Loading…
Reference in New Issue
Block a user