From 29a4062945e53e1cbf8edcaaab761903e0afb9d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jannes=20H=C3=B6ke?= Date: Thu, 31 Dec 2015 14:52:28 +0100 Subject: [PATCH 1/2] properly lock updater and dispatcher start/stop methods --- telegram/dispatcher.py | 52 +++++++++++++----------- telegram/updater.py | 91 +++++++++++++++++++++++------------------- 2 files changed, 79 insertions(+), 64 deletions(-) diff --git a/telegram/dispatcher.py b/telegram/dispatcher.py index 1cd03050f..83e25f182 100644 --- a/telegram/dispatcher.py +++ b/telegram/dispatcher.py @@ -139,6 +139,7 @@ class Dispatcher: self.error_handlers = [] self.logger = logging.getLogger(__name__) self.running = False + self.__lock = Lock() global semaphore if not semaphore: @@ -159,41 +160,46 @@ class Dispatcher: the update queue. """ - self.running = True - self.logger.info('Dispatcher thread started') + self.__lock.acquire() + if not self.running: + self.running = True + self.__lock.release() + self.logger.info('Dispatcher thread started') - while True: - update = None + while True: + update = None - try: - # Pop update from update queue. - # Blocks if no updates are available. - update = self.update_queue.get() + try: + # Pop update from update queue. + # Blocks if no updates are available. + update = self.update_queue.get() - if type(update) is self._Stop: - break + if type(update) is self._Stop: + break - self.processUpdate(update) - self.logger.debug('Processed Update: %s' % update) + self.processUpdate(update) + self.logger.debug('Processed Update: %s' % update) - # Dispatch any errors - except TelegramError as te: - self.logger.warn("Error was raised while processing Update.") - self.dispatchError(update, te) - - # All other errors should not stop the thread, so just print them - except: - print_exc() + # Dispatch any errors + except TelegramError as te: + self.logger.warn("Error was raised while processing Update.") + self.dispatchError(update, te) + # All other errors should not stop the thread, so just print them + except: + print_exc() + else: + self.__lock.release() self.logger.info('Dispatcher thread stopped') def stop(self): """ Stops the thread """ - if self.running: - self.running = False - self.update_queue.put(self._Stop()) + with self.__lock: + if self.running: + self.running = False + self.update_queue.put(self._Stop()) def processUpdate(self, update): """ diff --git a/telegram/updater.py b/telegram/updater.py index db14cff46..652ddf259 100644 --- a/telegram/updater.py +++ b/telegram/updater.py @@ -24,7 +24,7 @@ Bots intuitive! import logging import os import ssl -from threading import Thread +from threading import Thread, Lock from time import sleep import subprocess from signal import signal, SIGINT, SIGTERM, SIGABRT @@ -81,6 +81,7 @@ class Updater: self.running = False self.is_idle = False self.httpd = None + self.__lock = Lock() def start_polling(self, poll_interval=0.0, timeout=10, network_delay=2): """ @@ -96,22 +97,24 @@ class Updater: Queue: The update queue that can be filled from the main thread """ - # Create Thread objects - dispatcher_thread = Thread(target=self.dispatcher.start, - name="dispatcher") - event_handler_thread = Thread(target=self._start_polling, - name="updater", - args=(poll_interval, timeout, - network_delay)) + with self.__lock: + if not self.running: + self.running = True - self.running = True + # Create Thread objects + dispatcher_thread = Thread(target=self.dispatcher.start, + name="dispatcher") + updater_thread = Thread(target=self._start_polling, + name="updater", + args=(poll_interval, timeout, + network_delay)) - # Start threads - dispatcher_thread.start() - event_handler_thread.start() + # Start threads + dispatcher_thread.start() + updater_thread.start() - # Return the update queue so the main thread can insert updates - return self.update_queue + # Return the update queue so the main thread can insert updates + return self.update_queue def start_webhook(self, listen='127.0.0.1', @@ -137,21 +140,23 @@ class Updater: Queue: The update queue that can be filled from the main thread """ - # Create Thread objects - dispatcher_thread = Thread(target=self.dispatcher.start, - name="dispatcher") - event_handler_thread = Thread(target=self._start_webhook, - name="updater", - args=(listen, port, url_path, cert, key)) + with self.__lock: + if not self.running: + self.running = True - self.running = True + # Create Thread objects + dispatcher_thread = Thread(target=self.dispatcher.start, + name="dispatcher") + updater_thread = Thread(target=self._start_webhook, + name="updater", + args=(listen, port, url_path, cert, key)) - # Start threads - dispatcher_thread.start() - event_handler_thread.start() + # Start threads + dispatcher_thread.start() + updater_thread.start() - # Return the update queue so the main thread can insert updates - return self.update_queue + # Return the update queue so the main thread can insert updates + return self.update_queue def _start_polling(self, poll_interval, timeout, network_delay): """ @@ -238,24 +243,28 @@ class Updater: """ Stops the polling/webhook thread and the dispatcher """ - self.logger.info('Stopping Updater and Dispatcher...') - self.logger.debug('This might take a long time if you set a high value' - ' as polling timeout.') - self.running = False - if self.httpd: - self.logger.info( - 'Waiting for current webhook connection to be closed... ' - 'Send a Telegram message to the bot to exit immediately.') - self.httpd.shutdown() - self.httpd = None + with self.__lock: + if self.running: + self.running = False + self.logger.info('Stopping Updater and Dispatcher...') + self.logger.debug('This might take a long time if you set a ' + 'high value as polling timeout.') - self.logger.debug("Requesting Dispatcher to stop...") - self.dispatcher.stop() - while dispatcher.running_async > 0: - sleep(1) + if self.httpd: + self.logger.info( + 'Waiting for current webhook connection to be ' + 'closed... Send a Telegram message to the bot to exit ' + 'immediately.') + self.httpd.shutdown() + self.httpd = None - self.logger.debug("Dispatcher stopped.") + self.logger.debug("Requesting Dispatcher to stop...") + self.dispatcher.stop() + while dispatcher.running_async > 0: + sleep(1) + + self.logger.debug("Dispatcher stopped.") def signal_handler(self, signum, frame): self.is_idle = False From 3e1cb08567ee0a341a3d4b555937afcef100ead5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jannes=20H=C3=B6ke?= Date: Thu, 31 Dec 2015 15:03:40 +0100 Subject: [PATCH 2/2] flake8 --- telegram/dispatcher.py | 5 +++-- telegram/updater.py | 15 ++++++++++----- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/telegram/dispatcher.py b/telegram/dispatcher.py index 83e25f182..2e31d0dbe 100644 --- a/telegram/dispatcher.py +++ b/telegram/dispatcher.py @@ -182,10 +182,11 @@ class Dispatcher: # Dispatch any errors except TelegramError as te: - self.logger.warn("Error was raised while processing Update.") + self.logger.warn("Error was raised while processing " + "Update.") self.dispatchError(update, te) - # All other errors should not stop the thread, so just print them + # All other errors should not stop the thread, just print them except: print_exc() else: diff --git a/telegram/updater.py b/telegram/updater.py index 652ddf259..0014e7764 100644 --- a/telegram/updater.py +++ b/telegram/updater.py @@ -105,9 +105,10 @@ class Updater: dispatcher_thread = Thread(target=self.dispatcher.start, name="dispatcher") updater_thread = Thread(target=self._start_polling, - name="updater", - args=(poll_interval, timeout, - network_delay)) + name="updater", + args=(poll_interval, + timeout, + network_delay)) # Start threads dispatcher_thread.start() @@ -148,8 +149,12 @@ class Updater: dispatcher_thread = Thread(target=self.dispatcher.start, name="dispatcher") updater_thread = Thread(target=self._start_webhook, - name="updater", - args=(listen, port, url_path, cert, key)) + name="updater", + args=(listen, + port, + url_path, + cert, + key)) # Start threads dispatcher_thread.start()