mirror of
https://github.com/python-telegram-bot/python-telegram-bot.git
synced 2024-12-22 22:45:09 +01:00
613175b2c4
* Make Filters.user attrs mutable * Add test_filters_user_empty_args * Add test_filters_user_empty_args * fix locks * Make codecov happy * Make user_ids and usernames sets * Correct doc string * Address review * Review Vol. II * Apply suggestions from code review Co-authored-by: Noam Meltzer <tsnoam@gmail.com> * Review Vol III. * propery setter is now only a wrapper to a private method + more cleanups pylint complained on some extra stuff, so cleaned them as well * Review Vol. IV * Review Vol. V * Apply changes to Filters.chat Co-authored-by: Noam Meltzer <tsnoam@gmail.com>
1086 lines
41 KiB
Python
1086 lines
41 KiB
Python
#!/usr/bin/env python
|
|
#
|
|
# A library that provides a Python interface to the Telegram Bot API
|
|
# Copyright (C) 2015-2020
|
|
# Leandro Toledo de Souza <devs@python-telegram-bot.org>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Lesser Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Lesser Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser Public License
|
|
# along with this program. If not, see [http://www.gnu.org/licenses/].
|
|
import datetime
|
|
|
|
import pytest
|
|
|
|
from telegram import Message, User, Chat, MessageEntity, Document, Update, Dice
|
|
from telegram.ext import Filters, BaseFilter
|
|
import re
|
|
|
|
|
|
@pytest.fixture(scope='function')
|
|
def update():
|
|
return Update(0, Message(0, User(0, 'Testuser', False), datetime.datetime.utcnow(),
|
|
Chat(0, 'private')))
|
|
|
|
|
|
@pytest.fixture(scope='function',
|
|
params=MessageEntity.ALL_TYPES)
|
|
def message_entity(request):
|
|
return MessageEntity(request.param, 0, 0, url='', user='')
|
|
|
|
|
|
class TestFilters(object):
|
|
def test_filters_all(self, update):
|
|
assert Filters.all(update)
|
|
|
|
def test_filters_text(self, update):
|
|
update.message.text = 'test'
|
|
assert (Filters.text)(update)
|
|
update.message.text = '/test'
|
|
assert (Filters.text)(update)
|
|
|
|
def test_filters_text_strings(self, update):
|
|
update.message.text = '/test'
|
|
assert Filters.text({'/test', 'test1'})(update)
|
|
assert not Filters.text(['test1', 'test2'])(update)
|
|
|
|
def test_filters_caption(self, update):
|
|
update.message.caption = 'test'
|
|
assert (Filters.caption)(update)
|
|
update.message.caption = None
|
|
assert not (Filters.caption)(update)
|
|
|
|
def test_filters_caption_strings(self, update):
|
|
update.message.caption = 'test'
|
|
assert Filters.caption({'test', 'test1'})(update)
|
|
assert not Filters.caption(['test1', 'test2'])(update)
|
|
|
|
def test_filters_command_default(self, update):
|
|
update.message.text = 'test'
|
|
assert not Filters.command(update)
|
|
update.message.text = '/test'
|
|
assert not Filters.command(update)
|
|
# Only accept commands at the beginning
|
|
update.message.entities = [MessageEntity(MessageEntity.BOT_COMMAND, 3, 5)]
|
|
assert not Filters.command(update)
|
|
update.message.entities = [MessageEntity(MessageEntity.BOT_COMMAND, 0, 5)]
|
|
assert Filters.command(update)
|
|
|
|
def test_filters_command_anywhere(self, update):
|
|
update.message.text = 'test /cmd'
|
|
assert not (Filters.command(False))(update)
|
|
update.message.entities = [MessageEntity(MessageEntity.BOT_COMMAND, 5, 4)]
|
|
assert (Filters.command(False))(update)
|
|
|
|
def test_filters_regex(self, update):
|
|
SRE_TYPE = type(re.match("", ""))
|
|
update.message.text = '/start deep-linked param'
|
|
result = Filters.regex(r'deep-linked param')(update)
|
|
assert result
|
|
assert isinstance(result, dict)
|
|
matches = result['matches']
|
|
assert isinstance(matches, list)
|
|
assert type(matches[0]) is SRE_TYPE
|
|
update.message.text = '/help'
|
|
assert Filters.regex(r'help')(update)
|
|
|
|
update.message.text = 'test'
|
|
assert not Filters.regex(r'fail')(update)
|
|
assert Filters.regex(r'test')(update)
|
|
assert Filters.regex(re.compile(r'test'))(update)
|
|
assert Filters.regex(re.compile(r'TEST', re.IGNORECASE))(update)
|
|
|
|
update.message.text = 'i love python'
|
|
assert Filters.regex(r'.\b[lo]{2}ve python')(update)
|
|
|
|
update.message.text = None
|
|
assert not Filters.regex(r'fail')(update)
|
|
|
|
def test_filters_regex_multiple(self, update):
|
|
SRE_TYPE = type(re.match("", ""))
|
|
update.message.text = '/start deep-linked param'
|
|
result = (Filters.regex('deep') & Filters.regex(r'linked param'))(update)
|
|
assert result
|
|
assert isinstance(result, dict)
|
|
matches = result['matches']
|
|
assert isinstance(matches, list)
|
|
assert all([type(res) == SRE_TYPE for res in matches])
|
|
result = (Filters.regex('deep') | Filters.regex(r'linked param'))(update)
|
|
assert result
|
|
assert isinstance(result, dict)
|
|
matches = result['matches']
|
|
assert isinstance(matches, list)
|
|
assert all([type(res) == SRE_TYPE for res in matches])
|
|
result = (Filters.regex('not int') | Filters.regex(r'linked param'))(update)
|
|
assert result
|
|
assert isinstance(result, dict)
|
|
matches = result['matches']
|
|
assert isinstance(matches, list)
|
|
assert all([type(res) == SRE_TYPE for res in matches])
|
|
result = (Filters.regex('not int') & Filters.regex(r'linked param'))(update)
|
|
assert not result
|
|
|
|
def test_filters_merged_with_regex(self, update):
|
|
SRE_TYPE = type(re.match("", ""))
|
|
update.message.text = '/start deep-linked param'
|
|
update.message.entities = [MessageEntity(MessageEntity.BOT_COMMAND, 0, 6)]
|
|
result = (Filters.command & Filters.regex(r'linked param'))(update)
|
|
assert result
|
|
assert isinstance(result, dict)
|
|
matches = result['matches']
|
|
assert isinstance(matches, list)
|
|
assert all([type(res) == SRE_TYPE for res in matches])
|
|
result = (Filters.regex(r'linked param') & Filters.command)(update)
|
|
assert result
|
|
assert isinstance(result, dict)
|
|
matches = result['matches']
|
|
assert isinstance(matches, list)
|
|
assert all([type(res) == SRE_TYPE for res in matches])
|
|
result = (Filters.regex(r'linked param') | Filters.command)(update)
|
|
assert result
|
|
assert isinstance(result, dict)
|
|
matches = result['matches']
|
|
assert isinstance(matches, list)
|
|
assert all([type(res) == SRE_TYPE for res in matches])
|
|
# Should not give a match since it's a or filter and it short circuits
|
|
result = (Filters.command | Filters.regex(r'linked param'))(update)
|
|
assert result is True
|
|
|
|
def test_regex_complex_merges(self, update):
|
|
SRE_TYPE = type(re.match("", ""))
|
|
update.message.text = 'test it out'
|
|
filter = (Filters.regex('test')
|
|
& ((Filters.status_update | Filters.forwarded) | Filters.regex('out')))
|
|
result = filter(update)
|
|
assert result
|
|
assert isinstance(result, dict)
|
|
matches = result['matches']
|
|
assert isinstance(matches, list)
|
|
assert len(matches) == 2
|
|
assert all([type(res) == SRE_TYPE for res in matches])
|
|
update.message.forward_date = datetime.datetime.utcnow()
|
|
result = filter(update)
|
|
assert result
|
|
assert isinstance(result, dict)
|
|
matches = result['matches']
|
|
assert isinstance(matches, list)
|
|
assert all([type(res) == SRE_TYPE for res in matches])
|
|
update.message.text = 'test it'
|
|
result = filter(update)
|
|
assert result
|
|
assert isinstance(result, dict)
|
|
matches = result['matches']
|
|
assert isinstance(matches, list)
|
|
assert all([type(res) == SRE_TYPE for res in matches])
|
|
update.message.forward_date = False
|
|
result = filter(update)
|
|
assert not result
|
|
update.message.text = 'test it out'
|
|
result = filter(update)
|
|
assert result
|
|
assert isinstance(result, dict)
|
|
matches = result['matches']
|
|
assert isinstance(matches, list)
|
|
assert all([type(res) == SRE_TYPE for res in matches])
|
|
update.message.pinned_message = True
|
|
result = filter(update)
|
|
assert result
|
|
assert isinstance(result, dict)
|
|
matches = result['matches']
|
|
assert isinstance(matches, list)
|
|
assert all([type(res) == SRE_TYPE for res in matches])
|
|
update.message.text = 'it out'
|
|
result = filter(update)
|
|
assert not result
|
|
|
|
update.message.text = 'test it out'
|
|
update.message.forward_date = None
|
|
update.message.pinned_message = None
|
|
filter = ((Filters.regex('test') | Filters.command)
|
|
& (Filters.regex('it') | Filters.status_update))
|
|
result = filter(update)
|
|
assert result
|
|
assert isinstance(result, dict)
|
|
matches = result['matches']
|
|
assert isinstance(matches, list)
|
|
assert len(matches) == 2
|
|
assert all([type(res) == SRE_TYPE for res in matches])
|
|
update.message.text = 'test'
|
|
result = filter(update)
|
|
assert not result
|
|
update.message.pinned_message = True
|
|
result = filter(update)
|
|
assert result
|
|
assert isinstance(result, dict)
|
|
matches = result['matches']
|
|
assert isinstance(matches, list)
|
|
assert len(matches) == 1
|
|
assert all([type(res) == SRE_TYPE for res in matches])
|
|
update.message.text = 'nothing'
|
|
result = filter(update)
|
|
assert not result
|
|
update.message.text = '/start'
|
|
update.message.entities = [MessageEntity(MessageEntity.BOT_COMMAND, 0, 6)]
|
|
result = filter(update)
|
|
assert result
|
|
assert isinstance(result, bool)
|
|
update.message.text = '/start it'
|
|
result = filter(update)
|
|
assert result
|
|
assert isinstance(result, dict)
|
|
matches = result['matches']
|
|
assert isinstance(matches, list)
|
|
assert len(matches) == 1
|
|
assert all([type(res) == SRE_TYPE for res in matches])
|
|
|
|
def test_regex_inverted(self, update):
|
|
update.message.text = '/start deep-linked param'
|
|
update.message.entities = [MessageEntity(MessageEntity.BOT_COMMAND, 0, 5)]
|
|
filter = ~Filters.regex(r'deep-linked param')
|
|
result = filter(update)
|
|
assert not result
|
|
update.message.text = 'not it'
|
|
result = filter(update)
|
|
assert result
|
|
assert isinstance(result, bool)
|
|
|
|
filter = (~Filters.regex('linked') & Filters.command)
|
|
update.message.text = "it's linked"
|
|
result = filter(update)
|
|
assert not result
|
|
update.message.text = '/start'
|
|
update.message.entities = [MessageEntity(MessageEntity.BOT_COMMAND, 0, 6)]
|
|
result = filter(update)
|
|
assert result
|
|
update.message.text = '/linked'
|
|
result = filter(update)
|
|
assert not result
|
|
|
|
filter = (~Filters.regex('linked') | Filters.command)
|
|
update.message.text = "it's linked"
|
|
update.message.entities = []
|
|
result = filter(update)
|
|
assert not result
|
|
update.message.text = '/start linked'
|
|
update.message.entities = [MessageEntity(MessageEntity.BOT_COMMAND, 0, 6)]
|
|
result = filter(update)
|
|
assert result
|
|
update.message.text = '/start'
|
|
result = filter(update)
|
|
assert result
|
|
update.message.text = 'nothig'
|
|
update.message.entities = []
|
|
result = filter(update)
|
|
assert result
|
|
|
|
def test_filters_reply(self, update):
|
|
another_message = Message(1, User(1, 'TestOther', False), datetime.datetime.utcnow(),
|
|
Chat(0, 'private'))
|
|
update.message.text = 'test'
|
|
assert not Filters.reply(update)
|
|
update.message.reply_to_message = another_message
|
|
assert Filters.reply(update)
|
|
|
|
def test_filters_audio(self, update):
|
|
assert not Filters.audio(update)
|
|
update.message.audio = 'test'
|
|
assert Filters.audio(update)
|
|
|
|
def test_filters_document(self, update):
|
|
assert not Filters.document(update)
|
|
update.message.document = 'test'
|
|
assert Filters.document(update)
|
|
|
|
def test_filters_document_type(self, update):
|
|
update.message.document = Document("file_id", 'unique_id',
|
|
mime_type="application/vnd.android.package-archive")
|
|
assert Filters.document.apk(update)
|
|
assert Filters.document.application(update)
|
|
assert not Filters.document.doc(update)
|
|
assert not Filters.document.audio(update)
|
|
|
|
update.message.document.mime_type = "application/msword"
|
|
assert Filters.document.doc(update)
|
|
assert Filters.document.application(update)
|
|
assert not Filters.document.docx(update)
|
|
assert not Filters.document.audio(update)
|
|
|
|
update.message.document.mime_type = "application/vnd.openxmlformats-officedocument." \
|
|
"wordprocessingml.document"
|
|
assert Filters.document.docx(update)
|
|
assert Filters.document.application(update)
|
|
assert not Filters.document.exe(update)
|
|
assert not Filters.document.audio(update)
|
|
|
|
update.message.document.mime_type = "application/x-ms-dos-executable"
|
|
assert Filters.document.exe(update)
|
|
assert Filters.document.application(update)
|
|
assert not Filters.document.docx(update)
|
|
assert not Filters.document.audio(update)
|
|
|
|
update.message.document.mime_type = "video/mp4"
|
|
assert Filters.document.gif(update)
|
|
assert Filters.document.video(update)
|
|
assert not Filters.document.jpg(update)
|
|
assert not Filters.document.text(update)
|
|
|
|
update.message.document.mime_type = "image/jpeg"
|
|
assert Filters.document.jpg(update)
|
|
assert Filters.document.image(update)
|
|
assert not Filters.document.mp3(update)
|
|
assert not Filters.document.video(update)
|
|
|
|
update.message.document.mime_type = "audio/mpeg"
|
|
assert Filters.document.mp3(update)
|
|
assert Filters.document.audio(update)
|
|
assert not Filters.document.pdf(update)
|
|
assert not Filters.document.image(update)
|
|
|
|
update.message.document.mime_type = "application/pdf"
|
|
assert Filters.document.pdf(update)
|
|
assert Filters.document.application(update)
|
|
assert not Filters.document.py(update)
|
|
assert not Filters.document.audio(update)
|
|
|
|
update.message.document.mime_type = "text/x-python"
|
|
assert Filters.document.py(update)
|
|
assert Filters.document.text(update)
|
|
assert not Filters.document.svg(update)
|
|
assert not Filters.document.application(update)
|
|
|
|
update.message.document.mime_type = "image/svg+xml"
|
|
assert Filters.document.svg(update)
|
|
assert Filters.document.image(update)
|
|
assert not Filters.document.txt(update)
|
|
assert not Filters.document.video(update)
|
|
|
|
update.message.document.mime_type = "text/plain"
|
|
assert Filters.document.txt(update)
|
|
assert Filters.document.text(update)
|
|
assert not Filters.document.targz(update)
|
|
assert not Filters.document.application(update)
|
|
|
|
update.message.document.mime_type = "application/x-compressed-tar"
|
|
assert Filters.document.targz(update)
|
|
assert Filters.document.application(update)
|
|
assert not Filters.document.wav(update)
|
|
assert not Filters.document.audio(update)
|
|
|
|
update.message.document.mime_type = "audio/x-wav"
|
|
assert Filters.document.wav(update)
|
|
assert Filters.document.audio(update)
|
|
assert not Filters.document.xml(update)
|
|
assert not Filters.document.image(update)
|
|
|
|
update.message.document.mime_type = "application/xml"
|
|
assert Filters.document.xml(update)
|
|
assert Filters.document.application(update)
|
|
assert not Filters.document.zip(update)
|
|
assert not Filters.document.audio(update)
|
|
|
|
update.message.document.mime_type = "application/zip"
|
|
assert Filters.document.zip(update)
|
|
assert Filters.document.application(update)
|
|
assert not Filters.document.apk(update)
|
|
assert not Filters.document.audio(update)
|
|
|
|
update.message.document.mime_type = "image/x-rgb"
|
|
assert not Filters.document.category("application/")(update)
|
|
assert not Filters.document.mime_type("application/x-sh")(update)
|
|
update.message.document.mime_type = "application/x-sh"
|
|
assert Filters.document.category("application/")(update)
|
|
assert Filters.document.mime_type("application/x-sh")(update)
|
|
|
|
def test_filters_animation(self, update):
|
|
assert not Filters.animation(update)
|
|
update.message.animation = 'test'
|
|
assert Filters.animation(update)
|
|
|
|
def test_filters_photo(self, update):
|
|
assert not Filters.photo(update)
|
|
update.message.photo = 'test'
|
|
assert Filters.photo(update)
|
|
|
|
def test_filters_sticker(self, update):
|
|
assert not Filters.sticker(update)
|
|
update.message.sticker = 'test'
|
|
assert Filters.sticker(update)
|
|
|
|
def test_filters_video(self, update):
|
|
assert not Filters.video(update)
|
|
update.message.video = 'test'
|
|
assert Filters.video(update)
|
|
|
|
def test_filters_voice(self, update):
|
|
assert not Filters.voice(update)
|
|
update.message.voice = 'test'
|
|
assert Filters.voice(update)
|
|
|
|
def test_filters_video_note(self, update):
|
|
assert not Filters.video_note(update)
|
|
update.message.video_note = 'test'
|
|
assert Filters.video_note(update)
|
|
|
|
def test_filters_contact(self, update):
|
|
assert not Filters.contact(update)
|
|
update.message.contact = 'test'
|
|
assert Filters.contact(update)
|
|
|
|
def test_filters_location(self, update):
|
|
assert not Filters.location(update)
|
|
update.message.location = 'test'
|
|
assert Filters.location(update)
|
|
|
|
def test_filters_venue(self, update):
|
|
assert not Filters.venue(update)
|
|
update.message.venue = 'test'
|
|
assert Filters.venue(update)
|
|
|
|
def test_filters_status_update(self, update):
|
|
assert not Filters.status_update(update)
|
|
|
|
update.message.new_chat_members = ['test']
|
|
assert Filters.status_update(update)
|
|
assert Filters.status_update.new_chat_members(update)
|
|
update.message.new_chat_members = None
|
|
|
|
update.message.left_chat_member = 'test'
|
|
assert Filters.status_update(update)
|
|
assert Filters.status_update.left_chat_member(update)
|
|
update.message.left_chat_member = None
|
|
|
|
update.message.new_chat_title = 'test'
|
|
assert Filters.status_update(update)
|
|
assert Filters.status_update.new_chat_title(update)
|
|
update.message.new_chat_title = ''
|
|
|
|
update.message.new_chat_photo = 'test'
|
|
assert Filters.status_update(update)
|
|
assert Filters.status_update.new_chat_photo(update)
|
|
update.message.new_chat_photo = None
|
|
|
|
update.message.delete_chat_photo = True
|
|
assert Filters.status_update(update)
|
|
assert Filters.status_update.delete_chat_photo(update)
|
|
update.message.delete_chat_photo = False
|
|
|
|
update.message.group_chat_created = True
|
|
assert Filters.status_update(update)
|
|
assert Filters.status_update.chat_created(update)
|
|
update.message.group_chat_created = False
|
|
|
|
update.message.supergroup_chat_created = True
|
|
assert Filters.status_update(update)
|
|
assert Filters.status_update.chat_created(update)
|
|
update.message.supergroup_chat_created = False
|
|
|
|
update.message.channel_chat_created = True
|
|
assert Filters.status_update(update)
|
|
assert Filters.status_update.chat_created(update)
|
|
update.message.channel_chat_created = False
|
|
|
|
update.message.migrate_to_chat_id = 100
|
|
assert Filters.status_update(update)
|
|
assert Filters.status_update.migrate(update)
|
|
update.message.migrate_to_chat_id = 0
|
|
|
|
update.message.migrate_from_chat_id = 100
|
|
assert Filters.status_update(update)
|
|
assert Filters.status_update.migrate(update)
|
|
update.message.migrate_from_chat_id = 0
|
|
|
|
update.message.pinned_message = 'test'
|
|
assert Filters.status_update(update)
|
|
assert Filters.status_update.pinned_message(update)
|
|
update.message.pinned_message = None
|
|
|
|
update.message.connected_website = 'http://example.com/'
|
|
assert Filters.status_update(update)
|
|
assert Filters.status_update.connected_website(update)
|
|
update.message.connected_website = None
|
|
|
|
def test_filters_forwarded(self, update):
|
|
assert not Filters.forwarded(update)
|
|
update.message.forward_date = datetime.datetime.utcnow()
|
|
assert Filters.forwarded(update)
|
|
|
|
def test_filters_game(self, update):
|
|
assert not Filters.game(update)
|
|
update.message.game = 'test'
|
|
assert Filters.game(update)
|
|
|
|
def test_entities_filter(self, update, message_entity):
|
|
update.message.entities = [message_entity]
|
|
assert Filters.entity(message_entity.type)(update)
|
|
|
|
update.message.entities = []
|
|
assert not Filters.entity(MessageEntity.MENTION)(update)
|
|
|
|
second = message_entity.to_dict()
|
|
second['type'] = 'bold'
|
|
second = MessageEntity.de_json(second, None)
|
|
update.message.entities = [message_entity, second]
|
|
assert Filters.entity(message_entity.type)(update)
|
|
assert not Filters.caption_entity(message_entity.type)(update)
|
|
|
|
def test_caption_entities_filter(self, update, message_entity):
|
|
update.message.caption_entities = [message_entity]
|
|
assert Filters.caption_entity(message_entity.type)(update)
|
|
|
|
update.message.caption_entities = []
|
|
assert not Filters.caption_entity(MessageEntity.MENTION)(update)
|
|
|
|
second = message_entity.to_dict()
|
|
second['type'] = 'bold'
|
|
second = MessageEntity.de_json(second, None)
|
|
update.message.caption_entities = [message_entity, second]
|
|
assert Filters.caption_entity(message_entity.type)(update)
|
|
assert not Filters.entity(message_entity.type)(update)
|
|
|
|
def test_private_filter(self, update):
|
|
assert Filters.private(update)
|
|
update.message.chat.type = 'group'
|
|
assert not Filters.private(update)
|
|
|
|
def test_group_filter(self, update):
|
|
assert not Filters.group(update)
|
|
update.message.chat.type = 'group'
|
|
assert Filters.group(update)
|
|
update.message.chat.type = 'supergroup'
|
|
assert Filters.group(update)
|
|
|
|
def test_filters_user_init(self):
|
|
with pytest.raises(RuntimeError, match='in conjunction with'):
|
|
Filters.user(user_id=1, username='user')
|
|
|
|
def test_filters_user_allow_empty(self, update):
|
|
assert not Filters.user()(update)
|
|
assert Filters.user(allow_empty=True)(update)
|
|
|
|
def test_filters_user_id(self, update):
|
|
assert not Filters.user(user_id=1)(update)
|
|
update.message.from_user.id = 1
|
|
assert Filters.user(user_id=1)(update)
|
|
update.message.from_user.id = 2
|
|
assert Filters.user(user_id=[1, 2])(update)
|
|
assert not Filters.user(user_id=[3, 4])(update)
|
|
update.message.from_user = None
|
|
assert not Filters.user(user_id=[3, 4])(update)
|
|
|
|
def test_filters_username(self, update):
|
|
assert not Filters.user(username='user')(update)
|
|
assert not Filters.user(username='Testuser')(update)
|
|
update.message.from_user.username = 'user@'
|
|
assert Filters.user(username='@user@')(update)
|
|
assert Filters.user(username='user@')(update)
|
|
assert Filters.user(username=['user1', 'user@', 'user2'])(update)
|
|
assert not Filters.user(username=['@username', '@user_2'])(update)
|
|
update.message.from_user = None
|
|
assert not Filters.user(username=['@username', '@user_2'])(update)
|
|
|
|
def test_filters_user_change_id(self, update):
|
|
f = Filters.user(user_id=1)
|
|
update.message.from_user.id = 1
|
|
assert f(update)
|
|
update.message.from_user.id = 2
|
|
assert not f(update)
|
|
f.user_ids = 2
|
|
assert f(update)
|
|
|
|
with pytest.raises(RuntimeError, match='username in conjunction'):
|
|
f.usernames = 'user'
|
|
|
|
def test_filters_user_change_username(self, update):
|
|
f = Filters.user(username='user')
|
|
update.message.from_user.username = 'user'
|
|
assert f(update)
|
|
update.message.from_user.username = 'User'
|
|
assert not f(update)
|
|
f.usernames = 'User'
|
|
assert f(update)
|
|
|
|
with pytest.raises(RuntimeError, match='user_id in conjunction'):
|
|
f.user_ids = 1
|
|
|
|
def test_filters_user_add_user_by_name(self, update):
|
|
users = ['user_a', 'user_b', 'user_c']
|
|
f = Filters.user()
|
|
|
|
for user in users:
|
|
update.message.from_user.username = user
|
|
assert not f(update)
|
|
|
|
f.add_usernames('user_a')
|
|
f.add_usernames(['user_b', 'user_c'])
|
|
|
|
for user in users:
|
|
update.message.from_user.username = user
|
|
assert f(update)
|
|
|
|
with pytest.raises(RuntimeError, match='user_id in conjunction'):
|
|
f.add_user_ids(1)
|
|
|
|
def test_filters_user_add_user_by_id(self, update):
|
|
users = [1, 2, 3]
|
|
f = Filters.user()
|
|
|
|
for user in users:
|
|
update.message.from_user.id = user
|
|
assert not f(update)
|
|
|
|
f.add_user_ids(1)
|
|
f.add_user_ids([2, 3])
|
|
|
|
for user in users:
|
|
update.message.from_user.username = user
|
|
assert f(update)
|
|
|
|
with pytest.raises(RuntimeError, match='username in conjunction'):
|
|
f.add_usernames('user')
|
|
|
|
def test_filters_user_remove_user_by_name(self, update):
|
|
users = ['user_a', 'user_b', 'user_c']
|
|
f = Filters.user(username=users)
|
|
|
|
with pytest.raises(RuntimeError, match='user_id in conjunction'):
|
|
f.remove_user_ids(1)
|
|
|
|
for user in users:
|
|
update.message.from_user.username = user
|
|
assert f(update)
|
|
|
|
f.remove_usernames('user_a')
|
|
f.remove_usernames(['user_b', 'user_c'])
|
|
|
|
for user in users:
|
|
update.message.from_user.username = user
|
|
assert not f(update)
|
|
|
|
def test_filters_user_remove_user_by_id(self, update):
|
|
users = [1, 2, 3]
|
|
f = Filters.user(user_id=users)
|
|
|
|
with pytest.raises(RuntimeError, match='username in conjunction'):
|
|
f.remove_usernames('user')
|
|
|
|
for user in users:
|
|
update.message.from_user.id = user
|
|
assert f(update)
|
|
|
|
f.remove_user_ids(1)
|
|
f.remove_user_ids([2, 3])
|
|
|
|
for user in users:
|
|
update.message.from_user.username = user
|
|
assert not f(update)
|
|
|
|
def test_filters_chat_init(self):
|
|
with pytest.raises(RuntimeError, match='in conjunction with'):
|
|
Filters.chat(chat_id=1, username='chat')
|
|
|
|
def test_filters_chat_allow_empty(self, update):
|
|
assert not Filters.chat()(update)
|
|
assert Filters.chat(allow_empty=True)(update)
|
|
|
|
def test_filters_chat_id(self, update):
|
|
assert not Filters.chat(chat_id=1)(update)
|
|
update.message.chat.id = 1
|
|
assert Filters.chat(chat_id=1)(update)
|
|
update.message.chat.id = 2
|
|
assert Filters.chat(chat_id=[1, 2])(update)
|
|
assert not Filters.chat(chat_id=[3, 4])(update)
|
|
update.message.chat = None
|
|
assert not Filters.chat(chat_id=[3, 4])(update)
|
|
|
|
def test_filters_chat_username(self, update):
|
|
assert not Filters.chat(username='chat')(update)
|
|
assert not Filters.chat(username='Testchat')(update)
|
|
update.message.chat.username = 'chat@'
|
|
assert Filters.chat(username='@chat@')(update)
|
|
assert Filters.chat(username='chat@')(update)
|
|
assert Filters.chat(username=['chat1', 'chat@', 'chat2'])(update)
|
|
assert not Filters.chat(username=['@username', '@chat_2'])(update)
|
|
update.message.chat = None
|
|
assert not Filters.chat(username=['@username', '@chat_2'])(update)
|
|
|
|
def test_filters_chat_change_id(self, update):
|
|
f = Filters.chat(chat_id=1)
|
|
update.message.chat.id = 1
|
|
assert f(update)
|
|
update.message.chat.id = 2
|
|
assert not f(update)
|
|
f.chat_ids = 2
|
|
assert f(update)
|
|
|
|
with pytest.raises(RuntimeError, match='username in conjunction'):
|
|
f.usernames = 'chat'
|
|
|
|
def test_filters_chat_change_username(self, update):
|
|
f = Filters.chat(username='chat')
|
|
update.message.chat.username = 'chat'
|
|
assert f(update)
|
|
update.message.chat.username = 'User'
|
|
assert not f(update)
|
|
f.usernames = 'User'
|
|
assert f(update)
|
|
|
|
with pytest.raises(RuntimeError, match='chat_id in conjunction'):
|
|
f.chat_ids = 1
|
|
|
|
def test_filters_chat_add_chat_by_name(self, update):
|
|
chats = ['chat_a', 'chat_b', 'chat_c']
|
|
f = Filters.chat()
|
|
|
|
for chat in chats:
|
|
update.message.chat.username = chat
|
|
assert not f(update)
|
|
|
|
f.add_usernames('chat_a')
|
|
f.add_usernames(['chat_b', 'chat_c'])
|
|
|
|
for chat in chats:
|
|
update.message.chat.username = chat
|
|
assert f(update)
|
|
|
|
with pytest.raises(RuntimeError, match='chat_id in conjunction'):
|
|
f.add_chat_ids(1)
|
|
|
|
def test_filters_chat_add_chat_by_id(self, update):
|
|
chats = [1, 2, 3]
|
|
f = Filters.chat()
|
|
|
|
for chat in chats:
|
|
update.message.chat.id = chat
|
|
assert not f(update)
|
|
|
|
f.add_chat_ids(1)
|
|
f.add_chat_ids([2, 3])
|
|
|
|
for chat in chats:
|
|
update.message.chat.username = chat
|
|
assert f(update)
|
|
|
|
with pytest.raises(RuntimeError, match='username in conjunction'):
|
|
f.add_usernames('chat')
|
|
|
|
def test_filters_chat_remove_chat_by_name(self, update):
|
|
chats = ['chat_a', 'chat_b', 'chat_c']
|
|
f = Filters.chat(username=chats)
|
|
|
|
with pytest.raises(RuntimeError, match='chat_id in conjunction'):
|
|
f.remove_chat_ids(1)
|
|
|
|
for chat in chats:
|
|
update.message.chat.username = chat
|
|
assert f(update)
|
|
|
|
f.remove_usernames('chat_a')
|
|
f.remove_usernames(['chat_b', 'chat_c'])
|
|
|
|
for chat in chats:
|
|
update.message.chat.username = chat
|
|
assert not f(update)
|
|
|
|
def test_filters_chat_remove_chat_by_id(self, update):
|
|
chats = [1, 2, 3]
|
|
f = Filters.chat(chat_id=chats)
|
|
|
|
with pytest.raises(RuntimeError, match='username in conjunction'):
|
|
f.remove_usernames('chat')
|
|
|
|
for chat in chats:
|
|
update.message.chat.id = chat
|
|
assert f(update)
|
|
|
|
f.remove_chat_ids(1)
|
|
f.remove_chat_ids([2, 3])
|
|
|
|
for chat in chats:
|
|
update.message.chat.username = chat
|
|
assert not f(update)
|
|
|
|
def test_filters_invoice(self, update):
|
|
assert not Filters.invoice(update)
|
|
update.message.invoice = 'test'
|
|
assert Filters.invoice(update)
|
|
|
|
def test_filters_successful_payment(self, update):
|
|
assert not Filters.successful_payment(update)
|
|
update.message.successful_payment = 'test'
|
|
assert Filters.successful_payment(update)
|
|
|
|
def test_filters_passport_data(self, update):
|
|
assert not Filters.passport_data(update)
|
|
update.message.passport_data = 'test'
|
|
assert Filters.passport_data(update)
|
|
|
|
def test_filters_poll(self, update):
|
|
assert not Filters.poll(update)
|
|
update.message.poll = 'test'
|
|
assert Filters.poll(update)
|
|
|
|
@pytest.mark.parametrize('emoji', Dice.ALL_EMOJI)
|
|
def test_filters_dice(self, update, emoji):
|
|
update.message.dice = Dice(4, emoji)
|
|
assert Filters.dice(update)
|
|
update.message.dice = None
|
|
assert not Filters.dice(update)
|
|
|
|
@pytest.mark.parametrize('emoji', Dice.ALL_EMOJI)
|
|
def test_filters_dice_list(self, update, emoji):
|
|
update.message.dice = None
|
|
assert not Filters.dice(5)(update)
|
|
|
|
update.message.dice = Dice(5, emoji)
|
|
assert Filters.dice(5)(update)
|
|
assert Filters.dice({5, 6})(update)
|
|
assert not Filters.dice(1)(update)
|
|
assert not Filters.dice([2, 3])(update)
|
|
|
|
def test_filters_dice_type(self, update):
|
|
update.message.dice = Dice(5, '🎲')
|
|
assert Filters.dice.dice(update)
|
|
assert Filters.dice.dice([4, 5])(update)
|
|
assert not Filters.dice.darts(update)
|
|
assert not Filters.dice.dice([6])(update)
|
|
|
|
update.message.dice = Dice(5, '🎯')
|
|
assert Filters.dice.darts(update)
|
|
assert Filters.dice.darts([4, 5])(update)
|
|
assert not Filters.dice.dice(update)
|
|
assert not Filters.dice.darts([6])(update)
|
|
|
|
def test_language_filter_single(self, update):
|
|
update.message.from_user.language_code = 'en_US'
|
|
assert (Filters.language('en_US'))(update)
|
|
assert (Filters.language('en'))(update)
|
|
assert not (Filters.language('en_GB'))(update)
|
|
assert not (Filters.language('da'))(update)
|
|
update.message.from_user.language_code = 'da'
|
|
assert not (Filters.language('en_US'))(update)
|
|
assert not (Filters.language('en'))(update)
|
|
assert not (Filters.language('en_GB'))(update)
|
|
assert (Filters.language('da'))(update)
|
|
|
|
def test_language_filter_multiple(self, update):
|
|
f = Filters.language(['en_US', 'da'])
|
|
update.message.from_user.language_code = 'en_US'
|
|
assert f(update)
|
|
update.message.from_user.language_code = 'en_GB'
|
|
assert not f(update)
|
|
update.message.from_user.language_code = 'da'
|
|
assert f(update)
|
|
|
|
def test_and_filters(self, update):
|
|
update.message.text = 'test'
|
|
update.message.forward_date = datetime.datetime.utcnow()
|
|
assert (Filters.text & Filters.forwarded)(update)
|
|
update.message.text = '/test'
|
|
assert (Filters.text & Filters.forwarded)(update)
|
|
update.message.text = 'test'
|
|
update.message.forward_date = None
|
|
assert not (Filters.text & Filters.forwarded)(update)
|
|
|
|
update.message.text = 'test'
|
|
update.message.forward_date = datetime.datetime.utcnow()
|
|
assert (Filters.text & Filters.forwarded & Filters.private)(update)
|
|
|
|
def test_or_filters(self, update):
|
|
update.message.text = 'test'
|
|
assert (Filters.text | Filters.status_update)(update)
|
|
update.message.group_chat_created = True
|
|
assert (Filters.text | Filters.status_update)(update)
|
|
update.message.text = None
|
|
assert (Filters.text | Filters.status_update)(update)
|
|
update.message.group_chat_created = False
|
|
assert not (Filters.text | Filters.status_update)(update)
|
|
|
|
def test_and_or_filters(self, update):
|
|
update.message.text = 'test'
|
|
update.message.forward_date = datetime.datetime.utcnow()
|
|
assert (Filters.text & (Filters.status_update | Filters.forwarded))(update)
|
|
update.message.forward_date = False
|
|
assert not (Filters.text & (Filters.forwarded | Filters.status_update))(update)
|
|
update.message.pinned_message = True
|
|
assert (Filters.text & (Filters.forwarded | Filters.status_update)(update))
|
|
|
|
assert str((Filters.text & (Filters.forwarded | Filters.entity(
|
|
MessageEntity.MENTION)))) == '<Filters.text and <Filters.forwarded or ' \
|
|
'Filters.entity(mention)>>'
|
|
|
|
def test_inverted_filters(self, update):
|
|
update.message.text = '/test'
|
|
update.message.entities = [MessageEntity(MessageEntity.BOT_COMMAND, 0, 5)]
|
|
assert Filters.command(update)
|
|
assert not (~Filters.command)(update)
|
|
update.message.text = 'test'
|
|
update.message.entities = []
|
|
assert not Filters.command(update)
|
|
assert (~Filters.command)(update)
|
|
|
|
def test_inverted_and_filters(self, update):
|
|
update.message.text = '/test'
|
|
update.message.entities = [MessageEntity(MessageEntity.BOT_COMMAND, 0, 5)]
|
|
update.message.forward_date = 1
|
|
assert (Filters.forwarded & Filters.command)(update)
|
|
assert not (~Filters.forwarded & Filters.command)(update)
|
|
assert not (Filters.forwarded & ~Filters.command)(update)
|
|
assert not (~(Filters.forwarded & Filters.command))(update)
|
|
update.message.forward_date = None
|
|
assert not (Filters.forwarded & Filters.command)(update)
|
|
assert (~Filters.forwarded & Filters.command)(update)
|
|
assert not (Filters.forwarded & ~Filters.command)(update)
|
|
assert (~(Filters.forwarded & Filters.command))(update)
|
|
update.message.text = 'test'
|
|
update.message.entities = []
|
|
assert not (Filters.forwarded & Filters.command)(update)
|
|
assert not (~Filters.forwarded & Filters.command)(update)
|
|
assert not (Filters.forwarded & ~Filters.command)(update)
|
|
assert (~(Filters.forwarded & Filters.command))(update)
|
|
|
|
def test_faulty_custom_filter(self, update):
|
|
class _CustomFilter(BaseFilter):
|
|
pass
|
|
|
|
with pytest.raises(TypeError, match='Can\'t instantiate abstract class _CustomFilter'):
|
|
_CustomFilter()
|
|
|
|
def test_custom_unnamed_filter(self, update):
|
|
class Unnamed(BaseFilter):
|
|
def filter(self, mes):
|
|
return True
|
|
|
|
unnamed = Unnamed()
|
|
assert str(unnamed) == Unnamed.__name__
|
|
|
|
def test_update_type_message(self, update):
|
|
assert Filters.update.message(update)
|
|
assert not Filters.update.edited_message(update)
|
|
assert Filters.update.messages(update)
|
|
assert not Filters.update.channel_post(update)
|
|
assert not Filters.update.edited_channel_post(update)
|
|
assert not Filters.update.channel_posts(update)
|
|
assert Filters.update(update)
|
|
|
|
def test_update_type_edited_message(self, update):
|
|
update.edited_message, update.message = update.message, update.edited_message
|
|
assert not Filters.update.message(update)
|
|
assert Filters.update.edited_message(update)
|
|
assert Filters.update.messages(update)
|
|
assert not Filters.update.channel_post(update)
|
|
assert not Filters.update.edited_channel_post(update)
|
|
assert not Filters.update.channel_posts(update)
|
|
assert Filters.update(update)
|
|
|
|
def test_update_type_channel_post(self, update):
|
|
update.channel_post, update.message = update.message, update.edited_message
|
|
assert not Filters.update.message(update)
|
|
assert not Filters.update.edited_message(update)
|
|
assert not Filters.update.messages(update)
|
|
assert Filters.update.channel_post(update)
|
|
assert not Filters.update.edited_channel_post(update)
|
|
assert Filters.update.channel_posts(update)
|
|
assert Filters.update(update)
|
|
|
|
def test_update_type_edited_channel_post(self, update):
|
|
update.edited_channel_post, update.message = update.message, update.edited_message
|
|
assert not Filters.update.message(update)
|
|
assert not Filters.update.edited_message(update)
|
|
assert not Filters.update.messages(update)
|
|
assert not Filters.update.channel_post(update)
|
|
assert Filters.update.edited_channel_post(update)
|
|
assert Filters.update.channel_posts(update)
|
|
assert Filters.update(update)
|
|
|
|
def test_merged_short_circuit_and(self, update):
|
|
update.message.text = '/test'
|
|
update.message.entities = [MessageEntity(MessageEntity.BOT_COMMAND, 0, 5)]
|
|
|
|
class TestException(Exception):
|
|
pass
|
|
|
|
class RaisingFilter(BaseFilter):
|
|
def filter(self, _):
|
|
raise TestException
|
|
|
|
raising_filter = RaisingFilter()
|
|
|
|
with pytest.raises(TestException):
|
|
(Filters.command & raising_filter)(update)
|
|
|
|
update.message.text = 'test'
|
|
update.message.entities = []
|
|
(Filters.command & raising_filter)(update)
|
|
|
|
def test_merged_short_circuit_or(self, update):
|
|
update.message.text = 'test'
|
|
|
|
class TestException(Exception):
|
|
pass
|
|
|
|
class RaisingFilter(BaseFilter):
|
|
def filter(self, _):
|
|
raise TestException
|
|
|
|
raising_filter = RaisingFilter()
|
|
|
|
with pytest.raises(TestException):
|
|
(Filters.command | raising_filter)(update)
|
|
|
|
update.message.text = '/test'
|
|
update.message.entities = [MessageEntity(MessageEntity.BOT_COMMAND, 0, 5)]
|
|
(Filters.command | raising_filter)(update)
|
|
|
|
def test_merged_data_merging_and(self, update):
|
|
update.message.text = '/test'
|
|
update.message.entities = [MessageEntity(MessageEntity.BOT_COMMAND, 0, 5)]
|
|
|
|
class DataFilter(BaseFilter):
|
|
data_filter = True
|
|
|
|
def __init__(self, data):
|
|
self.data = data
|
|
|
|
def filter(self, _):
|
|
return {'test': [self.data]}
|
|
|
|
result = (Filters.command & DataFilter('blah'))(update)
|
|
assert result['test'] == ['blah']
|
|
|
|
result = (DataFilter('blah1') & DataFilter('blah2'))(update)
|
|
assert result['test'] == ['blah1', 'blah2']
|
|
|
|
update.message.text = 'test'
|
|
update.message.entities = []
|
|
result = (Filters.command & DataFilter('blah'))(update)
|
|
assert not result
|
|
|
|
def test_merged_data_merging_or(self, update):
|
|
update.message.text = '/test'
|
|
|
|
class DataFilter(BaseFilter):
|
|
data_filter = True
|
|
|
|
def __init__(self, data):
|
|
self.data = data
|
|
|
|
def filter(self, _):
|
|
return {'test': [self.data]}
|
|
|
|
result = (Filters.command | DataFilter('blah'))(update)
|
|
assert result
|
|
|
|
result = (DataFilter('blah1') | DataFilter('blah2'))(update)
|
|
assert result['test'] == ['blah1']
|
|
|
|
update.message.text = 'test'
|
|
result = (Filters.command | DataFilter('blah'))(update)
|
|
assert result['test'] == ['blah']
|