properly lock updater and dispatcher start/stop methods

This commit is contained in:
Jannes Höke 2015-12-31 14:52:28 +01:00
parent fc9456e1a8
commit 34b91f5062
2 changed files with 79 additions and 64 deletions

View file

@ -139,6 +139,7 @@ class Dispatcher:
self.error_handlers = []
self.logger = logging.getLogger(__name__)
self.running = False
self.__lock = Lock()
global semaphore
if not semaphore:
@ -159,41 +160,46 @@ class Dispatcher:
the update queue.
"""
self.running = True
self.logger.info('Dispatcher thread started')
self.__lock.acquire()
if not self.running:
self.running = True
self.__lock.release()
self.logger.info('Dispatcher thread started')
while True:
update = None
while True:
update = None
try:
# Pop update from update queue.
# Blocks if no updates are available.
update = self.update_queue.get()
try:
# Pop update from update queue.
# Blocks if no updates are available.
update = self.update_queue.get()
if type(update) is self._Stop:
break
if type(update) is self._Stop:
break
self.processUpdate(update)
self.logger.debug('Processed Update: %s' % update)
self.processUpdate(update)
self.logger.debug('Processed Update: %s' % update)
# Dispatch any errors
except TelegramError as te:
self.logger.warn("Error was raised while processing Update.")
self.dispatchError(update, te)
# All other errors should not stop the thread, so just print them
except:
print_exc()
# Dispatch any errors
except TelegramError as te:
self.logger.warn("Error was raised while processing Update.")
self.dispatchError(update, te)
# All other errors should not stop the thread, so just print them
except:
print_exc()
else:
self.__lock.release()
self.logger.info('Dispatcher thread stopped')
def stop(self):
"""
Stops the thread
"""
if self.running:
self.running = False
self.update_queue.put(self._Stop())
with self.__lock:
if self.running:
self.running = False
self.update_queue.put(self._Stop())
def processUpdate(self, update):
"""

View file

@ -24,7 +24,7 @@ Bots intuitive!
import logging
import os
import ssl
from threading import Thread
from threading import Thread, Lock
from time import sleep
import subprocess
from signal import signal, SIGINT, SIGTERM, SIGABRT
@ -81,6 +81,7 @@ class Updater:
self.running = False
self.is_idle = False
self.httpd = None
self.__lock = Lock()
def start_polling(self, poll_interval=0.0, timeout=10, network_delay=2):
"""
@ -96,22 +97,24 @@ class Updater:
Queue: The update queue that can be filled from the main thread
"""
# Create Thread objects
dispatcher_thread = Thread(target=self.dispatcher.start,
name="dispatcher")
event_handler_thread = Thread(target=self._start_polling,
name="updater",
args=(poll_interval, timeout,
network_delay))
with self.__lock:
if not self.running:
self.running = True
self.running = True
# Create Thread objects
dispatcher_thread = Thread(target=self.dispatcher.start,
name="dispatcher")
updater_thread = Thread(target=self._start_polling,
name="updater",
args=(poll_interval, timeout,
network_delay))
# Start threads
dispatcher_thread.start()
event_handler_thread.start()
# Start threads
dispatcher_thread.start()
updater_thread.start()
# Return the update queue so the main thread can insert updates
return self.update_queue
# Return the update queue so the main thread can insert updates
return self.update_queue
def start_webhook(self,
listen='127.0.0.1',
@ -137,21 +140,23 @@ class Updater:
Queue: The update queue that can be filled from the main thread
"""
# Create Thread objects
dispatcher_thread = Thread(target=self.dispatcher.start,
name="dispatcher")
event_handler_thread = Thread(target=self._start_webhook,
name="updater",
args=(listen, port, url_path, cert, key))
with self.__lock:
if not self.running:
self.running = True
self.running = True
# Create Thread objects
dispatcher_thread = Thread(target=self.dispatcher.start,
name="dispatcher")
updater_thread = Thread(target=self._start_webhook,
name="updater",
args=(listen, port, url_path, cert, key))
# Start threads
dispatcher_thread.start()
event_handler_thread.start()
# Start threads
dispatcher_thread.start()
updater_thread.start()
# Return the update queue so the main thread can insert updates
return self.update_queue
# Return the update queue so the main thread can insert updates
return self.update_queue
def _start_polling(self, poll_interval, timeout, network_delay):
"""
@ -238,24 +243,28 @@ class Updater:
"""
Stops the polling/webhook thread and the dispatcher
"""
self.logger.info('Stopping Updater and Dispatcher...')
self.logger.debug('This might take a long time if you set a high value'
' as polling timeout.')
self.running = False
if self.httpd:
self.logger.info(
'Waiting for current webhook connection to be closed... '
'Send a Telegram message to the bot to exit immediately.')
self.httpd.shutdown()
self.httpd = None
with self.__lock:
if self.running:
self.running = False
self.logger.info('Stopping Updater and Dispatcher...')
self.logger.debug('This might take a long time if you set a '
'high value as polling timeout.')
self.logger.debug("Requesting Dispatcher to stop...")
self.dispatcher.stop()
while dispatcher.running_async > 0:
sleep(1)
if self.httpd:
self.logger.info(
'Waiting for current webhook connection to be '
'closed... Send a Telegram message to the bot to exit '
'immediately.')
self.httpd.shutdown()
self.httpd = None
self.logger.debug("Dispatcher stopped.")
self.logger.debug("Requesting Dispatcher to stop...")
self.dispatcher.stop()
while dispatcher.running_async > 0:
sleep(1)
self.logger.debug("Dispatcher stopped.")
def signal_handler(self, signum, frame):
self.is_idle = False