mirror of
https://github.com/python-telegram-bot/python-telegram-bot.git
synced 2025-03-17 04:39:55 +01:00
Filters.forwarded_from (#2446)
* new filter and more tests! * document forward filter better, fix a test. * Minor formulation nitpicking Co-authored-by: Hinrich Mahler <hinrich.mahler@freenet.de>
This commit is contained in:
parent
ccedd3a87d
commit
9e08fa30b6
2 changed files with 342 additions and 0 deletions
|
@ -1732,6 +1732,97 @@ officedocument.wordprocessingml.document")``.
|
|||
"""
|
||||
return super().remove_chat_ids(chat_id)
|
||||
|
||||
class forwarded_from(_ChatUserBaseFilter):
|
||||
# pylint: disable=W0235
|
||||
"""Filters messages to allow only those which are forwarded from the specified chat ID(s)
|
||||
or username(s) based on :attr:`telegram.Message.forward_from` and
|
||||
:attr:`telegram.Message.forward_from_chat`.
|
||||
|
||||
.. versionadded:: 13.5
|
||||
|
||||
Examples:
|
||||
``MessageHandler(Filters.forwarded_from(chat_id=1234), callback_method)``
|
||||
|
||||
Note:
|
||||
When a user has disallowed adding a link to their account while forwarding their
|
||||
messages, this filter will *not* work since both
|
||||
:attr:`telegram.Message.forwarded_from` and
|
||||
:attr:`telegram.Message.forwarded_from_chat` are :obj:`None`. However, this behaviour
|
||||
is undocumented and might be changed by Telegram.
|
||||
|
||||
Warning:
|
||||
:attr:`chat_ids` will give a *copy* of the saved chat ids as :class:`frozenset`. This
|
||||
is to ensure thread safety. To add/remove a chat, you should use :meth:`add_usernames`,
|
||||
:meth:`add_chat_ids`, :meth:`remove_usernames` and :meth:`remove_chat_ids`. Only update
|
||||
the entire set by ``filter.chat_ids/usernames = new_set``, if you are entirely sure
|
||||
that it is not causing race conditions, as this will complete replace the current set
|
||||
of allowed chats.
|
||||
|
||||
Args:
|
||||
chat_id(:class:`telegram.utils.types.SLT[int]`, optional):
|
||||
Which chat/user ID(s) to allow through.
|
||||
username(:class:`telegram.utils.types.SLT[str]`, optional):
|
||||
Which username(s) to allow through. Leading ``'@'`` s in usernames will be
|
||||
discarded.
|
||||
allow_empty(:obj:`bool`, optional): Whether updates should be processed, if no chat
|
||||
is specified in :attr:`chat_ids` and :attr:`usernames`. Defaults to :obj:`False`.
|
||||
|
||||
Raises:
|
||||
RuntimeError: If both chat_id and username are present.
|
||||
|
||||
Attributes:
|
||||
chat_ids(set(:obj:`int`), optional): Which chat/user ID(s) to allow through.
|
||||
usernames(set(:obj:`str`), optional): Which username(s) (without leading ``'@'``) to
|
||||
allow through.
|
||||
allow_empty(:obj:`bool`, optional): Whether updates should be processed, if no chat
|
||||
is specified in :attr:`chat_ids` and :attr:`usernames`.
|
||||
"""
|
||||
|
||||
def get_chat_or_user(self, message: Message) -> Union[User, Chat, None]:
|
||||
return message.forward_from or message.forward_from_chat
|
||||
|
||||
def add_usernames(self, username: SLT[str]) -> None:
|
||||
"""
|
||||
Add one or more chats to the allowed usernames.
|
||||
|
||||
Args:
|
||||
username(:class:`telegram.utils.types.SLT[str]`, optional):
|
||||
Which username(s) to allow through.
|
||||
Leading ``'@'`` s in usernames will be discarded.
|
||||
"""
|
||||
return super().add_usernames(username)
|
||||
|
||||
def add_chat_ids(self, chat_id: SLT[int]) -> None:
|
||||
"""
|
||||
Add one or more chats to the allowed chat ids.
|
||||
|
||||
Args:
|
||||
chat_id(:class:`telegram.utils.types.SLT[int]`, optional):
|
||||
Which chat/user ID(s) to allow through.
|
||||
"""
|
||||
return super().add_chat_ids(chat_id)
|
||||
|
||||
def remove_usernames(self, username: SLT[str]) -> None:
|
||||
"""
|
||||
Remove one or more chats from allowed usernames.
|
||||
|
||||
Args:
|
||||
username(:class:`telegram.utils.types.SLT[str]`, optional):
|
||||
Which username(s) to disallow through.
|
||||
Leading ``'@'`` s in usernames will be discarded.
|
||||
"""
|
||||
return super().remove_usernames(username)
|
||||
|
||||
def remove_chat_ids(self, chat_id: SLT[int]) -> None:
|
||||
"""
|
||||
Remove one or more chats from allowed chat ids.
|
||||
|
||||
Args:
|
||||
chat_id(:class:`telegram.utils.types.SLT[int]`, optional):
|
||||
Which chat/user ID(s) to disallow through.
|
||||
"""
|
||||
return super().remove_chat_ids(chat_id)
|
||||
|
||||
class sender_chat(_ChatUserBaseFilter):
|
||||
# pylint: disable=W0235
|
||||
"""Filters messages to allow only those which are from a specified sender chats chat ID or
|
||||
|
|
|
@ -38,6 +38,8 @@ def update():
|
|||
from_user=User(0, 'Testuser', False),
|
||||
via_bot=User(0, "Testbot", True),
|
||||
sender_chat=Chat(0, 'Channel'),
|
||||
forward_from=User(0, "HAL9000", False),
|
||||
forward_from_chat=Chat(0, "Channel"),
|
||||
),
|
||||
)
|
||||
|
||||
|
@ -1236,6 +1238,255 @@ class TestFilters:
|
|||
with pytest.raises(RuntimeError, match='Cannot set name'):
|
||||
f.name = 'foo'
|
||||
|
||||
def test_filters_forwarded_from_init(self):
|
||||
with pytest.raises(RuntimeError, match='in conjunction with'):
|
||||
Filters.forwarded_from(chat_id=1, username='chat')
|
||||
|
||||
def test_filters_forwarded_from_allow_empty(self, update):
|
||||
assert not Filters.forwarded_from()(update)
|
||||
assert Filters.forwarded_from(allow_empty=True)(update)
|
||||
|
||||
def test_filters_forwarded_from_id(self, update):
|
||||
# Test with User id-
|
||||
assert not Filters.forwarded_from(chat_id=1)(update)
|
||||
update.message.forward_from.id = 1
|
||||
assert Filters.forwarded_from(chat_id=1)(update)
|
||||
update.message.forward_from.id = 2
|
||||
assert Filters.forwarded_from(chat_id=[1, 2])(update)
|
||||
assert not Filters.forwarded_from(chat_id=[3, 4])(update)
|
||||
update.message.forward_from = None
|
||||
assert not Filters.forwarded_from(chat_id=[3, 4])(update)
|
||||
|
||||
# Test with Chat id-
|
||||
update.message.forward_from_chat.id = 4
|
||||
assert Filters.forwarded_from(chat_id=[4])(update)
|
||||
assert Filters.forwarded_from(chat_id=[3, 4])(update)
|
||||
|
||||
update.message.forward_from_chat.id = 2
|
||||
assert not Filters.forwarded_from(chat_id=[3, 4])(update)
|
||||
assert Filters.forwarded_from(chat_id=2)(update)
|
||||
|
||||
def test_filters_forwarded_from_username(self, update):
|
||||
# For User username
|
||||
assert not Filters.forwarded_from(username='chat')(update)
|
||||
assert not Filters.forwarded_from(username='Testchat')(update)
|
||||
update.message.forward_from.username = 'chat@'
|
||||
assert Filters.forwarded_from(username='@chat@')(update)
|
||||
assert Filters.forwarded_from(username='chat@')(update)
|
||||
assert Filters.forwarded_from(username=['chat1', 'chat@', 'chat2'])(update)
|
||||
assert not Filters.forwarded_from(username=['@username', '@chat_2'])(update)
|
||||
update.message.forward_from = None
|
||||
assert not Filters.forwarded_from(username=['@username', '@chat_2'])(update)
|
||||
|
||||
# For Chat username
|
||||
assert not Filters.forwarded_from(username='chat')(update)
|
||||
assert not Filters.forwarded_from(username='Testchat')(update)
|
||||
update.message.forward_from_chat.username = 'chat@'
|
||||
assert Filters.forwarded_from(username='@chat@')(update)
|
||||
assert Filters.forwarded_from(username='chat@')(update)
|
||||
assert Filters.forwarded_from(username=['chat1', 'chat@', 'chat2'])(update)
|
||||
assert not Filters.forwarded_from(username=['@username', '@chat_2'])(update)
|
||||
update.message.forward_from_chat = None
|
||||
assert not Filters.forwarded_from(username=['@username', '@chat_2'])(update)
|
||||
|
||||
def test_filters_forwarded_from_change_id(self, update):
|
||||
f = Filters.forwarded_from(chat_id=1)
|
||||
# For User ids-
|
||||
assert f.chat_ids == {1}
|
||||
update.message.forward_from.id = 1
|
||||
assert f(update)
|
||||
update.message.forward_from.id = 2
|
||||
assert not f(update)
|
||||
f.chat_ids = 2
|
||||
assert f.chat_ids == {2}
|
||||
assert f(update)
|
||||
|
||||
# For Chat ids-
|
||||
f = Filters.forwarded_from(chat_id=1) # reset this
|
||||
update.message.forward_from = None # and change this to None, only one of them can be True
|
||||
assert f.chat_ids == {1}
|
||||
update.message.forward_from_chat.id = 1
|
||||
assert f(update)
|
||||
update.message.forward_from_chat.id = 2
|
||||
assert not f(update)
|
||||
f.chat_ids = 2
|
||||
assert f.chat_ids == {2}
|
||||
assert f(update)
|
||||
|
||||
with pytest.raises(RuntimeError, match='username in conjunction'):
|
||||
f.usernames = 'chat'
|
||||
|
||||
def test_filters_forwarded_from_change_username(self, update):
|
||||
# For User usernames
|
||||
f = Filters.forwarded_from(username='chat')
|
||||
update.message.forward_from.username = 'chat'
|
||||
assert f(update)
|
||||
update.message.forward_from.username = 'User'
|
||||
assert not f(update)
|
||||
f.usernames = 'User'
|
||||
assert f(update)
|
||||
|
||||
# For Chat usernames
|
||||
update.message.forward_from = None
|
||||
f = Filters.forwarded_from(username='chat')
|
||||
update.message.forward_from_chat.username = 'chat'
|
||||
assert f(update)
|
||||
update.message.forward_from_chat.username = 'User'
|
||||
assert not f(update)
|
||||
f.usernames = 'User'
|
||||
assert f(update)
|
||||
|
||||
with pytest.raises(RuntimeError, match='chat_id in conjunction'):
|
||||
f.chat_ids = 1
|
||||
|
||||
def test_filters_forwarded_from_add_chat_by_name(self, update):
|
||||
chats = ['chat_a', 'chat_b', 'chat_c']
|
||||
f = Filters.forwarded_from()
|
||||
|
||||
# For User usernames
|
||||
for chat in chats:
|
||||
update.message.forward_from.username = chat
|
||||
assert not f(update)
|
||||
|
||||
f.add_usernames('chat_a')
|
||||
f.add_usernames(['chat_b', 'chat_c'])
|
||||
|
||||
for chat in chats:
|
||||
update.message.forward_from.username = chat
|
||||
assert f(update)
|
||||
|
||||
# For Chat usernames
|
||||
update.message.forward_from = None
|
||||
f = Filters.forwarded_from()
|
||||
for chat in chats:
|
||||
update.message.forward_from_chat.username = chat
|
||||
assert not f(update)
|
||||
|
||||
f.add_usernames('chat_a')
|
||||
f.add_usernames(['chat_b', 'chat_c'])
|
||||
|
||||
for chat in chats:
|
||||
update.message.forward_from_chat.username = chat
|
||||
assert f(update)
|
||||
|
||||
with pytest.raises(RuntimeError, match='chat_id in conjunction'):
|
||||
f.add_chat_ids(1)
|
||||
|
||||
def test_filters_forwarded_from_add_chat_by_id(self, update):
|
||||
chats = [1, 2, 3]
|
||||
f = Filters.forwarded_from()
|
||||
|
||||
# For User ids
|
||||
for chat in chats:
|
||||
update.message.forward_from.id = chat
|
||||
assert not f(update)
|
||||
|
||||
f.add_chat_ids(1)
|
||||
f.add_chat_ids([2, 3])
|
||||
|
||||
for chat in chats:
|
||||
update.message.forward_from.username = chat
|
||||
assert f(update)
|
||||
|
||||
# For Chat ids-
|
||||
update.message.forward_from = None
|
||||
f = Filters.forwarded_from()
|
||||
for chat in chats:
|
||||
update.message.forward_from_chat.id = chat
|
||||
assert not f(update)
|
||||
|
||||
f.add_chat_ids(1)
|
||||
f.add_chat_ids([2, 3])
|
||||
|
||||
for chat in chats:
|
||||
update.message.forward_from_chat.username = chat
|
||||
assert f(update)
|
||||
|
||||
with pytest.raises(RuntimeError, match='username in conjunction'):
|
||||
f.add_usernames('chat')
|
||||
|
||||
def test_filters_forwarded_from_remove_chat_by_name(self, update):
|
||||
chats = ['chat_a', 'chat_b', 'chat_c']
|
||||
f = Filters.forwarded_from(username=chats)
|
||||
|
||||
with pytest.raises(RuntimeError, match='chat_id in conjunction'):
|
||||
f.remove_chat_ids(1)
|
||||
|
||||
# For User usernames
|
||||
for chat in chats:
|
||||
update.message.forward_from.username = chat
|
||||
assert f(update)
|
||||
|
||||
f.remove_usernames('chat_a')
|
||||
f.remove_usernames(['chat_b', 'chat_c'])
|
||||
|
||||
for chat in chats:
|
||||
update.message.forward_from.username = chat
|
||||
assert not f(update)
|
||||
|
||||
# For Chat usernames
|
||||
update.message.forward_from = None
|
||||
f = Filters.forwarded_from(username=chats)
|
||||
for chat in chats:
|
||||
update.message.forward_from_chat.username = chat
|
||||
assert f(update)
|
||||
|
||||
f.remove_usernames('chat_a')
|
||||
f.remove_usernames(['chat_b', 'chat_c'])
|
||||
|
||||
for chat in chats:
|
||||
update.message.forward_from_chat.username = chat
|
||||
assert not f(update)
|
||||
|
||||
def test_filters_forwarded_from_remove_chat_by_id(self, update):
|
||||
chats = [1, 2, 3]
|
||||
f = Filters.forwarded_from(chat_id=chats)
|
||||
|
||||
with pytest.raises(RuntimeError, match='username in conjunction'):
|
||||
f.remove_usernames('chat')
|
||||
|
||||
# For User ids
|
||||
for chat in chats:
|
||||
update.message.forward_from.id = chat
|
||||
assert f(update)
|
||||
|
||||
f.remove_chat_ids(1)
|
||||
f.remove_chat_ids([2, 3])
|
||||
|
||||
for chat in chats:
|
||||
update.message.forward_from.username = chat
|
||||
assert not f(update)
|
||||
|
||||
# For Chat ids
|
||||
update.message.forward_from = None
|
||||
f = Filters.forwarded_from(chat_id=chats)
|
||||
for chat in chats:
|
||||
update.message.forward_from_chat.id = chat
|
||||
assert f(update)
|
||||
|
||||
f.remove_chat_ids(1)
|
||||
f.remove_chat_ids([2, 3])
|
||||
|
||||
for chat in chats:
|
||||
update.message.forward_from_chat.username = chat
|
||||
assert not f(update)
|
||||
|
||||
def test_filters_forwarded_from_repr(self):
|
||||
f = Filters.forwarded_from([1, 2])
|
||||
assert str(f) == 'Filters.forwarded_from(1, 2)'
|
||||
f.remove_chat_ids(1)
|
||||
f.remove_chat_ids(2)
|
||||
assert str(f) == 'Filters.forwarded_from()'
|
||||
f.add_usernames('@foobar')
|
||||
assert str(f) == 'Filters.forwarded_from(foobar)'
|
||||
f.add_usernames('@barfoo')
|
||||
assert str(f).startswith('Filters.forwarded_from(')
|
||||
# we don't know the exact order
|
||||
assert 'barfoo' in str(f) and 'foobar' in str(f)
|
||||
|
||||
with pytest.raises(RuntimeError, match='Cannot set name'):
|
||||
f.name = 'foo'
|
||||
|
||||
def test_filters_sender_chat_init(self):
|
||||
with pytest.raises(RuntimeError, match='in conjunction with'):
|
||||
Filters.sender_chat(chat_id=1, username='chat')
|
||||
|
|
Loading…
Add table
Reference in a new issue