mirror of
https://github.com/python-telegram-bot/python-telegram-bot.git
synced 2024-11-25 00:27:46 +01:00
comments, lock thread pool, while 1 and snake_case everywhere
This commit is contained in:
parent
703bece155
commit
7635bc0eec
2 changed files with 38 additions and 35 deletions
|
@ -36,7 +36,7 @@ logging.getLogger(__name__).addHandler(NullHandler())
|
||||||
ASYNC_QUEUE = Queue()
|
ASYNC_QUEUE = Queue()
|
||||||
ASYNC_THREADS = set()
|
ASYNC_THREADS = set()
|
||||||
""":type: set[Thread]"""
|
""":type: set[Thread]"""
|
||||||
ASYNC_LOCK = Lock()
|
ASYNC_LOCK = Lock() # guards ASYNC_THREADS
|
||||||
DEFAULT_GROUP = 0
|
DEFAULT_GROUP = 0
|
||||||
|
|
||||||
|
|
||||||
|
@ -48,16 +48,17 @@ def _pooled():
|
||||||
try:
|
try:
|
||||||
func, args, kwargs = ASYNC_QUEUE.get()
|
func, args, kwargs = ASYNC_QUEUE.get()
|
||||||
|
|
||||||
|
# If unpacking fails, the thread pool is being closed from Updater._join_async_threads
|
||||||
except TypeError:
|
except TypeError:
|
||||||
logging.debug("Closing run_async thread %s/%d" %
|
logging.getLogger(__name__).debug("Closing run_async thread %s/%d" %
|
||||||
(current_thread().getName(), len(ASYNC_THREADS)))
|
(current_thread().getName(), len(ASYNC_THREADS)))
|
||||||
break
|
break
|
||||||
|
|
||||||
try:
|
try:
|
||||||
func(*args, **kwargs)
|
func(*args, **kwargs)
|
||||||
|
|
||||||
except:
|
except:
|
||||||
logging.exception("Async function raised exception")
|
logging.getLogger(__name__).exception("run_async function raised exception")
|
||||||
|
|
||||||
|
|
||||||
def run_async(func):
|
def run_async(func):
|
||||||
|
@ -110,17 +111,18 @@ class Dispatcher(object):
|
||||||
self.__stop_event = Event()
|
self.__stop_event = Event()
|
||||||
self.__exception_event = exception_event or Event()
|
self.__exception_event = exception_event or Event()
|
||||||
|
|
||||||
if not len(ASYNC_THREADS):
|
with ASYNC_LOCK:
|
||||||
if request.is_con_pool_initialized():
|
if not ASYNC_THREADS:
|
||||||
raise RuntimeError('Connection Pool already initialized')
|
if request.is_con_pool_initialized():
|
||||||
|
raise RuntimeError('Connection Pool already initialized')
|
||||||
|
|
||||||
request.CON_POOL_SIZE = workers + 3
|
request.CON_POOL_SIZE = workers + 3
|
||||||
for i in range(workers):
|
for i in range(workers):
|
||||||
thread = Thread(target=_pooled, name=str(i))
|
thread = Thread(target=_pooled, name=str(i))
|
||||||
ASYNC_THREADS.add(thread)
|
ASYNC_THREADS.add(thread)
|
||||||
thread.start()
|
thread.start()
|
||||||
else:
|
else:
|
||||||
self.logger.debug('Thread pool already initialized, skipping.')
|
self.logger.debug('Thread pool already initialized, skipping.')
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
"""
|
"""
|
||||||
|
@ -140,7 +142,7 @@ class Dispatcher(object):
|
||||||
self.running = True
|
self.running = True
|
||||||
self.logger.debug('Dispatcher started')
|
self.logger.debug('Dispatcher started')
|
||||||
|
|
||||||
while True:
|
while 1:
|
||||||
try:
|
try:
|
||||||
# Pop update from update queue.
|
# Pop update from update queue.
|
||||||
update = self.update_queue.get(True, 1)
|
update = self.update_queue.get(True, 1)
|
||||||
|
@ -154,7 +156,7 @@ class Dispatcher(object):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
self.logger.debug('Processing Update: %s' % update)
|
self.logger.debug('Processing Update: %s' % update)
|
||||||
self.processUpdate(update)
|
self.process_update(update)
|
||||||
|
|
||||||
self.running = False
|
self.running = False
|
||||||
self.logger.debug('Dispatcher thread stopped')
|
self.logger.debug('Dispatcher thread stopped')
|
||||||
|
@ -169,7 +171,7 @@ class Dispatcher(object):
|
||||||
sleep(0.1)
|
sleep(0.1)
|
||||||
self.__stop_event.clear()
|
self.__stop_event.clear()
|
||||||
|
|
||||||
def processUpdate(self, update):
|
def process_update(self, update):
|
||||||
"""
|
"""
|
||||||
Processes a single update.
|
Processes a single update.
|
||||||
|
|
||||||
|
@ -179,7 +181,7 @@ class Dispatcher(object):
|
||||||
|
|
||||||
# An error happened while polling
|
# An error happened while polling
|
||||||
if isinstance(update, TelegramError):
|
if isinstance(update, TelegramError):
|
||||||
self.dispatchError(None, update)
|
self.dispatch_error(None, update)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
for group in self.groups:
|
for group in self.groups:
|
||||||
|
@ -194,7 +196,7 @@ class Dispatcher(object):
|
||||||
'Update.')
|
'Update.')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.dispatchError(update, te)
|
self.dispatch_error(update, te)
|
||||||
except Exception:
|
except Exception:
|
||||||
self.logger.exception('An uncaught error was raised while '
|
self.logger.exception('An uncaught error was raised while '
|
||||||
'handling the error')
|
'handling the error')
|
||||||
|
@ -280,7 +282,7 @@ class Dispatcher(object):
|
||||||
if callback in self.error_handlers:
|
if callback in self.error_handlers:
|
||||||
self.error_handlers.remove(callback)
|
self.error_handlers.remove(callback)
|
||||||
|
|
||||||
def dispatchError(self, update, error):
|
def dispatch_error(self, update, error):
|
||||||
"""
|
"""
|
||||||
Dispatches an error.
|
Dispatches an error.
|
||||||
|
|
||||||
|
|
|
@ -285,9 +285,10 @@ class Updater(object):
|
||||||
def _check_ssl_cert(self, cert, key):
|
def _check_ssl_cert(self, cert, key):
|
||||||
# Check SSL-Certificate with openssl, if possible
|
# Check SSL-Certificate with openssl, if possible
|
||||||
try:
|
try:
|
||||||
exit_code = subprocess.call(["openssl", "x509", "-text", "-noout", "-in", cert],
|
exit_code = subprocess.call(
|
||||||
stdout=open(os.devnull, 'wb'),
|
["openssl", "x509", "-text", "-noout", "-in", cert],
|
||||||
stderr=subprocess.STDOUT)
|
stdout=open(os.devnull, 'wb'),
|
||||||
|
stderr=subprocess.STDOUT)
|
||||||
except OSError:
|
except OSError:
|
||||||
exit_code = 0
|
exit_code = 0
|
||||||
if exit_code is 0:
|
if exit_code is 0:
|
||||||
|
@ -308,7 +309,7 @@ class Updater(object):
|
||||||
|
|
||||||
def _bootstrap(self, max_retries, clean, webhook_url, cert=None):
|
def _bootstrap(self, max_retries, clean, webhook_url, cert=None):
|
||||||
retries = 0
|
retries = 0
|
||||||
while True:
|
while 1:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if clean:
|
if clean:
|
||||||
|
@ -353,9 +354,8 @@ class Updater(object):
|
||||||
self._stop_httpd()
|
self._stop_httpd()
|
||||||
self._stop_dispatcher()
|
self._stop_dispatcher()
|
||||||
self._join_threads()
|
self._join_threads()
|
||||||
# async threads must be join()ed only after the dispatcher
|
# async threads must be join()ed only after the dispatcher thread was joined,
|
||||||
# thread was joined, otherwise we can still have new async
|
# otherwise we can still have new async threads dispatched
|
||||||
# threads dispatched
|
|
||||||
self._join_async_threads()
|
self._join_async_threads()
|
||||||
|
|
||||||
def _stop_httpd(self):
|
def _stop_httpd(self):
|
||||||
|
@ -373,16 +373,17 @@ class Updater(object):
|
||||||
def _join_async_threads(self):
|
def _join_async_threads(self):
|
||||||
with dispatcher.ASYNC_LOCK:
|
with dispatcher.ASYNC_LOCK:
|
||||||
threads = list(dispatcher.ASYNC_THREADS)
|
threads = list(dispatcher.ASYNC_THREADS)
|
||||||
total = len(threads)
|
total = len(threads)
|
||||||
|
|
||||||
for i in range(total):
|
# Stop all threads in the thread pool by put()ting one non-tuple per thread
|
||||||
dispatcher.ASYNC_QUEUE.put(0)
|
for i in range(total):
|
||||||
|
dispatcher.ASYNC_QUEUE.put(None)
|
||||||
|
|
||||||
for i, thr in enumerate(threads):
|
for i, thr in enumerate(threads):
|
||||||
self.logger.debug('Waiting for async thread {0}/{1} to end'.format(i + 1, total))
|
self.logger.debug('Waiting for async thread {0}/{1} to end'.format(i + 1, total))
|
||||||
thr.join()
|
thr.join()
|
||||||
dispatcher.ASYNC_THREADS.remove(thr)
|
dispatcher.ASYNC_THREADS.remove(thr)
|
||||||
self.logger.debug('async thread {0}/{1} has ended'.format(i + 1, total))
|
self.logger.debug('async thread {0}/{1} has ended'.format(i + 1, total))
|
||||||
|
|
||||||
def _join_threads(self):
|
def _join_threads(self):
|
||||||
for thr in self.__threads:
|
for thr in self.__threads:
|
||||||
|
|
Loading…
Reference in a new issue