fix(gateway): isolate platform connect failures with per-platform timeout
Wrap each adapter.connect() in asyncio.wait_for() so one platform hanging during startup or reconnect cannot block the others. Telegram's 8-retry connect loop (~140s worst case) previously prevented Feishu from ever starting when Telegram was network-restricted — common for users in regions where Telegram is blocked. Default timeout is 30s; override via HERMES_GATEWAY_PLATFORM_CONNECT_TIMEOUT (0 disables). Applied to both startup and the reconnect watcher so a platform that hangs mid-retry also does not stall retries for others. Fixes #17242
This commit is contained in:
parent
20b759cd02
commit
3606414ec7
@ -47,6 +47,7 @@ from hermes_cli.config import cfg_get
|
||||
# from _enforce_agent_cache_cap() and _session_expiry_watcher() below.
|
||||
_AGENT_CACHE_MAX_SIZE = 128
|
||||
_AGENT_CACHE_IDLE_TTL_SECS = 3600.0 # evict agents idle for >1h
|
||||
_PLATFORM_CONNECT_TIMEOUT_SECS_DEFAULT = 30.0
|
||||
# Only auto-continue interrupted gateway turns while the interruption is fresh.
|
||||
# Stale tool-tail/resume markers can otherwise revive an unrelated old task
|
||||
# after a gateway restart when the user's next message starts new work.
|
||||
@ -1160,6 +1161,33 @@ class GatewayRunner:
|
||||
e,
|
||||
)
|
||||
|
||||
def _platform_connect_timeout_secs(self) -> float:
|
||||
"""Return the per-platform connect timeout used during startup/retry."""
|
||||
raw = os.getenv("HERMES_GATEWAY_PLATFORM_CONNECT_TIMEOUT", "").strip()
|
||||
if raw:
|
||||
try:
|
||||
timeout = float(raw)
|
||||
except ValueError:
|
||||
logger.warning(
|
||||
"Ignoring invalid HERMES_GATEWAY_PLATFORM_CONNECT_TIMEOUT=%r",
|
||||
raw,
|
||||
)
|
||||
else:
|
||||
return max(0.0, timeout)
|
||||
return _PLATFORM_CONNECT_TIMEOUT_SECS_DEFAULT
|
||||
|
||||
async def _connect_adapter_with_timeout(self, adapter, platform) -> bool:
|
||||
"""Connect an adapter without allowing one platform to block others."""
|
||||
timeout = self._platform_connect_timeout_secs()
|
||||
if timeout <= 0:
|
||||
return await adapter.connect()
|
||||
try:
|
||||
return await asyncio.wait_for(adapter.connect(), timeout=timeout)
|
||||
except asyncio.TimeoutError as exc:
|
||||
raise TimeoutError(
|
||||
f"{platform.value} connect timed out after {timeout:g}s"
|
||||
) from exc
|
||||
|
||||
@property
|
||||
def should_exit_cleanly(self) -> bool:
|
||||
return self._exit_cleanly
|
||||
@ -2462,7 +2490,7 @@ class GatewayRunner:
|
||||
error_message=None,
|
||||
)
|
||||
try:
|
||||
success = await adapter.connect()
|
||||
success = await self._connect_adapter_with_timeout(adapter, platform)
|
||||
if success:
|
||||
self.adapters[platform] = adapter
|
||||
self._sync_voice_mode_state_to_adapter(adapter)
|
||||
@ -2853,7 +2881,7 @@ class GatewayRunner:
|
||||
adapter.set_session_store(self.session_store)
|
||||
adapter.set_busy_session_handler(self._handle_active_session_busy_message)
|
||||
|
||||
success = await adapter.connect()
|
||||
success = await self._connect_adapter_with_timeout(adapter, platform)
|
||||
if success:
|
||||
self.adapters[platform] = adapter
|
||||
self._sync_voice_mode_state_to_adapter(adapter)
|
||||
|
||||
@ -14,8 +14,15 @@ from gateway.run import GatewayRunner
|
||||
class StubAdapter(BasePlatformAdapter):
|
||||
"""Adapter whose connect() result can be controlled."""
|
||||
|
||||
def __init__(self, *, succeed=True, fatal_error=None, fatal_retryable=True):
|
||||
super().__init__(PlatformConfig(enabled=True, token="test"), Platform.TELEGRAM)
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
platform=Platform.TELEGRAM,
|
||||
succeed=True,
|
||||
fatal_error=None,
|
||||
fatal_retryable=True,
|
||||
):
|
||||
super().__init__(PlatformConfig(enabled=True, token="test"), platform)
|
||||
self._succeed = succeed
|
||||
self._fatal_error = fatal_error
|
||||
self._fatal_retryable = fatal_retryable
|
||||
@ -65,6 +72,85 @@ def _make_runner():
|
||||
|
||||
# --- Startup queueing ---
|
||||
|
||||
class TestStartupPlatformIsolation:
|
||||
"""Verify one blocked platform cannot prevent later platforms from starting."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_start_continues_after_platform_connect_timeout(self, tmp_path):
|
||||
"""A timeout on Telegram should queue it and still connect Feishu."""
|
||||
runner = _make_runner()
|
||||
runner.config = GatewayConfig(
|
||||
platforms={
|
||||
Platform.TELEGRAM: PlatformConfig(enabled=True, token="test"),
|
||||
Platform.FEISHU: PlatformConfig(enabled=True, token="test"),
|
||||
},
|
||||
sessions_dir=tmp_path,
|
||||
)
|
||||
runner.hooks = MagicMock()
|
||||
runner.hooks.loaded_hooks = []
|
||||
runner.hooks.emit = AsyncMock()
|
||||
runner._suspend_stuck_loop_sessions = MagicMock(return_value=0)
|
||||
runner._update_runtime_status = MagicMock()
|
||||
runner._update_platform_runtime_status = MagicMock()
|
||||
runner._sync_voice_mode_state_to_adapter = MagicMock()
|
||||
runner._send_update_notification = AsyncMock(return_value=True)
|
||||
runner._send_restart_notification = AsyncMock()
|
||||
|
||||
adapters = {
|
||||
Platform.TELEGRAM: StubAdapter(platform=Platform.TELEGRAM),
|
||||
Platform.FEISHU: StubAdapter(platform=Platform.FEISHU),
|
||||
}
|
||||
runner._create_adapter = MagicMock(
|
||||
side_effect=lambda platform, _config: adapters[platform]
|
||||
)
|
||||
runner._connect_adapter_with_timeout = AsyncMock(
|
||||
side_effect=[
|
||||
TimeoutError("telegram connect timed out after 30s"),
|
||||
True,
|
||||
]
|
||||
)
|
||||
|
||||
def fake_create_task(coro):
|
||||
coro.close()
|
||||
return MagicMock()
|
||||
|
||||
with patch("gateway.status.write_runtime_status"):
|
||||
with patch("hermes_cli.plugins.discover_plugins"):
|
||||
with patch("hermes_cli.config.load_config", return_value={}):
|
||||
with patch("agent.shell_hooks.register_from_config"):
|
||||
with patch(
|
||||
"tools.process_registry.process_registry.recover_from_checkpoint",
|
||||
return_value=0,
|
||||
):
|
||||
with patch(
|
||||
"gateway.channel_directory.build_channel_directory",
|
||||
new=AsyncMock(return_value={"platforms": {}}),
|
||||
):
|
||||
with patch("gateway.run.asyncio.create_task", side_effect=fake_create_task):
|
||||
assert await runner.start() is True
|
||||
|
||||
assert Platform.TELEGRAM in runner._failed_platforms
|
||||
assert Platform.FEISHU in runner.adapters
|
||||
assert Platform.TELEGRAM not in runner.adapters
|
||||
assert runner._create_adapter.call_count == 2
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_connect_adapter_timeout_raises_retryable_exception(self, monkeypatch):
|
||||
"""The timeout helper turns a hanging connect into a caught startup error."""
|
||||
runner = _make_runner()
|
||||
adapter = StubAdapter()
|
||||
|
||||
async def hang():
|
||||
await asyncio.sleep(60)
|
||||
return True
|
||||
|
||||
adapter.connect = hang
|
||||
monkeypatch.setenv("HERMES_GATEWAY_PLATFORM_CONNECT_TIMEOUT", "0.001")
|
||||
|
||||
with pytest.raises(TimeoutError, match="telegram connect timed out"):
|
||||
await runner._connect_adapter_with_timeout(adapter, Platform.TELEGRAM)
|
||||
|
||||
|
||||
class TestStartupFailureQueuing:
|
||||
"""Verify that failed platforms are queued during startup."""
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user