python-telegram-bot/telegram/ext/dispatcher.py

433 lines
17 KiB
Python
Raw Normal View History

#!/usr/bin/env python
#
# A library that provides a Python interface to the Telegram Bot API
# Copyright (C) 2015-2018
# Leandro Toledo de Souza <devs@python-telegram-bot.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser Public License for more details.
#
# You should have received a copy of the GNU Lesser Public License
# along with this program. If not, see [http://www.gnu.org/licenses/].
"""This module contains the Dispatcher class."""
import logging
import weakref
from functools import wraps
from threading import Thread, Lock, Event, current_thread, BoundedSemaphore
2016-01-06 15:35:55 +01:00
from time import sleep
from uuid import uuid4
from collections import defaultdict
from queue import Queue, Empty
from future.builtins import range
2016-04-16 19:25:38 +02:00
from telegram import TelegramError
from telegram.ext.handler import Handler
from telegram.utils.promise import Promise
from telegram.ext import BasePersistence
logging.getLogger(__name__).addHandler(logging.NullHandler())
DEFAULT_GROUP = 0
2015-11-06 00:24:01 +01:00
def run_async(func):
2017-09-01 08:43:08 +02:00
"""Function decorator that will run the function in a new thread.
Will run :attr:`telegram.ext.Dispatcher.run_async`.
Using this decorator is only possible when only a single Dispatcher exist in the system.
2017-09-01 08:43:08 +02:00
Note: Use this decorator to run handlers asynchronously.
2017-09-01 08:43:08 +02:00
"""
@wraps(func)
2016-06-01 20:11:00 +02:00
def async_func(*args, **kwargs):
return Dispatcher.get_instance().run_async(func, *args, **kwargs)
return async_func
class DispatcherHandlerStop(Exception):
"""Raise this in handler to prevent execution any other handler (even in different group)."""
pass
class Dispatcher(object):
2017-09-01 08:43:08 +02:00
"""This class dispatches all kinds of updates to its registered handlers.
Attributes:
bot (:class:`telegram.Bot`): The bot object that should be passed to the handlers.
update_queue (:obj:`Queue`): The synchronized queue that will contain the updates.
job_queue (:class:`telegram.ext.JobQueue`): Optional. The :class:`telegram.ext.JobQueue`
instance to pass onto handler callbacks.
workers (:obj:`int`): Number of maximum concurrent worker threads for the ``@run_async``
decorator.
user_data (:obj:`defaultdict`): A dictionary handlers can use to store data for the user.
chat_data (:obj:`defaultdict`): A dictionary handlers can use to store data for the chat.
persistence (:class:`telegram.ext.BasePersistence`): Optional. The persistence class to
store data that should be persistent over restarts
Args:
bot (:class:`telegram.Bot`): The bot object that should be passed to the handlers.
update_queue (:obj:`Queue`): The synchronized queue that will contain the updates.
job_queue (:class:`telegram.ext.JobQueue`, optional): The :class:`telegram.ext.JobQueue`
instance to pass onto handler callbacks.
workers (:obj:`int`, optional): Number of maximum concurrent worker threads for the
``@run_async`` decorator. defaults to 4.
persistence (:class:`telegram.ext.BasePersistence`, optional): The persistence class to
store data that should be persistent over restarts
2017-09-01 08:43:08 +02:00
"""
__singleton_lock = Lock()
__singleton_semaphore = BoundedSemaphore()
__singleton = None
logger = logging.getLogger(__name__)
2016-05-15 03:46:40 +02:00
def __init__(self, bot, update_queue, workers=4, exception_event=None, job_queue=None,
persistence=None):
self.bot = bot
self.update_queue = update_queue
self.workers = workers
self.user_data = defaultdict(dict)
self.chat_data = defaultdict(dict)
if persistence:
if not isinstance(persistence, BasePersistence):
raise TypeError("persistence should be based on telegram.ext.BasePersistence")
self.persistence = persistence
if self.persistence.store_user_data:
self.user_data = self.persistence.get_user_data()
if not isinstance(self.user_data, defaultdict):
raise ValueError("user_data must be of type defaultdict")
if self.persistence.store_chat_data:
self.chat_data = self.persistence.get_chat_data()
if not isinstance(self.chat_data, defaultdict):
raise ValueError("chat_data must be of type defaultdict")
else:
self.persistence = None
self.job_queue = job_queue
self.handlers = {}
"""Dict[:obj:`int`, List[:class:`telegram.ext.Handler`]]: Holds the handlers per group."""
self.groups = []
"""List[:obj:`int`]: A list with all groups."""
self.error_handlers = []
"""List[:obj:`callable`]: A list of errorHandlers."""
self.running = False
""":obj:`bool`: Indicates if this dispatcher is running."""
self.__stop_event = Event()
self.__exception_event = exception_event or Event()
self.__async_queue = Queue()
self.__async_threads = set()
# For backward compatibility, we allow a "singleton" mode for the dispatcher. When there's
# only one instance of Dispatcher, it will be possible to use the `run_async` decorator.
with self.__singleton_lock:
if self.__singleton_semaphore.acquire(blocking=0):
self._set_singleton(self)
else:
self._set_singleton(None)
def _init_async_threads(self, base_name, workers):
base_name = '{}_'.format(base_name) if base_name else ''
for i in range(workers):
thread = Thread(target=self._pooled, name='{}{}'.format(base_name, i))
self.__async_threads.add(thread)
thread.start()
@classmethod
def _set_singleton(cls, val):
cls.logger.debug('Setting singleton dispatcher as %s', val)
cls.__singleton = weakref.ref(val) if val else None
@classmethod
def get_instance(cls):
2017-09-01 08:43:08 +02:00
"""Get the singleton instance of this class.
Returns:
:class:`telegram.ext.Dispatcher`
Raises:
RuntimeError
2017-09-01 08:43:08 +02:00
"""
if cls.__singleton is not None:
Bot API 4.0 (#1168) Telegram Passport (#1174): - Add full support for telegram passport. - New types: PassportData, PassportFile, EncryptedPassportElement, EncryptedCredentials, PassportElementError, PassportElementErrorDataField, PassportElementErrorFrontSide, PassportElementErrorReverseSide, PassportElementErrorSelfie, PassportElementErrorFile and PassportElementErrorFiles. - New bot method: set_passport_data_errors - New filter: Filters.passport_data - Field passport_data field on Message - PassportData is automagically decrypted when you specify your private key when creating Updater or Bot. - PassportFiles is also automagically decrypted as you download/retrieve them. - See new passportbot.py example for details on how to use, or go to our telegram passport wiki page for more info - NOTE: Passport decryption requires new dependency `cryptography`. Inputfile rework (#1184): - Change how Inputfile is handled internally - This allows support for specifying the thumbnails of photos and videos using the thumb= argument in the different send_ methods. - Also allows Bot.send_media_group to actually finally send more than one media. - Add thumb to Audio, Video and Videonote - Add Bot.edit_message_media together with InputMediaAnimation, InputMediaAudio, and inputMediaDocument. Other Bot API 4.0 changes: - Add forusquare_type to Venue, InlineQueryResultVenue, InputVenueMessageContent, and Bot.send_venue. (#1170) - Add vCard support by adding vcard field to Contact, InlineQueryResultContact, InputContactMessageContent, and Bot.send_contact. (#1166) - Support new message entities: CASHTAG and PHONE_NUMBER. (#1179) - Cashtag seems to be things like $USD and $GBP, but it seems telegram doesn't currently send them to bots. - Phone number also seems to have limited support for now - Add Bot.send_animation, add width, height, and duration to Animation, and add Filters.animation. (#1172) Co-authored-by: Jasmin Bom <jsmnbom@gmail.com> Co-authored-by: code1mountain <32801117+code1mountain@users.noreply.github.com> Co-authored-by: Eldinnie <pieter.schutz+github@gmail.com> Co-authored-by: mathefreak1 <mathefreak@hi2.in>
2018-08-29 14:18:58 +02:00
return cls.__singleton() # pylint: disable=not-callable
else:
raise RuntimeError('{} not initialized or multiple instances exist'.format(
cls.__name__))
def _pooled(self):
thr_name = current_thread().getName()
while 1:
promise = self.__async_queue.get()
# If unpacking fails, the thread pool is being closed from Updater._join_async_threads
if not isinstance(promise, Promise):
self.logger.debug("Closing run_async thread %s/%d", thr_name,
len(self.__async_threads))
break
2017-02-26 22:27:03 +01:00
promise.run()
if isinstance(promise.exception, DispatcherHandlerStop):
self.logger.warning(
'DispatcherHandlerStop is not supported with async functions; func: %s',
promise.pooled_function.__name__)
def run_async(self, func, *args, **kwargs):
2017-09-01 08:43:08 +02:00
"""Queue a function (with given args/kwargs) to be run asynchronously.
Args:
func (:obj:`callable`): The function to run in the thread.
*args (:obj:`tuple`, optional): Arguments to `func`.
**kwargs (:obj:`dict`, optional): Keyword arguments to `func`.
Returns:
Promise
"""
# TODO: handle exception in async threads
# set a threading.Event to notify caller thread
promise = Promise(func, args, kwargs)
self.__async_queue.put(promise)
return promise
def start(self, ready=None):
2017-09-01 08:43:08 +02:00
"""Thread target of thread 'dispatcher'.
2017-09-01 08:43:08 +02:00
Runs in background and processes the update queue.
Args:
ready (:obj:`threading.Event`, optional): If specified, the event will be set once the
dispatcher is ready.
2017-09-01 08:43:08 +02:00
"""
if self.running:
self.logger.warning('already running')
if ready is not None:
ready.set()
return
if self.__exception_event.is_set():
msg = 'reusing dispatcher after exception event is forbidden'
self.logger.error(msg)
raise TelegramError(msg)
self._init_async_threads(uuid4(), self.workers)
self.running = True
self.logger.debug('Dispatcher started')
if ready is not None:
ready.set()
while 1:
try:
# Pop update from update queue.
update = self.update_queue.get(True, 1)
except Empty:
if self.__stop_event.is_set():
self.logger.debug('orderly stopping')
break
2016-04-14 22:23:02 +02:00
elif self.__exception_event.is_set():
2016-05-15 03:46:40 +02:00
self.logger.critical('stopping due to exception in another thread')
break
continue
self.logger.debug('Processing Update: %s' % update)
self.process_update(update)
self.running = False
self.logger.debug('Dispatcher thread stopped')
def stop(self):
2017-09-01 08:43:08 +02:00
"""Stops the thread."""
if self.running:
self.__stop_event.set()
while self.running:
sleep(0.1)
self.__stop_event.clear()
# async threads must be join()ed only after the dispatcher thread was joined,
# otherwise we can still have new async threads dispatched
threads = list(self.__async_threads)
total = len(threads)
# Stop all threads in the thread pool by put()ting one non-tuple per thread
for i in range(total):
self.__async_queue.put(None)
for i, thr in enumerate(threads):
self.logger.debug('Waiting for async thread {0}/{1} to end'.format(i + 1, total))
thr.join()
self.__async_threads.remove(thr)
self.logger.debug('async thread {0}/{1} has ended'.format(i + 1, total))
@property
def has_running_threads(self):
return self.running or bool(self.__async_threads)
def process_update(self, update):
2017-09-01 08:43:08 +02:00
"""Processes a single update.
Args:
update (:obj:`str` | :class:`telegram.Update` | :class:`telegram.TelegramError`):
The update to process.
2017-09-01 08:43:08 +02:00
"""
# An error happened while polling
if isinstance(update, TelegramError):
try:
self.dispatch_error(None, update)
except Exception:
self.logger.exception('An uncaught error was raised while handling the error')
return
for group in self.groups:
try:
for handler in (x for x in self.handlers[group] if x.check_update(update)):
handler.handle_update(update, self)
if self.persistence:
if self.persistence.store_chat_data and update.effective_chat.id:
chat_id = update.effective_chat.id
try:
self.persistence.update_chat_data(chat_id, self.chat_data[chat_id])
except Exception:
self.logger.exception('Saving chat data raised an error')
if self.persistence.store_user_data and update.effective_user.id:
user_id = update.effective_user.id
try:
self.persistence.update_user_data(user_id, self.user_data[user_id])
except Exception:
self.logger.exception('Saving user data raised an error')
break
# Stop processing with any other handler.
except DispatcherHandlerStop:
self.logger.debug('Stopping further handlers due to DispatcherHandlerStop')
break
# Dispatch any error.
except TelegramError as te:
self.logger.warning('A TelegramError was raised while processing the Update')
try:
self.dispatch_error(update, te)
except DispatcherHandlerStop:
self.logger.debug('Error handler stopped further handlers')
break
except Exception:
self.logger.exception('An uncaught error was raised while handling the error')
# Errors should not stop the thread.
except Exception:
self.logger.exception('An uncaught error was raised while processing the update')
def add_handler(self, handler, group=DEFAULT_GROUP):
"""Register a handler.
2016-04-25 09:45:55 +02:00
TL;DR: Order and priority counts. 0 or 1 handlers per group will be used.
2016-04-25 09:45:55 +02:00
A handler must be an instance of a subclass of :class:`telegram.ext.Handler`. All handlers
are organized in groups with a numeric value. The default group is 0. All groups will be
evaluated for handling an update, but only 0 or 1 handler per group will be used. If
:class:`telegram.ext.DispatcherHandlerStop` is raised from one of the handlers, no further
handlers (regardless of the group) will be called.
2016-04-25 09:45:55 +02:00
The priority/order of handlers is determined as follows:
* Priority of the group (lower group number == higher priority)
* The first handler in a group which should handle an update (see
2017-09-01 10:38:04 +02:00
:attr:`telegram.ext.Handler.check_update`) will be used. Other handlers from the
group will not be used. The order in which handlers were added to the group defines the
priority.
Args:
handler (:class:`telegram.ext.Handler`): A Handler instance.
group (:obj:`int`, optional): The group identifier. Default is 0.
"""
# Unfortunately due to circular imports this has to be here
from .conversationhandler import ConversationHandler
if not isinstance(handler, Handler):
2016-05-15 03:46:40 +02:00
raise TypeError('handler is not an instance of {0}'.format(Handler.__name__))
if not isinstance(group, int):
raise TypeError('group is not int')
if isinstance(handler, ConversationHandler) and handler.persistent:
if not self.persistence:
raise ValueError(
"Conversationhandler {} can not be persistent if dispatcher has no "
"persistence".format(handler.name))
handler.conversations = self.persistence.get_conversations(handler.name)
handler.persistence = self.persistence
if group not in self.handlers:
self.handlers[group] = list()
self.groups.append(group)
self.groups = sorted(self.groups)
self.handlers[group].append(handler)
def remove_handler(self, handler, group=DEFAULT_GROUP):
2017-09-01 08:43:08 +02:00
"""Remove a handler from the specified group.
Args:
handler (:class:`telegram.ext.Handler`): A Handler instance.
group (:obj:`object`, optional): The group identifier. Default is 0.
2017-09-01 08:43:08 +02:00
"""
if handler in self.handlers[group]:
self.handlers[group].remove(handler)
if not self.handlers[group]:
del self.handlers[group]
self.groups.remove(group)
def add_error_handler(self, callback):
2017-09-01 08:43:08 +02:00
"""Registers an error handler in the Dispatcher.
Args:
callback (:obj:`callable`): A function that takes ``Bot, Update, TelegramError`` as
arguments.
2017-09-01 08:43:08 +02:00
"""
self.error_handlers.append(callback)
2016-01-04 17:31:06 +01:00
def remove_error_handler(self, callback):
2017-09-01 08:43:08 +02:00
"""Removes an error handler.
Args:
callback (:obj:`callable`): The error handler to remove.
2017-09-01 08:43:08 +02:00
"""
if callback in self.error_handlers:
self.error_handlers.remove(callback)
2016-01-04 17:31:06 +01:00
def dispatch_error(self, update, error):
2017-09-01 08:43:08 +02:00
"""Dispatches an error.
Args:
update (:obj:`str` | :class:`telegram.Update` | None): The update that caused the error
error (:class:`telegram.TelegramError`): The Telegram error that was raised.
2017-09-01 08:43:08 +02:00
"""
if self.error_handlers:
for callback in self.error_handlers:
callback(self.bot, update, error)
else:
self.logger.exception(
Bot API 4.0 (#1168) Telegram Passport (#1174): - Add full support for telegram passport. - New types: PassportData, PassportFile, EncryptedPassportElement, EncryptedCredentials, PassportElementError, PassportElementErrorDataField, PassportElementErrorFrontSide, PassportElementErrorReverseSide, PassportElementErrorSelfie, PassportElementErrorFile and PassportElementErrorFiles. - New bot method: set_passport_data_errors - New filter: Filters.passport_data - Field passport_data field on Message - PassportData is automagically decrypted when you specify your private key when creating Updater or Bot. - PassportFiles is also automagically decrypted as you download/retrieve them. - See new passportbot.py example for details on how to use, or go to our telegram passport wiki page for more info - NOTE: Passport decryption requires new dependency `cryptography`. Inputfile rework (#1184): - Change how Inputfile is handled internally - This allows support for specifying the thumbnails of photos and videos using the thumb= argument in the different send_ methods. - Also allows Bot.send_media_group to actually finally send more than one media. - Add thumb to Audio, Video and Videonote - Add Bot.edit_message_media together with InputMediaAnimation, InputMediaAudio, and inputMediaDocument. Other Bot API 4.0 changes: - Add forusquare_type to Venue, InlineQueryResultVenue, InputVenueMessageContent, and Bot.send_venue. (#1170) - Add vCard support by adding vcard field to Contact, InlineQueryResultContact, InputContactMessageContent, and Bot.send_contact. (#1166) - Support new message entities: CASHTAG and PHONE_NUMBER. (#1179) - Cashtag seems to be things like $USD and $GBP, but it seems telegram doesn't currently send them to bots. - Phone number also seems to have limited support for now - Add Bot.send_animation, add width, height, and duration to Animation, and add Filters.animation. (#1172) Co-authored-by: Jasmin Bom <jsmnbom@gmail.com> Co-authored-by: code1mountain <32801117+code1mountain@users.noreply.github.com> Co-authored-by: Eldinnie <pieter.schutz+github@gmail.com> Co-authored-by: mathefreak1 <mathefreak@hi2.in>
2018-08-29 14:18:58 +02:00
'No error handlers are registered, logging exception.', exc_info=error)