diff --git a/telegram/boteventhandler.py b/telegram/boteventhandler.py index ae5ced899..de4ec0d27 100644 --- a/telegram/boteventhandler.py +++ b/telegram/boteventhandler.py @@ -4,10 +4,11 @@ This module contains the class BotEventHandler, which tries to make creating Telegram Bots intuitive! """ - +import logging import sys from threading import Thread -from telegram import (Bot, TelegramError, TelegramObject, Broadcaster) +from telegram import (Bot, TelegramError, broadcaster, Broadcaster, + NullHandler) from time import sleep # Adjust for differences in Python versions @@ -16,6 +17,9 @@ if sys.version_info.major is 2: elif sys.version_info.major is 3: from queue import Queue +H = NullHandler() +logging.getLogger(__name__).addHandler(H) + class BotEventHandler: """ @@ -30,14 +34,25 @@ class BotEventHandler: Args: token (str): The bots token given by the @BotFather base_url (Optional[str]): + broadcaster (Optional[Broadcaster]): Use present Broadcaster object. If + None, a new one will be created. + workers (Optional[int]): Amount of threads in the thread pool for + functions decorated with @run_async """ - def __init__(self, token, base_url=None, workers=4): + def __init__(self, token, base_url=None, broadcaster=None, workers=4): self.bot = Bot(token, base_url) - self.update_queue = Queue() self.last_update_id = 0 - self.broadcaster = Broadcaster(self.bot, self.update_queue, - workers=workers) + + if broadcaster: + self.update_queue = broadcaster.update_queue + self.broadcaster = broadcaster + else: + self.update_queue = Queue() + self.broadcaster = Broadcaster(self.bot, self.update_queue, + workers=workers) + self.logger = logging.getLogger(__name__) + self.running = False def start(self, poll_interval=1.0, timeout=10, network_delay=2): """ @@ -60,9 +75,7 @@ class BotEventHandler: args=(poll_interval, timeout, network_delay)) - # Set threads as daemons so they'll stop if the main thread stops - broadcaster_thread.daemon = True - event_handler_thread.daemon = True + self.running = True # Start threads broadcaster_thread.start() @@ -79,12 +92,19 @@ class BotEventHandler: """ current_interval = poll_interval + self.logger.info('Event Handler thread started') - while True: + while self.running: try: updates = self.bot.getUpdates(self.last_update_id, timeout=timeout, network_delay=network_delay) + if not self.running: + if len(updates) > 0: + self.logger.info('Updates ignored and will be pulled ' + + 'again on restart.') + break + for update in updates: self.update_queue.put(update) self.last_update_id = update.update_id + 1 @@ -102,3 +122,16 @@ class BotEventHandler: current_interval += current_interval / 2 if current_interval > 30: current_interval = 30 + + self.logger.info('Event Handler thread stopped') + + def stop(self): + """ + Stops the polling thread and the broadcaster + """ + self.logger.info('Stopping Event Handler and Broadcaster...') + self.running = False + self.broadcaster.stop() + while broadcaster.running_async > 0: + sleep(1) + diff --git a/telegram/broadcaster.py b/telegram/broadcaster.py index 0b0f12dc1..9fc853956 100644 --- a/telegram/broadcaster.py +++ b/telegram/broadcaster.py @@ -3,12 +3,20 @@ """ This module contains the Broadcaster class. """ +import logging from functools import wraps +from inspect import getargspec +from threading import Thread, BoundedSemaphore, Lock +from re import match -from telegram import (TelegramError, TelegramObject, Update) -from threading import Thread, BoundedSemaphore +from telegram import (TelegramError, Update, NullHandler) + +H = NullHandler() +logging.getLogger(__name__).addHandler(H) semaphore = None +running_async = 0 +async_lock = Lock() def run_async(func): @@ -24,14 +32,26 @@ def run_async(func): @wraps(func) def pooled(*args, **kwargs): + """ + A wrapper to run a thread in a thread pool + """ + global running_async, async_lock result = func(*args, **kwargs) semaphore.release() + with async_lock: + running_async -= 1 return result @wraps(func) def async_func(*args, **kwargs): + """ + A wrapper to run a function in a thread + """ + global running_async, async_lock thread = Thread(target=pooled, args=args, kwargs=kwargs) semaphore.acquire() + with async_lock: + running_async += 1 thread.start() return thread @@ -61,39 +81,132 @@ class Broadcaster: self.unknown_telegram_command_handlers = [] self.unknown_string_command_handlers = [] self.error_handlers = [] + self.logger = logging.getLogger(__name__) + self.running = False global semaphore if not semaphore: semaphore = BoundedSemaphore(value=workers) else: - print("Semaphore already initialized, skipping.") + self.logger.info("Semaphore already initialized, skipping.") + + class _Stop: + """ + A class which objects can be passed into the update queue to stop the + thread + """ + pass + + def start(self): + """ + Thread target of thread 'broadcaster'. Runs in background and processes + the update queue. + """ + + self.running = True + self.logger.info('Broadcaster thread started') + + while True: + try: + # Pop update from update queue. + # Blocks if no updates are available. + update = self.update_queue.get() + + if type(update) is self._Stop: + break + + self.processUpdate(update) + + # Broadcast any errors + except TelegramError as te: + self.broadcastError(te) + + self.logger.info('Broadcaster thread stopped') + + def stop(self): + """ + Stops the thread + """ + if self.running: + self.running = False + self.update_queue.put(self._Stop()) + + def processUpdate(self, update): + """ + Processes a single update. + + Args: + update (any): + """ + + handled = False + + # Custom type handlers + for t in self.type_handlers: + if isinstance(update, t): + self.broadcastType(update) + handled = True + + # string update + if type(update) is str and update.startswith('/'): + self.broadcastStringCommand(update) + handled = True + elif type(update) is str: + self.broadcastStringRegex(update) + handled = True + + # An error happened while polling + if isinstance(update, TelegramError): + self.broadcastError(update) + handled = True + + # Telegram update (regex) + if isinstance(update, Update): + self.broadcastTelegramRegex(update) + handled = True + + # Telegram update (command) + if isinstance(update, Update) \ + and update.message.text.startswith('/'): + self.broadcastTelegramCommand(update) + handled = True + + # Telegram update (message) + elif isinstance(update, Update): + self.broadcastTelegramMessage(update) + handled = True + + # Update not recognized + if not handled: + self.broadcastError(TelegramError( + "Received update of unknown type %s" % type(update))) # Add Handlers def addTelegramMessageHandler(self, handler): """ Registers a message handler in the Broadcaster. - + Args: handler (function): A function that takes (Bot, Update) as arguments. """ - + self.telegram_message_handlers.append(handler) def addTelegramCommandHandler(self, command, handler): """ Registers a command handler in the Broadcaster. - + Args: - command (str): The command keyword that this handler should be - listening to. + command (str): The command keyword that this handler should be + listening to. handler (function): A function that takes (Bot, Update) as arguments. """ - + if command not in self.telegram_command_handlers: self.telegram_command_handlers[command] = [] - + self.telegram_command_handlers[command].append(handler) def addTelegramRegexHandler(self, matcher, handler): @@ -112,17 +225,17 @@ class Broadcaster: self.telegram_regex_handlers[matcher] = [] self.telegram_regex_handlers[matcher].append(handler) - + def addStringCommandHandler(self, command, handler): """ Registers a string-command handler in the Broadcaster. - + Args: - command (str): The command keyword that this handler should be - listening to. + command (str): The command keyword that this handler should be + listening to. handler (function): A function that takes (Bot, str) as arguments. """ - + if command not in self.string_command_handlers: self.string_command_handlers[command] = [] @@ -149,41 +262,41 @@ class Broadcaster: """ Registers a command handler in the Broadcaster, that will receive all commands that have no associated handler. - + Args: handler (function): A function that takes (Bot, Update) as arguments. """ - + self.unknown_telegram_command_handlers.append(handler) - + def addUnknownStringCommandHandler(self, handler): """ - Registers a string-command handler in the Broadcaster, that will receive + Registers a string-command handler in the Broadcaster, that will receive all commands that have no associated handler. - + Args: handler (function): A function that takes (Bot, str) as arguments. """ - + self.unknown_string_command_handlers.append(handler) def addErrorHandler(self, handler): """ Registers an error handler in the Broadcaster. - + Args: handler (function): A function that takes (Bot, TelegramError) as arguments. """ - + self.error_handlers.append(handler) def addTypeHandler(self, the_type, handler): """ Registers a type handler in the Broadcaster. This allows you to send any type of object into the update queue. - + Args: the_type (type): The type this handler should listen to handler (function): A function that takes (Bot, type) as arguments. @@ -191,25 +304,25 @@ class Broadcaster: if the_type not in self.type_handlers: self.type_handlers[the_type] = [] - + self.type_handlers[the_type].append(handler) # Remove Handlers def removeTelegramMessageHandler(self, handler): """ De-registers a message handler. - + Args: handler (any): """ if handler in self.telegram_message_handlers: self.telegram_message_handlers.remove(handler) - + def removeTelegramCommandHandler(self, command, handler): """ De-registers a command handler. - + Args: command (str): The command handler (any): @@ -293,8 +406,8 @@ class Broadcaster: def removeTypeHandler(self, the_type, handler): """ - De-registers a type handler. - + De-registers a type handler. + Args: handler (any): """ @@ -302,73 +415,6 @@ class Broadcaster: if the_type in self.type_handlers \ and handler in self.type_handlers[the_type]: self.type_handlers[the_type].remove(handler) - - def start(self): - """ - Thread target of thread 'broadcaster'. Runs in background and processes - the update queue. - """ - - while True: - try: - # Pop update from update queue. - # Blocks if no updates are available. - update = self.update_queue.get() - self.processUpdate(update) - - # Broadcast any errors - except TelegramError as te: - self.broadcastError(te) - - def processUpdate(self, update): - """ - Processes a single update. - - Args: - update (any): - """ - - handled = False - - # Custom type handlers - for t in self.type_handlers: - if isinstance(update, t): - self.broadcastType(update) - handled = True - - # string update - if type(update) is str and update.startswith('/'): - self.broadcastStringCommand(update) - handled = True - elif type(update) is str: - self.broadcastStringRegex(update) - handled = True - - # An error happened while polling - if isinstance(update, TelegramError): - self.broadcastError(update) - handled = True - - # Telegram update (regex) - if isinstance(update, Update): - self.broadcastTelegramRegex(update) - handled = True - - # Telegram update (command) - if isinstance(update, Update) \ - and update.message.text.startswith('/'): - self.broadcastTelegramCommand(update) - handled = True - - # Telegram update (message) - elif isinstance(update, Update): - self.broadcastTelegramMessage(update) - handled = True - - # Update not recognized - if not handled: - self.broadcastError(TelegramError( - "Received update of unknown type %s" % type(update))) def broadcastTelegramCommand(self, update): """ @@ -400,7 +446,7 @@ class Broadcaster: matching_handlers = [] for matcher in self.telegram_regex_handlers: - if matcher.match(update.message.text): + if match(matcher, update.message.text): for handler in self.telegram_regex_handlers[matcher]: matching_handlers.append(handler) @@ -434,7 +480,7 @@ class Broadcaster: matching_handlers = [] for matcher in self.string_regex_handlers: - if matcher.match(update): + if match(matcher, update): for handler in self.string_regex_handlers[matcher]: matching_handlers.append(handler) @@ -487,4 +533,11 @@ class Broadcaster: """ for handler in handlers: - handler(self.bot, update) + self.call_handler(handler, update) + + def call_handler(self, handler, update): + kwargs = {} + if 'update_queue' in getargspec(handler).args: + kwargs['update_queue'] = self.update_queue + + handler(self.bot, update, **kwargs)