run_async moved to broadcaster and uses a thread pool now

This commit is contained in:
Jannes Höke 2015-11-11 13:33:03 +01:00
parent 3162bc60e9
commit 63c895c0a0
2 changed files with 52 additions and 30 deletions

View file

@ -9,7 +9,6 @@ import sys
from threading import Thread
from telegram import (Bot, TelegramError, TelegramObject, Broadcaster)
from time import sleep
from functools import wraps
# Adjust for differences in Python versions
if sys.version_info.major is 2:
@ -18,27 +17,7 @@ elif sys.version_info.major is 3:
from queue import Queue
def run_async(func):
"""
Function decorator that will run the function in a new thread.
Args:
func (function): The function to run in the thread.
Returns:
function:
"""
@wraps(func)
def async_func(*args, **kwargs):
thread = Thread(target=func, args=args, kwargs=kwargs)
thread.start()
return thread
return async_func
class BotEventHandler(TelegramObject):
class BotEventHandler:
"""
This class provides a frontend to telegram.Bot to the programmer, so they
can focus on coding the bot. I also runs in a separate thread, so the user
@ -53,19 +32,22 @@ class BotEventHandler(TelegramObject):
base_url (Optional[str]):
"""
def __init__(self, token, base_url=None):
def __init__(self, token, base_url=None, workers=4):
self.bot = Bot(token, base_url)
self.update_queue = Queue()
self.last_update_id = 0
self.broadcaster = Broadcaster(self.bot, self.update_queue)
self.broadcaster = Broadcaster(self.bot, self.update_queue,
workers=workers)
def start(self, poll_interval=1.0):
def start(self, poll_interval=1.0, timeout=10, network_delay=2):
"""
Starts polling updates from Telegram.
Args:
poll_interval (Optional[float]): Time to wait between polling
updates from Telegram in seconds. Default is 1.0
timeout (Optional[float]): Passed to Bot.getUpdates
network_delay (Optional[float]): Passed to Bot.getUpdates
Returns:
Queue: The update queue that can be filled from the main thread
@ -75,7 +57,8 @@ class BotEventHandler(TelegramObject):
broadcaster_thread = Thread(target=self.broadcaster.start,
name="broadcaster")
event_handler_thread = Thread(target=self.__start, name="eventhandler",
args=(poll_interval,))
args=(poll_interval, timeout,
network_delay))
# Set threads as daemons so they'll stop if the main thread stops
broadcaster_thread.daemon = True
@ -88,7 +71,7 @@ class BotEventHandler(TelegramObject):
# Return the update queue so the main thread can insert updates
return self.update_queue
def __start(self, poll_interval):
def __start(self, poll_interval, timeout, network_delay):
"""
Thread target of thread 'eventhandler'. Runs in background, pulls
updates from Telegram and inserts them in the update queue of the
@ -99,7 +82,9 @@ class BotEventHandler(TelegramObject):
while True:
try:
updates = self.bot.getUpdates(self.last_update_id)
updates = self.bot.getUpdates(self.last_update_id,
timeout=timeout,
network_delay=network_delay)
for update in updates:
self.update_queue.put(update)
self.last_update_id = update.update_id + 1

View file

@ -3,11 +3,42 @@
"""
This module contains the Broadcaster class.
"""
from functools import wraps
from telegram import (TelegramError, TelegramObject, Update)
from threading import Thread, BoundedSemaphore
semaphore = None
class Broadcaster(TelegramObject):
def run_async(func):
"""
Function decorator that will run the function in a new thread.
Args:
func (function): The function to run in the thread.
Returns:
function:
"""
@wraps(func)
def pooled(*args, **kwargs):
result = func(*args, **kwargs)
semaphore.release()
return result
@wraps(func)
def async_func(*args, **kwargs):
thread = Thread(target=pooled, args=args, kwargs=kwargs)
semaphore.acquire()
thread.start()
return thread
return async_func
class Broadcaster:
"""
This class broadcasts all kinds of updates to its registered handlers.
@ -18,7 +49,7 @@ class Broadcaster(TelegramObject):
update_queue (queue.Queue): The synchronized queue that will contain the
updates.
"""
def __init__(self, bot, update_queue):
def __init__(self, bot, update_queue, workers=4):
self.bot = bot
self.update_queue = update_queue
self.telegram_message_handlers = []
@ -31,6 +62,12 @@ class Broadcaster(TelegramObject):
self.unknown_string_command_handlers = []
self.error_handlers = []
global semaphore
if not semaphore:
semaphore = BoundedSemaphore(value=workers)
else:
print("Semaphore already initialized, skipping.")
# Add Handlers
def addTelegramMessageHandler(self, handler):
"""