fix(telegram): enforce gateway auth for inline approval callbacks

This commit is contained in:
johnncenae 2026-04-30 12:19:16 +03:00 committed by Teknium
parent 9ae1fa9e39
commit a83d579d5b
2 changed files with 153 additions and 7 deletions

View File

@ -290,14 +290,53 @@ class TelegramAdapter(BasePlatformAdapter):
# and any other slash-confirm prompts; see GatewayRunner._request_slash_confirm).
self._slash_confirm_state: Dict[str, str] = {}
@staticmethod
def _is_callback_user_authorized(user_id: str) -> bool:
def _is_callback_user_authorized(
self,
user_id: str,
*,
chat_id: Optional[str] = None,
chat_type: Optional[str] = None,
thread_id: Optional[str] = None,
user_name: Optional[str] = None,
) -> bool:
"""Return whether a Telegram inline-button caller may perform gated actions."""
normalized_user_id = str(user_id or "").strip()
if not normalized_user_id:
return False
runner = getattr(getattr(self, "_message_handler", None), "__self__", None)
auth_fn = getattr(runner, "_is_user_authorized", None)
if callable(auth_fn):
try:
from gateway.session import SessionSource
normalized_chat_type = str(chat_type or "dm").strip().lower() or "dm"
if normalized_chat_type == "private":
normalized_chat_type = "dm"
elif normalized_chat_type == "supergroup":
normalized_chat_type = "forum" if thread_id is not None else "group"
source = SessionSource(
platform=Platform.TELEGRAM,
chat_id=str(chat_id or normalized_user_id),
chat_type=normalized_chat_type,
user_id=normalized_user_id,
user_name=str(user_name).strip() if user_name else None,
thread_id=str(thread_id) if thread_id is not None else None,
)
return bool(auth_fn(source))
except Exception:
logger.debug(
"[Telegram] Falling back to env-only callback auth for user %s",
normalized_user_id,
exc_info=True,
)
allowed_csv = os.getenv("TELEGRAM_ALLOWED_USERS", "").strip()
if not allowed_csv:
return True
allowed_ids = {uid.strip() for uid in allowed_csv.split(",") if uid.strip()}
return "*" in allowed_ids or user_id in allowed_ids
return "*" in allowed_ids or normalized_user_id in allowed_ids
@classmethod
def _metadata_thread_id(cls, metadata: Optional[Dict[str, Any]]) -> Optional[str]:
@ -1760,6 +1799,12 @@ class TelegramAdapter(BasePlatformAdapter):
if not query or not query.data:
return
data = query.data
query_message = getattr(query, "message", None)
query_chat_id = getattr(query_message, "chat_id", None)
query_chat = getattr(query_message, "chat", None)
query_chat_type = getattr(query_chat, "type", None)
query_thread_id = getattr(query_message, "message_thread_id", None)
query_user_name = getattr(query.from_user, "first_name", None)
# --- Model picker callbacks ---
if data.startswith(("mp:", "mm:", "mb", "mx", "mg:")):
@ -1781,7 +1826,13 @@ class TelegramAdapter(BasePlatformAdapter):
# Only authorized users may click approval buttons.
caller_id = str(getattr(query.from_user, "id", ""))
if not self._is_callback_user_authorized(caller_id):
if not self._is_callback_user_authorized(
caller_id,
chat_id=query_chat_id,
chat_type=str(query_chat_type) if query_chat_type is not None else None,
thread_id=str(query_thread_id) if query_thread_id is not None else None,
user_name=query_user_name,
):
await query.answer(text="⛔ You are not authorized to approve commands.")
return
@ -1831,8 +1882,14 @@ class TelegramAdapter(BasePlatformAdapter):
choice = parts[1] # once, always, cancel
confirm_id = parts[2]
caller_id = str(getattr(query.from_user, "id", ""))
if not self._is_callback_user_authorized(caller_id):
caller_id = str(getattr(query.from_user, "id", ""))
if not self._is_callback_user_authorized(
caller_id,
chat_id=query_chat_id,
chat_type=str(query_chat_type) if query_chat_type is not None else None,
thread_id=str(query_thread_id) if query_thread_id is not None else None,
user_name=query_user_name,
):
await query.answer(text="⛔ You are not authorized to answer this prompt.")
return
@ -1891,7 +1948,13 @@ class TelegramAdapter(BasePlatformAdapter):
return
answer = data.split(":", 1)[1] # "y" or "n"
caller_id = str(getattr(query.from_user, "id", ""))
if not self._is_callback_user_authorized(caller_id):
if not self._is_callback_user_authorized(
caller_id,
chat_id=query_chat_id,
chat_type=str(query_chat_type) if query_chat_type is not None else None,
thread_id=str(query_thread_id) if query_thread_id is not None else None,
user_name=query_user_name,
):
await query.answer(text="⛔ You are not authorized to answer update prompts.")
return
await query.answer(text=f"Sent '{answer}' to the update process.")

View File

@ -59,6 +59,21 @@ def _make_adapter(extra=None):
return adapter
class _AuthRunner:
"""Minimal runner shim for callback auth tests."""
def __init__(self, authorized: bool):
self.authorized = authorized
self.last_source = None
async def _handle_message(self, event):
return None
def _is_user_authorized(self, source):
self.last_source = source
return self.authorized
# ===========================================================================
# send_exec_approval — inline keyboard buttons
# ===========================================================================
@ -230,6 +245,41 @@ class TestTelegramApprovalCallback:
edit_kwargs = query.edit_message_text.call_args[1]
assert "Denied" in edit_kwargs["text"]
@pytest.mark.asyncio
async def test_approval_callback_rejects_user_blocked_by_global_allowlist(self):
adapter = _make_adapter()
adapter._approval_state[7] = "agent:main:telegram:group:12345:99"
runner = _AuthRunner(authorized=False)
adapter._message_handler = runner._handle_message
query = AsyncMock()
query.data = "ea:once:7"
query.message = MagicMock()
query.message.chat_id = 12345
query.message.chat.type = "private"
query.from_user = MagicMock()
query.from_user.id = 222
query.from_user.first_name = "Mallory"
query.answer = AsyncMock()
query.edit_message_text = AsyncMock()
update = MagicMock()
update.callback_query = query
context = MagicMock()
with patch("tools.approval.resolve_gateway_approval") as mock_resolve:
await adapter._handle_callback_query(update, context)
mock_resolve.assert_not_called()
query.answer.assert_called_once()
assert "not authorized" in query.answer.call_args[1]["text"].lower()
query.edit_message_text.assert_not_called()
assert adapter._approval_state[7] == "agent:main:telegram:group:12345:99"
assert runner.last_source is not None
assert runner.last_source.platform == Platform.TELEGRAM
assert runner.last_source.user_id == "222"
assert runner.last_source.chat_id == "12345"
@pytest.mark.asyncio
async def test_already_resolved(self):
adapter = _make_adapter()
@ -333,6 +383,39 @@ class TestTelegramApprovalCallback:
query.edit_message_text.assert_not_called()
assert not (tmp_path / ".update_response").exists()
@pytest.mark.asyncio
async def test_update_prompt_callback_rejects_user_blocked_by_global_allowlist(self, tmp_path):
adapter = _make_adapter()
runner = _AuthRunner(authorized=False)
adapter._message_handler = runner._handle_message
query = AsyncMock()
query.data = "update_prompt:y"
query.message = MagicMock()
query.message.chat_id = 12345
query.message.chat.type = "private"
query.from_user = MagicMock()
query.from_user.id = 222
query.from_user.first_name = "Mallory"
query.answer = AsyncMock()
query.edit_message_text = AsyncMock()
update = MagicMock()
update.callback_query = query
context = MagicMock()
with patch("hermes_constants.get_hermes_home", return_value=tmp_path):
with patch.dict(os.environ, {"TELEGRAM_ALLOWED_USERS": ""}):
await adapter._handle_callback_query(update, context)
query.answer.assert_called_once()
assert "not authorized" in query.answer.call_args[1]["text"].lower()
query.edit_message_text.assert_not_called()
assert not (tmp_path / ".update_response").exists()
assert runner.last_source is not None
assert runner.last_source.platform == Platform.TELEGRAM
assert runner.last_source.user_id == "222"
@pytest.mark.asyncio
async def test_update_prompt_callback_allows_authorized_user(self, tmp_path):
"""Allowed Telegram users can still answer update prompt buttons."""