use single queue for thread pool, initialize connection pool with n+3

This commit is contained in:
Jannes Höke 2016-05-30 13:09:23 +02:00
parent 57759d8e6d
commit dd91ce1f39
3 changed files with 31 additions and 24 deletions

View file

@ -20,37 +20,45 @@
import logging
from functools import wraps
from threading import Thread, BoundedSemaphore, Lock, Event # , current_thread
from threading import Thread, Lock, Event # , current_thread
from time import sleep
from queue import Queue, Empty
from telegram import (TelegramError, NullHandler)
from telegram.utils import request
from telegram.ext.handler import Handler
from telegram.utils.deprecate import deprecate
logging.getLogger(__name__).addHandler(NullHandler())
semaphore = None
async_threads = list()
""":type: list[tuple(Thread, Event, Queue)]"""
async_queue = Queue()
async_threads = set()
""":type: set[Thread]"""
async_lock = Lock()
DEFAULT_GROUP = 0
def pooled(event, queue):
def pooled():
"""
A wrapper to run a thread in a thread pool
"""
while True:
logging.info('Waiting for function')
func, args, kwargs = queue.get()
try:
func, args, kwargs = async_queue.get()
except TypeError:
break
logging.info('Got function')
try:
func(*args, **kwargs)
logging.info('Executed function')
finally:
event.clear()
semaphore.release()
except:
logging.exception("Async function raised exception")
def run_async(func):
@ -72,12 +80,7 @@ def run_async(func):
"""
A wrapper to run a function in a thread
"""
semaphore.acquire()
for thread, event, queue in async_threads:
if not event.is_set():
event.set()
queue.put((func, pargs, kwargs))
break
async_queue.put((func, pargs, kwargs))
return async_func
@ -108,17 +111,14 @@ class Dispatcher(object):
self.__stop_event = Event()
self.__exception_event = exception_event or Event()
global semaphore
if not semaphore:
semaphore = BoundedSemaphore(value=workers)
if not len(async_threads):
request.CON_POOL_SIZE = workers + 3
for i in range(workers):
event = Event()
queue = Queue()
thread = Thread(target=pooled, args=(event, queue))
async_threads.append((thread, event, queue))
thread = Thread(target=pooled)
async_threads.add(thread)
thread.start()
else:
self.logger.debug('Semaphore already initialized, skipping.')
self.logger.debug('Thread pool already initialized, skipping.')
def start(self):
"""

View file

@ -375,6 +375,10 @@ class Updater(object):
with dispatcher.async_lock:
threads = list(dispatcher.async_threads)
total = len(threads)
for i in range(total):
dispatcher.async_queue.put(0)
for i, thr in enumerate(threads):
self.logger.debug('Waiting for async thread {0}/{1} to end'.format(i, total))
thr.join()

View file

@ -27,6 +27,7 @@ from telegram import (InputFile, TelegramError)
from telegram.error import Unauthorized, NetworkError, TimedOut, BadRequest
_CON_POOL = None
CON_POOL_SIZE = 1
def _get_con_pool():
@ -34,7 +35,9 @@ def _get_con_pool():
return _CON_POOL
global _CON_POOL
_CON_POOL = urllib3.PoolManager(10, cert_reqs='CERT_REQUIRED', ca_certs=certifi.where())
_CON_POOL = urllib3.PoolManager(CON_POOL_SIZE,
cert_reqs='CERT_REQUIRED',
ca_certs=certifi.where())
return _CON_POOL