JobQueue: minimize the amount of places changing self.__tick state

- start the jobqueue (by default) during __init__() instead of during
   put()
 - protect self._next_peek and self.__tick with a Lock
 - rename self._start() to self._main_loop()
 - stop() is now blocking until the event loop thread exits
This commit is contained in:
Noam Meltzer 2016-06-22 01:24:59 +03:00
parent 35872d7a8b
commit 1e0ebe89f3

View file

@ -21,7 +21,7 @@
import logging import logging
import time import time
from threading import Thread, Lock, Event from threading import Thread, Lock, Event
from queue import PriorityQueue from queue import PriorityQueue, Empty
class JobQueue(object): class JobQueue(object):
@ -30,30 +30,38 @@ class JobQueue(object):
Attributes: Attributes:
queue (PriorityQueue): queue (PriorityQueue):
bot (Bot): bot (Bot):
prevent_autostart (Optional[bool]): If ``True``, the job queue will not be started
automatically. Defaults to ``False``
Args: Args:
bot (Bot): The bot instance that should be passed to the jobs bot (Bot): The bot instance that should be passed to the jobs
""" """
def __init__(self, bot): def __init__(self, bot, prevent_autostart=False):
self.queue = PriorityQueue() self.queue = PriorityQueue()
self.bot = bot self.bot = bot
self.logger = logging.getLogger(self.__class__.__name__) self.logger = logging.getLogger(self.__class__.__name__)
self.__lock = Lock() self.__start_lock = Lock()
self.__next_peek_lock = Lock() # to protect self._next_peek & self.__tick
self.__tick = Event() self.__tick = Event()
self.__thread = None
""":type: Thread"""
self._next_peek = None self._next_peek = None
""":type: float"""
self._running = False self._running = False
def put(self, job, next_t=None, prevent_autostart=False): if not prevent_autostart:
self.logger.debug('Auto-starting %s', self.__class__.__name__)
self.start()
def put(self, job, next_t=None):
"""Queue a new job. If the JobQueue is not running, it will be started. """Queue a new job. If the JobQueue is not running, it will be started.
Args: Args:
job (Job): The ``Job`` instance representing the new job job (Job): The ``Job`` instance representing the new job
next_t (Optional[float]): Time in seconds in which the job should be executed first. next_t (Optional[float]): Time in seconds in which the job should be executed first.
Defaults to ``job.interval`` Defaults to ``job.interval``
prevent_autostart (Optional[bool]): If ``True``, the job queue will not be started
automatically if it is not running. Defaults to ``False``
""" """
job.job_queue = self job.job_queue = self
@ -68,13 +76,18 @@ class JobQueue(object):
self.queue.put((next_t, job)) self.queue.put((next_t, job))
# Wake up the loop if this job should be executed next # Wake up the loop if this job should be executed next
if not self._next_peek or self._next_peek > next_t: self._set_next_peek(next_t)
self._next_peek = next_t
self.__tick.set()
if not self._running and not prevent_autostart: def _set_next_peek(self, t):
self.logger.debug('Auto-starting JobQueue') """
self.start() Set next peek if not defined or `t` is before next peek.
In case the next peek was set, also trigger the `self.__tick` event.
"""
with self.__next_peek_lock:
if not self._next_peek or self._next_peek > t:
self._next_peek = t
self.__tick.set()
def tick(self): def tick(self):
""" """
@ -85,74 +98,80 @@ class JobQueue(object):
self.logger.debug('Ticking jobs with t=%f', now) self.logger.debug('Ticking jobs with t=%f', now)
while not self.queue.empty(): while True:
t, job = self.queue.queue[0] try:
t, job = self.queue.get(False)
except Empty:
break
self.logger.debug('Peeked at %s with t=%f', job.name, t) self.logger.debug('Peeked at %s with t=%f', job.name, t)
if t <= now: if t > now:
self.queue.get() # we can get here in two conditions:
# 1. At the second or later pass of the while loop, after we've already processed
if job._remove.is_set(): # the job(s) we were supposed to at this time.
self.logger.debug('Removing job %s', job.name) # 2. At the first iteration of the loop only if `self.put()` had triggered
continue # `self.__tick` because `self._next_peek` wasn't set
self.logger.debug("Next task isn't due yet. Finished!")
elif job.enabled: self.queue.put((t, job))
self.logger.debug('Running job %s', job.name) self._set_next_peek(t)
break
try:
job.run(self.bot)
except:
self.logger.exception(
'An uncaught error was raised while executing job %s', job.name)
else:
self.logger.debug('Skipping disabled job %s', job.name)
if job.repeat:
self.put(job)
if job._remove.is_set():
self.logger.debug('Removing job %s', job.name)
continue continue
self.logger.debug('Next task isn\'t due yet. Finished!') if job.enabled:
self._next_peek = t self.logger.debug('Running job %s', job.name)
break
else: try:
self._next_peek = None job.run(self.bot)
self.__tick.clear() except:
self.logger.exception(
'An uncaught error was raised while executing job %s', job.name)
else:
self.logger.debug('Skipping disabled job %s', job.name)
if job.repeat:
self.put(job)
def start(self): def start(self):
""" """
Starts the job_queue thread. Starts the job_queue thread.
""" """
self.__lock.acquire() self.__start_lock.acquire()
if not self._running: if not self._running:
self._running = True self._running = True
self.__lock.release() self.__start_lock.release()
job_queue_thread = Thread(target=self._start, name="job_queue") self.__thread = Thread(target=self._main_loop, name="job_queue")
job_queue_thread.start() self.__thread.start()
self.logger.debug('%s thread started', self.__class__.__name__) self.logger.debug('%s thread started', self.__class__.__name__)
else: else:
self.__lock.release() self.__start_lock.release()
def _start(self): def _main_loop(self):
""" """
Thread target of thread ``job_queue``. Runs in background and performs ticks on the job Thread target of thread ``job_queue``. Runs in background and performs ticks on the job
queue. queue.
""" """
while self._running: while self._running:
self.__tick.wait(self._next_peek and self._next_peek - time.time()) # self._next_peek may be (re)scheduled during self.tick() or self.put()
with self.__next_peek_lock:
# If we were woken up by set(), wait with the new timeout tmout = self._next_peek and self._next_peek - time.time()
if self.__tick.is_set(): self._next_peek = None
self.__tick.clear() self.__tick.clear()
continue
self.__tick.wait(tmout)
# If we were woken up by self.stop(), just bail out
if not self._running:
break
self.tick() self.tick()
@ -162,10 +181,12 @@ class JobQueue(object):
""" """
Stops the thread Stops the thread
""" """
with self.__lock: with self.__start_lock:
self._running = False self._running = False
self.__tick.set() self.__tick.set()
if self.__thread is not None:
self.__thread.join()
def jobs(self): def jobs(self):
"""Returns a tuple of all jobs that are currently in the ``JobQueue``""" """Returns a tuple of all jobs that are currently in the ``JobQueue``"""