Add dispatcher argument to Updater (#1484)

This commit is contained in:
Bibo-Joshi 2020-01-26 20:59:47 +01:00 committed by Noam Meltzer
parent 1e7f4fae6f
commit 883c6b5901
3 changed files with 109 additions and 53 deletions

View file

@ -138,8 +138,6 @@ class Dispatcher(object):
else:
self.persistence = None
self.job_queue = job_queue
self.handlers = {}
"""Dict[:obj:`int`, List[:class:`telegram.ext.Handler`]]: Holds the handlers per group."""
self.groups = []
@ -162,6 +160,10 @@ class Dispatcher(object):
else:
self._set_singleton(None)
@property
def exception_event(self):
return self.__exception_event
def _init_async_threads(self, base_name, workers):
base_name = '{}_'.format(base_name) if base_name else ''

View file

@ -63,24 +63,29 @@ class Updater(object):
token (:obj:`str`, optional): The bot's token given by the @BotFather.
base_url (:obj:`str`, optional): Base_url for the bot.
workers (:obj:`int`, optional): Amount of threads in the thread pool for functions
decorated with ``@run_async``.
bot (:class:`telegram.Bot`, optional): A pre-initialized bot instance. If a pre-initialized
bot is used, it is the user's responsibility to create it using a `Request`
instance with a large enough connection pool.
decorated with ``@run_async`` (ignored if `dispatcher` argument is used).
bot (:class:`telegram.Bot`, optional): A pre-initialized bot instance (ignored if
`dispatcher` argument is used). If a pre-initialized bot is used, it is the user's
responsibility to create it using a `Request` instance with a large enough connection
pool.
dispatcher (:class:`telegram.ext.Dispatcher`, optional): A pre-initialized dispatcher
instance. If a pre-initialized dispatcher is used, it is the user's responsibility to
create it with proper arguments.
private_key (:obj:`bytes`, optional): Private key for decryption of telegram passport data.
private_key_password (:obj:`bytes`, optional): Password for above private key.
user_sig_handler (:obj:`function`, optional): Takes ``signum, frame`` as positional
arguments. This will be called when a signal is received, defaults are (SIGINT,
SIGTERM, SIGABRT) setable with :attr:`idle`.
request_kwargs (:obj:`dict`, optional): Keyword args to control the creation of a
`telegram.utils.request.Request` object (ignored if `bot` argument is used). The
request_kwargs are very useful for the advanced users who would like to control the
default timeouts and/or control the proxy used for http communication.
use_context (:obj:`bool`, optional): If set to ``True`` Use the context based callback API.
During the deprecation period of the old API the default is ``False``. **New users**:
set this to ``True``.
`telegram.utils.request.Request` object (ignored if `bot` or `dispatcher` argument is
used). The request_kwargs are very useful for the advanced users who would like to
control the default timeouts and/or control the proxy used for http communication.
use_context (:obj:`bool`, optional): If set to ``True`` Use the context based callback API
(ignored if `dispatcher` argument is used). During the deprecation period of the old
API the default is ``False``. **New users**: set this to ``True``.
persistence (:class:`telegram.ext.BasePersistence`, optional): The persistence class to
store data that should be persistent over restarts.
store data that should be persistent over restarts (ignored if `dispatcher` argument is
used).
Note:
You must supply either a :attr:`bot` or a :attr:`token` argument.
@ -102,53 +107,79 @@ class Updater(object):
user_sig_handler=None,
request_kwargs=None,
persistence=None,
use_context=False):
use_context=False,
dispatcher=None):
if (token is None) and (bot is None):
raise ValueError('`token` or `bot` must be passed')
if (token is not None) and (bot is not None):
raise ValueError('`token` and `bot` are mutually exclusive')
if (private_key is not None) and (bot is not None):
raise ValueError('`bot` and `private_key` are mutually exclusive')
if dispatcher is None:
if (token is None) and (bot is None):
raise ValueError('`token` or `bot` must be passed')
if (token is not None) and (bot is not None):
raise ValueError('`token` and `bot` are mutually exclusive')
if (private_key is not None) and (bot is not None):
raise ValueError('`bot` and `private_key` are mutually exclusive')
else:
if bot is not None:
raise ValueError('`dispatcher` and `bot` are mutually exclusive')
if persistence is not None:
raise ValueError('`dispatcher` and `persistence` are mutually exclusive')
if workers is not None:
raise ValueError('`dispatcher` and `workers` are mutually exclusive')
if use_context != dispatcher.use_context:
raise ValueError('`dispatcher` and `use_context` are mutually exclusive')
self.logger = logging.getLogger(__name__)
con_pool_size = workers + 4
if dispatcher is None:
con_pool_size = workers + 4
if bot is not None:
self.bot = bot
if bot.request.con_pool_size < con_pool_size:
if bot is not None:
self.bot = bot
if bot.request.con_pool_size < con_pool_size:
self.logger.warning(
'Connection pool of Request object is smaller than optimal value (%s)',
con_pool_size)
else:
# we need a connection pool the size of:
# * for each of the workers
# * 1 for Dispatcher
# * 1 for polling Updater (even if webhook is used, we can spare a connection)
# * 1 for JobQueue
# * 1 for main thread
if request_kwargs is None:
request_kwargs = {}
if 'con_pool_size' not in request_kwargs:
request_kwargs['con_pool_size'] = con_pool_size
self._request = Request(**request_kwargs)
self.bot = Bot(token, base_url, request=self._request, private_key=private_key,
private_key_password=private_key_password)
self.update_queue = Queue()
self.job_queue = JobQueue()
self.__exception_event = Event()
self.persistence = persistence
self.dispatcher = Dispatcher(
self.bot,
self.update_queue,
job_queue=self.job_queue,
workers=workers,
exception_event=self.__exception_event,
persistence=persistence,
use_context=use_context)
self.job_queue.set_dispatcher(self.dispatcher)
else:
con_pool_size = dispatcher.workers + 4
self.bot = dispatcher.bot
if self.bot.request.con_pool_size < con_pool_size:
self.logger.warning(
'Connection pool of Request object is smaller than optimal value (%s)',
con_pool_size)
else:
# we need a connection pool the size of:
# * for each of the workers
# * 1 for Dispatcher
# * 1 for polling Updater (even if webhook is used, we can spare a connection)
# * 1 for JobQueue
# * 1 for main thread
if request_kwargs is None:
request_kwargs = {}
if 'con_pool_size' not in request_kwargs:
request_kwargs['con_pool_size'] = con_pool_size
self._request = Request(**request_kwargs)
self.bot = Bot(token, base_url, request=self._request, private_key=private_key,
private_key_password=private_key_password)
self.update_queue = dispatcher.update_queue
self.__exception_event = dispatcher.exception_event
self.persistence = dispatcher.persistence
self.job_queue = dispatcher.job_queue
self.dispatcher = dispatcher
self.user_sig_handler = user_sig_handler
self.update_queue = Queue()
self.job_queue = JobQueue()
self.__exception_event = Event()
self.persistence = persistence
self.dispatcher = Dispatcher(
self.bot,
self.update_queue,
job_queue=self.job_queue,
workers=workers,
exception_event=self.__exception_event,
persistence=persistence,
use_context=use_context)
self.job_queue.set_dispatcher(self.dispatcher)
self.last_update_id = 0
self.running = False
self.is_idle = False

View file

@ -39,7 +39,7 @@ from future.builtins import bytes
from telegram import TelegramError, Message, User, Chat, Update, Bot
from telegram.error import Unauthorized, InvalidToken, TimedOut, RetryAfter
from telegram.ext import Updater
from telegram.ext import Updater, Dispatcher, BasePersistence
signalskip = pytest.mark.skipif(sys.platform == 'win32',
reason='Can\'t send signals without stopping '
@ -369,7 +369,7 @@ class TestUpdater(object):
with pytest.raises(ValueError):
Updater(token='123:abcd', bot=bot)
def test_no_token_or_bot(self):
def test_no_token_or_bot_or_dispatcher(self):
with pytest.raises(ValueError):
Updater()
@ -377,3 +377,26 @@ class TestUpdater(object):
bot = Bot('123:zyxw')
with pytest.raises(ValueError):
Updater(bot=bot, private_key=b'key')
def test_mutual_exclude_bot_dispatcher(self):
dispatcher = Dispatcher(None, None)
bot = Bot('123:zyxw')
with pytest.raises(ValueError):
Updater(bot=bot, dispatcher=dispatcher)
def test_mutual_exclude_persistence_dispatcher(self):
dispatcher = Dispatcher(None, None)
persistence = BasePersistence()
with pytest.raises(ValueError):
Updater(dispatcher=dispatcher, persistence=persistence)
def test_mutual_exclude_workers_dispatcher(self):
dispatcher = Dispatcher(None, None)
with pytest.raises(ValueError):
Updater(dispatcher=dispatcher, workers=8)
def test_mutual_exclude_use_context_dispatcher(self):
dispatcher = Dispatcher(None, None)
use_context = not dispatcher.use_context
with pytest.raises(ValueError):
Updater(dispatcher=dispatcher, use_context=use_context)