From 3606414ec7f27875ac7d35d19a6bf6fb81d33d32 Mon Sep 17 00:00:00 2001 From: tmimmanuel <14046872+tmimmanuel@users.noreply.github.com> Date: Wed, 29 Apr 2026 04:55:50 -0700 Subject: [PATCH] fix(gateway): isolate platform connect failures with per-platform timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wrap each adapter.connect() in asyncio.wait_for() so one platform hanging during startup or reconnect cannot block the others. Telegram's 8-retry connect loop (~140s worst case) previously prevented Feishu from ever starting when Telegram was network-restricted — common for users in regions where Telegram is blocked. Default timeout is 30s; override via HERMES_GATEWAY_PLATFORM_CONNECT_TIMEOUT (0 disables). Applied to both startup and the reconnect watcher so a platform that hangs mid-retry also does not stall retries for others. Fixes #17242 --- gateway/run.py | 32 ++++++++- tests/gateway/test_platform_reconnect.py | 90 +++++++++++++++++++++++- 2 files changed, 118 insertions(+), 4 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index 4948dbbc..ef97ac11 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -47,6 +47,7 @@ from hermes_cli.config import cfg_get # from _enforce_agent_cache_cap() and _session_expiry_watcher() below. _AGENT_CACHE_MAX_SIZE = 128 _AGENT_CACHE_IDLE_TTL_SECS = 3600.0 # evict agents idle for >1h +_PLATFORM_CONNECT_TIMEOUT_SECS_DEFAULT = 30.0 # Only auto-continue interrupted gateway turns while the interruption is fresh. # Stale tool-tail/resume markers can otherwise revive an unrelated old task # after a gateway restart when the user's next message starts new work. 
def _platform_connect_timeout_secs(self) -> float:
    """Return the per-platform connect timeout used during startup/retry.

    The value is read from ``HERMES_GATEWAY_PLATFORM_CONNECT_TIMEOUT``
    (seconds; surrounding whitespace is ignored). When the variable is
    unset or blank, ``_PLATFORM_CONNECT_TIMEOUT_SECS_DEFAULT`` is returned.
    When it cannot be parsed as a float, or parses to NaN, a warning is
    logged and the default is returned.

    Returns:
        The timeout in seconds. ``0.0`` is the "no timeout" value checked by
        ``_connect_adapter_with_timeout``; zero, negative and infinite
        settings all collapse to it.
    """
    # Function-scope import so this method does not depend on the module's
    # import block already providing ``math``.
    import math

    raw = os.getenv("HERMES_GATEWAY_PLATFORM_CONNECT_TIMEOUT", "").strip()
    if not raw:
        return _PLATFORM_CONNECT_TIMEOUT_SECS_DEFAULT

    try:
        timeout = float(raw)
    except ValueError:
        timeout = math.nan

    if math.isnan(timeout):
        # float() happily parses "nan", and max(0.0, nan) evaluates to 0.0,
        # which would silently switch the timeout off. Treat NaN exactly
        # like any other unparseable value instead.
        logger.warning(
            "Ignoring invalid HERMES_GATEWAY_PLATFORM_CONNECT_TIMEOUT=%r",
            raw,
        )
        return _PLATFORM_CONNECT_TIMEOUT_SECS_DEFAULT

    if math.isinf(timeout):
        # "+inf" means "wait forever" and "-inf" is just a negative value;
        # both map to the disabled marker rather than handing the event
        # loop an infinite deadline.
        return 0.0

    return max(0.0, timeout)


async def _connect_adapter_with_timeout(self, adapter, platform) -> bool:
    """Connect an adapter without allowing one platform to block others.

    Args:
        adapter: Object whose awaitable ``connect()`` is invoked.
        platform: Its ``.value`` is interpolated into the timeout message.

    Returns:
        Whatever ``adapter.connect()`` returns.

    Raises:
        TimeoutError: ``connect()`` did not finish within the configured
            timeout. The original ``asyncio.TimeoutError`` is chained as
            ``__cause__``.
    """
    timeout = self._platform_connect_timeout_secs()
    if timeout <= 0:
        # Timeout disabled: await the adapter directly, with no deadline.
        return await adapter.connect()
    try:
        # wait_for() cancels the pending connect() when the deadline passes.
        return await asyncio.wait_for(adapter.connect(), timeout=timeout)
    except asyncio.TimeoutError as exc:
        # Re-raise with a message that names the platform and the limit.
        # NOTE(review): on Python 3.11+ asyncio.TimeoutError is the builtin
        # TimeoutError, so a TimeoutError raised by connect() itself is also
        # relabelled here — confirm that is acceptable.
        raise TimeoutError(
            f"{platform.value} connect timed out after {timeout:g}s"
        ) from exc
class TestStartupPlatformIsolation:
    """Startup must keep going when a single platform fails to connect in time."""

    @pytest.mark.asyncio
    async def test_start_continues_after_platform_connect_timeout(self, tmp_path):
        """Telegram times out and is queued for retry; Feishu still connects."""
        # Insertion order matters: the first connect attempt (Telegram) is the
        # one that receives the TimeoutError from the mocked helper below.
        platforms = (Platform.TELEGRAM, Platform.FEISHU)

        runner = _make_runner()
        runner.config = GatewayConfig(
            platforms={
                plat: PlatformConfig(enabled=True, token="test")
                for plat in platforms
            },
            sessions_dir=tmp_path,
        )

        hooks = MagicMock()
        hooks.loaded_hooks = []
        hooks.emit = AsyncMock()
        runner.hooks = hooks

        # Collaborators of start() that are irrelevant to connect isolation.
        for sync_stub in (
            "_update_runtime_status",
            "_update_platform_runtime_status",
            "_sync_voice_mode_state_to_adapter",
        ):
            setattr(runner, sync_stub, MagicMock())
        runner._suspend_stuck_loop_sessions = MagicMock(return_value=0)
        runner._send_update_notification = AsyncMock(return_value=True)
        runner._send_restart_notification = AsyncMock()

        stub_by_platform = {plat: StubAdapter(platform=plat) for plat in platforms}

        def pick_stub(platform, _config):
            return stub_by_platform[platform]

        runner._create_adapter = MagicMock(side_effect=pick_stub)

        # First call (Telegram) times out, second call (Feishu) succeeds.
        connect_outcomes = [
            TimeoutError("telegram connect timed out after 30s"),
            True,
        ]
        runner._connect_adapter_with_timeout = AsyncMock(side_effect=connect_outcomes)

        def discard_task(coro):
            # Close the coroutine so it is never scheduled and never warns
            # about not having been awaited.
            coro.close()
            return MagicMock()

        status_patch = patch("gateway.status.write_runtime_status")
        plugins_patch = patch("hermes_cli.plugins.discover_plugins")
        config_patch = patch("hermes_cli.config.load_config", return_value={})
        shell_hooks_patch = patch("agent.shell_hooks.register_from_config")
        recover_patch = patch(
            "tools.process_registry.process_registry.recover_from_checkpoint",
            return_value=0,
        )
        directory_patch = patch(
            "gateway.channel_directory.build_channel_directory",
            new=AsyncMock(return_value={"platforms": {}}),
        )
        create_task_patch = patch(
            "gateway.run.asyncio.create_task", side_effect=discard_task
        )

        # Entered left to right, the same order as the original nesting.
        with status_patch, plugins_patch, config_patch, shell_hooks_patch, \
                recover_patch, directory_patch, create_task_patch:
            started = await runner.start()
            assert started is True

        assert Platform.TELEGRAM in runner._failed_platforms
        assert Platform.FEISHU in runner.adapters
        assert Platform.TELEGRAM not in runner.adapters
        assert runner._create_adapter.call_count == 2

    @pytest.mark.asyncio
    async def test_connect_adapter_timeout_raises_retryable_exception(self, monkeypatch):
        """A connect() that never returns surfaces as TimeoutError from the helper."""
        monkeypatch.setenv("HERMES_GATEWAY_PLATFORM_CONNECT_TIMEOUT", "0.001")

        async def never_finishes():
            # Far longer than the 1 ms timeout configured above.
            await asyncio.sleep(60)
            return True

        stub = StubAdapter()
        stub.connect = never_finishes
        runner = _make_runner()

        with pytest.raises(TimeoutError, match="telegram connect timed out"):
            await runner._connect_adapter_with_timeout(stub, Platform.TELEGRAM)