Refactor Dispatcher.run_async (#2051)

This commit is contained in:
Bibo-Joshi 2020-10-04 17:20:33 +02:00 committed by GitHub
parent 97adcdf538
commit 0d419ed6b4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 591 additions and 65 deletions

View file

@ -38,7 +38,8 @@ class CallbackContext:
use a fairly unique name for the attributes. use a fairly unique name for the attributes.
Warning: Warning:
Do not combine custom attributes and @run_async. Due to how @run_async works, it will Do not combine custom attributes and ``@run_async``/
:meth:`telegram.ext.Disptacher.run_async`. Due to how ``run_async`` works, it will
almost certainly execute the callbacks for an update out of order, and the attributes almost certainly execute the callbacks for an update out of order, and the attributes
that you think you added will not be present. that you think you added will not be present.
@ -65,10 +66,16 @@ class CallbackContext:
is handled by :class:`telegram.ext.CommandHandler`, :class:`telegram.ext.PrefixHandler` is handled by :class:`telegram.ext.CommandHandler`, :class:`telegram.ext.PrefixHandler`
or :class:`telegram.ext.StringCommandHandler`. It contains a list of the words in the or :class:`telegram.ext.StringCommandHandler`. It contains a list of the words in the
text after the command, using any whitespace string as a delimiter. text after the command, using any whitespace string as a delimiter.
error (:class:`telegram.TelegramError`): Optional. The Telegram error that was raised. error (:class:`telegram.TelegramError`): Optional. The error that was raised.
Only present when passed to a error handler registered with Only present when passed to a error handler registered with
:attr:`telegram.ext.Dispatcher.add_error_handler`. :attr:`telegram.ext.Dispatcher.add_error_handler`.
job (:class:`telegram.ext.Job`): The job that that originated this callback. async_args (List[:obj:`object`]): Optional. Positional arguments of the function that
raised the error. Only present when the raising function was run asynchronously using
:meth:`telegram.ext.Dispatcher.run_async`.
async_kwargs (Dict[:obj:`str`, :obj:`object`]): Optional. Keyword arguments of the function
that raised the error. Only present when the raising function was run asynchronously
using :meth:`telegram.ext.Dispatcher.run_async`.
job (:class:`telegram.ext.Job`): Optional. The job which originated this callback.
Only present when passed to the callback of :class:`telegram.ext.Job`. Only present when passed to the callback of :class:`telegram.ext.Job`.
""" """
@ -89,6 +96,8 @@ class CallbackContext:
self.matches = None self.matches = None
self.error = None self.error = None
self.job = None self.job = None
self.async_args = None
self.async_kwargs = None
@property @property
def dispatcher(self): def dispatcher(self):
@ -123,9 +132,11 @@ class CallbackContext:
"https://git.io/fjxKe") "https://git.io/fjxKe")
@classmethod @classmethod
def from_error(cls, update, error, dispatcher): def from_error(cls, update, error, dispatcher, async_args=None, async_kwargs=None):
self = cls.from_update(update, dispatcher) self = cls.from_update(update, dispatcher)
self.error = error self.error = error
self.async_args = async_args
self.async_kwargs = async_kwargs
return self return self
@classmethod @classmethod

View file

@ -45,6 +45,7 @@ class CallbackQueryHandler(Handler):
the callback function. the callback function.
pass_chat_data (:obj:`bool`): Determines whether ``chat_data`` will be passed to pass_chat_data (:obj:`bool`): Determines whether ``chat_data`` will be passed to
the callback function. the callback function.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Note: Note:
:attr:`pass_user_data` and :attr:`pass_chat_data` determine whether a ``dict`` you :attr:`pass_user_data` and :attr:`pass_chat_data` determine whether a ``dict`` you
@ -55,6 +56,10 @@ class CallbackQueryHandler(Handler):
Note that this is DEPRECATED, and you should use context based callbacks. See Note that this is DEPRECATED, and you should use context based callbacks. See
https://git.io/fxJuV for more info. https://git.io/fxJuV for more info.
Warning:
When setting ``run_async`` to :obj:`True`, you cannot rely on adding custom
attributes to :class:`telegram.ext.CallbackContext`. See its docs for more info.
Args: Args:
callback (:obj:`callable`): The callback function for this handler. Will be called when callback (:obj:`callable`): The callback function for this handler. Will be called when
:attr:`check_update` has determined that an update should be processed by this handler. :attr:`check_update` has determined that an update should be processed by this handler.
@ -91,6 +96,8 @@ class CallbackQueryHandler(Handler):
pass_chat_data (:obj:`bool`, optional): If set to :obj:`True`, a keyword argument called pass_chat_data (:obj:`bool`, optional): If set to :obj:`True`, a keyword argument called
``chat_data`` will be passed to the callback function. Default is :obj:`False`. ``chat_data`` will be passed to the callback function. Default is :obj:`False`.
DEPRECATED: Please switch to context based callbacks. DEPRECATED: Please switch to context based callbacks.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Defaults to :obj:`False`.
""" """
@ -102,13 +109,15 @@ class CallbackQueryHandler(Handler):
pass_groups=False, pass_groups=False,
pass_groupdict=False, pass_groupdict=False,
pass_user_data=False, pass_user_data=False,
pass_chat_data=False): pass_chat_data=False,
run_async=False):
super().__init__( super().__init__(
callback, callback,
pass_update_queue=pass_update_queue, pass_update_queue=pass_update_queue,
pass_job_queue=pass_job_queue, pass_job_queue=pass_job_queue,
pass_user_data=pass_user_data, pass_user_data=pass_user_data,
pass_chat_data=pass_chat_data) pass_chat_data=pass_chat_data,
run_async=run_async)
if isinstance(pattern, str): if isinstance(pattern, str):
pattern = re.compile(pattern) pattern = re.compile(pattern)

View file

@ -35,6 +35,7 @@ class ChosenInlineResultHandler(Handler):
the callback function. the callback function.
pass_chat_data (:obj:`bool`): Determines whether ``chat_data`` will be passed to pass_chat_data (:obj:`bool`): Determines whether ``chat_data`` will be passed to
the callback function. the callback function.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Note: Note:
:attr:`pass_user_data` and :attr:`pass_chat_data` determine whether a ``dict`` you :attr:`pass_user_data` and :attr:`pass_chat_data` determine whether a ``dict`` you
@ -45,6 +46,10 @@ class ChosenInlineResultHandler(Handler):
Note that this is DEPRECATED, and you should use context based callbacks. See Note that this is DEPRECATED, and you should use context based callbacks. See
https://git.io/fxJuV for more info. https://git.io/fxJuV for more info.
Warning:
When setting ``run_async`` to :obj:`True`, you cannot rely on adding custom
attributes to :class:`telegram.ext.CallbackContext`. See its docs for more info.
Args: Args:
callback (:obj:`callable`): The callback function for this handler. Will be called when callback (:obj:`callable`): The callback function for this handler. Will be called when
:attr:`check_update` has determined that an update should be processed by this handler. :attr:`check_update` has determined that an update should be processed by this handler.
@ -70,6 +75,8 @@ class ChosenInlineResultHandler(Handler):
pass_chat_data (:obj:`bool`, optional): If set to :obj:`True`, a keyword argument called pass_chat_data (:obj:`bool`, optional): If set to :obj:`True`, a keyword argument called
``chat_data`` will be passed to the callback function. Default is :obj:`False`. ``chat_data`` will be passed to the callback function. Default is :obj:`False`.
DEPRECATED: Please switch to context based callbacks. DEPRECATED: Please switch to context based callbacks.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Defaults to :obj:`False`.
""" """

View file

@ -60,6 +60,7 @@ class CommandHandler(Handler):
the callback function. the callback function.
pass_chat_data (:obj:`bool`): Determines whether ``chat_data`` will be passed to pass_chat_data (:obj:`bool`): Determines whether ``chat_data`` will be passed to
the callback function. the callback function.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Note: Note:
:attr:`pass_user_data` and :attr:`pass_chat_data` determine whether a :obj:`dict` you :attr:`pass_user_data` and :attr:`pass_chat_data` determine whether a :obj:`dict` you
@ -70,6 +71,10 @@ class CommandHandler(Handler):
Note that this is DEPRECATED, and you should use context based callbacks. See Note that this is DEPRECATED, and you should use context based callbacks. See
https://git.io/fxJuV for more info. https://git.io/fxJuV for more info.
Warning:
When setting ``run_async`` to :obj:`True`, you cannot rely on adding custom
attributes to :class:`telegram.ext.CallbackContext`. See its docs for more info.
Args: Args:
command (:obj:`str` | List[:obj:`str`]): The command or list of commands this handler command (:obj:`str` | List[:obj:`str`]): The command or list of commands this handler
should listen for. Limitations are the same as described here should listen for. Limitations are the same as described here
@ -111,6 +116,8 @@ class CommandHandler(Handler):
pass_chat_data (:obj:`bool`, optional): If set to :obj:`True`, a keyword argument called pass_chat_data (:obj:`bool`, optional): If set to :obj:`True`, a keyword argument called
``chat_data`` will be passed to the callback function. Default is :obj:`False`. ``chat_data`` will be passed to the callback function. Default is :obj:`False`.
DEPRECATED: Please switch to context based callbacks. DEPRECATED: Please switch to context based callbacks.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Defaults to :obj:`False`.
Raises: Raises:
ValueError - when command is too long or has illegal chars. ValueError - when command is too long or has illegal chars.
@ -125,13 +132,15 @@ class CommandHandler(Handler):
pass_update_queue=False, pass_update_queue=False,
pass_job_queue=False, pass_job_queue=False,
pass_user_data=False, pass_user_data=False,
pass_chat_data=False): pass_chat_data=False,
run_async=False):
super().__init__( super().__init__(
callback, callback,
pass_update_queue=pass_update_queue, pass_update_queue=pass_update_queue,
pass_job_queue=pass_job_queue, pass_job_queue=pass_job_queue,
pass_user_data=pass_user_data, pass_user_data=pass_user_data,
pass_chat_data=pass_chat_data) pass_chat_data=pass_chat_data,
run_async=run_async)
if isinstance(command, str): if isinstance(command, str):
self.command = [command.lower()] self.command = [command.lower()]
@ -242,6 +251,7 @@ class PrefixHandler(CommandHandler):
the callback function. the callback function.
pass_chat_data (:obj:`bool`): Determines whether ``chat_data`` will be passed to pass_chat_data (:obj:`bool`): Determines whether ``chat_data`` will be passed to
the callback function. the callback function.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Note: Note:
:attr:`pass_user_data` and :attr:`pass_chat_data` determine whether a ``dict`` you :attr:`pass_user_data` and :attr:`pass_chat_data` determine whether a ``dict`` you
@ -252,6 +262,10 @@ class PrefixHandler(CommandHandler):
Note that this is DEPRECATED, and you should use context based callbacks. See Note that this is DEPRECATED, and you should use context based callbacks. See
https://git.io/fxJuV for more info. https://git.io/fxJuV for more info.
Warning:
When setting ``run_async`` to :obj:`True`, you cannot rely on adding custom
attributes to :class:`telegram.ext.CallbackContext`. See its docs for more info.
Args: Args:
prefix (:obj:`str` | List[:obj:`str`]): The prefix(es) that will precede :attr:`command`. prefix (:obj:`str` | List[:obj:`str`]): The prefix(es) that will precede :attr:`command`.
command (:obj:`str` | List[:obj:`str`]): The command or list of commands this handler command (:obj:`str` | List[:obj:`str`]): The command or list of commands this handler
@ -289,6 +303,8 @@ class PrefixHandler(CommandHandler):
pass_chat_data (:obj:`bool`, optional): If set to :obj:`True`, a keyword argument called pass_chat_data (:obj:`bool`, optional): If set to :obj:`True`, a keyword argument called
``chat_data`` will be passed to the callback function. Default is :obj:`False`. ``chat_data`` will be passed to the callback function. Default is :obj:`False`.
DEPRECATED: Please switch to context based callbacks. DEPRECATED: Please switch to context based callbacks.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Defaults to :obj:`False`.
""" """
@ -301,7 +317,8 @@ class PrefixHandler(CommandHandler):
pass_update_queue=False, pass_update_queue=False,
pass_job_queue=False, pass_job_queue=False,
pass_user_data=False, pass_user_data=False,
pass_chat_data=False): pass_chat_data=False,
run_async=False):
self._prefix = list() self._prefix = list()
self._command = list() self._command = list()
@ -312,7 +329,8 @@ class PrefixHandler(CommandHandler):
pass_update_queue=pass_update_queue, pass_update_queue=pass_update_queue,
pass_job_queue=pass_job_queue, pass_job_queue=pass_job_queue,
pass_user_data=pass_user_data, pass_user_data=pass_user_data,
pass_chat_data=pass_chat_data) pass_chat_data=pass_chat_data,
run_async=run_async)
self.prefix = prefix self.prefix = prefix
self.command = command self.command = command

View file

@ -168,6 +168,7 @@ class ConversationHandler(Handler):
name=None, name=None,
persistent=False, persistent=False,
map_to_parent=None): map_to_parent=None):
self.run_async = False
self._entry_points = entry_points self._entry_points = entry_points
self._states = states self._states = states

View file

@ -47,14 +47,23 @@ def run_async(func):
Using this decorator is only possible when only a single Dispatcher exist in the system. Using this decorator is only possible when only a single Dispatcher exist in the system.
Note:
DEPRECATED. Use :attr:`telegram.ext.Dispatcher.run_async` directly instead or the
:attr:`Handler.run_async` parameter.
Warning: Warning:
If you're using @run_async you cannot rely on adding custom attributes to If you're using ``@run_async`` you cannot rely on adding custom attributes to
:class:`telegram.ext.CallbackContext`. See its docs for more info. :class:`telegram.ext.CallbackContext`. See its docs for more info.
""" """
@wraps(func) @wraps(func)
def async_func(*args, **kwargs): def async_func(*args, **kwargs):
return Dispatcher.get_instance().run_async(func, *args, **kwargs) warnings.warn('The @run_async decorator is deprecated. Use the `run_async` parameter of'
'`Dispatcher.add_handler` or `Dispatcher.run_async` instead.',
TelegramDeprecationWarning,
stacklevel=2)
return Dispatcher.get_instance()._run_async(func, *args, update=None, error_handling=False,
**kwargs)
return async_func return async_func
@ -91,8 +100,8 @@ class Dispatcher:
update_queue (:obj:`Queue`): The synchronized queue that will contain the updates. update_queue (:obj:`Queue`): The synchronized queue that will contain the updates.
job_queue (:class:`telegram.ext.JobQueue`): Optional. The :class:`telegram.ext.JobQueue` job_queue (:class:`telegram.ext.JobQueue`): Optional. The :class:`telegram.ext.JobQueue`
instance to pass onto handler callbacks. instance to pass onto handler callbacks.
workers (:obj:`int`): Number of maximum concurrent worker threads for the ``@run_async`` workers (:obj:`int`, optional): Number of maximum concurrent worker threads for the
decorator. ``@run_async`` decorator and :meth:`run_async`.
user_data (:obj:`defaultdict`): A dictionary handlers can use to store data for the user. user_data (:obj:`defaultdict`): A dictionary handlers can use to store data for the user.
chat_data (:obj:`defaultdict`): A dictionary handlers can use to store data for the chat. chat_data (:obj:`defaultdict`): A dictionary handlers can use to store data for the chat.
bot_data (:obj:`dict`): A dictionary handlers can use to store data for the bot. bot_data (:obj:`dict`): A dictionary handlers can use to store data for the bot.
@ -105,7 +114,7 @@ class Dispatcher:
job_queue (:class:`telegram.ext.JobQueue`, optional): The :class:`telegram.ext.JobQueue` job_queue (:class:`telegram.ext.JobQueue`, optional): The :class:`telegram.ext.JobQueue`
instance to pass onto handler callbacks. instance to pass onto handler callbacks.
workers (:obj:`int`, optional): Number of maximum concurrent worker threads for the workers (:obj:`int`, optional): Number of maximum concurrent worker threads for the
``@run_async`` decorator. Defaults to 4. ``@run_async`` decorator and :meth:`run_async`. Defaults to 4.
persistence (:class:`telegram.ext.BasePersistence`, optional): The persistence class to persistence (:class:`telegram.ext.BasePersistence`, optional): The persistence class to
store data that should be persistent over restarts. store data that should be persistent over restarts.
use_context (:obj:`bool`, optional): If set to :obj:`True` Use the context based use_context (:obj:`bool`, optional): If set to :obj:`True` Use the context based
@ -141,9 +150,10 @@ class Dispatcher:
self.user_data = defaultdict(dict) self.user_data = defaultdict(dict)
self.chat_data = defaultdict(dict) self.chat_data = defaultdict(dict)
self.bot_data = {} self.bot_data = {}
self._update_persistence_lock = Lock()
if persistence: if persistence:
if not isinstance(persistence, BasePersistence): if not isinstance(persistence, BasePersistence):
raise TypeError("persistence should be based on telegram.ext.BasePersistence") raise TypeError("persistence must be based on telegram.ext.BasePersistence")
self.persistence = persistence self.persistence = persistence
if self.persistence.store_user_data: if self.persistence.store_user_data:
self.user_data = self.persistence.get_user_data() self.user_data = self.persistence.get_user_data()
@ -164,8 +174,9 @@ class Dispatcher:
"""Dict[:obj:`int`, List[:class:`telegram.ext.Handler`]]: Holds the handlers per group.""" """Dict[:obj:`int`, List[:class:`telegram.ext.Handler`]]: Holds the handlers per group."""
self.groups = [] self.groups = []
"""List[:obj:`int`]: A list with all groups.""" """List[:obj:`int`]: A list with all groups."""
self.error_handlers = [] self.error_handlers = {}
"""List[:obj:`callable`]: A list of errorHandlers.""" """Dict[:obj:`callable`, :obj:`bool`]: A dict, where the keys are error handlers and the
values indicate whether they are to be run asynchronously."""
self.running = False self.running = False
""":obj:`bool`: Indicates if this dispatcher is running.""" """:obj:`bool`: Indicates if this dispatcher is running."""
@ -229,30 +240,65 @@ class Dispatcher:
break break
promise.run() promise.run()
if not promise.exception:
self.update_persistence(update=promise.update)
continue
if isinstance(promise.exception, DispatcherHandlerStop): if isinstance(promise.exception, DispatcherHandlerStop):
self.logger.warning( self.logger.warning(
'DispatcherHandlerStop is not supported with async functions; func: %s', 'DispatcherHandlerStop is not supported with async functions; func: %s',
promise.pooled_function.__name__) promise.pooled_function.__name__)
continue
def run_async(self, func, *args, **kwargs): # Avoid infinite recursion of error handlers.
"""Queue a function (with given args/kwargs) to be run asynchronously. if promise.pooled_function in self.error_handlers:
self.logger.error('An uncaught error was raised while handling the error.')
continue
# Don't perform error handling for a `Promise` with deactivated error handling. This
# should happen only via the deprecated `@run_async` decorator or `Promises` created
# within error handlers
if not promise.error_handling:
self.logger.error('A promise with deactivated error handling raised an error.')
continue
# If we arrive here, an exception happened in the promise and was neither
# DispatcherHandlerStop nor raised by an error handler. So we can and must handle it
try:
self.dispatch_error(promise.update, promise.exception, promise=promise)
except Exception:
self.logger.exception('An uncaught error was raised while handling the error.')
def run_async(self, func, *args, update=None, **kwargs):
"""
Queue a function (with given args/kwargs) to be run asynchronously. Exceptions raised
by the function will be handled by the error handlers registered with
:meth:`add_error_handler`.
Warning: Warning:
If you're using @run_async you cannot rely on adding custom attributes to * If you're using ``@run_async``/:meth:`run_async` you cannot rely on adding custom
:class:`telegram.ext.CallbackContext`. See its docs for more info. attributes to :class:`telegram.ext.CallbackContext`. See its docs for more info.
* Calling a function through :meth:`run_async` from within an error handler can lead to
an infinite error handling loop.
Args: Args:
func (:obj:`callable`): The function to run in the thread. func (:obj:`callable`): The function to run in the thread.
*args (:obj:`tuple`, optional): Arguments to `func`. *args (:obj:`tuple`, optional): Arguments to ``func``.
**kwargs (:obj:`dict`, optional): Keyword arguments to `func`. update (:class:`telegram.Update`, optional): The update associated with the functions
call. If passed, it will be available in the error handlers, in case an exception
is raised by :attr:`func`.
**kwargs (:obj:`dict`, optional): Keyword arguments to ``func``.
Returns: Returns:
Promise Promise
""" """
# TODO: handle exception in async threads return self._run_async(func, *args, update=update, error_handling=True, **kwargs)
# set a threading.Event to notify caller thread
promise = Promise(func, args, kwargs) def _run_async(self, func, *args, update=None, error_handling=True, **kwargs):
# TODO: Remove error_handling parameter once we drop the @run_async decorator
promise = Promise(func, args, kwargs, update=update, error_handling=error_handling)
self.__async_queue.put(promise) self.__async_queue.put(promise)
return promise return promise
@ -345,7 +391,7 @@ class Dispatcher:
try: try:
self.dispatch_error(None, update) self.dispatch_error(None, update)
except Exception: except Exception:
self.logger.exception('An uncaught error was raised while handling the error') self.logger.exception('An uncaught error was raised while handling the error.')
return return
context = None context = None
@ -358,6 +404,9 @@ class Dispatcher:
if not context and self.use_context: if not context and self.use_context:
context = CallbackContext.from_update(update, self) context = CallbackContext.from_update(update, self)
handler.handle_update(update, self, check, context) handler.handle_update(update, self, check, context)
# If handler runs async updating immediately doesn't make sense
if not handler.run_async:
self.update_persistence(update=update) self.update_persistence(update=update)
break break
@ -376,9 +425,7 @@ class Dispatcher:
break break
# Errors should not stop the thread. # Errors should not stop the thread.
except Exception: except Exception:
self.logger.exception('An error was raised while processing the update and an ' self.logger.exception('An uncaught error was raised while handling the error.')
'uncaught error was raised while handling the error '
'with an error_handler')
def add_handler(self, handler, group=DEFAULT_GROUP): def add_handler(self, handler, group=DEFAULT_GROUP):
"""Register a handler. """Register a handler.
@ -415,7 +462,7 @@ class Dispatcher:
if isinstance(handler, ConversationHandler) and handler.persistent: if isinstance(handler, ConversationHandler) and handler.persistent:
if not self.persistence: if not self.persistence:
raise ValueError( raise ValueError(
"Conversationhandler {} can not be persistent if dispatcher has no " "ConversationHandler {} can not be persistent if dispatcher has no "
"persistence".format(handler.name)) "persistence".format(handler.name))
handler.persistence = self.persistence handler.persistence = self.persistence
handler.conversations = self.persistence.get_conversations(handler.name) handler.conversations = self.persistence.get_conversations(handler.name)
@ -448,9 +495,15 @@ class Dispatcher:
update (:class:`telegram.Update`, optional): The update to process. If passed, only the update (:class:`telegram.Update`, optional): The update to process. If passed, only the
corresponding ``user_data`` and ``chat_data`` will be updated. corresponding ``user_data`` and ``chat_data`` will be updated.
""" """
with self._update_persistence_lock:
self.__update_persistence(update)
def __update_persistence(self, update):
if self.persistence: if self.persistence:
chat_ids = self.chat_data.keys() # We use list() here in order to decouple chat_ids from self.chat_data, as dict view
user_ids = self.user_data.keys() # objects will change, when the dict does and we want to loop over chat_ids
chat_ids = list(self.chat_data.keys())
user_ids = list(self.user_data.keys())
if isinstance(update, Update): if isinstance(update, Update):
if update.effective_chat: if update.effective_chat:
@ -498,11 +551,15 @@ class Dispatcher:
'the error with an error_handler' 'the error with an error_handler'
self.logger.exception(message) self.logger.exception(message)
def add_error_handler(self, callback): def add_error_handler(self, callback, run_async=False):
"""Registers an error handler in the Dispatcher. This handler will receive every error """Registers an error handler in the Dispatcher. This handler will receive every error
which happens in your bot. which happens in your bot.
Warning: The errors handled within these handlers won't show up in the logger, so you Note:
Attempts to add the same callback multiple times will be ignored.
Warning:
The errors handled within these handlers won't show up in the logger, so you
need to make sure that you reraise the error. need to make sure that you reraise the error.
Args: Args:
@ -512,11 +569,16 @@ class Dispatcher:
``def callback(update: Update, context: CallbackContext)`` ``def callback(update: Update, context: CallbackContext)``
The error that happened will be present in context.error. The error that happened will be present in context.error.
run_async (:obj:`bool`, optional): Whether this handlers callback should be run
asynchronously using :meth:`run_async`. Defaults to :obj:`False`.
Note: Note:
See https://git.io/fxJuV for more info about switching to context based API. See https://git.io/fxJuV for more info about switching to context based API.
""" """
self.error_handlers.append(callback) if callback in self.error_handlers:
self.logger.debug('The callback is already registered as an error handler. Ignoring.')
return
self.error_handlers[callback] = run_async
def remove_error_handler(self, callback): def remove_error_handler(self, callback):
"""Removes an error handler. """Removes an error handler.
@ -525,21 +587,34 @@ class Dispatcher:
callback (:obj:`callable`): The error handler to remove. callback (:obj:`callable`): The error handler to remove.
""" """
if callback in self.error_handlers: self.error_handlers.pop(callback, None)
self.error_handlers.remove(callback)
def dispatch_error(self, update, error): def dispatch_error(self, update, error, promise=None):
"""Dispatches an error. """Dispatches an error.
Args: Args:
update (:obj:`str` | :class:`telegram.Update` | None): The update that caused the error update (:obj:`str` | :class:`telegram.Update` | None): The update that caused the error
error (:obj:`Exception`): The error that was raised. error (:obj:`Exception`): The error that was raised.
promise (:class:`telegram.utils.Promise`, optional): The promise whose pooled function
raised the error.
""" """
async_args = None if not promise else promise.args
async_kwargs = None if not promise else promise.kwargs
if self.error_handlers: if self.error_handlers:
for callback in self.error_handlers: for callback, run_async in self.error_handlers.items():
if self.use_context: if self.use_context:
callback(update, CallbackContext.from_error(update, error, self)) context = CallbackContext.from_error(update, error, self,
async_args=async_args,
async_kwargs=async_kwargs)
if run_async:
self.run_async(callback, update, context, update=update)
else:
callback(update, context)
else:
if run_async:
self.run_async(callback, self.bot, update, error, update=update)
else: else:
callback(self.bot, update, error) callback(self.bot, update, error)

View file

@ -34,6 +34,7 @@ class Handler(ABC):
the callback function. the callback function.
pass_chat_data (:obj:`bool`): Determines whether ``chat_data`` will be passed to pass_chat_data (:obj:`bool`): Determines whether ``chat_data`` will be passed to
the callback function. the callback function.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Note: Note:
:attr:`pass_user_data` and :attr:`pass_chat_data` determine whether a ``dict`` you :attr:`pass_user_data` and :attr:`pass_chat_data` determine whether a ``dict`` you
@ -44,6 +45,10 @@ class Handler(ABC):
Note that this is DEPRECATED, and you should use context based callbacks. See Note that this is DEPRECATED, and you should use context based callbacks. See
https://git.io/fxJuV for more info. https://git.io/fxJuV for more info.
Warning:
When setting ``run_async`` to :obj:`True`, you cannot rely on adding custom
attributes to :class:`telegram.ext.CallbackContext`. See its docs for more info.
Args: Args:
callback (:obj:`callable`): The callback function for this handler. Will be called when callback (:obj:`callable`): The callback function for this handler. Will be called when
:attr:`check_update` has determined that an update should be processed by this handler. :attr:`check_update` has determined that an update should be processed by this handler.
@ -69,6 +74,8 @@ class Handler(ABC):
pass_chat_data (:obj:`bool`, optional): If set to :obj:`True`, a keyword argument called pass_chat_data (:obj:`bool`, optional): If set to :obj:`True`, a keyword argument called
``chat_data`` will be passed to the callback function. Default is :obj:`False`. ``chat_data`` will be passed to the callback function. Default is :obj:`False`.
DEPRECATED: Please switch to context based callbacks. DEPRECATED: Please switch to context based callbacks.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Defaults to :obj:`False`.
""" """
@ -77,12 +84,14 @@ class Handler(ABC):
pass_update_queue=False, pass_update_queue=False,
pass_job_queue=False, pass_job_queue=False,
pass_user_data=False, pass_user_data=False,
pass_chat_data=False): pass_chat_data=False,
run_async=False):
self.callback = callback self.callback = callback
self.pass_update_queue = pass_update_queue self.pass_update_queue = pass_update_queue
self.pass_job_queue = pass_job_queue self.pass_job_queue = pass_job_queue
self.pass_user_data = pass_user_data self.pass_user_data = pass_user_data
self.pass_chat_data = pass_chat_data self.pass_chat_data = pass_chat_data
self.run_async = run_async
@abstractmethod @abstractmethod
def check_update(self, update): def check_update(self, update):
@ -116,9 +125,16 @@ class Handler(ABC):
""" """
if context: if context:
self.collect_additional_context(context, update, dispatcher, check_result) self.collect_additional_context(context, update, dispatcher, check_result)
if self.run_async:
return dispatcher.run_async(self.callback, update, context, update=update)
else:
return self.callback(update, context) return self.callback(update, context)
else: else:
optional_args = self.collect_optional_args(dispatcher, update, check_result) optional_args = self.collect_optional_args(dispatcher, update, check_result)
if self.run_async:
return dispatcher.run_async(self.callback, dispatcher.bot, update, update=update,
**optional_args)
else:
return self.callback(dispatcher.bot, update, **optional_args) return self.callback(dispatcher.bot, update, **optional_args)
def collect_additional_context(self, context, update, dispatcher, check_result): def collect_additional_context(self, context, update, dispatcher, check_result):

View file

@ -45,6 +45,7 @@ class InlineQueryHandler(Handler):
the callback function. the callback function.
pass_chat_data (:obj:`bool`): Determines whether ``chat_data`` will be passed to pass_chat_data (:obj:`bool`): Determines whether ``chat_data`` will be passed to
the callback function. the callback function.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Note: Note:
:attr:`pass_user_data` and :attr:`pass_chat_data` determine whether a ``dict`` you :attr:`pass_user_data` and :attr:`pass_chat_data` determine whether a ``dict`` you
@ -55,6 +56,10 @@ class InlineQueryHandler(Handler):
Note that this is DEPRECATED, and you should use context based callbacks. See Note that this is DEPRECATED, and you should use context based callbacks. See
https://git.io/fxJuV for more info. https://git.io/fxJuV for more info.
Warning:
When setting ``run_async`` to :obj:`True`, you cannot rely on adding custom
attributes to :class:`telegram.ext.CallbackContext`. See its docs for more info.
Args: Args:
callback (:obj:`callable`): The callback function for this handler. Will be called when callback (:obj:`callable`): The callback function for this handler. Will be called when
:attr:`check_update` has determined that an update should be processed by this handler. :attr:`check_update` has determined that an update should be processed by this handler.
@ -91,6 +96,8 @@ class InlineQueryHandler(Handler):
pass_chat_data (:obj:`bool`, optional): If set to :obj:`True`, a keyword argument called pass_chat_data (:obj:`bool`, optional): If set to :obj:`True`, a keyword argument called
``chat_data`` will be passed to the callback function. Default is :obj:`False`. ``chat_data`` will be passed to the callback function. Default is :obj:`False`.
DEPRECATED: Please switch to context based callbacks. DEPRECATED: Please switch to context based callbacks.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Defaults to :obj:`False`.
""" """
@ -102,13 +109,15 @@ class InlineQueryHandler(Handler):
pass_groups=False, pass_groups=False,
pass_groupdict=False, pass_groupdict=False,
pass_user_data=False, pass_user_data=False,
pass_chat_data=False): pass_chat_data=False,
run_async=False):
super().__init__( super().__init__(
callback, callback,
pass_update_queue=pass_update_queue, pass_update_queue=pass_update_queue,
pass_job_queue=pass_job_queue, pass_job_queue=pass_job_queue,
pass_user_data=pass_user_data, pass_user_data=pass_user_data,
pass_chat_data=pass_chat_data) pass_chat_data=pass_chat_data,
run_async=False)
if isinstance(pattern, str): if isinstance(pattern, str):
pattern = re.compile(pattern) pattern = re.compile(pattern)

View file

@ -48,6 +48,7 @@ class MessageHandler(Handler):
Default is :obj:`None`. Default is :obj:`None`.
edited_updates (:obj:`bool`): Should "edited" message updates be handled? edited_updates (:obj:`bool`): Should "edited" message updates be handled?
Default is :obj:`None`. Default is :obj:`None`.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Note: Note:
:attr:`pass_user_data` and :attr:`pass_chat_data` determine whether a ``dict`` you :attr:`pass_user_data` and :attr:`pass_chat_data` determine whether a ``dict`` you
@ -58,6 +59,10 @@ class MessageHandler(Handler):
Note that this is DEPRECATED, and you should use context based callbacks. See Note that this is DEPRECATED, and you should use context based callbacks. See
https://git.io/fxJuV for more info. https://git.io/fxJuV for more info.
Warning:
When setting ``run_async`` to :obj:`True`, you cannot rely on adding custom
attributes to :class:`telegram.ext.CallbackContext`. See its docs for more info.
Args: Args:
filters (:class:`telegram.ext.BaseFilter`, optional): A filter inheriting from filters (:class:`telegram.ext.BaseFilter`, optional): A filter inheriting from
:class:`telegram.ext.filters.BaseFilter`. Standard filters can be found in :class:`telegram.ext.filters.BaseFilter`. Standard filters can be found in
@ -100,6 +105,8 @@ class MessageHandler(Handler):
edited_updates (:obj:`bool`, optional): Should "edited" message updates be handled? Default edited_updates (:obj:`bool`, optional): Should "edited" message updates be handled? Default
is :obj:`None`. is :obj:`None`.
DEPRECATED: Please switch to filters for update filtering. DEPRECATED: Please switch to filters for update filtering.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Defaults to :obj:`False`.
Raises: Raises:
ValueError ValueError
@ -115,14 +122,16 @@ class MessageHandler(Handler):
pass_chat_data=False, pass_chat_data=False,
message_updates=None, message_updates=None,
channel_post_updates=None, channel_post_updates=None,
edited_updates=None): edited_updates=None,
run_async=False):
super().__init__( super().__init__(
callback, callback,
pass_update_queue=pass_update_queue, pass_update_queue=pass_update_queue,
pass_job_queue=pass_job_queue, pass_job_queue=pass_job_queue,
pass_user_data=pass_user_data, pass_user_data=pass_user_data,
pass_chat_data=pass_chat_data) pass_chat_data=pass_chat_data,
run_async=run_async)
if message_updates is False and channel_post_updates is False and edited_updates is False: if message_updates is False and channel_post_updates is False and edited_updates is False:
raise ValueError( raise ValueError(
'message_updates, channel_post_updates and edited_updates are all False') 'message_updates, channel_post_updates and edited_updates are all False')

View file

@ -34,6 +34,7 @@ class PollAnswerHandler(Handler):
the callback function. the callback function.
pass_chat_data (:obj:`bool`): Determines whether ``chat_data`` will be passed to pass_chat_data (:obj:`bool`): Determines whether ``chat_data`` will be passed to
the callback function. the callback function.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Note: Note:
:attr:`pass_user_data` and :attr:`pass_chat_data` determine whether a ``dict`` you :attr:`pass_user_data` and :attr:`pass_chat_data` determine whether a ``dict`` you
@ -44,6 +45,10 @@ class PollAnswerHandler(Handler):
Note that this is DEPRECATED, and you should use context based callbacks. See Note that this is DEPRECATED, and you should use context based callbacks. See
https://git.io/fxJuV for more info. https://git.io/fxJuV for more info.
Warning:
When setting ``run_async`` to :obj:`True`, you cannot rely on adding custom
attributes to :class:`telegram.ext.CallbackContext`. See its docs for more info.
Args: Args:
callback (:obj:`callable`): The callback function for this handler. Will be called when callback (:obj:`callable`): The callback function for this handler. Will be called when
:attr:`check_update` has determined that an update should be processed by this handler. :attr:`check_update` has determined that an update should be processed by this handler.
@ -69,6 +74,8 @@ class PollAnswerHandler(Handler):
pass_chat_data (:obj:`bool`, optional): If set to :obj:`True`, a keyword argument called pass_chat_data (:obj:`bool`, optional): If set to :obj:`True`, a keyword argument called
``chat_data`` will be passed to the callback function. Default is :obj:`False`. ``chat_data`` will be passed to the callback function. Default is :obj:`False`.
DEPRECATED: Please switch to context based callbacks. DEPRECATED: Please switch to context based callbacks.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Defaults to :obj:`False`.
""" """

View file

@ -34,6 +34,7 @@ class PollHandler(Handler):
the callback function. the callback function.
pass_chat_data (:obj:`bool`): Determines whether ``chat_data`` will be passed to pass_chat_data (:obj:`bool`): Determines whether ``chat_data`` will be passed to
the callback function. the callback function.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Note: Note:
:attr:`pass_user_data` and :attr:`pass_chat_data` determine whether a ``dict`` you :attr:`pass_user_data` and :attr:`pass_chat_data` determine whether a ``dict`` you
@ -44,6 +45,10 @@ class PollHandler(Handler):
Note that this is DEPRECATED, and you should use context based callbacks. See Note that this is DEPRECATED, and you should use context based callbacks. See
https://git.io/fxJuV for more info. https://git.io/fxJuV for more info.
Warning:
When setting ``run_async`` to :obj:`True`, you cannot rely on adding custom
attributes to :class:`telegram.ext.CallbackContext`. See its docs for more info.
Args: Args:
callback (:obj:`callable`): The callback function for this handler. Will be called when callback (:obj:`callable`): The callback function for this handler. Will be called when
:attr:`check_update` has determined that an update should be processed by this handler. :attr:`check_update` has determined that an update should be processed by this handler.
@ -69,6 +74,8 @@ class PollHandler(Handler):
pass_chat_data (:obj:`bool`, optional): If set to :obj:`True`, a keyword argument called pass_chat_data (:obj:`bool`, optional): If set to :obj:`True`, a keyword argument called
``chat_data`` will be passed to the callback function. Default is :obj:`False`. ``chat_data`` will be passed to the callback function. Default is :obj:`False`.
DEPRECATED: Please switch to context based callbacks. DEPRECATED: Please switch to context based callbacks.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Defaults to :obj:`False`.
""" """

View file

@ -35,6 +35,7 @@ class PreCheckoutQueryHandler(Handler):
the callback function. the callback function.
pass_chat_data (:obj:`bool`): Determines whether ``chat_data`` will be passed to pass_chat_data (:obj:`bool`): Determines whether ``chat_data`` will be passed to
the callback function. the callback function.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Note: Note:
:attr:`pass_user_data` and :attr:`pass_chat_data` determine whether a ``dict`` you :attr:`pass_user_data` and :attr:`pass_chat_data` determine whether a ``dict`` you
@ -45,6 +46,10 @@ class PreCheckoutQueryHandler(Handler):
Note that this is DEPRECATED, and you should use context based callbacks. See Note that this is DEPRECATED, and you should use context based callbacks. See
https://git.io/fxJuV for more info. https://git.io/fxJuV for more info.
Warning:
When setting ``run_async`` to :obj:`True`, you cannot rely on adding custom
attributes to :class:`telegram.ext.CallbackContext`. See its docs for more info.
Args: Args:
callback (:obj:`callable`): The callback function for this handler. Will be called when callback (:obj:`callable`): The callback function for this handler. Will be called when
:attr:`check_update` has determined that an update should be processed by this handler. :attr:`check_update` has determined that an update should be processed by this handler.
@ -70,6 +75,8 @@ class PreCheckoutQueryHandler(Handler):
pass_chat_data (:obj:`bool`, optional): If set to :obj:`True`, a keyword argument called pass_chat_data (:obj:`bool`, optional): If set to :obj:`True`, a keyword argument called
``chat_data`` will be passed to the callback function. Default is :obj:`False`. ``chat_data`` will be passed to the callback function. Default is :obj:`False`.
DEPRECATED: Please switch to context based callbacks. DEPRECATED: Please switch to context based callbacks.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Defaults to :obj:`False`.
""" """

View file

@ -48,11 +48,16 @@ class RegexHandler(MessageHandler):
the callback function. the callback function.
pass_chat_data (:obj:`bool`): Determines whether ``chat_data`` will be passed to pass_chat_data (:obj:`bool`): Determines whether ``chat_data`` will be passed to
the callback function. the callback function.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Note: Note:
This handler is being deprecated. For the same use case use: This handler is being deprecated. For the same use case use:
``MessageHandler(Filters.regex(r'pattern'), callback)`` ``MessageHandler(Filters.regex(r'pattern'), callback)``
Warning:
When setting ``run_async`` to :obj:`True`, you cannot rely on adding custom
attributes to :class:`telegram.ext.CallbackContext`. See its docs for more info.
Args: Args:
pattern (:obj:`str` | :obj:`Pattern`): The regex pattern. pattern (:obj:`str` | :obj:`Pattern`): The regex pattern.
@ -88,6 +93,8 @@ class RegexHandler(MessageHandler):
Default is :obj:`True`. Default is :obj:`True`.
edited_updates (:obj:`bool`, optional): Should "edited" message updates be handled? Default edited_updates (:obj:`bool`, optional): Should "edited" message updates be handled? Default
is :obj:`False`. is :obj:`False`.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Defaults to :obj:`False`.
Raises: Raises:
ValueError ValueError
@ -106,7 +113,8 @@ class RegexHandler(MessageHandler):
allow_edited=False, allow_edited=False,
message_updates=True, message_updates=True,
channel_post_updates=False, channel_post_updates=False,
edited_updates=False): edited_updates=False,
run_async=False):
warnings.warn('RegexHandler is deprecated. See https://git.io/fxJuV for more info', warnings.warn('RegexHandler is deprecated. See https://git.io/fxJuV for more info',
TelegramDeprecationWarning, TelegramDeprecationWarning,
stacklevel=2) stacklevel=2)
@ -118,7 +126,8 @@ class RegexHandler(MessageHandler):
pass_chat_data=pass_chat_data, pass_chat_data=pass_chat_data,
message_updates=message_updates, message_updates=message_updates,
channel_post_updates=channel_post_updates, channel_post_updates=channel_post_updates,
edited_updates=edited_updates) edited_updates=edited_updates,
run_async=run_async)
self.pass_groups = pass_groups self.pass_groups = pass_groups
self.pass_groupdict = pass_groupdict self.pass_groupdict = pass_groupdict

View file

@ -35,6 +35,7 @@ class ShippingQueryHandler(Handler):
the callback function. the callback function.
pass_chat_data (:obj:`bool`): Determines whether ``chat_data`` will be passed to pass_chat_data (:obj:`bool`): Determines whether ``chat_data`` will be passed to
the callback function. the callback function.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Note: Note:
:attr:`pass_user_data` and :attr:`pass_chat_data` determine whether a ``dict`` you :attr:`pass_user_data` and :attr:`pass_chat_data` determine whether a ``dict`` you
@ -45,6 +46,10 @@ class ShippingQueryHandler(Handler):
Note that this is DEPRECATED, and you should use context based callbacks. See Note that this is DEPRECATED, and you should use context based callbacks. See
https://git.io/fxJuV for more info. https://git.io/fxJuV for more info.
Warning:
When setting ``run_async`` to :obj:`True`, you cannot rely on adding custom
attributes to :class:`telegram.ext.CallbackContext`. See its docs for more info.
Args: Args:
callback (:obj:`callable`): The callback function for this handler. Will be called when callback (:obj:`callable`): The callback function for this handler. Will be called when
:attr:`check_update` has determined that an update should be processed by this handler. :attr:`check_update` has determined that an update should be processed by this handler.
@ -70,6 +75,8 @@ class ShippingQueryHandler(Handler):
pass_chat_data (:obj:`bool`, optional): If set to :obj:`True`, a keyword argument called pass_chat_data (:obj:`bool`, optional): If set to :obj:`True`, a keyword argument called
``chat_data`` will be passed to the callback function. Default is :obj:`False`. ``chat_data`` will be passed to the callback function. Default is :obj:`False`.
DEPRECATED: Please switch to context based callbacks. DEPRECATED: Please switch to context based callbacks.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Defaults to :obj:`False`.
""" """

View file

@ -28,6 +28,10 @@ class StringCommandHandler(Handler):
This handler is not used to handle Telegram :attr:`telegram.Update`, but strings manually This handler is not used to handle Telegram :attr:`telegram.Update`, but strings manually
put in the queue. For example to send messages with the bot using command line or API. put in the queue. For example to send messages with the bot using command line or API.
Warning:
When setting ``run_async`` to :obj:`True`, you cannot rely on adding custom
attributes to :class:`telegram.ext.CallbackContext`. See its docs for more info.
Attributes: Attributes:
command (:obj:`str`): The command this handler should listen for. command (:obj:`str`): The command this handler should listen for.
callback (:obj:`callable`): The callback function for this handler. callback (:obj:`callable`): The callback function for this handler.
@ -37,6 +41,7 @@ class StringCommandHandler(Handler):
passed to the callback function. passed to the callback function.
pass_job_queue (:obj:`bool`): Determines whether ``job_queue`` will be passed to pass_job_queue (:obj:`bool`): Determines whether ``job_queue`` will be passed to
the callback function. the callback function.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Args: Args:
callback (:obj:`callable`): The callback function for this handler. Will be called when callback (:obj:`callable`): The callback function for this handler. Will be called when
@ -62,6 +67,8 @@ class StringCommandHandler(Handler):
class:`telegram.ext.JobQueue` instance created by the :class:`telegram.ext.Updater` class:`telegram.ext.JobQueue` instance created by the :class:`telegram.ext.Updater`
which can be used to schedule new jobs. Default is :obj:`False`. which can be used to schedule new jobs. Default is :obj:`False`.
DEPRECATED: Please switch to context based callbacks. DEPRECATED: Please switch to context based callbacks.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Defaults to :obj:`False`.
""" """
@ -70,11 +77,13 @@ class StringCommandHandler(Handler):
callback, callback,
pass_args=False, pass_args=False,
pass_update_queue=False, pass_update_queue=False,
pass_job_queue=False): pass_job_queue=False,
run_async=False):
super().__init__( super().__init__(
callback, callback,
pass_update_queue=pass_update_queue, pass_update_queue=pass_update_queue,
pass_job_queue=pass_job_queue) pass_job_queue=pass_job_queue,
run_async=run_async)
self.command = command self.command = command
self.pass_args = pass_args self.pass_args = pass_args

View file

@ -33,6 +33,10 @@ class StringRegexHandler(Handler):
This handler is not used to handle Telegram :attr:`telegram.Update`, but strings manually This handler is not used to handle Telegram :attr:`telegram.Update`, but strings manually
put in the queue. For example to send messages with the bot using command line or API. put in the queue. For example to send messages with the bot using command line or API.
Warning:
When setting ``run_async`` to :obj:`True`, you cannot rely on adding custom
attributes to :class:`telegram.ext.CallbackContext`. See its docs for more info.
Attributes: Attributes:
pattern (:obj:`str` | :obj:`Pattern`): The regex pattern. pattern (:obj:`str` | :obj:`Pattern`): The regex pattern.
callback (:obj:`callable`): The callback function for this handler. callback (:obj:`callable`): The callback function for this handler.
@ -44,6 +48,7 @@ class StringRegexHandler(Handler):
passed to the callback function. passed to the callback function.
pass_job_queue (:obj:`bool`): Determines whether ``job_queue`` will be passed to pass_job_queue (:obj:`bool`): Determines whether ``job_queue`` will be passed to
the callback function. the callback function.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Args: Args:
pattern (:obj:`str` | :obj:`Pattern`): The regex pattern. pattern (:obj:`str` | :obj:`Pattern`): The regex pattern.
@ -73,6 +78,8 @@ class StringRegexHandler(Handler):
:class:`telegram.ext.JobQueue` instance created by the :class:`telegram.ext.Updater` :class:`telegram.ext.JobQueue` instance created by the :class:`telegram.ext.Updater`
which can be used to schedule new jobs. Default is :obj:`False`. which can be used to schedule new jobs. Default is :obj:`False`.
DEPRECATED: Please switch to context based callbacks. DEPRECATED: Please switch to context based callbacks.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Defaults to :obj:`False`.
""" """
@ -82,11 +89,13 @@ class StringRegexHandler(Handler):
pass_groups=False, pass_groups=False,
pass_groupdict=False, pass_groupdict=False,
pass_update_queue=False, pass_update_queue=False,
pass_job_queue=False): pass_job_queue=False,
run_async=False):
super().__init__( super().__init__(
callback, callback,
pass_update_queue=pass_update_queue, pass_update_queue=pass_update_queue,
pass_job_queue=pass_job_queue) pass_job_queue=pass_job_queue,
run_async=run_async)
if isinstance(pattern, str): if isinstance(pattern, str):
pattern = re.compile(pattern) pattern = re.compile(pattern)

View file

@ -32,6 +32,11 @@ class TypeHandler(Handler):
passed to the callback function. passed to the callback function.
pass_job_queue (:obj:`bool`): Determines whether ``job_queue`` will be passed to pass_job_queue (:obj:`bool`): Determines whether ``job_queue`` will be passed to
the callback function. the callback function.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Warning:
When setting ``run_async`` to :obj:`True`, you cannot rely on adding custom
attributes to :class:`telegram.ext.CallbackContext`. See its docs for more info.
Args: Args:
type (:obj:`type`): The ``type`` of updates this handler should process, as type (:obj:`type`): The ``type`` of updates this handler should process, as
@ -56,6 +61,8 @@ class TypeHandler(Handler):
:class:`telegram.ext.JobQueue` instance created by the :class:`telegram.ext.Updater` :class:`telegram.ext.JobQueue` instance created by the :class:`telegram.ext.Updater`
which can be used to schedule new jobs. Default is :obj:`False`. which can be used to schedule new jobs. Default is :obj:`False`.
DEPRECATED: Please switch to context based callbacks. DEPRECATED: Please switch to context based callbacks.
run_async (:obj:`bool`): Determines whether the callback will run asynchronously.
Defaults to :obj:`False`.
""" """
@ -64,11 +71,13 @@ class TypeHandler(Handler):
callback, callback,
strict=False, strict=False,
pass_update_queue=False, pass_update_queue=False,
pass_job_queue=False): pass_job_queue=False,
run_async=False):
super().__init__( super().__init__(
callback, callback,
pass_update_queue=pass_update_queue, pass_update_queue=pass_update_queue,
pass_job_queue=pass_job_queue) pass_job_queue=pass_job_queue,
run_async=run_async)
self.type = type self.type = type
self.strict = strict self.strict = strict

View file

@ -32,19 +32,28 @@ class Promise:
pooled_function (:obj:`callable`): The callable that will be called concurrently. pooled_function (:obj:`callable`): The callable that will be called concurrently.
args (:obj:`list` | :obj:`tuple`): Positional arguments for :attr:`pooled_function`. args (:obj:`list` | :obj:`tuple`): Positional arguments for :attr:`pooled_function`.
kwargs (:obj:`dict`): Keyword arguments for :attr:`pooled_function`. kwargs (:obj:`dict`): Keyword arguments for :attr:`pooled_function`.
update (:class:`telegram.Update`, optional): The update this promise is associated with.
error_handling (:obj:`bool`, optional): Whether exceptions raised by :attr:`func`
may be handled by error handlers. Defaults to :obj:`True`.
Attributes: Attributes:
pooled_function (:obj:`callable`): The callable that will be called concurrently. pooled_function (:obj:`callable`): The callable that will be called concurrently.
args (:obj:`list` | :obj:`tuple`): Positional arguments for :attr:`pooled_function`. args (:obj:`list` | :obj:`tuple`): Positional arguments for :attr:`pooled_function`.
kwargs (:obj:`dict`): Keyword arguments for :attr:`pooled_function`. kwargs (:obj:`dict`): Keyword arguments for :attr:`pooled_function`.
done (:obj:`threading.Event`): Is set when the result is available. done (:obj:`threading.Event`): Is set when the result is available.
update (:class:`telegram.Update`): Optional. The update this promise is associated with.
error_handling (:obj:`bool`): Optional. Whether exceptions raised by :attr:`func`
may be handled by error handlers. Defaults to :obj:`True`.
""" """
def __init__(self, pooled_function, args, kwargs): # TODO: Remove error_handling parameter once we drop the @run_async decorator
def __init__(self, pooled_function, args, kwargs, update=None, error_handling=True):
self.pooled_function = pooled_function self.pooled_function = pooled_function
self.args = args self.args = args
self.kwargs = kwargs self.kwargs = kwargs
self.update = update
self.error_handling = error_handling
self.done = Event() self.done = Event()
self._result = None self._result = None
self._exception = None self._exception = None
@ -56,7 +65,6 @@ class Promise:
self._result = self.pooled_function(*self.args, **self.kwargs) self._result = self.pooled_function(*self.args, **self.kwargs)
except Exception as exc: except Exception as exc:
logger.exception('An uncaught error was raised while running the promise')
self._exception = exc self._exception = exc
finally: finally:

View file

@ -120,7 +120,7 @@ def dp(_dp):
_dp.persistence = None _dp.persistence = None
_dp.handlers = {} _dp.handlers = {}
_dp.groups = [] _dp.groups = []
_dp.error_handlers = [] _dp.error_handlers = {}
_dp.__stop_event = Event() _dp.__stop_event = Event()
_dp.__exception_event = Event() _dp.__exception_event = Event()
_dp.__async_queue = Queue() _dp.__async_queue = Queue()

View file

@ -104,6 +104,22 @@ class TestCallbackContext:
assert callback_context.bot is cdp.bot assert callback_context.bot is cdp.bot
assert callback_context.job_queue is cdp.job_queue assert callback_context.job_queue is cdp.job_queue
assert callback_context.update_queue is cdp.update_queue assert callback_context.update_queue is cdp.update_queue
assert callback_context.async_args is None
assert callback_context.async_kwargs is None
def test_from_error_async_params(self, cdp):
error = TelegramError('test')
args = [1, '2']
kwargs = {'one': 1, 2: 'two'}
callback_context = CallbackContext.from_error(None, error, cdp,
async_args=args,
async_kwargs=kwargs)
assert callback_context.error is error
assert callback_context.async_args is args
assert callback_context.async_kwargs is kwargs
def test_match(self, cdp): def test_match(self, cdp):
callback_context = CallbackContext(cdp) callback_context = CallbackContext(cdp)

View file

@ -486,6 +486,30 @@ class TestConversationHandler:
# Assert that the Promise has been resolved and the conversation ended. # Assert that the Promise has been resolved and the conversation ended.
assert len(handler.conversations) == 0 assert len(handler.conversations) == 0
def test_end_on_first_message_async_handler(self, dp, bot, user1):
handler = ConversationHandler(
entry_points=[CommandHandler('start', self.start_end, run_async=True)], states={},
fallbacks=[])
dp.add_handler(handler)
# User starts the state machine with an async function that immediately ends the
# conversation. Async results are resolved when the users state is queried next time.
message = Message(0, user1, None, self.group, text='/start',
entities=[MessageEntity(type=MessageEntity.BOT_COMMAND,
offset=0, length=len('/start'))],
bot=bot)
dp.update_queue.put(Update(update_id=0, message=message))
sleep(.1)
# Assert that the Promise has been accepted as the new state
assert len(handler.conversations) == 1
message.text = 'resolve promise pls'
message.entities[0].length = len('resolve promise pls')
dp.update_queue.put(Update(update_id=0, message=message))
sleep(.1)
# Assert that the Promise has been resolved and the conversation ended.
assert len(handler.conversations) == 0
def test_none_on_first_message(self, dp, bot, user1): def test_none_on_first_message(self, dp, bot, user1):
handler = ConversationHandler( handler = ConversationHandler(
entry_points=[CommandHandler('start', self.start_none)], states={}, fallbacks=[]) entry_points=[CommandHandler('start', self.start_none)], states={}, fallbacks=[])
@ -520,6 +544,29 @@ class TestConversationHandler:
# Assert that the Promise has been resolved and the conversation ended. # Assert that the Promise has been resolved and the conversation ended.
assert len(handler.conversations) == 0 assert len(handler.conversations) == 0
def test_none_on_first_message_async_handler(self, dp, bot, user1):
handler = ConversationHandler(
entry_points=[CommandHandler('start', self.start_none, run_async=True)], states={},
fallbacks=[])
dp.add_handler(handler)
# User starts the state machine with an async function that returns None
# Async results are resolved when the users state is queried next time.
message = Message(0, user1, None, self.group, text='/start',
entities=[MessageEntity(type=MessageEntity.BOT_COMMAND,
offset=0, length=len('/start'))],
bot=bot)
dp.update_queue.put(Update(update_id=0, message=message))
sleep(.1)
# Assert that the Promise has been accepted as the new state
assert len(handler.conversations) == 1
message.text = 'resolve promise pls'
dp.update_queue.put(Update(update_id=0, message=message))
sleep(.1)
# Assert that the Promise has been resolved and the conversation ended.
assert len(handler.conversations) == 0
def test_per_chat_message_without_chat(self, bot, user1): def test_per_chat_message_without_chat(self, bot, user1):
handler = ConversationHandler( handler = ConversationHandler(
entry_points=[CommandHandler('start', self.start_end)], states={}, entry_points=[CommandHandler('start', self.start_end)], states={},

View file

@ -16,6 +16,7 @@
# #
# You should have received a copy of the GNU Lesser Public License # You should have received a copy of the GNU Lesser Public License
# along with this program. If not, see [http://www.gnu.org/licenses/]. # along with this program. If not, see [http://www.gnu.org/licenses/].
import logging
from queue import Queue from queue import Queue
from threading import current_thread from threading import current_thread
from time import sleep from time import sleep
@ -54,6 +55,9 @@ class TestDispatcher:
def error_handler(self, bot, update, error): def error_handler(self, bot, update, error):
self.received = error.message self.received = error.message
def error_handler_context(self, update, context):
self.received = context.error.message
def error_handler_raise_error(self, bot, update, error): def error_handler_raise_error(self, bot, update, error):
raise Exception('Failing bigly') raise Exception('Failing bigly')
@ -67,7 +71,9 @@ class TestDispatcher:
return callback return callback
def callback_raise_error(self, bot, update): def callback_raise_error(self, bot, update):
if isinstance(bot, Bot):
raise TelegramError(update.message.text) raise TelegramError(update.message.text)
raise TelegramError(bot.message.text)
def callback_if_not_update_queue(self, bot, update, update_queue=None): def callback_if_not_update_queue(self, bot, update, update_queue=None):
if update_queue is not None: if update_queue is not None:
@ -116,6 +122,13 @@ class TestDispatcher:
sleep(.1) sleep(.1)
assert self.received is None assert self.received is None
def test_double_add_error_handler(self, dp, caplog):
dp.add_error_handler(self.error_handler)
with caplog.at_level(logging.DEBUG):
dp.add_error_handler(self.error_handler)
assert len(caplog.records) == 1
assert caplog.records[-1].msg.startswith('The callback is already registered')
def test_construction_with_bad_persistence(self, caplog, bot): def test_construction_with_bad_persistence(self, caplog, bot):
class my_per: class my_per:
def __init__(self): def __init__(self):
@ -124,7 +137,7 @@ class TestDispatcher:
self.store_bot_data = False self.store_bot_data = False
with pytest.raises(TypeError, with pytest.raises(TypeError,
match='persistence should be based on telegram.ext.BasePersistence'): match='persistence must be based on telegram.ext.BasePersistence'):
Dispatcher(bot, None, persistence=my_per()) Dispatcher(bot, None, persistence=my_per())
def test_error_handler_that_raises_errors(self, dp): def test_error_handler_that_raises_errors(self, dp):
@ -190,6 +203,129 @@ class TestDispatcher:
sleep(.1) sleep(.1)
assert self.received == self.message_update.message assert self.received == self.message_update.message
def test_multiple_run_async_deprecation(self, dp):
assert isinstance(dp, Dispatcher)
@run_async
def callback(update, context):
pass
dp.add_handler(MessageHandler(Filters.all, callback))
with pytest.warns(TelegramDeprecationWarning, match='@run_async decorator'):
dp.process_update(self.message_update)
def test_async_raises_dispatcher_handler_stop(self, dp, caplog):
@run_async
def callback(update, context):
raise DispatcherHandlerStop()
dp.add_handler(MessageHandler(Filters.all, callback))
with caplog.at_level(logging.WARNING):
dp.update_queue.put(self.message_update)
sleep(.1)
assert len(caplog.records) == 1
assert caplog.records[-1].msg.startswith('DispatcherHandlerStop is not supported '
'with async functions')
def test_async_raises_exception(self, dp, caplog):
@run_async
def callback(update, context):
raise RuntimeError('async raising exception')
dp.add_handler(MessageHandler(Filters.all, callback))
with caplog.at_level(logging.WARNING):
dp.update_queue.put(self.message_update)
sleep(.1)
assert len(caplog.records) == 1
assert caplog.records[-1].msg.startswith('A promise with deactivated error handling')
def test_add_async_handler(self, dp):
dp.add_handler(MessageHandler(Filters.all,
self.callback_if_not_update_queue,
pass_update_queue=True,
run_async=True))
dp.update_queue.put(self.message_update)
sleep(.1)
assert self.received == self.message_update.message
def test_run_async_no_error_handler(self, dp, caplog):
def func():
raise RuntimeError('Async Error')
with caplog.at_level(logging.ERROR):
dp.run_async(func)
sleep(.1)
assert len(caplog.records) == 1
assert caplog.records[-1].msg.startswith('No error handlers are registered')
def test_async_handler_error_handler(self, dp):
dp.add_handler(MessageHandler(Filters.all,
self.callback_raise_error,
run_async=True))
dp.add_error_handler(self.error_handler)
dp.update_queue.put(self.message_update)
sleep(.1)
assert self.received == self.message_update.message.text
def test_async_handler_async_error_handler_context(self, cdp):
cdp.add_handler(MessageHandler(Filters.all,
self.callback_raise_error,
run_async=True))
cdp.add_error_handler(self.error_handler_context, run_async=True)
cdp.update_queue.put(self.message_update)
sleep(2)
assert self.received == self.message_update.message.text
def test_async_handler_error_handler_that_raises_error(self, dp, caplog):
handler = MessageHandler(Filters.all,
self.callback_raise_error,
run_async=True)
dp.add_handler(handler)
dp.add_error_handler(self.error_handler_raise_error, run_async=False)
with caplog.at_level(logging.ERROR):
dp.update_queue.put(self.message_update)
sleep(.1)
assert len(caplog.records) == 1
assert caplog.records[-1].msg.startswith('An uncaught error was raised')
# Make sure that the main loop still runs
dp.remove_handler(handler)
dp.add_handler(MessageHandler(Filters.all,
self.callback_increase_count,
run_async=True))
dp.update_queue.put(self.message_update)
sleep(.1)
assert self.count == 1
def test_async_handler_async_error_handler_that_raises_error(self, dp, caplog):
handler = MessageHandler(Filters.all,
self.callback_raise_error,
run_async=True)
dp.add_handler(handler)
dp.add_error_handler(self.error_handler_raise_error, run_async=True)
with caplog.at_level(logging.ERROR):
dp.update_queue.put(self.message_update)
sleep(.1)
assert len(caplog.records) == 1
assert caplog.records[-1].msg.startswith('An uncaught error was raised')
# Make sure that the main loop still runs
dp.remove_handler(handler)
dp.add_handler(MessageHandler(Filters.all,
self.callback_increase_count,
run_async=True))
dp.update_queue.put(self.message_update)
sleep(.1)
assert self.count == 1
def test_error_in_handler(self, dp): def test_error_in_handler(self, dp):
dp.add_handler(MessageHandler(Filters.all, self.callback_raise_error)) dp.add_handler(MessageHandler(Filters.all, self.callback_raise_error))
dp.add_error_handler(self.error_handler) dp.add_error_handler(self.error_handler)

View file

@ -253,6 +253,9 @@ class TestBasePersistence:
rec = caplog.records[-2] rec = caplog.records[-2]
assert rec.msg == 'No error handlers are registered, logging exception.' assert rec.msg == 'No error handlers are registered, logging exception.'
assert rec.levelname == 'ERROR' assert rec.levelname == 'ERROR'
rec = caplog.records[-3]
assert rec.msg == 'No error handlers are registered, logging exception.'
assert rec.levelname == 'ERROR'
m.from_user = user2 m.from_user = user2
m.chat = chat1 m.chat = chat1
u = Update(1, m) u = Update(1, m)
@ -281,6 +284,103 @@ class TestBasePersistence:
assert dp.chat_data[-987654][2] == 'test8' assert dp.chat_data[-987654][2] == 'test8'
assert dp.bot_data['test0'] == 'test0' assert dp.bot_data['test0'] == 'test0'
def test_dispatcher_integration_handlers_run_async(self, cdp, caplog, bot, base_persistence,
chat_data, user_data, bot_data):
def get_user_data():
return user_data
def get_chat_data():
return chat_data
def get_bot_data():
return bot_data
base_persistence.get_user_data = get_user_data
base_persistence.get_chat_data = get_chat_data
base_persistence.get_bot_data = get_bot_data
cdp.persistence = base_persistence
cdp.user_data = user_data
cdp.chat_data = chat_data
cdp.bot_data = bot_data
def callback_known_user(update, context):
if not context.user_data['test1'] == 'test2':
pytest.fail('user_data corrupt')
if not context.bot_data == bot_data:
pytest.fail('bot_data corrupt')
def callback_known_chat(update, context):
if not context.chat_data['test3'] == 'test4':
pytest.fail('chat_data corrupt')
if not context.bot_data == bot_data:
pytest.fail('bot_data corrupt')
def callback_unknown_user_or_chat(update, context):
if not context.user_data == {}:
pytest.fail('user_data corrupt')
if not context.chat_data == {}:
pytest.fail('chat_data corrupt')
if not context.bot_data == bot_data:
pytest.fail('bot_data corrupt')
context.user_data[1] = 'test7'
context.chat_data[2] = 'test8'
context.bot_data['test0'] = 'test0'
known_user = MessageHandler(Filters.user(user_id=12345), callback_known_user,
pass_chat_data=True, pass_user_data=True, run_async=True)
known_chat = MessageHandler(Filters.chat(chat_id=-67890), callback_known_chat,
pass_chat_data=True, pass_user_data=True, run_async=True)
unknown = MessageHandler(Filters.all, callback_unknown_user_or_chat, pass_chat_data=True,
pass_user_data=True, run_async=True)
cdp.add_handler(known_user)
cdp.add_handler(known_chat)
cdp.add_handler(unknown)
user1 = User(id=12345, first_name='test user', is_bot=False)
user2 = User(id=54321, first_name='test user', is_bot=False)
chat1 = Chat(id=-67890, type='group')
chat2 = Chat(id=-987654, type='group')
m = Message(1, user1, None, chat2)
u = Update(0, m)
with caplog.at_level(logging.ERROR):
cdp.process_update(u)
sleep(.1)
rec = caplog.records[-1]
assert rec.msg == 'No error handlers are registered, logging exception.'
assert rec.levelname == 'ERROR'
rec = caplog.records[-2]
assert rec.msg == 'No error handlers are registered, logging exception.'
assert rec.levelname == 'ERROR'
m.from_user = user2
m.chat = chat1
u = Update(1, m)
cdp.process_update(u)
m.chat = chat2
u = Update(2, m)
def save_bot_data(data):
if 'test0' not in data:
pytest.fail()
def save_chat_data(data):
if -987654 not in data:
pytest.fail()
def save_user_data(data):
if 54321 not in data:
pytest.fail()
base_persistence.update_chat_data = save_chat_data
base_persistence.update_user_data = save_user_data
base_persistence.update_bot_data = save_bot_data
cdp.process_update(u)
sleep(0.1)
assert cdp.user_data[54321][1] == 'test7'
assert cdp.chat_data[-987654][2] == 'test8'
assert cdp.bot_data['test0'] == 'test0'
def test_persistence_dispatcher_arbitrary_update_types(self, dp, base_persistence, caplog): def test_persistence_dispatcher_arbitrary_update_types(self, dp, base_persistence, caplog):
# Updates used with TypeHandler doesn't necessarily have the proper attributes for # Updates used with TypeHandler doesn't necessarily have the proper attributes for
# persistence, makes sure it works anyways # persistence, makes sure it works anyways