#!/usr/bin/env python # # A library that provides a Python interface to the Telegram Bot API # Copyright (C) 2015-2022 # Leandro Toledo de Souza # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser Public License for more details. # # You should have received a copy of the GNU Lesser Public License # along with this program. If not, see [http://www.gnu.org/licenses/]. import gzip import signal import uuid from pathlib import Path from threading import Lock from telegram.ext import PersistenceInput, UpdaterBuilder, CallbackDataCache try: import ujson as json except ImportError: import json import logging import os import pickle from collections import defaultdict from collections.abc import Container from time import sleep import pytest from telegram import Update, Message, User, Chat, MessageEntity, Bot from telegram.ext import ( BasePersistence, ConversationHandler, MessageHandler, filters, PicklePersistence, CommandHandler, DictPersistence, TypeHandler, JobQueue, ContextTypes, ) @pytest.fixture(autouse=True) def change_directory(tmp_path): orig_dir = Path.cwd() # Switch to a temporary directory so we don't have to worry about cleaning up files # (str() for py<3.6) os.chdir(str(tmp_path)) yield # Go back to original directory os.chdir(orig_dir) @pytest.fixture(autouse=True) def reset_callback_data_cache(bot): yield bot.callback_data_cache.clear_callback_data() bot.callback_data_cache.clear_callback_queries() bot.arbitrary_callback_data = False class OwnPersistence(BasePersistence): def get_bot_data(self): raise NotImplementedError def get_chat_data(self): raise NotImplementedError def get_user_data(self): raise NotImplementedError def get_conversations(self, name): raise NotImplementedError def update_bot_data(self, data): raise NotImplementedError def update_chat_data(self, chat_id, data): raise NotImplementedError def update_conversation(self, name, key, new_state): raise NotImplementedError def update_user_data(self, user_id, data): raise NotImplementedError def get_callback_data(self): raise NotImplementedError def refresh_user_data(self, user_id, user_data): raise NotImplementedError def refresh_chat_data(self, chat_id, chat_data): raise NotImplementedError def refresh_bot_data(self, bot_data): raise NotImplementedError def update_callback_data(self, data): raise NotImplementedError def flush(self): raise NotImplementedError @pytest.fixture(scope="function") def base_persistence(): return OwnPersistence() @pytest.fixture(scope="function") def bot_persistence(): class BotPersistence(BasePersistence): __slots__ = () def __init__(self): super().__init__() self.bot_data = None self.chat_data = defaultdict(dict) self.user_data = defaultdict(dict) self.callback_data = None def get_bot_data(self): return self.bot_data def get_chat_data(self): return self.chat_data def get_user_data(self): return self.user_data def get_callback_data(self): return self.callback_data def get_conversations(self, name): raise NotImplementedError def update_bot_data(self, data): self.bot_data = data def update_chat_data(self, chat_id, data): self.chat_data[chat_id] = data def update_user_data(self, user_id, data): self.user_data[user_id] = data def update_callback_data(self, data): self.callback_data = data def update_conversation(self, name, key, new_state): raise NotImplementedError def refresh_user_data(self, user_id, user_data): pass def refresh_chat_data(self, chat_id, chat_data): pass def refresh_bot_data(self, bot_data): pass def flush(self): pass return BotPersistence() @pytest.fixture(scope="function") def bot_data(): return {'test1': 'test2', 'test3': {'test4': 'test5'}} @pytest.fixture(scope="function") def chat_data(): return defaultdict( dict, {-12345: {'test1': 'test2', 'test3': {'test4': 'test5'}}, -67890: {3: 'test4'}} ) @pytest.fixture(scope="function") def user_data(): return defaultdict( dict, {12345: {'test1': 'test2', 'test3': {'test4': 'test5'}}, 67890: {3: 'test4'}} ) @pytest.fixture(scope="function") def callback_data(): return [('test1', 1000, {'button1': 'test0', 'button2': 'test1'})], {'test1': 'test2'} @pytest.fixture(scope='function') def conversations(): return { 'name1': {(123, 123): 3, (456, 654): 4}, 'name2': {(123, 321): 1, (890, 890): 2}, 'name3': {(123, 321): 1, (890, 890): 2}, } @pytest.fixture(scope="function") def updater(bot, base_persistence): base_persistence.store_data = PersistenceInput(False, False, False, False) u = UpdaterBuilder().bot(bot).persistence(base_persistence).build() base_persistence.store_data = PersistenceInput() return u @pytest.fixture(scope='function') def job_queue(bot): jq = JobQueue() yield jq jq.stop() def assert_data_in_cache(callback_data_cache: CallbackDataCache, data): for val in callback_data_cache._keyboard_data.values(): if data in val.button_data.values(): return data return False class TestBasePersistence: test_flag = False @pytest.fixture(scope='function', autouse=True) def reset(self): self.test_flag = False def test_slot_behaviour(self, bot_persistence, mro_slots): inst = bot_persistence for attr in inst.__slots__: assert getattr(inst, attr, 'err') != 'err', f"got extra slot '{attr}'" # assert not inst.__dict__, f"got missing slot(s): {inst.__dict__}" # The below test fails if the child class doesn't define __slots__ (not a cause of concern) assert len(mro_slots(inst)) == len(set(mro_slots(inst))), "duplicate slot" def test_creation(self, base_persistence): assert base_persistence.store_data.chat_data assert base_persistence.store_data.user_data assert base_persistence.store_data.bot_data assert base_persistence.store_data.callback_data def test_abstract_methods(self, base_persistence): with pytest.raises( TypeError, match=( 'flush, get_bot_data, get_callback_data, ' 'get_chat_data, get_conversations, ' 'get_user_data, refresh_bot_data, refresh_chat_data, ' 'refresh_user_data, update_bot_data, update_callback_data, ' 'update_chat_data, update_conversation, update_user_data' ), ): BasePersistence() with pytest.raises(NotImplementedError): base_persistence.get_callback_data() with pytest.raises(NotImplementedError): base_persistence.update_callback_data((None, {'foo': 'bar'})) def test_implementation(self, updater, base_persistence): dp = updater.dispatcher assert dp.persistence == base_persistence def test_conversationhandler_addition(self, dp, base_persistence): with pytest.raises(ValueError, match="when handler is unnamed"): ConversationHandler([], [], [], persistent=True) with pytest.raises(ValueError, match="if dispatcher has no persistence"): dp.add_handler(ConversationHandler([], {}, [], persistent=True, name="My Handler")) dp.persistence = base_persistence def test_dispatcher_integration_init( self, bot, base_persistence, chat_data, user_data, bot_data, callback_data ): def get_user_data(): return "test" def get_chat_data(): return "test" def get_bot_data(): return "test" def get_callback_data(): return "test" base_persistence.get_user_data = get_user_data base_persistence.get_chat_data = get_chat_data base_persistence.get_bot_data = get_bot_data base_persistence.get_callback_data = get_callback_data with pytest.raises(ValueError, match="user_data must be of type defaultdict"): UpdaterBuilder().bot(bot).persistence(base_persistence).build() def get_user_data(): return user_data base_persistence.get_user_data = get_user_data with pytest.raises(ValueError, match="chat_data must be of type defaultdict"): UpdaterBuilder().bot(bot).persistence(base_persistence).build() def get_chat_data(): return chat_data base_persistence.get_chat_data = get_chat_data with pytest.raises(ValueError, match="bot_data must be of type dict"): UpdaterBuilder().bot(bot).persistence(base_persistence).build() def get_bot_data(): return bot_data base_persistence.get_bot_data = get_bot_data with pytest.raises(ValueError, match="callback_data must be a 2-tuple"): UpdaterBuilder().bot(bot).persistence(base_persistence).build() def get_callback_data(): return callback_data base_persistence.bot = None base_persistence.get_callback_data = get_callback_data u = UpdaterBuilder().bot(bot).persistence(base_persistence).build() assert u.dispatcher.bot is base_persistence.bot assert u.dispatcher.bot_data == bot_data assert u.dispatcher.chat_data == chat_data assert u.dispatcher.user_data == user_data assert u.dispatcher.bot.callback_data_cache.persistence_data == callback_data u.dispatcher.chat_data[442233]['test5'] = 'test6' assert u.dispatcher.chat_data[442233]['test5'] == 'test6' @pytest.mark.parametrize('run_async', [True, False], ids=['synchronous', 'run_async']) def test_dispatcher_integration_handlers( self, dp, caplog, bot, base_persistence, chat_data, user_data, bot_data, callback_data, run_async, ): def get_user_data(): return user_data def get_chat_data(): return chat_data def get_bot_data(): return bot_data def get_callback_data(): return callback_data base_persistence.get_user_data = get_user_data base_persistence.get_chat_data = get_chat_data base_persistence.get_bot_data = get_bot_data base_persistence.get_callback_data = get_callback_data # base_persistence.update_chat_data = lambda x: x # base_persistence.update_user_data = lambda x: x base_persistence.refresh_bot_data = lambda x: x base_persistence.refresh_chat_data = lambda x, y: x base_persistence.refresh_user_data = lambda x, y: x updater = UpdaterBuilder().bot(bot).persistence(base_persistence).build() dp = updater.dispatcher def callback_known_user(update, context): if not context.user_data['test1'] == 'test2': pytest.fail('user_data corrupt') if not context.bot_data == bot_data: pytest.fail('bot_data corrupt') def callback_known_chat(update, context): if not context.chat_data['test3'] == 'test4': pytest.fail('chat_data corrupt') if not context.bot_data == bot_data: pytest.fail('bot_data corrupt') def callback_unknown_user_or_chat(update, context): if not context.user_data == {}: pytest.fail('user_data corrupt') if not context.chat_data == {}: pytest.fail('chat_data corrupt') if not context.bot_data == bot_data: pytest.fail('bot_data corrupt') context.user_data[1] = 'test7' context.chat_data[2] = 'test8' context.bot_data['test0'] = 'test0' context.bot.callback_data_cache.put('test0') known_user = MessageHandler( filters.User(user_id=12345), callback_known_user, ) known_chat = MessageHandler( filters.Chat(chat_id=-67890), callback_known_chat, ) unknown = MessageHandler( filters.ALL, callback_unknown_user_or_chat, ) dp.add_handler(known_user) dp.add_handler(known_chat) dp.add_handler(unknown) user1 = User(id=12345, first_name='test user', is_bot=False) user2 = User(id=54321, first_name='test user', is_bot=False) chat1 = Chat(id=-67890, type='group') chat2 = Chat(id=-987654, type='group') m = Message(1, None, chat2, from_user=user1) u = Update(0, m) with caplog.at_level(logging.ERROR): dp.process_update(u) rec = caplog.records[-1] assert rec.getMessage() == 'No error handlers are registered, logging exception.' assert rec.levelname == 'ERROR' rec = caplog.records[-2] assert rec.getMessage() == 'No error handlers are registered, logging exception.' assert rec.levelname == 'ERROR' rec = caplog.records[-3] assert rec.getMessage() == 'No error handlers are registered, logging exception.' assert rec.levelname == 'ERROR' m.from_user = user2 m.chat = chat1 u = Update(1, m) dp.process_update(u) m.chat = chat2 u = Update(2, m) def save_bot_data(data): if 'test0' not in data: pytest.fail() def save_chat_data(data): if -987654 not in data: pytest.fail() def save_user_data(data): if 54321 not in data: pytest.fail() def save_callback_data(data): if not assert_data_in_cache(dp.bot.callback_data, 'test0'): pytest.fail() base_persistence.update_chat_data = save_chat_data base_persistence.update_user_data = save_user_data base_persistence.update_bot_data = save_bot_data base_persistence.update_callback_data = save_callback_data dp.process_update(u) assert dp.user_data[54321][1] == 'test7' assert dp.chat_data[-987654][2] == 'test8' assert dp.bot_data['test0'] == 'test0' assert assert_data_in_cache(dp.bot.callback_data_cache, 'test0') @pytest.mark.parametrize( 'store_user_data', [True, False], ids=['store_user_data-True', 'store_user_data-False'] ) @pytest.mark.parametrize( 'store_chat_data', [True, False], ids=['store_chat_data-True', 'store_chat_data-False'] ) @pytest.mark.parametrize( 'store_bot_data', [True, False], ids=['store_bot_data-True', 'store_bot_data-False'] ) @pytest.mark.parametrize('run_async', [True, False], ids=['synchronous', 'run_async']) def test_persistence_dispatcher_integration_refresh_data( self, dp, base_persistence, chat_data, bot_data, user_data, store_bot_data, store_chat_data, store_user_data, run_async, ): base_persistence.refresh_bot_data = lambda x: x.setdefault( 'refreshed', x.get('refreshed', 0) + 1 ) # x is the user/chat_id base_persistence.refresh_chat_data = lambda x, y: y.setdefault('refreshed', x) base_persistence.refresh_user_data = lambda x, y: y.setdefault('refreshed', x) base_persistence.store_data = PersistenceInput( bot_data=store_bot_data, chat_data=store_chat_data, user_data=store_user_data ) dp.persistence = base_persistence self.test_flag = True def callback_with_user_and_chat(update, context): if store_user_data: if context.user_data.get('refreshed') != update.effective_user.id: self.test_flag = 'user_data was not refreshed' else: if 'refreshed' in context.user_data: self.test_flag = 'user_data was wrongly refreshed' if store_chat_data: if context.chat_data.get('refreshed') != update.effective_chat.id: self.test_flag = 'chat_data was not refreshed' else: if 'refreshed' in context.chat_data: self.test_flag = 'chat_data was wrongly refreshed' if store_bot_data: if context.bot_data.get('refreshed') != 1: self.test_flag = 'bot_data was not refreshed' else: if 'refreshed' in context.bot_data: self.test_flag = 'bot_data was wrongly refreshed' def callback_without_user_and_chat(_, context): if store_bot_data: if context.bot_data.get('refreshed') != 1: self.test_flag = 'bot_data was not refreshed' else: if 'refreshed' in context.bot_data: self.test_flag = 'bot_data was wrongly refreshed' with_user_and_chat = MessageHandler( filters.User(user_id=12345), callback_with_user_and_chat, run_async=run_async, ) without_user_and_chat = MessageHandler( filters.ALL, callback_without_user_and_chat, run_async=run_async, ) dp.add_handler(with_user_and_chat) dp.add_handler(without_user_and_chat) user = User(id=12345, first_name='test user', is_bot=False) chat = Chat(id=-987654, type='group') m = Message(1, None, chat, from_user=user) # has user and chat u = Update(0, m) dp.process_update(u) assert self.test_flag is True # has neither user nor hat m.from_user = None m.chat = None u = Update(1, m) dp.process_update(u) assert self.test_flag is True sleep(0.1) def test_persistence_dispatcher_arbitrary_update_types(self, dp, base_persistence, caplog): # Updates used with TypeHandler doesn't necessarily have the proper attributes for # persistence, makes sure it works anyways dp.persistence = base_persistence class MyUpdate: pass dp.add_handler(TypeHandler(MyUpdate, lambda *_: None)) with caplog.at_level(logging.ERROR): dp.process_update(MyUpdate()) assert 'An uncaught error was raised while processing the update' not in caplog.text def test_bot_replace_insert_bot(self, bot, bot_persistence): class CustomSlottedClass: __slots__ = ('bot', '__dict__') def __init__(self): self.bot = bot self.not_in_slots = bot def __eq__(self, other): if isinstance(other, CustomSlottedClass): return self.bot is other.bot and self.not_in_slots is other.not_in_slots return False class DictNotInSlots(Container): """This classes parent has slots, but __dict__ is not in those slots.""" def __init__(self): self.bot = bot def __contains__(self, item): return True def __eq__(self, other): if isinstance(other, DictNotInSlots): return self.bot is other.bot return False class CustomClass: def __init__(self): self.bot = bot self.slotted_object = CustomSlottedClass() self.dict_not_in_slots_object = DictNotInSlots() self.list_ = [1, 2, bot] self.tuple_ = tuple(self.list_) self.set_ = set(self.list_) self.frozenset_ = frozenset(self.list_) self.dict_ = {item: item for item in self.list_} self.defaultdict_ = defaultdict(dict, self.dict_) @staticmethod def replace_bot(): cc = CustomClass() cc.bot = BasePersistence.REPLACED_BOT cc.slotted_object.bot = BasePersistence.REPLACED_BOT cc.slotted_object.not_in_slots = BasePersistence.REPLACED_BOT cc.dict_not_in_slots_object.bot = BasePersistence.REPLACED_BOT cc.list_ = [1, 2, BasePersistence.REPLACED_BOT] cc.tuple_ = tuple(cc.list_) cc.set_ = set(cc.list_) cc.frozenset_ = frozenset(cc.list_) cc.dict_ = {item: item for item in cc.list_} cc.defaultdict_ = defaultdict(dict, cc.dict_) return cc def __eq__(self, other): if isinstance(other, CustomClass): return ( self.bot is other.bot and self.slotted_object == other.slotted_object and self.dict_not_in_slots_object == other.dict_not_in_slots_object and self.list_ == other.list_ and self.tuple_ == other.tuple_ and self.set_ == other.set_ and self.frozenset_ == other.frozenset_ and self.dict_ == other.dict_ and self.defaultdict_ == other.defaultdict_ ) return False persistence = bot_persistence persistence.set_bot(bot) cc = CustomClass() persistence.update_bot_data({1: cc}) assert persistence.bot_data[1].bot == BasePersistence.REPLACED_BOT assert persistence.bot_data[1] == cc.replace_bot() persistence.update_chat_data(123, {1: cc}) assert persistence.chat_data[123][1].bot == BasePersistence.REPLACED_BOT assert persistence.chat_data[123][1] == cc.replace_bot() persistence.update_user_data(123, {1: cc}) assert persistence.user_data[123][1].bot == BasePersistence.REPLACED_BOT assert persistence.user_data[123][1] == cc.replace_bot() persistence.update_callback_data(([('1', 2, {0: cc})], {'1': '2'})) assert persistence.callback_data[0][0][2][0].bot == BasePersistence.REPLACED_BOT assert persistence.callback_data[0][0][2][0] == cc.replace_bot() assert persistence.get_bot_data()[1] == cc assert persistence.get_bot_data()[1].bot is bot assert persistence.get_chat_data()[123][1] == cc assert persistence.get_chat_data()[123][1].bot is bot assert persistence.get_user_data()[123][1] == cc assert persistence.get_user_data()[123][1].bot is bot assert persistence.get_callback_data()[0][0][2][0].bot is bot assert persistence.get_callback_data()[0][0][2][0] == cc def test_bot_replace_insert_bot_unpickable_objects(self, bot, bot_persistence, recwarn): """Here check that unpickable objects are just returned verbatim.""" persistence = bot_persistence persistence.set_bot(bot) class CustomClass: def __copy__(self): raise TypeError('UnhandledException') lock = Lock() persistence.update_bot_data({1: lock}) assert persistence.bot_data[1] is lock persistence.update_chat_data(123, {1: lock}) assert persistence.chat_data[123][1] is lock persistence.update_user_data(123, {1: lock}) assert persistence.user_data[123][1] is lock persistence.update_callback_data(([('1', 2, {0: lock})], {'1': '2'})) assert persistence.callback_data[0][0][2][0] is lock assert persistence.get_bot_data()[1] is lock assert persistence.get_chat_data()[123][1] is lock assert persistence.get_user_data()[123][1] is lock assert persistence.get_callback_data()[0][0][2][0] is lock cc = CustomClass() persistence.update_bot_data({1: cc}) assert persistence.bot_data[1] is cc persistence.update_chat_data(123, {1: cc}) assert persistence.chat_data[123][1] is cc persistence.update_user_data(123, {1: cc}) assert persistence.user_data[123][1] is cc persistence.update_callback_data(([('1', 2, {0: cc})], {'1': '2'})) assert persistence.callback_data[0][0][2][0] is cc assert persistence.get_bot_data()[1] is cc assert persistence.get_chat_data()[123][1] is cc assert persistence.get_user_data()[123][1] is cc assert persistence.get_callback_data()[0][0][2][0] is cc assert len(recwarn) == 2 assert str(recwarn[0].message).startswith( "BasePersistence.replace_bot does not handle objects that can not be copied." ) assert str(recwarn[1].message).startswith( "BasePersistence.insert_bot does not handle objects that can not be copied." ) def test_bot_replace_insert_bot_unparsable_objects(self, bot, bot_persistence, recwarn): """Here check that objects in __dict__ or __slots__ that can't be parsed are just returned verbatim.""" persistence = bot_persistence persistence.set_bot(bot) uuid_obj = uuid.uuid4() persistence.update_bot_data({1: uuid_obj}) assert persistence.bot_data[1] is uuid_obj persistence.update_chat_data(123, {1: uuid_obj}) assert persistence.chat_data[123][1] is uuid_obj persistence.update_user_data(123, {1: uuid_obj}) assert persistence.user_data[123][1] is uuid_obj persistence.update_callback_data(([('1', 2, {0: uuid_obj})], {'1': '2'})) assert persistence.callback_data[0][0][2][0] is uuid_obj assert persistence.get_bot_data()[1] is uuid_obj assert persistence.get_chat_data()[123][1] is uuid_obj assert persistence.get_user_data()[123][1] is uuid_obj assert persistence.get_callback_data()[0][0][2][0] is uuid_obj assert len(recwarn) == 2 assert str(recwarn[0].message).startswith( "Parsing of an object failed with the following exception: " ) assert str(recwarn[1].message).startswith( "Parsing of an object failed with the following exception: " ) def test_bot_replace_insert_bot_classes(self, bot, bot_persistence, recwarn): """Here check that classes are just returned verbatim.""" persistence = bot_persistence persistence.set_bot(bot) class CustomClass: pass persistence.update_bot_data({1: CustomClass}) assert persistence.bot_data[1] is CustomClass persistence.update_chat_data(123, {1: CustomClass}) assert persistence.chat_data[123][1] is CustomClass persistence.update_user_data(123, {1: CustomClass}) assert persistence.user_data[123][1] is CustomClass assert persistence.get_bot_data()[1] is CustomClass assert persistence.get_chat_data()[123][1] is CustomClass assert persistence.get_user_data()[123][1] is CustomClass assert len(recwarn) == 2 assert str(recwarn[0].message).startswith( "BasePersistence.replace_bot does not handle classes such as 'CustomClass'" ) assert str(recwarn[1].message).startswith( "BasePersistence.insert_bot does not handle classes such as 'CustomClass'" ) def test_bot_replace_insert_bot_objects_with_faulty_equality(self, bot, bot_persistence): """Here check that trying to compare obj == self.REPLACED_BOT doesn't lead to problems.""" persistence = bot_persistence persistence.set_bot(bot) class CustomClass: def __init__(self, data): self.data = data def __eq__(self, other): raise RuntimeError("Can't be compared") cc = CustomClass({1: bot, 2: 'foo'}) expected = {1: BasePersistence.REPLACED_BOT, 2: 'foo'} persistence.update_bot_data({1: cc}) assert persistence.bot_data[1].data == expected persistence.update_chat_data(123, {1: cc}) assert persistence.chat_data[123][1].data == expected persistence.update_user_data(123, {1: cc}) assert persistence.user_data[123][1].data == expected persistence.update_callback_data(([('1', 2, {0: cc})], {'1': '2'})) assert persistence.callback_data[0][0][2][0].data == expected expected = {1: bot, 2: 'foo'} assert persistence.get_bot_data()[1].data == expected assert persistence.get_chat_data()[123][1].data == expected assert persistence.get_user_data()[123][1].data == expected assert persistence.get_callback_data()[0][0][2][0].data == expected @pytest.mark.filterwarnings('ignore:BasePersistence') def test_replace_insert_bot_item_identity(self, bot, bot_persistence): persistence = bot_persistence persistence.set_bot(bot) class CustomSlottedClass: __slots__ = ('value',) def __init__(self): self.value = 5 class CustomClass: pass slot_object = CustomSlottedClass() dict_object = CustomClass() lock = Lock() list_ = [slot_object, dict_object, lock] tuple_ = (1, 2, 3) dict_ = {1: slot_object, 2: dict_object} data = { 'bot_1': bot, 'bot_2': bot, 'list_1': list_, 'list_2': list_, 'tuple_1': tuple_, 'tuple_2': tuple_, 'dict_1': dict_, 'dict_2': dict_, } def make_assertion(data_): return ( data_['bot_1'] is data_['bot_2'] and data_['list_1'] is data_['list_2'] and data_['list_1'][0] is data_['list_2'][0] and data_['list_1'][1] is data_['list_2'][1] and data_['list_1'][2] is data_['list_2'][2] and data_['tuple_1'] is data_['tuple_2'] and data_['dict_1'] is data_['dict_2'] and data_['dict_1'][1] is data_['dict_2'][1] and data_['dict_1'][1] is data_['list_1'][0] and data_['dict_1'][2] is data_['list_1'][1] and data_['dict_1'][2] is data_['dict_2'][2] ) persistence.update_bot_data(data) assert make_assertion(persistence.bot_data) assert make_assertion(persistence.get_bot_data()) def test_set_bot_exception(self, bot): non_ext_bot = Bot(bot.token) persistence = OwnPersistence() with pytest.raises(TypeError, match='callback_data can only be stored'): persistence.set_bot(non_ext_bot) @pytest.fixture(scope='function') def pickle_persistence(): return PicklePersistence( filepath='pickletest', single_file=False, on_flush=False, ) @pytest.fixture(scope='function') def pickle_persistence_only_bot(): return PicklePersistence( filepath='pickletest', store_data=PersistenceInput(callback_data=False, user_data=False, chat_data=False), single_file=False, on_flush=False, ) @pytest.fixture(scope='function') def pickle_persistence_only_chat(): return PicklePersistence( filepath='pickletest', store_data=PersistenceInput(callback_data=False, user_data=False, bot_data=False), single_file=False, on_flush=False, ) @pytest.fixture(scope='function') def pickle_persistence_only_user(): return PicklePersistence( filepath='pickletest', store_data=PersistenceInput(callback_data=False, chat_data=False, bot_data=False), single_file=False, on_flush=False, ) @pytest.fixture(scope='function') def pickle_persistence_only_callback(): return PicklePersistence( filepath='pickletest', store_data=PersistenceInput(user_data=False, chat_data=False, bot_data=False), single_file=False, on_flush=False, ) @pytest.fixture(scope='function') def bad_pickle_files(): for name in [ 'pickletest_user_data', 'pickletest_chat_data', 'pickletest_bot_data', 'pickletest_callback_data', 'pickletest_conversations', 'pickletest', ]: Path(name).write_text('(())') yield True @pytest.fixture(scope='function') def invalid_pickle_files(): for name in [ 'pickletest_user_data', 'pickletest_chat_data', 'pickletest_bot_data', 'pickletest_callback_data', 'pickletest_conversations', 'pickletest', ]: # Just a random way to trigger pickle.UnpicklingError # see https://stackoverflow.com/a/44422239/10606962 with gzip.open(name, 'wb') as file: pickle.dump([1, 2, 3], file) yield True @pytest.fixture(scope='function') def good_pickle_files(user_data, chat_data, bot_data, callback_data, conversations): data = { 'user_data': user_data, 'chat_data': chat_data, 'bot_data': bot_data, 'callback_data': callback_data, 'conversations': conversations, } with Path('pickletest_user_data').open('wb') as f: pickle.dump(user_data, f) with Path('pickletest_chat_data').open('wb') as f: pickle.dump(chat_data, f) with Path('pickletest_bot_data').open('wb') as f: pickle.dump(bot_data, f) with Path('pickletest_callback_data').open('wb') as f: pickle.dump(callback_data, f) with Path('pickletest_conversations').open('wb') as f: pickle.dump(conversations, f) with Path('pickletest').open('wb') as f: pickle.dump(data, f) yield True @pytest.fixture(scope='function') def pickle_files_wo_bot_data(user_data, chat_data, callback_data, conversations): data = { 'user_data': user_data, 'chat_data': chat_data, 'conversations': conversations, 'callback_data': callback_data, } with Path('pickletest_user_data').open('wb') as f: pickle.dump(user_data, f) with Path('pickletest_chat_data').open('wb') as f: pickle.dump(chat_data, f) with Path('pickletest_callback_data').open('wb') as f: pickle.dump(callback_data, f) with Path('pickletest_conversations').open('wb') as f: pickle.dump(conversations, f) with Path('pickletest').open('wb') as f: pickle.dump(data, f) yield True @pytest.fixture(scope='function') def pickle_files_wo_callback_data(user_data, chat_data, bot_data, conversations): data = { 'user_data': user_data, 'chat_data': chat_data, 'bot_data': bot_data, 'conversations': conversations, } with Path('pickletest_user_data').open('wb') as f: pickle.dump(user_data, f) with Path('pickletest_chat_data').open('wb') as f: pickle.dump(chat_data, f) with Path('pickletest_bot_data').open('wb') as f: pickle.dump(bot_data, f) with Path('pickletest_conversations').open('wb') as f: pickle.dump(conversations, f) with Path('pickletest').open('wb') as f: pickle.dump(data, f) yield True @pytest.fixture(scope='function') def update(bot): user = User(id=321, first_name='test_user', is_bot=False) chat = Chat(id=123, type='group') message = Message(1, None, chat, from_user=user, text="Hi there", bot=bot) return Update(0, message=message) class CustomMapping(defaultdict): pass class TestPicklePersistence: def test_slot_behaviour(self, mro_slots, pickle_persistence): inst = pickle_persistence for attr in inst.__slots__: assert getattr(inst, attr, 'err') != 'err', f"got extra slot '{attr}'" assert len(mro_slots(inst)) == len(set(mro_slots(inst))), "duplicate slot" def test_pickle_behaviour_with_slots(self, pickle_persistence): bot_data = pickle_persistence.get_bot_data() bot_data['message'] = Message(3, None, Chat(2, type='supergroup')) pickle_persistence.update_bot_data(bot_data) retrieved = pickle_persistence.get_bot_data() assert retrieved == bot_data def test_no_files_present_multi_file(self, pickle_persistence): assert pickle_persistence.get_user_data() == defaultdict(dict) assert pickle_persistence.get_user_data() == defaultdict(dict) assert pickle_persistence.get_chat_data() == defaultdict(dict) assert pickle_persistence.get_chat_data() == defaultdict(dict) assert pickle_persistence.get_bot_data() == {} assert pickle_persistence.get_bot_data() == {} assert pickle_persistence.get_callback_data() is None assert pickle_persistence.get_conversations('noname') == {} assert pickle_persistence.get_conversations('noname') == {} def test_no_files_present_single_file(self, pickle_persistence): pickle_persistence.single_file = True assert pickle_persistence.get_user_data() == defaultdict(dict) assert pickle_persistence.get_chat_data() == defaultdict(dict) assert pickle_persistence.get_bot_data() == {} assert pickle_persistence.get_callback_data() is None assert pickle_persistence.get_conversations('noname') == {} def test_with_bad_multi_file(self, pickle_persistence, bad_pickle_files): with pytest.raises(TypeError, match='pickletest_user_data'): pickle_persistence.get_user_data() with pytest.raises(TypeError, match='pickletest_chat_data'): pickle_persistence.get_chat_data() with pytest.raises(TypeError, match='pickletest_bot_data'): pickle_persistence.get_bot_data() with pytest.raises(TypeError, match='pickletest_callback_data'): pickle_persistence.get_callback_data() with pytest.raises(TypeError, match='pickletest_conversations'): pickle_persistence.get_conversations('name') def test_with_invalid_multi_file(self, pickle_persistence, invalid_pickle_files): with pytest.raises(TypeError, match='pickletest_user_data does not contain'): pickle_persistence.get_user_data() with pytest.raises(TypeError, match='pickletest_chat_data does not contain'): pickle_persistence.get_chat_data() with pytest.raises(TypeError, match='pickletest_bot_data does not contain'): pickle_persistence.get_bot_data() with pytest.raises(TypeError, match='pickletest_callback_data does not contain'): pickle_persistence.get_callback_data() with pytest.raises(TypeError, match='pickletest_conversations does not contain'): pickle_persistence.get_conversations('name') def test_with_bad_single_file(self, pickle_persistence, bad_pickle_files): pickle_persistence.single_file = True with pytest.raises(TypeError, match='pickletest'): pickle_persistence.get_user_data() with pytest.raises(TypeError, match='pickletest'): pickle_persistence.get_chat_data() with pytest.raises(TypeError, match='pickletest'): pickle_persistence.get_bot_data() with pytest.raises(TypeError, match='pickletest'): pickle_persistence.get_callback_data() with pytest.raises(TypeError, match='pickletest'): pickle_persistence.get_conversations('name') def test_with_invalid_single_file(self, pickle_persistence, invalid_pickle_files): pickle_persistence.single_file = True with pytest.raises(TypeError, match='pickletest does not contain'): pickle_persistence.get_user_data() with pytest.raises(TypeError, match='pickletest does not contain'): pickle_persistence.get_chat_data() with pytest.raises(TypeError, match='pickletest does not contain'): pickle_persistence.get_bot_data() with pytest.raises(TypeError, match='pickletest does not contain'): pickle_persistence.get_callback_data() with pytest.raises(TypeError, match='pickletest does not contain'): pickle_persistence.get_conversations('name') def test_with_good_multi_file(self, pickle_persistence, good_pickle_files): user_data = pickle_persistence.get_user_data() assert isinstance(user_data, defaultdict) assert user_data[12345]['test1'] == 'test2' assert user_data[67890][3] == 'test4' assert user_data[54321] == {} chat_data = pickle_persistence.get_chat_data() assert isinstance(chat_data, defaultdict) assert chat_data[-12345]['test1'] == 'test2' assert chat_data[-67890][3] == 'test4' assert chat_data[-54321] == {} bot_data = pickle_persistence.get_bot_data() assert isinstance(bot_data, dict) assert bot_data['test1'] == 'test2' assert bot_data['test3']['test4'] == 'test5' assert 'test0' not in bot_data callback_data = pickle_persistence.get_callback_data() assert isinstance(callback_data, tuple) assert callback_data[0] == [('test1', 1000, {'button1': 'test0', 'button2': 'test1'})] assert callback_data[1] == {'test1': 'test2'} conversation1 = pickle_persistence.get_conversations('name1') assert isinstance(conversation1, dict) assert conversation1[(123, 123)] == 3 assert conversation1[(456, 654)] == 4 with pytest.raises(KeyError): conversation1[(890, 890)] conversation2 = pickle_persistence.get_conversations('name2') assert isinstance(conversation1, dict) assert conversation2[(123, 321)] == 1 assert conversation2[(890, 890)] == 2 with pytest.raises(KeyError): conversation2[(123, 123)] def test_with_good_single_file(self, pickle_persistence, good_pickle_files): pickle_persistence.single_file = True user_data = pickle_persistence.get_user_data() assert isinstance(user_data, defaultdict) assert user_data[12345]['test1'] == 'test2' assert user_data[67890][3] == 'test4' assert user_data[54321] == {} chat_data = pickle_persistence.get_chat_data() assert isinstance(chat_data, defaultdict) assert chat_data[-12345]['test1'] == 'test2' assert chat_data[-67890][3] == 'test4' assert chat_data[-54321] == {} bot_data = pickle_persistence.get_bot_data() assert isinstance(bot_data, dict) assert bot_data['test1'] == 'test2' assert bot_data['test3']['test4'] == 'test5' assert 'test0' not in bot_data callback_data = pickle_persistence.get_callback_data() assert isinstance(callback_data, tuple) assert callback_data[0] == [('test1', 1000, {'button1': 'test0', 'button2': 'test1'})] assert callback_data[1] == {'test1': 'test2'} conversation1 = pickle_persistence.get_conversations('name1') assert isinstance(conversation1, dict) assert conversation1[(123, 123)] == 3 assert conversation1[(456, 654)] == 4 with pytest.raises(KeyError): conversation1[(890, 890)] conversation2 = pickle_persistence.get_conversations('name2') assert isinstance(conversation1, dict) assert conversation2[(123, 321)] == 1 assert conversation2[(890, 890)] == 2 with pytest.raises(KeyError): conversation2[(123, 123)] def test_with_multi_file_wo_bot_data(self, pickle_persistence, pickle_files_wo_bot_data): user_data = pickle_persistence.get_user_data() assert isinstance(user_data, defaultdict) assert user_data[12345]['test1'] == 'test2' assert user_data[67890][3] == 'test4' assert user_data[54321] == {} chat_data = pickle_persistence.get_chat_data() assert isinstance(chat_data, defaultdict) assert chat_data[-12345]['test1'] == 'test2' assert chat_data[-67890][3] == 'test4' assert chat_data[-54321] == {} bot_data = pickle_persistence.get_bot_data() assert isinstance(bot_data, dict) assert not bot_data.keys() callback_data = pickle_persistence.get_callback_data() assert isinstance(callback_data, tuple) assert callback_data[0] == [('test1', 1000, {'button1': 'test0', 'button2': 'test1'})] assert callback_data[1] == {'test1': 'test2'} conversation1 = pickle_persistence.get_conversations('name1') assert isinstance(conversation1, dict) assert conversation1[(123, 123)] == 3 assert conversation1[(456, 654)] == 4 with pytest.raises(KeyError): conversation1[(890, 890)] conversation2 = pickle_persistence.get_conversations('name2') assert isinstance(conversation1, dict) assert conversation2[(123, 321)] == 1 assert conversation2[(890, 890)] == 2 with pytest.raises(KeyError): conversation2[(123, 123)] def test_with_multi_file_wo_callback_data( self, pickle_persistence, pickle_files_wo_callback_data ): user_data = pickle_persistence.get_user_data() assert isinstance(user_data, defaultdict) assert user_data[12345]['test1'] == 'test2' assert user_data[67890][3] == 'test4' assert user_data[54321] == {} chat_data = pickle_persistence.get_chat_data() assert isinstance(chat_data, defaultdict) assert chat_data[-12345]['test1'] == 'test2' assert chat_data[-67890][3] == 'test4' assert chat_data[-54321] == {} bot_data = pickle_persistence.get_bot_data() assert isinstance(bot_data, dict) assert bot_data['test1'] == 'test2' assert bot_data['test3']['test4'] == 'test5' assert 'test0' not in bot_data callback_data = pickle_persistence.get_callback_data() assert callback_data is None conversation1 = pickle_persistence.get_conversations('name1') assert isinstance(conversation1, dict) assert conversation1[(123, 123)] == 3 assert conversation1[(456, 654)] == 4 with pytest.raises(KeyError): conversation1[(890, 890)] conversation2 = pickle_persistence.get_conversations('name2') assert isinstance(conversation1, dict) assert conversation2[(123, 321)] == 1 assert conversation2[(890, 890)] == 2 with pytest.raises(KeyError): conversation2[(123, 123)] def test_with_single_file_wo_bot_data(self, pickle_persistence, pickle_files_wo_bot_data): pickle_persistence.single_file = True user_data = pickle_persistence.get_user_data() assert isinstance(user_data, defaultdict) assert user_data[12345]['test1'] == 'test2' assert user_data[67890][3] == 'test4' assert user_data[54321] == {} chat_data = pickle_persistence.get_chat_data() assert isinstance(chat_data, defaultdict) assert chat_data[-12345]['test1'] == 'test2' assert chat_data[-67890][3] == 'test4' assert chat_data[-54321] == {} bot_data = pickle_persistence.get_bot_data() assert isinstance(bot_data, dict) assert not bot_data.keys() callback_data = pickle_persistence.get_callback_data() assert isinstance(callback_data, tuple) assert callback_data[0] == [('test1', 1000, {'button1': 'test0', 'button2': 'test1'})] assert callback_data[1] == {'test1': 'test2'} conversation1 = pickle_persistence.get_conversations('name1') assert isinstance(conversation1, dict) assert conversation1[(123, 123)] == 3 assert conversation1[(456, 654)] == 4 with pytest.raises(KeyError): conversation1[(890, 890)] conversation2 = pickle_persistence.get_conversations('name2') assert isinstance(conversation1, dict) assert conversation2[(123, 321)] == 1 assert conversation2[(890, 890)] == 2 with pytest.raises(KeyError): conversation2[(123, 123)] def test_with_single_file_wo_callback_data( self, pickle_persistence, pickle_files_wo_callback_data ): user_data = pickle_persistence.get_user_data() assert isinstance(user_data, defaultdict) assert user_data[12345]['test1'] == 'test2' assert user_data[67890][3] == 'test4' assert user_data[54321] == {} chat_data = pickle_persistence.get_chat_data() assert isinstance(chat_data, defaultdict) assert chat_data[-12345]['test1'] == 'test2' assert chat_data[-67890][3] == 'test4' assert chat_data[-54321] == {} bot_data = pickle_persistence.get_bot_data() assert isinstance(bot_data, dict) assert bot_data['test1'] == 'test2' assert bot_data['test3']['test4'] == 'test5' assert 'test0' not in bot_data callback_data = pickle_persistence.get_callback_data() assert callback_data is None conversation1 = pickle_persistence.get_conversations('name1') assert isinstance(conversation1, dict) assert conversation1[(123, 123)] == 3 assert conversation1[(456, 654)] == 4 with pytest.raises(KeyError): conversation1[(890, 890)] conversation2 = pickle_persistence.get_conversations('name2') assert isinstance(conversation1, dict) assert conversation2[(123, 321)] == 1 assert conversation2[(890, 890)] == 2 with pytest.raises(KeyError): conversation2[(123, 123)] def test_updating_multi_file(self, pickle_persistence, good_pickle_files): user_data = pickle_persistence.get_user_data() user_data[12345]['test3']['test4'] = 'test6' assert not pickle_persistence.user_data == user_data pickle_persistence.update_user_data(12345, user_data[12345]) user_data[12345]['test3']['test4'] = 'test7' assert not pickle_persistence.user_data == user_data pickle_persistence.update_user_data(12345, user_data[12345]) assert pickle_persistence.user_data == user_data with Path('pickletest_user_data').open('rb') as f: user_data_test = defaultdict(dict, pickle.load(f)) assert user_data_test == user_data chat_data = pickle_persistence.get_chat_data() chat_data[-12345]['test3']['test4'] = 'test6' assert not pickle_persistence.chat_data == chat_data pickle_persistence.update_chat_data(-12345, chat_data[-12345]) chat_data[-12345]['test3']['test4'] = 'test7' assert not pickle_persistence.chat_data == chat_data pickle_persistence.update_chat_data(-12345, chat_data[-12345]) assert pickle_persistence.chat_data == chat_data with Path('pickletest_chat_data').open('rb') as f: chat_data_test = defaultdict(dict, pickle.load(f)) assert chat_data_test == chat_data bot_data = pickle_persistence.get_bot_data() bot_data['test3']['test4'] = 'test6' assert not pickle_persistence.bot_data == bot_data pickle_persistence.update_bot_data(bot_data) bot_data['test3']['test4'] = 'test7' assert not pickle_persistence.bot_data == bot_data pickle_persistence.update_bot_data(bot_data) assert pickle_persistence.bot_data == bot_data with Path('pickletest_bot_data').open('rb') as f: bot_data_test = pickle.load(f) assert bot_data_test == bot_data callback_data = pickle_persistence.get_callback_data() callback_data[1]['test3'] = 'test4' assert not pickle_persistence.callback_data == callback_data pickle_persistence.update_callback_data(callback_data) callback_data[1]['test3'] = 'test5' assert not pickle_persistence.callback_data == callback_data pickle_persistence.update_callback_data(callback_data) assert pickle_persistence.callback_data == callback_data with Path('pickletest_callback_data').open('rb') as f: callback_data_test = pickle.load(f) assert callback_data_test == callback_data conversation1 = pickle_persistence.get_conversations('name1') conversation1[(123, 123)] = 5 assert not pickle_persistence.conversations['name1'] == conversation1 pickle_persistence.update_conversation('name1', (123, 123), 5) assert pickle_persistence.conversations['name1'] == conversation1 assert pickle_persistence.get_conversations('name1') == conversation1 with Path('pickletest_conversations').open('rb') as f: conversations_test = defaultdict(dict, pickle.load(f)) assert conversations_test['name1'] == conversation1 pickle_persistence.conversations = None pickle_persistence.update_conversation('name1', (123, 123), 5) assert pickle_persistence.conversations['name1'] == {(123, 123): 5} assert pickle_persistence.get_conversations('name1') == {(123, 123): 5} def test_updating_single_file(self, pickle_persistence, good_pickle_files): pickle_persistence.single_file = True user_data = pickle_persistence.get_user_data() user_data[12345]['test3']['test4'] = 'test6' assert not pickle_persistence.user_data == user_data pickle_persistence.update_user_data(12345, user_data[12345]) user_data[12345]['test3']['test4'] = 'test7' assert not pickle_persistence.user_data == user_data pickle_persistence.update_user_data(12345, user_data[12345]) assert pickle_persistence.user_data == user_data with Path('pickletest').open('rb') as f: user_data_test = defaultdict(dict, pickle.load(f)['user_data']) assert user_data_test == user_data chat_data = pickle_persistence.get_chat_data() chat_data[-12345]['test3']['test4'] = 'test6' assert not pickle_persistence.chat_data == chat_data pickle_persistence.update_chat_data(-12345, chat_data[-12345]) chat_data[-12345]['test3']['test4'] = 'test7' assert not pickle_persistence.chat_data == chat_data pickle_persistence.update_chat_data(-12345, chat_data[-12345]) assert pickle_persistence.chat_data == chat_data with Path('pickletest').open('rb') as f: chat_data_test = defaultdict(dict, pickle.load(f)['chat_data']) assert chat_data_test == chat_data bot_data = pickle_persistence.get_bot_data() bot_data['test3']['test4'] = 'test6' assert not pickle_persistence.bot_data == bot_data pickle_persistence.update_bot_data(bot_data) bot_data['test3']['test4'] = 'test7' assert not pickle_persistence.bot_data == bot_data pickle_persistence.update_bot_data(bot_data) assert pickle_persistence.bot_data == bot_data with Path('pickletest').open('rb') as f: bot_data_test = pickle.load(f)['bot_data'] assert bot_data_test == bot_data callback_data = pickle_persistence.get_callback_data() callback_data[1]['test3'] = 'test4' assert not pickle_persistence.callback_data == callback_data pickle_persistence.update_callback_data(callback_data) callback_data[1]['test3'] = 'test5' assert not pickle_persistence.callback_data == callback_data pickle_persistence.update_callback_data(callback_data) assert pickle_persistence.callback_data == callback_data with Path('pickletest').open('rb') as f: callback_data_test = pickle.load(f)['callback_data'] assert callback_data_test == callback_data conversation1 = pickle_persistence.get_conversations('name1') conversation1[(123, 123)] = 5 assert not pickle_persistence.conversations['name1'] == conversation1 pickle_persistence.update_conversation('name1', (123, 123), 5) assert pickle_persistence.conversations['name1'] == conversation1 assert pickle_persistence.get_conversations('name1') == conversation1 with Path('pickletest').open('rb') as f: conversations_test = defaultdict(dict, pickle.load(f)['conversations']) assert conversations_test['name1'] == conversation1 pickle_persistence.conversations = None pickle_persistence.update_conversation('name1', (123, 123), 5) assert pickle_persistence.conversations['name1'] == {(123, 123): 5} assert pickle_persistence.get_conversations('name1') == {(123, 123): 5} def test_updating_single_file_no_data(self, pickle_persistence): pickle_persistence.single_file = True assert not any( [ pickle_persistence.user_data, pickle_persistence.chat_data, pickle_persistence.bot_data, pickle_persistence.callback_data, pickle_persistence.conversations, ] ) pickle_persistence.flush() with pytest.raises(FileNotFoundError, match='pickletest'): open('pickletest', 'rb') def test_save_on_flush_multi_files(self, pickle_persistence, good_pickle_files): # Should run without error pickle_persistence.flush() pickle_persistence.on_flush = True user_data = pickle_persistence.get_user_data() user_data[54321]['test9'] = 'test 10' assert not pickle_persistence.user_data == user_data pickle_persistence.update_user_data(54321, user_data[54321]) assert pickle_persistence.user_data == user_data with Path('pickletest_user_data').open('rb') as f: user_data_test = defaultdict(dict, pickle.load(f)) assert not user_data_test == user_data chat_data = pickle_persistence.get_chat_data() chat_data[54321]['test9'] = 'test 10' assert not pickle_persistence.chat_data == chat_data pickle_persistence.update_chat_data(54321, chat_data[54321]) assert pickle_persistence.chat_data == chat_data with Path('pickletest_chat_data').open('rb') as f: chat_data_test = defaultdict(dict, pickle.load(f)) assert not chat_data_test == chat_data bot_data = pickle_persistence.get_bot_data() bot_data['test6'] = 'test 7' assert not pickle_persistence.bot_data == bot_data pickle_persistence.update_bot_data(bot_data) assert pickle_persistence.bot_data == bot_data with Path('pickletest_bot_data').open('rb') as f: bot_data_test = pickle.load(f) assert not bot_data_test == bot_data callback_data = pickle_persistence.get_callback_data() callback_data[1]['test3'] = 'test4' assert not pickle_persistence.callback_data == callback_data pickle_persistence.update_callback_data(callback_data) assert pickle_persistence.callback_data == callback_data with Path('pickletest_callback_data').open('rb') as f: callback_data_test = pickle.load(f) assert not callback_data_test == callback_data conversation1 = pickle_persistence.get_conversations('name1') conversation1[(123, 123)] = 5 assert not pickle_persistence.conversations['name1'] == conversation1 pickle_persistence.update_conversation('name1', (123, 123), 5) assert pickle_persistence.conversations['name1'] == conversation1 with Path('pickletest_conversations').open('rb') as f: conversations_test = defaultdict(dict, pickle.load(f)) assert not conversations_test['name1'] == conversation1 pickle_persistence.flush() with Path('pickletest_user_data').open('rb') as f: user_data_test = defaultdict(dict, pickle.load(f)) assert user_data_test == user_data with Path('pickletest_chat_data').open('rb') as f: chat_data_test = defaultdict(dict, pickle.load(f)) assert chat_data_test == chat_data with Path('pickletest_bot_data').open('rb') as f: bot_data_test = pickle.load(f) assert bot_data_test == bot_data with Path('pickletest_conversations').open('rb') as f: conversations_test = defaultdict(dict, pickle.load(f)) assert conversations_test['name1'] == conversation1 def test_save_on_flush_single_files(self, pickle_persistence, good_pickle_files): # Should run without error pickle_persistence.flush() pickle_persistence.on_flush = True pickle_persistence.single_file = True user_data = pickle_persistence.get_user_data() user_data[54321]['test9'] = 'test 10' assert not pickle_persistence.user_data == user_data pickle_persistence.update_user_data(54321, user_data[54321]) assert pickle_persistence.user_data == user_data with Path('pickletest').open('rb') as f: user_data_test = defaultdict(dict, pickle.load(f)['user_data']) assert not user_data_test == user_data chat_data = pickle_persistence.get_chat_data() chat_data[54321]['test9'] = 'test 10' assert not pickle_persistence.chat_data == chat_data pickle_persistence.update_chat_data(54321, chat_data[54321]) assert pickle_persistence.chat_data == chat_data with Path('pickletest').open('rb') as f: chat_data_test = defaultdict(dict, pickle.load(f)['chat_data']) assert not chat_data_test == chat_data bot_data = pickle_persistence.get_bot_data() bot_data['test6'] = 'test 7' assert not pickle_persistence.bot_data == bot_data pickle_persistence.update_bot_data(bot_data) assert pickle_persistence.bot_data == bot_data with Path('pickletest').open('rb') as f: bot_data_test = pickle.load(f)['bot_data'] assert not bot_data_test == bot_data callback_data = pickle_persistence.get_callback_data() callback_data[1]['test3'] = 'test4' assert not pickle_persistence.callback_data == callback_data pickle_persistence.update_callback_data(callback_data) assert pickle_persistence.callback_data == callback_data with Path('pickletest').open('rb') as f: callback_data_test = pickle.load(f)['callback_data'] assert not callback_data_test == callback_data conversation1 = pickle_persistence.get_conversations('name1') conversation1[(123, 123)] = 5 assert not pickle_persistence.conversations['name1'] == conversation1 pickle_persistence.update_conversation('name1', (123, 123), 5) assert pickle_persistence.conversations['name1'] == conversation1 with Path('pickletest').open('rb') as f: conversations_test = defaultdict(dict, pickle.load(f)['conversations']) assert not conversations_test['name1'] == conversation1 pickle_persistence.flush() with Path('pickletest').open('rb') as f: user_data_test = defaultdict(dict, pickle.load(f)['user_data']) assert user_data_test == user_data with Path('pickletest').open('rb') as f: chat_data_test = defaultdict(dict, pickle.load(f)['chat_data']) assert chat_data_test == chat_data with Path('pickletest').open('rb') as f: bot_data_test = pickle.load(f)['bot_data'] assert bot_data_test == bot_data with Path('pickletest').open('rb') as f: conversations_test = defaultdict(dict, pickle.load(f)['conversations']) assert conversations_test['name1'] == conversation1 def test_with_handler(self, bot, update, bot_data, pickle_persistence, good_pickle_files): u = UpdaterBuilder().bot(bot).persistence(pickle_persistence).build() dp = u.dispatcher bot.callback_data_cache.clear_callback_data() bot.callback_data_cache.clear_callback_queries() def first(update, context): if not context.user_data == {}: pytest.fail() if not context.chat_data == {}: pytest.fail() if not context.bot_data == bot_data: pytest.fail() if not context.bot.callback_data_cache.persistence_data == ([], {}): pytest.fail() context.user_data['test1'] = 'test2' context.chat_data['test3'] = 'test4' context.bot_data['test1'] = 'test0' context.bot.callback_data_cache._callback_queries['test1'] = 'test0' def second(update, context): if not context.user_data['test1'] == 'test2': pytest.fail() if not context.chat_data['test3'] == 'test4': pytest.fail() if not context.bot_data['test1'] == 'test0': pytest.fail() if not context.bot.callback_data_cache.persistence_data == ([], {'test1': 'test0'}): pytest.fail() h1 = MessageHandler(None, first) h2 = MessageHandler(None, second) dp.add_handler(h1) dp.process_update(update) pickle_persistence_2 = PicklePersistence( filepath='pickletest', single_file=False, on_flush=False, ) u = UpdaterBuilder().bot(bot).persistence(pickle_persistence_2).build() dp = u.dispatcher dp.add_handler(h2) dp.process_update(update) def test_flush_on_stop(self, bot, update, pickle_persistence): u = UpdaterBuilder().bot(bot).persistence(pickle_persistence).build() dp = u.dispatcher u.running = True dp.user_data[4242424242]['my_test'] = 'Working!' dp.chat_data[-4242424242]['my_test2'] = 'Working2!' dp.bot_data['test'] = 'Working3!' dp.bot.callback_data_cache._callback_queries['test'] = 'Working4!' u._signal_handler(signal.SIGINT, None) pickle_persistence_2 = PicklePersistence( filepath='pickletest', single_file=False, on_flush=False, ) assert pickle_persistence_2.get_user_data()[4242424242]['my_test'] == 'Working!' assert pickle_persistence_2.get_chat_data()[-4242424242]['my_test2'] == 'Working2!' assert pickle_persistence_2.get_bot_data()['test'] == 'Working3!' data = pickle_persistence_2.get_callback_data()[1] assert data['test'] == 'Working4!' def test_flush_on_stop_only_bot(self, bot, update, pickle_persistence_only_bot): u = UpdaterBuilder().bot(bot).persistence(pickle_persistence_only_bot).build() dp = u.dispatcher u.running = True dp.user_data[4242424242]['my_test'] = 'Working!' dp.chat_data[-4242424242]['my_test2'] = 'Working2!' dp.bot_data['my_test3'] = 'Working3!' dp.bot.callback_data_cache._callback_queries['test'] = 'Working4!' u._signal_handler(signal.SIGINT, None) pickle_persistence_2 = PicklePersistence( filepath='pickletest', store_data=PersistenceInput(callback_data=False, chat_data=False, user_data=False), single_file=False, on_flush=False, ) assert pickle_persistence_2.get_user_data() == {} assert pickle_persistence_2.get_chat_data() == {} assert pickle_persistence_2.get_bot_data()['my_test3'] == 'Working3!' assert pickle_persistence_2.get_callback_data() is None def test_flush_on_stop_only_chat(self, bot, update, pickle_persistence_only_chat): u = UpdaterBuilder().bot(bot).persistence(pickle_persistence_only_chat).build() dp = u.dispatcher u.running = True dp.user_data[4242424242]['my_test'] = 'Working!' dp.chat_data[-4242424242]['my_test2'] = 'Working2!' dp.bot_data['my_test3'] = 'Working3!' dp.bot.callback_data_cache._callback_queries['test'] = 'Working4!' u._signal_handler(signal.SIGINT, None) pickle_persistence_2 = PicklePersistence( filepath='pickletest', store_data=PersistenceInput(callback_data=False, user_data=False, bot_data=False), single_file=False, on_flush=False, ) assert pickle_persistence_2.get_user_data() == {} assert pickle_persistence_2.get_chat_data()[-4242424242]['my_test2'] == 'Working2!' assert pickle_persistence_2.get_bot_data() == {} assert pickle_persistence_2.get_callback_data() is None def test_flush_on_stop_only_user(self, bot, update, pickle_persistence_only_user): u = UpdaterBuilder().bot(bot).persistence(pickle_persistence_only_user).build() dp = u.dispatcher u.running = True dp.user_data[4242424242]['my_test'] = 'Working!' dp.chat_data[-4242424242]['my_test2'] = 'Working2!' dp.bot_data['my_test3'] = 'Working3!' dp.bot.callback_data_cache._callback_queries['test'] = 'Working4!' u._signal_handler(signal.SIGINT, None) pickle_persistence_2 = PicklePersistence( filepath='pickletest', store_data=PersistenceInput(callback_data=False, chat_data=False, bot_data=False), single_file=False, on_flush=False, ) assert pickle_persistence_2.get_user_data()[4242424242]['my_test'] == 'Working!' assert pickle_persistence_2.get_chat_data() == {} assert pickle_persistence_2.get_bot_data() == {} assert pickle_persistence_2.get_callback_data() is None def test_flush_on_stop_only_callback(self, bot, update, pickle_persistence_only_callback): u = UpdaterBuilder().bot(bot).persistence(pickle_persistence_only_callback).build() dp = u.dispatcher u.running = True dp.user_data[4242424242]['my_test'] = 'Working!' dp.chat_data[-4242424242]['my_test2'] = 'Working2!' dp.bot_data['my_test3'] = 'Working3!' dp.bot.callback_data_cache._callback_queries['test'] = 'Working4!' u._signal_handler(signal.SIGINT, None) del dp del u del pickle_persistence_only_callback pickle_persistence_2 = PicklePersistence( filepath='pickletest', store_data=PersistenceInput(user_data=False, chat_data=False, bot_data=False), single_file=False, on_flush=False, ) assert pickle_persistence_2.get_user_data() == {} assert pickle_persistence_2.get_chat_data() == {} assert pickle_persistence_2.get_bot_data() == {} data = pickle_persistence_2.get_callback_data()[1] assert data['test'] == 'Working4!' def test_with_conversation_handler(self, dp, update, good_pickle_files, pickle_persistence): dp.persistence = pickle_persistence NEXT, NEXT2 = range(2) def start(update, context): return NEXT start = CommandHandler('start', start) def next_callback(update, context): return NEXT2 next_handler = MessageHandler(None, next_callback) def next2(update, context): return ConversationHandler.END next2 = MessageHandler(None, next2) ch = ConversationHandler( [start], {NEXT: [next_handler], NEXT2: [next2]}, [], name='name2', persistent=True ) dp.add_handler(ch) assert ch.conversations[ch._get_key(update)] == 1 dp.process_update(update) assert ch._get_key(update) not in ch.conversations update.message.text = '/start' update.message.entities = [MessageEntity(MessageEntity.BOT_COMMAND, 0, 6)] dp.process_update(update) assert ch.conversations[ch._get_key(update)] == 0 assert ch.conversations == pickle_persistence.conversations['name2'] def test_with_nested_conversationHandler( self, dp, update, good_pickle_files, pickle_persistence ): dp.persistence = pickle_persistence NEXT2, NEXT3 = range(1, 3) def start(update, context): return NEXT2 start = CommandHandler('start', start) def next_callback(update, context): return NEXT2 next_handler = MessageHandler(None, next_callback) def next2(update, context): return ConversationHandler.END next2 = MessageHandler(None, next2) nested_ch = ConversationHandler( [next_handler], {NEXT2: [next2]}, [], name='name3', persistent=True, map_to_parent={ConversationHandler.END: ConversationHandler.END}, ) ch = ConversationHandler( [start], {NEXT2: [nested_ch], NEXT3: []}, [], name='name2', persistent=True ) dp.add_handler(ch) assert ch.conversations[ch._get_key(update)] == 1 assert nested_ch.conversations[nested_ch._get_key(update)] == 1 dp.process_update(update) assert ch._get_key(update) not in ch.conversations assert nested_ch._get_key(update) not in nested_ch.conversations update.message.text = '/start' update.message.entities = [MessageEntity(MessageEntity.BOT_COMMAND, 0, 6)] dp.process_update(update) assert ch.conversations[ch._get_key(update)] == 1 assert ch.conversations == pickle_persistence.conversations['name2'] assert nested_ch._get_key(update) not in nested_ch.conversations dp.process_update(update) assert ch.conversations[ch._get_key(update)] == 1 assert ch.conversations == pickle_persistence.conversations['name2'] assert nested_ch.conversations[nested_ch._get_key(update)] == 1 assert nested_ch.conversations == pickle_persistence.conversations['name3'] @pytest.mark.parametrize( 'filepath', ['pickletest', Path('pickletest')], ids=['str filepath', 'pathlib.Path filepath'], ) def test_filepath_argument_types(self, filepath): pick_persist = PicklePersistence( filepath=filepath, on_flush=False, ) pick_persist.update_user_data(1, 1) assert pick_persist.get_user_data()[1] == 1 assert Path(filepath).is_file() def test_with_job(self, job_queue, dp, pickle_persistence): dp.bot.arbitrary_callback_data = True def job_callback(context): context.bot_data['test1'] = '456' context.dispatcher.chat_data[123]['test2'] = '789' context.dispatcher.user_data[789]['test3'] = '123' context.bot.callback_data_cache._callback_queries['test'] = 'Working4!' dp.persistence = pickle_persistence job_queue.set_dispatcher(dp) job_queue.start() job_queue.run_once(job_callback, 0.01) sleep(0.5) bot_data = pickle_persistence.get_bot_data() assert bot_data == {'test1': '456'} chat_data = pickle_persistence.get_chat_data() assert chat_data[123] == {'test2': '789'} user_data = pickle_persistence.get_user_data() assert user_data[789] == {'test3': '123'} data = pickle_persistence.get_callback_data()[1] assert data['test'] == 'Working4!' @pytest.mark.parametrize('singlefile', [True, False]) @pytest.mark.parametrize('ud', [int, float, complex]) @pytest.mark.parametrize('cd', [int, float, complex]) @pytest.mark.parametrize('bd', [int, float, complex]) def test_with_context_types(self, ud, cd, bd, singlefile): cc = ContextTypes(user_data=ud, chat_data=cd, bot_data=bd) persistence = PicklePersistence('pickletest', single_file=singlefile, context_types=cc) assert isinstance(persistence.get_user_data()[1], ud) assert persistence.get_user_data()[1] == 0 assert isinstance(persistence.get_chat_data()[1], cd) assert persistence.get_chat_data()[1] == 0 assert isinstance(persistence.get_bot_data(), bd) assert persistence.get_bot_data() == 0 persistence.user_data = None persistence.chat_data = None persistence.update_user_data(1, ud(1)) persistence.update_chat_data(1, cd(1)) persistence.update_bot_data(bd(1)) assert persistence.get_user_data()[1] == 1 assert persistence.get_chat_data()[1] == 1 assert persistence.get_bot_data() == 1 persistence.flush() persistence = PicklePersistence('pickletest', single_file=singlefile, context_types=cc) assert isinstance(persistence.get_user_data()[1], ud) assert persistence.get_user_data()[1] == 1 assert isinstance(persistence.get_chat_data()[1], cd) assert persistence.get_chat_data()[1] == 1 assert isinstance(persistence.get_bot_data(), bd) assert persistence.get_bot_data() == 1 @pytest.fixture(scope='function') def user_data_json(user_data): return json.dumps(user_data) @pytest.fixture(scope='function') def chat_data_json(chat_data): return json.dumps(chat_data) @pytest.fixture(scope='function') def bot_data_json(bot_data): return json.dumps(bot_data) @pytest.fixture(scope='function') def callback_data_json(callback_data): return json.dumps(callback_data) @pytest.fixture(scope='function') def conversations_json(conversations): return """{"name1": {"[123, 123]": 3, "[456, 654]": 4}, "name2": {"[123, 321]": 1, "[890, 890]": 2}, "name3": {"[123, 321]": 1, "[890, 890]": 2}}""" class TestDictPersistence: def test_slot_behaviour(self, mro_slots, recwarn): inst = DictPersistence() for attr in inst.__slots__: assert getattr(inst, attr, 'err') != 'err', f"got extra slot '{attr}'" assert len(mro_slots(inst)) == len(set(mro_slots(inst))), "duplicate slot" def test_no_json_given(self): dict_persistence = DictPersistence() assert dict_persistence.get_user_data() == defaultdict(dict) assert dict_persistence.get_chat_data() == defaultdict(dict) assert dict_persistence.get_bot_data() == {} assert dict_persistence.get_callback_data() is None assert dict_persistence.get_conversations('noname') == {} def test_bad_json_string_given(self): bad_user_data = 'thisisnojson99900()))(' bad_chat_data = 'thisisnojson99900()))(' bad_bot_data = 'thisisnojson99900()))(' bad_callback_data = 'thisisnojson99900()))(' bad_conversations = 'thisisnojson99900()))(' with pytest.raises(TypeError, match='user_data'): DictPersistence(user_data_json=bad_user_data) with pytest.raises(TypeError, match='chat_data'): DictPersistence(chat_data_json=bad_chat_data) with pytest.raises(TypeError, match='bot_data'): DictPersistence(bot_data_json=bad_bot_data) with pytest.raises(TypeError, match='callback_data'): DictPersistence(callback_data_json=bad_callback_data) with pytest.raises(TypeError, match='conversations'): DictPersistence(conversations_json=bad_conversations) def test_invalid_json_string_given(self, pickle_persistence, bad_pickle_files): bad_user_data = '["this", "is", "json"]' bad_chat_data = '["this", "is", "json"]' bad_bot_data = '["this", "is", "json"]' bad_conversations = '["this", "is", "json"]' bad_callback_data_1 = '[[["str", 3.14, {"di": "ct"}]], "is"]' bad_callback_data_2 = '[[["str", "non-float", {"di": "ct"}]], {"di": "ct"}]' bad_callback_data_3 = '[[[{"not": "a str"}, 3.14, {"di": "ct"}]], {"di": "ct"}]' bad_callback_data_4 = '[[["wrong", "length"]], {"di": "ct"}]' bad_callback_data_5 = '["this", "is", "json"]' with pytest.raises(TypeError, match='user_data'): DictPersistence(user_data_json=bad_user_data) with pytest.raises(TypeError, match='chat_data'): DictPersistence(chat_data_json=bad_chat_data) with pytest.raises(TypeError, match='bot_data'): DictPersistence(bot_data_json=bad_bot_data) for bad_callback_data in [ bad_callback_data_1, bad_callback_data_2, bad_callback_data_3, bad_callback_data_4, bad_callback_data_5, ]: with pytest.raises(TypeError, match='callback_data'): DictPersistence(callback_data_json=bad_callback_data) with pytest.raises(TypeError, match='conversations'): DictPersistence(conversations_json=bad_conversations) def test_good_json_input( self, user_data_json, chat_data_json, bot_data_json, conversations_json, callback_data_json ): dict_persistence = DictPersistence( user_data_json=user_data_json, chat_data_json=chat_data_json, bot_data_json=bot_data_json, conversations_json=conversations_json, callback_data_json=callback_data_json, ) user_data = dict_persistence.get_user_data() assert isinstance(user_data, defaultdict) assert user_data[12345]['test1'] == 'test2' assert user_data[67890][3] == 'test4' assert user_data[54321] == {} chat_data = dict_persistence.get_chat_data() assert isinstance(chat_data, defaultdict) assert chat_data[-12345]['test1'] == 'test2' assert chat_data[-67890][3] == 'test4' assert chat_data[-54321] == {} bot_data = dict_persistence.get_bot_data() assert isinstance(bot_data, dict) assert bot_data['test1'] == 'test2' assert bot_data['test3']['test4'] == 'test5' assert 'test6' not in bot_data callback_data = dict_persistence.get_callback_data() assert isinstance(callback_data, tuple) assert callback_data[0] == [('test1', 1000, {'button1': 'test0', 'button2': 'test1'})] assert callback_data[1] == {'test1': 'test2'} conversation1 = dict_persistence.get_conversations('name1') assert isinstance(conversation1, dict) assert conversation1[(123, 123)] == 3 assert conversation1[(456, 654)] == 4 with pytest.raises(KeyError): conversation1[(890, 890)] conversation2 = dict_persistence.get_conversations('name2') assert isinstance(conversation1, dict) assert conversation2[(123, 321)] == 1 assert conversation2[(890, 890)] == 2 with pytest.raises(KeyError): conversation2[(123, 123)] def test_good_json_input_callback_data_none(self): dict_persistence = DictPersistence(callback_data_json='null') assert dict_persistence.callback_data is None assert dict_persistence.callback_data_json == 'null' def test_dict_outputs( self, user_data, user_data_json, chat_data, chat_data_json, bot_data, bot_data_json, callback_data_json, conversations, conversations_json, ): dict_persistence = DictPersistence( user_data_json=user_data_json, chat_data_json=chat_data_json, bot_data_json=bot_data_json, callback_data_json=callback_data_json, conversations_json=conversations_json, ) assert dict_persistence.user_data == user_data assert dict_persistence.chat_data == chat_data assert dict_persistence.bot_data == bot_data assert dict_persistence.bot_data == bot_data assert dict_persistence.conversations == conversations def test_json_outputs( self, user_data_json, chat_data_json, bot_data_json, callback_data_json, conversations_json ): dict_persistence = DictPersistence( user_data_json=user_data_json, chat_data_json=chat_data_json, bot_data_json=bot_data_json, callback_data_json=callback_data_json, conversations_json=conversations_json, ) assert dict_persistence.user_data_json == user_data_json assert dict_persistence.chat_data_json == chat_data_json assert dict_persistence.callback_data_json == callback_data_json assert dict_persistence.conversations_json == conversations_json def test_updating( self, user_data_json, chat_data_json, bot_data_json, callback_data, callback_data_json, conversations, conversations_json, ): dict_persistence = DictPersistence( user_data_json=user_data_json, chat_data_json=chat_data_json, bot_data_json=bot_data_json, callback_data_json=callback_data_json, conversations_json=conversations_json, ) user_data = dict_persistence.get_user_data() user_data[12345]['test3']['test4'] = 'test6' assert not dict_persistence.user_data == user_data assert not dict_persistence.user_data_json == json.dumps(user_data) dict_persistence.update_user_data(12345, user_data[12345]) user_data[12345]['test3']['test4'] = 'test7' assert not dict_persistence.user_data == user_data assert not dict_persistence.user_data_json == json.dumps(user_data) dict_persistence.update_user_data(12345, user_data[12345]) assert dict_persistence.user_data == user_data assert dict_persistence.user_data_json == json.dumps(user_data) chat_data = dict_persistence.get_chat_data() chat_data[-12345]['test3']['test4'] = 'test6' assert not dict_persistence.chat_data == chat_data assert not dict_persistence.chat_data_json == json.dumps(chat_data) dict_persistence.update_chat_data(-12345, chat_data[-12345]) chat_data[-12345]['test3']['test4'] = 'test7' assert not dict_persistence.chat_data == chat_data assert not dict_persistence.chat_data_json == json.dumps(chat_data) dict_persistence.update_chat_data(-12345, chat_data[-12345]) assert dict_persistence.chat_data == chat_data assert dict_persistence.chat_data_json == json.dumps(chat_data) bot_data = dict_persistence.get_bot_data() bot_data['test3']['test4'] = 'test6' assert not dict_persistence.bot_data == bot_data assert not dict_persistence.bot_data_json == json.dumps(bot_data) dict_persistence.update_bot_data(bot_data) bot_data['test3']['test4'] = 'test7' assert not dict_persistence.bot_data == bot_data assert not dict_persistence.bot_data_json == json.dumps(bot_data) dict_persistence.update_bot_data(bot_data) assert dict_persistence.bot_data == bot_data assert dict_persistence.bot_data_json == json.dumps(bot_data) callback_data = dict_persistence.get_callback_data() callback_data[1]['test3'] = 'test4' callback_data[0][0][2]['button2'] = 'test41' assert not dict_persistence.callback_data == callback_data assert not dict_persistence.callback_data_json == json.dumps(callback_data) dict_persistence.update_callback_data(callback_data) callback_data[1]['test3'] = 'test5' callback_data[0][0][2]['button2'] = 'test42' assert not dict_persistence.callback_data == callback_data assert not dict_persistence.callback_data_json == json.dumps(callback_data) dict_persistence.update_callback_data(callback_data) assert dict_persistence.callback_data == callback_data assert dict_persistence.callback_data_json == json.dumps(callback_data) conversation1 = dict_persistence.get_conversations('name1') conversation1[(123, 123)] = 5 assert not dict_persistence.conversations['name1'] == conversation1 dict_persistence.update_conversation('name1', (123, 123), 5) assert dict_persistence.conversations['name1'] == conversation1 conversations['name1'][(123, 123)] = 5 assert ( dict_persistence.conversations_json == DictPersistence._encode_conversations_to_json(conversations) ) assert dict_persistence.get_conversations('name1') == conversation1 dict_persistence._conversations = None dict_persistence.update_conversation('name1', (123, 123), 5) assert dict_persistence.conversations['name1'] == {(123, 123): 5} assert dict_persistence.get_conversations('name1') == {(123, 123): 5} assert ( dict_persistence.conversations_json == DictPersistence._encode_conversations_to_json({"name1": {(123, 123): 5}}) ) def test_with_handler(self, bot, update): dict_persistence = DictPersistence() u = UpdaterBuilder().bot(bot).persistence(dict_persistence).build() dp = u.dispatcher def first(update, context): if not context.user_data == {}: pytest.fail() if not context.chat_data == {}: pytest.fail() if not context.bot_data == {}: pytest.fail() if not context.bot.callback_data_cache.persistence_data == ([], {}): pytest.fail() context.user_data['test1'] = 'test2' context.chat_data[3] = 'test4' context.bot_data['test1'] = 'test0' context.bot.callback_data_cache._callback_queries['test1'] = 'test0' def second(update, context): if not context.user_data['test1'] == 'test2': pytest.fail() if not context.chat_data[3] == 'test4': pytest.fail() if not context.bot_data['test1'] == 'test0': pytest.fail() if not context.bot.callback_data_cache.persistence_data == ([], {'test1': 'test0'}): pytest.fail() h1 = MessageHandler(filters.ALL, first) h2 = MessageHandler(filters.ALL, second) dp.add_handler(h1) dp.process_update(update) user_data = dict_persistence.user_data_json chat_data = dict_persistence.chat_data_json bot_data = dict_persistence.bot_data_json callback_data = dict_persistence.callback_data_json dict_persistence_2 = DictPersistence( user_data_json=user_data, chat_data_json=chat_data, bot_data_json=bot_data, callback_data_json=callback_data, ) u = UpdaterBuilder().bot(bot).persistence(dict_persistence_2).build() dp = u.dispatcher dp.add_handler(h2) dp.process_update(update) def test_with_conversationHandler(self, dp, update, conversations_json): dict_persistence = DictPersistence(conversations_json=conversations_json) dp.persistence = dict_persistence NEXT, NEXT2 = range(2) def start(update, context): return NEXT start = CommandHandler('start', start) def next_callback(update, context): return NEXT2 next_handler = MessageHandler(None, next_callback) def next2(update, context): return ConversationHandler.END next2 = MessageHandler(None, next2) ch = ConversationHandler( [start], {NEXT: [next_handler], NEXT2: [next2]}, [], name='name2', persistent=True ) dp.add_handler(ch) assert ch.conversations[ch._get_key(update)] == 1 dp.process_update(update) assert ch._get_key(update) not in ch.conversations update.message.text = '/start' update.message.entities = [MessageEntity(MessageEntity.BOT_COMMAND, 0, 6)] dp.process_update(update) assert ch.conversations[ch._get_key(update)] == 0 assert ch.conversations == dict_persistence.conversations['name2'] def test_with_nested_conversationHandler(self, dp, update, conversations_json): dict_persistence = DictPersistence(conversations_json=conversations_json) dp.persistence = dict_persistence NEXT2, NEXT3 = range(1, 3) def start(update, context): return NEXT2 start = CommandHandler('start', start) def next_callback(update, context): return NEXT2 next_handler = MessageHandler(None, next_callback) def next2(update, context): return ConversationHandler.END next2 = MessageHandler(None, next2) nested_ch = ConversationHandler( [next_handler], {NEXT2: [next2]}, [], name='name3', persistent=True, map_to_parent={ConversationHandler.END: ConversationHandler.END}, ) ch = ConversationHandler( [start], {NEXT2: [nested_ch], NEXT3: []}, [], name='name2', persistent=True ) dp.add_handler(ch) assert ch.conversations[ch._get_key(update)] == 1 assert nested_ch.conversations[nested_ch._get_key(update)] == 1 dp.process_update(update) assert ch._get_key(update) not in ch.conversations assert nested_ch._get_key(update) not in nested_ch.conversations update.message.text = '/start' update.message.entities = [MessageEntity(MessageEntity.BOT_COMMAND, 0, 6)] dp.process_update(update) assert ch.conversations[ch._get_key(update)] == 1 assert ch.conversations == dict_persistence.conversations['name2'] assert nested_ch._get_key(update) not in nested_ch.conversations dp.process_update(update) assert ch.conversations[ch._get_key(update)] == 1 assert ch.conversations == dict_persistence.conversations['name2'] assert nested_ch.conversations[nested_ch._get_key(update)] == 1 assert nested_ch.conversations == dict_persistence.conversations['name3'] def test_with_job(self, job_queue, dp): dp.bot.arbitrary_callback_data = True def job_callback(context): context.bot_data['test1'] = '456' context.dispatcher.chat_data[123]['test2'] = '789' context.dispatcher.user_data[789]['test3'] = '123' context.bot.callback_data_cache._callback_queries['test'] = 'Working4!' dict_persistence = DictPersistence() dp.persistence = dict_persistence job_queue.set_dispatcher(dp) job_queue.start() job_queue.run_once(job_callback, 0.01) sleep(0.8) bot_data = dict_persistence.get_bot_data() assert bot_data == {'test1': '456'} chat_data = dict_persistence.get_chat_data() assert chat_data[123] == {'test2': '789'} user_data = dict_persistence.get_user_data() assert user_data[789] == {'test3': '123'} data = dict_persistence.get_callback_data()[1] assert data['test'] == 'Working4!'