From 63c895c0a06f9b883cae9cea952538cdbeb166fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jannes=20H=C3=B6ke?= Date: Wed, 11 Nov 2015 13:33:03 +0100 Subject: [PATCH] run_async moved to broadcaster and uses a thread pool now --- telegram/boteventhandler.py | 41 ++++++++++++------------------------- telegram/broadcaster.py | 41 +++++++++++++++++++++++++++++++++++-- 2 files changed, 52 insertions(+), 30 deletions(-) diff --git a/telegram/boteventhandler.py b/telegram/boteventhandler.py index 3a6ffe79d..ae5ced899 100644 --- a/telegram/boteventhandler.py +++ b/telegram/boteventhandler.py @@ -9,7 +9,6 @@ import sys from threading import Thread from telegram import (Bot, TelegramError, TelegramObject, Broadcaster) from time import sleep -from functools import wraps # Adjust for differences in Python versions if sys.version_info.major is 2: @@ -18,27 +17,7 @@ elif sys.version_info.major is 3: from queue import Queue -def run_async(func): - """ - Function decorator that will run the function in a new thread. - - Args: - func (function): The function to run in the thread. - - Returns: - function: - """ - - @wraps(func) - def async_func(*args, **kwargs): - thread = Thread(target=func, args=args, kwargs=kwargs) - thread.start() - return thread - - return async_func - - -class BotEventHandler(TelegramObject): +class BotEventHandler: """ This class provides a frontend to telegram.Bot to the programmer, so they can focus on coding the bot. I also runs in a separate thread, so the user @@ -53,19 +32,22 @@ class BotEventHandler(TelegramObject): base_url (Optional[str]): """ - def __init__(self, token, base_url=None): + def __init__(self, token, base_url=None, workers=4): self.bot = Bot(token, base_url) self.update_queue = Queue() self.last_update_id = 0 - self.broadcaster = Broadcaster(self.bot, self.update_queue) + self.broadcaster = Broadcaster(self.bot, self.update_queue, + workers=workers) - def start(self, poll_interval=1.0): + def start(self, poll_interval=1.0, timeout=10, network_delay=2): """ Starts polling updates from Telegram. Args: poll_interval (Optional[float]): Time to wait between polling updates from Telegram in seconds. Default is 1.0 + timeout (Optional[float]): Passed to Bot.getUpdates + network_delay (Optional[float]): Passed to Bot.getUpdates Returns: Queue: The update queue that can be filled from the main thread @@ -75,7 +57,8 @@ class BotEventHandler(TelegramObject): broadcaster_thread = Thread(target=self.broadcaster.start, name="broadcaster") event_handler_thread = Thread(target=self.__start, name="eventhandler", - args=(poll_interval,)) + args=(poll_interval, timeout, + network_delay)) # Set threads as daemons so they'll stop if the main thread stops broadcaster_thread.daemon = True @@ -88,7 +71,7 @@ class BotEventHandler(TelegramObject): # Return the update queue so the main thread can insert updates return self.update_queue - def __start(self, poll_interval): + def __start(self, poll_interval, timeout, network_delay): """ Thread target of thread 'eventhandler'. Runs in background, pulls updates from Telegram and inserts them in the update queue of the @@ -99,7 +82,9 @@ class BotEventHandler(TelegramObject): while True: try: - updates = self.bot.getUpdates(self.last_update_id) + updates = self.bot.getUpdates(self.last_update_id, + timeout=timeout, + network_delay=network_delay) for update in updates: self.update_queue.put(update) self.last_update_id = update.update_id + 1 diff --git a/telegram/broadcaster.py b/telegram/broadcaster.py index 4441efd56..0b0f12dc1 100644 --- a/telegram/broadcaster.py +++ b/telegram/broadcaster.py @@ -3,11 +3,42 @@ """ This module contains the Broadcaster class. """ +from functools import wraps from telegram import (TelegramError, TelegramObject, Update) +from threading import Thread, BoundedSemaphore + +semaphore = None -class Broadcaster(TelegramObject): +def run_async(func): + """ + Function decorator that will run the function in a new thread. + + Args: + func (function): The function to run in the thread. + + Returns: + function: + """ + + @wraps(func) + def pooled(*args, **kwargs): + result = func(*args, **kwargs) + semaphore.release() + return result + + @wraps(func) + def async_func(*args, **kwargs): + thread = Thread(target=pooled, args=args, kwargs=kwargs) + semaphore.acquire() + thread.start() + return thread + + return async_func + + +class Broadcaster: """ This class broadcasts all kinds of updates to its registered handlers. @@ -18,7 +49,7 @@ class Broadcaster(TelegramObject): update_queue (queue.Queue): The synchronized queue that will contain the updates. """ - def __init__(self, bot, update_queue): + def __init__(self, bot, update_queue, workers=4): self.bot = bot self.update_queue = update_queue self.telegram_message_handlers = [] @@ -31,6 +62,12 @@ class Broadcaster(TelegramObject): self.unknown_string_command_handlers = [] self.error_handlers = [] + global semaphore + if not semaphore: + semaphore = BoundedSemaphore(value=workers) + else: + print("Semaphore already initialized, skipping.") + # Add Handlers def addTelegramMessageHandler(self, handler): """