[drunk] use actual thread pool and queue new functions into the pool instead of starting new threads every time

This commit is contained in:
Jannes Höke 2016-05-30 03:16:33 +02:00
parent 574fc8cddf
commit 57759d8e6d

View file

@ -20,10 +20,9 @@
import logging import logging
from functools import wraps from functools import wraps
from threading import Thread, BoundedSemaphore, Lock, Event, current_thread from threading import Thread, BoundedSemaphore, Lock, Event # , current_thread
from time import sleep from time import sleep
from queue import Queue, Empty
from queue import Empty
from telegram import (TelegramError, NullHandler) from telegram import (TelegramError, NullHandler)
from telegram.ext.handler import Handler from telegram.ext.handler import Handler
@ -32,12 +31,28 @@ from telegram.utils.deprecate import deprecate
logging.getLogger(__name__).addHandler(NullHandler()) logging.getLogger(__name__).addHandler(NullHandler())
semaphore = None semaphore = None
async_threads = set() async_threads = list()
""":type: set[Thread]""" """:type: list[tuple(Thread, Event, Queue)]"""
async_lock = Lock() async_lock = Lock()
DEFAULT_GROUP = 0 DEFAULT_GROUP = 0
def pooled(event, queue):
"""
A wrapper to run a thread in a thread pool
"""
while True:
logging.info('Waiting for function')
func, args, kwargs = queue.get()
logging.info('Got function')
try:
func(*args, **kwargs)
logging.info('Executed function')
finally:
event.clear()
semaphore.release()
def run_async(func): def run_async(func):
""" """
Function decorator that will run the function in a new thread. Function decorator that will run the function in a new thread.
@ -52,31 +67,17 @@ def run_async(func):
# TODO: handle exception in async threads # TODO: handle exception in async threads
# set a threading.Event to notify caller thread # set a threading.Event to notify caller thread
@wraps(func)
def pooled(*pargs, **kwargs):
"""
A wrapper to run a thread in a thread pool
"""
try:
result = func(*pargs, **kwargs)
finally:
semaphore.release()
with async_lock:
async_threads.remove(current_thread())
return result
@wraps(func) @wraps(func)
def async_func(*pargs, **kwargs): def async_func(*pargs, **kwargs):
""" """
A wrapper to run a function in a thread A wrapper to run a function in a thread
""" """
thread = Thread(target=pooled, args=pargs, kwargs=kwargs)
semaphore.acquire() semaphore.acquire()
with async_lock: for thread, event, queue in async_threads:
async_threads.add(thread) if not event.is_set():
thread.start() event.set()
return thread queue.put((func, pargs, kwargs))
break
return async_func return async_func
@ -110,6 +111,12 @@ class Dispatcher(object):
global semaphore global semaphore
if not semaphore: if not semaphore:
semaphore = BoundedSemaphore(value=workers) semaphore = BoundedSemaphore(value=workers)
for i in range(workers):
event = Event()
queue = Queue()
thread = Thread(target=pooled, args=(event, queue))
async_threads.append((thread, event, queue))
thread.start()
else: else:
self.logger.debug('Semaphore already initialized, skipping.') self.logger.debug('Semaphore already initialized, skipping.')