Add Filters.via_bot (#2009)

* feat: via_bot filter

also fixing a small mistake in the empty parameter of the user filter and improve docs slightly

* fix: forgot to set via_bot to None

* fix: redoing subclassing to copy paste solution

* Cosmetic changes

Co-authored-by: Hinrich Mahler <hinrich.mahler@freenet.de>
This commit is contained in:
Poolitzer 2020-07-14 21:51:36 +02:00 committed by GitHub
parent fd0325fbe5
commit 0189442525
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 292 additions and 4 deletions

View file

@ -887,7 +887,8 @@ officedocument.wordprocessingml.document")``-
"""Messages sent in a group chat."""
class user(BaseFilter):
"""Filters messages to allow only those which are from specified user ID.
"""Filters messages to allow only those which are from specified user ID(s) or
username(s).
Examples:
``MessageHandler(Filters.user(1234), callback_method)``
@ -919,7 +920,6 @@ officedocument.wordprocessingml.document")``-
RuntimeError: If user_id and username are both present.
"""
def __init__(self, user_id=None, username=None, allow_empty=False):
self.allow_empty = allow_empty
self.__lock = Lock()
@ -1053,8 +1053,171 @@ officedocument.wordprocessingml.document")``-
return self.allow_empty
return False
class via_bot(BaseFilter):
"""Filters messages to allow only those which are from specified via_bot ID(s) or
username(s).
Examples:
``MessageHandler(Filters.via_bot(1234), callback_method)``
Warning:
:attr:`bot_ids` will give a *copy* of the saved bot ids as :class:`frozenset`. This
is to ensure thread safety. To add/remove a bot, you should use :meth:`add_usernames`,
:meth:`add_bot_ids`, :meth:`remove_usernames` and :meth:`remove_bot_ids`. Only update
the entire set by ``filter.bot_ids/usernames = new_set``, if you are entirely sure
that it is not causing race conditions, as this will complete replace the current set
of allowed bots.
Attributes:
bot_ids(set(:obj:`int`), optional): Which bot ID(s) to allow through.
usernames(set(:obj:`str`), optional): Which username(s) (without leading '@') to allow
through.
allow_empty(:obj:`bool`, optional): Whether updates should be processed, if no bot
is specified in :attr:`bot_ids` and :attr:`usernames`.
Args:
bot_id(:obj:`int` | List[:obj:`int`], optional): Which bot ID(s) to allow
through.
username(:obj:`str` | List[:obj:`str`], optional): Which username(s) to allow
through. Leading '@'s in usernames will be discarded.
allow_empty(:obj:`bool`, optional): Whether updates should be processed, if no user
is specified in :attr:`bot_ids` and :attr:`usernames`. Defaults to :obj:`False`
Raises:
RuntimeError: If bot_id and username are both present.
"""
def __init__(self, bot_id=None, username=None, allow_empty=False):
self.allow_empty = allow_empty
self.__lock = Lock()
self._bot_ids = set()
self._usernames = set()
self._set_bot_ids(bot_id)
self._set_usernames(username)
@staticmethod
def _parse_bot_id(bot_id):
if bot_id is None:
return set()
if isinstance(bot_id, int):
return {bot_id}
return set(bot_id)
@staticmethod
def _parse_username(username):
if username is None:
return set()
if isinstance(username, str):
return {username[1:] if username.startswith('@') else username}
return {bot[1:] if bot.startswith('@') else bot for bot in username}
def _set_bot_ids(self, bot_id):
with self.__lock:
if bot_id and self._usernames:
raise RuntimeError("Can't set bot_id in conjunction with (already set) "
"usernames.")
self._bot_ids = self._parse_bot_id(bot_id)
def _set_usernames(self, username):
with self.__lock:
if username and self._bot_ids:
raise RuntimeError("Can't set username in conjunction with (already set) "
"bot_ids.")
self._usernames = self._parse_username(username)
@property
def bot_ids(self):
with self.__lock:
return frozenset(self._bot_ids)
@bot_ids.setter
def bot_ids(self, bot_id):
self._set_bot_ids(bot_id)
@property
def usernames(self):
with self.__lock:
return frozenset(self._usernames)
@usernames.setter
def usernames(self, username):
self._set_usernames(username)
def add_usernames(self, username):
"""
Add one or more users to the allowed usernames.
Args:
username(:obj:`str` | List[:obj:`str`], optional): Which username(s) to allow
through. Leading '@'s in usernames will be discarded.
"""
with self.__lock:
if self._bot_ids:
raise RuntimeError("Can't set username in conjunction with (already set) "
"bot_ids.")
username = self._parse_username(username)
self._usernames |= username
def add_bot_ids(self, bot_id):
"""
Add one or more users to the allowed user ids.
Args:
bot_id(:obj:`int` | List[:obj:`int`], optional): Which bot ID(s) to allow
through.
"""
with self.__lock:
if self._usernames:
raise RuntimeError("Can't set bot_id in conjunction with (already set) "
"usernames.")
bot_id = self._parse_bot_id(bot_id)
self._bot_ids |= bot_id
def remove_usernames(self, username):
"""
Remove one or more users from allowed usernames.
Args:
username(:obj:`str` | List[:obj:`str`], optional): Which username(s) to disallow
through. Leading '@'s in usernames will be discarded.
"""
with self.__lock:
if self._bot_ids:
raise RuntimeError("Can't set username in conjunction with (already set) "
"bot_ids.")
username = self._parse_username(username)
self._usernames -= username
def remove_bot_ids(self, bot_id):
"""
Remove one or more users from allowed user ids.
Args:
bot_id(:obj:`int` | List[:obj:`int`], optional): Which bot ID(s) to disallow
through.
"""
with self.__lock:
if self._usernames:
raise RuntimeError("Can't set bot_id in conjunction with (already set) "
"usernames.")
bot_id = self._parse_bot_id(bot_id)
self._bot_ids -= bot_id
def filter(self, message):
"""""" # remove method from docs
if message.via_bot:
if self.bot_ids:
return message.via_bot.id in self.bot_ids
if self.usernames:
return (message.via_bot.username
and message.via_bot.username in self.usernames)
return self.allow_empty
return False
class chat(BaseFilter):
"""Filters messages to allow only those which are from specified chat ID.
"""Filters messages to allow only those which are from a specified chat ID or username.
Examples:
``MessageHandler(Filters.chat(-1234), callback_method)``

View file

@ -28,7 +28,7 @@ import re
@pytest.fixture(scope='function')
def update():
return Update(0, Message(0, User(0, 'Testuser', False), datetime.datetime.utcnow(),
Chat(0, 'private')))
Chat(0, 'private'), via_bot=User(0, "Testbot", True)))
@pytest.fixture(scope='function',
@ -1093,3 +1093,128 @@ class TestFilters:
update.message.text = 'test'
result = (Filters.command | DataFilter('blah'))(update)
assert result['test'] == ['blah']
def test_filters_via_bot_init(self):
with pytest.raises(RuntimeError, match='in conjunction with'):
Filters.via_bot(bot_id=1, username='bot')
def test_filters_via_bot_allow_empty(self, update):
assert not Filters.via_bot()(update)
assert Filters.via_bot(allow_empty=True)(update)
def test_filters_via_bot_id(self, update):
assert not Filters.via_bot(bot_id=1)(update)
update.message.via_bot.id = 1
assert Filters.via_bot(bot_id=1)(update)
update.message.via_bot.id = 2
assert Filters.via_bot(bot_id=[1, 2])(update)
assert not Filters.via_bot(bot_id=[3, 4])(update)
update.message.via_bot = None
assert not Filters.via_bot(bot_id=[3, 4])(update)
def test_filters_via_bot_username(self, update):
assert not Filters.via_bot(username='bot')(update)
assert not Filters.via_bot(username='Testbot')(update)
update.message.via_bot.username = 'bot@'
assert Filters.via_bot(username='@bot@')(update)
assert Filters.via_bot(username='bot@')(update)
assert Filters.via_bot(username=['bot1', 'bot@', 'bot2'])(update)
assert not Filters.via_bot(username=['@username', '@bot_2'])(update)
update.message.via_bot = None
assert not Filters.user(username=['@username', '@bot_2'])(update)
def test_filters_via_bot_change_id(self, update):
f = Filters.via_bot(bot_id=3)
update.message.via_bot.id = 3
assert f(update)
update.message.via_bot.id = 2
assert not f(update)
f.bot_ids = 2
assert f(update)
with pytest.raises(RuntimeError, match='username in conjunction'):
f.usernames = 'user'
def test_filters_via_bot_change_username(self, update):
f = Filters.via_bot(username='bot')
update.message.via_bot.username = 'bot'
assert f(update)
update.message.via_bot.username = 'Bot'
assert not f(update)
f.usernames = 'Bot'
assert f(update)
with pytest.raises(RuntimeError, match='bot_id in conjunction'):
f.bot_ids = 1
def test_filters_via_bot_add_user_by_name(self, update):
users = ['bot_a', 'bot_b', 'bot_c']
f = Filters.via_bot()
for user in users:
update.message.via_bot.username = user
assert not f(update)
f.add_usernames('bot_a')
f.add_usernames(['bot_b', 'bot_c'])
for user in users:
update.message.via_bot.username = user
assert f(update)
with pytest.raises(RuntimeError, match='bot_id in conjunction'):
f.add_bot_ids(1)
def test_filters_via_bot_add_user_by_id(self, update):
users = [1, 2, 3]
f = Filters.via_bot()
for user in users:
update.message.via_bot.id = user
assert not f(update)
f.add_bot_ids(1)
f.add_bot_ids([2, 3])
for user in users:
update.message.via_bot.username = user
assert f(update)
with pytest.raises(RuntimeError, match='username in conjunction'):
f.add_usernames('bot')
def test_filters_via_bot_remove_user_by_name(self, update):
users = ['bot_a', 'bot_b', 'bot_c']
f = Filters.via_bot(username=users)
with pytest.raises(RuntimeError, match='bot_id in conjunction'):
f.remove_bot_ids(1)
for user in users:
update.message.via_bot.username = user
assert f(update)
f.remove_usernames('bot_a')
f.remove_usernames(['bot_b', 'bot_c'])
for user in users:
update.message.via_bot.username = user
assert not f(update)
def test_filters_via_bot_remove_user_by_id(self, update):
users = [1, 2, 3]
f = Filters.via_bot(bot_id=users)
with pytest.raises(RuntimeError, match='username in conjunction'):
f.remove_usernames('bot')
for user in users:
update.message.via_bot.id = user
assert f(update)
f.remove_bot_ids(1)
f.remove_bot_ids([2, 3])
for user in users:
update.message.via_bot.username = user
assert not f(update)