diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 23fa8c69..307c6b89 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -290,14 +290,53 @@ class TelegramAdapter(BasePlatformAdapter): # and any other slash-confirm prompts; see GatewayRunner._request_slash_confirm). self._slash_confirm_state: Dict[str, str] = {} - @staticmethod - def _is_callback_user_authorized(user_id: str) -> bool: + def _is_callback_user_authorized( + self, + user_id: str, + *, + chat_id: Optional[str] = None, + chat_type: Optional[str] = None, + thread_id: Optional[str] = None, + user_name: Optional[str] = None, + ) -> bool: """Return whether a Telegram inline-button caller may perform gated actions.""" + normalized_user_id = str(user_id or "").strip() + if not normalized_user_id: + return False + + runner = getattr(getattr(self, "_message_handler", None), "__self__", None) + auth_fn = getattr(runner, "_is_user_authorized", None) + if callable(auth_fn): + try: + from gateway.session import SessionSource + + normalized_chat_type = str(chat_type or "dm").strip().lower() or "dm" + if normalized_chat_type == "private": + normalized_chat_type = "dm" + elif normalized_chat_type == "supergroup": + normalized_chat_type = "forum" if thread_id is not None else "group" + + source = SessionSource( + platform=Platform.TELEGRAM, + chat_id=str(chat_id or normalized_user_id), + chat_type=normalized_chat_type, + user_id=normalized_user_id, + user_name=str(user_name).strip() if user_name else None, + thread_id=str(thread_id) if thread_id is not None else None, + ) + return bool(auth_fn(source)) + except Exception: + logger.debug( + "[Telegram] Falling back to env-only callback auth for user %s", + normalized_user_id, + exc_info=True, + ) + allowed_csv = os.getenv("TELEGRAM_ALLOWED_USERS", "").strip() if not allowed_csv: return True allowed_ids = {uid.strip() for uid in allowed_csv.split(",") if uid.strip()} - return "*" in allowed_ids or user_id in allowed_ids + return "*" in allowed_ids or normalized_user_id in allowed_ids @classmethod def _metadata_thread_id(cls, metadata: Optional[Dict[str, Any]]) -> Optional[str]: @@ -1760,6 +1799,12 @@ class TelegramAdapter(BasePlatformAdapter): if not query or not query.data: return data = query.data + query_message = getattr(query, "message", None) + query_chat_id = getattr(query_message, "chat_id", None) + query_chat = getattr(query_message, "chat", None) + query_chat_type = getattr(query_chat, "type", None) + query_thread_id = getattr(query_message, "message_thread_id", None) + query_user_name = getattr(query.from_user, "first_name", None) # --- Model picker callbacks --- if data.startswith(("mp:", "mm:", "mb", "mx", "mg:")): @@ -1781,7 +1826,13 @@ class TelegramAdapter(BasePlatformAdapter): # Only authorized users may click approval buttons. caller_id = str(getattr(query.from_user, "id", "")) - if not self._is_callback_user_authorized(caller_id): + if not self._is_callback_user_authorized( + caller_id, + chat_id=query_chat_id, + chat_type=str(query_chat_type) if query_chat_type is not None else None, + thread_id=str(query_thread_id) if query_thread_id is not None else None, + user_name=query_user_name, + ): await query.answer(text="⛔ You are not authorized to approve commands.") return @@ -1831,8 +1882,14 @@ class TelegramAdapter(BasePlatformAdapter): choice = parts[1] # once, always, cancel confirm_id = parts[2] - caller_id = str(getattr(query.from_user, "id", "")) - if not self._is_callback_user_authorized(caller_id): + caller_id = str(getattr(query.from_user, "id", "")) + if not self._is_callback_user_authorized( + caller_id, + chat_id=query_chat_id, + chat_type=str(query_chat_type) if query_chat_type is not None else None, + thread_id=str(query_thread_id) if query_thread_id is not None else None, + user_name=query_user_name, + ): await query.answer(text="⛔ You are not authorized to answer this prompt.") return @@ -1891,7 +1948,13 @@ class TelegramAdapter(BasePlatformAdapter): return answer = data.split(":", 1)[1] # "y" or "n" caller_id = str(getattr(query.from_user, "id", "")) - if not self._is_callback_user_authorized(caller_id): + if not self._is_callback_user_authorized( + caller_id, + chat_id=query_chat_id, + chat_type=str(query_chat_type) if query_chat_type is not None else None, + thread_id=str(query_thread_id) if query_thread_id is not None else None, + user_name=query_user_name, + ): await query.answer(text="⛔ You are not authorized to answer update prompts.") return await query.answer(text=f"Sent '{answer}' to the update process.") diff --git a/tests/gateway/test_telegram_approval_buttons.py b/tests/gateway/test_telegram_approval_buttons.py index 93b5f82e..199508c9 100644 --- a/tests/gateway/test_telegram_approval_buttons.py +++ b/tests/gateway/test_telegram_approval_buttons.py @@ -59,6 +59,21 @@ def _make_adapter(extra=None): return adapter +class _AuthRunner: + """Minimal runner shim for callback auth tests.""" + + def __init__(self, authorized: bool): + self.authorized = authorized + self.last_source = None + + async def _handle_message(self, event): + return None + + def _is_user_authorized(self, source): + self.last_source = source + return self.authorized + + # =========================================================================== # send_exec_approval — inline keyboard buttons # =========================================================================== @@ -230,6 +245,41 @@ class TestTelegramApprovalCallback: edit_kwargs = query.edit_message_text.call_args[1] assert "Denied" in edit_kwargs["text"] + @pytest.mark.asyncio + async def test_approval_callback_rejects_user_blocked_by_global_allowlist(self): + adapter = _make_adapter() + adapter._approval_state[7] = "agent:main:telegram:group:12345:99" + runner = _AuthRunner(authorized=False) + adapter._message_handler = runner._handle_message + + query = AsyncMock() + query.data = "ea:once:7" + query.message = MagicMock() + query.message.chat_id = 12345 + query.message.chat.type = "private" + query.from_user = MagicMock() + query.from_user.id = 222 + query.from_user.first_name = "Mallory" + query.answer = AsyncMock() + query.edit_message_text = AsyncMock() + + update = MagicMock() + update.callback_query = query + context = MagicMock() + + with patch("tools.approval.resolve_gateway_approval") as mock_resolve: + await adapter._handle_callback_query(update, context) + + mock_resolve.assert_not_called() + query.answer.assert_called_once() + assert "not authorized" in query.answer.call_args[1]["text"].lower() + query.edit_message_text.assert_not_called() + assert adapter._approval_state[7] == "agent:main:telegram:group:12345:99" + assert runner.last_source is not None + assert runner.last_source.platform == Platform.TELEGRAM + assert runner.last_source.user_id == "222" + assert runner.last_source.chat_id == "12345" + @pytest.mark.asyncio async def test_already_resolved(self): adapter = _make_adapter() @@ -333,6 +383,39 @@ class TestTelegramApprovalCallback: query.edit_message_text.assert_not_called() assert not (tmp_path / ".update_response").exists() + @pytest.mark.asyncio + async def test_update_prompt_callback_rejects_user_blocked_by_global_allowlist(self, tmp_path): + adapter = _make_adapter() + runner = _AuthRunner(authorized=False) + adapter._message_handler = runner._handle_message + + query = AsyncMock() + query.data = "update_prompt:y" + query.message = MagicMock() + query.message.chat_id = 12345 + query.message.chat.type = "private" + query.from_user = MagicMock() + query.from_user.id = 222 + query.from_user.first_name = "Mallory" + query.answer = AsyncMock() + query.edit_message_text = AsyncMock() + + update = MagicMock() + update.callback_query = query + context = MagicMock() + + with patch("hermes_constants.get_hermes_home", return_value=tmp_path): + with patch.dict(os.environ, {"TELEGRAM_ALLOWED_USERS": ""}): + await adapter._handle_callback_query(update, context) + + query.answer.assert_called_once() + assert "not authorized" in query.answer.call_args[1]["text"].lower() + query.edit_message_text.assert_not_called() + assert not (tmp_path / ".update_response").exists() + assert runner.last_source is not None + assert runner.last_source.platform == Platform.TELEGRAM + assert runner.last_source.user_id == "222" + @pytest.mark.asyncio async def test_update_prompt_callback_allows_authorized_user(self, tmp_path): """Allowed Telegram users can still answer update prompt buttons."""