From 0189442525e36e2349be509cac8b0fedd3ccc909 Mon Sep 17 00:00:00 2001 From: Poolitzer <25934244+Poolitzer@users.noreply.github.com> Date: Tue, 14 Jul 2020 21:51:36 +0200 Subject: [PATCH] Add Filters.via_bot (#2009) * feat: via_bot filter also fixing a small mistake in the empty parameter of the user filter and improve docs slightly * fix: forgot to set via_bot to None * fix: redoing subclassing to copy paste solution * Cosmetic changes Co-authored-by: Hinrich Mahler --- telegram/ext/filters.py | 169 +++++++++++++++++++++++++++++++++++++++- tests/test_filters.py | 127 +++++++++++++++++++++++++++++- 2 files changed, 292 insertions(+), 4 deletions(-) diff --git a/telegram/ext/filters.py b/telegram/ext/filters.py index fb562b5fc..de1d85771 100644 --- a/telegram/ext/filters.py +++ b/telegram/ext/filters.py @@ -887,7 +887,8 @@ officedocument.wordprocessingml.document")``- """Messages sent in a group chat.""" class user(BaseFilter): - """Filters messages to allow only those which are from specified user ID. + """Filters messages to allow only those which are from specified user ID(s) or + username(s). Examples: ``MessageHandler(Filters.user(1234), callback_method)`` @@ -919,7 +920,6 @@ officedocument.wordprocessingml.document")``- RuntimeError: If user_id and username are both present. """ - def __init__(self, user_id=None, username=None, allow_empty=False): self.allow_empty = allow_empty self.__lock = Lock() @@ -1053,8 +1053,171 @@ officedocument.wordprocessingml.document")``- return self.allow_empty return False + class via_bot(BaseFilter): + """Filters messages to allow only those which are from specified via_bot ID(s) or + username(s). + + Examples: + ``MessageHandler(Filters.via_bot(1234), callback_method)`` + + Warning: + :attr:`bot_ids` will give a *copy* of the saved bot ids as :class:`frozenset`. This + is to ensure thread safety. To add/remove a bot, you should use :meth:`add_usernames`, + :meth:`add_bot_ids`, :meth:`remove_usernames` and :meth:`remove_bot_ids`. Only update + the entire set by ``filter.bot_ids/usernames = new_set``, if you are entirely sure + that it is not causing race conditions, as this will complete replace the current set + of allowed bots. + + Attributes: + bot_ids(set(:obj:`int`), optional): Which bot ID(s) to allow through. + usernames(set(:obj:`str`), optional): Which username(s) (without leading '@') to allow + through. + allow_empty(:obj:`bool`, optional): Whether updates should be processed, if no bot + is specified in :attr:`bot_ids` and :attr:`usernames`. + + Args: + bot_id(:obj:`int` | List[:obj:`int`], optional): Which bot ID(s) to allow + through. + username(:obj:`str` | List[:obj:`str`], optional): Which username(s) to allow + through. Leading '@'s in usernames will be discarded. + allow_empty(:obj:`bool`, optional): Whether updates should be processed, if no user + is specified in :attr:`bot_ids` and :attr:`usernames`. Defaults to :obj:`False` + + Raises: + RuntimeError: If bot_id and username are both present. + """ + + def __init__(self, bot_id=None, username=None, allow_empty=False): + self.allow_empty = allow_empty + self.__lock = Lock() + + self._bot_ids = set() + self._usernames = set() + + self._set_bot_ids(bot_id) + self._set_usernames(username) + + @staticmethod + def _parse_bot_id(bot_id): + if bot_id is None: + return set() + if isinstance(bot_id, int): + return {bot_id} + return set(bot_id) + + @staticmethod + def _parse_username(username): + if username is None: + return set() + if isinstance(username, str): + return {username[1:] if username.startswith('@') else username} + return {bot[1:] if bot.startswith('@') else bot for bot in username} + + def _set_bot_ids(self, bot_id): + with self.__lock: + if bot_id and self._usernames: + raise RuntimeError("Can't set bot_id in conjunction with (already set) " + "usernames.") + self._bot_ids = self._parse_bot_id(bot_id) + + def _set_usernames(self, username): + with self.__lock: + if username and self._bot_ids: + raise RuntimeError("Can't set username in conjunction with (already set) " + "bot_ids.") + self._usernames = self._parse_username(username) + + @property + def bot_ids(self): + with self.__lock: + return frozenset(self._bot_ids) + + @bot_ids.setter + def bot_ids(self, bot_id): + self._set_bot_ids(bot_id) + + @property + def usernames(self): + with self.__lock: + return frozenset(self._usernames) + + @usernames.setter + def usernames(self, username): + self._set_usernames(username) + + def add_usernames(self, username): + """ + Add one or more users to the allowed usernames. + Args: + username(:obj:`str` | List[:obj:`str`], optional): Which username(s) to allow + through. Leading '@'s in usernames will be discarded. + """ + with self.__lock: + if self._bot_ids: + raise RuntimeError("Can't set username in conjunction with (already set) " + "bot_ids.") + + username = self._parse_username(username) + self._usernames |= username + + def add_bot_ids(self, bot_id): + """ + Add one or more users to the allowed user ids. + Args: + bot_id(:obj:`int` | List[:obj:`int`], optional): Which bot ID(s) to allow + through. + """ + with self.__lock: + if self._usernames: + raise RuntimeError("Can't set bot_id in conjunction with (already set) " + "usernames.") + + bot_id = self._parse_bot_id(bot_id) + + self._bot_ids |= bot_id + + def remove_usernames(self, username): + """ + Remove one or more users from allowed usernames. + Args: + username(:obj:`str` | List[:obj:`str`], optional): Which username(s) to disallow + through. Leading '@'s in usernames will be discarded. + """ + with self.__lock: + if self._bot_ids: + raise RuntimeError("Can't set username in conjunction with (already set) " + "bot_ids.") + + username = self._parse_username(username) + self._usernames -= username + + def remove_bot_ids(self, bot_id): + """ + Remove one or more users from allowed user ids. + Args: + bot_id(:obj:`int` | List[:obj:`int`], optional): Which bot ID(s) to disallow + through. + """ + with self.__lock: + if self._usernames: + raise RuntimeError("Can't set bot_id in conjunction with (already set) " + "usernames.") + bot_id = self._parse_bot_id(bot_id) + self._bot_ids -= bot_id + + def filter(self, message): + """""" # remove method from docs + if message.via_bot: + if self.bot_ids: + return message.via_bot.id in self.bot_ids + if self.usernames: + return (message.via_bot.username + and message.via_bot.username in self.usernames) + return self.allow_empty + return False + class chat(BaseFilter): - """Filters messages to allow only those which are from specified chat ID. + """Filters messages to allow only those which are from a specified chat ID or username. Examples: ``MessageHandler(Filters.chat(-1234), callback_method)`` diff --git a/tests/test_filters.py b/tests/test_filters.py index f82bc54ef..03847413d 100644 --- a/tests/test_filters.py +++ b/tests/test_filters.py @@ -28,7 +28,7 @@ import re @pytest.fixture(scope='function') def update(): return Update(0, Message(0, User(0, 'Testuser', False), datetime.datetime.utcnow(), - Chat(0, 'private'))) + Chat(0, 'private'), via_bot=User(0, "Testbot", True))) @pytest.fixture(scope='function', @@ -1093,3 +1093,128 @@ class TestFilters: update.message.text = 'test' result = (Filters.command | DataFilter('blah'))(update) assert result['test'] == ['blah'] + + def test_filters_via_bot_init(self): + with pytest.raises(RuntimeError, match='in conjunction with'): + Filters.via_bot(bot_id=1, username='bot') + + def test_filters_via_bot_allow_empty(self, update): + assert not Filters.via_bot()(update) + assert Filters.via_bot(allow_empty=True)(update) + + def test_filters_via_bot_id(self, update): + assert not Filters.via_bot(bot_id=1)(update) + update.message.via_bot.id = 1 + assert Filters.via_bot(bot_id=1)(update) + update.message.via_bot.id = 2 + assert Filters.via_bot(bot_id=[1, 2])(update) + assert not Filters.via_bot(bot_id=[3, 4])(update) + update.message.via_bot = None + assert not Filters.via_bot(bot_id=[3, 4])(update) + + def test_filters_via_bot_username(self, update): + assert not Filters.via_bot(username='bot')(update) + assert not Filters.via_bot(username='Testbot')(update) + update.message.via_bot.username = 'bot@' + assert Filters.via_bot(username='@bot@')(update) + assert Filters.via_bot(username='bot@')(update) + assert Filters.via_bot(username=['bot1', 'bot@', 'bot2'])(update) + assert not Filters.via_bot(username=['@username', '@bot_2'])(update) + update.message.via_bot = None + assert not Filters.user(username=['@username', '@bot_2'])(update) + + def test_filters_via_bot_change_id(self, update): + f = Filters.via_bot(bot_id=3) + update.message.via_bot.id = 3 + assert f(update) + update.message.via_bot.id = 2 + assert not f(update) + f.bot_ids = 2 + assert f(update) + + with pytest.raises(RuntimeError, match='username in conjunction'): + f.usernames = 'user' + + def test_filters_via_bot_change_username(self, update): + f = Filters.via_bot(username='bot') + update.message.via_bot.username = 'bot' + assert f(update) + update.message.via_bot.username = 'Bot' + assert not f(update) + f.usernames = 'Bot' + assert f(update) + + with pytest.raises(RuntimeError, match='bot_id in conjunction'): + f.bot_ids = 1 + + def test_filters_via_bot_add_user_by_name(self, update): + users = ['bot_a', 'bot_b', 'bot_c'] + f = Filters.via_bot() + + for user in users: + update.message.via_bot.username = user + assert not f(update) + + f.add_usernames('bot_a') + f.add_usernames(['bot_b', 'bot_c']) + + for user in users: + update.message.via_bot.username = user + assert f(update) + + with pytest.raises(RuntimeError, match='bot_id in conjunction'): + f.add_bot_ids(1) + + def test_filters_via_bot_add_user_by_id(self, update): + users = [1, 2, 3] + f = Filters.via_bot() + + for user in users: + update.message.via_bot.id = user + assert not f(update) + + f.add_bot_ids(1) + f.add_bot_ids([2, 3]) + + for user in users: + update.message.via_bot.username = user + assert f(update) + + with pytest.raises(RuntimeError, match='username in conjunction'): + f.add_usernames('bot') + + def test_filters_via_bot_remove_user_by_name(self, update): + users = ['bot_a', 'bot_b', 'bot_c'] + f = Filters.via_bot(username=users) + + with pytest.raises(RuntimeError, match='bot_id in conjunction'): + f.remove_bot_ids(1) + + for user in users: + update.message.via_bot.username = user + assert f(update) + + f.remove_usernames('bot_a') + f.remove_usernames(['bot_b', 'bot_c']) + + for user in users: + update.message.via_bot.username = user + assert not f(update) + + def test_filters_via_bot_remove_user_by_id(self, update): + users = [1, 2, 3] + f = Filters.via_bot(bot_id=users) + + with pytest.raises(RuntimeError, match='username in conjunction'): + f.remove_usernames('bot') + + for user in users: + update.message.via_bot.id = user + assert f(update) + + f.remove_bot_ids(1) + f.remove_bot_ids([2, 3]) + + for user in users: + update.message.via_bot.username = user + assert not f(update)