diff --git a/telegram/ext/dispatcher.py b/telegram/ext/dispatcher.py index f6d068bdb..72f206236 100644 --- a/telegram/ext/dispatcher.py +++ b/telegram/ext/dispatcher.py @@ -20,37 +20,45 @@ import logging from functools import wraps -from threading import Thread, BoundedSemaphore, Lock, Event # , current_thread +from threading import Thread, Lock, Event # , current_thread from time import sleep from queue import Queue, Empty from telegram import (TelegramError, NullHandler) +from telegram.utils import request from telegram.ext.handler import Handler from telegram.utils.deprecate import deprecate logging.getLogger(__name__).addHandler(NullHandler()) -semaphore = None -async_threads = list() -""":type: list[tuple(Thread, Event, Queue)]""" +async_queue = Queue() +async_threads = set() +""":type: set[Thread]""" async_lock = Lock() DEFAULT_GROUP = 0 -def pooled(event, queue): +def pooled(): """ A wrapper to run a thread in a thread pool """ while True: logging.info('Waiting for function') - func, args, kwargs = queue.get() + + try: + func, args, kwargs = async_queue.get() + + except TypeError: + break + logging.info('Got function') + try: func(*args, **kwargs) logging.info('Executed function') - finally: - event.clear() - semaphore.release() + + except: + logging.exception("Async function raised exception") def run_async(func): @@ -72,12 +80,7 @@ def run_async(func): """ A wrapper to run a function in a thread """ - semaphore.acquire() - for thread, event, queue in async_threads: - if not event.is_set(): - event.set() - queue.put((func, pargs, kwargs)) - break + async_queue.put((func, pargs, kwargs)) return async_func @@ -108,17 +111,14 @@ class Dispatcher(object): self.__stop_event = Event() self.__exception_event = exception_event or Event() - global semaphore - if not semaphore: - semaphore = BoundedSemaphore(value=workers) + if not len(async_threads): + request.CON_POOL_SIZE = workers + 3 for i in range(workers): - event = Event() - queue = Queue() - thread = Thread(target=pooled, args=(event, queue)) - async_threads.append((thread, event, queue)) + thread = Thread(target=pooled) + async_threads.add(thread) thread.start() else: - self.logger.debug('Semaphore already initialized, skipping.') + self.logger.debug('Thread pool already initialized, skipping.') def start(self): """ diff --git a/telegram/ext/updater.py b/telegram/ext/updater.py index df70275e5..2f92fe43f 100644 --- a/telegram/ext/updater.py +++ b/telegram/ext/updater.py @@ -375,6 +375,10 @@ class Updater(object): with dispatcher.async_lock: threads = list(dispatcher.async_threads) total = len(threads) + + for i in range(total): + dispatcher.async_queue.put(0) + for i, thr in enumerate(threads): self.logger.debug('Waiting for async thread {0}/{1} to end'.format(i, total)) thr.join() diff --git a/telegram/utils/request.py b/telegram/utils/request.py index 5926add9f..5145685b6 100644 --- a/telegram/utils/request.py +++ b/telegram/utils/request.py @@ -27,6 +27,7 @@ from telegram import (InputFile, TelegramError) from telegram.error import Unauthorized, NetworkError, TimedOut, BadRequest _CON_POOL = None +CON_POOL_SIZE = 1 def _get_con_pool(): @@ -34,7 +35,9 @@ def _get_con_pool(): return _CON_POOL global _CON_POOL - _CON_POOL = urllib3.PoolManager(10, cert_reqs='CERT_REQUIRED', ca_certs=certifi.where()) + _CON_POOL = urllib3.PoolManager(CON_POOL_SIZE, + cert_reqs='CERT_REQUIRED', + ca_certs=certifi.where()) return _CON_POOL