python-telegram-bot/telegram/ext/dispatcher.py

730 lines
30 KiB
Python
Raw Normal View History

#!/usr/bin/env python
#
# A library that provides a Python interface to the Telegram Bot API
# Copyright (C) 2015-2022
# Leandro Toledo de Souza <devs@python-telegram-bot.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser Public License for more details.
#
# You should have received a copy of the GNU Lesser Public License
# along with this program. If not, see [http://www.gnu.org/licenses/].
"""This module contains the Dispatcher class."""
import logging
import warnings
import weakref
from collections import defaultdict
from queue import Empty, Queue
from threading import BoundedSemaphore, Event, Lock, Thread, current_thread
2016-01-06 15:35:55 +01:00
from time import sleep
from typing import (
TYPE_CHECKING,
Callable,
2021-06-06 11:48:48 +02:00
DefaultDict,
Dict,
List,
Optional,
Set,
Union,
Generic,
TypeVar,
overload,
cast,
)
from uuid import uuid4
from telegram import TelegramError, Update
from telegram.ext import BasePersistence, ContextTypes
from telegram.ext.handler import Handler
2021-06-06 11:48:48 +02:00
import telegram.ext.extbot
from telegram.ext.callbackdatacache import CallbackDataCache
from telegram.ext.utils.promise import Promise
from telegram.utils.helpers import DefaultValue, DEFAULT_FALSE
from telegram.ext.utils.types import CCT, UD, CD, BD
2020-10-06 19:28:40 +02:00
if TYPE_CHECKING:
    # Imported for type annotations only.
    from telegram import Bot
    from telegram.ext import JobQueue
    from telegram.ext.callbackcontext import CallbackContext

# Default value of the ``group`` parameter of Dispatcher.add_handler/remove_handler.
DEFAULT_GROUP: int = 0

# Type variable used to parametrize ``Handler`` in the signature of Dispatcher.add_handler.
UT = TypeVar('UT')
class DispatcherHandlerStop(Exception):
    """
    Raise this in handler to prevent execution of any other handler (even in different group).

    In order to use this exception in a :class:`telegram.ext.ConversationHandler`, pass the
    optional ``state`` parameter instead of returning the next state:

    .. code-block:: python

        def callback(update, context):
            ...
            raise DispatcherHandlerStop(next_state)

    Attributes:
        state (:obj:`object`): Optional. The next state of the conversation.

    Args:
        state (:obj:`object`, optional): The next state of the conversation.
    """

    __slots__ = ('state',)

    def __init__(self, state: object = None) -> None:
        super().__init__()
        self.state = state
class Dispatcher(Generic[CCT, UD, CD, BD]):
    """This class dispatches all kinds of updates to its registered handlers.

    Args:
        bot (:class:`telegram.Bot`): The bot object that should be passed to the handlers.
        update_queue (:obj:`Queue`): The synchronized queue that will contain the updates.
        workers (:obj:`int`, optional): Number of maximum concurrent worker threads for the
            ``@run_async`` decorator and :meth:`run_async`. Defaults to 4.
        exception_event (:class:`threading.Event`, optional): Event signalling an exception in
            another thread. The dispatcher refuses to start while it is set and stops polling
            the update queue once it is set. A new event is created if omitted.
        job_queue (:class:`telegram.ext.JobQueue`, optional): The :class:`telegram.ext.JobQueue`
            instance to pass onto handler callbacks.
        persistence (:class:`telegram.ext.BasePersistence`, optional): The persistence class to
            store data that should be persistent over restarts.
        context_types (:class:`telegram.ext.ContextTypes`, optional): Pass an instance
            of :class:`telegram.ext.ContextTypes` to customize the types used in the
            ``context`` interface. If not passed, the defaults documented in
            :class:`telegram.ext.ContextTypes` will be used.

            .. versionadded:: 13.6

    Attributes:
        bot (:class:`telegram.Bot`): The bot object that should be passed to the handlers.
        update_queue (:obj:`Queue`): The synchronized queue that will contain the updates.
        job_queue (:class:`telegram.ext.JobQueue`): Optional. The :class:`telegram.ext.JobQueue`
            instance to pass onto handler callbacks.
        workers (:obj:`int`, optional): Number of maximum concurrent worker threads for the
            ``@run_async`` decorator and :meth:`run_async`.
        user_data (:obj:`defaultdict`): A dictionary handlers can use to store data for the user.
        chat_data (:obj:`defaultdict`): A dictionary handlers can use to store data for the chat.
        bot_data (:obj:`dict`): A dictionary handlers can use to store data for the bot.
        persistence (:class:`telegram.ext.BasePersistence`): Optional. The persistence class to
            store data that should be persistent over restarts.
        context_types (:class:`telegram.ext.ContextTypes`): Container for the types used
            in the ``context`` interface.

            .. versionadded:: 13.6

    """

    # Allowing '__weakref__' creation here since we need it for the singleton
    __slots__ = (
        'workers',
        'persistence',
        'update_queue',
        'job_queue',
        'user_data',
        'chat_data',
        'bot_data',
        '_update_persistence_lock',
        'handlers',
        'groups',
        'error_handlers',
        'running',
        '__stop_event',
        '__exception_event',
        '__async_queue',
        '__async_threads',
        'bot',
        '__weakref__',
        'context_types',
    )

    # Guards the check-and-set of the singleton reference performed in ``__init__``.
    __singleton_lock = Lock()
    # Acquired (non-blocking) by each new instance; only the first acquisition succeeds.
    __singleton_semaphore = BoundedSemaphore()
    # Weak reference to the single instance, or None.
    __singleton = None
    logger = logging.getLogger(__name__)
@overload
def __init__(
    self: 'Dispatcher[CallbackContext[Dict, Dict, Dict], Dict, Dict, Dict]',
    bot: 'Bot',
    update_queue: Queue,
    workers: int = 4,
    exception_event: Event = None,
    job_queue: 'JobQueue' = None,
    persistence: BasePersistence = None,
):
    ...

@overload
def __init__(
    self: 'Dispatcher[CCT, UD, CD, BD]',
    bot: 'Bot',
    update_queue: Queue,
    workers: int = 4,
    exception_event: Event = None,
    job_queue: 'JobQueue' = None,
    persistence: BasePersistence = None,
    context_types: ContextTypes[CCT, UD, CD, BD] = None,
):
    ...

def __init__(
    self,
    bot: 'Bot',
    update_queue: Queue,
    workers: int = 4,
    exception_event: Event = None,
    job_queue: 'JobQueue' = None,
    persistence: BasePersistence = None,
    context_types: ContextTypes[CCT, UD, CD, BD] = None,
):
    """Set up the dispatcher state and, if given, load stored data from ``persistence``.

    Raises:
        TypeError: If ``persistence`` is not a :class:`telegram.ext.BasePersistence`.
        ValueError: If data returned by ``persistence`` does not have the expected type.
    """
    self.bot = bot
    self.update_queue = update_queue
    self.job_queue = job_queue
    self.workers = workers
    self.context_types = cast(ContextTypes[CCT, UD, CD, BD], context_types or ContextTypes())

    if self.workers < 1:
        warnings.warn(
            'Asynchronous callbacks can not be processed without at least one worker thread.'
        )

    self.user_data: DefaultDict[int, UD] = defaultdict(self.context_types.user_data)
    self.chat_data: DefaultDict[int, CD] = defaultdict(self.context_types.chat_data)
    self.bot_data = self.context_types.bot_data()
    self.persistence: Optional[BasePersistence] = None
    self._update_persistence_lock = Lock()

    if persistence:
        if not isinstance(persistence, BasePersistence):
            raise TypeError("persistence must be based on telegram.ext.BasePersistence")
        self.persistence = persistence
        self.persistence.set_bot(self.bot)

        if self.persistence.store_data.user_data:
            self.user_data = self.persistence.get_user_data()
            if not isinstance(self.user_data, defaultdict):
                raise ValueError("user_data must be of type defaultdict")

        if self.persistence.store_data.chat_data:
            self.chat_data = self.persistence.get_chat_data()
            if not isinstance(self.chat_data, defaultdict):
                raise ValueError("chat_data must be of type defaultdict")

        if self.persistence.store_data.bot_data:
            self.bot_data = self.persistence.get_bot_data()
            if not isinstance(self.bot_data, self.context_types.bot_data):
                raise ValueError(
                    f"bot_data must be of type {self.context_types.bot_data.__name__}"
                )

        if self.persistence.store_data.callback_data:
            self.bot = cast(telegram.ext.extbot.ExtBot, self.bot)
            persistent_data = self.persistence.get_callback_data()
            if persistent_data is not None:
                # Must be ``or``: with ``and`` a tuple of the wrong length or a
                # two-element non-tuple would slip through the check.
                if not isinstance(persistent_data, tuple) or len(persistent_data) != 2:
                    raise ValueError('callback_data must be a 2-tuple')
                self.bot.callback_data_cache = CallbackDataCache(
                    self.bot,
                    self.bot.callback_data_cache.maxsize,
                    persistent_data=persistent_data,
                )

    self.handlers: Dict[int, List[Handler]] = {}
    """Dict[:obj:`int`, List[:class:`telegram.ext.Handler`]]: Holds the handlers per group."""
    self.groups: List[int] = []
    """List[:obj:`int`]: A list with all groups."""
    self.error_handlers: Dict[Callable, Union[bool, DefaultValue]] = {}
    """Dict[:obj:`callable`, :obj:`bool`]: A dict, where the keys are error handlers and the
    values indicate whether they are to be run asynchronously."""

    self.running = False
    """:obj:`bool`: Indicates if this dispatcher is running."""
    self.__stop_event = Event()
    self.__exception_event = exception_event or Event()
    self.__async_queue: Queue = Queue()
    self.__async_threads: Set[Thread] = set()

    # For backward compatibility, we allow a "singleton" mode for the dispatcher. When there's
    # only one instance of Dispatcher, it will be possible to use the `run_async` decorator.
    with self.__singleton_lock:
        if self.__singleton_semaphore.acquire(blocking=False):  # pylint: disable=R1732
            self._set_singleton(self)
        else:
            self._set_singleton(None)
@property
def exception_event(self) -> Event:  # skipcq: PY-D0003
    """:class:`threading.Event`: The event this dispatcher checks to detect exceptions
    raised in other threads."""
    return self.__exception_event
def _init_async_threads(self, base_name: str, workers: int) -> None:
    """Create and start the worker threads that execute :meth:`_pooled`.

    Args:
        base_name (:obj:`str`): Prefix used in the thread names; may be empty.
        workers (:obj:`int`): Number of threads to start.
    """
    base_name = f'{base_name}_' if base_name else ''

    for i in range(workers):
        thread = Thread(target=self._pooled, name=f'Bot:{self.bot.id}:worker:{base_name}{i}')
        # Register the thread before starting it so that it is tracked for stop()
        self.__async_threads.add(thread)
        thread.start()
@classmethod
def _set_singleton(cls, val: Optional['Dispatcher']) -> None:
    """Store a weak reference to ``val`` as the singleton, or clear it if ``val`` is None."""
    cls.logger.debug('Setting singleton dispatcher as %s', val)
    # Compare against None explicitly instead of relying on the truthiness of the instance
    cls.__singleton = weakref.ref(val) if val is not None else None
@classmethod
def get_instance(cls) -> 'Dispatcher':
    """Get the singleton instance of this class.

    Returns:
        :class:`telegram.ext.Dispatcher`

    Raises:
        RuntimeError: If no singleton is registered or the registered instance no longer
            exists.

    """
    if cls.__singleton is not None:
        # The singleton is held as a weak reference; calling it yields None once the
        # dispatcher has been garbage collected. Never hand that None to the caller.
        instance = cls.__singleton()  # pylint: disable=not-callable
        if instance is not None:
            return instance  # type: ignore[return-value]
    raise RuntimeError(f'{cls.__name__} not initialized or multiple instances exist')
def _pooled(self) -> None:
    """Worker loop: run promises taken from the async queue until a stop marker arrives."""
    thr_name = current_thread().name
    while True:
        promise = self.__async_queue.get()

        # Anything that is not a Promise is treated as the stop marker; :meth:`stop` puts
        # one ``None`` per worker thread on the queue.
        if not isinstance(promise, Promise):
            self.logger.debug(
                "Closing run_async thread %s/%d", thr_name, len(self.__async_threads)
            )
            break

        promise.run()

        if not promise.exception:
            self.update_persistence(update=promise.update)
            continue

        if isinstance(promise.exception, DispatcherHandlerStop):
            self.logger.warning(
                'DispatcherHandlerStop is not supported with async functions; func: %s',
                promise.pooled_function.__name__,
            )
            continue

        # Avoid infinite recursion of error handlers.
        if promise.pooled_function in self.error_handlers:
            self.logger.error('An uncaught error was raised while handling the error.')
            continue

        # If we arrive here, an exception happened in the promise and was neither
        # DispatcherHandlerStop nor raised by an error handler. So we can and must handle it
        try:
            self.dispatch_error(promise.update, promise.exception, promise=promise)
        except Exception:
            self.logger.exception('An uncaught error was raised while handling the error.')
def run_async(
    self, func: Callable[..., object], *args: object, update: object = None, **kwargs: object
) -> Promise:
    """
    Queue a function (with given args/kwargs) to be run asynchronously. Exceptions raised
    by the function will be handled by the error handlers registered with
    :meth:`add_error_handler`.

    Warning:
        * If you're using ``@run_async``/:meth:`run_async` you cannot rely on adding custom
          attributes to :class:`telegram.ext.CallbackContext`. See its docs for more info.
        * Calling a function through :meth:`run_async` from within an error handler can lead to
          an infinite error handling loop.

    Args:
        func (:obj:`callable`): The function to run in the thread.
        *args (:obj:`tuple`, optional): Arguments to ``func``.
        update (:class:`telegram.Update` | :obj:`object`, optional): The update associated with
            the functions call. If passed, it will be available in the error handlers, in case
            an exception is raised by :attr:`func`.
        **kwargs (:obj:`dict`, optional): Keyword arguments to ``func``.

    Returns:
        Promise

    """
    promise = Promise(func, args, kwargs, update=update)
    self.__async_queue.put(promise)
    return promise
def start(self, ready: Event = None) -> None:
    """Thread target of thread 'dispatcher'.

    Runs in background and processes the update queue.

    Args:
        ready (:obj:`threading.Event`, optional): If specified, the event will be set once the
            dispatcher is ready.

    Raises:
        telegram.TelegramError: If the exception event is already set when starting.

    """
    if self.running:
        self.logger.warning('already running')
        if ready is not None:
            ready.set()
        return

    if self.__exception_event.is_set():
        msg = 'reusing dispatcher after exception event is forbidden'
        self.logger.error(msg)
        raise TelegramError(msg)

    self._init_async_threads(str(uuid4()), self.workers)
    self.running = True
    self.logger.debug('Dispatcher started')

    if ready is not None:
        ready.set()

    while True:
        try:
            # Pop update from update queue. Waiting at most one second lets the loop
            # check the stop/exception events regularly while the queue is empty.
            update = self.update_queue.get(True, 1)
        except Empty:
            if self.__stop_event.is_set():
                self.logger.debug('orderly stopping')
                break
            if self.__exception_event.is_set():
                self.logger.critical('stopping due to exception in another thread')
                break
            continue

        self.logger.debug('Processing Update: %s', update)
        self.process_update(update)
        self.update_queue.task_done()

    self.running = False
    self.logger.debug('Dispatcher thread stopped')
def stop(self) -> None:
    """Stops the thread."""
    if self.running:
        self.__stop_event.set()
        # Wait until the loop in start() has noticed the stop event and finished
        while self.running:
            sleep(0.1)
        self.__stop_event.clear()

    # async threads must be join()ed only after the dispatcher thread was joined,
    # otherwise we can still have new async threads dispatched
    threads = list(self.__async_threads)
    total = len(threads)

    # Stop all threads in the thread pool by put()ting one non-Promise marker per thread
    for _ in range(total):
        self.__async_queue.put(None)

    for i, thr in enumerate(threads):
        self.logger.debug('Waiting for async thread %s/%s to end', i + 1, total)
        thr.join()
        self.__async_threads.remove(thr)
        self.logger.debug('async thread %s/%s has ended', i + 1, total)
@property
def has_running_threads(self) -> bool:  # skipcq: PY-D0003
    """:obj:`bool`: Whether the dispatcher is running or async worker threads are still
    tracked."""
    dispatcher_running = self.running
    if dispatcher_running:
        return dispatcher_running
    return bool(self.__async_threads)
def process_update(self, update: object) -> None:
    """Processes a single update and updates the persistence.

    Note:
        If the update is handled by at least one synchronously running handler (i.e.
        ``run_async=False``), :meth:`update_persistence` is called *once* after all
        synchronous handlers are done. Each asynchronously running handler will trigger
        :meth:`update_persistence` on its own.

    Args:
        update (:class:`telegram.Update` | :obj:`object` | \
            :class:`telegram.error.TelegramError`):
            The update to process.

    """
    # An error happened while polling
    if isinstance(update, TelegramError):
        try:
            self.dispatch_error(None, update)
        except Exception:
            self.logger.exception('An uncaught error was raised while handling the error.')
        return

    context = None
    handled = False
    # ``run_async`` setting of every handler that accepted the update
    sync_modes = []

    for group in self.groups:
        try:
            for handler in self.handlers[group]:
                check = handler.check_update(update)
                if check is not None and check is not False:
                    # The context is built lazily and shared by all handlers of this update
                    if not context:
                        context = self.context_types.context.from_update(update, self)
                        context.refresh_data()
                    handled = True
                    sync_modes.append(handler.run_async)
                    handler.handle_update(update, self, check, context)
                    # At most one handler per group
                    break

        # Stop processing with any other handler.
        except DispatcherHandlerStop:
            self.logger.debug('Stopping further handlers due to DispatcherHandlerStop')
            # NOTE(review): the block after the loop may call update_persistence again
            # for this update - confirm whether the second call is intended.
            self.update_persistence(update=update)
            break

        # Dispatch any error.
        except Exception as exc:
            try:
                self.dispatch_error(update, exc)
            except DispatcherHandlerStop:
                self.logger.debug('Error handler stopped further handlers')
                break
            # Errors should not stop the thread.
            except Exception:
                self.logger.exception('An uncaught error was raised while handling the error.')

    # Update persistence, if handled
    handled_only_async = all(sync_modes)
    if handled:
        # Respect default settings
        if all(mode is DEFAULT_FALSE for mode in sync_modes) and self.bot.defaults:
            handled_only_async = self.bot.defaults.run_async
        # If update was only handled by async handlers, we don't need to update here
        if not handled_only_async:
            self.update_persistence(update=update)
def add_handler(self, handler: Handler[UT, CCT], group: int = DEFAULT_GROUP) -> None:
    """Register a handler.

    TL;DR: Order and priority counts. 0 or 1 handlers per group will be used. End handling of
    update with :class:`telegram.ext.DispatcherHandlerStop`.

    A handler must be an instance of a subclass of :class:`telegram.ext.Handler`. All handlers
    are organized in groups with a numeric value. The default group is 0. All groups will be
    evaluated for handling an update, but only 0 or 1 handler per group will be used. If
    :class:`telegram.ext.DispatcherHandlerStop` is raised from one of the handlers, no further
    handlers (regardless of the group) will be called.

    The priority/order of handlers is determined as follows:

      * Priority of the group (lower group number == higher priority)
      * The first handler in a group which should handle an update (see
        :attr:`telegram.ext.Handler.check_update`) will be used. Other handlers from the
        group will not be used. The order in which handlers were added to the group defines the
        priority.

    Args:
        handler (:class:`telegram.ext.Handler`): A Handler instance.
        group (:obj:`int`, optional): The group identifier. Default is 0.

    Raises:
        TypeError: If ``handler`` is not a :class:`telegram.ext.Handler` or ``group`` is not
            an :obj:`int`.
        ValueError: If ``handler`` is a persistent, named ConversationHandler but the
            dispatcher has no persistence.

    """
    # Unfortunately due to circular imports this has to be here
    from .conversationhandler import ConversationHandler  # pylint: disable=C0415

    if not isinstance(handler, Handler):
        raise TypeError(f'handler is not an instance of {Handler.__name__}')
    if not isinstance(group, int):
        raise TypeError('group is not int')

    # For some reason MyPy infers the type of handler is <nothing> here,
    # so for now we just ignore all the errors
    if (
        isinstance(handler, ConversationHandler)
        and handler.persistent  # type: ignore[attr-defined]
        and handler.name  # type: ignore[attr-defined]
    ):
        if not self.persistence:
            raise ValueError(
                f"ConversationHandler {handler.name} "  # type: ignore[attr-defined]
                f"can not be persistent if dispatcher has no persistence"
            )
        handler.persistence = self.persistence  # type: ignore[attr-defined]
        handler.conversations = (  # type: ignore[attr-defined]
            self.persistence.get_conversations(handler.name)  # type: ignore[attr-defined]
        )

    if group not in self.handlers:
        self.handlers[group] = []
        self.groups.append(group)
        # Keep the groups ordered so that lower group numbers are processed first
        self.groups = sorted(self.groups)

    self.handlers[group].append(handler)
def remove_handler(self, handler: Handler, group: int = DEFAULT_GROUP) -> None:
    """Remove a handler from the specified group.

    Does nothing if the group does not exist or does not contain ``handler``.

    Args:
        handler (:class:`telegram.ext.Handler`): A Handler instance.
        group (:obj:`int`, optional): The group identifier. Default is 0.

    """
    # ``.get`` avoids a KeyError for a group that was never registered or already removed
    group_handlers = self.handlers.get(group)
    if group_handlers is None or handler not in group_handlers:
        return

    group_handlers.remove(handler)
    if not group_handlers:
        # Drop empty groups so that process_update does not iterate over them
        del self.handlers[group]
        self.groups.remove(group)
def update_persistence(self, update: object = None) -> None:
    """Update :attr:`user_data`, :attr:`chat_data` and :attr:`bot_data` in :attr:`persistence`.

    Args:
        update (:class:`telegram.Update`, optional): The update to process. If passed, only the
            corresponding ``user_data`` and ``chat_data`` will be updated.
    """
    # Serialize persistence writes; this method is called from the dispatcher loop as well
    # as from the async worker threads.
    with self._update_persistence_lock:
        self.__update_persistence(update)
def __update_persistence(self, update: object = None) -> None:
    """Write callback, bot, chat and user data to :attr:`persistence`, if one is set.

    Errors raised by the persistence are passed to :meth:`dispatch_error`; they do not
    abort the remaining writes.

    Args:
        update (:obj:`object`, optional): If this is a :class:`telegram.Update`, only the
            data of its effective chat and effective user is written.
    """
    if not self.persistence:
        return

    # We use list() here in order to decouple chat_ids from self.chat_data, as dict view
    # objects will change, when the dict does and we want to loop over chat_ids
    chat_ids = list(self.chat_data.keys())
    user_ids = list(self.user_data.keys())

    if isinstance(update, Update):
        chat_ids = [update.effective_chat.id] if update.effective_chat else []
        user_ids = [update.effective_user.id] if update.effective_user else []

    if self.persistence.store_data.callback_data:
        self.bot = cast(telegram.ext.extbot.ExtBot, self.bot)
        try:
            self.persistence.update_callback_data(
                self.bot.callback_data_cache.persistence_data
            )
        except Exception as exc:
            self._dispatch_persistence_error(update, exc, 'callback')

    if self.persistence.store_data.bot_data:
        try:
            self.persistence.update_bot_data(self.bot_data)
        except Exception as exc:
            self._dispatch_persistence_error(update, exc, 'bot')

    if self.persistence.store_data.chat_data:
        for chat_id in chat_ids:
            try:
                self.persistence.update_chat_data(chat_id, self.chat_data[chat_id])
            except Exception as exc:
                self._dispatch_persistence_error(update, exc, 'chat')

    if self.persistence.store_data.user_data:
        for user_id in user_ids:
            try:
                self.persistence.update_user_data(user_id, self.user_data[user_id])
            except Exception as exc:
                self._dispatch_persistence_error(update, exc, 'user')

def _dispatch_persistence_error(self, update: object, exc: Exception, kind: str) -> None:
    """Pass a persistence error to the error handlers; log if those raise as well.

    Must be called from inside the ``except`` block that caught ``exc``.
    """
    try:
        self.dispatch_error(update, exc)
    except Exception:
        message = (
            f'Saving {kind} data raised an error and an '
            'uncaught error was raised while handling '
            'the error with an error_handler'
        )
        self.logger.exception(message)
def add_error_handler(
    self,
    callback: Callable[[object, CCT], None],
    run_async: Union[bool, DefaultValue] = DEFAULT_FALSE,  # pylint: disable=W0621
) -> None:
    """Registers an error handler in the Dispatcher. This handler will receive every error
    which happens in your bot.

    Note:
        Attempts to add the same callback multiple times will be ignored.

    Warning:
        The errors handled within these handlers won't show up in the logger, so you
        need to make sure that you reraise the error.

    Args:
        callback (:obj:`callable`): The callback function for this error handler. Will be
            called when an error is raised.
            Callback signature: ``def callback(update: Update, context: CallbackContext)``

            The error that happened will be present in context.error.
        run_async (:obj:`bool`, optional): Whether this handlers callback should be run
            asynchronously using :meth:`run_async`. Defaults to :obj:`False`.

    """
    if callback in self.error_handlers:
        self.logger.debug('The callback is already registered as an error handler. Ignoring.')
        return

    # An unset ``run_async`` falls back to the bot-wide defaults, if there are any
    if run_async is DEFAULT_FALSE and self.bot.defaults and self.bot.defaults.run_async:
        run_async = True

    self.error_handlers[callback] = run_async
def remove_error_handler(self, callback: Callable[[object, CCT], None]) -> None:
    """Removes an error handler.

    Does nothing if ``callback`` is not registered.

    Args:
        callback (:obj:`callable`): The error handler to remove.

    """
    self.error_handlers.pop(callback, None)
def dispatch_error(
    self, update: Optional[object], error: Exception, promise: Promise = None
) -> None:
    """Dispatches an error.

    Every registered error handler is called with its own context object. If no error
    handler is registered, the error is logged instead.

    Args:
        update (:obj:`object` | :class:`telegram.Update`): The update that caused the error.
        error (:obj:`Exception`): The error that was raised.
        promise (:class:`telegram.utils.Promise`, optional): The promise whose pooled function
            raised the error.

    """
    async_args = None if not promise else promise.args
    async_kwargs = None if not promise else promise.kwargs

    if not self.error_handlers:
        self.logger.exception(
            'No error handlers are registered, logging exception.', exc_info=error
        )
        return

    # Iterate over a snapshot: a handler that adds or removes error handlers while the
    # live dict is being iterated would raise "dictionary changed size during iteration".
    for callback, run_async in list(self.error_handlers.items()):  # pylint: disable=W0621
        context = self.context_types.context.from_error(
            update, error, self, async_args=async_args, async_kwargs=async_kwargs
        )
        if run_async:
            self.run_async(callback, update, context, update=update)
        else:
            callback(update, context)