python-telegram-bot/telegram/ext/dispatcher.py

273 lines
8.3 KiB
Python
Raw Normal View History

#!/usr/bin/env python
#
# A library that provides a Python interface to the Telegram Bot API
# Copyright (C) 2015-2016
# Leandro Toledo de Souza <devs@python-telegram-bot.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser Public License for more details.
#
# You should have received a copy of the GNU Lesser Public License
# along with this program. If not, see [http://www.gnu.org/licenses/].
"""This module contains the Dispatcher class."""
import logging
from functools import wraps
from threading import Thread, BoundedSemaphore, Lock, Event, current_thread
2016-01-06 15:35:55 +01:00
from time import sleep
2016-04-16 19:25:38 +02:00
# Adjust for differences in Python versions
try:
from queue import Empty # flake8: noqa
except ImportError:
from Queue import Empty # flake8: noqa
from telegram import (TelegramError, NullHandler)
from telegram.ext.handler import Handler
logging.getLogger(__name__).addHandler(NullHandler())
semaphore = None
async_threads = set()
""":type: set[Thread]"""
async_lock = Lock()
DEFAULT_GROUP = 0
2015-11-06 00:24:01 +01:00
def run_async(func):
"""
Function decorator that will run the function in a new thread.
Args:
func (function): The function to run in the thread.
Returns:
function:
"""
# TODO: handle exception in async threads
# set a threading.Event to notify caller thread
@wraps(func)
2015-12-21 19:36:17 +01:00
def pooled(*pargs, **kwargs):
"""
A wrapper to run a thread in a thread pool
"""
2016-04-24 13:43:42 +02:00
try:
result = func(*pargs, **kwargs)
finally:
semaphore.release()
with async_lock:
async_threads.remove(current_thread())
return result
@wraps(func)
2015-12-21 19:36:17 +01:00
def async_func(*pargs, **kwargs):
"""
A wrapper to run a function in a thread
"""
2015-12-21 19:36:17 +01:00
thread = Thread(target=pooled, args=pargs, kwargs=kwargs)
semaphore.acquire()
with async_lock:
async_threads.add(thread)
thread.start()
return thread
return async_func
class Dispatcher(object):
"""
2015-11-22 14:47:38 +01:00
This class dispatches all kinds of updates to its registered handlers.
Args:
2015-11-16 20:43:35 +01:00
bot (telegram.Bot): The bot object that should be passed to the
2015-12-21 21:40:41 +01:00
handlers
2016-04-18 18:13:54 +02:00
update_queue (Queue): The synchronized queue that will contain the
updates.
"""
def __init__(self, bot, update_queue, workers=4, exception_event=None):
self.bot = bot
self.update_queue = update_queue
self.handlers = {}
self.error_handlers = []
self.logger = logging.getLogger(__name__)
self.running = False
self.__stop_event = Event()
self.__exception_event = exception_event or Event()
global semaphore
if not semaphore:
semaphore = BoundedSemaphore(value=workers)
else:
self.logger.debug('Semaphore already initialized, skipping.')
def start(self):
"""
2015-11-22 14:47:38 +01:00
Thread target of thread 'dispatcher'. Runs in background and processes
the update queue.
"""
if self.running:
self.logger.warning('already running')
return
if self.__exception_event.is_set():
msg = 'reusing dispatcher after exception event is forbidden'
self.logger.error(msg)
raise TelegramError(msg)
self.running = True
self.logger.debug('Dispatcher started')
while True:
try:
# Pop update from update queue.
update = self.update_queue.get(True, 1)
except Empty:
if self.__stop_event.is_set():
self.logger.debug('orderly stopping')
break
2016-04-14 22:23:02 +02:00
elif self.__exception_event.is_set():
self.logger.critical(
'stopping due to exception in another thread')
break
continue
self.logger.debug('Processing Update: %s' % update)
self.processUpdate(update)
self.running = False
self.logger.debug('Dispatcher thread stopped')
def stop(self):
"""
Stops the thread
"""
if self.running:
self.__stop_event.set()
while self.running:
sleep(0.1)
self.__stop_event.clear()
def processUpdate(self, update):
"""
Processes a single update.
Args:
2016-04-18 18:13:54 +02:00
update (object):
"""
# An error happened while polling
if isinstance(update, TelegramError):
2015-11-22 14:47:38 +01:00
self.dispatchError(None, update)
else:
for group in self.handlers.values():
for handler in group:
try:
if handler.checkUpdate(update):
handler.handleUpdate(update, self)
break
# Dispatch any errors
except TelegramError as te:
self.logger.warn(
'A TelegramError was raised while processing the '
'Update.')
try:
self.dispatchError(update, te)
except:
self.logger.exception(
'An uncaught error was raised while '
'handling the error')
finally:
break
# Errors should not stop the thread
except:
self.logger.exception(
'An uncaught error was raised while '
'processing the update')
break
def addHandler(self, handler, group=DEFAULT_GROUP):
"""
Register a handler. A handler must be an instance of a subclass of
telegram.ext.Handler. All handlers are organized in groups, the default
group is int(0), but any object can identify a group. Every update will
be tested against each handler in each group from first-added to last-
added. If the update has been handled in one group, it will not be
tested against other handlers in that group. That means an update can
only be handled 0 or 1 times per group, but multiple times across all
groups.
Args:
handler (Handler): A Handler instance
2016-04-18 18:13:54 +02:00
group (optional[object]): The group identifier. Default is 0
"""
if not isinstance(handler, Handler):
raise TypeError('Handler is no instance of telegram.ext.Handler')
if group not in self.handlers:
self.handlers[group] = list()
self.handlers[group].append(handler)
def removeHandler(self, handler, group=DEFAULT_GROUP):
"""
Remove a handler from the specified group
Args:
handler (Handler): A Handler instance
2016-04-18 18:13:54 +02:00
group (optional[object]): The group identifier. Default is 0
"""
if handler in self.handlers[group]:
self.handlers[group].remove(handler)
def addErrorHandler(self, callback):
"""
2015-11-22 14:47:38 +01:00
Registers an error handler in the Dispatcher.
Args:
2016-04-18 18:13:54 +02:00
handler (function): A function that takes ``Bot, Update,
TelegramError`` as arguments.
"""
self.error_handlers.append(callback)
2016-01-04 17:31:06 +01:00
def removeErrorHandler(self, callback):
"""
De-registers an error handler.
Args:
2016-04-18 18:13:54 +02:00
handler (function):
"""
if callback in self.error_handlers:
self.error_handlers.remove(callback)
2016-01-04 17:31:06 +01:00
2015-11-22 14:47:38 +01:00
def dispatchError(self, update, error):
"""
2015-11-22 14:47:38 +01:00
Dispatches an error.
Args:
2016-04-18 18:13:54 +02:00
update (object): The update that caused the error
error (telegram.TelegramError): The Telegram error that was raised.
"""
for callback in self.error_handlers:
callback(self.bot, update, error)