From 1e0ebe89f3c2e658fb6ec60ea1ebeed40aadcb1d Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Wed, 22 Jun 2016 01:24:59 +0300 Subject: [PATCH] JobQueue: minimize the amount of places changing self.__tick state - start the jobqueue (by default) during __init__() instead of during put() - protect self._next_peek and self.__tick with a Lock - rename self._start() to self._main_loop() - stop() is now blocking until the event loop thread exits --- telegram/ext/jobqueue.py | 129 +++++++++++++++++++++++---------------- 1 file changed, 75 insertions(+), 54 deletions(-) diff --git a/telegram/ext/jobqueue.py b/telegram/ext/jobqueue.py index 9ee452cb7..8169be376 100644 --- a/telegram/ext/jobqueue.py +++ b/telegram/ext/jobqueue.py @@ -21,7 +21,7 @@ import logging import time from threading import Thread, Lock, Event -from queue import PriorityQueue +from queue import PriorityQueue, Empty class JobQueue(object): @@ -30,30 +30,38 @@ class JobQueue(object): Attributes: queue (PriorityQueue): bot (Bot): + prevent_autostart (Optional[bool]): If ``True``, the job queue will not be started + automatically. Defaults to ``False`` Args: bot (Bot): The bot instance that should be passed to the jobs """ - def __init__(self, bot): + def __init__(self, bot, prevent_autostart=False): self.queue = PriorityQueue() self.bot = bot self.logger = logging.getLogger(self.__class__.__name__) - self.__lock = Lock() + self.__start_lock = Lock() + self.__next_peek_lock = Lock() # to protect self._next_peek & self.__tick self.__tick = Event() + self.__thread = None + """:type: Thread""" self._next_peek = None + """:type: float""" self._running = False - def put(self, job, next_t=None, prevent_autostart=False): + if not prevent_autostart: + self.logger.debug('Auto-starting %s', self.__class__.__name__) + self.start() + + def put(self, job, next_t=None): """Queue a new job. If the JobQueue is not running, it will be started. Args: job (Job): The ``Job`` instance representing the new job next_t (Optional[float]): Time in seconds in which the job should be executed first. Defaults to ``job.interval`` - prevent_autostart (Optional[bool]): If ``True``, the job queue will not be started - automatically if it is not running. Defaults to ``False`` """ job.job_queue = self @@ -68,13 +76,18 @@ class JobQueue(object): self.queue.put((next_t, job)) # Wake up the loop if this job should be executed next - if not self._next_peek or self._next_peek > next_t: - self._next_peek = next_t - self.__tick.set() + self._set_next_peek(next_t) - if not self._running and not prevent_autostart: - self.logger.debug('Auto-starting JobQueue') - self.start() + def _set_next_peek(self, t): + """ + Set next peek if not defined or `t` is before next peek. + In case the next peek was set, also trigger the `self.__tick` event. + + """ + with self.__next_peek_lock: + if not self._next_peek or self._next_peek > t: + self._next_peek = t + self.__tick.set() def tick(self): """ @@ -85,74 +98,80 @@ class JobQueue(object): self.logger.debug('Ticking jobs with t=%f', now) - while not self.queue.empty(): - t, job = self.queue.queue[0] + while True: + try: + t, job = self.queue.get(False) + except Empty: + break + self.logger.debug('Peeked at %s with t=%f', job.name, t) - if t <= now: - self.queue.get() - - if job._remove.is_set(): - self.logger.debug('Removing job %s', job.name) - continue - - elif job.enabled: - self.logger.debug('Running job %s', job.name) - - try: - job.run(self.bot) - - except: - self.logger.exception( - 'An uncaught error was raised while executing job %s', job.name) - - else: - self.logger.debug('Skipping disabled job %s', job.name) - - if job.repeat: - self.put(job) + if t > now: + # we can get here in two conditions: + # 1. At the second or later pass of the while loop, after we've already processed + # the job(s) we were supposed to at this time. + # 2. At the first iteration of the loop only if `self.put()` had triggered + # `self.__tick` because `self._next_peek` wasn't set + self.logger.debug("Next task isn't due yet. Finished!") + self.queue.put((t, job)) + self._set_next_peek(t) + break + if job._remove.is_set(): + self.logger.debug('Removing job %s', job.name) continue - self.logger.debug('Next task isn\'t due yet. Finished!') - self._next_peek = t - break + if job.enabled: + self.logger.debug('Running job %s', job.name) - else: - self._next_peek = None + try: + job.run(self.bot) - self.__tick.clear() + except: + self.logger.exception( + 'An uncaught error was raised while executing job %s', job.name) + + else: + self.logger.debug('Skipping disabled job %s', job.name) + + if job.repeat: + self.put(job) def start(self): """ Starts the job_queue thread. """ - self.__lock.acquire() + self.__start_lock.acquire() if not self._running: self._running = True - self.__lock.release() - job_queue_thread = Thread(target=self._start, name="job_queue") - job_queue_thread.start() + self.__start_lock.release() + self.__thread = Thread(target=self._main_loop, name="job_queue") + self.__thread.start() self.logger.debug('%s thread started', self.__class__.__name__) else: - self.__lock.release() + self.__start_lock.release() - def _start(self): + def _main_loop(self): """ Thread target of thread ``job_queue``. Runs in background and performs ticks on the job queue. """ while self._running: - self.__tick.wait(self._next_peek and self._next_peek - time.time()) - - # If we were woken up by set(), wait with the new timeout - if self.__tick.is_set(): + # self._next_peek may be (re)scheduled during self.tick() or self.put() + with self.__next_peek_lock: + tmout = self._next_peek and self._next_peek - time.time() + self._next_peek = None self.__tick.clear() - continue + + self.__tick.wait(tmout) + + # If we were woken up by self.stop(), just bail out + if not self._running: + break self.tick() @@ -162,10 +181,12 @@ class JobQueue(object): """ Stops the thread """ - with self.__lock: + with self.__start_lock: self._running = False self.__tick.set() + if self.__thread is not None: + self.__thread.join() def jobs(self): """Returns a tuple of all jobs that are currently in the ``JobQueue``"""