dispatcher is now stopped using a threading.Event

This commit is contained in:
Noam Meltzer 2016-02-06 10:10:08 +02:00
parent d4beb94059
commit e82ae432f7
2 changed files with 38 additions and 45 deletions

View file

@ -22,11 +22,12 @@
import logging
from functools import wraps
from inspect import getargspec
from threading import Thread, BoundedSemaphore, Lock
from threading import Thread, BoundedSemaphore, Lock, Event
from re import match
from time import sleep
from telegram import (TelegramError, Update, NullHandler)
from telegram.updatequeue import Empty
H = NullHandler()
logging.getLogger(__name__).addHandler(H)
@ -128,7 +129,7 @@ class Dispatcher:
Args:
bot (telegram.Bot): The bot object that should be passed to the
handlers
update_queue (UpdateQueue): The synchronized queue that will
update_queue (telegram.UpdateQueue): The synchronized queue that will
contain the updates.
"""
def __init__(self, bot, update_queue, workers=4):
@ -145,7 +146,7 @@ class Dispatcher:
self.error_handlers = []
self.logger = logging.getLogger(__name__)
self.running = False
self.__lock = Lock()
self.__stop_event = Event()
global semaphore
if not semaphore:
@ -153,64 +154,55 @@ class Dispatcher:
else:
self.logger.info("Semaphore already initialized, skipping.")
class _Stop(object):
"""
A class which objects can be passed into the update queue to stop the
thread
"""
pass
def start(self):
"""
Thread target of thread 'dispatcher'. Runs in background and processes
the update queue.
"""
self.__lock.acquire()
if not self.running:
self.running = True
self.__lock.release()
self.logger.info('Dispatcher thread started')
if self.running:
self.logger.warning('already running')
return
while True:
update = None
self.running = True
self.logger.info('Dispatcher started')
try:
# Pop update from update queue.
# Blocks if no updates are available.
update, context = self.update_queue.get(context=True)
while 1:
try:
# Pop update from update queue.
update, context = self.update_queue.get(True, 1, True)
except Empty:
if self.__stop_event.is_set():
break
continue
if type(update) is self._Stop:
self.running = False
break
try:
self.processUpdate(update, context)
self.logger.debug('Processed Update: %s with context %s'
% (update, context))
self.processUpdate(update, context)
self.logger.debug('Processed Update: %s with context %s'
% (update, context))
# Dispatch any errors
except TelegramError as te:
self.logger.warn("Error was raised while processing Update.")
self.dispatchError(update, te)
# Dispatch any errors
except TelegramError as te:
self.logger.warn("Error was raised while processing "
"Update.")
self.dispatchError(update, te)
# All other errors should not stop the thread, just print them
except:
self.logger.exception("An uncaught error was raised while "
"processing an update")
# All other errors should not stop the thread, just print them
except:
self.logger.exception("An uncaught error was raised while "
"processing an update")
else:
self.__lock.release()
self.running = False
self.logger.info('Dispatcher thread stopped')
def stop(self):
"""
Stops the thread
"""
with self.__lock:
if self.running:
self.update_queue.put(self._Stop())
while self.running:
sleep(0.1)
if self.running:
self.__stop_event.set()
while self.running:
sleep(0.1)
self.__stop_event.clear()
def processUpdate(self, update, context=None):
"""

View file

@ -23,9 +23,10 @@ Queue."""
# Adjust for differences in Python versions
try:
from queue import Queue
# loading Empty here so it can be imported by users of updatequeue
from queue import Queue, Empty # flake8: noqa
except ImportError:
from Queue import Queue
from Queue import Queue, Empty # flake8: noqa
class UpdateQueue(Queue):