From 57759d8e6d3f003ffbebce3467617947aa33c851 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jannes=20H=C3=B6ke?= Date: Mon, 30 May 2016 03:16:33 +0200 Subject: [PATCH] [drunk] use actual thread pool and queue new functions into the pool instead of starting new threads every time --- telegram/ext/dispatcher.py | 55 +++++++++++++++++++++----------------- 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/telegram/ext/dispatcher.py b/telegram/ext/dispatcher.py index c715e6007..f6d068bdb 100644 --- a/telegram/ext/dispatcher.py +++ b/telegram/ext/dispatcher.py @@ -20,10 +20,9 @@ import logging from functools import wraps -from threading import Thread, BoundedSemaphore, Lock, Event, current_thread +from threading import Thread, BoundedSemaphore, Lock, Event # , current_thread from time import sleep - -from queue import Empty +from queue import Queue, Empty from telegram import (TelegramError, NullHandler) from telegram.ext.handler import Handler @@ -32,12 +31,28 @@ from telegram.utils.deprecate import deprecate logging.getLogger(__name__).addHandler(NullHandler()) semaphore = None -async_threads = set() -""":type: set[Thread]""" +async_threads = list() +""":type: list[tuple(Thread, Event, Queue)]""" async_lock = Lock() DEFAULT_GROUP = 0 +def pooled(event, queue): + """ + A wrapper to run a thread in a thread pool + """ + while True: + logging.info('Waiting for function') + func, args, kwargs = queue.get() + logging.info('Got function') + try: + func(*args, **kwargs) + logging.info('Executed function') + finally: + event.clear() + semaphore.release() + + def run_async(func): """ Function decorator that will run the function in a new thread. @@ -52,31 +67,17 @@ def run_async(func): # TODO: handle exception in async threads # set a threading.Event to notify caller thread - @wraps(func) - def pooled(*pargs, **kwargs): - """ - A wrapper to run a thread in a thread pool - """ - try: - result = func(*pargs, **kwargs) - finally: - semaphore.release() - - with async_lock: - async_threads.remove(current_thread()) - return result - @wraps(func) def async_func(*pargs, **kwargs): """ A wrapper to run a function in a thread """ - thread = Thread(target=pooled, args=pargs, kwargs=kwargs) semaphore.acquire() - with async_lock: - async_threads.add(thread) - thread.start() - return thread + for thread, event, queue in async_threads: + if not event.is_set(): + event.set() + queue.put((func, pargs, kwargs)) + break return async_func @@ -110,6 +111,12 @@ class Dispatcher(object): global semaphore if not semaphore: semaphore = BoundedSemaphore(value=workers) + for i in range(workers): + event = Event() + queue = Queue() + thread = Thread(target=pooled, args=(event, queue)) + async_threads.append((thread, event, queue)) + thread.start() else: self.logger.debug('Semaphore already initialized, skipping.')